Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: G Ganis Jan 2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerTree
13#define ROOT_TMPWorkerTree
14
15#include "ROOT/TypeTraits.hxx" // InvokeResult_t
16#include "TMPWorker.h"
17#include "TFile.h"
18#include "TEntryList.h"
19#include "TEventList.h"
20#include "TH1.h"
21#include "TKey.h"
22#include "TSelector.h"
23#include "TTree.h"
24#include "TTreeCache.h"
25#include "TTreeReader.h"
26
27#include <memory> //unique_ptr
28#include <string>
29#include <sstream>
30#include <type_traits> //std::enable_if_t
31#include <unistd.h> //pid_t
32#include <vector>
33
34class TMPWorkerTree : public TMPWorker {
35
36public:
38 TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
39 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
40 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
41 ~TMPWorkerTree() override;
42
43 // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
44 TMPWorkerTree(const TMPWorkerTree &) = delete;
46
47protected:
48
49 void CloseFile();
51 void HandleInput(MPCodeBufPair& msg) override; ///< Execute instructions received from a MP client
52 void Init(int fd, UInt_t workerN) override;
53 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
54 std::string &errmsg);
55 TFile *OpenFile(const std::string& fileName);
56 virtual void Process(UInt_t, MPCodeBufPair &) {}
58 virtual void SendResult() { }
59 void Setup();
61
62 std::vector<std::string> fFileNames; ///< the files to be processed by all workers
63 std::string fTreeName; ///< the name of the tree to be processed
64 TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
65 TFile *fFile; ///< last open file
66 TEntryList *fEntryList; ///< entrylist
67 ULong64_t fFirstEntry; ///< first entry to br processed
68
69private:
70
71 // TTree cache handling
72 TTreeCache *fTreeCache; ///< instance of the tree cache for the tree
73 Bool_t fTreeCacheIsLearning; ///< Whether cache is in learning phase
74 Bool_t fUseTreeCache; ///< Control usage of the tree cache
75 Long64_t fCacheSize; ///< Cache size
76};
77
78template<class F>
80public:
81 TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
82 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
83 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
85 {
86 }
87 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
88 ULong64_t firstEntry)
89 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
90 fCanReduce(false)
91 {
92 }
93 ~TMPWorkerTreeFunc() override {}
94
95private:
96 void Process(UInt_t code, MPCodeBufPair &msg) override;
97 void SendResult() override;
98
99 F fProcFunc; ///< copy the function to be executed
100 /// the results of the executions of fProcFunc merged together
101 ROOT::TypeTraits::InvokeResult_t<F, std::reference_wrapper<TTreeReader>> fReducedResult;
102 /// true if fReducedResult can be reduced with a new result, false until we have produced one result
104};
105
107public:
108 TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
109 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
110 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
111 fCallBegin(true)
112 {
113 }
114 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
115 ULong64_t firstEntry)
116 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
117 {
118 }
119 ~TMPWorkerTreeSel() override {}
120
121private:
122 void Process(UInt_t code, MPCodeBufPair &msg) override;
123 void SendResult() override;
124
125 TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
126 bool fCallBegin = true;
127};
128
129//////////////////////////////////////////////////////////////////////////
130/// Auxiliary templated functions
131/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
132/// problem of that object being automatically owned by the current open file.
133/// For these three types, we call SetDirectory(nullptr) to detach the returned
134/// object from the file we are reading the TTree from.
135/// Note: the only sane case in which this should happen is when a TH1F* is
136/// returned.
137template <class T, std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
138 !std::is_constructible<TCollection *, T>::value> * = nullptr>
139void DetachRes(T res)
140{
141 auto th1p = dynamic_cast<TH1*>(res);
142 if(th1p != nullptr) {
143 th1p->SetDirectory(nullptr);
144 return;
145 }
146 auto ttreep = dynamic_cast<TTree*>(res);
147 if(ttreep != nullptr) {
148 ttreep->SetDirectory(nullptr);
149 return;
150 }
151 auto tentrylist = dynamic_cast<TEntryList*>(res);
152 if(tentrylist != nullptr) {
153 tentrylist->SetDirectory(nullptr);
154 return;
155 }
156 auto teventlist = dynamic_cast<TEventList*>(res);
157 if(teventlist != nullptr) {
158 teventlist->SetDirectory(nullptr);
159 return;
160 }
161 return;
162}
163
164// Specialization for TCollections
165template <class T,
166 std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TCollection *, T>::value> * = nullptr>
167void DetachRes(T res)
168{
169 if (res) {
170 TIter nxo(res);
171 TObject *obj = nullptr;
172 while ((obj = nxo())) {
173 DetachRes(obj);
174 }
175 }
176}
177
178//////////////////////////////////////////////////////////////////////////
179/// Generic function processing SendResult and Process overload
180
181template<class F>
183{
184 //send back result
185 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
186}
187
188template <class F>
190{
191
192 Long64_t start = 0;
193 Long64_t finish = 0;
194 TEntryList *enl = 0;
195 std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
196 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
197 reply = sn + errmsg;
198 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
199 return;
200 }
201
202 // create a TTreeReader that reads this range of entries
203 TTreeReader reader(fTree, enl);
204
205 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
206 if(status != TTreeReader::kEntryValid) {
207 reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
208 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
209 return;
210 }
211
212 //execute function
213 auto res = fProcFunc(reader);
214
215 //detach result from file if needed (currently needed for TH1, TTree, TEventList)
216 DetachRes(res);
217
218 //update the number of processed entries
219 fProcessedEntries += finish - start;
220
221 if(fCanReduce) {
223 fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
224 } else {
225 fCanReduce = true;
226 fReducedResult = res;
227 }
228
229 if(fMaxNEntries == fProcessedEntries)
230 //we are done forever
231 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
232 else
233 //we are done for now
234 MPSend(GetSocket(), MPCode::kIdling);
235}
236
237#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned int UInt_t
Definition RtypesCore.h:46
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
void DetachRes(T res)
Auxiliary templated functions If the user lambda returns a TH1F*, TTree*, TEventList*,...
Merge collection of TObjects.
Definition PoolUtils.h:35
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir.
A ROOT file is composed of a header, followed by consecutive data records (TKey instances) with a wel...
Definition TFile.h:53
TH1 is the base class of all histogram classes in ROOT.
Definition TH1.h:58
virtual void SetDirectory(TDirectory *dir)
By default, when a histogram is created, it is added to the list of histogram objects in the current ...
Definition TH1.cxx:8854
Templated derivation of TMPWorkerTree handling generic function tree processing.
TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
void SendResult() override
Generic function processing SendResult and Process overload.
ROOT::TypeTraits::InvokeResult_t< F, std::reference_wrapper< TTreeReader > > fReducedResult
the results of the executions of fProcFunc merged together
~TMPWorkerTreeFunc() override
void Process(UInt_t code, MPCodeBufPair &msg) override
TMPWorkerTreeFunc(F procFunc, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
F fProcFunc
copy the function to be executed
Templated derivation of TMPWorkerTree handling selector tree processing.
void SendResult() override
Selector processing SendResult and Process overload.
~TMPWorkerTreeSel() override
TSelector & fSelector
reference to the selector to be used to process the tree (a reference member, so it is never null).
TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
TMPWorkerTreeSel(TSelector &selector, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
void Process(UInt_t code, MPCodeBufPair &msg) override
Selector specialization.
This class works in conjunction with TTreeProcessorMP, reacting to messages received from it as speci...
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TMPWorkerTree(const TMPWorkerTree &)=delete
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
~TMPWorkerTree() override
void Init(int fd, UInt_t workerN) override
Init overload defining max entries.
void HandleInput(MPCodeBufPair &msg) override
Execute instructions received from a MP client.
void Setup()
Auxiliary method for common initialization.
ULong64_t fFirstEntry
first entry to be processed
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
TMPWorkerTree & operator=(const TMPWorkerTree &)=delete
Bool_t fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
Bool_t fTreeCacheIsLearning
Whether cache is in learning phase.
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
virtual void Process(UInt_t, MPCodeBufPair &)
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:25
Mother of all ROOT objects.
Definition TObject.h:41
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition TSelector.h:31
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryValid
data read okay
EEntryStatus SetEntriesRange(Long64_t beginEntry, Long64_t endEntry)
Set the range of entries to be loaded by Next(); end will not be loaded.
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition TTree.cxx:8953
#define F(x, y, z)
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
double T(double x)
Definition tree.py:1