Configure a Dask connection to an HTCondor cluster hosted by the CERN batch service.
The tutorial defines the resources that each job will request from the HTCondor scheduler, then creates a Dask client that RDataFrame can use to distribute computations.
from datetime import datetime
import socket
import time
from dask.distributed import Client
from dask_lxplus import CernCluster
import ROOT
def create_connection() -> Client:
    """
    Create a Dask client backed by an HTCondor cluster of the CERN batch service.

    Every condor job is configured to request 1 core, 2000MB of memory and
    1GB of disk, and is submitted with the "espresso" job flavour. Two
    workers are requested, and this function blocks until both of them
    have joined.

    Returns:
        A ``Client`` connected to the cluster, which RDataFrame can use to
        distribute computations.
    """
    # Scheduler options: port 8786 on this machine's hostname.
    scheduler_settings = {
        'port': 8786,
        'host': socket.gethostname(),
    }
    # Extra attributes attached to every submitted condor job.
    condor_job_attributes = {
        'MY.JobFlavour': '"espresso"',
    }
    htcondor_cluster = CernCluster(
        cores=1,
        memory='2000MB',
        disk='1GB',
        death_timeout='60',
        lcg=True,
        nanny=True,
        container_runtime='none',
        scheduler_options=scheduler_settings,
        job_extra=condor_job_attributes,
    )

    worker_count = 2
    # Ask the cluster for `worker_count` workers.
    htcondor_cluster.scale(worker_count)

    client = Client(htcondor_cluster)
    print(f"Waiting for {worker_count} workers to start.")
    t0 = time.time()
    # Block until the requested workers have connected to the scheduler.
    client.wait_for_workers(worker_count)
    waited = time.time() - t0
    print(f"All workers are ready, took {round(waited, 2)} seconds.")
    return client
def run_analysis(connection: Client, n_entries: int = 10_000) -> None:
    """
    Run a simple example with RDataFrame, using the previously created
    connection to the HTCondor cluster.

    A distributed RDataFrame with ``n_entries`` entries is created on top of
    ``connection``. A column ``x`` is defined from the C++ expression
    ``gRandom->Rndm() * 100``, and the entry count plus the mean, maximum
    and minimum of ``x`` are printed.

    Args:
        connection: Dask client connected to the HTCondor cluster, used to
            distribute the RDataFrame computation.
        n_entries: number of entries of the generated dataset.
    """
    # NOTE(review): the statement defining ``df`` was truncated in the source
    # (only ``"x", "gRandom->Rndm() * 100")`` survived), which left ``df``
    # undefined. It is reconstructed with the ``executor=`` keyword of the
    # distributed ``ROOT.RDataFrame`` (recent ROOT releases) and a default of
    # 10 000 entries. Confirm both against the target ROOT version.
    df = ROOT.RDataFrame(n_entries, executor=connection).Define(
        "x", "gRandom->Rndm() * 100")

    # Book all the actions before requesting any result.
    nentries = df.Count()
    meanv = df.Mean("x")
    maxv = df.Max("x")
    minv = df.Min("x")

    print(f"Dataset has {nentries.GetValue()} entries")
    print("Column x stats:")
    print(f"\tmean: {meanv.GetValue()}")
    print(f"\tmax: {maxv.GetValue()}")
    print(f"\tmin: {minv.GetValue()}")
if __name__ == "__main__":
    # Connect to the batch cluster first, then time only the analysis.
    connection = create_connection()
    print(f"Starting the computations at {datetime.now()}")
    t_begin = time.time()
    run_analysis(connection)
    elapsed = time.time() - t_begin
    print(f"Computations ended at {datetime.now()}, "
          f"took {round(elapsed, 2)} seconds.")
ROOT's RDataFrame offers a modern, high-level interface for the analysis of data stored in TTree, ...