13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
/// Execute `func` `nTimes` in parallel (Map) and accumulate the results into a
/// single value (Reduce) by calling `redfunc` on the collected results.
/// \param func     callable invoked with no arguments.
/// \param nTimes   number of times `func` is to be executed.
/// \param redfunc  callable applied to the vector of results.
/// \return declared as the result type of calling `func` with no arguments.
54 template<
class F,
class R,
class Cond = noReferenceCond<F>>
55 auto MapReduce(
F func,
unsigned nTimes,
R redfunc) ->
typename std::result_of<
F()>
::type;
/// MapReduce overload taking its inputs as a non-const std::vector<T> reference.
/// \param func     callable invoked with one argument of type T.
/// \param args     vector of arguments; its size determines how many items are processed.
/// \param redfunc  reduction callable.
/// \return declared as the result type of calling `func` with a T.
/// NOTE(review): presumably maps `func` over `args` and then reduces with
/// `redfunc`, mirroring the nTimes overload -- confirm against the definition.
56 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
57 auto MapReduce(
F func, std::vector<T> &args,
R redfunc) ->
typename std::result_of<
F(T)>
::type;
/// MapReduce overload taking its inputs as a const std::vector<T> reference.
/// \param func     callable invoked with one argument of type T.
/// \param args     read-only vector of arguments; its size determines how many items are processed.
/// \param redfunc  reduction callable.
/// \return declared as the result type of calling `func` with a T.
/// NOTE(review): presumably maps `func` over `args` and then reduces with
/// `redfunc`, mirroring the nTimes overload -- confirm against the definition.
58 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
59 auto MapReduce(
F func,
const std::vector<T> &args,
R redfunc) ->
typename std::result_of<
F(T)>
::type;
/// Execute a function without arguments several times in parallel.
/// \param func    callable invoked with no arguments.
/// \param nTimes  number of times `func` is to be executed.
/// \return declared as a std::vector whose element type is the result type of `func()`.
76 template<
class F,
class Cond = noReferenceCond<F>>
77 auto MapImpl(
F func,
unsigned nTimes) -> std::vector<
typename std::result_of<
F()>
::type>;
78 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
/// MapImpl overload taking its inputs as a non-const std::vector<T> reference.
/// \param func  callable invoked with one argument of type T.
/// \param args  vector of arguments; its size determines how many items are processed.
/// \return declared as a std::vector whose element type is the result type of `func(T)`.
80 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
81 auto MapImpl(
F func, std::vector<T> &args) -> std::vector<
typename std::result_of<
F(T)>
::type>;
/// MapImpl overload taking its inputs as a const std::vector<T> reference.
/// \param func  callable invoked with one argument of type T.
/// \param args  read-only vector of arguments; its size determines how many items are processed.
/// \return declared as a std::vector whose element type is the result type of `func(T)`.
82 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
83 auto MapImpl(
F func,
const std::vector<T> &args) -> std::vector<
typename std::result_of<
F(T)>
::type>;
/// Listen for messages sent by the workers and call the appropriate handler function.
/// \param reslist  vector passed by reference to the handlers.
/// NOTE(review): presumably the handlers append each received result to
/// `reslist` -- confirm against the Collect/HandlePoolCode definitions.
85 template<
class T>
void Collect(std::vector<T> &reslist);
98 enum class ETask :
unsigned char {
117template<
class F,
class Cond>
120 using retType =
decltype(func());
123 fTaskType = ETask::kMap;
126 unsigned oldNWorkers = GetPoolSize();
127 if (nTimes < oldNWorkers)
130 bool ok = Fork(worker);
131 SetNWorkers(oldNWorkers);
134 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
135 return std::vector<retType>();
139 fNToProcess = nTimes;
140 std::vector<retType> reslist;
141 reslist.reserve(fNToProcess);
149 fTaskType = ETask::kNoTask;
158template<
class F,
class T,
class Cond>
162 using retType =
decltype(func(args.front()));
165 fTaskType = ETask::kMapWithArg;
169 unsigned oldNWorkers = GetPoolSize();
170 if (args.size() < oldNWorkers)
171 SetNWorkers(args.size());
173 bool ok = Fork(worker);
174 SetNWorkers(oldNWorkers);
177 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
178 return std::vector<retType>();
182 fNToProcess = args.size();
183 std::vector<retType> reslist;
184 reslist.reserve(fNToProcess);
185 std::vector<unsigned> range(fNToProcess);
186 std::iota(range.begin(), range.end(), 0);
194 fTaskType = ETask::kNoTask;
203template<
class F,
class T,
class Cond>
207 using retType =
decltype(func(args.front()));
210 fTaskType = ETask::kMapWithArg;
214 unsigned oldNWorkers = GetPoolSize();
215 if (args.size() < oldNWorkers)
216 SetNWorkers(args.size());
218 bool ok = Fork(worker);
219 SetNWorkers(oldNWorkers);
222 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
223 return std::vector<retType>();
227 fNToProcess = args.size();
228 std::vector<retType> reslist;
229 reslist.reserve(fNToProcess);
230 std::vector<unsigned> range(fNToProcess);
231 std::iota(range.begin(), range.end(), 0);
239 fTaskType = ETask::kNoTask;
248template<
class F,
class INTEGER,
class Cond>
251 std::vector<INTEGER> vargs(args.size());
252 std::copy(args.begin(), args.end(), vargs.begin());
253 const auto &reslist = Map(func, vargs);
260template<
class F,
class R,
class Cond>
263 using retType =
decltype(func());
266 fTaskType= ETask::kMapRed;
269 unsigned oldNWorkers = GetPoolSize();
270 if (nTimes < oldNWorkers)
273 bool ok = Fork(worker);
274 SetNWorkers(oldNWorkers);
276 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
281 fNToProcess = nTimes;
282 std::vector<retType> reslist;
283 reslist.reserve(fNToProcess);
291 fTaskType= ETask::kNoTask;
292 return redfunc(reslist);
300template<
class F,
class T,
class R,
class Cond>
304 using retType =
decltype(func(args.front()));
307 fTaskType= ETask::kMapRedWithArg;
310 unsigned oldNWorkers = GetPoolSize();
311 if (args.size() < oldNWorkers)
312 SetNWorkers(args.size());
314 bool ok = Fork(worker);
315 SetNWorkers(oldNWorkers);
317 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
318 return decltype(func(args.front()))();
322 fNToProcess = args.size();
323 std::vector<retType> reslist;
324 reslist.reserve(fNToProcess);
325 std::vector<unsigned> range(fNToProcess);
326 std::iota(range.begin(), range.end(), 0);
333 fTaskType= ETask::kNoTask;
334 return Reduce(reslist, redfunc);
342template<
class F,
class T,
class R,
class Cond>
346 using retType =
decltype(func(args.front()));
349 fTaskType= ETask::kMapRedWithArg;
352 unsigned oldNWorkers = GetPoolSize();
353 if (args.size() < oldNWorkers)
354 SetNWorkers(args.size());
356 bool ok = Fork(worker);
357 SetNWorkers(oldNWorkers);
359 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
360 return decltype(func(args.front()))();
364 fNToProcess = args.size();
365 std::vector<retType> reslist;
366 reslist.reserve(fNToProcess);
367 std::vector<unsigned> range(fNToProcess);
368 std::iota(range.begin(), range.end(), 0);
375 fTaskType= ETask::kNoTask;
376 return Reduce(reslist, redfunc);
384 unsigned code = msg.first;
386 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
391 if(msg.second !=
nullptr)
392 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
395 const char *str = ReadBuffer<const char*>(msg.second.get());
396 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
397 "Continuing execution ignoring these entries.", str);
402 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
419 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
421 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
This class defines an interface to execute the same task multiple times, possibly in parallel and with different arguments every time.
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute a function without arguments several times.
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
This class provides a simple interface to execute the same task multiple times in parallel, possibly with different arguments every time.
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
auto MapImpl(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute a function without arguments several times in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
~TProcessExecutor()=default
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server processes.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kFuncResult
The message contains the result of a function execution.
@ kExecFuncWithArg
Execute function with the argument contained in the message.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcError
Tell the client there was an error while processing.
@ kExecFunc
Execute function without arguments.
@ kProcResult
The message contains the result of the processing of a TTree.
tbb::task_arena is an alias of tbb::interface7::task_arena, which makes it impossible to forward declare tbb::task_arena without also forward declaring tbb::interface7.