Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
TTreeProcessorMP.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TTreeProcessorMP
14#define ROOT_TTreeProcessorMP
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
19#include "ROOT/TypeTraits.hxx" // InvokeResult_t
20#include "TChain.h"
21#include "TChainElement.h"
22#include "TError.h"
23#include "TFileCollection.h"
24#include "TFileInfo.h"
25#include "THashList.h"
26#include "TMPClient.h"
27#include "TMPWorkerTree.h"
28#include "TSelector.h"
29#include "TTreeReader.h"
30#include <algorithm> //std::generate
31#include <numeric> //std::iota
32#include <string>
33#include <functional> //std::reference_wrapper
34#include <vector>
35
36namespace ROOT {
37
/// \class ROOT::TTreeProcessorMP
/// Multi-process executor for TTree datasets: it forks worker processes (TMPClient::Fork),
/// hands them either entry ranges or whole files, collects the objects they send back and
/// merges them into a single result. Datasets can be given as file names, a TFileCollection,
/// a TChain or a single TTree, optionally filtered by a TEntryList, and processed either by
/// a functor taking a TTreeReader& or by a TSelector.
/// NOTE(review): this listing skips several source lines (46-47, 67, 70, 73, 76, 79, 98,
/// 101, 104, 107, 109, 165). Judging from the cross-reference notes of the page they
/// presumably hold the deleted copy constructor/assignment, the trailing return types
/// `-> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>` of the templated Process
/// overloads and `SetNWorkers(unsigned)` — confirm against the real header.
38class TTreeProcessorMP : private TMPClient {
 /// Shorthand for ROOT::TypeTraits::InvokeResult_t: the type obtained by invoking F with Args...
39 template <typename F, typename... Args>
40 using InvokeResult_t = ROOT::TypeTraits::InvokeResult_t<F, Args...>;
41
42public:
43 explicit TTreeProcessorMP(UInt_t nWorkers = 0); //default number of workers is the number of processors
44 ~TTreeProcessorMP() = default;
45 //it doesn't make sense for a TTreeProcessorMP to be copied
 // NOTE(review): the deleted copy constructor and copy assignment (source lines 46-47) are not shown here.
48
49 /// \brief Process a TTree dataset with a functor
50 /// \tparam F functor returning a pointer to TObject or inheriting classes and
51 /// taking a TTreeReader& (both enforced at compile-time)
52 ///
53 /// Dataset definition:
54 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
55 /// \param[in] fileName string with the path of the files with the TTree to process
56 /// \param[in] collection TFileCollection with the files with the TTree to process
57 /// \param[in] chain TChain with the files with the TTree to process
58 /// \param[in] tree TTree to process
59 ///
60 /// \param[in] entries TEntryList to filter the dataset
61 /// \param[in] treeName Name of the TTree to process
62 /// \param[in] nToProcess Number of entries to process (0 means all)
63 /// \param[in] jFirst First entry to process (0 means the first of the first file)
 /// \note jFirst > 0 is not implemented yet for the functor versions: a warning is
 /// issued and processing starts from the first entry.
 /// \return the merged result, cast to the functor's return type (nullptr if forking fails)
64 ///
 // NOTE(review): the trailing return types of the five declarations below (source lines
 // 67, 70, 73, 76, 79) are missing from this listing.
65 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
66 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
68 template<class F> auto Process(const std::string& fileName, F procFunc, TEntryList &entries,
69 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
71 template<class F> auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
72 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
74 template<class F> auto Process(TChain& chain, F procFunc, TEntryList &entries,
75 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
77 template<class F> auto Process(TTree& tree, F procFunc, TEntryList &entries,
78 ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
80
81 /// \brief Process a TTree dataset with a functor: version without entry list
82 /// \tparam F functor returning a pointer to TObject or inheriting classes and
83 /// taking a TTreeReader& (both enforced at compile-time)
84 ///
85 /// Dataset definition:
86 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
87 /// \param[in] fileName string with the path of the files with the TTree to process
88 /// \param[in] collection TFileCollection with the files with the TTree to process
89 /// \param[in] chain TChain with the files with the TTree to process
90 /// \param[in] tree TTree to process
91 ///
92 /// \param[in] treeName Name of the TTree to process
93 /// \param[in] nToProcess Number of entries to process (0 means all)
94 /// \param[in] jFirst First entry to process (0 means the first of the first file)
 /// \note These overloads forward to the TEntryList versions with an empty TEntryList.
95 ///
 // NOTE(review): the trailing return types of the five declarations below (source lines
 // 98, 101, 104, 107, 109) are missing from this listing.
96 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc,
97 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
99 template<class F> auto Process(const std::string& fileName, F procFunc,
100 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
102 template<class F> auto Process(TFileCollection& files, F procFunc,
103 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
105 template<class F> auto Process(TChain& files, F procFunc,
106 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
108 template<class F> auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
110
111
112 /// \brief Process a TTree dataset with a selector
113 ///
114 /// Dataset definition:
115 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
116 /// \param[in] fileName string with the path of the files with the TTree to process
117 /// \param[in] collection TFileCollection with the files with the TTree to process
118 /// \param[in] chain TChain with the files with the TTree to process
119 /// \param[in] tree TTree to process
120 ///
121 /// \param[in] selector Instance of TSelector to be applied to the dataset
122 /// \param[in] entries TEntryList to filter the dataset
123 /// \param[in] treeName Name of the TTree to process
124 /// \param[in] nToProcess Number of entries to process (0 means all)
125 /// \param[in] jFirst First entry to process (0 means the first of the first file)
126 ///
127 // these versions require a TSelector
 // (declared here only; their definitions are not in this header)
128 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
129 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
130 TList* Process(const std::string &fileName, TSelector& selector, TEntryList &entries,
131 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
132 TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
133 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
134 TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
135 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
136 TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
137 ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
138
139
140 /// \brief Process a TTree dataset with a selector: version without entry list
141 ///
142 /// Dataset definition:
143 /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
144 /// \param[in] fileName string with the path of the files with the TTree to process
145 /// \param[in] collection TFileCollection with the files with the TTree to process
146 /// \param[in] chain TChain with the files with the TTree to process
147 /// \param[in] tree TTree to process
148 ///
149 /// \param[in] selector Instance of TSelector to be applied to the dataset
150 /// \param[in] treeName Name of the TTree to process
151 /// \param[in] nToProcess Number of entries to process (0 means all)
152 /// \param[in] jFirst First entry to process (0 means the first of the first file)
153 ///
154 // these versions require a TSelector
 // (declared here only; their definitions are not in this header)
155 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector,
156 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
157 TList* Process(const std::string &fileName, TSelector& selector,
158 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
159 TList* Process(TFileCollection& files, TSelector& selector,
160 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
161 TList* Process(TChain& files, TSelector& selector,
162 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
163 TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
164
 // NOTE(review): source line 165 (presumably SetNWorkers(unsigned), listed among the page's
 // cross-references) is missing from this listing.
 /// Number of workers spawned by Fork(); forwards to TMPClient::GetNWorkers().
166 unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
167
168private:
 /// Receive worker messages until no socket is active; results are appended to reslist.
169 template<class T> void Collect(std::vector<T> &reslist);
 /// Handle one pool-level message (code < 1000) sent by `sender`.
170 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
171
 /// Fix list of lists before merging (to avoid errors about duplicated objects).
172 void FixLists(std::vector<TObject*> &lists);
 /// Reset TTreeProcessorMP's state (called at the start of every Process).
173 void Reset();
 /// Reply to a worker who is idle.
174 void ReplyToIdle(TSocket *s);
175
176 unsigned fNProcessed; ///< number of arguments already passed to the workers
177 unsigned fNToProcess; ///< total number of arguments to pass to the workers
178
179 /// A collection of the types of tasks that TTreeProcessorMP can execute.
180 /// It is used to interpret in the right way and properly reply to the
181 /// messages received (see, for example, TTreeProcessorMP::HandleInput)
 /// NOTE(review): no HandleInput is declared in this class; the message handlers here are
 /// HandlePoolCode and ReplyToIdle.
182 enum class ETask : unsigned char {
183 kNoTask, ///< no task is being executed
184 kProcByRange, ///< a Process method is being executed and each worker will process a certain range of each file
185 kProcByFile ///< a Process method is being executed and each worker will process a different file
186 };
187
188 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
189};
190
/// Functor overload with entry list over a vector of file names (the one all other
/// file-based functor overloads end up calling).
/// Steps: reset state, fork GetNWorkers() workers, hand out work, collect the TObject*
/// results, merge them with `redfunc`, reap the workers and return the merged object.
/// Work distribution:
///  - fewer files than workers: every file is split into per-worker entry ranges
///    (nWorkers * nFiles ranges in total);
///  - otherwise: each worker processes one whole file per task.
/// Returns nullptr if forking fails.
/// NOTE(review): source lines 194, 196, 224, 229, 234, 238, 248 and 253 are missing from this
/// listing. `retType`, `redfunc` and the assignments to `fNProcessed` must come from them
/// (presumably the trailing return type + `using retType = ...`, the fTaskType updates,
/// `fNProcessed = Broadcast(...)` and a PoolUtils reduce functor) — confirm against the header.
191template<class F>
192auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
193 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
195{
197 static_assert(std::is_constructible<TObject*, retType>::value,
198 "procFunc must return a pointer to a class inheriting from TObject,"
199 " and must take a reference to TTreeReader as the only argument");
200
201 // Warn for yet unimplemented functionality
202 if (jFirst > 0) {
203 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
204 jFirst = 0;
205 }
206
207 //prepare environment
208 Reset();
209 unsigned nWorkers = GetNWorkers();
210
211 // Check the entry list: an invalid TEntryList (the no-entry-list overloads pass an empty one) means no filtering
212 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
213 //fork
214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
215 bool ok = Fork(worker);
216 if(!ok) {
217 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
218 return nullptr;
219 }
220
221
222 if(fileNames.size() < nWorkers) {
223 //TTree entry granularity. For each file, we divide entries equally between workers
225 //Tell workers to start processing entries
226 fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
 // one argument per worker: the indices 0 .. nWorkers-1 (presumably the first task of each worker)
227 std::vector<unsigned> args(nWorkers);
228 std::iota(args.begin(), args.end(), 0);
230 if(fNProcessed < nWorkers)
231 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
232 } else {
233 //file granularity. each worker processes one whole file as a single task
235 fNToProcess = fileNames.size();
 // one argument per worker: the indices 0 .. nWorkers-1 (presumably the first file of each worker)
236 std::vector<unsigned> args(nWorkers);
237 std::iota(args.begin(), args.end(), 0);
239 if(fNProcessed < nWorkers)
240 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
241 }
242
243 //collect results, distribute new tasks
 // Collect() loops until no worker socket is active; idle workers are answered via ReplyToIdle
244 std::vector<TObject*> reslist;
245 Collect(reslist);
246
247 //merge
249 auto res = redfunc(reslist);
250
251 //clean-up and return
252 ReapWorkers();
254 return static_cast<retType>(res);
255}
256
257
/// Functor overload with entry list for a single file: wraps `fileName` in a one-element
/// vector and forwards to the vector-of-file-names overload.
/// NOTE(review): source line 261 (presumably the trailing return type) is missing from this listing.
258template<class F>
259auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc, TEntryList &entries,
260 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
262{
263 std::vector<std::string> singleFileName(1, fileName);
264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
265}
266
267
/// Functor overload with entry list for a TFileCollection: collects the current URL of each
/// TFileInfo in the collection and forwards to the vector-of-file-names overload.
/// NOTE(review): source lines 269 and 271 are missing from this listing (presumably the
/// `auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc, TEntryList &entries,`
/// head and the trailing return type) — confirm against the real header.
/// NOTE(review): the loop assumes every element of files.GetList() is a TFileInfo (the
/// static_cast is unchecked) and that GetNFiles() equals the list size; if the list were
/// longer, fileNames[count++] would write past the end of the vector.
268template<class F>
270 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
272{
273 std::vector<std::string> fileNames(files.GetNFiles());
274 unsigned count = 0;
275 for(auto f : *static_cast<THashList*>(files.GetList()))
276 fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
277
278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
279}
280
281
/// Functor overload with entry list for a TChain: takes the title of every element of the
/// chain's list of files (presumably the file path of each TChainElement) and forwards to the
/// vector-of-file-names overload. Only the `treeName` argument is forwarded; the chain's own
/// tree name is not read here.
/// NOTE(review): source line 285 (presumably the trailing return type) is missing from this listing.
282template<class F>
283auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
284 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
286{
287 TObjArray* filelist = files.GetListOfFiles();
 // GetEntries() counts the non-empty slots, which is what the range-for below visits
288 std::vector<std::string> fileNames(filelist->GetEntries());
289 unsigned count = 0;
290 for(auto f : *filelist)
291 fileNames[count++] = f->GetTitle();
292
293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
294}
295
296
/// Functor overload with entry list for a single in-memory TTree. The tree is passed to the
/// workers when they are constructed (`&tree`). Its entries are then divided between the
/// nWorkers workers, one range each, and the results are collected and merged as in the
/// file-based overload. Returns nullptr if forking fails.
/// NOTE(review): source lines 300, 302, 326, 332, 341 and 346 are missing from this listing.
/// `retType`, `redfunc` and the assignment to `fNProcessed` must come from them (presumably
/// the trailing return type + `using retType = ...`, the fTaskType updates,
/// `fNProcessed = Broadcast(...)` and a PoolUtils reduce functor) — confirm against the header.
297template<class F>
298auto TTreeProcessorMP::Process(TTree& tree, F procFunc, TEntryList &entries,
299 ULong64_t nToProcess, ULong64_t jFirst)
301{
303 static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304
305 // Warn for yet unimplemented functionality
306 if (jFirst > 0) {
307 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
308 jFirst = 0;
309 }
310
311 //prepare environment
312 Reset();
313 unsigned nWorkers = GetNWorkers();
314
315 // Check the entry list: an invalid TEntryList (the no-entry-list overload passes an empty one) means no filtering
316 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
317 //fork
318 TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
319 bool ok = Fork(worker);
320 if(!ok) {
321 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
322 return nullptr;
323 }
324
325 //divide entries equally between workers
327
328 //tell workers to start processing entries
329 fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
 // one argument per worker: the range indices 0 .. nWorkers-1
330 std::vector<unsigned> args(nWorkers);
331 std::iota(args.begin(), args.end(), 0);
333 if(fNProcessed < nWorkers)
334 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
335
336 //collect results, distribute new tasks
337 std::vector<TObject*> reslist;
338 Collect(reslist);
339
340 //merge
342 auto res = redfunc(reslist);
343
344 //clean-up and return
345 ReapWorkers();
347 return static_cast<retType>(res);
348}
349
350
351///
352/// No TEntryList versions of generic processor
353///
354
/// Functor overload without entry list for a vector of file names. It forwards to the
/// TEntryList overload with a default-constructed TEntryList. That overload only uses the
/// list when IsValid() is true, so this presumably means "no filtering".
/// NOTE(review): source line 358 (presumably the trailing return type) is missing from this listing.
355template<class F>
356auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,
357 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
359{
360 TEntryList noelist;
361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
362}
363
364
/// Functor overload without entry list for a single file: forwards to the TEntryList
/// overload with a default-constructed (presumably invalid, i.e. unused) TEntryList.
/// NOTE(review): source line 368 (presumably the trailing return type) is missing from this listing.
365template<class F>
366auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,
367 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
369{
370 TEntryList noelist;
371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
372}
373
374
/// Functor overload without entry list for a TFileCollection: forwards to the TEntryList
/// overload with a default-constructed (presumably invalid, i.e. unused) TEntryList.
/// NOTE(review): source lines 376 and 378 are missing from this listing (presumably the
/// `auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc,` head and the
/// trailing return type) — confirm against the real header.
375template<class F>
377 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
379{
380 TEntryList noelist;
381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
382}
383
384
/// Functor overload without entry list for a TChain: forwards to the TEntryList overload
/// with a default-constructed (presumably invalid, i.e. unused) TEntryList.
/// NOTE(review): source line 388 (presumably the trailing return type) is missing from this listing.
385template<class F>
386auto TTreeProcessorMP::Process(TChain& files, F procFunc,
387 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
389{
390 TEntryList noelist;
391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
392}
393
394
/// Functor overload without entry list for a single TTree: forwards to the TEntryList
/// overload with a default-constructed (presumably invalid, i.e. unused) TEntryList.
/// NOTE(review): source line 398 (presumably the trailing return type) is missing from this listing.
395template<class F>
396auto TTreeProcessorMP::Process(TTree& tree, F procFunc,
397 ULong64_t nToProcess, ULong64_t jFirst)
399{
400 TEntryList noelist;
401 return Process(tree, procFunc, noelist, nToProcess, jFirst);
402}
403
404//////////////////////////////////////////////////////////////////////////
405/// Handle message and reply to the worker
/// Dispatches one pool-level message (Collect routes codes < 1000 here) received from `s`:
///  - MPCode::kIdling: the worker is ready for more work, answer it with ReplyToIdle(s);
///  - MPCode::kProcResult: if a payload is present, deserialize a T and append it to reslist;
///  - MPCode::kProcError: print the worker's error text, answer with ReplyToIdle(s), free the text;
///  - any other code: report it as unknown.
/// \param[in] msg message code plus optional payload buffer
/// \param[in] s socket of the worker that sent the message
/// \param[out] reslist receives the deserialized results
/// NOTE(review): source line 415 is missing from this listing; it follows the storing of a
/// result (presumably a reply to the worker) — confirm against the real header.
406template<class T>
407void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
408{
409 unsigned code = msg.first;
410 if (code == MPCode::kIdling) {
411 ReplyToIdle(s);
412 } else if(code == MPCode::kProcResult) {
 // NOTE(review): std::move around the returned temporary below is redundant (it is already an rvalue)
413 if(msg.second != nullptr)
414 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
416 } else if(code == MPCode::kProcError) {
 // the string is released with delete [] below, so ReadBuffer<const char*> presumably allocates it with new[]
417 const char *str = ReadBuffer<const char*>(msg.second.get());
418 Error("TTreeProcessorMP::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
419 "Continuing execution ignoring these entries.", str);
420 ReplyToIdle(s);
421 delete [] str;
422 } else {
423 // UNKNOWN CODE
 // NOTE(review): `code` is unsigned but printed with %d; %u would match its type
424 Error("TTreeProcessorMP::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
425 }
426}
427
428//////////////////////////////////////////////////////////////////////////
429/// Listen for messages sent by the workers and call the appropriate handler function.
430/// TTreeProcessorMP::HandlePoolCode is called on messages with a code < 1000 and
431/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
432template<class T>
433void TTreeProcessorMP::Collect(std::vector<T> &reslist)
434{
435 TMonitor &mon = GetMonitor();
436 mon.ActivateAll();
437 while (mon.GetActive() > 0) {
438 TSocket *s = mon.Select();
439 MPCodeBufPair msg = MPRecv(s);
440 if (msg.first == MPCode::kRecvError) {
441 Error("TTreeProcessorMP::Collect", "[E][C] Lost connection to a worker");
442 Remove(s);
443 } else if (msg.first < 1000)
444 HandlePoolCode(msg, s, reslist);
445 else
446 HandleMPCode(msg, s);
447 }
448}
449
450} // ROOT namespace
451
452#endif
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
T ReadBuffer(TBufferFile *buf)
One of the template functions used to read objects from messages.
Definition MPSendRecv.h:158
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
#define f(i)
Definition RSha256.hxx:104
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
Definition RtypesCore.h:60
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
Merge collection of TObjects.
Definition PoolUtils.h:35
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
unsigned fNProcessed
number of arguments already passed to the workers
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
TTreeProcessorMP(const TTreeProcessorMP &)=delete
void Reset()
Reset TTreeProcessorMP's state.
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects).
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TTreeProcessorMP(UInt_t nWorkers=0)
Class constructor.
ETask
A collection of the types of tasks that TTreeProcessorMP can execute.
@ kNoTask
no task is being executed
@ kProcByRange
a Process method is being executed and each worker will process a certain range of each file
@ kProcByFile
a Process method is being executed and each worker will process a different file
ETask fTaskType
the kind of task that is being executed, if any
unsigned fNToProcess
total number of arguments to pass to the workers
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
auto Process(const std::vector< std::string > &fileNames, F procFunc, TEntryList &entries, const std::string &treeName="", ULong64_t nToProcess=0, ULong64_t jFirst=0) -> InvokeResult_t< F, std::reference_wrapper< TTreeReader > >
Process a TTree dataset with a functor.
TTreeProcessorMP & operator=(const TTreeProcessorMP &)=delete
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Class describing a generic file including meta information.
Definition TFileInfo.h:39
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition THashList.h:34
A doubly linked list.
Definition TList.h:38
unsigned GetNWorkers() const
Definition TMPClient.h:40
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition TMPClient.cxx:51
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork().
Definition TMPClient.h:39
TMonitor & GetMonitor()
Definition TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual void ActivateAll()
Activate all de-activated sockets.
Definition TMonitor.cxx:267
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:321
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:437
An array of TObjects.
Definition TObjArray.h:31
Int_t GetEntries() const override
Return the number of objects in array (i.e. the number of non-empty slots).
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition TSelector.h:31
This class implements client sockets.
Definition TSocket.h:39
A TTree represents a columnar dataset.
Definition TTree.h:89
const Int_t n
Definition legend1.C:16
#define F(x, y, z)
@ kProcFile
Tell a TMPWorkerTree which tree to process. The object sent is a TreeInfo.
Definition MPCode.h:38
@ kRecvError
Error while reading from the socket.
Definition MPCode.h:51
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kProcRange
Tell a TMPWorkerTree which tree and entries range to process. The object sent is a TreeRangeInfo.
Definition MPCode.h:39
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition MPCode.h:49
@ kProcTree
Tell a TMPWorkerTree to process the tree that was passed to it at construction time.
Definition MPCode.h:40
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42