12 #ifndef ROOT_TPoolProcessor
13 #define ROOT_TPoolProcessor
30 #include <type_traits>
42 auto th1p =
dynamic_cast<TH1*
>(res);
48 auto ttreep =
dynamic_cast<TTree*
>(res);
49 if(ttreep !=
nullptr) {
54 auto tentrylist =
dynamic_cast<TEntryList*
>(res);
55 if(tentrylist !=
nullptr) {
60 auto teventlist =
dynamic_cast<TEventList*
>(res);
61 if(teventlist !=
nullptr) {
73 TPoolProcessor(
F procFunc,
const std::vector<std::string>& fileNames,
const std::string& treeName,
unsigned nWorkers,
ULong64_t maxEntries);
78 void Init(
int fd,
unsigned workerN);
100 fFileNames(fileNames), fTreeName(treeName), fTree(
nullptr),
101 fNWorkers(nWorkers), fMaxNEntries(maxEntries),
102 fProcessedEntries(0), fReducedResult(), fCanReduce(
false)
109 fFileNames(), fTreeName(), fTree(tree),
110 fNWorkers(nWorkers), fMaxNEntries(maxEntries),
111 fProcessedEntries(0), fReducedResult(), fCanReduce(
false)
118 unsigned code = msg.first;
130 std::string reply =
"S" + std::to_string(GetNWorker());
131 reply +=
": unknown code received: " + std::to_string(code);
141 fMaxNEntries = EvalMaxEntries(fMaxNEntries);
151 unsigned nProcessed = 0;
155 std::cerr <<
"[S]: Process:kProcTree fTree undefined!\n";
159 nProcessed = ReadBuffer<unsigned>(msg.second.get());
161 fileN = nProcessed / fNWorkers;
164 fileN = ReadBuffer<unsigned>(msg.second.get());
167 std::unique_ptr<TFile> fp;
168 TTree *tree =
nullptr;
174 fp.reset(
OpenFile(fTree->GetCurrentFile()->GetName()));
176 fp.reset(
OpenFile(fFileNames[fileN]));
185 tree = RetrieveTree(fp.get());
186 if(tree ==
nullptr) {
202 unsigned nBunch = nEntries / fNWorkers;
203 unsigned rangeN = nProcessed % fNWorkers;
204 start = rangeN*nBunch;
205 if(rangeN < (fNWorkers-1))
206 finish = (rangeN+1)*nBunch;
217 if (fProcessedEntries + finish - start > fMaxNEntries)
218 finish = start + fMaxNEntries - fProcessedEntries;
225 std::string reply =
"S" + std::to_string(GetNWorker());
226 reply +=
": could not set TTreeReader to range " + std::to_string(start) +
" " + std::to_string(finish);
232 auto res = fProcFunc(reader);
238 fProcessedEntries += finish - start;
244 fReducedResult = res;
247 if(fMaxNEntries == fProcessedEntries)
261 if (fp ==
nullptr || fp->
IsZombie()) {
262 std::string reply =
"S" + std::to_string(GetNWorker());
263 reply.append(
": could not open file ");
264 reply.append(fileName);
278 TTree *tree =
nullptr;
279 if(fTreeName ==
"") {
290 tree =
static_cast<TTree*
>(fp->
Get(fTreeName.c_str()));
292 if (tree ==
nullptr) {
293 std::string reply =
"S" + std::to_string(GetNWorker());
294 std::stringstream ss;
295 ss <<
": cannot find tree with name " << fTreeName <<
" in file " << fp->
GetName();
296 reply.append(ss.str());
311 if(GetNWorker() < fNWorkers-1)
312 return maxEntries/fNWorkers;
314 return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
std::result_of< F(std::reference_wrapper< TTreeReader >)>::type fReducedResult
the results of the executions of fProcFunc merged together
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result ...
TObject * ReduceObjects(const std::vector< TObject * > &objs)
Merge collection of TObjects.
Tell a TPoolProcessor to process the tree that was passed to it at construction time.
Small helper to encapsulate whether to return the value pointed to by the iterator or its address...
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Tell a TPoolProcessor which tree and entries range to process. The object sent is a TreeRangeInfo...
virtual void SetDirectory(TDirectory *dir)
By default when an histogram is created, it is added to the list of histogram objects in the current ...
ULong64_t fMaxNEntries
the maximum number of entries to be processed by this worker
TFile * OpenFile(const TString &fin)
virtual TList * GetListOfKeys() const
std::string fTreeName
the name of the tree to be processed
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
virtual const char * GetClassName() const
TFile * OpenFile(const std::string &fileName)
std::vector< std::string > fFileNames
the files to be processed by all workers
ClassImp(TIterator) Bool_t TIterator return false
Compare two iterator objects.
TTree * RetrieveTree(TFile *fp)
Tell a TPoolProcessor which tree to process. The object sent is a TreeInfo.
We are ready for the next task.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
void Process(unsigned code, MPCodeBufPair &msg)
Book space in a file, create I/O buffers, to fill them, (un)compress them.
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a TPool client.
F fProcFunc
the function to be executed
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir...
TPoolProcessor(F procFunc, const std::vector< std::string > &fileNames, const std::string &treeName, unsigned nWorkers, ULong64_t maxEntries)
A TEventList object is a list of selected events (entries) in a TTree.
virtual const char * GetName() const
Returns name of object.
unsigned fNWorkers
the number of workers spawned
The message contains the result of the processing of a TTree.
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Tell the client there was an error while processing.
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Ask for a kFuncResult/kProcResult.
unsigned long long ULong64_t
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual Long64_t GetEntries() const
A TTree object has a header with a name and a title.
A List of entry numbers in a TTree or TChain.
EEntryStatus SetEntriesRange(Long64_t first, Long64_t last)
Set the range of entries to be processed.
ULong64_t fProcessedEntries
the number of entries processed by this worker so far
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcPool::Pro...