Logo ROOT   6.07/09
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TProcessExecutor
13 #define ROOT_TProcessExecutor
14 
15 #include "MPCode.h"
16 #include "MPSendRecv.h"
17 #include "PoolUtils.h"
18 #include "TChain.h"
19 #include "TChainElement.h"
20 #include "TError.h"
21 #include "TFileCollection.h"
22 #include "TFileInfo.h"
23 #include "THashList.h"
24 #include "TMPClient.h"
25 #include "ROOT/TExecutor.hxx"
26 #include "TPoolProcessor.h"
27 #include "TPoolWorker.h"
28 #include "TSelector.h"
29 #include "TTreeReader.h"
30 #include <algorithm> //std::generate
31 #include <numeric> //std::iota
32 #include <string>
33 #include <type_traits> //std::result_of, std::enable_if
34 #include <functional> //std::reference_wrapper
35 #include <vector>
36 
37 namespace ROOT {
38 
39 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
40 public:
41  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
42  ~TProcessExecutor() = default;
43  //it doesn't make sense for a TProcessExecutor to be copied
44  TProcessExecutor(const TProcessExecutor &) = delete;
45  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
46 
47  // Map
48  template<class F, class Cond = noReferenceCond<F>>
49  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
50  /// \cond
51  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
53  template<class F, class T, class Cond = noReferenceCond<F, T>>
54  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
55  /// \endcond
57 
58  // ProcTree
59  // these versions require that procFunc returns a ptr to TObject or inheriting classes and takes a TTreeReader& (both enforced at compile-time)
60  template<class F> auto ProcTree(const std::vector<std::string>& fileNames, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
61  template<class F> auto ProcTree(const std::string& fileName, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
62  template<class F> auto ProcTree(TFileCollection& files, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
63  template<class F> auto ProcTree(TChain& files, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
64  template<class F> auto ProcTree(TTree& tree, F procFunc, ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
65  // these versions require a TSelector
66  TList* ProcTree(const std::vector<std::string>& fileNames, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
67  TList* ProcTree(const std::string &fileName, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
68  TList* ProcTree(TFileCollection& files, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
69  TList* ProcTree(TChain& files, TSelector& selector, const std::string& treeName = "", ULong64_t nToProcess = 0);
70  TList* ProcTree(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0);
71 
72  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
73  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
74 
75  template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc)-> decltype(redfunc(objs.front(), objs.front())) = delete;
77 
78 private:
79  template<class T> void Collect(std::vector<T> &reslist);
80  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
81 
82  void FixLists(std::vector<TObject*> &lists);
83  void Reset();
84  void ReplyToFuncResult(TSocket *s);
85  void ReplyToIdle(TSocket *s);
86 
87  unsigned fNProcessed; ///< number of arguments already passed to the workers
88  unsigned fNToProcess; ///< total number of arguments to pass to the workers
89 
90  /// A collection of the types of tasks that TProcessExecutor can execute.
91  /// It is used to interpret in the right way and properly reply to the
92  /// messages received (see, for example, TProcessExecutor::HandleInput)
93  enum class ETask : unsigned char {
94  kNoTask, ///< no task is being executed
95  kMap, ///< a Map method with no arguments is being executed
96  kMapWithArg, ///< a Map method with arguments is being executed
97  kProcByRange, ///< a ProcTree method is being executed and each worker will process a certain range of each file
98  kProcByFile, ///< a ProcTree method is being executed and each worker will process a different file
99  };
100 
101  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
102 };
103 
104 
105 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
106 
107 //////////////////////////////////////////////////////////////////////////
108 /// Execute func (with no arguments) nTimes in parallel.
109 /// A vector containing executions' results is returned.
110 /// Functions that take more than zero arguments can be executed (with
111 /// fixed arguments) by wrapping them in a lambda or with std::bind.
112 template<class F, class Cond>
114 {
115  using retType = decltype(func());
116  //prepare environment
117  Reset();
119 
120  //fork min(nTimes, fNWorkers) times
121  unsigned oldNWorkers = GetNWorkers();
122  if (nTimes < oldNWorkers)
123  SetNWorkers(nTimes);
124  TPoolWorker<F> worker(func);
125  bool ok = Fork(worker);
126  SetNWorkers(oldNWorkers);
127  if (!ok)
128  {
129  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
130  return std::vector<retType>();
131  }
132 
133  //give out tasks
134  fNToProcess = nTimes;
135  std::vector<retType> reslist;
136  reslist.reserve(fNToProcess);
137  fNProcessed = Broadcast(PoolCode::kExecFunc, fNToProcess);
138 
139  //collect results, give out other tasks if needed
140  Collect(reslist);
141 
142  //clean-up and return
143  ReapWorkers();
145  return reslist;
146 }
147 
148 // tell doxygen to ignore this (\endcond closes the statement)
149 /// \cond
150 
151 // actual implementation of the Map method. all other calls with arguments eventually
152 // call this one
153 template<class F, class T, class Cond>
155 {
156  //check whether func is callable
157  using retType = decltype(func(args.front()));
158  //prepare environment
159  Reset();
161 
162  //fork min(args.size(), fNWorkers) times
163  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TPoolWorker moved its content away
164  unsigned oldNWorkers = GetNWorkers();
165  if (args.size() < oldNWorkers)
166  SetNWorkers(args.size());
167  TPoolWorker<F, T> worker(func, args);
168  bool ok = Fork(worker);
169  SetNWorkers(oldNWorkers);
170  if (!ok)
171  {
172  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
173  return std::vector<retType>();
174  }
175 
176  //give out tasks
177  fNToProcess = args.size();
178  std::vector<retType> reslist;
179  reslist.reserve(fNToProcess);
180  std::vector<unsigned> range(fNToProcess);
181  std::iota(range.begin(), range.end(), 0);
183 
184  //collect results, give out other tasks if needed
185  Collect(reslist);
186 
187  //clean-up and return
188  ReapWorkers();
190  return reslist;
191 }
192 
193 template<class F, class INTEGER, class Cond>
195 {
196  std::vector<INTEGER> vargs(args.size());
197  std::copy(args.begin(), args.end(), vargs.begin());
198  const auto &reslist = Map(func, vargs);
199  return reslist;
200 }
201 // tell doxygen to stop ignoring code
202 /// \endcond
203 
204 
205 template<class F>
206 auto TProcessExecutor::ProcTree(const std::vector<std::string>& fileNames, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
207 {
208  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
209  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
210 
211  //prepare environment
212  Reset();
213  unsigned nWorkers = GetNWorkers();
214 
215  //fork
216  TPoolProcessor<F> worker(procFunc, fileNames, treeName, nWorkers, nToProcess);
217  bool ok = Fork(worker);
218  if(!ok) {
219  Error("TProcessExecutor::ProcTree", "[E][C] Could not fork. Aborting operation.");
220  return nullptr;
221  }
222 
223  if(fileNames.size() < nWorkers) {
224  //TTree entry granularity. For each file, we divide entries equally between workers
226  //Tell workers to start processing entries
227  fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
228  std::vector<unsigned> args(nWorkers);
229  std::iota(args.begin(), args.end(), 0);
231  if(fNProcessed < nWorkers)
232  Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
233  } else {
234  //file granularity. each worker processes one whole file as a single task
236  fNToProcess = fileNames.size();
237  std::vector<unsigned> args(nWorkers);
238  std::iota(args.begin(), args.end(), 0);
240  if(fNProcessed < nWorkers)
241  Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
242  }
243 
244  //collect results, distribute new tasks
245  std::vector<TObject*> reslist;
246  Collect(reslist);
247 
248  //merge
250  auto res = redfunc(reslist);
251 
252  //clean-up and return
253  ReapWorkers();
255  return static_cast<retType>(res);
256 }
257 
258 
/// Single-file convenience overload of ProcTree.
/// Wraps fileName in a one-element vector and forwards it, together with
/// procFunc, treeName and nToProcess, to the overload that takes a vector of
/// file names. The result of that overload is returned unchanged.
259 template<class F>
260 auto TProcessExecutor::ProcTree(const std::string& fileName, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
261 {
262  std::vector<std::string> singleFileName(1, fileName); // one-element list so the multi-file overload can be reused
263  return ProcTree(singleFileName, procFunc, treeName, nToProcess);
264 }
265 
266 
/// TFileCollection overload of ProcTree.
/// Builds a vector of file names from the collection (one URL string per
/// TFileInfo entry) and forwards it, together with procFunc, treeName and
/// nToProcess, to the overload that takes a vector of file names.
267 template<class F>
268 auto TProcessExecutor::ProcTree(TFileCollection& files, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
269 {
270  std::vector<std::string> fileNames(files.GetNFiles()); // pre-sized: one slot per file reported by the collection
271  unsigned count = 0;
     // NOTE(review): assumes GetList() holds exactly GetNFiles() entries, all of them
     // TFileInfo objects; a longer list would write past the end of fileNames -- confirm.
272  for(auto f : *static_cast<THashList*>(files.GetList()))
273  fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
274 
275  return ProcTree(fileNames, procFunc, treeName, nToProcess);
276 }
277 
278 
/// TChain overload of ProcTree.
/// Collects one string per entry of the chain's list of files (the entry's
/// title) and forwards the resulting vector, together with procFunc, treeName
/// and nToProcess, to the overload that takes a vector of file names.
279 template<class F>
280 auto TProcessExecutor::ProcTree(TChain& files, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
281 {
282  TObjArray* filelist = files.GetListOfFiles();
283  std::vector<std::string> fileNames(filelist->GetEntries()); // pre-sized: one slot per object in the array
284  unsigned count = 0;
285  for(auto f : *filelist)
286  fileNames[count++] = f->GetTitle(); // presumably the element's title holds the file path -- TODO confirm
287 
288  return ProcTree(fileNames, procFunc, treeName, nToProcess);
289 }
290 
291 
292 template<class F>
293 auto TProcessExecutor::ProcTree(TTree& tree, F procFunc, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
294 {
295  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
296  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
297 
298  //prepare environment
299  Reset();
300  unsigned nWorkers = GetNWorkers();
301 
302  //fork
303  TPoolProcessor<F> worker(procFunc, &tree, nWorkers, nToProcess);
304  bool ok = Fork(worker);
305  if(!ok) {
306  Error("TProcessExecutor::ProcTree", "[E][C] Could not fork. Aborting operation.");
307  return nullptr;
308  }
309 
310  //divide entries equally between workers
312 
313  //tell workers to start processing entries
314  fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
315  std::vector<unsigned> args(nWorkers);
316  std::iota(args.begin(), args.end(), 0);
318  if(fNProcessed < nWorkers)
319  Error("TProcessExecutor::ProcTree", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
320 
321  //collect results, distribute new tasks
322  std::vector<TObject*> reslist;
323  Collect(reslist);
324 
325  //merge
327  auto res = redfunc(reslist);
328 
329  //clean-up and return
330  ReapWorkers();
332  return static_cast<retType>(res);
333 }
334 
335 //////////////////////////////////////////////////////////////////////////
336 /// Handle message and reply to the worker
/// Dispatch on the PoolCode carried by a worker's message.
/// \param msg      code/buffer pair received from the worker; msg.first is the code,
///                 msg.second the (possibly null) buffer with the payload
/// \param s        socket of the worker that sent the message
/// \param reslist  vector to which deserialized results of type T are appended
///
/// kFuncResult and kProcResult append the payload read from the buffer to
/// reslist (kProcResult only when a buffer is present); kIdling triggers
/// ReplyToIdle; kProcError logs the worker's message and then calls
/// ReplyToIdle; any other code is reported as unknown.
/// NOTE(review): the listing's gutter skips lines 343 and 349, so statements
/// following the two push_back calls are not visible in this extract.
337 template<class T>
338 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
339 {
340  unsigned code = msg.first;
341  if (code == PoolCode::kFuncResult) {
342  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
344  } else if (code == PoolCode::kIdling) {
345  ReplyToIdle(s);
346  } else if(code == PoolCode::kProcResult) {
347  if(msg.second != nullptr) // a result message may carry no object
348  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
350  } else if(code == PoolCode::kProcError) {
351  const char *str = ReadBuffer<const char*>(msg.second.get());
352  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
353  "Continuing execution ignoring these entries.", str);
354  ReplyToIdle(s);
355  delete [] str; // the string read from the buffer is released here
356  } else {
357  // UNKNOWN CODE
     // code is unsigned, so it is printed with %u (the previous %d mismatched the argument type)
358  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%u", code);
359  }
360 }
361 
362 //////////////////////////////////////////////////////////////////////////
363 /// Listen for messages sent by the workers and call the appropriate handler function.
364 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
365 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
/// \param reslist  vector handed to HandlePoolCode for every message whose code is below 1000
/// The loop runs for as long as the monitor reports active sockets.
366 template<class T>
367 void TProcessExecutor::Collect(std::vector<T> &reslist)
368 {
369  TMonitor &mon = GetMonitor();
370  mon.ActivateAll(); // re-enable every socket before starting to listen
371  while (mon.GetActive() > 0) {
372  TSocket *s = mon.Select(); // socket on which an event is waiting
373  MPCodeBufPair msg = MPRecv(s);
374  if (msg.first == MPCode::kRecvError) {
375  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
376  Remove(s); // stop monitoring the socket that failed
377  } else if (msg.first < 1000) // codes below 1000 are handled by HandlePoolCode
378  HandlePoolCode(msg, s, reslist);
379  else // all remaining codes go to HandleMPCode
380  HandleMPCode(msg, s);
381  }
382 }
383 
384 } // ROOT namespace
385 
386 #endif
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
iterator begin() const
Definition: TSeq.hxx:133
The message contains the result of a function execution.
Definition: PoolUtils.h:33
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
An array of TObjects.
Definition: TObjArray.h:39
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:99
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Definition: PoolUtils.h:40
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:39
This namespace contains pre-defined functions to be used in conjunction with TExecutor::Map and TExecu...
Definition: StringConv.hxx:21
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
auto Reduce(const std::vector< T > &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))=delete
Error while reading from the socket.
Definition: MPCode.h:34
unsigned fNProcessed
number of arguments already passed to the workers
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:38
We are ready for the next task.
Definition: PoolUtils.h:35
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:263
a Map method with arguments is being executed
Merge collection of TObjects.
Definition: PoolUtils.h:63
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
A doubly linked list.
Definition: TList.h:47
#define F(x, y, z)
unsigned GetNWorkers() const
Execute function with the argument contained in the message.
Definition: PoolUtils.h:32
a ProcTree method is being executed and each worker will process a certain range of each file ...
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
The message contains the result of the processing of a TTree.
Definition: PoolUtils.h:42
no task is being executed
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
Execute function without arguments.
Definition: PoolUtils.h:31
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
Tell the client there was an error while processing.
Definition: PoolUtils.h:44
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
double f(double x)
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
unsigned fNToProcess
total number of arguments to pass to the workers
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
This class works together with TProcessExecutor to allow the execution of functions in server process...
Definition: TPoolWorker.h:77
double func(double *x, double *p)
Definition: stressTF1.cxx:213
std::size_t size() const
Definition: TSeq.hxx:150
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
void Reset()
Reset TProcessExecutor's state.
Int_t GetEntries() const
Return the number of objects in array (i.e.
Definition: TObjArray.cxx:494
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:297
a ProcTree method is being executed and each worker will process a different file ...
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
Definition: TChain.h:35
iterator end() const
Definition: TSeq.hxx:136
Definition: tree.py:1
A TTree object has a header with a name and a title.
Definition: TTree.h:98
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:216
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition: TSelector.h:39
ETask
A collection of the types of tasks that TProcessExecutor can execute.
const Int_t n
Definition: legend1.C:16
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:276
TMonitor & GetMonitor()
Definition: TMPClient.h:36
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.