This class provides a simple interface to execute the same task multiple times in parallel, possibly with different arguments every time.
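This class inherits its interfaces from ROOT::TExecutorCRTP. The two possible usages of the Map method are:
Map(F func, unsigned nTimes): func is executed nTimes with no arguments.
Map(F func, T& args): func is executed on each element of the collection of arguments args.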
For either signature, func is executed as many times as needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor or set via SetNWorkers. It typically defaults to the number of cores.
A collection containing the result of each execution is returned.
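The sketch below shows the basic mechanics of the process-based model described above. It uses POSIX only: it forks a fixed number of workers, sends results back to the parent through pipes, and reaps the children. ForkMap is a hypothetical name, not ROOT API. Unlike TProcessExecutor, it splits the arguments round-robin up front instead of handing them out on request. It also handles only int results and treats any worker failure as an error.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>

// Hypothetical sketch, not ROOT code: fork nWorkers children; child k calls func
// on args[k], args[k + nWorkers], ... and sends (index, result) pairs back
// through its own pipe. The parent collects the pairs and reaps every child.
template <class F>
std::vector<int> ForkMap(F func, const std::vector<int> &args, unsigned nWorkers)
{
   assert(nWorkers > 0);
   std::vector<int> results(args.size());
   std::vector<int> readEnds;
   std::vector<pid_t> pids;
   bool failed = false;
   for (unsigned k = 0; k < nWorkers; ++k) {
      int fds[2];
      if (pipe(fds) != 0) {
         failed = true;
         break;
      }
      const pid_t pid = fork();
      if (pid < 0) {
         close(fds[0]);
         close(fds[1]);
         failed = true;
         break;
      }
      if (pid == 0) { // child process
         close(fds[0]);
         try {
            for (std::size_t i = k; i < args.size(); i += nWorkers) {
               const int msg[2] = {static_cast<int>(i), func(args[i])};
               if (write(fds[1], msg, sizeof msg) != static_cast<ssize_t>(sizeof msg))
                  _exit(1);
            }
         } catch (...) {
            _exit(1); // never let an exception unwind into the parent's code path
         }
         _exit(0); // skip atexit handlers and stdio flushing inherited from the parent
      }
      close(fds[1]); // parent keeps only the read end
      readEnds.push_back(fds[0]);
      pids.push_back(pid);
   }
   for (std::size_t k = 0; k < readEnds.size(); ++k) {
      int msg[2];
      // Pipe writes of at most PIPE_BUF bytes are atomic, so each read gets one pair.
      while (read(readEnds[k], msg, sizeof msg) == static_cast<ssize_t>(sizeof msg))
         results[static_cast<std::size_t>(msg[0])] = msg[1];
      close(readEnds[k]);
      int status = 0;
      // Reap the child and check that it finished cleanly.
      if (waitpid(pids[k], &status, 0) < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
         failed = true;
   }
   if (failed)
      throw std::runtime_error("ForkMap: a worker could not be started or failed");
   return results;
}
```

For example, ForkMap([](int x) { return x * x; }, {1, 2, 3, 4, 5}, 2) returns {1, 4, 9, 16, 25}, in input order, regardless of which worker computed each element.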
Note: the user is responsible for the deletion of any object that might be created upon execution of func, returned objects included: ROOT::TProcessExecutor never deletes what it returns; it simply forgets it.
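The executor never deletes returned objects. A caller that receives raw pointers can therefore hand them to std::unique_ptr right away. The sketch below assumes each pointer was allocated with new and is owned by nothing else. AdoptAll is a hypothetical helper, not part of ROOT.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical helper: take ownership of raw pointers returned by a Map-like call
// so they are deleted automatically when the returned vector goes out of scope.
// Assumes every pointer came from `new` and has no other owner.
template <class T>
std::vector<std::unique_ptr<T>> AdoptAll(const std::vector<T *> &raw)
{
   std::vector<std::unique_ptr<T>> owned;
   owned.reserve(raw.size());
   for (T *p : raw)
      owned.emplace_back(p); // the unique_ptr now owns p
   return owned;
}
```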
Note: ROOT::TProcessExecutor::Map is worth using only when the task to be executed takes more than a few seconds; otherwise, on most machines, the overhead introduced by Map will outweigh the benefits of parallel execution.
func | a callable object (a lambda expression, a std::function, a functor object or a function) that takes zero arguments (first signature) or one argument (second signature). |
args | for the second signature, a std::vector, a ROOT::TSeq of integer type or an initializer list; for the first signature, an integer (the number of executions). |
Note: if the function to be executed takes more arguments than the signature allows, but all of them are fixed except zero (first signature) or one (second signature), it can be wrapped in a lambda or bound with std::bind to give it the right signature.
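Here is a sketch of both wrapping techniques. Scale is a hypothetical three-parameter task. SequentialMap is a hypothetical single-process stand-in for the one-argument Map signature, used only to show that both wrappers have the right shape.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical task with three parameters; only x changes between executions.
double Scale(double x, double factor, double offset) { return x * factor + offset; }

// Sequential stand-in for the one-argument Map signature (illustration only).
template <class F>
std::vector<double> SequentialMap(F func, const std::vector<double> &args)
{
   std::vector<double> out;
   out.reserve(args.size());
   for (double a : args)
      out.push_back(func(a));
   return out;
}

// Two equivalent ways to fix factor = 2 and offset = 1, leaving one argument:
const auto viaLambda = [](double x) { return Scale(x, 2.0, 1.0); };
const auto viaBind = std::bind(Scale, std::placeholders::_1, 2.0, 1.0);
```

The lambda is usually easier to read. std::bind is shown because the note above mentions it.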
Note: the user should take care of initializing random seeds differently in each process (e.g. using the process id in the seed). Otherwise several parallel executions might generate the same sequence of pseudo-random numbers.
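One way to follow this advice with the standard library is to mix a user-chosen base seed with the process id through std::seed_seq. The sketch below does that. MakeWorkerEngine is a hypothetical helper, and std::mt19937 is an assumed choice of generator, not one prescribed by ROOT.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: mix a base seed with the process id so that each forked
// worker starts its own pseudo-random sequence instead of the parent's.
std::mt19937 MakeWorkerEngine(std::uint32_t baseSeed, pid_t pid)
{
   std::seed_seq seq{baseSeed, static_cast<std::uint32_t>(pid)};
   return std::mt19937(seq);
}

// Inside func, each worker would pass its own pid, e.g.:
//   std::mt19937 engine = MakeWorkerEngine(12345u, getpid());
```

The same base seed and pid always give the same sequence, so a run can still be reproduced worker by worker.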
Returns: a std::vector whose elements are the objects returned by func.
The MapReduce methods behave exactly like Map, but take an additional function as a third argument. This function is applied to the set of objects returned by the corresponding Map execution to "squash" them into a single object.
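The Map-then-Reduce data flow can be modeled in a single process as follows. SequentialMapReduce and SumAll are hypothetical names. The real method runs the Map step in worker processes, but the reduction function sees the same thing here: one vector containing every result of func.

```cpp
#include <cassert>
#include <numeric>
#include <type_traits>
#include <vector>

// Hypothetical sequential model of MapReduce(func, args, redfunc): the Map step
// collects func(arg) for every argument, then the Reduce step passes the whole
// vector of results to redfunc, which "squashes" it into one object.
template <class F, class T, class R>
auto SequentialMapReduce(F func, const std::vector<T> &args, R redfunc)
{
   std::vector<std::invoke_result_t<F, const T &>> mapped;
   mapped.reserve(args.size());
   for (const T &a : args)
      mapped.push_back(func(a));
   return redfunc(mapped);
}

// Example reduction function: sums the vector of results.
int SumAll(const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); }
```

For instance, squaring {1, 2, 3} and reducing with SumAll yields 14.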
Definition at line 37 of file TProcessExecutor.hxx.
Public Member Functions | |
TProcessExecutor (const TProcessExecutor &)=delete | |
TProcessExecutor (unsigned nWorkers=0) | |
Class constructor. | |
~TProcessExecutor ()=default | |
unsigned | GetPoolSize () const |
Return the number of pooled parallel workers. | |
template<class F , class T , class R , class Cond = validMapReturnCond<F, T>> | |
auto | MapReduce (F func, const std::vector< T > &args, R redfunc) -> InvokeResult_t< F, T > |
Execute a function in parallel over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce). | |
template<class F , class T , class R , class Cond = validMapReturnCond<F, T>> | |
auto | MapReduce (F func, std::vector< T > &args, R redfunc) -> InvokeResult_t< F, T > |
Execute a function in parallel over the elements of a vector (Map) and accumulate the results into a single value (Reduce). | |
template<class F , class R , class Cond = validMapReturnCond<F>> | |
auto | MapReduce (F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F > |
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce). | |
TProcessExecutor & | operator= (const TProcessExecutor &)=delete |
void | SetNWorkers (unsigned n) |
Public Member Functions inherited from ROOT::TExecutorCRTP< TProcessExecutor > | |
TExecutorCRTP ()=default | |
TExecutorCRTP (const TExecutorCRTP &)=delete | |
auto | Map (F func, const std::vector< T > &args) -> std::vector< InvokeResult_t< F, T > > |
Execute a function over the elements of an immutable vector. | |
auto | Map (F func, ROOT::TSeq< INTEGER > args) -> std::vector< InvokeResult_t< F, INTEGER > > |
Execute a function over a sequence of indexes. | |
auto | Map (F func, std::initializer_list< T > args) -> std::vector< InvokeResult_t< F, T > > |
Execute a function over the elements of an initializer_list. | |
auto | Map (F func, std::vector< T > &args) -> std::vector< InvokeResult_t< F, T > > |
Execute a function over the elements of a vector. | |
auto | Map (F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > > |
Execute a function without arguments several times. | |
T * | MapReduce (F func, const std::vector< T * > &args) |
Execute a function over the TObject-inheriting elements of an immutable vector (Map) and merge the objects into a single one (Reduce). | |
auto | MapReduce (F func, const std::vector< T > &args, R redfunc) -> InvokeResult_t< F, T > |
Execute a function over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce). | |
auto | MapReduce (F func, ROOT::TSeq< INTEGER > args, R redfunc) -> InvokeResult_t< F, INTEGER > |
Execute a function over a sequence of indexes (Map) and accumulate the results into a single value (Reduce). | |
auto | MapReduce (F func, std::initializer_list< T > args, R redfunc) -> InvokeResult_t< F, T > |
Execute a function over the elements of an initializer_list (Map) and accumulate the results into a single value (Reduce). | |
T * | MapReduce (F func, std::vector< T * > &args) |
Execute a function over the TObject-inheriting elements of a vector (Map) and merge the objects into a single one (Reduce). | |
auto | MapReduce (F func, std::vector< T > &args, R redfunc) -> InvokeResult_t< F, T > |
Execute a function over the elements of a vector (Map) and accumulate the results into a single value (Reduce). | |
auto | MapReduce (F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F > |
Execute a function without arguments several times (Map) and accumulate the results into a single value (Reduce). | |
TExecutorCRTP & | operator= (const TExecutorCRTP &)=delete |
T * | Reduce (const std::vector< T * > &mergeObjs) |
"Reduce" an std::vector into a single object by using the object's Merge method. | |
auto | Reduce (const std::vector< T > &objs, R redfunc) -> decltype(redfunc(objs)) |
"Reduce" an std::vector into a single object by passing a function as the second argument defining the reduction operation. | |
Private Types | |
enum class | ETask : unsigned char { kNoTask , kMap , kMapWithArg , kMapRed , kMapRedWithArg } |
A collection of the types of tasks that TProcessExecutor can execute. | |
Private Member Functions | |
template<class T > | |
void | Collect (std::vector< T > &reslist) |
Listen for messages sent by the workers and call the appropriate handler function. | |
template<class T > | |
void | HandlePoolCode (MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist) |
Handle message and reply to the worker. | |
template<class F , class T , class Cond = validMapReturnCond<F, T>> | |
auto | MapImpl (F func, const std::vector< T > &args) -> std::vector< InvokeResult_t< F, T > > |
Execute a function over the elements of an immutable vector in parallel. Implementation of the Map method. | |
template<class F , class INTEGER , class Cond = validMapReturnCond<F, INTEGER>> | |
auto | MapImpl (F func, ROOT::TSeq< INTEGER > args) -> std::vector< InvokeResult_t< F, INTEGER > > |
Execute a function over a sequence of indexes in parallel. | |
template<class F , class T , class Cond = validMapReturnCond<F, T>> | |
auto | MapImpl (F func, std::vector< T > &args) -> std::vector< InvokeResult_t< F, T > > |
Execute a function over the elements of a vector in parallel. Implementation of the Map method. | |
template<class F , class Cond = validMapReturnCond<F>> | |
auto | MapImpl (F func, unsigned nTimes) -> std::vector< InvokeResult_t< F > > |
Execute a function without arguments several times in parallel. | |
void | ReplyToFuncResult (TSocket *s) |
Reply to a worker who just sent a result. | |
void | ReplyToIdle (TSocket *s) |
Reply to a worker who is idle. | |
void | Reset () |
Reset TProcessExecutor's state. | |
Private Member Functions inherited from TMPClient | |
TMPClient (const TMPClient &)=delete | |
TMPClient (unsigned nWorkers=0) | |
Class constructor. | |
~TMPClient () | |
Class destructor. | |
template<class T > | |
unsigned | Broadcast (unsigned code, const std::vector< T > &objs) |
Send a message with a different object to each server. | |
template<class T > | |
unsigned | Broadcast (unsigned code, std::initializer_list< T > &objs) |
Send a message with a different object to each server. | |
template<class T > | |
unsigned | Broadcast (unsigned code, T obj, unsigned nMessages=0) |
Send a message containing code and obj to each worker, up to a maximum number of nMessages workers. | |
unsigned | Broadcast (unsigned code, unsigned nMessages=0) |
Send a message with the specified code to at most nMessages workers. | |
void | DeActivate (TSocket *s) |
DeActivate a certain socket. | |
bool | Fork (TMPWorker &server) |
This method forks the ROOT session into fNWorkers children processes. | |
bool | GetIsParent () const |
TMonitor & | GetMonitor () |
unsigned | GetNWorkers () const |
void | HandleMPCode (MPCodeBufPair &msg, TSocket *sender) |
Handle messages containing an EMPCode. | |
TMPClient & | operator= (const TMPClient &)=delete |
void | ReapWorkers () |
Wait on worker processes and remove their pids from fWorkerPids. | |
void | Remove (TSocket *s) |
Remove a certain socket from the monitor. | |
void | SetNWorkers (unsigned n) |
Set the number of workers that will be spawned by the next call to Fork() | |
Private Attributes | |
unsigned | fNProcessed |
number of arguments already passed to the workers | |
unsigned | fNToProcess |
total number of arguments to pass to the workers | |
ETask | fTaskType = ETask::kNoTask |
the kind of task that is being executed, if any | |
friend | TExecutorCRTP |
Additional Inherited Members | |
Protected Types inherited from ROOT::TExecutorCRTP< TProcessExecutor > | |
using | InvokeResult_t = ROOT::TypeTraits::InvokeResult_t< F, Args... > |
using | validMapReturnCond = std::enable_if_t<!std::is_reference< InvokeResult_t< F, T... > >::value &&!std::is_void< InvokeResult_t< F, T... > >::value > |
type definition used in templated functions for not allowing mapping functions that return references or void. | |
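The effect of validMapReturnCond can be checked in isolation with the detection idiom. The sketch rests on one assumption: std::invoke_result_t (C++17) is substituted for ROOT::TypeTraits::InvokeResult_t. IsValidMapFunc, IsValidMapFuncImpl and the sample callables are hypothetical names.

```cpp
#include <cassert>
#include <type_traits>

// Stand-in for ROOT::TypeTraits::InvokeResult_t (assumption: same meaning).
template <class F, class... T>
using InvokeResult_t = std::invoke_result_t<F, T...>;

// Same condition as above: the result must be neither a reference nor void.
template <class F, class... T>
using validMapReturnCond = std::enable_if_t<!std::is_reference<InvokeResult_t<F, T...>>::value &&
                                            !std::is_void<InvokeResult_t<F, T...>>::value>;

// Detection idiom: the specialization is viable only if the condition is well formed.
template <class Void, class F, class... T>
struct IsValidMapFuncImpl : std::false_type {};

template <class F, class... T>
struct IsValidMapFuncImpl<validMapReturnCond<F, T...>, F, T...> : std::true_type {};

template <class F, class... T>
constexpr bool IsValidMapFunc = IsValidMapFuncImpl<void, F, T...>::value;

// Hypothetical sample callables returning a value, a reference and nothing.
struct ReturnsInt { int operator()(int x) const { return x + 1; } };
struct ReturnsRef { int &operator()(int &x) const { return x; } };
struct ReturnsVoid { void operator()(int) const {} };
```

With the condition as a defaulted template parameter, an overload such as MapImpl simply drops out of overload resolution for the last two callables. The user then gets a compile-time error instead of a vector of references or of void.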
#include <ROOT/TProcessExecutor.hxx>
enum class ROOT::TProcessExecutor::ETask : unsigned char [strong, private]
A collection of the types of tasks that TProcessExecutor can execute.
It is used to correctly interpret, and properly reply to, the messages received (see, for example, TProcessExecutor::HandleInput).
Definition at line 99 of file TProcessExecutor.hxx.
ROOT::TProcessExecutor::TProcessExecutor(unsigned nWorkers = 0) [explicit]
Class constructor.
nWorkers | Number of times this ROOT session will be forked, i.e. the number of workers that will be spawned. |
Definition at line 90 of file TProcessExecutor.cxx.
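When nWorkers is 0, the number of workers typically defaults to the number of cores. The sketch below shows one way such a default could be resolved. It rests on an assumption: std::thread::hardware_concurrency() stands in for however ROOT actually queries the core count. ResolveNWorkers is a hypothetical name.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: an explicit request wins; 0 means "one worker per core".
unsigned ResolveNWorkers(unsigned requested)
{
   if (requested != 0)
      return requested;
   const unsigned cores = std::thread::hardware_concurrency();
   return cores != 0 ? cores : 1; // hardware_concurrency() may return 0 if unknown
}
```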
ROOT::TProcessExecutor::~TProcessExecutor() [default]
ROOT::TProcessExecutor::TProcessExecutor(const TProcessExecutor &) [delete]
template<class T> void ROOT::TProcessExecutor::Collect(std::vector<T> &reslist) [private]
Listen for messages sent by the workers and call the appropriate handler function.
TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and TMPClient::HandleMPCode is called on messages with a code >= 1000.
Definition at line 412 of file TProcessExecutor.hxx.
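The routing rule stated above comes down to a single comparison on the message code. The sketch models only that decision; Collect itself also does the listening and the handling. Handler and RouteMessage are hypothetical names.

```cpp
#include <cassert>

// Hypothetical model of the dispatch rule: codes below 1000 go to
// TProcessExecutor::HandlePoolCode, codes from 1000 up go to TMPClient::HandleMPCode.
enum class Handler { kPoolCode, kMPCode };

constexpr Handler RouteMessage(unsigned code)
{
   return code < 1000 ? Handler::kPoolCode : Handler::kMPCode;
}
```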
unsigned ROOT::TProcessExecutor::GetPoolSize() const [inline]
Return the number of pooled parallel workers.
Definition at line 72 of file TProcessExecutor.hxx.
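template<class T> void ROOT::TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist) [private]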
Handle message and reply to the worker.
Definition at line 383 of file TProcessExecutor.hxx.
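template<class F, class T, class Cond> auto ROOT::TProcessExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>> [private]
Execute a function over the elements of an immutable vector in parallel. Implementation of the Map method.
func | Function to be executed on the elements of the vector passed as second parameter. |
args | Vector of elements passed as an argument to func. |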
Definition at line 205 of file TProcessExecutor.hxx.
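template<class F, class INTEGER, class Cond> auto ROOT::TProcessExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>> [private]
Execute a function over a sequence of indexes in parallel.
Implementation of the Map method.
func | Function to be executed. Must take an element of the sequence passed as second argument as a parameter. |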
args | Sequence of indexes to execute func on. |
Definition at line 250 of file TProcessExecutor.hxx.
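template<class F, class T, class Cond> auto ROOT::TProcessExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>> [private]
Execute a function over the elements of a vector in parallel. Implementation of the Map method.
func | Function to be executed on the elements of the vector passed as second parameter. |
args | Vector of elements passed as an argument to func. |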
Definition at line 160 of file TProcessExecutor.hxx.
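template<class F, class Cond> auto ROOT::TProcessExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>> [private]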
Execute a function without arguments several times in parallel.
Implementation of the Map method.
func | Function to be executed. |
nTimes | Number of times function should be called. |
Definition at line 119 of file TProcessExecutor.hxx.
template<class F, class T, class R, class Cond> auto ROOT::TProcessExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
Execute a function in parallel over the elements of an immutable vector (Map) and accumulate the results into a single value (Reduce).
Benefits from partial reduction into nChunks intermediate results if the execution policy is multithreaded. Otherwise, it ignores the nChunks argument and performs a normal MapReduce operation.
func | Function to be executed. Must take an element of the sequence passed as second argument as a parameter. |
args | Immutable vector, whose elements are passed as an argument to func. |
redfunc | Reduction function to combine the results of the calls to func into partial results, and these into a final result. Must return the same type as func and should be callable with const std::vector<T> where T is the output of func. |
nChunks | Number of chunks to split the input data for processing. Not accepted by this three-argument overload, which performs a normal MapReduce operation. |
Definition at line 344 of file TProcessExecutor.hxx.
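The "partial reduction into nChunks intermediate results" mentioned above can be pictured with the following sequential sketch. ChunkedReduce and Sum are hypothetical names. Two choices are assumptions: the chunking rule (contiguous slices of ceil(size / nChunks) elements) and the treatment of nChunks == 0 as a single reduction. The result matches a plain reduction only when redfunc is associative.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical sketch: reduce each contiguous chunk of results with redfunc,
// then apply redfunc once more to the vector of partial results.
template <class T, class R>
T ChunkedReduce(const std::vector<T> &results, unsigned nChunks, R redfunc)
{
   if (nChunks == 0 || results.empty())
      return redfunc(results); // no chunking: a normal reduction
   const std::size_t chunkSize = (results.size() + nChunks - 1) / nChunks; // ceiling division
   std::vector<T> partials;
   for (std::size_t begin = 0; begin < results.size(); begin += chunkSize) {
      const std::size_t end = std::min(begin + chunkSize, results.size());
      partials.push_back(redfunc(std::vector<T>(results.begin() + begin, results.begin() + end)));
   }
   return redfunc(partials);
}

// Example associative reduction, callable with const std::vector<int>&.
int Sum(const std::vector<int> &v) { return std::accumulate(v.begin(), v.end(), 0); }
```

With seven results and nChunks = 3, the slices are {1, 2, 3}, {4, 5, 6} and {7}. The partial sums 6, 15 and 7 then reduce to 28.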
template<class F, class T, class R, class Cond> auto ROOT::TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
Execute a function in parallel over the elements of a vector (Map) and accumulate the results into a single value (Reduce).
Benefits from partial reduction into nChunks intermediate results if the execution policy is multithreaded. Otherwise, it ignores the nChunks argument and performs a normal MapReduce operation.
func | Function to be executed. Must take an element of the sequence passed as second argument as a parameter. |
args | Vector of elements passed as an argument to func. |
redfunc | Reduction function to combine the results of the calls to func into partial results, and these into a final result. Must return the same type as func and should be callable with const std::vector<T> where T is the output of func. |
nChunks | Number of chunks to split the input data for processing. Not accepted by this three-argument overload, which performs a normal MapReduce operation. |
Definition at line 302 of file TProcessExecutor.hxx.
template<class F, class R, class Cond> auto ROOT::TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
func | Function to be executed. |
nTimes | Number of times function should be called. |
redfunc | Reduction function to combine the results of the calls to func. Must return the same type as func. |
Definition at line 262 of file TProcessExecutor.hxx.
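The requirement that redfunc return the same type as func can be enforced at compile time. Here is a single-process sketch of the nTimes overload that does so. SequentialMapReduceN and SumInts are hypothetical names, and the static_assert is an illustration, not a claim about how ROOT checks this.

```cpp
#include <cassert>
#include <type_traits>
#include <vector>

// Hypothetical sequential model of MapReduce(func, nTimes, redfunc); the real
// method runs the calls in worker processes.
template <class F, class R>
auto SequentialMapReduceN(F func, unsigned nTimes, R redfunc)
{
   using Res = std::invoke_result_t<F>;
   // Documented requirement: redfunc must return the same type as func.
   static_assert(std::is_same_v<std::invoke_result_t<R, const std::vector<Res> &>, Res>,
                 "redfunc must return the same type as func");
   std::vector<Res> mapped;
   mapped.reserve(nTimes);
   for (unsigned i = 0; i < nTimes; ++i)
      mapped.push_back(func()); // Map: nTimes calls without arguments
   return redfunc(mapped);      // Reduce: one call on all results
}

// Example reduction: adds up all results.
int SumInts(const std::vector<int> &v)
{
   int total = 0;
   for (int x : v)
      total += x;
   return total;
}
```

A reduction returning, say, long for an int-returning func would be rejected by this sketch at compile time.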
TProcessExecutor & ROOT::TProcessExecutor::operator=(const TProcessExecutor &) [delete]
void ROOT::TProcessExecutor::ReplyToFuncResult(TSocket *s) [private]
Reply to a worker who just sent a result.
If another argument to process exists, tell the worker. Otherwise send a shutdown order.
Definition at line 108 of file TProcessExecutor.cxx.
void ROOT::TProcessExecutor::ReplyToIdle(TSocket *s) [private]
Reply to a worker who is idle.
If another argument to process exists, tell the worker. Otherwise, ask for a result.
Definition at line 126 of file TProcessExecutor.cxx.
void ROOT::TProcessExecutor::Reset() [private]
Reset TProcessExecutor's state.
Definition at line 97 of file TProcessExecutor.cxx.
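ReplyToFuncResult, ReplyToIdle and Reset can be read together as a small piece of bookkeeping over fNProcessed, fNToProcess and fTaskType. The sketch below is a simplified model of that logic. ETask is copied from the enum listed above. Reply, Scheduler and NextWork are hypothetical, and ROOT's actual message codes and socket handling are not reproduced.

```cpp
#include <cassert>

// Task kinds as listed for ETask.
enum class ETask : unsigned char { kNoTask, kMap, kMapWithArg, kMapRed, kMapRedWithArg };

// Hypothetical reply kinds (not ROOT's message codes).
enum class Reply { kExecFunc, kExecFuncWithArg, kSendResult, kShutdown };

// Simplified model of the reply logic described above.
struct Scheduler {
   unsigned fNProcessed = 0; // arguments already passed to the workers
   unsigned fNToProcess = 0; // total number of arguments to pass
   ETask fTaskType = ETask::kNoTask;

   // Hand out the next unit of work, if any remains.
   bool NextWork(Reply &reply)
   {
      if (fNProcessed >= fNToProcess)
         return false;
      const bool withArg = fTaskType == ETask::kMapWithArg || fTaskType == ETask::kMapRedWithArg;
      reply = withArg ? Reply::kExecFuncWithArg : Reply::kExecFunc;
      ++fNProcessed;
      return true;
   }
   // A worker just sent a result: give it more work or a shutdown order.
   Reply ReplyToFuncResult()
   {
      Reply r = Reply::kShutdown;
      NextWork(r);
      return r;
   }
   // A worker is idle: give it more work or ask for its result.
   Reply ReplyToIdle()
   {
      Reply r = Reply::kSendResult;
      NextWork(r);
      return r;
   }
   void Reset()
   {
      fNProcessed = 0;
      fNToProcess = 0;
      fTaskType = ETask::kNoTask;
   }
};
```

Because work is handed out one argument at a time as workers report back, faster workers naturally process more arguments.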
void ROOT::TProcessExecutor::SetNWorkers(unsigned n) [inline]
Definition at line 66 of file TProcessExecutor.hxx.
unsigned ROOT::TProcessExecutor::fNProcessed [private]
number of arguments already passed to the workers
Definition at line 93 of file TProcessExecutor.hxx.
unsigned ROOT::TProcessExecutor::fNToProcess [private]
total number of arguments to pass to the workers
Definition at line 94 of file TProcessExecutor.hxx.
ETask ROOT::TProcessExecutor::fTaskType = ETask::kNoTask [private]
the kind of task that is being executed, if any
Definition at line 107 of file TProcessExecutor.hxx.
friend TExecutorCRTP [private]
Definition at line 38 of file TProcessExecutor.hxx.