105 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
112 "multiprocessing from an already multi-threaded program "
113 "(first thread '%zu' vs current thread id '%zu') might lead to inefficient forking",
114 std::hash<std::thread::id>{}(
MAIN_THREAD_ID), std::hash<std::thread::id>{}(std::this_thread::get_id()));
120 selector.
Begin(
nullptr);
123 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
125 TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
126 bool ok =
Fork(worker);
128 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
137 std::vector<UInt_t> args(nWorkers);
138 std::iota(args.begin(), args.end(), 0);
141 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
142 " Some entries might not be processed.");
145 std::vector<TObject*> outLists;
152 auto outList =
static_cast<TList*
>(redfunc(outLists));
176 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
183 selector.
Begin(
nullptr);
186 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
188 TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
189 bool ok =
Fork(worker);
191 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
195 Int_t procByFile =
gEnv->GetValue(
"MultiProc.TestProcByFile", 0);
198 if (fileNames.size() < nWorkers) {
203 std::vector<UInt_t> args(nWorkers);
204 std::iota(args.begin(), args.end(), 0);
207 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
208 " Some entries might not be processed");
213 std::vector<UInt_t> args(nWorkers);
214 std::iota(args.begin(), args.end(), 0);
217 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
218 " Some entries might not be processed.");
225 std::vector<UInt_t> args(nWorkers);
226 std::iota(args.begin(), args.end(), 0);
229 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
230 " Some entries might not be processed.");
234 std::vector<TObject*> outLists;
241 auto outList =
static_cast<TList*
>(redfunc(outLists));
263 std::vector<std::string> fileNames(files.
GetNFiles());
266 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
268 TList *rl =
Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
278 std::vector<std::string> fileNames(filelist->
GetEntries());
280 for(
auto f : *filelist)
281 fileNames[count++] =
f->GetTitle();
283 return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
291 std::vector<std::string> singleFileName(1, fileName);
292 return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
303 return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
310 return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
317 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
324 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
330 return Process(tree, selector, noelist, nToProcess, jFirst);
341 while ((o = nxo())) { firstlist->
Add(o); }
343 lists.erase(lists.begin());
344 lists.insert(lists.begin(), firstlist);
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
int Int_t
Signed integer 4 bytes (int).
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
static const std::thread::id MAIN_THREAD_ID
Merge collection of TObjects.
unsigned fNProcessed
number of arguments already passed to the workers
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects).
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
A chain is a collection of files containing TTree objects.
TObjArray * GetListOfFiles() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Long64_t GetNFiles() const
Class describing a generic file including meta information.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
void Add(TObject *obj) override
TMPClient(unsigned nWorkers=0)
Class constructor.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Templated derivation of TMPWorkerTree handling selector tree processing.
Int_t GetEntries() const override
Return the number of objects in array (i.e. number of non-empty slots).
Mother of all ROOT objects.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
virtual void ImportOutput(TList *output)
Imports the content of 'output' in the internal output list.
virtual TList * GetOutputList() const
virtual void Begin(TTree *)
This class implements client sockets.
A TTree represents a columnar dataset.
@ kSendResult
Ask for a kFuncResult/kProcResult.
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.