13 #ifndef ROOT_TProcessExecutor 14 #define ROOT_TProcessExecutor 29 #include <type_traits> 45 template<
class F,
class Cond = noReferenceCond<F>>
47 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
49 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
56 template<
class F,
class R,
class Cond = noReferenceCond<F>>
58 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
62 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
65 template<
class T>
void Collect(std::vector<T> &reslist);
78 enum class ETask : unsigned char {
97 template<
class F,
class Cond>
100 using retType = decltype(func());
107 if (nTimes < oldNWorkers)
110 bool ok =
Fork(worker);
114 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
115 return std::vector<retType>();
120 std::vector<retType> reslist;
121 reslist.reserve(fNToProcess);
139 template<
class F,
class T,
class Cond>
143 using retType = decltype(func(args.front()));
151 if (args.size() < oldNWorkers)
154 bool ok =
Fork(worker);
158 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
159 return std::vector<retType>();
164 std::vector<retType> reslist;
165 reslist.reserve(fNToProcess);
166 std::vector<unsigned> range(fNToProcess);
167 std::iota(range.begin(), range.end(), 0);
183 template<
class F,
class INTEGER,
class Cond>
186 std::vector<INTEGER> vargs(args.size());
187 std::copy(args.begin(), args.end(), vargs.begin());
188 const auto &reslist =
Map(func, vargs);
198 template<
class F,
class R,
class Cond>
201 using retType = decltype(func());
208 if (nTimes < oldNWorkers)
211 bool ok =
Fork(worker);
214 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
220 std::vector<retType> reslist;
230 return redfunc(reslist);
239 template<
class F,
class T,
class R,
class Cond>
243 using retType = decltype(func(args.front()));
250 if (args.size() < oldNWorkers)
253 bool ok =
Fork(worker);
256 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
257 return decltype(func(args.front()))();
262 std::vector<retType> reslist;
265 std::iota(range.begin(), range.end(), 0);
273 return Reduce(reslist, redfunc);
279 template<
class T,
class R>
283 static_assert(std::is_same<decltype(redfunc(objs)),
T>::value,
"redfunc does not have the correct signature");
284 return redfunc(objs);
292 unsigned code = msg.first;
294 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
299 if(msg.second !=
nullptr)
300 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
303 const char *str = ReadBuffer<const char*>(msg.second.get());
304 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 305 "Continuing execution ignoring these entries.", str);
310 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
327 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
329 }
else if (msg.first < 1000)
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Namespace for new ROOT classes and functions.
This class works together with TProcessExecutor to allow the execution of functions in server process...
unsigned GetNWorkers() const
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
unsigned fNProcessed
number of arguments already passed to the workers
#define R(a, b, c, d, e, f, g, h, i)
void Remove(TSocket *s)
Remove a certain socket from the monitor.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
a MapReduce method with arguments is being executed
The message contains the result of a function execution.
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function with the argument contained in the message.
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
Base class for multiprocess applications' clients.
static constexpr double s
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
a MapReduce method with no arguments is being executed
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of< F()>::type
This method behaves just like Map, but an additional redfunc function must be provided.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Tell the client there was an error while processing.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...