TTreeProcessorMP.cxx
/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015
// Modified: G Ganis Jan 2017

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include "TEnv.h"
#include "ROOT/TTreeProcessorMP.hxx"
#include "TMPWorkerTree.h"

#include <thread>
static const std::thread::id MAIN_THREAD_ID = std::this_thread::get_id();

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TTreeProcessorMP
/// \ingroup Parallelism
/// \brief This class provides an interface to process a TTree dataset
///        in parallel with multi-process technology
///
/// ### ROOT::TTreeProcessorMP::Process
/// The possible usages of the Process method are the following:\n
/// * `Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess)`:
///     func is executed with a TTreeReader& argument, initialized for
///     the TTree with name treeName, from the dataset `<dataset>`; nToProcess is the
///     number of entries to process. The dataset can be expressed as:
///     \code{.cpp}
///     const std::string& fileName               -> single file name
///     const std::vector<std::string>& fileNames -> vector of file names
///     TFileCollection& files                    -> collection of TFileInfo objects
///     TChain& files                             -> TChain with the file paths
///     TTree& tree                               -> Reference to an existing TTree object
///     \endcode
/// For legacy code, the following signature is also supported:
/// * `Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess)`:
///     where selector is a TSelector derived class describing the analysis and the other arguments
///     have the same meaning as above.
///
/// For either set of signatures, the processing function is executed as many times as
/// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
/// or set via SetNWorkers. It defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TTreeProcessorMP never
/// deletes what it returns; it simply forgets it.\n
/// **Note:** using ROOT::TTreeProcessorMP::Process is advisable only when the task to be
/// executed takes more than a few seconds; otherwise the overhead introduced
/// by Process will outweigh the benefits of parallel execution on most machines.
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a
/// functor class or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard container (vector, list, deque), an initializer list
/// or a pointer to a TCollection (TList*, TObjArray*, ...).
/// \endparblock
/// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection& as argument incurs
/// the overhead of copying data from the collection to an STL container. Only
/// use it when absolutely necessary.\n
/// **Note:** in cases where the function to be executed takes more than
/// zero/one argument but all are fixed except zero/one, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature.\n
/// **Note:** the user should take care of initializing random seeds differently in each
/// process (e.g. using the process id in the seed). Otherwise several parallel executions
/// might generate the same sequence of pseudo-random numbers.
///
/// #### Return value:
/// Methods taking 'F func' return the return type of F.
/// Methods taking a TSelector return a 'TList *' with the selector output list; the output list
/// content is owned by the caller.
///
/// #### Examples:
///
/// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103_processSelector.C.
///
//////////////////////////////////////////////////////////////////////////
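The note in the class documentation about wrapping a function "in a lambda or via std::bind" can be illustrated without ROOT. The following is a minimal sketch, assuming a hypothetical `ApplyToEach` helper that, like `Process`, accepts only a one-argument callable. `PassesCut`, `CountPassingWithLambda` and `CountPassingWithBind` are likewise invented for illustration and are not part of ROOT.

```cpp
#include <functional>
#include <vector>

// Hypothetical two-argument analysis function: only 'value' varies per call,
// 'threshold' is fixed for the whole job.
bool PassesCut(double value, double threshold)
{
   return value > threshold;
}

// Hypothetical stand-in for an interface that accepts one-argument callables only.
template <typename F>
unsigned ApplyToEach(F func, const std::vector<double> &values)
{
   unsigned nPassed = 0;
   for (double v : values)
      if (func(v))
         ++nPassed;
   return nPassed;
}

// Fix the extra argument with a lambda capture.
unsigned CountPassingWithLambda(const std::vector<double> &values, double threshold)
{
   auto oneArg = [threshold](double v) { return PassesCut(v, threshold); };
   return ApplyToEach(oneArg, values);
}

// Fix the extra argument with std::bind; _1 marks the argument left open.
unsigned CountPassingWithBind(const std::vector<double> &values, double threshold)
{
   auto oneArg = std::bind(PassesCut, std::placeholders::_1, threshold);
   return ApplyToEach(oneArg, values);
}
```

Both wrappers produce a callable with the same one-argument shape. The lambda form is usually easier to read, while `std::bind` is the alternative named in the class documentation.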

namespace ROOT {
//////////////////////////////////////////////////////////////////////////
/// Class constructor.
/// nWorkers is the number of times this ROOT session will be forked, i.e.
/// the number of workers that will be spawned.

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: memory resident tree
TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
                                 ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   // Warn about forking of already threaded program
   if (std::this_thread::get_id() != MAIN_THREAD_ID) {
      Warning("Process",
              "multiprocessing from an already multi-threaded program "
              "(first thread '%zu' vs current thread id '%zu') might lead to inefficient forking",
              std::hash<std::thread::id>{}(MAIN_THREAD_ID), std::hash<std::thread::id>{}(std::this_thread::get_id()));
   }

   //prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   //fork
   TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   //divide entries equally between workers
   fTaskType = ETask::kProcByRange;

   //tell workers to start processing entries
   fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
   std::vector<UInt_t> args(nWorkers);
   std::iota(args.begin(), args.end(), 0);
   fNProcessed = Broadcast(MPCode::kProcTree, args);
   if (fNProcessed < nWorkers)
      Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                         " Some entries might not be processed.");

   //collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;
   return selector.GetOutputList();
}
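The function above passes `nToProcess / nWorkers` to the worker as the per-worker entry count. The following is a minimal sketch of how such an equal split maps onto entry ranges, in plain C++ without ROOT. `SplitEntries` and `EntryRange` are hypothetical names, and giving the remainder of the integer division to the last range is an assumption made here for illustration; the listing does not show how the worker treats the remainder.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Half-open entry range [first, last).
using EntryRange = std::pair<std::uint64_t, std::uint64_t>;

// Split nEntries into nWorkers contiguous ranges of nEntries / nWorkers entries each.
// Assumption: the last range absorbs the remainder of the integer division.
std::vector<EntryRange> SplitEntries(std::uint64_t nEntries, unsigned nWorkers)
{
   assert(nWorkers > 0);
   const std::uint64_t perWorker = nEntries / nWorkers;
   std::vector<EntryRange> ranges;
   ranges.reserve(nWorkers);
   for (unsigned w = 0; w < nWorkers; ++w) {
      const std::uint64_t first = w * perWorker;
      const std::uint64_t last = (w + 1 == nWorkers) ? nEntries : first + perWorker;
      ranges.emplace_back(first, last);
   }
   return ranges;
}
```

With 10 entries and 3 workers this yields the ranges [0, 3), [3, 6) and [6, 10), which shows why an entry count that is not a multiple of the worker count needs an explicit rule for the leftover entries.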

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a vector of files
TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{

   // Warn for yet unimplemented functionality
   if (jFirst > 0) {
      Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
      jFirst = 0;
   }

   //prepare environment
   Reset();
   UInt_t nWorkers = GetNWorkers();
   selector.Begin(nullptr);

   // Check the entry list
   TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
   //fork
   TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
   bool ok = Fork(worker);
   if (!ok) {
      Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
      return nullptr;
   }

   Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);

   if (procByFile) {
      if (fileNames.size() < nWorkers) {
         // TTree entry granularity: for each file, we divide entries equally between workers
         fTaskType = ETask::kProcByRange;
         // Tell workers to start processing entries
         fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcRange, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      } else {
         // File granularity: each worker processes one whole file as a single task
         fTaskType = ETask::kProcByFile;
         fNToProcess = fileNames.size();
         std::vector<UInt_t> args(nWorkers);
         std::iota(args.begin(), args.end(), 0);
         fNProcessed = Broadcast(MPCode::kProcFile, args);
         if (fNProcessed < nWorkers)
            Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                               " Some entries might not be processed.");
      }
   } else {
      // TTree entry granularity: for each file, we divide entries equally between workers
      fTaskType = ETask::kProcByRange;
      // Tell workers to start processing entries
      fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
      std::vector<UInt_t> args(nWorkers);
      std::iota(args.begin(), args.end(), 0);
      fNProcessed = Broadcast(MPCode::kProcRange, args);
      if (fNProcessed < nWorkers)
         Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
                                            " Some entries might not be processed.");
   }

   // collect results, distribute new tasks
   std::vector<TObject*> outLists;
   Collect(outLists);

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   FixLists(outLists);

   PoolUtils::ReduceObjects<TObject *> redfunc;
   auto outList = static_cast<TList*>(redfunc(outLists));

   // Import the resulting list in the selector
   selector.ImportOutput(outList);
   // outList is empty after this: just delete it
   delete outList;

   // Finalize the selector tasks
   selector.Terminate();

   //clean-up and return
   ReapWorkers();
   fTaskType = ETask::kNoTask;

   return selector.GetOutputList();
}
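The branches above choose between file granularity and entry-range granularity and set the total number of tasks accordingly. The following is a minimal sketch that restates that decision as a pure function, so the three cases can be checked in isolation. `PlanTasks`, `TaskPlan` and `Granularity` are hypothetical names introduced here; the task counts mirror the `fNToProcess` assignments in the listing.

```cpp
#include <cassert>
#include <cstddef>

enum class Granularity { kByRange, kByFile };

struct TaskPlan {
   Granularity granularity; // how the dataset is cut into tasks
   std::size_t nTasks;      // total number of tasks handed out to the workers
};

// procByFile corresponds to the "MultiProc.TestProcByFile" setting read in the listing.
TaskPlan PlanTasks(bool procByFile, std::size_t nFiles, unsigned nWorkers)
{
   assert(nWorkers > 0);
   // File granularity is used only when requested and there is at least one file per worker.
   if (procByFile && nFiles >= nWorkers)
      return {Granularity::kByFile, nFiles};
   // Otherwise each file is divided into one range per worker.
   return {Granularity::kByRange, static_cast<std::size_t>(nWorkers) * nFiles};
}
```

For example, 8 files and 4 workers give 8 whole-file tasks when file granularity is requested, and 32 range tasks otherwise. Since the listing reads the setting with a default of 0, range granularity applies unless the setting is changed.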

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TFileCollection
TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> fileNames(files.GetNFiles());
   UInt_t count = 0;
   for(auto f : *static_cast<THashList*>(files.GetList()))
      fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();

   TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
   return rl;
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a TChain
TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t firstEntry)
{
   TObjArray* filelist = files.GetListOfFiles();
   std::vector<std::string> fileNames(filelist->GetEntries());
   UInt_t count = 0;
   for(auto f : *filelist)
      fileNames[count++] = f->GetTitle();

   return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
}

//////////////////////////////////////////////////////////////////////////
/// TSelector-based tree processing: dataset as a single file
TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
{
   std::vector<std::string> singleFileName(1, fileName);
   return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
}

///
/// No TEntryList versions of selector processor
///

TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
                                 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
}

TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
                                 ULong64_t nToProcess, ULong64_t jFirst)
{
   TEntryList noelist;
   return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
}

/// Fix list of lists before merging (to avoid errors about duplicated objects)
void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {

   // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
   TList *firstlist = new TList;
   TList *oldlist = (TList *) lists[0];
   TIter nxo(oldlist);
   TObject *o = nullptr;
   while ((o = nxo())) { firstlist->Add(o); }
   oldlist->SetOwner(false);
   lists.erase(lists.begin());
   lists.insert(lists.begin(), firstlist);
   delete oldlist;
}

//////////////////////////////////////////////////////////////////////////
/// Reset TTreeProcessorMP's state.

//////////////////////////////////////////////////////////////////////////
/// Reply to a worker who is idle.
/// If there are still events to process, tell the worker. Otherwise
/// ask for a result.
void TTreeProcessorMP::ReplyToIdle(TSocket *s)
{
   if (fNProcessed < fNToProcess) {
      //we are executing a "greedy worker" task
      if (fTaskType == ETask::kProcByRange)
         MPSend(s, MPCode::kProcRange, fNProcessed);
      else if (fTaskType == ETask::kProcByFile)
         MPSend(s, MPCode::kProcFile, fNProcessed);
      ++fNProcessed;
   } else
      MPSend(s, MPCode::kSendResult);
}
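The reply logic above follows a "greedy worker" pattern: an idle worker receives the next unassigned task index until none are left, and is then asked for its result. The following is a minimal sketch of that hand-out rule using plain counters instead of socket messages. `Dispatcher` and `Reply` are hypothetical names introduced for illustration and do not exist in ROOT.

```cpp
#include <utility>

enum class Reply { kProcessTask, kSendResult };

struct Dispatcher {
   unsigned nProcessed; // number of tasks already handed out
   unsigned nToProcess; // total number of tasks

   // Returns the reply for an idle worker and, for kProcessTask, the task index to process.
   std::pair<Reply, unsigned> ReplyToIdle()
   {
      if (nProcessed < nToProcess)
         return {Reply::kProcessTask, nProcessed++};
      return {Reply::kSendResult, 0u};
   }
};
```

Starting from a state where the initial broadcast has already handed out one task per worker, each later idle notification consumes exactly one remaining task, so faster workers naturally take on more of the load.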

} // namespace ROOT