12 #ifndef ROOT_TProcessExecutor 13 #define ROOT_TProcessExecutor 33 #include <type_traits> 48 template<
class F,
class Cond = noReferenceCond<F>>
51 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
53 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
/// Declaration of the ProcTree overload that takes a list of file names.
/// \param fileNames  Names of the files to process.
/// \param procFunc   Callable invoked with a std::reference_wrapper<TTreeReader>; the
///                   definition visible later in this listing static_asserts that its
///                   result type can be used to construct a TObject*.
/// \param treeName   Name of the tree; defaults to the empty string
///                   (NOTE(review): meaning of an empty name is not visible here - confirm).
/// \param nToProcess Defaults to 0 (NOTE(review): presumably "no limit" - confirm).
/// \return Whatever type procFunc yields when called with std::reference_wrapper<TTreeReader>.
60 template<
class F>
auto ProcTree(
const std::vector<std::string>& fileNames,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
/// Declaration of the ProcTree overload that takes a single file name.
/// The definition visible later in this listing wraps fileName in a one-element
/// std::vector<std::string> and forwards to the vector-of-file-names overload with
/// the same procFunc, treeName and nToProcess.
/// \param fileName   Name of the file to process.
/// \param procFunc   Callable invoked with a std::reference_wrapper<TTreeReader>.
/// \param treeName   Name of the tree; defaults to the empty string.
/// \param nToProcess Defaults to 0 (NOTE(review): presumably "no limit" - confirm).
/// \return Whatever type procFunc yields when called with std::reference_wrapper<TTreeReader>.
61 template<
class F>
auto ProcTree(
const std::string& fileName,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
/// Declaration of the ProcTree overload that takes a TFileCollection.
/// The definition visible later in this listing collects GetCurrentUrl()->GetUrl()
/// from each TFileInfo in files.GetList() into a vector of file names and forwards
/// to the vector-of-file-names overload.
/// \param files      Collection whose file URLs are processed.
/// \param procFunc   Callable invoked with a std::reference_wrapper<TTreeReader>.
/// \param treeName   Name of the tree; defaults to the empty string.
/// \param nToProcess Defaults to 0 (NOTE(review): presumably "no limit" - confirm).
/// \return Whatever type procFunc yields when called with std::reference_wrapper<TTreeReader>.
62 template<
class F>
auto ProcTree(
TFileCollection& files,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
/// Declaration of the ProcTree overload that takes a TChain.
/// The definition visible later in this listing copies GetTitle() of every entry in
/// files.GetListOfFiles() into a vector of file names and forwards to the
/// vector-of-file-names overload.
/// \param files      Chain whose listed files are processed.
/// \param procFunc   Callable invoked with a std::reference_wrapper<TTreeReader>.
/// \param treeName   Name of the tree; defaults to the empty string.
/// \param nToProcess Defaults to 0 (NOTE(review): presumably "no limit" - confirm).
/// \return Whatever type procFunc yields when called with std::reference_wrapper<TTreeReader>.
63 template<
class F>
auto ProcTree(
TChain& files,
F procFunc,
const std::string& treeName =
"",
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
/// Declaration of the ProcTree overload that takes a TTree directly.
/// Unlike the file-based overloads it has no treeName parameter.
/// \param tree       The tree to process.
/// \param procFunc   Callable invoked with a std::reference_wrapper<TTreeReader>.
/// \param nToProcess Defaults to 0 (NOTE(review): presumably "no limit" - confirm).
/// \return Whatever type procFunc yields when called with std::reference_wrapper<TTreeReader>.
64 template<
class F>
auto ProcTree(
TTree&
tree,
F procFunc,
ULong64_t nToProcess = 0) ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
/// Explicitly deleted overload: Reduce taking a vector of objects and a binary
/// operation is declared "= delete", so selecting it is a compile-time error.
/// The declared return type would be that of redfunc applied to two elements of objs.
75 template<
class T,
class BINARYOP>
auto Reduce(
const std::vector<T> &objs, BINARYOP redfunc)-> decltype(redfunc(objs.front(), objs.front())) =
delete;
/// Listen for messages sent by the workers and call the appropriate handler function.
/// \param reslist Vector passed by non-const reference; the handler fragments visible
///                later in this listing push_back received results into it.
79 template<
class T>
void Collect(std::vector<T> &reslist);
/// Fix list of lists before merging (to avoid errors about duplicated objects).
/// \param lists Vector of TObject pointers, passed by non-const reference.
82 void FixLists(std::vector<TObject*> &lists);
93 enum class ETask : unsigned char {
112 template<
class F,
class Cond>
115 using retType = decltype(
func());
122 if (nTimes < oldNWorkers)
125 bool ok =
Fork(worker);
129 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
130 return std::vector<retType>();
135 std::vector<retType> reslist;
136 reslist.reserve(fNToProcess);
153 template<
class F,
class T,
class Cond>
157 using retType = decltype(
func(args.front()));
165 if (args.size() < oldNWorkers)
168 bool ok =
Fork(worker);
172 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
173 return std::vector<retType>();
178 std::vector<retType> reslist;
179 reslist.reserve(fNToProcess);
180 std::vector<unsigned> range(fNToProcess);
181 std::iota(range.begin(), range.end(), 0);
193 template<
class F,
class INTEGER,
class Cond>
196 std::vector<INTEGER> vargs(args.
size());
197 std::copy(args.
begin(), args.
end(), vargs.begin());
198 const auto &reslist =
Map(func, vargs);
208 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
209 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
217 bool ok =
Fork(worker);
219 Error(
"TProcessExecutor::ProcTree",
"[E][C] Could not fork. Aborting operation.");
223 if(fileNames.size() < nWorkers) {
228 std::vector<unsigned> args(nWorkers);
229 std::iota(args.begin(), args.end(), 0);
231 if(fNProcessed < nWorkers)
232 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
237 std::vector<unsigned> args(nWorkers);
238 std::iota(args.begin(), args.end(), 0);
240 if(fNProcessed < nWorkers)
241 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
245 std::vector<TObject*> reslist;
250 auto res = redfunc(reslist);
255 return static_cast<retType
>(res);
262 std::vector<std::string> singleFileName(1, fileName);
263 return ProcTree(singleFileName, procFunc, treeName, nToProcess);
270 std::vector<std::string> fileNames(files.GetNFiles());
272 for(
auto f : *static_cast<THashList*>(files.GetList()))
273 fileNames[count++] =
static_cast<TFileInfo*
>(
f)->GetCurrentUrl()->GetUrl();
275 return ProcTree(fileNames, procFunc, treeName, nToProcess);
282 TObjArray* filelist = files.GetListOfFiles();
283 std::vector<std::string> fileNames(filelist->
GetEntries());
285 for(
auto f : *filelist)
286 fileNames[count++] =
f->GetTitle();
288 return ProcTree(fileNames, procFunc, treeName, nToProcess);
295 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::
type;
296 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
304 bool ok =
Fork(worker);
306 Error(
"TProcessExecutor::ProcTree",
"[E][C] Could not fork. Aborting operation.");
315 std::vector<unsigned> args(nWorkers);
316 std::iota(args.begin(), args.end(), 0);
318 if(fNProcessed < nWorkers)
319 Error(
"TProcessExecutor::ProcTree",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
322 std::vector<TObject*> reslist;
327 auto res = redfunc(reslist);
332 return static_cast<retType
>(res);
340 unsigned code = msg.first;
342 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
347 if(msg.second !=
nullptr)
348 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
351 const char *str = ReadBuffer<const char*>(msg.second.get());
352 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n" 353 "Continuing execution ignoring these entries.", str);
358 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
375 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
377 }
else if (msg.first < 1000)
auto Map(F func, unsigned nTimes) -> std::vector< typename std::result_of< F()>::type >
Execute func (with no arguments) nTimes in parallel.
void Collect(std::vector< T > &reslist)
Listen for messages sent by the workers and call the appropriate handler function.
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector< T > &reslist)
Handle message and reply to the worker.
The message contains the result of a function execution.
auto ProcTree(const std::vector< std::string > &fileNames, F procFunc, const std::string &treeName="", ULong64_t nToProcess=0) -> typename std::result_of< F(std::reference_wrapper< TTreeReader >)>::type
void SetNWorkers(unsigned n)
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
This namespace contains pre-defined functions to be used in conjunction with TExecutor::Map and TExecu...
TProcessExecutor & operator=(const TProcessExecutor &)=delete
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
auto Reduce(const std::vector< T > &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))=delete
Error while reading from the socket.
unsigned fNProcessed
number of arguments already passed to the workers
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
We are ready for the next task.
unsigned GetNWorkers() const
void Remove(TSocket *s)
Remove a certain socket from the monitor.
a Map method with arguments is being executed
Merge collection of TObjects.
TProcessExecutor(unsigned nWorkers=0)
Class constructor.
TSocket * Select()
Return pointer to socket for which an event is waiting.
void ReplyToFuncResult(TSocket *s)
Reply to a worker who just sent a result.
unsigned GetNWorkers() const
Execute function with the argument contained in the message.
a ProcTree method is being executed and each worker will process a certain range of each file ...
a Map method with no arguments is being executed
ETask fTaskType
the kind of task that is being executed, if any
The message contains the result of the processing of a TTree.
no task is being executed
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Execute function without arguments.
Used by the client to tell servers to shutdown.
Tell the client there was an error while processing.
void ReplyToIdle(TSocket *s)
Reply to a worker who is idle.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
~TProcessExecutor()=default
unsigned fNToProcess
total number of arguments to pass to the workers
void FixLists(std::vector< TObject * > &lists)
Fix list of lists before merging (to avoid errors about duplicated objects)
A pseudo container class which is a generator of indices.
unsigned long long ULong64_t
This class works together with TProcessExecutor to allow the execution of functions in server process...
double func(double *x, double *p)
Base class for multiprocess applications' clients.
void SetNWorkers(unsigned n)
Set the number of workers that will be spawned by the next call to Fork()
void Reset()
Reset TProcessExecutor's state.
Int_t GetEntries() const
Return the number of objects in array (i.e.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
a ProcTree method is being executed and each worker will process a different file ...
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
A chain is a collection of files containing TTree objects.
A TTree object has a header with a name and a title.
Class describing a generic file including meta information.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
ETask
A collection of the types of tasks that TProcessExecutor can execute.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
MPCodeBufPair MPRecv(TSocket *s)
Receive message from a socket.
void Error(ErrorHandler_t func, int code, const char *va_(fmt),...)
Write error message and call a handler, if required.