ROOT  6.06/09
Reference Guide
TProcPool.h
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TProcPool
13 #define ROOT_TProcPool
14 
15 #include "TMPClient.h"
16 #include "MPSendRecv.h"
17 #include "TCollection.h"
18 #include "TPoolWorker.h"
19 #include "TObjArray.h"
20 #include "PoolUtils.h"
21 #include "MPCode.h"
22 #include "TPoolProcessor.h"
23 #include "TTreeReader.h"
24 #include "TFileCollection.h"
25 #include "TChain.h"
26 #include "TChainElement.h"
27 #include "THashList.h"
28 #include "TFileInfo.h"
29 #include <vector>
30 #include <string>
31 #include <initializer_list>
32 #include <type_traits> //std::result_of, std::enable_if
33 #include <numeric> //std::iota
34 #include <algorithm> //std::generate
35 #include <functional> //std::reference_wrapper
36 #include <iostream>
37 
38 class TProcPool : private TMPClient {
39 public:
40  explicit TProcPool(unsigned nWorkers = 0); //default number of workers is the number of processors
42  //it doesn't make sense for a TProcPool to be copied
43  TProcPool(const TProcPool &) = delete;
44  TProcPool &operator=(const TProcPool &) = delete;
45 
46  // Map
47  //these late return types allow for a compile-time check of compatibility between function signatures and args,
48  //and a compile-time check that the argument list implements a front() method (all STL sequence containers have it)
49  template<class F> auto Map(F func, unsigned nTimes) -> std::vector<decltype(func())>;
50  template<class F, class T> auto Map(F func, T &args) -> std::vector < decltype(++(args.begin()), args.end(), func(args.front())) >;
51  /// \cond doxygen should ignore these methods
52  template<class F> TObjArray Map(F func, TCollection &args);
53  template<class F, class T> auto Map(F func, std::initializer_list<T> args) -> std::vector<decltype(func(*args.begin()))>;
54  template<class F, class T> auto Map(F func, std::vector<T> &args) -> std::vector<decltype(func(args.front()))>;
55  /// \endcond
56 
57  // MapReduce
58  // the late return types also check at compile-time whether redfunc is compatible with func,
59  // other than checking that func is compatible with the type of arguments.
60  // a static_assert check in TProcPool::Reduce is used to check that redfunc is compatible with the type returned by func
61  template<class F, class R> auto MapReduce(F func, unsigned nTimes, R redfunc) -> decltype(func());
62  template<class F, class T, class R> auto MapReduce(F func, T &args, R redfunc) -> decltype(++(args.begin()), args.end(), func(args.front()));
63  /// \cond doxygen should ignore these methods
64  template<class F, class R> auto MapReduce(F func, TCollection &args, R redfunc) -> decltype(func(nullptr));
65  template<class F, class T, class R> auto MapReduce(F func, std::initializer_list<T> args, R redfunc) -> decltype(func(*args.begin()));
66  template<class F, class T, class R> auto MapReduce(F func, std::vector<T> &args, R redfunc) -> decltype(func(args.front()));
67  /// \endcond
68 
69  // ProcTree
70  // this version requires that procFunc returns a ptr to TObject or inheriting classes and takes a TTreeReader& (both enforced at compile-time)
71  template<class F> auto ProcTree(const std::vector<std::string>& fileNames, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
72  template<class F> auto ProcTree(const std::string& fileName, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
73  template<class F> auto ProcTree(TFileCollection& files, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
74  template<class F> auto ProcTree(TChain& files, F procFunc, const std::string& treeName = "", ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
75  template<class F> auto ProcTree(TTree& tree, F procFunc, ULong64_t nToProcess = 0) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
76 
77  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
78  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
79 
80 private:
81  template<class T> void Collect(std::vector<T> &reslist);
82  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
83 
84  void Reset();
85  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
86  void ReplyToFuncResult(TSocket *s);
87  void ReplyToIdle(TSocket *s);
88 
89  unsigned fNProcessed; ///< number of arguments already passed to the workers
90  unsigned fNToProcess; ///< total number of arguments to pass to the workers
91 
92  /// A collection of the types of tasks that TProcPool can execute.
93  /// It is used to interpret in the right way and properly reply to the
94  /// messages received (see, for example, TProcPool::HandleInput)
95  enum class ETask : unsigned {
96  kNoTask = 0, ///< no task is being executed
97  kMap, ///< a Map method with no arguments is being executed
98  kMapWithArg, ///< a Map method with arguments is being executed
99  kMapRed, ///< a MapReduce method with no arguments is being executed
100  kMapRedWithArg, ///< a MapReduce method with arguments is being executed
101  kProcByRange, ///< a ProcTree method is being executed and each worker will process a certain range of each file
102  kProcByFile, ///< a ProcTree method is being executed and each worker will process a different file
103  } fTask; ///< the kind of task that is being executed, if any
104 };
105 
106 
107 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
108 
109 //////////////////////////////////////////////////////////////////////////
110 /// Execute func (with no arguments) nTimes in parallel.
111 /// A vector containg executions' results is returned.
112 /// Functions that take more than zero arguments can be executed (with
113 /// fixed arguments) by wrapping them in a lambda or with std::bind.
114 template<class F>
115 auto TProcPool::Map(F func, unsigned nTimes) -> std::vector<decltype(func())>
116 {
117  using retType = decltype(func());
118  //prepare environment
119  Reset();
120  fTask = ETask::kMap;
121 
122  //fork max(nTimes, fNWorkers) times
123  unsigned oldNWorkers = GetNWorkers();
124  if (nTimes < oldNWorkers)
125  SetNWorkers(nTimes);
126  TPoolWorker<F> worker(func);
127  bool ok = Fork(worker);
128  SetNWorkers(oldNWorkers);
129  if (!ok)
130  {
131  std::cerr << "[E][C] Could not fork. Aborting operation\n";
132  return std::vector<retType>();
133  }
134 
135  //give out tasks
136  fNToProcess = nTimes;
137  std::vector<retType> reslist;
138  reslist.reserve(fNToProcess);
139  fNProcessed = Broadcast(PoolCode::kExecFunc, fNToProcess);
140 
141  //collect results, give out other tasks if needed
142  Collect(reslist);
143 
144  //clean-up and return
145  ReapWorkers();
146  fTask = ETask::kNoTask;
147  return reslist;
148 }
149 
150 
151 //////////////////////////////////////////////////////////////////////////
152 /// Execute func in parallel distributing the elements of the args collection between the workers.
153 /// See class description for the valid types of collections and containers that can be used.
154 /// A vector containing each execution's result is returned. The user is responsible of deleting
155 /// objects that might be created upon the execution of func, returned objects included.
156 /// **Note:** the collection of arguments is modified by Map and should be considered empty or otherwise
157 /// invalidated after Map's execution (std::move might be applied to it).
158 template<class F, class T>
159 auto TProcPool::Map(F func, T &args) -> std::vector < decltype(++(args.begin()), args.end(), func(args.front())) >
160 {
161  std::vector<typename T::value_type> vargs(
162  std::make_move_iterator(std::begin(args)),
163  std::make_move_iterator(std::end(args))
164  );
165  const auto &reslist = Map(func, vargs);
166  return reslist;
167 }
168 
169 
170 // tell doxygen to ignore this (\endcond closes the statement)
171 /// \cond
172 template<class F>
174 {
175  // check the function returns something from which we can build a TObject*
176  static_assert(std::is_constructible<TObject *, typename std::result_of<F(TObject *)>::type>::value,
177  "func should return a pointer to TObject or derived classes");
178 
179  //build vector with same elements as args
180  std::vector<TObject *> vargs(args.GetSize());
181  auto it = vargs.begin();
182  for (auto o : args) {
183  *it = o;
184  ++it;
185  }
186 
187  //call Map
188  const auto &reslist = Map(func, vargs);
189 
190  //build TObjArray with same elements as reslist
191  TObjArray resarray;
192  for (const auto &res : reslist)
193  resarray.Add(res);
194  return resarray;
195 }
196 
197 
198 template<class F, class T>
199 auto TProcPool::Map(F func, std::initializer_list<T> args) -> std::vector<decltype(func(*args.begin()))>
200 {
201  std::vector<T> vargs(std::move(args));
202  const auto &reslist = Map(func, vargs);
203  return reslist;
204 }
205 
206 
207 // actual implementation of the Map method. all other calls with arguments eventually
208 // call this one
209 template<class F, class T>
210 auto TProcPool::Map(F func, std::vector<T> &args) -> std::vector<decltype(func(args.front()))>
211 {
212  //check whether func is callable
213  using retType = decltype(func(args.front()));
214  //prepare environment
215  Reset();
216  fTask = ETask::kMapWithArg;
217 
218  //fork max(args.size(), fNWorkers) times
219  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TPoolWorker moved its content away
220  unsigned oldNWorkers = GetNWorkers();
221  if (args.size() < oldNWorkers)
222  SetNWorkers(args.size());
223  TPoolWorker<F, T> worker(func, args);
224  bool ok = Fork(worker);
225  SetNWorkers(oldNWorkers);
226  if (!ok)
227  {
228  std::cerr << "[E][C] Could not fork. Aborting operation\n";
229  return std::vector<retType>();
230  }
231 
232  //give out tasks
233  fNToProcess = args.size();
234  std::vector<retType> reslist;
235  reslist.reserve(fNToProcess);
236  std::vector<unsigned> range(fNToProcess);
237  std::iota(range.begin(), range.end(), 0);
238  fNProcessed = Broadcast(PoolCode::kExecFuncWithArg, range);
239 
240  //collect results, give out other tasks if needed
241  Collect(reslist);
242 
243  //clean-up and return
244  ReapWorkers();
245  fTask = ETask::kNoTask;
246  return reslist;
247 }
248 // tell doxygen to stop ignoring code
249 /// \endcond
250 
251 
252 //////////////////////////////////////////////////////////////////////////
253 /// This method behaves just like Map, but an additional redfunc function
254 /// must be provided. redfunc is applied to the vector Map would return and
255 /// must return the same type as func. In practice, redfunc can be used to
256 /// "squash" the vector returned by Map into a single object by merging,
257 /// adding, mixing the elements of the vector.
258 template<class F, class R>
259 auto TProcPool::MapReduce(F func, unsigned nTimes, R redfunc) -> decltype(func())
260 {
261  using retType = decltype(func());
262  //prepare environment
263  Reset();
264  fTask = ETask::kMapRed;
265 
266  //fork max(nTimes, fNWorkers) times
267  unsigned oldNWorkers = GetNWorkers();
268  if (nTimes < oldNWorkers)
269  SetNWorkers(nTimes);
270  TPoolWorker<F, void, R> worker(func, redfunc);
271  bool ok = Fork(worker);
272  SetNWorkers(oldNWorkers);
273  if (!ok) {
274  std::cerr << "[E][C] Could not fork. Aborting operation\n";
275  return retType();
276  }
277 
278  //give workers their first task
279  fNToProcess = nTimes;
280  std::vector<retType> reslist;
281  reslist.reserve(fNToProcess);
282  fNProcessed = Broadcast(PoolCode::kExecFunc, fNToProcess);
283 
284  //collect results/give workers their next task
285  Collect(reslist);
286 
287  //clean-up and return
288  ReapWorkers();
289  fTask = ETask::kNoTask;
290  return redfunc(reslist);
291 }
292 
293 //////////////////////////////////////////////////////////////////////////
294 /// This method behaves just like Map, but an additional redfunc function
295 /// must be provided. redfunc is applied to the vector Map would return and
296 /// must return the same type as func. In practice, redfunc can be used to
297 /// "squash" the vector returned by Map into a single object by merging,
298 /// adding, mixing the elements of the vector.
299 template<class F, class T, class R>
300 auto TProcPool::MapReduce(F func, T &args, R redfunc) -> decltype(++(args.begin()), args.end(), func(args.front()))
301 {
302  std::vector<typename T::value_type> vargs(
303  std::make_move_iterator(std::begin(args)),
304  std::make_move_iterator(std::end(args))
305  );
306  const auto &reslist = MapReduce(func, vargs, redfunc);
307  return reslist;
308 }
309 
310 /// \cond doxygen should ignore these methods
311 template<class F, class R>
312 auto TProcPool::MapReduce(F func, TCollection &args, R redfunc) -> decltype(func(nullptr))
313 {
314  //build vector with same elements as args
315  std::vector<TObject *> vargs(args.GetSize());
316  auto it = vargs.begin();
317  for (auto o : args) {
318  *it = o;
319  ++it;
320  }
321 
322  //call MapReduce
323  auto res = MapReduce(func, vargs, redfunc); //TODO useless copying by value here, but it looks like the return type of this MapReduce is a reference otherwise
324 
325  return res;
326 }
327 
328 
329 template<class F, class T, class R>
330 auto TProcPool::MapReduce(F func, std::initializer_list<T> args, R redfunc) -> decltype(func(*args.begin()))
331 {
332  std::vector<T> vargs(std::move(args));
333  const auto &reslist = MapReduce(func, vargs, redfunc);
334  return reslist;
335 }
336 
337 
338 template<class F, class T, class R>
339 auto TProcPool::MapReduce(F func, std::vector<T> &args, R redfunc) -> decltype(func(args.front()))
340 {
341  using retType = decltype(func(args.front()));
342  //prepare environment
343  Reset();
344  fTask = ETask::kMapRedWithArg;
345 
346  //fork max(args.size(), fNWorkers) times
347  unsigned oldNWorkers = GetNWorkers();
348  if (args.size() < oldNWorkers)
349  SetNWorkers(args.size());
350  TPoolWorker<F, T, R> worker(func, args, redfunc);
351  bool ok = Fork(worker);
352  SetNWorkers(oldNWorkers);
353  if (!ok) {
354  std::cerr << "[E][C] Could not fork. Aborting operation\n";
355  return retType();
356  }
357 
358  //give workers their first task
359  fNToProcess = args.size();
360  std::vector<retType> reslist;
361  reslist.reserve(fNToProcess);
362  std::vector<unsigned> range(fNToProcess);
363  std::iota(range.begin(), range.end(), 0);
364  fNProcessed = Broadcast(PoolCode::kExecFuncWithArg, range);
365 
366  //collect results/give workers their next task
367  Collect(reslist);
368 
369  ReapWorkers();
370  fTask = ETask::kNoTask;
371  return redfunc(reslist);
372 }
373 /// \endcond
374 
375 
376 template<class F>
377 auto TProcPool::ProcTree(const std::vector<std::string>& fileNames, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
378 {
379  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
380  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
381 
382  //prepare environment
383  Reset();
384  unsigned nWorkers = GetNWorkers();
385 
386  //fork
387  TPoolProcessor<F> worker(procFunc, fileNames, treeName, nWorkers, nToProcess);
388  bool ok = Fork(worker);
389  if(!ok) {
390  std::cerr << "[E][C] Could not fork. Aborting operation\n";
391  return nullptr;
392  }
393 
394  if(fileNames.size() < nWorkers) {
395  //TTree entry granularity. For each file, we divide entries equally between workers
396  fTask = ETask::kProcByRange;
397  //Tell workers to start processing entries
398  fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
399  std::vector<unsigned> args(nWorkers);
400  std::iota(args.begin(), args.end(), 0);
401  fNProcessed = Broadcast(PoolCode::kProcRange, args);
402  if(fNProcessed < nWorkers)
403  std::cerr << "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.\n";
404  } else {
405  //file granularity. each worker processes one whole file as a single task
406  fTask = ETask::kProcByFile;
407  fNToProcess = fileNames.size();
408  std::vector<unsigned> args(nWorkers);
409  std::iota(args.begin(), args.end(), 0);
410  fNProcessed = Broadcast(PoolCode::kProcFile, args);
411  if(fNProcessed < nWorkers)
412  std::cerr << "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.\n";
413  }
414 
415  //collect results, distribute new tasks
416  std::vector<TObject*> reslist;
417  Collect(reslist);
418 
419  //merge
420  TObject* res = PoolUtils::ReduceObjects(reslist);
421 
422  //clean-up and return
423  ReapWorkers();
424  fTask = ETask::kNoTask;
425  return static_cast<retType>(res);
426 }
427 
428 
429 template<class F>
430 auto TProcPool::ProcTree(const std::string& fileName, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
431 {
432  std::vector<std::string> singleFileName(1, fileName);
433  return ProcTree(singleFileName, procFunc, treeName, nToProcess);
434 }
435 
436 
437 template<class F>
438 auto TProcPool::ProcTree(TFileCollection& files, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
439 {
440  std::vector<std::string> fileNames(files.GetNFiles());
441  unsigned count = 0;
442  for(auto f : *static_cast<THashList*>(files.GetList()))
443  fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetFile();
444 
445  return ProcTree(fileNames, procFunc, treeName, nToProcess);
446 }
447 
448 
449 template<class F>
450 auto TProcPool::ProcTree(TChain& files, F procFunc, const std::string& treeName, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
451 {
452  TObjArray* filelist = files.GetListOfFiles();
453  std::vector<std::string> fileNames(filelist->GetEntries());
454  unsigned count = 0;
455  for(auto f : *filelist)
456  fileNames[count++] = f->GetTitle();
457 
458  return ProcTree(fileNames, procFunc, treeName, nToProcess);
459 }
460 
461 
462 template<class F>
463 auto TProcPool::ProcTree(TTree& tree, F procFunc, ULong64_t nToProcess) -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
464 {
465  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
466  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
467 
468  //prepare environment
469  Reset();
470  unsigned nWorkers = GetNWorkers();
471 
472  //fork
473  TPoolProcessor<F> worker(procFunc, &tree, nWorkers, nToProcess);
474  bool ok = Fork(worker);
475  if(!ok) {
476  std::cerr << "[E][C] Could not fork. Aborting operation\n";
477  return nullptr;
478  }
479 
480  //divide entries equally between workers
481  fTask = ETask::kProcByRange;
482 
483  //tell workers to start processing entries
484  fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
485  std::vector<unsigned> args(nWorkers);
486  std::iota(args.begin(), args.end(), 0);
487  fNProcessed = Broadcast(PoolCode::kProcTree, args);
488  if(fNProcessed < nWorkers)
489  std::cerr << "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.\n";
490 
491  //collect results, distribute new tasks
492  std::vector<TObject*> reslist;
493  Collect(reslist);
494 
495  //merge
496  TObject* res = PoolUtils::ReduceObjects(reslist);
497 
498  //clean-up and return
499  ReapWorkers();
500  fTask = ETask::kNoTask;
501  return static_cast<retType>(res);
502 }
503 
504 //////////////////////////////////////////////////////////////////////////
505 /// Listen for messages sent by the workers and call the appropriate handler function.
506 /// TProcPool::HandlePoolCode is called on messages with a code < 1000 and
507 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
508 template<class T>
509 void TProcPool::Collect(std::vector<T> &reslist)
510 {
511  TMonitor &mon = GetMonitor();
512  mon.ActivateAll();
513  while (mon.GetActive() > 0) {
514  TSocket *s = mon.Select();
515  MPCodeBufPair msg = MPRecv(s);
516  if (msg.first == MPCode::kRecvError) {
517  std::cerr << "[E][C] Lost connection to a worker\n";
518  Remove(s);
519  } else if (msg.first < 1000)
520  HandlePoolCode(msg, s, reslist);
521  else
522  HandleMPCode(msg, s);
523  }
524 }
525 
526 
527 //////////////////////////////////////////////////////////////////////////
528 /// Handle message and reply to the worker
529 template<class T>
530 void TProcPool::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
531 {
532  unsigned code = msg.first;
533  if (code == PoolCode::kFuncResult) {
534  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
536  } else if (code == PoolCode::kIdling) {
537  ReplyToIdle(s);
538  } else if(code == PoolCode::kProcResult) {
539  if(msg.second != nullptr)
540  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
542  } else if(code == PoolCode::kProcError) {
543  const char *str = ReadBuffer<const char*>(msg.second.get());
544  std::cerr << "[E][C] a worker encountered an error: " << str << "\n"
545  << "Continuing execution ignoring these entries.\n";
546  ReplyToIdle(s);
547  delete [] str;
548  } else {
549  // UNKNOWN CODE
550  std::cerr << "[W][C] unknown code received from server. code=" << code << "\n";
551  }
552 }
553 
554 /// Check that redfunc has the right signature and call it on objs
555 template<class T, class R>
556 T TProcPool::Reduce(const std::vector<T> &objs, R redfunc)
557 {
558  // check we can apply reduce to objs
559  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
560 
561  return redfunc(objs);
562 }
563 
564 #endif
a MapReduce method with no arguments is being executed
The message contains the result of a function execution.
Definition: PoolUtils.h:31
~TProcPool()
Definition: TProcPool.h:41
An array of TObjects.
Definition: TObjArray.h:39
TObject * ReduceObjects(const std::vector< TObject * > &objs)
Merge collection of TObjects.
Definition: PoolUtils.cxx:11
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Definition: PoolUtils.h:38
T Reduce(const std::vector< T > &objs, R redfunc)
Check that redfunc has the right signature and call it on objs.
Definition: TProcPool.h:556
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
Definition: PoolUtils.h:37
double T(double x)
Definition: ChebyshevPol.h:34
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Definition: TProcPool.cxx:117
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
Definition: TProcPool.h:509
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
a MapReduce method with arguments is being executed
ETask
A collection of the types of tasks that TProcPool can execute.
Definition: TProcPool.h:95
auto Map(F func, unsigned nTimes) -> std::vector< decltype(func())>
Execute func (with no arguments) nTimes in parallel.
Definition: TProcPool.h:115
Error while reading from the socket.
Definition: MPCode.h:34
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
Definition: PoolUtils.h:36
We are ready for the next task.
Definition: PoolUtils.h:33
no task is being executed
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:239
unsigned GetNWorkers() const
Definition: TProcPool.h:78
enum TProcPool::ETask fTask
the kind of task that is being executed, if any
a ProcTree method is being executed and each worker will process a certain range of each file ...
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
Definition: TProcPool.h:377
unsigned fNToProcess
total number of arguments to pass to the workers
Definition: TProcPool.h:90
This class provides a simple interface to execute the same task multiple times in parallel...
Definition: TProcPool.h:38
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TProcPool(unsigned nWorkers=0)
Class constructor.
Definition: TProcPool.cxx:79
#define F(x, y, z)
Collection abstract base class.
Definition: TCollection.h:48
TProcPool & operator=(const TProcPool &)=delete
Execute function with the argument contained in the message.
Definition: PoolUtils.h:30
void Reset(Detail::TBranchProxy *x)
The message contains the result of the processing of a TTree.
Definition: PoolUtils.h:39
void SetNWorkers(unsigned n)
Definition: TProcPool.h:77
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:20
Execute function without arguments.
Definition: PoolUtils.h:29
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
Tell the client there was an error while processing.
Definition: PoolUtils.h:41
virtual Int_t GetSize() const
Definition: TCollection.h:95
double f(double x)
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
int type
Definition: TGX11.cxx:120
unsigned long long ULong64_t
Definition: RtypesCore.h:70
This class works together with TProcPool to allow the execution of functions in server processes...
Definition: TPoolWorker.h:40
double func(double *x, double *p)
Definition: stressTF1.cxx:213
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
Int_t GetEntries() const
Return the number of objects in array (i.e.
Definition: TObjArray.cxx:493
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:273
a ProcTree method is being executed and each worker will process a different file ...
Mother of all ROOT objects.
Definition: TObject.h:58
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
Definition: TProcPool.cxx:99
a Map method with no arguments is being executed
auto MapReduce(F func, unsigned nTimes, R redfunc) -> decltype(func())
This method behaves just like Map, but an additional redfunc function must be provided.
Definition: TProcPool.h:259
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
Definition: TProcPool.h:530
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
Definition: TChain.h:35
void Add(TObject *obj)
Definition: TObjArray.h:75
A TTree object has a header with a name and a title.
Definition: TTree.h:94
a Map method with arguments is being executed
Class describing a generic file including meta information.
Definition: TFileInfo.h:50
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
float value
Definition: math.cpp:443
const Int_t n
Definition: legend1.C:16
TMonitor & GetMonitor()
Definition: TMPClient.h:36
TRandom3 R
a TMatrixD.
Definition: testIO.cxx:28
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:43
void Reset()
Reset TProcPool's state.
Definition: TProcPool.cxx:87
unsigned fNProcessed
number of arguments already passed to the workers
Definition: TProcPool.h:89