13 #ifndef ROOT_TTreeProcessorMP 14 #define ROOT_TTreeProcessorMP 32 #include <type_traits> 62 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
64 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
65 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
67 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
70 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
73 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
76 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
93 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
95 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
96 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
98 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
101 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
104 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
106 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
166 template<
class T>
void Collect(std::vector<T> &reslist);
169 void FixLists(std::vector<TObject*> &lists);
191 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 193 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
194 static_assert(std::is_constructible<TObject*, retType>::value,
195 "procFunc must return a pointer to a class inheriting from TObject," 196 " and must take a reference to TTreeReader as the only argument");
200 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
209 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
211 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
212 bool ok =
Fork(worker);
214 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
219 if(fileNames.size() < nWorkers) {
224 std::vector<unsigned> args(nWorkers);
225 std::iota(args.begin(), args.end(), 0);
227 if(fNProcessed < nWorkers)
228 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
233 std::vector<unsigned> args(nWorkers);
234 std::iota(args.begin(), args.end(), 0);
236 if(fNProcessed < nWorkers)
237 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 std::vector<TObject*> reslist;
246 auto res = redfunc(reslist);
251 return static_cast<retType
>(res);
258 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 260 std::vector<std::string> singleFileName(1, fileName);
261 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
268 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 270 std::vector<std::string> fileNames(files.GetNFiles());
272 for(
auto f : *static_cast<THashList*>(files.GetList()))
273 fileNames[count++] =
static_cast<TFileInfo*
>(f)->GetCurrentUrl()->GetUrl();
275 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
282 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 284 TObjArray* filelist = files.GetListOfFiles();
285 std::vector<std::string> fileNames(filelist->
GetEntries());
287 for(
auto f : *filelist)
290 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
297 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 299 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
300 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
313 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
316 bool ok =
Fork(worker);
318 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
327 std::vector<unsigned> args(nWorkers);
328 std::iota(args.begin(), args.end(), 0);
330 if(fNProcessed < nWorkers)
331 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
334 std::vector<TObject*> reslist;
339 auto res = redfunc(reslist);
344 return static_cast<retType
>(res);
355 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 358 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
365 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 368 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
375 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 378 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
385 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 388 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
395 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 398 return Process(
tree, procFunc, noelist, nToProcess, jFirst);
406 unsigned code = msg.first;
410 if(msg.second !=
nullptr)
411 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
414 const char *str = ReadBuffer<const char*>(msg.second.get());
415 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 416 "Continuing execution ignoring these entries.", str);
421 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
438 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
440 }
else if (msg.first < 1000)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers child processes.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Namespace for new ROOT classes and functions.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
unsigned GetNWorkers() const
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
a Process method is being executed and each worker will process a different file
void Remove(TSocket *s)
Remove a certain socket from the monitor.
unsigned fNProcessed
number of arguments already passed to the workers
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
~TTreeProcessorMP()=default
void FixLists(std::vector< TObject *> &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
Merge collection of TObjects.
The message contains the result of the processing of a TTree.
void Reset()
Reset TTreeProcessorMP's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
Process a TTree dataset with a functor.
void SetNWorkers(unsigned n)
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo...
TTreeProcessorMP(unsigned nWorkers=0)
a Process method is being executed and each worker will process a certain range of each file ...
unsigned fNToProcess
total number of arguments to pass to the workers
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
void Warning(const char *location, const char *msgfmt,...)
Used by the client to tell servers to shutdown.
unsigned GetNWorkers() const
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned long long ULong64_t
Base class for multiprocess applications' clients.
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Templated derivation of TMPWorkerTree handling generic function tree processing.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
virtual const char * GetTitle() const
Returns title of object.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
Int_t GetEntries() const
Return the number of objects in array (i.e. number of non-empty slots).
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
A TTree object has a header with a name and a title.
Class describing a generic file including meta information.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Tell the client there was an error while processing.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A List of entry numbers in a TTree or TChain.
We are ready for the next task.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.