Logo ROOT   6.12/07
Reference Guide
TProcessExecutor.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "TEnv.h"
15 
16 //////////////////////////////////////////////////////////////////////////
17 ///
18 /// \class ROOT::TProcessExecutor
19 /// \ingroup Parallelism
20 /// \brief This class provides a simple interface to execute the same task
21 /// multiple times in parallel, possibly with different arguments every
22 /// time. This mimics the behaviour of the map method of Python's multiprocessing.Pool.
23 ///
24 /// ### ROOT::TProcessExecutor::Map
25 /// This class inherits its interfaces from ROOT::TExecutor.\n
26 /// The two possible usages of the Map method are:\n
27 /// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
28 /// * Map(F func, T& args): func is executed on each element of the collection of arguments args
29 ///
30 /// For either signature, func is executed as many times as needed by a pool of
31 /// fNWorkers workers; the number of workers can be passed to the constructor
32 /// or set via SetNWorkers. It defaults to the number of cores.\n
33 /// A collection containing the result of each execution is returned.\n
34 /// **Note:** the user is responsible for the deletion of any object that might
35 /// be created upon execution of func, returned objects included: ROOT::TProcessExecutor never
36 /// deletes what it returns, it simply forgets it.\n
37 /// **Note:** the usage of ROOT::TProcessExecutor::Map is advisable only when the task to be
38 /// executed takes more than a few seconds, otherwise the overhead introduced
39 /// by Map will outweigh the benefits of parallel execution on most machines.
40 ///
41 /// \param func
42 /// \parblock
43 /// a lambda expression, an std::function, a loaded macro, a
44 /// functor class or a function that takes zero arguments (for the first signature)
45 /// or one (for the second signature).
46 /// \endparblock
47 /// \param args
48 /// \parblock
49 /// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
50 /// An integer only for the first.
51 /// \endparblock
52 /// **Note:** in cases where the function to be executed takes more than
53 /// zero/one argument but all are fixed except zero/one, the function can be wrapped
54 /// in a lambda or via std::bind to give it the right signature.\n
55 /// **Note:** the user should take care of initializing random seeds differently in each
56 /// process (e.g. using the process id in the seed). Otherwise several parallel executions
57 /// might generate the same sequence of pseudo-random numbers.
58 ///
59 /// #### Return value:
60 /// An std::vector. The elements in the container
61 /// will be the objects returned by func.
62 ///
63 ///
64 /// #### Examples:
65 ///
66 /// ~~~{.cpp}
67 /// root[] ROOT::TProcessExecutor pool; auto hists = pool.Map(CreateHisto, 10);
68 /// root[] ROOT::TProcessExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
69 /// ~~~
70 ///
71 /// ### ROOT::TProcessExecutor::MapReduce
72 /// This set of methods behaves exactly like Map, but takes an additional
73 /// function as a third argument. This function is applied to the set of
74 /// objects returned by the corresponding Map execution to "squash" them
75 /// to a single object.
76 ///
77 /// #### Examples:
78 /// ~~~{.cpp}
79 /// root[] ROOT::TProcessExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); })
80 /// root[] ROOT::TProcessExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
81 /// ~~~
82 ///
83 //////////////////////////////////////////////////////////////////////////
84 
85 namespace ROOT {
86 //////////////////////////////////////////////////////////////////////////
87 /// Class constructor.
88 /// \param nWorkers the number of times this ROOT session will be forked, i.e.
89 /// the number of workers that will be spawned. It is forwarded to TMPClient.
90 TProcessExecutor::TProcessExecutor(unsigned nWorkers) : TMPClient(nWorkers)
91 {
92  Reset(); // start with the task-bookkeeping counters cleared
93 }
94 
95 //////////////////////////////////////////////////////////////////////////
96 /// Reset TProcessExecutor's state.
98 {
99  fNProcessed = 0;
100  fNToProcess = 0;
102 }
103 
104 //////////////////////////////////////////////////////////////////////////
105 /// Reply to a worker who just sent a result.
106 /// If another argument to process exists, tell the worker. Otherwise
107 /// send a shutdown order.
109 {
110  if (fNProcessed < fNToProcess) {
111  //this cannot be a "greedy worker" task
112  if (fTaskType == ETask::kMap)
114  else if (fTaskType == ETask::kMapWithArg)
116  ++fNProcessed;
117  } else //whatever the task is, we are done
119 }
120 
121 
122 //////////////////////////////////////////////////////////////////////////
123 /// Reply to a worker who is idle.
124 /// If another argument to process exists, tell the worker. Otherwise
125 /// ask for a result.
127 {
128  if (fNProcessed < fNToProcess) {
129  //we are executing a "greedy worker" task
132  else if (fTaskType == ETask::kMapRed)
134  ++fNProcessed;
135  } else
137 }
138 
139 } // namespace ROOT
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
unsigned fNProcessed
number of arguments already passed to the workers
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
void Reset()
Reset TProcessExecutor's state.
a MapReduce method with arguments is being executed
Execute function with the argument contained in the message.
Definition: MPCode.h:32
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
unsigned fNToProcess
total number of arguments to pass to the workers
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
static constexpr double s
a MapReduce method with no arguments is being executed
Execute function without arguments.
Definition: MPCode.h:31