103 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
110 selector.
Begin(
nullptr);
115 TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
116 bool ok =
Fork(worker);
118 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
127 std::vector<UInt_t> args(nWorkers);
128 std::iota(args.begin(), args.end(), 0);
131 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
132 " Some entries might not be processed.");
135 std::vector<TObject*> outLists;
142 auto outList =
static_cast<TList*
>(redfunc(outLists));
166 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
173 selector.
Begin(
nullptr);
178 TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
179 bool ok =
Fork(worker);
181 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
188 if (fileNames.size() < nWorkers) {
193 std::vector<UInt_t> args(nWorkers);
194 std::iota(args.begin(), args.end(), 0);
197 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
198 " Some entries might not be processed");
203 std::vector<UInt_t> args(nWorkers);
204 std::iota(args.begin(), args.end(), 0);
207 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
208 " Some entries might not be processed.");
215 std::vector<UInt_t> args(nWorkers);
216 std::iota(args.begin(), args.end(), 0);
219 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
220 " Some entries might not be processed.");
224 std::vector<TObject*> outLists;
231 auto outList =
static_cast<TList*
>(redfunc(outLists));
253 std::vector<std::string> fileNames(files.
GetNFiles());
256 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
258 TList *rl =
Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
268 std::vector<std::string> fileNames(filelist->
GetEntries());
270 for(
auto f : *filelist)
271 fileNames[count++] =
f->GetTitle();
273 return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
281 std::vector<std::string> singleFileName(1, fileName);
282 return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
293 return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
300 return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
307 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
314 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
320 return Process(tree, selector, noelist, nToProcess, jFirst);
331 while ((o = nxo())) { firstlist->
Add(o); }
333 lists.erase(lists.begin());
334 lists.insert(lists.begin(), firstlist);
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned long long ULong64_t
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Merge collection of TObjects.
unsigned fNProcessed
number of arguments already passed to the workers
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
A chain is a collection of files containing TTree objects.
TObjArray * GetListOfFiles() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
A List of entry numbers in a TTree or TChain.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Long64_t GetNFiles() const
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
void Add(TObject *obj) override
Base class for multiprocess applications' clients.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Templated derivation of TMPWorkerTree handling selector tree processing.
Int_t GetEntries() const override
Return the number of objects in array (i.e. the number of non-empty slots).
Mother of all ROOT objects.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
virtual void ImportOutput(TList *output)
Imports the content of 'output' in the internal output list.
virtual TList * GetOutputList() const
virtual void Begin(TTree *)
A TTree represents a columnar dataset.
@ kSendResult
Ask for a kFuncResult/kProcResult.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...