Logo ROOT  
Reference Guide
TTreeProcessorMP.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
19#include "ROOT/TypeTraits.hxx" // InvokeResult_t
20#include "TChain.h"
21#include "TChainElement.h"
22#include "TError.h"
23#include "TFileCollection.h"
24#include "TFileInfo.h"
25#include "THashList.h"
26#include "TMPClient.h"
27#include "TMPWorkerTree.h"
28#include "TSelector.h"
29#include "TTreeReader.h"
30#include <algorithm> //std::generate
31#include <numeric> //std::iota
32#include <string>
33#include <functional> //std::reference_wrapper
34#include <vector>
35
36namespace ROOT {
37
38class TTreeProcessorMP : private TMPClient {
39 template <typename F, typename... Args>
41
42public:
43 explicit TTreeProcessorMP(UInt_t nWorkers = 0); //default number of workers is the number of processors
44 ~TTreeProcessorMP() = default;
45 //it doesn't make sense for a TTreeProcessorMP to be copied
48
49 /// \brief Process a TTree dataset with a functor
50 /// \tparam F functor returning a pointer to TObject or inheriting classes and
51 /// taking a TTreeReader& (both enforced at compile-time)
52 ///
53 /// Dataset definition:
54 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
55 /// \param[in] fileName string with the path of the file with the TTree to process
56 /// \param[in] collection TFileCollection with the files with the TTree to process
57 /// \param[in] chain TChain with the files with the TTree to process
58 /// \param[in] tree TTree to process
59 ///
60 /// \param[in] entries TEntryList to filter the dataset
61 /// \param[in] treeName Name of the TTree to process
62 /// \param[in] nToProcess Number of entries to process (0 means all)
63 /// \param[in] jFirst First entry to process (0 means the first of the first file)
64 ///
65 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
66 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
68 template<class F> auto Process(const std::string& fileName, F procFunc, TEntryList &entries,
69 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
71 template<class F> auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
72 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
74 template<class F> auto Process(TChain& chain, F procFunc, TEntryList &entries,
75 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
77 template<class F> auto Process(TTree& tree, F procFunc, TEntryList &entries,
78 ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
80
81 /// \brief Process a TTree dataset with a functor: version without entry list
82 /// \tparam F functor returning a pointer to TObject or inheriting classes and
83 /// taking a TTreeReader& (both enforced at compile-time)
84 ///
85 /// Dataset definition:
86 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
87 /// \param[in] fileName string with the path of the file with the TTree to process
88 /// \param[in] collection TFileCollection with the files with the TTree to process
89 /// \param[in] chain TChain with the files with the TTree to process
90 /// \param[in] tree TTree to process
91 ///
92 /// \param[in] treeName Name of the TTree to process
93 /// \param[in] nToProcess Number of entries to process (0 means all)
94 /// \param[in] jFirst First entry to process (0 means the first of the first file)
95 ///
96 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc,
97 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
99 template<class F> auto Process(const std::string& fileName, F procFunc,
100 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
102 template<class F> auto Process(TFileCollection& files, F procFunc,
103 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
105 template<class F> auto Process(TChain& files, F procFunc,
106 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
108 template<class F> auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
110
111
112 /// \brief Process a TTree dataset with a selector
113 ///
114 /// Dataset definition:
115 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
116 /// \param[in] fileName string with the path of the file with the TTree to process
117 /// \param[in] collection TFileCollection with the files with the TTree to process
118 /// \param[in] chain TChain with the files with the TTree to process
119 /// \param[in] tree TTree to process
120 ///
121 /// \param[in] selector Instance of TSelector to be applied to the dataset
122 /// \param[in] entries TEntryList to filter the dataset
123 /// \param[in] treeName Name of the TTree to process
124 /// \param[in] nToProcess Number of entries to process (0 means all)
125 /// \param[in] jFirst First entry to process (0 means the first of the first file)
126 ///
127 // these versions require a TSelector
128 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
129 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
130 TList* Process(const std::string &fileName, TSelector& selector, TEntryList &entries,
131 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
132 TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
133 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
134 TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
135 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
136 TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
137 ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
138
139
140 /// \brief Process a TTree dataset with a selector: version without entry list
141 ///
142 /// Dataset definition:
143 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
144 /// \param[in] fileName string with the path of the file with the TTree to process
145 /// \param[in] collection TFileCollection with the files with the TTree to process
146 /// \param[in] chain TChain with the files with the TTree to process
147 /// \param[in] tree TTree to process
148 ///
149 /// \param[in] selector Instance of TSelector to be applied to the dataset
150 /// \param[in] treeName Name of the TTree to process
151 /// \param[in] nToProcess Number of entries to process (0 means all)
152 /// \param[in] jFirst First entry to process (0 means the first of the first file)
153 ///
154 // these versions require a TSelector
155 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector,
156 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
157 TList* Process(const std::string &fileName, TSelector& selector,
158 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
159 TList* Process(TFileCollection& files, TSelector& selector,
160 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
161 TList* Process(TChain& files, TSelector& selector,
162 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
163 TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
164
166 unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
167
168private:
169 template<class T> void Collect(std::vector<T> &reslist);
170 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
171
172 void FixLists(std::vector<TObject*> &lists);
173 void Reset();
174 void ReplyToIdle(TSocket *s);
175
176 unsigned fNProcessed; ///< number of arguments already passed to the workers
177 unsigned fNToProcess; ///< total number of arguments to pass to the workers
178
179 /// A collection of the types of tasks that TTreeProcessorMP can execute.
180 /// It is used to correctly interpret, and properly reply to, the messages
181 /// received (see, for example, TTreeProcessorMP::HandlePoolCode and TTreeProcessorMP::ReplyToIdle)
182 enum class ETask : unsigned char {
183 kNoTask, ///< no task is being executed
184 kProcByRange, ///< a Process method is being executed and each worker will process a certain range of each file
185 kProcByFile ///< a Process method is being executed and each worker will process a different file
186 };
187
188 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
189};
190
191template<class F>
192auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
193 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
195{
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
200
201 // Warn for yet unimplemented functionality
202 if (jFirst > 0) {
203 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
204 jFirst = 0;
205 }
206
207 //prepare environment
208 Reset();
209 unsigned nWorkers = GetNWorkers();
210
211 // Check th entry list
212 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
213 //fork
214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
215 bool ok = Fork(worker);
216 if(!ok) {
217 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
218 return nullptr;
219 }
220
221
222 if(fileNames.size() < nWorkers) {
223 //TTree entry granularity. For each file, we divide entries equally between workers
224 fTaskType = ETask::kProcByRange;
225 //Tell workers to start processing entries
226 fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
227 std::vector<unsigned> args(nWorkers);
228 std::iota(args.begin(), args.end(), 0);
229 fNProcessed = Broadcast(MPCode::kProcRange, args);
230 if(fNProcessed < nWorkers)
231 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
232 } else {
233 //file granularity. each worker processes one whole file as a single task
234 fTaskType = ETask::kProcByFile;
235 fNToProcess = fileNames.size();
236 std::vector<unsigned> args(nWorkers);
237 std::iota(args.begin(), args.end(), 0);
238 fNProcessed = Broadcast(MPCode::kProcFile, args);
239 if(fNProcessed < nWorkers)
240 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 }
242
243 //collect results, distribute new tasks
244 std::vector<TObject*> reslist;
245 Collect(reslist);
246
247 //merge
249 auto res = redfunc(reslist);
250
251 //clean-up and return
252 ReapWorkers();
253 fTaskType = ETask::kNoTask;
254 return static_cast<retType>(res);
255}
256
257
258template<class F>
259auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc, TEntryList &entries,
260 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
262{
263 std::vector<std::string> singleFileName(1, fileName);
264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
265}
266
267
268template<class F>
270 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
272{
273 std::vector<std::string> fileNames(files.GetNFiles());
274 unsigned count = 0;
275 for(auto f : *static_cast<THashList*>(files.GetList()))
276 fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
277
278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
279}
280
281
282template<class F>
283auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
284 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
286{
287 TObjArray* filelist = files.GetListOfFiles();
288 std::vector<std::string> fileNames(filelist->GetEntries());
289 unsigned count = 0;
290 for(auto f : *filelist)
291 fileNames[count++] = f->GetTitle();
292
293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
294}
295
296
297template<class F>
299 ULong64_t nToProcess, ULong64_t jFirst)
301{
303 static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304
305 // Warn for yet unimplemented functionality
306 if (jFirst > 0) {
307 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
308 jFirst = 0;
309 }
310
311 //prepare environment
312 Reset();
313 unsigned nWorkers = GetNWorkers();
314
315 // Check th entry list
316 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
317 //fork
318 TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
319 bool ok = Fork(worker);
320 if(!ok) {
321 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
322 return nullptr;
323 }
324
325 //divide entries equally between workers
326 fTaskType = ETask::kProcByRange;
327
328 //tell workers to start processing entries
329 fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
330 std::vector<unsigned> args(nWorkers);
331 std::iota(args.begin(), args.end(), 0);
332 fNProcessed = Broadcast(MPCode::kProcTree, args);
333 if(fNProcessed < nWorkers)
334 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
335
336 //collect results, distribute new tasks
337 std::vector<TObject*> reslist;
338 Collect(reslist);
339
340 //merge
342 auto res = redfunc(reslist);
343
344 //clean-up and return
345 ReapWorkers();
346 fTaskType = ETask::kNoTask;
347 return static_cast<retType>(res);
348}
349
350
351///
352/// No TEntryList versions of generic processor
353///
354
355template<class F>
356auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,
357 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
359{
360 TEntryList noelist;
361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
362}
363
364
365template<class F>
366auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,
367 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
369{
370 TEntryList noelist;
371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
372}
373
374
375template<class F>
377 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
379{
380 TEntryList noelist;
381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
382}
383
384
385template<class F>
386auto TTreeProcessorMP::Process(TChain& files, F procFunc,
387 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
389{
390 TEntryList noelist;
391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
392}
393
394
395template<class F>
397 ULong64_t nToProcess, ULong64_t jFirst)
399{
400 TEntryList noelist;
401 return Process(tree, procFunc, noelist, nToProcess, jFirst);
402}
403
404//////////////////////////////////////////////////////////////////////////
405/// Handle message and reply to the worker
406template<class T>
407void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
408{
409 unsigned code = msg.first;
410 if (code == MPCode::kIdling) {
411 ReplyToIdle(s);
412 } else if(code == MPCode::kProcResult) {
413 if(msg.second != nullptr)
414 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
416 } else if(code == MPCode::kProcError) {
417 const char *str = ReadBuffer<const char*>(msg.second.get());
418 Error("TTreeProcessorMP::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
420 ReplyToIdle(s);
421 delete [] str;
422 } else {
423 // UNKNOWN CODE
424 Error("TTreeProcessorMP::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
425 }
426}
427
428//////////////////////////////////////////////////////////////////////////
429/// Listen for messages sent by the workers and call the appropriate handler function.
430/// TTreeProcessorMP::HandlePoolCode is called on messages with a code < 1000 and
431/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
432template<class T>
433void TTreeProcessorMP::Collect(std::vector<T> &reslist)
434{
435 TMonitor &mon = GetMonitor();
436 mon.ActivateAll();
437 while (mon.GetActive() > 0) {
438 TSocket *s = mon.Select();
439 MPCodeBufPair msg = MPRecv(s);
440 if (msg.first == MPCode::kRecvError) {
441 Error("TTreeProcessorMP::Collect", "[E][C] Lost connection to a worker");
442 Remove(s);
443 } else if (msg.first < 1000)
444 HandlePoolCode(msg, s, reslist);
445 else
446 HandleMPCode(msg, s);
447 }
448}
449
450} // ROOT namespace
451
452#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:32
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define f(i)
Definition: RSha256.hxx:104
unsigned long long ULong64_t
Definition: RtypesCore.h:81
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition: TError.cxx:231
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
Merge collection of TObjects.
Definition: PoolUtils.h:35
This class provides an interface to process a TTree dataset in parallel with multi-process technology...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
unsigned GetNWorkers() const
void SetNWorkers(unsigned n)
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
std::result_of_t< F(Args...)> InvokeResult_t
An adapter for std::invoke_result that falls back to std::result_of if the former is not available.
Definition: TypeTraits.hxx:201
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
Definition: TFileInfo.h:39
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition: THashList.h:34
A doubly linked list.
Definition: TList.h:38
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:334
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
TMonitor & GetMonitor()
Definition: TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:300
Templated derivation of TMPWorkerTree handling generic function tree processing.
Definition: TMPWorkerTree.h:79
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
An array of TObjects.
Definition: TObjArray.h:31
Int_t GetEntries() const override
Return the number of objects in array (i.e. number of non-empty slots).
Definition: TObjArray.cxx:523
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:31
A TTree represents a columnar dataset.
Definition: TTree.h:79
const Int_t n
Definition: legend1.C:16
#define F(x, y, z)
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition: MPCode.h:38
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition: MPCode.h:39
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition: MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition: MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
This file contains a specialised ROOT message handler to test for diagnostics in unit tests.
static constexpr double s
Definition: tree.py:1