Logo ROOT   6.10/09
Reference Guide
TProcessExecutor.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "TEnv.h"
15 
16 //////////////////////////////////////////////////////////////////////////
17 ///
18 /// \class ROOT::TProcessExecutor
19 /// \brief This class provides a simple interface to execute the same task
20 /// multiple times in parallel, possibly with different arguments every
21 /// time. This mimics the behaviour of python's pool.Map method.
22 ///
23 /// ###ROOT::TProcessExecutor::Map
24 /// This class inherits its interfaces from ROOT::TExecutor\n.
25 /// The two possible usages of the Map method are:\n
26 /// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
27 /// * Map(F func, T& args): func is executed on each element of the collection of arguments args
28 ///
29 /// For either signature, func is executed as many times as needed by a pool of
30 /// fNWorkers workers; the number of workers can be passed to the constructor
31 /// or set via SetNWorkers. It defaults to the number of cores.\n
32 /// A collection containing the result of each execution is returned.\n
33 /// **Note:** the user is responsible for the deletion of any object that might
34 /// be created upon execution of func, returned objects included: ROOT::TProcessExecutor never
35 /// deletes what it returns, it simply forgets it.\n
36 /// **Note:** that the usage of ROOT::TProcessExecutor::Map is indicated only when the task to be
37 /// executed takes more than a few seconds, otherwise the overhead introduced
38 /// by Map will outrun the benefits of parallel execution on most machines.
39 ///
40 /// \param func
41 /// \parblock
42 /// a lambda expression, an std::function, a loaded macro, a
43 /// functor class or a function that takes zero arguments (for the first signature)
44 /// or one (for the second signature).
45 /// \endparblock
46 /// \param args
47 /// \parblock
48 /// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
49 /// An integer only for the first.
50 /// \endparblock
51 /// **Note:** in cases where the function to be executed takes more than
52 /// zero/one argument but all are fixed except zero/one, the function can be wrapped
53 /// in a lambda or via std::bind to give it the right signature.\n
54 /// **Note:** the user should take care of initializing random seeds differently in each
55 /// process (e.g. using the process id in the seed). Otherwise several parallel executions
56 /// might generate the same sequence of pseudo-random numbers.
57 ///
58 /// #### Return value:
59 /// An std::vector. The elements in the container
60 /// will be the objects returned by func.
61 ///
62 ///
63 /// #### Examples:
64 ///
65 /// ~~~{.cpp}
66 /// root[] ROOT::TProcessExecutor pool; auto hists = pool.Map(CreateHisto, 10);
67 /// root[] ROOT::TProcessExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
68 /// ~~~
69 ///
70 /// ###ROOT::TProcessExecutor::MapReduce
71 /// This set of methods behaves exactly like Map, but takes an additional
72 /// function as a third argument. This function is applied to the set of
73 /// objects returned by the corresponding Map execution to "squash" them
74 /// to a single object.
75 ///
76 /// ####Examples:
77 /// ~~~{.cpp}
78 /// root[] ROOT::TProcessExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); })
79 /// root[] ROOT::TProcessExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
80 /// ~~~
81 ///
82 //////////////////////////////////////////////////////////////////////////
83 
84 namespace ROOT {
85 //////////////////////////////////////////////////////////////////////////
86 /// Class constructor.
87 /// nWorkers is the number of times this ROOT session will be forked, i.e.
88 /// the number of workers that will be spawned.
89 TProcessExecutor::TProcessExecutor(unsigned nWorkers) : TMPClient(nWorkers)
90 {
91  Reset();
92 }
93 
94 //////////////////////////////////////////////////////////////////////////
95 /// Reset TProcessExecutor's state.
97 {
98  fNProcessed = 0;
99  fNToProcess = 0;
101 }
102 
103 //////////////////////////////////////////////////////////////////////////
104 /// Reply to a worker who just sent a result.
105 /// If another argument to process exists, tell the worker. Otherwise
106 /// send a shutdown order.
108 {
109  if (fNProcessed < fNToProcess) {
110  //this cannot be a "greedy worker" task
111  if (fTaskType == ETask::kMap)
113  else if (fTaskType == ETask::kMapWithArg)
115  ++fNProcessed;
116  } else //whatever the task is, we are done
118 }
119 
120 
121 //////////////////////////////////////////////////////////////////////////
122 /// Reply to a worker who is idle.
123 /// If another argument to process exists, tell the worker. Otherwise
124 /// ask for a result
126 {
127  if (fNProcessed < fNToProcess) {
128  //we are executing a "greedy worker" task
131  else if (fTaskType == ETask::kMap)
133  ++fNProcessed;
134  } else
136 }
137 
138 } // namespace ROOT
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
unsigned fNProcessed
number of arguments already passed to the workers
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
void Reset()
Reset TProcessExecutor&#39;s state.
Execute function with the argument contained in the message.
Definition: MPCode.h:32
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
unsigned fNToProcess
total number of arguments to pass to the workers
Base class for multiprocess applications&#39; clients.
Definition: TMPClient.h:23
Execute function without arguments.
Definition: MPCode.h:31