import socket
import time
from datetime import datetime

import ROOT
from dask.distributed import Client
from dask_lxplus import CernCluster
# Dask-backed distributed flavour of RDataFrame; run_analysis() builds its
# dataset with this alias.
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame
def create_connection(n_workers: int = 2, **cluster_kwargs) -> Client:
    """
    Creates a connection to HTCondor cluster offered by the CERN batch service.
    Returns a Dask client that RDataFrame will use to distribute computations.

    Args:
        n_workers: Number of Dask workers to request from the cluster. The
            call blocks until all of them have connected.
        **cluster_kwargs: Extra keyword arguments for ``CernCluster``; they
            are merged over (and may override) the defaults set below.

    Returns:
        A ``dask.distributed.Client`` connected to the new cluster.
    """
    # NOTE(review): the copy of this script under review had lost the
    # CernCluster arguments that came before ``container_runtime``, as well as
    # the worker-count assignment. Supply any missing cluster settings through
    # ``cluster_kwargs`` and confirm that 2 workers is an appropriate default.
    options = {
        'container_runtime': 'none',
        'scheduler_options': {
            'host': socket.gethostname(),
        },
        # NOTE(review): newer dask-jobqueue releases call this option
        # ``job_extra_directives``; confirm against the installed version.
        'job_extra': {
            'MY.JobFlavour': '"espresso"',
        },
    }
    options.update(cluster_kwargs)
    cluster = CernCluster(**options)

    # Request the workers, then attach a client to the cluster.
    cluster.scale(n_workers)
    client = Client(cluster)

    # Block until every requested worker has joined; time it with a
    # monotonic clock so wall-clock adjustments cannot skew the report.
    print(f"Waiting for {n_workers} workers to start.")
    start = time.perf_counter()
    client.wait_for_workers(n_workers)
    end = time.perf_counter()
    print(f"All workers are ready, took {round(end - start, 2)} seconds.")

    return client
def run_analysis(connection: Client) -> None:
    """
    Run a simple example with RDataFrame, using the previously created
    connection to the HTCondor cluster.

    Builds a 10_000-entry dataset with a single column ``x`` defined as
    ``gRandom->Rndm() * 100``, then prints the entry count and the mean,
    maximum and minimum of ``x``.

    Args:
        connection: Dask client whose workers execute the computation.
    """
    df = RDataFrame(10_000, daskclient=connection).Define(
        "x", "gRandom->Rndm() * 100")

    # Book every result before reading any of them, so RDataFrame can
    # compute all four in the same pass over the data.
    nentries = df.Count()
    meanv = df.Mean("x")
    maxv = df.Max("x")
    minv = df.Min("x")

    print(f"Dataset has {nentries.GetValue()} entries")
    print("Column x stats:")
    print(f"\tmean: {meanv.GetValue()}")
    print(f"\tmax: {maxv.GetValue()}")
    print(f"\tmin: {minv.GetValue()}")
if __name__ == "__main__":
    connection = create_connection()
    print(f"Starting the computations at {datetime.now()}")

    # Measure the analysis alone, using a monotonic clock for the duration.
    start = time.perf_counter()
    run_analysis(connection)
    end = time.perf_counter()

    print(f"Computations ended at {datetime.now()}, "
          f"took {round(end - start, 2)} seconds.")