Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TProcessExecutor.hxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3// Modified: G Ganis Jan 2017
4
5/*************************************************************************
6 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
15
16#include "MPCode.h"
17#include "MPSendRecv.h"
18#include "PoolUtils.h"
20#include "ROOT/TSeq.hxx"
21#include "ROOT/TypeTraits.hxx"
22#include "TError.h"
23#include "TFileCollection.h"
24#include "TFileInfo.h"
25#include "THashList.h"
26#include "TMPClient.h"
27#include "TMPWorkerExecutor.h"
28
29#include <algorithm> //std::generate
30#include <numeric> //std::iota
31#include <string>
32#include <functional> //std::reference_wrapper
33#include <vector>
34
35namespace ROOT {
36
37class TProcessExecutor : public TExecutorCRTP<TProcessExecutor>, private TMPClient {
39
40public:
41 explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
42 ~TProcessExecutor() = default;
43 //it doesn't make sense for a TProcessExecutor to be copied
46
47 // Map
48 //
50
51 // MapReduce
52 // Redefinition of the MapReduce classes of the base class, to adapt them to
53 // TProcessExecutor's logic
55 template<class F, class R, class Cond = validMapReturnCond<F>>
56 auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>;
57 template<class F, class T, class R, class Cond = validMapReturnCond<F, T>>
58 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
59 template<class F, class T, class R, class Cond = validMapReturnCond<F, T>>
60 auto MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
61
62 // Reduce
63 //
65
67
68 //////////////////////////////////////////////////////////////////////////
69 /// \brief Return the number of pooled parallel workers.
70 ///
71 /// \return The number of workers in the pool.
72 unsigned GetPoolSize() const { return TMPClient::GetNWorkers(); }
73
74private:
75 // Implementation of the Map functions declared in the parent class (TExecutorCRTP)
76 //
77 template<class F, class Cond = validMapReturnCond<F>>
78 auto MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
79 template<class F, class INTEGER, class Cond = validMapReturnCond<F, INTEGER>>
80 auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>;
81 template<class F, class T, class Cond = validMapReturnCond<F, T>>
82 auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
83 template<class F, class T, class Cond = validMapReturnCond<F, T>>
84 auto MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
85
86 template<class T> void Collect(std::vector<T> &reslist);
87 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
88
89 void Reset();
91 void ReplyToIdle(TSocket *s);
92
93 unsigned fNProcessed; ///< number of arguments already passed to the workers
94 unsigned fNToProcess; ///< total number of arguments to pass to the workers
95
96 /// A collection of the types of tasks that TProcessExecutor can execute.
97 /// It is used to interpret in the right way and properly reply to the
98 /// messages received (see, for example, TProcessExecutor::HandleInput)
99 enum class ETask : unsigned char {
100 kNoTask, ///< no task is being executed
101 kMap, ///< a Map method with no arguments is being executed
102 kMapWithArg, ///< a Map method with arguments is being executed
103 kMapRed, ///< a MapReduce method with no arguments is being executed
104 kMapRedWithArg ///< a MapReduce method with arguments is being executed
105 };
106
107 ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
108};
109
110
111/************ TEMPLATE METHODS IMPLEMENTATION ******************/
112
113//////////////////////////////////////////////////////////////////////////
114/// \brief Execute a function without arguments several times in parallel.
115/// Implementation of the Map method.
116///
117/// \copydetails TExecutorCRTP::Map(F func,unsigned nTimes)
118template<class F, class Cond>
119auto TProcessExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>
120{
121 using retType = decltype(func());
122 //prepare environment
123 Reset();
124 fTaskType = ETask::kMap;
125
126 //fork max(nTimes, fNWorkers) times
127 unsigned oldNWorkers = GetPoolSize();
128 if (nTimes < oldNWorkers)
129 SetNWorkers(nTimes);
130 TMPWorkerExecutor<F> worker(func);
131 bool ok = Fork(worker);
132 SetNWorkers(oldNWorkers);
133 if (!ok)
134 {
135 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
136 return std::vector<retType>();
137 }
138
139 //give out tasks
140 fNToProcess = nTimes;
141 std::vector<retType> reslist;
142 reslist.reserve(fNToProcess);
143 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
144
145 //collect results, give out other tasks if needed
146 Collect(reslist);
147
148 //clean-up and return
149 ReapWorkers();
150 fTaskType = ETask::kNoTask;
151 return reslist;
152}
153
154//////////////////////////////////////////////////////////////////////////
155/// \brief Execute a function over the elements of a vector in parallel
156/// Implementation of the Map method.
157///
158/// \copydetails TExecutorCRTP::Map(F func,std::vector<T> &args)
159template<class F, class T, class Cond>
160auto TProcessExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
161{
162 //check whether func is callable
163 using retType = decltype(func(args.front()));
164 //prepare environment
165 Reset();
166 fTaskType = ETask::kMapWithArg;
167
168 //fork max(args.size(), fNWorkers) times
169 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
170 unsigned oldNWorkers = GetPoolSize();
171 if (args.size() < oldNWorkers)
172 SetNWorkers(args.size());
173 TMPWorkerExecutor<F, T> worker(func, args);
174 bool ok = Fork(worker);
175 SetNWorkers(oldNWorkers);
176 if (!ok)
177 {
178 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
179 return std::vector<retType>();
180 }
181
182 //give out tasks
183 fNToProcess = args.size();
184 std::vector<retType> reslist;
185 reslist.reserve(fNToProcess);
186 std::vector<unsigned> range(fNToProcess);
187 std::iota(range.begin(), range.end(), 0);
188 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
189
190 //collect results, give out other tasks if needed
191 Collect(reslist);
192
193 //clean-up and return
194 ReapWorkers();
195 fTaskType = ETask::kNoTask;
196 return reslist;
197}
198
199//////////////////////////////////////////////////////////////////////////
200/// \brief Execute a function over the elements of an immutable vector in parallel
201/// Implementation of the Map method.
202///
203/// \copydetails TExecutorCRTP::Map(F func,const std::vector<T> &args)
204template<class F, class T, class Cond>
205auto TProcessExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
206{
207 //check whether func is callable
208 using retType = decltype(func(args.front()));
209 //prepare environment
210 Reset();
211 fTaskType = ETask::kMapWithArg;
212
213 //fork max(args.size(), fNWorkers) times
214 //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
215 unsigned oldNWorkers = GetPoolSize();
216 if (args.size() < oldNWorkers)
217 SetNWorkers(args.size());
218 TMPWorkerExecutor<F, T> worker(func, args);
219 bool ok = Fork(worker);
220 SetNWorkers(oldNWorkers);
221 if (!ok)
222 {
223 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
224 return std::vector<retType>();
225 }
226
227 //give out tasks
228 fNToProcess = args.size();
229 std::vector<retType> reslist;
230 reslist.reserve(fNToProcess);
231 std::vector<unsigned> range(fNToProcess);
232 std::iota(range.begin(), range.end(), 0);
233 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
234
235 //collect results, give out other tasks if needed
236 Collect(reslist);
237
238 //clean-up and return
239 ReapWorkers();
240 fTaskType = ETask::kNoTask;
241 return reslist;
242}
243
244//////////////////////////////////////////////////////////////////////////
245/// \brief Execute a function over a sequence of indexes in parallel.
246/// Implementation of the Map method.
247///
248/// \copydetails TExecutorCRTP::Map(F func,ROOT::TSeq<INTEGER> args)
249template<class F, class INTEGER, class Cond>
250auto TProcessExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>
251{
252 std::vector<INTEGER> vargs(args.size());
253 std::copy(args.begin(), args.end(), vargs.begin());
254 const auto &reslist = Map(func, vargs);
255 return reslist;
256}
257
258//////////////////////////////////////////////////////////////////////////
259/// \brief Execute a function `nTimes` in parallel (Map) and accumulate the results into a single value (Reduce).
260/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,unsigned nTimes,R redfunc)
261template<class F, class R, class Cond>
262auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>
263{
264 using retType = decltype(func());
265 //prepare environment
266 Reset();
267 fTaskType= ETask::kMapRed;
268
269 //fork max(nTimes, fNWorkers) times
270 unsigned oldNWorkers = GetPoolSize();
271 if (nTimes < oldNWorkers)
272 SetNWorkers(nTimes);
273 TMPWorkerExecutor<F, void, R> worker(func, redfunc);
274 bool ok = Fork(worker);
275 SetNWorkers(oldNWorkers);
276 if (!ok) {
277 std::cerr << "[E][C] Could not fork. Aborting operation\n";
278 return retType();
279 }
280
281 //give workers their first task
282 fNToProcess = nTimes;
283 std::vector<retType> reslist;
284 reslist.reserve(fNToProcess);
285 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
286
287 //collect results/give workers their next task
288 Collect(reslist);
289
290 //clean-up and return
291 ReapWorkers();
292 fTaskType= ETask::kNoTask;
293 return redfunc(reslist);
294}
295
296//////////////////////////////////////////////////////////////////////////
297/// \brief Execute a function in parallel over the elements of a vector (Map) and accumulate the results into a single value (Reduce).
298/// Benefits from partial reduction into `nChunks` intermediate results.
299///
300/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,std::vector<T> &args,R redfunc,unsigned nChunks).
301template<class F, class T, class R, class Cond>
302auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
303{
304
305 using retType = decltype(func(args.front()));
306 //prepare environment
307 Reset();
308 fTaskType= ETask::kMapRedWithArg;
309
310 //fork max(args.size(), fNWorkers) times
311 unsigned oldNWorkers = GetPoolSize();
312 if (args.size() < oldNWorkers)
313 SetNWorkers(args.size());
314 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
315 bool ok = Fork(worker);
316 SetNWorkers(oldNWorkers);
317 if (!ok) {
318 std::cerr << "[E][C] Could not fork. Aborting operation\n";
319 return decltype(func(args.front()))();
320 }
321
322 //give workers their first task
323 fNToProcess = args.size();
324 std::vector<retType> reslist;
325 reslist.reserve(fNToProcess);
326 std::vector<unsigned> range(fNToProcess);
327 std::iota(range.begin(), range.end(), 0);
328 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
329
330 //collect results/give workers their next task
331 Collect(reslist);
332
333 ReapWorkers();
334 fTaskType= ETask::kNoTask;
335 return Reduce(reslist, redfunc);
336}
337
338//////////////////////////////////////////////////////////////////////////
339/// \brief Execute a function in parallel over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce).
340/// Benefits from partial reduction into `nChunks` intermediate results.
341///
342/// \copydetails ROOT::Internal::TExecutor::MapReduce(F func,const std::vector<T> &args,R redfunc,unsigned nChunks).
343template<class F, class T, class R, class Cond>
344auto TProcessExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
345{
346
347 using retType = decltype(func(args.front()));
348 //prepare environment
349 Reset();
350 fTaskType= ETask::kMapRedWithArg;
351
352 //fork max(args.size(), fNWorkers) times
353 unsigned oldNWorkers = GetPoolSize();
354 if (args.size() < oldNWorkers)
355 SetNWorkers(args.size());
356 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
357 bool ok = Fork(worker);
358 SetNWorkers(oldNWorkers);
359 if (!ok) {
360 std::cerr << "[E][C] Could not fork. Aborting operation\n";
361 return decltype(func(args.front()))();
362 }
363
364 //give workers their first task
365 fNToProcess = args.size();
366 std::vector<retType> reslist;
367 reslist.reserve(fNToProcess);
368 std::vector<unsigned> range(fNToProcess);
369 std::iota(range.begin(), range.end(), 0);
370 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
371
372 //collect results/give workers their next task
373 Collect(reslist);
374
375 ReapWorkers();
376 fTaskType= ETask::kNoTask;
377 return Reduce(reslist, redfunc);
378}
379
380//////////////////////////////////////////////////////////////////////////
381/// Handle message and reply to the worker
382template<class T>
383void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
384{
385 unsigned code = msg.first;
386 if (code == MPCode::kFuncResult) {
387 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
389 } else if (code == MPCode::kIdling) {
390 ReplyToIdle(s);
391 } else if(code == MPCode::kProcResult) {
392 if(msg.second != nullptr)
393 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
395 } else if(code == MPCode::kProcError) {
396 const char *str = ReadBuffer<const char*>(msg.second.get());
397 Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
398 "Continuing execution ignoring these entries.", str);
399 ReplyToIdle(s);
400 delete [] str;
401 } else {
402 // UNKNOWN CODE
403 Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
404 }
405}
406
407//////////////////////////////////////////////////////////////////////////
408/// Listen for messages sent by the workers and call the appropriate handler function.
409/// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
410/// TMPClient::HandleMPCode is called on messages with a code >= 1000.
411template<class T>
412void TProcessExecutor::Collect(std::vector<T> &reslist)
413{
414 TMonitor &mon = GetMonitor();
415 mon.ActivateAll();
416 while (mon.GetActive() > 0) {
417 TSocket *s = mon.Select();
418 MPCodeBufPair msg = MPRecv(s);
419 if (msg.first == MPCode::kRecvError) {
420 Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
421 Remove(s);
422 } else if (msg.first < 1000)
423 HandlePoolCode(msg, s, reslist);
424 else
425 HandleMPCode(msg, s);
426 }
427}
428
429} // ROOT namespace
430
431#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
auto Map(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times.
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
unsigned fNToProcess
total number of arguments to pass to the workers
auto MapImpl(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times in parallel.
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Definition TSeq.hxx:67
Base class for multiprocess applications' clients.
Definition TMPClient.h:23
unsigned GetNWorkers() const
Definition TMPClient.h:40
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
Definition TMPClient.h:39
TMonitor & GetMonitor()
Definition TMPClient.h:36
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
Definition TMonitor.cxx:268
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
const Int_t n
Definition legend1.C:16
#define F(x, y, z)
@ kRecvError
Error while reading from the socket.
Definition MPCode.h:51
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kFuncResult
The message contains the result of a function execution.
Definition MPCode.h:33
@ kExecFuncWithArg
Execute function with the argument contained in the message.
Definition MPCode.h:32
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition MPCode.h:49
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kExecFunc
Execute function without arguments.
Definition MPCode.h:31
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...