ROOT Reference Guide

TTreeProcessorMP.cxx

/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015
// Modified: G Ganis Jan 2017

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
 * All rights reserved. *
 * *
 * For the licensing terms see $ROOTSYS/LICENSE. *
 * For the list of contributors see $ROOTSYS/README/CREDITS. *
 *************************************************************************/

#include "TEnv.h"
#include "ROOT/TTreeProcessorMP.hxx"
#include "TMPWorkerTree.h"

#include <thread>
static const std::thread::id MAIN_THREAD_ID = std::this_thread::get_id();

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TTreeProcessorMP
/// \brief This class provides an interface to process a TTree dataset
/// in parallel using multiple processes.
///
/// ### ROOT::TTreeProcessorMP::Process
/// The possible usages of the Process method are the following:\n
/// * `Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess)`:
/// func is invoked by the workers with a TTreeReader& argument, initialized for
/// the TTree named treeName from the dataset `<dataset>`; nToProcess sets the number
/// of entries to process. The dataset can be expressed as:
/// \code{.cpp}
/// const std::string& fileName -> single file name
/// const std::vector<std::string>& fileNames -> vector of file names
/// TFileCollection& files -> collection of TFileInfo objects
/// TChain& files -> TChain with the file paths
/// TTree& tree -> reference to an existing TTree object
/// \endcode
/// For backward compatibility, the following signature is also supported:
/// * `Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)`:
/// where selector is an object of a TSelector-derived class describing the analysis, and the
/// other arguments have the same meaning as above.
///
/// For either set of signatures, the processing function is executed as many times as
/// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
/// or set via SetNWorkers, and defaults to the number of cores.\n
/// The results of the individual executions are merged into a single object, which is returned.\n
/// **Note:** the user is responsible for deleting any object created during the execution
/// of func, returned objects included: ROOT::TTreeProcessorMP never deletes what it returns,
/// it simply forgets it.\n
/// **Note:** ROOT::TTreeProcessorMP::Process is worthwhile only when the task to be executed
/// takes more than a few seconds; otherwise the overhead introduced by Process (forking the
/// workers and exchanging messages with them) will outweigh the benefits of parallel execution
/// on most machines.
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a functor class or a
/// function that takes a TTreeReader& as its only argument.
/// \endparblock
/// \param args
/// \parblock
/// a standard container (vector, list, deque), an initializer list
/// or a pointer to a TCollection (TList*, TObjArray*, ...).
/// \endparblock
/// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection& as argument
/// incurs the overhead of copying the file names from the TFileCollection into an STL container.
/// Only use it when absolutely necessary.\n
/// **Note:** if the function to be executed takes more than one argument, but all of them except
/// the TTreeReader& are fixed, it can be wrapped in a lambda or via std::bind to give it the
/// right signature.\n
/// **Note:** the user should take care of initializing random seeds differently in each
/// process (e.g. using the process id in the seed). Otherwise, several parallel executions
/// might generate the same sequence of pseudo-random numbers.
///
/// #### Return value:
/// Methods taking 'F func' return an object of the return type of F.
/// Methods taking a TSelector return a 'TList *' with the selector output list; the content
/// of the output list is owned by the caller.
///
/// #### Examples:
///
/// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103__processSelector.C.
///
//////////////////////////////////////////////////////////////////////////
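The note above about random seeds can be illustrated with a small helper. The following is a minimal sketch. It assumes a POSIX system, which fork()-based multiprocessing requires anyway, and it uses the standard `<random>` engines rather than any ROOT random-number class. `MakeWorkerSeed` and `MakeWorkerEngine` are hypothetical names and are not part of ROOT. The helper combines a user-chosen base seed with the process id through `std::seed_seq`. As a result, each forked worker draws a different sequence, and the seed stays reproducible for a given pid.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <sys/types.h> // pid_t
#include <unistd.h>    // getpid(); assumes a POSIX system, as fork() does

// Hypothetical helper (not part of ROOT): combine a user-chosen base seed
// with a process id, so that forked workers get different seeds.
std::uint32_t MakeWorkerSeed(std::uint32_t baseSeed, pid_t pid)
{
   assert(pid > 0);
   std::seed_seq seq{baseSeed, static_cast<std::uint32_t>(pid)};
   std::uint32_t seed = 0;
   seq.generate(&seed, &seed + 1); // produce one mixed 32-bit value
   return seed;
}

// Hypothetical helper: a Mersenne Twister engine seeded for the calling process.
std::mt19937 MakeWorkerEngine(std::uint32_t baseSeed)
{
   return std::mt19937(MakeWorkerSeed(baseSeed, getpid()));
}
```

A single worker process can receive several tasks. For that reason, the engine should be created once per process rather than at every call of func. Otherwise, those tasks would repeat the same sequence.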

namespace ROOT {
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// nWorkers is the number of times this ROOT session will be forked, i.e.
/// the number of workers that will be spawned.
TTreeProcessorMP::TTreeProcessorMP(UInt_t nWorkers) : TMPClient(nWorkers)
{
   Reset();
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: memory resident tree
TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   // Warn about forking of already threaded program
   if (std::this_thread::get_id() != MAIN_THREAD_ID) {
      Warning("Process",
              "multiprocessing from an already multi-threaded program "
              "(first thread '%zu' vs current thread id '%zu') might lead to inefficient forking",
              std::hash<std::thread::id>{}(MAIN_THREAD_ID), std::hash<std::thread::id>{}(std::this_thread::get_id()));
   }

   //prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   //fork
   TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   //divide entries equally between workers
   fTaskType = ETask::kProcByRange;

   //tell workers to start processing entries
   fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
   std::vector<UInt_t> args(nWorkers);
   std::iota(args.begin(), args.end(), 0);
   fNProcessed = Broadcast(MPCode::kProcTree, args);
   if (fNProcessed < nWorkers)
      Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                         " Some entries might not be processed.");

   //collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid errors about duplicated objects when merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;
   return selector.GetOutputList();
}
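The comment "divide entries equally between workers" and the per-worker limit `nToProcess / nWorkers` passed to the worker raise a practical question: what happens to the remainder of the integer division? For example, with 10 entries and 3 workers, 10 / 3 is 3. Below is a minimal sketch of one way to split entries into contiguous ranges whose sizes differ by at most one entry. `EntryRange` and `SplitEntries` are hypothetical names. This is not necessarily how TMPWorkerTree computes its ranges.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical type: half-open range [first, last) of tree entries.
struct EntryRange {
   std::uint64_t first;
   std::uint64_t last;
};

// Hypothetical helper: split nEntries entries into nWorkers contiguous ranges
// whose sizes differ by at most one; the first (nEntries % nWorkers) ranges
// receive one extra entry, so no entry is left out.
std::vector<EntryRange> SplitEntries(std::uint64_t nEntries, unsigned nWorkers)
{
   assert(nWorkers > 0);
   std::vector<EntryRange> ranges;
   ranges.reserve(nWorkers);
   const std::uint64_t base = nEntries / nWorkers;
   const std::uint64_t extra = nEntries % nWorkers;
   std::uint64_t start = 0;
   for (unsigned w = 0; w < nWorkers; ++w) {
      const std::uint64_t size = base + (w < extra ? 1 : 0);
      ranges.push_back({start, start + size});
      start += size;
   }
   return ranges;
}
```

For 10 entries and 3 workers, this split yields [0, 4), [4, 7) and [7, 10). When there are fewer entries than workers, the trailing ranges are empty.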

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a vector of files
TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   //prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   //fork
   TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);

   if (procByFile) {
      if (fileNames.size() < nWorkers) {
         // TTree entry granularity: for each file, we divide entries equally between workers
         fTaskType = ETask::kProcByRange;
         // Tell workers to start processing entries
         fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcRange, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      } else {
         // File granularity: each worker processes one whole file as a single task
         fTaskType = ETask::kProcByFile;
         fNToProcess = fileNames.size();
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcFile, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      }
   } else {
      // TTree entry granularity: for each file, we divide entries equally between workers
      fTaskType = ETask::kProcByRange;
      // Tell workers to start processing entries
      fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
      std::vector<UInt_t> args(nWorkers);
      std::iota(args.begin(), args.end(), 0);
      fNProcessed = Broadcast(MPCode::kProcRange, args);
      if (fNProcessed < nWorkers)
         Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                            " Some entries might not be processed.");
   }

   // collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid errors about duplicated objects when merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;

   return selector.GetOutputList();
}
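The branch structure above chooses between two task granularities and sets the total number of tasks, fNToProcess, accordingly. One task per file is used only when the MultiProc.TestProcByFile resource is set and there are at least as many files as workers. Otherwise, each worker gets one range per file. The decision can be summarized as a pure function. This is a sketch for illustration only. `Granularity`, `TaskPlan` and `PlanTasks` are hypothetical names, and `procByFile` stands for the value read from the resource.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical mirror of the two task types used by the code above.
enum class Granularity { kByRange, kByFile };

// Hypothetical summary of the choice: task type plus total number of tasks
// (the quantity the code above stores in fNToProcess).
struct TaskPlan {
   Granularity granularity;
   std::size_t nTasks;
};

// procByFile: value of the MultiProc.TestProcByFile resource (0 = unset).
TaskPlan PlanTasks(bool procByFile, std::size_t nFiles, unsigned nWorkers)
{
   assert(nWorkers > 0);
   if (procByFile && nFiles >= nWorkers)
      return {Granularity::kByFile, nFiles};          // one task per file
   return {Granularity::kByRange, nWorkers * nFiles}; // one range per worker per file
}
```

Note that fewer files than workers always falls back to range granularity, even when the resource is set.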

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TFileCollection
TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> fileNames(files.GetNFiles());
   UInt_t count = 0;
   for(auto f : *static_cast<THashList*>(files.GetList()))
      fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();

   TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
   return rl;
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TChain
TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t firstEntry)
{
   TObjArray* filelist = files.GetListOfFiles();
   std::vector<std::string> fileNames(filelist->GetEntries());
   UInt_t count = 0;
   for(auto f : *filelist)
      fileNames[count++] = f->GetTitle();

   return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a single file
TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> singleFileName(1, fileName);
   return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
}

///
/// Versions of the TSelector-based processing without a TEntryList
///

TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
}
/// Fix list of lists before merging (to avoid errors about duplicated objects)
void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {

   // The first element must be a TList instead of a TSelector List, to avoid errors about duplicated objects when merging
   TList *firstlist = new TList;
   TList *oldlist = (TList *) lists[0];
   TIter nxo(oldlist);
   TObject *o = nullptr;
   while ((o = nxo())) { firstlist->Add(o); }
   oldlist->SetOwner(false);
   lists.erase(lists.begin());
   lists.insert(lists.begin(), firstlist);
   delete oldlist;
}
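After the results are collected, `redfunc(outLists)` reduces the per-worker output lists into a single list, and FixLists prepares its input. The sketch below is a hedged illustration of that reduction step only. It does not reproduce ROOT's merging code, which combines TObjects rather than plain vectors. Instead, it merges per-worker histograms, modelled as hypothetical bin-count vectors, by adding every result into a copy of the first one. `BinCounts` and `MergeWorkerResults` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a histogram: one counter per bin.
using BinCounts = std::vector<long>;

// Reduce the results returned by the workers into a single object by adding
// every other result, bin by bin, into a copy of the first one.
BinCounts MergeWorkerResults(const std::vector<BinCounts> &results)
{
   assert(!results.empty());
   BinCounts merged = results.front();
   for (std::size_t w = 1; w < results.size(); ++w) {
      assert(results[w].size() == merged.size()); // same binning required
      for (std::size_t b = 0; b < merged.size(); ++b)
         merged[b] += results[w][b];
   }
   return merged;
}
```

Using the first result as the accumulator mirrors the role of the first list in FixLists. In the sketch, however, the inputs are left untouched and a new object is returned.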

//////////////////////////////////////////////////////////////////////////
/// Reset TTreeProcessorMP's state.
void TTreeProcessorMP::Reset()
{
   fNProcessed = 0;
   fNToProcess = 0;
   fTaskType = ETask::kNoTask;
}

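//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who is idle.
/// If there are still tasks to be processed, send the next one to the worker;
/// otherwise, ask the worker for its result.
void TTreeProcessorMP::ReplyToIdle(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      //we are executing a "greedy worker" task
      if (fTaskType == ETask::kProcByRange)
         MPSend(s, MPCode::kProcRange, fNProcessed);
      else if (fTaskType == ETask::kProcByFile)
         MPSend(s, MPCode::kProcFile, fNProcessed);
      ++fNProcessed;
   } else
      MPSend(s, MPCode::kSendResult);
}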

} // namespace ROOT
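ReplyToIdle implements a "greedy worker" scheme. The initial broadcast hands out one task per worker. After that, every worker that becomes idle receives the next unprocessed task until fNProcessed reaches fNToProcess, and is then asked for its result. The following sketch models only that bookkeeping, without sockets or processes. `Dispatcher`, `NextTask` and `ServeIdleRequests` are hypothetical names, and -1 is an arbitrary marker for "send your result".

```cpp
#include <cassert>
#include <vector>

// Hypothetical model of the dispatcher bookkeeping (not ROOT code).
struct Dispatcher {
   unsigned nToProcess = 0; // total number of tasks, like fNToProcess
   unsigned nProcessed = 0; // tasks already handed out, like fNProcessed

   // Index of the next task for an idle worker, or -1 when no task is left
   // and the worker should be asked for its result instead.
   long NextTask()
   {
      if (nProcessed < nToProcess)
         return static_cast<long>(nProcessed++);
      return -1;
   }
};

// Answer nRequests idle notifications in arrival order.
std::vector<long> ServeIdleRequests(Dispatcher &d, unsigned nRequests)
{
   std::vector<long> replies;
   replies.reserve(nRequests);
   for (unsigned i = 0; i < nRequests; ++i)
      replies.push_back(d.NextTask());
   return replies;
}
```

Consider two workers and five tasks. The initial broadcast sends tasks 0 and 1, so the counter starts at 2. The next five idle notifications are then answered with 2, 3, 4, -1 and -1. Faster workers naturally pick up more tasks, which is the point of the greedy scheme.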