13 #ifndef ROOT_TTreeProcessorMP 14 #define ROOT_TTreeProcessorMP 32 #include <type_traits> 62 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
64 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
65 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
67 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
70 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
73 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
76 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
93 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
95 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
96 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
98 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
101 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
104 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
106 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
/// Listen for messages sent by the workers and call the appropriate handler function.
/// \param reslist vector handed on to the message handlers; presumably filled with the
///        results read back from the workers — TODO confirm against the definition.
166 template<
class T>
void Collect(std::vector<T> &reslist);
/// Fix list of lists before merging (to avoid errors about duplicated objects).
/// \param lists the collected objects to be fixed up prior to the merge step.
169 void FixLists(std::vector<TObject*> &lists);
191 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 193 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
194 static_assert(std::is_constructible<TObject*, retType>::value,
195 "procFunc must return a pointer to a class inheriting from TObject," 196 " and must take a reference to TTreeReader as the only argument");
200 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
209 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
211 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
212 bool ok =
Fork(worker);
214 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
219 if(fileNames.size() < nWorkers) {
224 std::vector<unsigned> args(nWorkers);
225 std::iota(args.begin(), args.end(), 0);
227 if(fNProcessed < nWorkers)
228 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
233 std::vector<unsigned> args(nWorkers);
234 std::iota(args.begin(), args.end(), 0);
236 if(fNProcessed < nWorkers)
237 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 std::vector<TObject*> reslist;
246 auto res = redfunc(reslist);
251 return static_cast<retType
>(res);
258 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 260 std::vector<std::string> singleFileName(1, fileName);
261 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
268 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 270 std::vector<std::string> fileNames(files.GetNFiles());
272 for(
auto f : *static_cast<THashList*>(files.GetList()))
273 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
275 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
282 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 284 TObjArray* filelist = files.GetListOfFiles();
285 std::vector<std::string> fileNames(filelist->
GetEntries());
287 for(
auto f : *filelist)
288 fileNames[count++] =
f->GetTitle();
290 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
297 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 299 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
300 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
313 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
316 bool ok =
Fork(worker);
318 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
327 std::vector<unsigned> args(nWorkers);
328 std::iota(args.begin(), args.end(), 0);
330 if(fNProcessed < nWorkers)
331 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
334 std::vector<TObject*> reslist;
339 auto res = redfunc(reslist);
344 return static_cast<retType
>(res);
355 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 358 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
365 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 368 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
375 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 378 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
385 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 388 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
395 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type 398 return Process(
tree, procFunc, noelist, nToProcess, jFirst);
406 unsigned code = msg.first;
410 if(msg.second !=
nullptr)
411 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
414 const char *str = ReadBuffer<const char*>(msg.second.get());
415 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 416 "Continuing execution ignoring these entries.", str);
421 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
438 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
440 }
else if (msg.first < 1000)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers child processes.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Namespace for new ROOT classes and functions.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
unsigned GetNWorkers() const
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
a Process method is being executed and each worker will process a different file
void Remove(TSocket *s)
Remove a certain socket from the monitor.
unsigned fNProcessed
number of arguments already passed to the workers
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
~TTreeProcessorMP()=default
void FixLists(std::vector< TObject *> &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
Merge collection of TObjects.
The message contains the result of the processing of a TTree.
void Reset()
Reset TTreeProcessorMP's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
Process a TTree dataset with a functor.
void SetNWorkers(unsigned n)
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
A std::pair that wraps the code and optional object contained in a message.
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo...
TTreeProcessorMP(unsigned nWorkers=0)
a Process method is being executed and each worker will process a certain range of each file ...
unsigned fNToProcess
total number of arguments to pass to the workers
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
void Warning(const char *location, const char *msgfmt,...)
Used by the client to tell servers to shutdown.
unsigned GetNWorkers() const
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned long long ULong64_t
Base class for multiprocess applications' clients.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Templated derivation of TMPWorkerTree handling generic function tree processing.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
Int_t GetEntries() const
Return the number of objects in array (i.e. number of non-empty slots).
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
A TTree object has a header with a name and a title.
Class describing a generic file including meta information.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Tell the client there was an error while processing.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
A List of entry numbers in a TTree or TChain.
We are ready for the next task.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.