13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
55 template<
class F,
class R,
class Cond = val
idMapReturnCond<F>>
57 template<
class F,
class T,
class R,
class Cond = val
idMapReturnCond<F, T>>
59 template<
class F,
class T,
class R,
class Cond = val
idMapReturnCond<F, T>>
77 template<
class F,
class Cond = val
idMapReturnCond<F>>
78 auto MapImpl(
F func,
unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
79 template<
class F,
class INTEGER,
class Cond = val
idMapReturnCond<F, INTEGER>>
81 template<
class F,
class T,
class Cond = val
idMapReturnCond<F, T>>
82 auto MapImpl(
F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
83 template<
class F,
class T,
class Cond = val
idMapReturnCond<F, T>>
84 auto MapImpl(
F func,
const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
86 template<
class T>
void Collect(std::vector<T> &reslist);
99 enum class ETask :
unsigned char {
118template<
class F,
class Cond>
121 using retType =
decltype(func());
124 fTaskType = ETask::kMap;
127 unsigned oldNWorkers = GetPoolSize();
128 if (nTimes < oldNWorkers)
131 bool ok = Fork(worker);
132 SetNWorkers(oldNWorkers);
135 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
136 return std::vector<retType>();
140 fNToProcess = nTimes;
141 std::vector<retType> reslist;
142 reslist.reserve(fNToProcess);
150 fTaskType = ETask::kNoTask;
159template<
class F,
class T,
class Cond>
163 using retType =
decltype(func(args.front()));
166 fTaskType = ETask::kMapWithArg;
170 unsigned oldNWorkers = GetPoolSize();
171 if (args.size() < oldNWorkers)
172 SetNWorkers(args.size());
174 bool ok = Fork(worker);
175 SetNWorkers(oldNWorkers);
178 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
179 return std::vector<retType>();
183 fNToProcess = args.size();
184 std::vector<retType> reslist;
185 reslist.reserve(fNToProcess);
186 std::vector<unsigned> range(fNToProcess);
187 std::iota(range.begin(), range.end(), 0);
195 fTaskType = ETask::kNoTask;
204template<
class F,
class T,
class Cond>
208 using retType =
decltype(func(args.front()));
211 fTaskType = ETask::kMapWithArg;
215 unsigned oldNWorkers = GetPoolSize();
216 if (args.size() < oldNWorkers)
217 SetNWorkers(args.size());
219 bool ok = Fork(worker);
220 SetNWorkers(oldNWorkers);
223 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
224 return std::vector<retType>();
228 fNToProcess = args.size();
229 std::vector<retType> reslist;
230 reslist.reserve(fNToProcess);
231 std::vector<unsigned> range(fNToProcess);
232 std::iota(range.begin(), range.end(), 0);
240 fTaskType = ETask::kNoTask;
249template<
class F,
class INTEGER,
class Cond>
252 std::vector<INTEGER> vargs(args.size());
253 std::copy(args.begin(), args.end(), vargs.begin());
254 const auto &reslist = Map(func, vargs);
261template<
class F,
class R,
class Cond>
264 using retType =
decltype(func());
267 fTaskType= ETask::kMapRed;
270 unsigned oldNWorkers = GetPoolSize();
271 if (nTimes < oldNWorkers)
274 bool ok = Fork(worker);
275 SetNWorkers(oldNWorkers);
277 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
282 fNToProcess = nTimes;
283 std::vector<retType> reslist;
284 reslist.reserve(fNToProcess);
292 fTaskType= ETask::kNoTask;
293 return redfunc(reslist);
301template<
class F,
class T,
class R,
class Cond>
305 using retType =
decltype(func(args.front()));
308 fTaskType= ETask::kMapRedWithArg;
311 unsigned oldNWorkers = GetPoolSize();
312 if (args.size() < oldNWorkers)
313 SetNWorkers(args.size());
315 bool ok = Fork(worker);
316 SetNWorkers(oldNWorkers);
318 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
319 return decltype(func(args.front()))();
323 fNToProcess = args.size();
324 std::vector<retType> reslist;
325 reslist.reserve(fNToProcess);
326 std::vector<unsigned> range(fNToProcess);
327 std::iota(range.begin(), range.end(), 0);
334 fTaskType= ETask::kNoTask;
335 return Reduce(reslist, redfunc);
343template<
class F,
class T,
class R,
class Cond>
347 using retType =
decltype(func(args.front()));
350 fTaskType= ETask::kMapRedWithArg;
353 unsigned oldNWorkers = GetPoolSize();
354 if (args.size() < oldNWorkers)
355 SetNWorkers(args.size());
357 bool ok = Fork(worker);
358 SetNWorkers(oldNWorkers);
360 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
361 return decltype(func(args.front()))();
365 fNToProcess = args.size();
366 std::vector<retType> reslist;
367 reslist.reserve(fNToProcess);
368 std::vector<unsigned> range(fNToProcess);
369 std::iota(range.begin(), range.end(), 0);
376 fTaskType= ETask::kNoTask;
377 return Reduce(reslist, redfunc);
385 unsigned code = msg.first;
387 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
392 if(msg.second !=
nullptr)
393 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
396 const char *str = ReadBuffer<const char*>(msg.second.get());
397 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
398 "Continuing execution ignoring these entries.", str);
403 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
420 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
422 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
auto Map(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times.
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
unsigned fNToProcess
total number of arguments to pass to the workers
auto MapImpl(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times in parallel.
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
~TProcessExecutor()=default
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kFuncResult
The message contains the result of a function execution.
@ kExecFuncWithArg
Execute function with the argument contained in the message.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcError
Tell the client there was an error while processing.
@ kExecFunc
Execute function without arguments.
@ kProcResult
The message contains the result of the processing of a TTree.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...