104 selector.
Begin(
nullptr);
107 TPoolPlayer worker(selector, &tree, nWorkers, nToProcess/nWorkers);
108 bool ok =
Fork(worker);
110 Error(
"TProcessExecutor::ProcTree",
"[E][C] Could not fork. Aborting operation");
119 std::vector<unsigned> args(nWorkers);
120 std::iota(args.begin(), args.end(), 0);
122 if (fNProcessed < nWorkers)
123 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers." 124 " Some entries might not be processed.");
127 std::vector<TObject*> outLists;
134 auto outList =
static_cast<TList*
>(redfunc(outLists));
137 for(
auto obj : *outList) {
140 outList->SetOwner(
false);
159 selector.
Begin(
nullptr);
162 TPoolPlayer worker(selector, fileNames, treeName, nWorkers, nToProcess);
163 bool ok =
Fork(worker);
165 Error(
"TProcessExecutor::ProcTree",
"[E][C] Could not fork. Aborting operation");
172 if (fileNames.size() < nWorkers) {
177 std::vector<unsigned> args(nWorkers);
178 std::iota(args.begin(), args.end(), 0);
180 if (fNProcessed < nWorkers)
181 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers." 182 " Some entries might not be processed");
187 std::vector<unsigned> args(nWorkers);
188 std::iota(args.begin(), args.end(), 0);
190 if (fNProcessed < nWorkers)
191 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers." 192 " Some entries might not be processed.");
199 std::vector<unsigned> args(nWorkers);
200 std::iota(args.begin(), args.end(), 0);
202 if (fNProcessed < nWorkers)
203 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers." 204 " Some entries might not be processed.");
208 std::vector<TObject*> outLists;
215 auto outList =
static_cast<TList*
>(redfunc(outLists));
218 for(
auto obj : *outList) {
221 outList->SetOwner(
false);
237 std::vector<std::string> fileNames(files.
GetNFiles());
239 for(
auto f : *static_cast<THashList*>(files.
GetList()))
240 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
242 TList *rl =
ProcTree(fileNames, selector, treeName, nToProcess);
251 std::vector<std::string> fileNames(filelist->
GetEntries());
253 for(
auto f : *filelist)
254 fileNames[count++] =
f->GetTitle();
256 return ProcTree(fileNames, selector, treeName, nToProcess);
263 std::vector<std::string> singleFileName(1, fileName);
264 return ProcTree(singleFileName, selector, treeName, nToProcess);
275 while ((o = nxo())) { firstlist->
Add(o); }
277 lists.erase(lists.begin());
278 lists.insert(lists.begin(), firstlist);
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
This namespace contains pre-defined functions to be used in conjuction with TExecutor::Map and TExecu...
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned fNProcessed
number of arguments already passed to the workers
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
TObjArray * GetListOfFiles() const
a Map method with arguments is being executed
virtual void Begin(TTree *)
Merge collection of TObjects.
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
Long64_t GetNFiles() const
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Execute function with the argument contained in the message.
a ProcTree method is being executed and each worker will process a certain range of each file ...
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Execute function without arguments.
Used by the client to tell servers to shutdown.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Ask for a kFuncResult/kProcResult.
unsigned fNToProcess
total number of arguments to pass to the workers
unsigned long long ULong64_t
Base class for multiprocess applications' clients.
void Reset()
Reset TProcessExecutor's state.
a ProcTree method is being executed and each worker will process a different file ...
Mother of all ROOT objects.
virtual void Add(TObject *obj)
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containg TTree objects.
Int_t GetEntries() const
Return the number of objects in array (i.e.
A TTree object has a header with a name and a title.
Class describing a generic file including meta information.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
void FixLists(std::vector< TObject *> &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
unsigned GetNWorkers() const
virtual TList * GetOutputList() const
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.