13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
39 template <
typename F,
typename... Args>
65 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
68 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
96 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
99 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
169 template<
class T>
void Collect(std::vector<T> &reslist);
172 void FixLists(std::vector<TObject*> &lists);
197 static_assert(std::is_constructible<TObject*, retType>::value,
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
203 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
209 unsigned nWorkers = GetNWorkers();
212 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
215 bool ok = Fork(worker);
217 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
222 if(fileNames.size() < nWorkers) {
224 fTaskType = ETask::kProcByRange;
226 fNToProcess = nWorkers*fileNames.size();
227 std::vector<unsigned> args(nWorkers);
228 std::iota(args.begin(), args.end(), 0);
230 if(fNProcessed < nWorkers)
231 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
234 fTaskType = ETask::kProcByFile;
235 fNToProcess = fileNames.size();
236 std::vector<unsigned> args(nWorkers);
237 std::iota(args.begin(), args.end(), 0);
239 if(fNProcessed < nWorkers)
240 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
244 std::vector<TObject*> reslist;
249 auto res = redfunc(reslist);
253 fTaskType = ETask::kNoTask;
254 return static_cast<retType
>(res);
263 std::vector<std::string> singleFileName(1, fileName);
264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
273 std::vector<std::string> fileNames(files.GetNFiles());
275 for(
auto f : *
static_cast<THashList*
>(files.GetList()))
276 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
287 TObjArray* filelist = files.GetListOfFiles();
288 std::vector<std::string> fileNames(filelist->
GetEntries());
290 for(
auto f : *filelist)
291 fileNames[count++] =
f->GetTitle();
293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
303 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
307 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
313 unsigned nWorkers = GetNWorkers();
316 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
319 bool ok = Fork(worker);
321 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
326 fTaskType = ETask::kProcByRange;
329 fNToProcess = nWorkers;
330 std::vector<unsigned> args(nWorkers);
331 std::iota(args.begin(), args.end(), 0);
333 if(fNProcessed < nWorkers)
334 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
337 std::vector<TObject*> reslist;
342 auto res = redfunc(reslist);
346 fTaskType = ETask::kNoTask;
347 return static_cast<retType
>(res);
361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
401 return Process(tree, procFunc, noelist, nToProcess, jFirst);
409 unsigned code = msg.first;
413 if(msg.second !=
nullptr)
414 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
417 const char *str = ReadBuffer<const char*>(msg.second.get());
418 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
424 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
441 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
443 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned long long ULong64_t
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Merge collection of TObjects.
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
~TTreeProcessorMP()=default
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Int_t GetEntries() const override
Return the number of objects in array (i.e. number of non-empty slots).
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A TTree represents a columnar dataset.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...