13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
62 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
64 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
65 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
67 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
70 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
73 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
76 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
93 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
95 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
96 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
98 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
101 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
104 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
106 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
/// Listen for messages sent by the workers and call the appropriate handler function.
/// \param reslist vector of results of type T, passed by non-const reference;
///        NOTE(review): presumably filled with the workers' results — confirm against the definition.
166 template<
class T>
void Collect(std::vector<T> &reslist);
/// Fix list of lists before merging (to avoid errors about duplicated objects).
/// \param lists the lists to fix, passed by non-const reference.
169 void FixLists(std::vector<TObject*> &lists);
191 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
193 using retType =
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
194 static_assert(std::is_constructible<TObject*, retType>::value,
195 "procFunc must return a pointer to a class inheriting from TObject,"
196 " and must take a reference to TTreeReader as the only argument");
200 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
206 unsigned nWorkers = GetNWorkers();
209 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
211 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
212 bool ok = Fork(worker);
214 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
219 if(fileNames.size() < nWorkers) {
221 fTaskType = ETask::kProcByRange;
223 fNToProcess = nWorkers*fileNames.size();
224 std::vector<unsigned> args(nWorkers);
225 std::iota(args.begin(), args.end(), 0);
227 if(fNProcessed < nWorkers)
228 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
231 fTaskType = ETask::kProcByFile;
232 fNToProcess = fileNames.size();
233 std::vector<unsigned> args(nWorkers);
234 std::iota(args.begin(), args.end(), 0);
236 if(fNProcessed < nWorkers)
237 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 std::vector<TObject*> reslist;
246 auto res = redfunc(reslist);
250 fTaskType = ETask::kNoTask;
251 return static_cast<retType
>(res);
258 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
260 std::vector<std::string> singleFileName(1, fileName);
261 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
268 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
270 std::vector<std::string> fileNames(files.GetNFiles());
272 for(
auto f : *
static_cast<THashList*
>(files.GetList()))
273 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
275 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
282 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
284 TObjArray* filelist = files.GetListOfFiles();
285 std::vector<std::string> fileNames(filelist->
GetEntries());
287 for(
auto f : *filelist)
288 fileNames[count++] =
f->GetTitle();
290 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
297 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
299 using retType =
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type;
300 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
310 unsigned nWorkers = GetNWorkers();
313 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
316 bool ok = Fork(worker);
318 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
323 fTaskType = ETask::kProcByRange;
326 fNToProcess = nWorkers;
327 std::vector<unsigned> args(nWorkers);
328 std::iota(args.begin(), args.end(), 0);
330 if(fNProcessed < nWorkers)
331 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
334 std::vector<TObject*> reslist;
339 auto res = redfunc(reslist);
343 fTaskType = ETask::kNoTask;
344 return static_cast<retType
>(res);
355 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
358 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
365 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
368 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
375 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
378 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
385 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
388 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
395 ->
typename std::result_of<
F(std::reference_wrapper<TTreeReader>)>
::type
398 return Process(
tree, procFunc, noelist, nToProcess, jFirst);
406 unsigned code = msg.first;
410 if(msg.second !=
nullptr)
411 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
414 const char *str = ReadBuffer<const char*>(msg.second.get());
415 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
416 "Continuing execution ignoring these entries.", str);
421 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
438 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
440 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
A std::pair that wraps the code and the optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned long long ULong64_t
void Error(const char *location, const char *msgfmt,...)
void Warning(const char *location, const char *msgfmt,...)
Merge collection of TObjects.
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
~TTreeProcessorMP()=default
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
Process a TTree dataset with a functor.
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
TTreeProcessorMP(unsigned nWorkers=0)
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
A List of entry numbers in a TTree or TChain.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Int_t GetEntries() const
Return the number of objects in array (i.e. number of non-empty slots).
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A TTree object has a header with a name and a title.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.
Namespace for new ROOT classes and functions.
static constexpr double s