Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
distrdf002_dask_connection.py
Go to the documentation of this file.
1## \file
2## \ingroup tutorial_dataframe
3## \notebook -draw
4## Configure a Dask connection and fill two histograms distributedly.
5##
6## This tutorial shows the ingredients needed to setup the connection to a Dask
7## cluster (e.g. a `LocalCluster` for a single machine). After this initial
8## setup, an RDataFrame with distributed capabilities is created and connected
9## to a Dask `Client` instance. Finally, a couple of histograms are drawn from
10## the created columns in the dataset. Relevant documentation can be found at
11## http://distributed.dask.org/en/stable .
12##
13## \macro_code
14## \macro_image
15##
16## \date February 2022
17## \author Vincenzo Eduardo Padulano
18from dask.distributed import LocalCluster, Client
19import ROOT
20
21# Point RDataFrame calls to Dask RDataFrame object
22RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
23
24
25def create_connection():
26 """
27 Setup connection to a Dask cluster. Two ingredients are needed:
28 1. Creating a cluster object that represents computing resources. This can be
29 done in various ways depending on the type of resources at disposal. To use
30 only the local machine (e.g. your laptop), a `LocalCluster` object can be
31 used. This step can be skipped if you have access to an existing Dask
32 cluster; in that case, the cluster administrator should provide you with a
33 URL to connect to the cluster in step 2. More options for cluster creation
34 can be found in the Dask docs at
35 http://distributed.dask.org/en/stable/api.html#cluster .
36 2. Creating a Dask client object that connects to the cluster. This accepts
37 directly the object previously created. In case the cluster was setup
38 externally, you need to provide an endpoint URL to the client, e.g.
39 'https://myscheduler.domain:8786'.
40
41 Through Dask, you can connect to various types of cluster resources. For
42 example, you can connect together a set of machines through SSH and use them
43 to run your computations. This is done through the `SSHCluster` class. For
44 example:
45
46 ```python
47 from dask.distributed import SSHCluster
48 cluster = SSHCluster(
49 # A list with machine host names, the first name will be used as
50 # scheduler, following names will become workers.
51 hosts=["machine1","machine2","machine3"],
52 # A dictionary of options for each worker node, here we set the number
53 # of cores to be used on each node.
54 worker_options={"nprocs":4,},
55 )
56 ```
57
58 Another common usecase is interfacing Dask to a batch system like HTCondor or
59 Slurm. A separate package called dask-jobqueue (https://jobqueue.dask.org)
60 extends the available Dask cluster classes to enable running Dask computations
61 as batch jobs. In this case, the cluster object usually receives the parameters
62 that would be written in the job description file. For example:
63
64 ```python
65 from dask_jobqueue import HTCondorCluster
66 cluster = HTCondorCluster(
67 cores=1,
68 memory='2000MB',
69 disk='1000MB',
70 )
71 # Use the scale method to send as many jobs as needed
72 cluster.scale(4)
73 ```
74
75 In this tutorial, a cluster object is created for the local machine, using
76 multiprocessing (processes=True) on 2 workers (n_workers=2) each using only
77 1 core (threads_per_worker=1) and 2GiB of RAM (memory_limit="2GiB").
78 """
79 cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=True, memory_limit="2GiB")
80 client = Client(cluster)
81 return client
82
83
84# This tutorial uses Python multiprocessing, so the creation of the cluster
85# needs to be wrapped in the main clause as described in the Python docs
86# https://docs.python.org/3/library/multiprocessing.html
87if __name__ == "__main__":
88
89 # Create the connection to the mock Dask cluster on the local machine
90 connection = create_connection()
91 # Create an RDataFrame that will use Dask as a backend for computations
92 df = RDataFrame(1000, daskclient=connection)
93
94 # Set the random seed and define two columns of the dataset with random numbers.
95 ROOT.gRandom.SetSeed(1)
96 df_1 = df.Define("gaus", "gRandom->Gaus(10, 1)").Define("exponential", "gRandom->Exp(10)")
97
98 # Book an histogram for each column
99 h_gaus = df_1.Histo1D(("gaus", "Normal distribution", 50, 0, 30), "gaus")
100 h_exp = df_1.Histo1D(("exponential", "Exponential distribution", 50, 0, 30), "exponential")
101
102 # Plot the histograms side by side on a canvas
103 c = ROOT.TCanvas("distrdf002", "distrdf002", 800, 400)
104 c.Divide(2, 1)
105 c.cd(1)
106 h_gaus.DrawCopy()
107 c.cd(2)
108 h_exp.DrawCopy()
109
110 # Save the canvas
111 c.SaveAs("distrdf002_dask_connection.png")
112 print("Saved figure to distrdf002_dask_connection.png")