13 #ifndef ROOT_TProcessExecutor 14 #define ROOT_TProcessExecutor 31 #include <type_traits> 47 template<
class F,
class Cond = noReferenceCond<F>>
49 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
51 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
58 template<
class T,
class R>
T Reduce(
const std::vector<T> &objs,
R redfunc);
61 template<
class T>
void Collect(std::vector<T> &reslist);
74 enum class ETask : unsigned char {
91 template<
class F,
class Cond>
94 using retType = decltype(
func());
101 if (nTimes < oldNWorkers)
104 bool ok =
Fork(worker);
108 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
109 return std::vector<retType>();
114 std::vector<retType> reslist;
115 reslist.reserve(fNToProcess);
133 template<
class F,
class T,
class Cond>
137 using retType = decltype(
func(args.front()));
145 if (args.size() < oldNWorkers)
148 bool ok =
Fork(worker);
152 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
153 return std::vector<retType>();
158 std::vector<retType> reslist;
159 reslist.reserve(fNToProcess);
160 std::vector<unsigned> range(fNToProcess);
161 std::iota(range.begin(), range.end(), 0);
177 template<
class F,
class INTEGER,
class Cond>
180 std::vector<INTEGER> vargs(args.size());
181 std::copy(args.begin(), args.end(), vargs.begin());
182 const auto &reslist =
Map(
func, vargs);
189 template<
class T,
class R>
193 static_assert(std::is_same<decltype(redfunc(objs)),
T>::value,
"redfunc does not have the correct signature");
194 return redfunc(objs);
202 unsigned code = msg.first;
204 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
209 if(msg.second !=
nullptr)
210 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
213 const char *str = ReadBuffer<const char*>(msg.second.get());
214 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 215 "Continuing execution ignoring these entries.", str);
220 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
237 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
239 }
else if (msg.first < 1000)
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Namespace for new ROOT classes and functions.
This class works together with TProcessExecutor to allow the execution of functions in server process...
unsigned GetNWorkers() const
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Error while reading from the socket.
This class defines an interface to execute the same task multiple times in parallel, possibly with different arguments every time.
unsigned fNProcessed
number of arguments already passed to the workers
void Remove(TSocket *s)
Remove a certain socket from the monitor.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
a Map method with arguments is being executed
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
The message contains the result of the processing of a TTree.
void Reset()
Reset TProcessExecutor's state.
TSocket * Select()
Return pointer to socket for which an event is waiting.
The message contains the result of a function execution.
This class provides a simple interface to execute the same task multiple times in parallel...
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function with the argument contained in the message.
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
no task is being executed
Used by the client to tell servers to shutdown.
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers
A pseudo container class which is a generator of indices.
double func(double *x, double *p)
Base class for multiprocess applications' clients.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Tell the client there was an error while processing.
ETask
A collection of the types of tasks that TProcessExecutor can execute.
We are ready for the next task.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
unsigned GetNWorkers() const
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.
Execute function without arguments.
T Reduce(const std::vector< T > &objs, R redfunc)
"Reduce" an std::vector into a single object by passing a function as the second argument defining th...