Logo ROOT  
Reference Guide
TProcessExecutor.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
20#include "ROOT/TSeq.hxx"
21#include "TError.h"
22#include "TFileCollection.h"
23#include "TFileInfo.h"
24#include "THashList.h"
25#include "TMPClient.h"
26#include "TMPWorkerExecutor.h"
27
28#include <algorithm> //std::generate
29#include <numeric> //std::iota
30#include <string>
31#include <type_traits> //std::result_of, std::enable_if
32#include <functional> //std::reference_wrapper
33#include <vector>
34
35namespace ROOT {
36
37class TProcessExecutor : public TExecutorCRTP<TProcessExecutor>, private TMPClient {
39public:
40 explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
41 ~TProcessExecutor() = default;
42 //it doesn't make sense for a TProcessExecutor to be copied
45
46 // Map
47 //
49
50 // MapReduce
51 // Redefinition of the MapReduce classes of the base class, to adapt them to
52 // TProcessExecutor's logic
54 template<class F, class R, class Cond = noReferenceCond<F>>
55 auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
56 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
57 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
58 template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
59 auto MapReduce(F func, const std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
60
61 // Reduce
62 //
64
66
67 //////////////////////////////////////////////////////////////////////////
68 /// \brief Return the number of pooled parallel workers.
69 ///
70 /// \return The number of workers in the pool.
71 unsigned GetPoolSize() const { return TMPClient::GetNWorkers(); }
72
73private:
74 // Implementation of the Map functions declared in the parent class (TExecutorCRTP)
75 //
76 template<class F, class Cond = noReferenceCond<F>>
77 auto MapImpl(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
78 template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
79 auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
80 template<class F, class T, class Cond = noReferenceCond<F, T>>
81 auto MapImpl(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
82 template<class F, class T, class Cond = noReferenceCond<F, T>>
83 auto MapImpl(F func, const std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
84
85 template<class T> void Collect(std::vector<T> &reslist);
86 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
87
88 void Reset();
90 void ReplyToIdle(TSocket *s);
91
92 unsigned fNProcessed; ///< number of arguments already passed to the workers
93 unsigned fNToProcess; ///< total number of arguments to pass to the workers
94
95 /// A collection of the types of tasks that TProcessExecutor can execute.
96 /// It is used to interpret in the right way and properly reply to the
97 /// messages received (see, for example, TProcessExecutor::HandleInput)
98 enum class ETask : unsigned char {
99 kNoTask, ///< no task is being executed
100 kMap, ///< a Map method with no arguments is being executed
101 kMapWithArg, ///< a Map method with arguments is being executed
102 kMapRed, ///< a MapReduce method with no arguments is being executed
103 kMapRedWithArg ///< a MapReduce method with arguments is being executed
104 };
105
106 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
107};
108
109
110/************ TEMPLATE METHODS IMPLEMENTATION ******************/
111
112//////////////////////////////////////////////////////////////////////////
113/// \brief Execute a function without arguments several times in parallel.
114/// Implementation of the Map method.
115///
116/// \copydetails TExecutorCRTP::Map(F func,unsigned nTimes)
117template<class F, class Cond>
118auto TProcessExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
119{
120 using retType = decltype(func());
121 //prepare environment
122 Reset();
123 fTaskType = ETask::kMap;
124
125 //fork max(nTimes, fNWorkers) times
126 unsigned oldNWorkers = GetPoolSize();
127 if (nTimes < oldNWorkers)
128 SetNWorkers(nTimes);
129 TMPWorkerExecutor<F> worker(func);
130 bool ok = Fork(worker);
131 SetNWorkers(oldNWorkers);
132 if (!ok)
133 {
134 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
135 return std::vector<retType>();
136 }
137
138 //give out tasks
139 fNToProcess = nTimes;
140 std::vector<retType> reslist;
141 reslist.reserve(fNToProcess);
142 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
143
144 //collect results, give out other tasks if needed
145 Collect(reslist);
146
147 //clean-up and return
148 ReapWorkers();
149 fTaskType = ETask::kNoTask;
150 return reslist;
151}
152
153//////////////////////////////////////////////////////////////////////////
154/// \brief Execute a function over the elements of a vector in parallel
155/// Implementation of the Map method.
156///
157/// \copydetails TExecutorCRTP::Map(F func,std::vector<T> &args)
158template<class F, class T, class Cond>
159auto TProcessExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
160{
161 //check whether func is callable
162 using retType = decltype(func(args.front()));
163 //prepare environment
164 Reset();
165 fTaskType = ETask::kMapWithArg;
166
167 //fork max(args.size(), fNWorkers) times
168 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
169 unsigned oldNWorkers = GetPoolSize();
170 if (args.size() < oldNWorkers)
171 SetNWorkers(args.size());
172 TMPWorkerExecutor<F, T> worker(func, args);
173 bool ok = Fork(worker);
174 SetNWorkers(oldNWorkers);
175 if (!ok)
176 {
177 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
178 return std::vector<retType>();
179 }
180
181 //give out tasks
182 fNToProcess = args.size();
183 std::vector<retType> reslist;
184 reslist.reserve(fNToProcess);
185 std::vector<unsigned> range(fNToProcess);
186 std::iota(range.begin(), range.end(), 0);
187 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
188
189 //collect results, give out other tasks if needed
190 Collect(reslist);
191
192 //clean-up and return
193 ReapWorkers();
194 fTaskType = ETask::kNoTask;
195 return reslist;
196}
197
198//////////////////////////////////////////////////////////////////////////
199/// \brief Execute a function over the elements of an immutable vector in parallel
200/// Implementation of the Map method.
201///
202/// \copydetails TExecutorCRTP::Map(F func,const std::vector<T> &args)
203template<class F, class T, class Cond>
204auto TProcessExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
205{
206 //check whether func is callable
207 using retType = decltype(func(args.front()));
208 //prepare environment
209 Reset();
210 fTaskType = ETask::kMapWithArg;
211
212 //fork max(args.size(), fNWorkers) times
213 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
214 unsigned oldNWorkers = GetPoolSize();
215 if (args.size() < oldNWorkers)
216 SetNWorkers(args.size());
217 TMPWorkerExecutor<F, T> worker(func, args);
218 bool ok = Fork(worker);
219 SetNWorkers(oldNWorkers);
220 if (!ok)
221 {
222 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
223 return std::vector<retType>();
224 }
225
226 //give out tasks
227 fNToProcess = args.size();
228 std::vector<retType> reslist;
229 reslist.reserve(fNToProcess);
230 std::vector<unsigned> range(fNToProcess);
231 std::iota(range.begin(), range.end(), 0);
232 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
233
234 //collect results, give out other tasks if needed
235 Collect(reslist);
236
237 //clean-up and return
238 ReapWorkers();
239 fTaskType = ETask::kNoTask;
240 return reslist;
241}
242
243//////////////////////////////////////////////////////////////////////////
244/// \brief Execute a function over a sequence of indexes in parallel.
245/// Implementation of the Map method.
246///
247/// \copydetails TExecutorCRTP::Map(F func,ROOT::TSeq<INTEGER> args)
248template<class F, class INTEGER, class Cond>
249auto TProcessExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>
250{
251 std::vector<INTEGER> vargs(args.size());
252 std::copy(args.begin(), args.end(), vargs.begin());
253 const auto &reslist = Map(func, vargs);
254 return reslist;
255}
256
257//////////////////////////////////////////////////////////////////////////
258/// \brief Execute a function `nTimes` in parallel (Map) and accumulate the results into a single value (Reduce).
259/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,unsigned nTimes,R redfunc)
260template<class F, class R, class Cond>
261auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
262{
263 using retType = decltype(func());
264 //prepare environment
265 Reset();
266 fTaskType= ETask::kMapRed;
267
268 //fork max(nTimes, fNWorkers) times
269 unsigned oldNWorkers = GetPoolSize();
270 if (nTimes < oldNWorkers)
271 SetNWorkers(nTimes);
272 TMPWorkerExecutor<F, void, R> worker(func, redfunc);
273 bool ok = Fork(worker);
274 SetNWorkers(oldNWorkers);
275 if (!ok) {
276 std::cerr << "[E][C] Could not fork. Aborting operation\n";
277 return retType();
278 }
279
280 //give workers their first task
281 fNToProcess = nTimes;
282 std::vector<retType> reslist;
283 reslist.reserve(fNToProcess);
284 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
285
286 //collect results/give workers their next task
287 Collect(reslist);
288
289 //clean-up and return
290 ReapWorkers();
291 fTaskType= ETask::kNoTask;
292 return redfunc(reslist);
293}
294
295//////////////////////////////////////////////////////////////////////////
296/// \brief Execute a function in parallel over the elements of a vector (Map) and accumulate the results into a single value (Reduce).
297/// Benefits from partial reduction into `nChunks` intermediate results.
298///
299/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,std::vector<T> &args,R redfunc,unsigned nChunks).
300template<class F, class T, class R, class Cond>
301auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
302{
303
304 using retType = decltype(func(args.front()));
305 //prepare environment
306 Reset();
307 fTaskType= ETask::kMapRedWithArg;
308
309 //fork max(args.size(), fNWorkers) times
310 unsigned oldNWorkers = GetPoolSize();
311 if (args.size() < oldNWorkers)
312 SetNWorkers(args.size());
313 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
314 bool ok = Fork(worker);
315 SetNWorkers(oldNWorkers);
316 if (!ok) {
317 std::cerr << "[E][C] Could not fork. Aborting operation\n";
318 return decltype(func(args.front()))();
319 }
320
321 //give workers their first task
322 fNToProcess = args.size();
323 std::vector<retType> reslist;
324 reslist.reserve(fNToProcess);
325 std::vector<unsigned> range(fNToProcess);
326 std::iota(range.begin(), range.end(), 0);
327 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
328
329 //collect results/give workers their next task
330 Collect(reslist);
331
332 ReapWorkers();
333 fTaskType= ETask::kNoTask;
334 return Reduce(reslist, redfunc);
335}
336
337//////////////////////////////////////////////////////////////////////////
338/// \brief Execute a function in parallel over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce).
339/// Benefits from partial reduction into `nChunks` intermediate results.
340///
341/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,const std::vector<T> &args,R redfunc,unsigned nChunks).
342template<class F, class T, class R, class Cond>
343auto TProcessExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
344{
345
346 using retType = decltype(func(args.front()));
347 //prepare environment
348 Reset();
349 fTaskType= ETask::kMapRedWithArg;
350
351 //fork max(args.size(), fNWorkers) times
352 unsigned oldNWorkers = GetPoolSize();
353 if (args.size() < oldNWorkers)
354 SetNWorkers(args.size());
355 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
356 bool ok = Fork(worker);
357 SetNWorkers(oldNWorkers);
358 if (!ok) {
359 std::cerr << "[E][C] Could not fork. Aborting operation\n";
360 return decltype(func(args.front()))();
361 }
362
363 //give workers their first task
364 fNToProcess = args.size();
365 std::vector<retType> reslist;
366 reslist.reserve(fNToProcess);
367 std::vector<unsigned> range(fNToProcess);
368 std::iota(range.begin(), range.end(), 0);
369 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
370
371 //collect results/give workers their next task
372 Collect(reslist);
373
374 ReapWorkers();
375 fTaskType= ETask::kNoTask;
376 return Reduce(reslist, redfunc);
377}
378
379//////////////////////////////////////////////////////////////////////////
380/// Handle message and reply to the worker
381template<class T>
382void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
383{
384 unsigned code = msg.first;
385 if (code == MPCode::kFuncResult) {
386 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
388 } else if (code == MPCode::kIdling) {
389 ReplyToIdle(s);
390 } else if(code == MPCode::kProcResult) {
391 if(msg.second != nullptr)
392 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
394 } else if(code == MPCode::kProcError) {
395 const char *str = ReadBuffer<const char*>(msg.second.get());
396 Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
397 "Continuing execution ignoring these entries.", str);
398 ReplyToIdle(s);
399 delete [] str;
400 } else {
401 // UNKNOWN CODE
402 Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
403 }
404}
405
406//////////////////////////////////////////////////////////////////////////
407/// Listen for messages sent by the workers and call the appropriate handler function.
408/// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
409/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
410template<class T>
411void TProcessExecutor::Collect(std::vector<T> &reslist)
412{
413 TMonitor &mon = GetMonitor();
414 mon.ActivateAll();
415 while (mon.GetActive() > 0) {
416 TSocket *s = mon.Select();
417 MPCodeBufPair msg = MPRecv(s);
418 if (msg.first == MPCode::kRecvError) {
419 Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
420 Remove(s);
421 } else if (msg.first < 1000)
422 HandlePoolCode(msg, s, reslist);
423 else
424 HandleMPCode(msg, s);
425 }
426}
427
428} // ROOT namespace
429
430#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:32
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
Definition: MPSendRecv.cxx:54
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define R(a, b, c, d, e, f, g, h, i)
Definition: RSha256.hxx:110
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition: TError.cxx:187
int type
Definition: TGX11.cxx:121
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
auto MapImpl(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute a function without arguments several times in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Definition: TSeq.hxx:66
Base class for multiprocess applications' clients.
Definition: TMPClient.h:23
unsigned GetNWorkers() const
Definition: TMPClient.h:40
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:334
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition: TMPClient.h:39
TMonitor & GetMonitor()
Definition: TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:300
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
auto Map(Args &&... args)
Create new collection applying a callable to the elements of the input collection.
Definition: RVec.hxx:2025
const Int_t n
Definition: legend1.C:16
#define F(x, y, z)
@ kRecvError
Error while reading from the socket.
Definition: MPCode.h:51
@ kIdling
We are ready for the next task.
Definition: MPCode.h:35
@ kFuncResult
The message contains the result of a function execution.
Definition: MPCode.h:33
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition: MPCode.h:32
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kProcError
Tell the client there was an error while processing.
Definition: MPCode.h:44
@ kExecFunc
Execute function without arguments.
Definition: MPCode.h:31
@ kProcResult
The message contains the result of the processing of a TTree.
Definition: MPCode.h:42
double T(double x)
Definition: ChebyshevPol.h:34
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
static constexpr double s