13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
39 template <
typename F,
typename... Args>
197 static_assert(std::is_constructible<TObject*, retType>::value,
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
203 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
212 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
217 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
224 fTaskType = ETask::kProcByRange;
227 std::vector<unsigned> args(
nWorkers);
228 std::iota(args.begin(), args.end(), 0);
231 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
234 fTaskType = ETask::kProcByFile;
236 std::vector<unsigned> args(
nWorkers);
237 std::iota(args.begin(), args.end(), 0);
240 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
253 fTaskType = ETask::kNoTask;
254 return static_cast<retType>(res);
303 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
307 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
316 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
321 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
326 fTaskType = ETask::kProcByRange;
330 std::vector<unsigned> args(
nWorkers);
331 std::iota(args.begin(), args.end(), 0);
334 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
346 fTaskType = ETask::kNoTask;
347 return static_cast<retType>(res);
409 unsigned code =
msg.first;
413 if(
msg.second !=
nullptr)
418 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
424 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
437 while (
mon.GetActive() > 0) {
441 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
443 }
else if (
msg.first < 1000)
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
unsigned long long ULong64_t
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Merge collection of TObjects.
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
~TTreeProcessorMP()=default
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
Class that contains a list of TFileInfo objects and accumulated metadata about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A TTree represents a columnar dataset.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.
tbb::task_arena is an alias of tbb::interface7::task_arena, which does not allow forward-declaring tb...