Logo ROOT  
Reference Guide
TProcessExecutor.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include "TEnv.h"
15
16//////////////////////////////////////////////////////////////////////////
17///
18/// \class ROOT::TProcessExecutor
19/// \ingroup Parallelism
20/// \brief This class provides a simple interface to execute the same task
21/// multiple times in parallel, possibly with different arguments every
22/// time. This mimics the behaviour of Python's multiprocessing.Pool.map method.
23///
24/// ### ROOT::TProcessExecutor::Map
25/// This class inherits its interfaces from ROOT::TExecutor.\n
26/// The two possible usages of the Map method are:\n
27/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
28/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
29///
30/// For either signature, func is executed as many times as needed by a pool of
31/// fNWorkers workers; the number of workers can be passed to the constructor
32/// or set via SetNWorkers. It defaults to the number of cores.\n
33/// A collection containing the result of each execution is returned.\n
34/// **Note:** the user is responsible for the deletion of any object that might
35/// be created upon execution of func, returned objects included: ROOT::TProcessExecutor never
36/// deletes what it returns, it simply forgets it.\n
37/// **Note:** the usage of ROOT::TProcessExecutor::Map is advisable only when the task to be
38/// executed takes more than a few seconds, otherwise the overhead introduced
39/// by Map will outweigh the benefits of parallel execution on most machines.
40///
41/// \param func
42/// \parblock
43/// a lambda expression, an std::function, a loaded macro, a
44/// functor class or a function that takes zero arguments (for the first signature)
45/// or one (for the second signature).
46/// \endparblock
47/// \param args
48/// \parblock
49/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
50/// For the first signature, an integer (the number of times func is executed) is passed instead.
51/// \endparblock
52/// **Note:** if the function to be executed takes more arguments than the chosen signature
53/// allows (zero or one) but all the extra arguments are fixed, the function can be wrapped
54/// in a lambda or via std::bind to give it the right signature.\n
55/// **Note:** the user should take care of initializing random seeds differently in each
56/// process (e.g. using the process id in the seed). Otherwise several parallel executions
57/// might generate the same sequence of pseudo-random numbers.
58///
59/// #### Return value:
60/// An std::vector. The elements in the container
61/// will be the objects returned by func.
62///
63///
64/// #### Examples:
65///
66/// ~~~{.cpp}
67/// root[] ROOT::TProcessExecutor pool; auto hists = pool.Map(CreateHisto, 10);
68/// root[] ROOT::TProcessExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
69/// ~~~
70///
71/// ### ROOT::TProcessExecutor::MapReduce
72/// This set of methods behaves exactly like Map, but takes an additional
73/// function as a third argument. This function is applied to the set of
74/// objects returned by the corresponding Map execution to "squash" them
75/// to a single object.
76///
77/// #### Examples:
78/// ~~~{.cpp}
79/// root[] ROOT::TProcessExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); });
80/// root[] ROOT::TProcessExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
81/// ~~~
82///
83//////////////////////////////////////////////////////////////////////////
84
85namespace ROOT {
86//////////////////////////////////////////////////////////////////////////
87/// Class constructor: passes nWorkers on to the TMPClient base class and calls Reset().
88/// nWorkers is the number of times this ROOT session will be forked, i.e.
89/// the number of workers that will be spawned.
90TProcessExecutor::TProcessExecutor(unsigned nWorkers) : TMPClient(nWorkers)
91{
92 Reset();
93}
94
95//////////////////////////////////////////////////////////////////////////
96/// Reset TProcessExecutor's state.
98{
99 fNProcessed = 0;
100 fNToProcess = 0;
102}
103
104//////////////////////////////////////////////////////////////////////////
105/// Reply to a worker who just sent a result.
106/// If another argument to process exists, tell the worker. Otherwise
107/// send a shutdown order.
109{
110 if (fNProcessed < fNToProcess) {
111 //this cannot be a "greedy worker" task
112 if (fTaskType == ETask::kMap)
114 else if (fTaskType == ETask::kMapWithArg)
116 ++fNProcessed;
117 } else //whatever the task is, we are done
119}
120
121
122//////////////////////////////////////////////////////////////////////////
123/// Reply to a worker who is idle.
124/// If another argument to process exists, tell the worker. Otherwise
125/// ask for a result.
127{
128 if (fNProcessed < fNToProcess) {
129 //we are executing a "greedy worker" task
132 else if (fTaskType == ETask::kMapRed)
134 ++fNProcessed;
135 } else
137}
138
139} // namespace ROOT
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
ETask fTaskType
the kind of task that is being executed, if any
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
@ kSendResult
Ask for a kFuncResult/kProcResult.
Definition: MPCode.h:36
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition: MPCode.h:32
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kExecFunc
Execute function without arguments.
Definition: MPCode.h:31
VSD Structures.
Definition: StringConv.hxx:21
static constexpr double s