Logo ROOT   6.16/01
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
19#include "TError.h"
20#include "TFileCollection.h"
21#include "TFileInfo.h"
22#include "THashList.h"
23#include "TMPClient.h"
24#include "ROOT/TExecutor.hxx"
25#include "TMPWorkerExecutor.h"
26#include <algorithm> //std::generate
27#include <numeric> //std::iota
28#include <string>
29#include <type_traits> //std::result_of, std::enable_if
30#include <functional> //std::reference_wrapper
31#include <vector>
32
33namespace ROOT {
34
35class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
36public:
37 explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
38 ~TProcessExecutor() = default;
39 //it doesn't make sense for a TProcessExecutor to be copied
42
43 // Map
45 template<class F, class Cond = noReferenceCond<F>>
46 auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
47 template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
48 auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
49 template<class F, class T, class Cond = noReferenceCond<F, T>>
50 auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
51
53 unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
54
56 template<class F, class R, class Cond = noReferenceCond<F>>
57 auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
58 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
59 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
60
62 template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
63
64private:
65 template<class T> void Collect(std::vector<T> &reslist);
66 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
67
68 void Reset();
70 void ReplyToIdle(TSocket *s);
71
72 unsigned fNProcessed; ///< number of arguments already passed to the workers
73 unsigned fNToProcess; ///< total number of arguments to pass to the workers
74
75 /// A collection of the types of tasks that TProcessExecutor can execute.
76 /// It is used to interpret in the right way and properly reply to the
77 /// messages received (see, for example, TProcessExecutor::HandleInput)
78 enum class ETask : unsigned char {
79 kNoTask, ///< no task is being executed
80 kMap, ///< a Map method with no arguments is being executed
81 kMapWithArg, ///< a Map method with arguments is being executed
82 kMapRed, ///< a MapReduce method with no arguments is being executed
83 kMapRedWithArg ///< a MapReduce method with arguments is being executed
84 };
85
86 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
87};
88
89
90/************ TEMPLATE METHODS IMPLEMENTATION ******************/
91
92//////////////////////////////////////////////////////////////////////////
93/// Execute func (with no arguments) nTimes in parallel.
94/// A vector containg executions' results is returned.
95/// Functions that take more than zero arguments can be executed (with
96/// fixed arguments) by wrapping them in a lambda or with std::bind.
97template<class F, class Cond>
98auto TProcessExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
99{
100 using retType = decltype(func());
101 //prepare environment
102 Reset();
103 fTaskType = ETask::kMap;
104
105 //fork max(nTimes, fNWorkers) times
106 unsigned oldNWorkers = GetNWorkers();
107 if (nTimes < oldNWorkers)
108 SetNWorkers(nTimes);
109 TMPWorkerExecutor<F> worker(func);
110 bool ok = Fork(worker);
111 SetNWorkers(oldNWorkers);
112 if (!ok)
113 {
114 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
115 return std::vector<retType>();
116 }
117
118 //give out tasks
119 fNToProcess = nTimes;
120 std::vector<retType> reslist;
121 reslist.reserve(fNToProcess);
122 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
123
124 //collect results, give out other tasks if needed
125 Collect(reslist);
126
127 //clean-up and return
128 ReapWorkers();
129 fTaskType = ETask::kNoTask;
130 return reslist;
131}
132
133//////////////////////////////////////////////////////////////////////////
134/// Execute func in parallel, taking an element of an
135/// std::vector as argument.
136/// A vector containg executions' results is returned.
137// actual implementation of the Map method. all other calls with arguments eventually
138// call this one
139template<class F, class T, class Cond>
140auto TProcessExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
141{
142 //check whether func is callable
143 using retType = decltype(func(args.front()));
144 //prepare environment
145 Reset();
146 fTaskType = ETask::kMapWithArg;
147
148 //fork max(args.size(), fNWorkers) times
149 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
150 unsigned oldNWorkers = GetNWorkers();
151 if (args.size() < oldNWorkers)
152 SetNWorkers(args.size());
153 TMPWorkerExecutor<F, T> worker(func, args);
154 bool ok = Fork(worker);
155 SetNWorkers(oldNWorkers);
156 if (!ok)
157 {
158 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
159 return std::vector<retType>();
160 }
161
162 //give out tasks
163 fNToProcess = args.size();
164 std::vector<retType> reslist;
165 reslist.reserve(fNToProcess);
166 std::vector<unsigned> range(fNToProcess);
167 std::iota(range.begin(), range.end(), 0);
168 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
169
170 //collect results, give out other tasks if needed
171 Collect(reslist);
172
173 //clean-up and return
174 ReapWorkers();
175 fTaskType = ETask::kNoTask;
176 return reslist;
177}
178
179//////////////////////////////////////////////////////////////////////////
180/// Execute func in parallel, taking an element of a
181/// sequence as argument.
182/// A vector containg executions' results is returned.
183template<class F, class INTEGER, class Cond>
184auto TProcessExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>
185{
186 std::vector<INTEGER> vargs(args.size());
187 std::copy(args.begin(), args.end(), vargs.begin());
188 const auto &reslist = Map(func, vargs);
189 return reslist;
190}
191
192//////////////////////////////////////////////////////////////////////////
193/// This method behaves just like Map, but an additional redfunc function
194/// must be provided. redfunc is applied to the vector Map would return and
195/// must return the same type as func. In practice, redfunc can be used to
196/// "squash" the vector returned by Map into a single object by merging,
197/// adding, mixing the elements of the vector.
198template<class F, class R, class Cond>
199auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
200{
201 using retType = decltype(func());
202 //prepare environment
203 Reset();
204 fTaskType= ETask::kMapRed;
205
206 //fork max(nTimes, fNWorkers) times
207 unsigned oldNWorkers = GetNWorkers();
208 if (nTimes < oldNWorkers)
209 SetNWorkers(nTimes);
210 TMPWorkerExecutor<F, void, R> worker(func, redfunc);
211 bool ok = Fork(worker);
212 SetNWorkers(oldNWorkers);
213 if (!ok) {
214 std::cerr << "[E][C] Could not fork. Aborting operation\n";
215 return retType();
216 }
217
218 //give workers their first task
219 fNToProcess = nTimes;
220 std::vector<retType> reslist;
221 reslist.reserve(fNToProcess);
222 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
223
224 //collect results/give workers their next task
225 Collect(reslist);
226
227 //clean-up and return
228 ReapWorkers();
229 fTaskType= ETask::kNoTask;
230 return redfunc(reslist);
231}
232
233//////////////////////////////////////////////////////////////////////////
234/// This method behaves just like Map, but an additional redfunc function
235/// must be provided. redfunc is applied to the vector Map would return and
236/// must return the same type as func. In practice, redfunc can be used to
237/// "squash" the vector returned by Map into a single object by merging,
238/// adding, mixing the elements of the vector.
239template<class F, class T, class R, class Cond>
240auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
241{
242
243 using retType = decltype(func(args.front()));
244 //prepare environment
245 Reset();
246 fTaskType= ETask::kMapRedWithArg;
247
248 //fork max(args.size(), fNWorkers) times
249 unsigned oldNWorkers = GetNWorkers();
250 if (args.size() < oldNWorkers)
251 SetNWorkers(args.size());
252 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
253 bool ok = Fork(worker);
254 SetNWorkers(oldNWorkers);
255 if (!ok) {
256 std::cerr << "[E][C] Could not fork. Aborting operation\n";
257 return decltype(func(args.front()))();
258 }
259
260 //give workers their first task
261 fNToProcess = args.size();
262 std::vector<retType> reslist;
263 reslist.reserve(fNToProcess);
264 std::vector<unsigned> range(fNToProcess);
265 std::iota(range.begin(), range.end(), 0);
266 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
267
268 //collect results/give workers their next task
269 Collect(reslist);
270
271 ReapWorkers();
272 fTaskType= ETask::kNoTask;
273 return Reduce(reslist, redfunc);
274}
275
276//////////////////////////////////////////////////////////////////////////
277/// "Reduce" an std::vector into a single object by passing a
278/// function as the second argument defining the reduction operation.
279template<class T, class R>
280T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
281{
282 // check we can apply reduce to objs
283 static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
284 return redfunc(objs);
285}
286
287//////////////////////////////////////////////////////////////////////////
288/// Handle message and reply to the worker
289template<class T>
290void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
291{
292 unsigned code = msg.first;
293 if (code == MPCode::kFuncResult) {
294 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
296 } else if (code == MPCode::kIdling) {
297 ReplyToIdle(s);
298 } else if(code == MPCode::kProcResult) {
299 if(msg.second != nullptr)
300 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
302 } else if(code == MPCode::kProcError) {
303 const char *str = ReadBuffer<const char*>(msg.second.get());
304 Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
305 "Continuing execution ignoring these entries.", str);
306 ReplyToIdle(s);
307 delete [] str;
308 } else {
309 // UNKNOWN CODE
310 Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
311 }
312}
313
314//////////////////////////////////////////////////////////////////////////
315/// Listen for messages sent by the workers and call the appropriate handler function.
316/// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
317/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
318template<class T>
319void TProcessExecutor::Collect(std::vector<T> &reslist)
320{
321 TMonitor &mon = GetMonitor();
322 mon.ActivateAll();
323 while (mon.GetActive() > 0) {
324 TSocket *s = mon.Select();
325 MPCodeBufPair msg = MPRecv(s);
326 if (msg.first == MPCode::kRecvError) {
327 Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
328 Remove(s);
329 } else if (msg.first < 1000)
330 HandlePoolCode(msg, s, reslist);
331 else
332 HandleMPCode(msg, s);
333 }
334}
335
336} // ROOT namespace
337
338#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
void Error(const char *location, const char *msgfmt,...)
int type
Definition: TGX11.cxx:120
This class defines an interface to execute the same task multiple times in parallel,...
Definition: TExecutor.hxx:61
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
unsigned GetNWorkers() const
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
TMonitor & GetMonitor()
Definition: TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
const Int_t n
Definition: legend1.C:16
#define F(x, y, z)
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
@ kFuncResult
The message contains the result of a function execution.
Definition: MPCode.h:33
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition: MPCode.h:32
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kProcError
Tell the client there was an error while processing.
Definition: MPCode.h:44
@ kExecFunc
Execute function without arguments.
Definition: MPCode.h:31
@ kProcResult
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
double T(double x)
Definition: ChebyshevPol.h:34
auto Map(const RVec< T > &v, F &&f) -> RVec< decltype(f(v[0]))>
Create new collection applying a callable to the elements of the input collection.
Definition: RVec.hxx:777
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
static constexpr double s