Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: G Ganis Jan 2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerTree
13#define ROOT_TMPWorkerTree
14
15#include "TMPWorker.h"
16#include "TFile.h"
17#include "TEntryList.h"
18#include "TEventList.h"
19#include "TH1.h"
20#include "TKey.h"
21#include "TSelector.h"
22#include "TTree.h"
23#include "TTreeCache.h"
24#include "TTreeReader.h"
25
26#include <memory> //unique_ptr
27#include <string>
28#include <sstream>
29#include <type_traits> //std::result_of
30#include <unistd.h> //pid_t
31#include <vector>
32
33class TMPWorkerTree : public TMPWorker {
34
35public:
37 TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
38 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
39 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
40 virtual ~TMPWorkerTree();
41
42 // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
43 TMPWorkerTree(const TMPWorkerTree &) = delete;
45
46protected:
47
48 void CloseFile();
50 void HandleInput(MPCodeBufPair& msg); ///< Execute instructions received from a MP client
51 void Init(int fd, UInt_t workerN);
52 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
53 std::string &errmsg);
54 TFile *OpenFile(const std::string& fileName);
55 virtual void Process(UInt_t, MPCodeBufPair &) {}
57 virtual void SendResult() { }
58 void Setup();
60
61 std::vector<std::string> fFileNames; ///< the files to be processed by all workers
62 std::string fTreeName; ///< the name of the tree to be processed
63 TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
64 TFile *fFile; ///< last open file
65 TEntryList *fEntryList; ///< entrylist
66 ULong64_t fFirstEntry; ///< first entry to br processed
67
68private:
69
70 // TTree cache handling
71 TTreeCache *fTreeCache; ///< instance of the tree cache for the tree
72 Bool_t fTreeCacheIsLearning; ///< Whether cache is in learning phase
73 Bool_t fUseTreeCache; ///< Control usage of the tree cache
74 Long64_t fCacheSize; ///< Cache size
75};
76
77template<class F>
79public:
80 TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
81 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
82 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
84 {
85 }
86 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
87 ULong64_t firstEntry)
88 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
89 fCanReduce(false)
90 {
91 }
92 virtual ~TMPWorkerTreeFunc() {}
93
94private:
95 void Process(UInt_t code, MPCodeBufPair &msg);
96 void SendResult();
97
98 F fProcFunc; ///< copy the function to be executed
99 /// the results of the executions of fProcFunc merged together
100 std::result_of_t<F(std::reference_wrapper<TTreeReader>)> fReducedResult;
101 /// true if fReducedResult can be reduced with a new result, false until we have produced one result
103};
104
106public:
107 TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
108 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
109 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
110 fCallBegin(true)
111 {
112 }
113 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
114 ULong64_t firstEntry)
115 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
116 {
117 }
118 virtual ~TMPWorkerTreeSel() {}
119
120private:
121 void Process(UInt_t code, MPCodeBufPair &msg);
122 void SendResult();
123
124 TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
125 bool fCallBegin = true;
126};
127
128//////////////////////////////////////////////////////////////////////////
129/// Auxiliary templated functions
130/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
131/// problem of that object being automatically owned by the current open file.
132/// For these three types, we call SetDirectory(nullptr) to detach the returned
133/// object from the file we are reading the TTree from.
134/// Note: the only sane case in which this should happen is when a TH1F* is
135/// returned.
136template <class T, std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
137 !std::is_constructible<TCollection *, T>::value> * = nullptr>
138void DetachRes(T res)
139{
140 auto th1p = dynamic_cast<TH1*>(res);
141 if(th1p != nullptr) {
142 th1p->SetDirectory(nullptr);
143 return;
144 }
145 auto ttreep = dynamic_cast<TTree*>(res);
146 if(ttreep != nullptr) {
147 ttreep->SetDirectory(nullptr);
148 return;
149 }
150 auto tentrylist = dynamic_cast<TEntryList*>(res);
151 if(tentrylist != nullptr) {
152 tentrylist->SetDirectory(nullptr);
153 return;
154 }
155 auto teventlist = dynamic_cast<TEventList*>(res);
156 if(teventlist != nullptr) {
157 teventlist->SetDirectory(nullptr);
158 return;
159 }
160 return;
161}
162
163// Specialization for TCollections
164template <class T,
165 std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TCollection *, T>::value> * = nullptr>
166void DetachRes(T res)
167{
168 if (res) {
169 TIter nxo(res);
170 TObject *obj = 0;
171 while ((obj = nxo())) {
172 DetachRes(obj);
173 }
174 }
175}
176
177//////////////////////////////////////////////////////////////////////////
178/// Generic function processing SendResult and Process overload
179
180template<class F>
182{
183 //send back result
184 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
185}
186
187template <class F>
189{
190
191 Long64_t start = 0;
192 Long64_t finish = 0;
193 TEntryList *enl = 0;
194 std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
195 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
196 reply = sn + errmsg;
197 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
198 return;
199 }
200
201 // create a TTreeReader that reads this range of entries
202 TTreeReader reader(fTree, enl);
203
204 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
205 if(status != TTreeReader::kEntryValid) {
206 reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
207 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
208 return;
209 }
210
211 //execute function
212 auto res = fProcFunc(reader);
213
214 //detach result from file if needed (currently needed for TH1, TTree, TEventList)
215 DetachRes(res);
216
217 //update the number of processed entries
218 fProcessedEntries += finish - start;
219
220 if(fCanReduce) {
222 fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
223 } else {
224 fCanReduce = true;
225 fReducedResult = res;
226 }
227
228 if(fMaxNEntries == fProcessedEntries)
229 //we are done forever
230 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
231 else
232 //we are done for now
233 MPSend(GetSocket(), MPCode::kIdling);
234}
235
236#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
A std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned int UInt_t
Definition RtypesCore.h:46
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
void DetachRes(T res)
Auxiliary templated functions If the user lambda returns a TH1F*, TTree*, TEventList*,...
Merge collection of TObjects.
Definition PoolUtils.h:35
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
TH1 is the base class of all histogram classes in ROOT.
Definition TH1.h:58
virtual void SetDirectory(TDirectory *dir)
By default, when a histogram is created, it is added to the list of histogram objects in the current ...
Definition TH1.cxx:8767
Templated derivation of TMPWorkerTree handling generic function tree processing.
virtual ~TMPWorkerTreeFunc()
TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
std::result_of_t< F(std::reference_wrapper< TTreeReader >)> fReducedResult
the results of the executions of fProcFunc merged together
TMPWorkerTreeFunc(F procFunc, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
void SendResult()
Generic function processing SendResult and Process overload.
void Process(UInt_t code, MPCodeBufPair &msg)
F fProcFunc
copy the function to be executed
Templated derivation of TMPWorkerTree handling selector tree processing.
void SendResult()
Selector processing SendResult and Process overload.
virtual ~TMPWorkerTreeSel()
TSelector & fSelector
reference to the selector to be used to process the tree.
void Process(UInt_t code, MPCodeBufPair &msg)
Selector specialization.
TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
TMPWorkerTreeSel(TSelector &selector, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
This class works in conjunction with TTreeProcessorMP, reacting to messages received from it as speci...
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TMPWorkerTree(const TMPWorkerTree &)=delete
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
void Setup()
Auxiliary method for common initialization.
ULong64_t fFirstEntry
first entry to be processed
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
TMPWorkerTree & operator=(const TMPWorkerTree &)=delete
Bool_t fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
void Init(int fd, UInt_t workerN)
Init overload defining max entries.
virtual void Process(UInt_t, MPCodeBufPair &)
virtual ~TMPWorkerTree()
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
void HandleInput(MPCodeBufPair &msg)
Execute instructions received from a MP client.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:26
Mother of all ROOT objects.
Definition TObject.h:41
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition TSelector.h:31
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryValid
data read okay
EEntryStatus SetEntriesRange(Long64_t beginEntry, Long64_t endEntry)
Set the begin and end entry numbers.
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition TTree.cxx:8932
#define F(x, y, z)
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
double T(double x)
Definition tree.py:1