Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
distrdf004_dask_lxbatch.py
Go to the documentation of this file.
# \file
# \ingroup tutorial_dataframe
#
# Configure a Dask connection to a HTCondor cluster hosted by the CERN batch
# service. To reproduce this tutorial, run the following steps:
#
# 1. Login to lxplus
# 2. Source an LCG release (minimum LCG104). See
# https://lcgdocs.web.cern.ch/lcgdocs/lcgreleases/introduction/ for details
# 3. Install the `dask_lxplus` package, which provides the `CernCluster` class
# needed to properly connect to the CERN condor pools. See
# https://batchdocs.web.cern.ch/specialpayload/dask.html for instructions
# 4. Run this tutorial
#
# The tutorial defines resources that each job will request to the condor
# scheduler, then creates a Dask client that can be used by RDataFrame to
# distribute computations.
#
# \macro_code
#
# \date September 2023
# \author Vincenzo Eduardo Padulano CERN
23from datetime import datetime
24import socket
25import time
26
27from dask.distributed import Client
28from dask_lxplus import CernCluster
29
30import ROOT
31
32
def create_connection() -> Client:
    """
    Creates a connection to HTCondor cluster offered by the CERN batch service.
    Returns a Dask client that RDataFrame will use to distribute computations.
    """
    # The resources described in the specified arguments to this class represent
    # the submission of a single job and will spawn a single Dask worker when
    # the condor scheduler launches the job. Specifically, this example has Dask
    # workers each with 1 core and 2 GB of memory.
    cluster = CernCluster(
        cores=1,
        memory='2000MB',
        disk='1GB',
        death_timeout='60',
        lcg=True,
        nanny=True,
        container_runtime='none',
        scheduler_options={
            'port': 8786,
            'host': socket.gethostname(),
        },
        job_extra={
            # Extra quoting is required: HTCondor expects a quoted string value.
            'MY.JobFlavour': '"espresso"',
        },
    )

    # The scale method allows to launch N jobs with the description above (thus
    # N Dask workers). Calling this method on the cluster object launches the
    # condor jobs (i.e. it is equivalent to `condor_submit myjob.sub`). In this
    # example, two jobs are requested so two Dask workers will be eventually
    # launched for a total of 2 cores.
    n_workers = 2
    cluster.scale(n_workers)

    # The Dask client can be created after the condor jobs have been submitted.
    # At this point, the jobs may or may not have actually started. Thus, it is
    # not guaranteed that the application already has the requested resources
    # available.
    client = Client(cluster)

    # It is possible to tell the Dask client to wait until the condor scheduler
    # has started the requested jobs and launched the Dask workers.
    # The client will wait until 'n_workers' workers have been launched. In this
    # example, the client waits for all the jobs requested to start before
    # continuing with the application.
    print(f"Waiting for {n_workers} workers to start.")
    start = time.time()
    # This call was missing: without it the function returns before the workers
    # exist and the timing printed below is meaningless.
    client.wait_for_workers(n_workers)
    end = time.time()
    print(f"All workers are ready, took {round(end - start, 2)} seconds.")

    return client
85
86
def run_analysis(connection: Client) -> None:
    """
    Run a simple example with RDataFrame, using the previously created
    connection to the HTCondor cluster.
    """
    # Build a distributed RDataFrame with 10k entries and define one column
    # filled with uniform random values in [0, 100).
    rdf = ROOT.RDataFrame(10_000, executor=connection)
    df = rdf.Define("x", "gRandom->Rndm() * 100")

    # Book all actions lazily; the event loop runs once on first GetValue().
    n_total = df.Count()
    x_mean = df.Mean("x")
    x_max = df.Max("x")
    x_min = df.Min("x")

    print(f"Dataset has {n_total.GetValue()} entries")
    print("Column x stats:")
    print(f"\tmean: {x_mean.GetValue()}")
    print(f"\tmax: {x_max.GetValue()}")
    print(f"\tmin: {x_min.GetValue()}")
105
106
if __name__ == "__main__":
    # Set up the Dask client connected to the HTCondor cluster, then time the
    # distributed RDataFrame analysis end to end.
    client = create_connection()
    print(f"Starting the computations at {datetime.now()}")
    t_begin = time.time()
    run_analysis(client)
    t_end = time.time()
    elapsed = round(t_end - t_begin, 2)
    print(f"Computations ended at {datetime.now()}, "
          f"took {elapsed} seconds.")
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
ROOT's RDataFrame offers a modern, high-level interface for analysis of data stored in TTree ,...