13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
40 template <
typename F,
typename... Args>
58 template<
class F,
class R,
class Cond = noReferenceCond<F>>
60 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
62 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
80 template<
class F,
class Cond = noReferenceCond<F>>
81 auto MapImpl(
F func,
unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
82 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
84 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
85 auto MapImpl(
F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
86 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
87 auto MapImpl(
F func,
const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
89 template<
class T>
void Collect(std::vector<T> &reslist);
121template<
class F,
class Cond>
124 using retType =
decltype(func());
127 fTaskType = ETask::kMap;
130 unsigned oldNWorkers = GetPoolSize();
131 if (nTimes < oldNWorkers)
134 bool ok = Fork(worker);
135 SetNWorkers(oldNWorkers);
138 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
139 return std::vector<retType>();
143 fNToProcess = nTimes;
144 std::vector<retType> reslist;
145 reslist.reserve(fNToProcess);
153 fTaskType = ETask::kNoTask;
162template<
class F,
class T,
class Cond>
166 using retType =
decltype(func(args.front()));
169 fTaskType = ETask::kMapWithArg;
173 unsigned oldNWorkers = GetPoolSize();
174 if (args.size() < oldNWorkers)
175 SetNWorkers(args.size());
177 bool ok = Fork(worker);
178 SetNWorkers(oldNWorkers);
181 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
182 return std::vector<retType>();
186 fNToProcess = args.size();
187 std::vector<retType> reslist;
188 reslist.reserve(fNToProcess);
189 std::vector<unsigned> range(fNToProcess);
190 std::iota(range.begin(), range.end(), 0);
198 fTaskType = ETask::kNoTask;
207template<
class F,
class T,
class Cond>
211 using retType =
decltype(func(args.front()));
214 fTaskType = ETask::kMapWithArg;
218 unsigned oldNWorkers = GetPoolSize();
219 if (args.size() < oldNWorkers)
220 SetNWorkers(args.size());
222 bool ok = Fork(worker);
223 SetNWorkers(oldNWorkers);
226 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
227 return std::vector<retType>();
231 fNToProcess = args.size();
232 std::vector<retType> reslist;
233 reslist.reserve(fNToProcess);
234 std::vector<unsigned> range(fNToProcess);
235 std::iota(range.begin(), range.end(), 0);
243 fTaskType = ETask::kNoTask;
252template<
class F,
class INTEGER,
class Cond>
255 std::vector<INTEGER> vargs(args.size());
256 std::copy(args.begin(), args.end(), vargs.begin());
257 const auto &reslist = Map(func, vargs);
264template<
class F,
class R,
class Cond>
267 using retType =
decltype(func());
270 fTaskType= ETask::kMapRed;
273 unsigned oldNWorkers = GetPoolSize();
274 if (nTimes < oldNWorkers)
277 bool ok = Fork(worker);
278 SetNWorkers(oldNWorkers);
280 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
285 fNToProcess = nTimes;
286 std::vector<retType> reslist;
287 reslist.reserve(fNToProcess);
295 fTaskType= ETask::kNoTask;
296 return redfunc(reslist);
304template<
class F,
class T,
class R,
class Cond>
308 using retType =
decltype(func(args.front()));
311 fTaskType= ETask::kMapRedWithArg;
314 unsigned oldNWorkers = GetPoolSize();
315 if (args.size() < oldNWorkers)
316 SetNWorkers(args.size());
318 bool ok = Fork(worker);
319 SetNWorkers(oldNWorkers);
321 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
322 return decltype(func(args.front()))();
326 fNToProcess = args.size();
327 std::vector<retType> reslist;
328 reslist.reserve(fNToProcess);
329 std::vector<unsigned> range(fNToProcess);
330 std::iota(range.begin(), range.end(), 0);
337 fTaskType= ETask::kNoTask;
338 return Reduce(reslist, redfunc);
346template<
class F,
class T,
class R,
class Cond>
350 using retType =
decltype(func(args.front()));
353 fTaskType= ETask::kMapRedWithArg;
356 unsigned oldNWorkers = GetPoolSize();
357 if (args.size() < oldNWorkers)
358 SetNWorkers(args.size());
360 bool ok = Fork(worker);
361 SetNWorkers(oldNWorkers);
363 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
364 return decltype(func(args.front()))();
368 fNToProcess = args.size();
369 std::vector<retType> reslist;
370 reslist.reserve(fNToProcess);
371 std::vector<unsigned> range(fNToProcess);
372 std::iota(range.begin(), range.end(), 0);
379 fTaskType= ETask::kNoTask;
380 return Reduce(reslist, redfunc);
388 unsigned code = msg.first;
390 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
395 if(msg.second !=
nullptr)
396 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
399 const char *str = ReadBuffer<const char*>(msg.second.get());
400 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
401 "Continuing execution ignoring these entries.", str);
406 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
423 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
425 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
auto Map(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times.
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
ROOT::TypeTraits::InvokeResult_t< F, Args... > InvokeResult_t
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
unsigned fNToProcess
total number of arguments to pass to the workers
auto MapImpl(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times in parallel.
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
~TProcessExecutor()=default
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kFuncResult
The message contains the result of a function execution.
@ kExecFuncWithArg
Execute function with the argument contained in the message.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcError
Tell the client there was an error while processing.
@ kExecFunc
Execute function without arguments.
@ kProcResult
The message contains the result of the processing of a TTree.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.