13 #ifndef ROOT_TProcessExecutor 14 #define ROOT_TProcessExecutor 31 #include <type_traits> 47 template<
class F,
class Cond = noReferenceCond<F>>
49 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
51 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
58 template<
class F,
class R,
class Cond = noReferenceCond<F>>
60 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
64 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
67 template<
class T>
void Collect(std::vector<T> &reslist);
80 enum class ETask : unsigned char {
99 template<
class F,
class Cond>
102 using retType = decltype(func());
109 if (nTimes < oldNWorkers)
112 bool ok =
Fork(worker);
116 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
117 return std::vector<retType>();
122 std::vector<retType> reslist;
123 reslist.reserve(fNToProcess);
141 template<
class F,
class T,
class Cond>
145 using retType = decltype(func(args.front()));
153 if (args.size() < oldNWorkers)
156 bool ok =
Fork(worker);
160 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
161 return std::vector<retType>();
166 std::vector<retType> reslist;
167 reslist.reserve(fNToProcess);
168 std::vector<unsigned> range(fNToProcess);
169 std::iota(range.begin(), range.end(), 0);
185 template<
class F,
class INTEGER,
class Cond>
188 std::vector<INTEGER> vargs(args.size());
189 std::copy(args.begin(), args.end(), vargs.begin());
190 const auto &reslist =
Map(func, vargs);
200 template<
class F,
class R,
class Cond>
203 using retType = decltype(func());
210 if (nTimes < oldNWorkers)
213 bool ok =
Fork(worker);
216 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
222 std::vector<retType> reslist;
232 return redfunc(reslist);
241 template<
class F,
class T,
class R,
class Cond>
245 using retType = decltype(func(args.front()));
252 if (args.size() < oldNWorkers)
255 bool ok =
Fork(worker);
258 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
259 return decltype(func(args.front()))();
264 std::vector<retType> reslist;
267 std::iota(range.begin(), range.end(), 0);
275 return Reduce(reslist, redfunc);
281 template<
class T,
class R>
285 static_assert(std::is_same<decltype(redfunc(objs)),
T>::value,
"redfunc does not have the correct signature");
286 return redfunc(objs);
294 unsigned code = msg.first;
296 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
301 if(msg.second !=
nullptr)
302 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
305 const char *str = ReadBuffer<const char*>(msg.second.get());
306 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 307 "Continuing execution ignoring these entries.", str);
312 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
329 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
331 }
else if (msg.first < 1000)
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Namespace for new ROOT classes and functions.
This class works together with TProcessExecutor to allow the execution of functions in server process...
unsigned GetNWorkers() const
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
unsigned fNProcessed
number of arguments already passed to the workers
void Remove(TSocket *s)
Remove a certain socket from the monitor.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
a MapReduce method with arguments is being executed
The message contains the result of a function execution.
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function with the argument contained in the message.
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
a MapReduce method with no arguments is being executed
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Tell the client there was an error while processing.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...