102 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
108 UInt_t nWorkers = GetNWorkers();
109 selector.
Begin(
nullptr);
115 bool ok = Fork(worker);
117 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
122 fTaskType = ETask::kProcByRange;
125 fNToProcess = nWorkers;
126 std::vector<UInt_t> args(nWorkers);
127 std::iota(args.begin(), args.end(), 0);
129 if (fNProcessed < nWorkers)
130 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
131 " Some entries might not be processed.");
134 std::vector<TObject*> outLists;
141 auto outList =
static_cast<TList*
>(redfunc(outLists));
153 fTaskType = ETask::kNoTask;
165 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
171 UInt_t nWorkers = GetNWorkers();
172 selector.
Begin(
nullptr);
177 TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
178 bool ok = Fork(worker);
180 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
187 if (fileNames.size() < nWorkers) {
189 fTaskType = ETask::kProcByRange;
191 fNToProcess = nWorkers*fileNames.size();
192 std::vector<UInt_t> args(nWorkers);
193 std::iota(args.begin(), args.end(), 0);
195 if (fNProcessed < nWorkers)
196 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
197 " Some entries might not be processed");
200 fTaskType = ETask::kProcByFile;
201 fNToProcess = fileNames.size();
202 std::vector<UInt_t> args(nWorkers);
203 std::iota(args.begin(), args.end(), 0);
205 if (fNProcessed < nWorkers)
206 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
207 " Some entries might not be processed.");
211 fTaskType = ETask::kProcByRange;
213 fNToProcess = nWorkers*fileNames.size();
214 std::vector<UInt_t> args(nWorkers);
215 std::iota(args.begin(), args.end(), 0);
217 if (fNProcessed < nWorkers)
218 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
219 " Some entries might not be processed.");
223 std::vector<TObject*> outLists;
230 auto outList =
static_cast<TList*
>(redfunc(outLists));
242 fTaskType = ETask::kNoTask;
252 std::vector<std::string> fileNames(files.
GetNFiles());
255 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
257 TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
267 std::vector<std::string> fileNames(filelist->
GetEntries());
269 for(
auto f : *filelist)
270 fileNames[count++] =
f->GetTitle();
272 return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
280 std::vector<std::string> singleFileName(1, fileName);
281 return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
288TList *TTreeProcessorMP::Process(
const std::vector<std::string> &fileNames,
TSelector &selector,
292 return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
295TList *TTreeProcessorMP::Process(
const std::string &fileName,
TSelector &selector,
const std::string &treeName,
299 return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
306 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
313 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
319 return Process(
tree, selector, noelist, nToProcess, jFirst);
323void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {
330 while ((o = nxo())) { firstlist->
Add(o); }
332 lists.erase(lists.begin());
333 lists.insert(lists.begin(), firstlist);
339void TTreeProcessorMP::Reset()
343 fTaskType = ETask::kNoTask;
352 if (fNProcessed < fNToProcess) {
354 if (fTaskType == ETask::kProcByRange)
356 else if (fTaskType == ETask::kProcByFile)
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned long long ULong64_t
void Error(const char *location, const char *msgfmt,...)
void Warning(const char *location, const char *msgfmt,...)
Merge collection of TObjects.
TTreeProcessorMP(unsigned nWorkers=0)
A chain is a collection of files containing TTree objects.
TObjArray * GetListOfFiles() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
A List of entry numbers in a TTree or TChain.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Long64_t GetNFiles() const
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
virtual void Add(TObject *obj)
Base class for multiprocess applications' clients.
Templated derivation of TMPWorkerTree handling selector tree processing.
Int_t GetEntries() const
Return the number of objects in array (i.e. number of non-empty slots).
Mother of all ROOT objects.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
virtual void ImportOutput(TList *output)
Imports the content of 'output' in the internal output list.
virtual TList * GetOutputList() const
virtual void Begin(TTree *)
A TTree represents a columnar dataset.
@ kSendResult
Ask for a kFuncResult/kProcResult.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
static constexpr double s