13#ifndef ROOT_TProcessExecutor 
   14#define ROOT_TProcessExecutor 
   55   template<
class F, 
class R, 
class Cond = val
idMapReturnCond<F>>
 
   57   template<
class F, 
class T, 
class R, 
class Cond = val
idMapReturnCond<F, T>>
 
   59   template<
class F, 
class T, 
class R, 
class Cond = val
idMapReturnCond<F, T>>
 
   77   template<
class F, 
class Cond = val
idMapReturnCond<F>>
 
   78   auto MapImpl(F func, 
unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
 
   79   template<
class F, 
class INTEGER, 
class Cond = val
idMapReturnCond<F, INTEGER>>
 
   81   template<
class F, 
class T, 
class Cond = val
idMapReturnCond<F, T>>
 
   82   auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
 
   83   template<
class F, 
class T, 
class Cond = val
idMapReturnCond<F, T>>
 
   84   auto MapImpl(F func, 
const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
 
   99   enum class ETask : 
unsigned char {
 
 
 
  118template<
class F, 
class Cond>
 
  121   using retType = 
decltype(func());
 
  124   fTaskType = ETask::kMap;
 
  135      Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
 
  136      return std::vector<retType>();
 
  150   fTaskType = ETask::kNoTask;
 
 
  159template<
class F, 
class T, 
class Cond>
 
  163   using retType = 
decltype(func(args.front()));
 
  166   fTaskType = ETask::kMapWithArg;
 
  172      SetNWorkers(args.size());
 
  178      Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
 
  179      return std::vector<retType>();
 
  183   fNToProcess = args.size();
 
  186   std::vector<unsigned> 
range(fNToProcess);
 
  195   fTaskType = ETask::kNoTask;
 
 
  204template<
class F, 
class T, 
class Cond>
 
  208   using retType = 
decltype(func(args.front()));
 
  211   fTaskType = ETask::kMapWithArg;
 
  217      SetNWorkers(args.size());
 
  223      Error(
"TProcessExecutor::Map", 
"[E][C] Could not fork. Aborting operation.");
 
  224      return std::vector<retType>();
 
  228   fNToProcess = args.size();
 
  231   std::vector<unsigned> 
range(fNToProcess);
 
  240   fTaskType = ETask::kNoTask;
 
 
  249template<
class F, 
class INTEGER, 
class Cond>
 
  252   std::vector<INTEGER> 
vargs(args.size());
 
  253   std::copy(args.begin(), args.end(), 
vargs.
begin());
 
 
  261template<
class F, 
class R, 
class Cond>
 
  264   using retType = 
decltype(func());
 
  267   fTaskType= ETask::kMapRed;
 
  277      std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
 
  292   fTaskType= ETask::kNoTask;
 
 
  301template<
class F, 
class T, 
class R, 
class Cond>
 
  305   using retType = 
decltype(func(args.front()));
 
  308   fTaskType= ETask::kMapRedWithArg;
 
  313      SetNWorkers(args.size());
 
  318      std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
 
  319      return decltype(func(args.front()))();
 
  323   fNToProcess = args.size();
 
  326   std::vector<unsigned> 
range(fNToProcess);
 
  334   fTaskType= ETask::kNoTask;
 
 
  343template<
class F, 
class T, 
class R, 
class Cond>
 
  347   using retType = 
decltype(func(args.front()));
 
  350   fTaskType= ETask::kMapRedWithArg;
 
  355      SetNWorkers(args.size());
 
  360      std::cerr << 
"[E][C] Could not fork. Aborting operation\n";
 
  361      return decltype(func(args.front()))();
 
  365   fNToProcess = args.size();
 
  368   std::vector<unsigned> 
range(fNToProcess);
 
  376   fTaskType= ETask::kNoTask;
 
 
  385   unsigned code = 
msg.first;
 
  392      if(
msg.second != 
nullptr)
 
  397      Error(
"TProcessExecutor::HandlePoolCode", 
"[E][C] a worker encountered an error: %s\n" 
  398                                         "Continuing execution ignoring these entries.", str);
 
  403      Error(
"TProcessExecutor::HandlePoolCode", 
"[W][C] unknown code received from server. code=%d", code);
 
 
  416   while (
mon.GetActive() > 0) {
 
  420         Error(
"TProcessExecutor::Collect", 
"[E][C] Lost connection to a worker");
 
  422      } 
else if (
msg.first < 1000)
 
 
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
 
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
 
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
 
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
 
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
 
const_iterator begin() const
 
const_iterator end() const
 
This class defines an interface to execute the same task multiple times, possibly in parallel and wit...
 
auto Map(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times.
 
T * Reduce(const std::vector< T * > &mergeObjs)
"Reduce" an std::vector into a single object by using the object's Merge method.
 
This class provides a simple interface to execute the same task multiple times in parallel,...
 
ETask fTaskType
the kind of task that is being executed, if any
 
unsigned GetPoolSize() const
Return the number of pooled parallel workers.
 
ETask
A collection of the types of tasks that TProcessExecutor can execute.
 
@ kNoTask
no task is being executed
 
@ kMapWithArg
a Map method with arguments is being executed
 
@ kMapRed
a MapReduce method with no arguments is being executed
 
@ kMapRedWithArg
a MapReduce method with arguments is being executed
 
@ kMap
a Map method with no arguments is being executed
 
TProcessExecutor & operator=(const TProcessExecutor &)=delete
 
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
 
unsigned fNProcessed
number of arguments already passed to the workers
 
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
 
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
 
TProcessExecutor(const TProcessExecutor &)=delete
 
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
 
unsigned fNToProcess
total number of arguments to pass to the workers
 
auto MapImpl(F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > >
Execute a function without arguments several times in parallel.
 
void Reset()
Reset TProcessExecutor's state.
 
void SetNWorkers(unsigned n)
 
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
 
~TProcessExecutor()=default
 
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
 
A pseudo container class which is a generator of indices.
 
Base class for multiprocess applications' clients.
 
unsigned GetNWorkers() const
 
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
 
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
 
void Remove(TSocket *s)
Remove a certain socket from the monitor.
 
@ kRecvError
Error while reading from the socket.
 
@ kIdling
We are ready for the next task.
 
@ kFuncResult
The message contains the result of a function execution.
 
@ kExecFuncWithArg
Execute function with the argument contained in the message.
 
@ kShutdownOrder
Used by the client to tell servers to shutdown.
 
@ kProcError
Tell the client there was an error while processing.
 
@ kExecFunc
Execute function without arguments.
 
@ kProcResult
The message contains the result of the processing of a TTree.
 
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...