13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
39 template <
typename F,
typename... Args>
65 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
TEntryList &entries,
68 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
TEntryList &entries,
96 template<
class F>
auto Process(
const std::vector<std::string>& fileNames,
F procFunc,
99 template<
class F>
auto Process(
const std::string& fileName,
F procFunc,
169 template<
class T>
void Collect(std::vector<T> &reslist);
172 void FixLists(std::vector<TObject*> &lists);
197 static_assert(std::is_constructible<TObject*, retType>::value,
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
203 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
212 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
215 bool ok =
Fork(worker);
217 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
222 if(fileNames.size() < nWorkers) {
227 std::vector<unsigned> args(nWorkers);
228 std::iota(args.begin(), args.end(), 0);
231 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
236 std::vector<unsigned> args(nWorkers);
237 std::iota(args.begin(), args.end(), 0);
240 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
244 std::vector<TObject*> reslist;
249 auto res = redfunc(reslist);
254 return static_cast<retType
>(res);
263 std::vector<std::string> singleFileName(1, fileName);
264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
273 std::vector<std::string> fileNames(files.GetNFiles());
275 for(
auto f : *
static_cast<THashList*
>(files.GetList()))
276 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
287 TObjArray* filelist = files.GetListOfFiles();
288 std::vector<std::string> fileNames(filelist->
GetEntries());
290 for(
auto f : *filelist)
291 fileNames[count++] =
f->GetTitle();
293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
303 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
307 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
316 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
319 bool ok =
Fork(worker);
321 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
330 std::vector<unsigned> args(nWorkers);
331 std::iota(args.begin(), args.end(), 0);
334 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
337 std::vector<TObject*> reslist;
342 auto res = redfunc(reslist);
347 return static_cast<retType
>(res);
361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
401 return Process(tree, procFunc, noelist, nToProcess, jFirst);
409 unsigned code = msg.first;
413 if(msg.second !=
nullptr)
414 reslist.push_back(std::move(
ReadBuffer<T>(msg.second.get())));
418 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
424 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
441 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
443 }
else if (msg.first < 1000)
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
T ReadBuffer(TBufferFile *buf)
One of the template functions used to read objects from messages.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Merge collection of TObjects.
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
~TTreeProcessorMP()=default
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects).
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
unsigned GetNWorkers() const
TMPClient(unsigned nWorkers=0)
Class constructor.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork().
void Remove(TSocket *s)
Remove a certain socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Templated derivation of TMPWorkerTree handlign generic function tree processing.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Int_t GetEntries() const override
Return the number of objects in array (i.e.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
This class implements client sockets.
A TTree represents a columnar dataset.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
@ kProcError
Tell the client there was an error while processing.
@ kProcResult
The message contains the result of the processing of a TTree.