13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
39 template <
typename F,
typename... Args>
65 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
68 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
96 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
99 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
169 template<
class T>
void Collect(std::vector<T> &reslist);
172 void FixLists(std::vector<TObject*> &lists);
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
203 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
209 unsigned nWorkers = GetNWorkers();
212 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
215 bool ok = Fork(worker);
217 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
222 if(fileNames.size() < nWorkers) {
224 fTaskType = ETask::kProcByRange;
226 fNToProcess = nWorkers*fileNames.size();
227 std::vector<unsigned> args(nWorkers);
228 std::iota(args.begin(), args.end(), 0);
230 if(fNProcessed < nWorkers)
231 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
234 fTaskType = ETask::kProcByFile;
235 fNToProcess = fileNames.size();
236 std::vector<unsigned> args(nWorkers);
237 std::iota(args.begin(), args.end(), 0);
239 if(fNProcessed < nWorkers)
240 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
244 std::vector<TObject*> reslist;
249 auto res = redfunc(reslist);
253 fTaskType = ETask::kNoTask;
254 return static_cast<retType
>(res);
263 std::vector<std::string> singleFileName(1, fileName);
264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
273 std::vector<std::string> fileNames(files.GetNFiles());
275 for(
auto f : *
static_cast<THashList*
>(files.GetList()))
276 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
287 TObjArray* filelist = files.GetListOfFiles();
288 std::vector<std::string> fileNames(filelist->
GetEntries());
290 for(
auto f : *filelist)
291 fileNames[count++] =
f->GetTitle();
293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
307 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
313 unsigned nWorkers = GetNWorkers();
316 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
319 bool ok = Fork(worker);
321 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
326 fTaskType = ETask::kProcByRange;
329 fNToProcess = nWorkers;
330 std::vector<unsigned> args(nWorkers);
331 std::iota(args.begin(), args.end(), 0);
333 if(fNProcessed < nWorkers)
334 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
337 std::vector<TObject*> reslist;
342 auto res = redfunc(reslist);
346 fTaskType = ETask::kNoTask;
347 return static_cast<retType
>(res);
361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
401 return Process(
tree, procFunc, noelist, nToProcess, jFirst);
409 unsigned code = msg.first;
413 if(msg.second !=
nullptr)
414 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
417 const char *str = ReadBuffer<const char*>(msg.second.get());
418 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
424 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
441 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
443 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned long long ULong64_t
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Merge collection of TObjects.
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
~TTreeProcessorMP()=default
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
std::result_of_t< F(Args...)> InvokeResult_t
An adapter for std::invoke_result that falls back to std::result_of if the former is not available.
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Templated derivation of TMPWorkerTree that handles tree processing with a generic function.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Int_t GetEntries() const override
Return the number of objects in array (i.e.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A TTree represents a columnar dataset.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
static constexpr double s