13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
62 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
64 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
65 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
67 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
70 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
73 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
76 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
93 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
95 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
96 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
98 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
101 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
104 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
106 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
/// Listen for messages sent by the workers and call the appropriate handler function.
/// \param reslist vector handed on to the message handler; presumably it accumulates
///        the results sent back by the workers — TODO confirm against the definition.
166 template<
class T>
void Collect(std::vector<T> &reslist);
/// Fix list of lists before merging (to avoid errors about duplicated objects).
/// \param lists the lists to fix; taken by non-const reference.
169 void FixLists(std::vector<TObject*> &lists);
191 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
193 using retType =
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
194 static_assert(std::is_constructible<TObject*, retType>::value,
195 "procFunc must return a pointer to a class inheriting from TObject,"
196 " and must take a reference to TTreeReader as the only argument");
200 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
206 unsigned nWorkers = GetNWorkers();
209 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
211 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
212 bool ok = Fork(worker);
214 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
219 if(fileNames.size() < nWorkers) {
221 fTaskType = ETask::kProcByRange;
223 fNToProcess = nWorkers*fileNames.size();
224 std::vector<unsigned> args(nWorkers);
225 std::iota(args.begin(), args.end(), 0);
227 if(fNProcessed < nWorkers)
228 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
231 fTaskType = ETask::kProcByFile;
232 fNToProcess = fileNames.size();
233 std::vector<unsigned> args(nWorkers);
234 std::iota(args.begin(), args.end(), 0);
236 if(fNProcessed < nWorkers)
237 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 std::vector<TObject*> reslist;
246 auto res = redfunc(reslist);
250 fTaskType = ETask::kNoTask;
251 return static_cast<retType
>(res);
258 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
260 std::vector<std::string> singleFileName(1, fileName);
261 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
268 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
270 std::vector<std::string> fileNames(files.GetNFiles());
272 for(
auto f : *
static_cast<THashList*
>(files.GetList()))
273 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
275 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
282 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
284 TObjArray* filelist = files.GetListOfFiles();
285 std::vector<std::string> fileNames(filelist->
GetEntries());
287 for(
auto f : *filelist)
288 fileNames[count++] =
f->GetTitle();
290 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
297 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
299 using retType =
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
300 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
310 unsigned nWorkers = GetNWorkers();
313 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
316 bool ok = Fork(worker);
318 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
323 fTaskType = ETask::kProcByRange;
326 fNToProcess = nWorkers;
327 std::vector<unsigned> args(nWorkers);
328 std::iota(args.begin(), args.end(), 0);
330 if(fNProcessed < nWorkers)
331 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
334 std::vector<TObject*> reslist;
339 auto res = redfunc(reslist);
343 fTaskType = ETask::kNoTask;
344 return static_cast<retType
>(res);
355 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
358 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
365 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
368 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
375 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
378 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
385 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
388 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
395 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
398 return Process(
tree, procFunc, noelist, nToProcess, jFirst);
406 unsigned code = msg.first;
410 if(msg.second !=
nullptr)
411 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
414 const char *str = ReadBuffer<const char*>(msg.second.get());
415 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
416 "Continuing execution ignoring these entries.", str);
421 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
438 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
440 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
A std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned long long ULong64_t
void Error(const char *location, const char *msgfmt,...)
void Warning(const char *location, const char *msgfmt,...)
Merge collection of TObjects.
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
~TTreeProcessorMP()=default
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
Process a TTree dataset with a functor.
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
TTreeProcessorMP(unsigned nWorkers=0)
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Int_t GetEntries() const
Return the number of objects in array (i.e. number of non-empty slots).
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A TTree represents a columnar dataset.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static constexpr double s