Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
distrdf004_dask_lxbatch.py
Go to the documentation of this file.
1# \file
2# \ingroup tutorial_dataframe
3#
4# Configure a Dask connection to a HTCondor cluster hosted by the CERN batch
5# service. To reproduce this tutorial, run the following steps:
6#
7# 1. Log in to lxplus
8# 2. Source an LCG release (minimum LCG104). See
9# https://lcgdocs.web.cern.ch/lcgdocs/lcgreleases/introduction/ for details
10# 3. Install the `dask_lxplus` package, which provides the `CernCluster` class
11# needed to properly connect to the CERN condor pools. See
12# https://batchdocs.web.cern.ch/specialpayload/dask.html for instructions
13# 4. Run this tutorial
14#
15# The tutorial defines the resources that each job will request from the
16# condor scheduler, then creates a Dask client that can be used by RDataFrame
17# to distribute computations.
18#
19# \macro_code
20#
21# \date September 2023
22# \author Vincenzo Eduardo Padulano CERN
23from datetime import datetime
24import socket
25import time
26
27from dask.distributed import Client
28from dask_lxplus import CernCluster
29
30import ROOT
31RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
32
33
def create_connection() -> Client:
    """
    Build a Dask client backed by HTCondor jobs on the CERN batch service.

    A `CernCluster` is configured with the resources of a single job, two
    such jobs are requested, and the function blocks until both Dask workers
    have been launched. The resulting client is returned so that RDataFrame
    can use it to distribute computations.
    """
    # Number of condor jobs to request. Every job spawns exactly one Dask
    # worker, so this is also the number of workers (and of cores in total).
    n_workers = 2

    # Options forwarded to the Dask scheduler: it is addressed through the
    # hostname of the current machine, on port 8786.
    scheduler_options = {
        'port': 8786,
        'host': socket.gethostname(),
    }

    # Additional entry for the condor job description.
    job_extra = {
        'MY.JobFlavour': '"espresso"',
    }

    # The keyword arguments below describe the submission of ONE job: a Dask
    # worker with 1 core and 2 GB of memory.
    cluster = CernCluster(
        cores=1,
        memory='2000MB',
        disk='1GB',
        death_timeout='60',
        lcg=True,
        nanny=True,
        container_runtime='none',
        scheduler_options=scheduler_options,
        job_extra=job_extra,
    )

    # Submitting happens here: scaling the cluster launches the condor jobs
    # (comparable to running `condor_submit myjob.sub` once per job).
    cluster.scale(n_workers)

    # Creating the client does not wait for the jobs: at this point they may
    # or may not be running, so the resources are not guaranteed to be there.
    client = Client(cluster)

    # Block until the condor scheduler has started every requested job and
    # the corresponding Dask workers are up, reporting how long that took.
    print(f"Waiting for {n_workers} workers to start.")
    wait_began = time.time()
    client.wait_for_workers(n_workers)
    wait_ended = time.time()
    print(f"All workers are ready, took {round(wait_ended - wait_began, 2)} seconds.")

    return client
86
87
def run_analysis(connection: Client) -> None:
    """
    Print basic statistics of a randomly filled column using RDataFrame.

    A distributed RDataFrame with 10000 entries is created on top of the
    given Dask client, a column "x" is defined as `gRandom->Rndm() * 100`,
    and the number of entries plus the mean, maximum and minimum of "x" are
    printed to standard output.
    """
    # Start from a dataframe connected to the HTCondor cluster through the
    # Dask client, then attach the column all the statistics refer to.
    empty_df = RDataFrame(10_000, daskclient=connection)
    df_with_x = empty_df.Define("x", "gRandom->Rndm() * 100")

    # Request every result up front; the values are only read out below,
    # through GetValue(), at the moment they are printed.
    count_result = df_with_x.Count()
    mean_result = df_with_x.Mean("x")
    max_result = df_with_x.Max("x")
    min_result = df_with_x.Min("x")

    print(f"Dataset has {count_result.GetValue()} entries")
    print("Column x stats:")
    print(f"\tmean: {mean_result.GetValue()}")
    print(f"\tmax: {max_result.GetValue()}")
    print(f"\tmin: {min_result.GetValue()}")
106
107
if __name__ == "__main__":
    # Set up the cluster connection first, so that the timing below covers
    # only the analysis and not the wait for the Dask workers.
    connection = create_connection()

    print(f"Starting the computations at {datetime.now()}")
    start = time.time()
    run_analysis(connection)
    end = time.time()

    # Report the wall-clock end time together with the elapsed seconds.
    finished_at = datetime.now()
    elapsed = round(end - start, 2)
    print(f"Computations ended at {finished_at}, took {elapsed} seconds.")