13#ifndef ROOT_TProcessExecutor
14#define ROOT_TProcessExecutor
/// Execute func (with no arguments) nTimes in parallel.
/// \param func   Callable that is invoked without arguments.
/// \param nTimes Number of times func is to be executed.
/// \return A std::vector whose element type is the return type of func().
/// NOTE(review): Cond defaults to noReferenceCond<F>, whose definition is not visible in this
/// listing; presumably it is a SFINAE-style constraint on F -- confirm.
45 template<
class F,
class Cond = noReferenceCond<F>>
46 auto Map(
F func,
unsigned nTimes) -> std::vector<
typename std::result_of<
F()>
::type>;
47 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
/// Map overload that takes its arguments from a std::vector<T>.
/// \param func Callable that can be invoked with a T.
/// \param args Vector of arguments, taken by non-const reference.
/// \return A std::vector whose element type is the return type of func(T).
/// NOTE(review): presumably func is executed once per element of args, in parallel --
/// confirm against the definition, which is only partially visible in this listing.
49 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
50 auto Map(
F func, std::vector<T> &args) -> std::vector<
typename std::result_of<
F(
T)>
::type>;
/// Behaves just like Map(func, nTimes), but an additional redfunc function must be provided.
/// \param func    Callable that is invoked without arguments.
/// \param nTimes  Number of times func is to be executed.
/// \param redfunc Reduction function.
///                NOTE(review): presumably applied to the vector of collected results -- confirm.
/// \return A single value whose type is the return type of func().
56 template<
class F,
class R,
class Cond = noReferenceCond<F>>
57 auto MapReduce(
F func,
unsigned nTimes,
R redfunc) ->
typename std::result_of<
F()>
::type;
/// MapReduce overload that takes its arguments from a std::vector<T>.
/// \param func    Callable that can be invoked with a T.
/// \param args    Vector of arguments, taken by non-const reference.
/// \param redfunc Reduction function.
///                NOTE(review): presumably applied to the vector of collected results -- confirm.
/// \return A single value whose type is the return type of func(T).
58 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
59 auto MapReduce(
F func, std::vector<T> &args,
R redfunc) ->
typename std::result_of<
F(
T)>
::type;
/// "Reduce" a std::vector into a single object by passing a function as the second argument.
/// \param objs    Vector of objects to be reduced, taken by const reference.
/// \param redfunc Callable invoked as redfunc(objs); the definition static_asserts that
///                this expression has type T.
/// \return The value produced by redfunc(objs).
62 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
/// Listen for messages sent by the workers and call the appropriate handler function.
/// \param reslist Vector taken by non-const reference.
///                NOTE(review): presumably results read from worker messages are appended
///                to it -- confirm against the definition.
65 template<
class T>
void Collect(std::vector<T> &reslist);
78 enum class ETask : unsigned char {
97template<
class F,
class Cond>
100 using retType =
decltype(func());
106 unsigned oldNWorkers = GetNWorkers();
107 if (nTimes < oldNWorkers)
110 bool ok = Fork(worker);
111 SetNWorkers(oldNWorkers);
114 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
115 return std::vector<retType>();
119 fNToProcess = nTimes;
120 std::vector<retType> reslist;
121 reslist.reserve(fNToProcess);
129 fTaskType = ETask::kNoTask;
139template<
class F,
class T,
class Cond>
143 using retType =
decltype(func(args.front()));
146 fTaskType = ETask::kMapWithArg;
150 unsigned oldNWorkers = GetNWorkers();
151 if (args.size() < oldNWorkers)
152 SetNWorkers(args.size());
154 bool ok = Fork(worker);
155 SetNWorkers(oldNWorkers);
158 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
159 return std::vector<retType>();
163 fNToProcess = args.size();
164 std::vector<retType> reslist;
165 reslist.reserve(fNToProcess);
166 std::vector<unsigned> range(fNToProcess);
167 std::iota(range.begin(), range.end(), 0);
175 fTaskType = ETask::kNoTask;
183template<
class F,
class INTEGER,
class Cond>
186 std::vector<INTEGER> vargs(args.size());
187 std::copy(args.begin(), args.end(), vargs.begin());
188 const auto &reslist =
Map(func, vargs);
198template<
class F,
class R,
class Cond>
201 using retType =
decltype(func());
204 fTaskType= ETask::kMapRed;
207 unsigned oldNWorkers = GetNWorkers();
208 if (nTimes < oldNWorkers)
211 bool ok = Fork(worker);
212 SetNWorkers(oldNWorkers);
214 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
219 fNToProcess = nTimes;
220 std::vector<retType> reslist;
221 reslist.reserve(fNToProcess);
229 fTaskType= ETask::kNoTask;
230 return redfunc(reslist);
239template<
class F,
class T,
class R,
class Cond>
243 using retType =
decltype(func(args.front()));
246 fTaskType= ETask::kMapRedWithArg;
249 unsigned oldNWorkers = GetNWorkers();
250 if (args.size() < oldNWorkers)
251 SetNWorkers(args.size());
253 bool ok = Fork(worker);
254 SetNWorkers(oldNWorkers);
256 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
257 return decltype(func(args.front()))();
261 fNToProcess = args.size();
262 std::vector<retType> reslist;
263 reslist.reserve(fNToProcess);
264 std::vector<unsigned> range(fNToProcess);
265 std::iota(range.begin(), range.end(), 0);
272 fTaskType= ETask::kNoTask;
273 return Reduce(reslist, redfunc);
279template<
class T,
class R>
283 static_assert(std::is_same<
decltype(redfunc(objs)),
T>::value,
"redfunc does not have the correct signature");
284 return redfunc(objs);
292 unsigned code = msg.first;
294 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
299 if(msg.second !=
nullptr)
300 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
303 const char *str = ReadBuffer<const char*>(msg.second.get());
304 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
305 "Continuing execution ignoring these entries.", str);
310 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
327 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
329 }
else if (msg.first < 1000)
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
#define R(a, b, c, d, e, f, g, h, i)
void Error(const char *location, const char *msgfmt,...)
This class defines an interface to execute the same task multiple times in parallel,...
This class provides a simple interface to execute the same task multiple times in parallel,...
ETask fTaskType
the kind of task that is being executed, if any
ETask
A collection of the types of tasks that TProcessExecutor can execute.
@ kNoTask
no task is being executed
@ kMapWithArg
a Map method with arguments is being executed
@ kMapRed
a MapReduce method with no arguments is being executed
@ kMapRedWithArg
a MapReduce method with arguments is being executed
@ kMap
a Map method with no arguments is being executed
TProcessExecutor & operator=(const TProcessExecutor &)=delete
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned fNProcessed
number of arguments already passed to the workers
unsigned GetNWorkers() const
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor(const TProcessExecutor &)=delete
unsigned fNToProcess
total number of arguments to pass to the workers
void Reset()
Reset TProcessExecutor's state.
void SetNWorkers(unsigned n)
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
~TProcessExecutor()=default
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
unsigned GetNWorkers() const
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork().
void Remove(TSocket *s)
Remove a certain socket from the monitor.
This class works together with TProcessExecutor to allow the execution of functions in server process...
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
@ kRecvError
Error while reading from the socket.
@ kIdling
We are ready for the next task.
@ kFuncResult
The message contains the result of a function execution.
@ kExecFuncWithArg
Execute function with the argument contained in the message.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kProcError
Tell the client there was an error while processing.
@ kExecFunc
Execute function without arguments.
@ kProcResult
The message contains the result of the processing of a TTree.
auto Map(const RVec< T > &v, F &&f) -> RVec< decltype(f(v[0]))>
Create new collection applying a callable to the elements of the input collection.
Namespace for new ROOT classes and functions.
static constexpr double s