Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMPWorkerTree.h
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: G Ganis Jan 2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TMPWorkerTree
13#define ROOT_TMPWorkerTree
14
15#include "ROOT/TypeTraits.hxx" // InvokeResult_t
16#include "TMPWorker.h"
17#include "TFile.h"
18#include "TEntryList.h"
19#include "TEventList.h"
20#include "TH1.h"
21#include "TKey.h"
22#include "TSelector.h"
23#include "TTree.h"
24#include "TTreeCache.h"
25#include "TTreeReader.h"
26
27#include <memory> //unique_ptr
28#include <string>
29#include <sstream>
30#include <type_traits> //std::enable_if_t
31#include <unistd.h> //pid_t
32#include <vector>
33
34class TMPWorkerTree : public TMPWorker {
35
36public:
38 TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
39 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
40 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
41 ~TMPWorkerTree() override;
42
43 // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
44 TMPWorkerTree(const TMPWorkerTree &) = delete;
46
47protected:
48
49 void CloseFile();
51 void HandleInput(MPCodeBufPair& msg) override; ///< Execute instructions received from a MP client
52 void Init(int fd, UInt_t workerN) override;
53 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
54 std::string &errmsg);
55 TFile *OpenFile(const std::string& fileName);
56 virtual void Process(UInt_t, MPCodeBufPair &) {}
58 virtual void SendResult() { }
59 void Setup();
60 void SetupTreeCache(TTree *tree);
61
62 std::vector<std::string> fFileNames; ///< the files to be processed by all workers
63 std::string fTreeName; ///< the name of the tree to be processed
64 TTree *fTree; ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
65 TFile *fFile; ///< last open file
66 TEntryList *fEntryList; ///< entrylist
67 ULong64_t fFirstEntry; ///< first entry to br processed
68
69private:
70
71 // TTree cache handling
72 TTreeCache *fTreeCache; ///< instance of the tree cache for the tree
73 bool fTreeCacheIsLearning; ///< Whether cache is in learning phase
74 bool fUseTreeCache; ///< Control usage of the tree cache
75 Long64_t fCacheSize; ///< Cache size
76};
77
78template<class F>
80public:
81 TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
82 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
83 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
85 {
86 }
87 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
88 ULong64_t firstEntry)
89 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
90 fCanReduce(false)
91 {
92 }
93 ~TMPWorkerTreeFunc() override {}
94
95private:
96 void Process(UInt_t code, MPCodeBufPair &msg) override;
97 void SendResult() override;
98
99 F fProcFunc; ///< copy the function to be executed
100 /// the results of the executions of fProcFunc merged together
101 ROOT::TypeTraits::InvokeResult_t<F, std::reference_wrapper<TTreeReader>> fReducedResult;
102 /// true if fReducedResult can be reduced with a new result, false until we have produced one result
104};
105
107public:
108 TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
109 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
110 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
111 fCallBegin(true)
112 {
113 }
114 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
115 ULong64_t firstEntry)
116 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
117 {
118 }
119 ~TMPWorkerTreeSel() override {}
120
121private:
122 void Process(UInt_t code, MPCodeBufPair &msg) override;
123 void SendResult() override;
124
125 TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
126 bool fCallBegin = true;
127};
128
129//////////////////////////////////////////////////////////////////////////
130/// Auxiliary templated functions
131/// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
132/// problem of that object being automatically owned by the current open file.
133/// For these three types, we call SetDirectory(nullptr) to detach the returned
134/// object from the file we are reading the TTree from.
135/// Note: the only sane case in which this should happen is when a TH1F* is
136/// returned.
137template <class T, std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
138 !std::is_constructible<TCollection *, T>::value> * = nullptr>
139void DetachRes(T res)
140{
141 auto th1p = dynamic_cast<TH1*>(res);
142 if(th1p != nullptr) {
143 th1p->SetDirectory(nullptr);
144 return;
145 }
146 auto ttreep = dynamic_cast<TTree*>(res);
147 if(ttreep != nullptr) {
148 ttreep->SetDirectory(nullptr);
149 return;
150 }
151 auto tentrylist = dynamic_cast<TEntryList*>(res);
152 if(tentrylist != nullptr) {
153 tentrylist->SetDirectory(nullptr);
154 return;
155 }
156 auto teventlist = dynamic_cast<TEventList*>(res);
157 if(teventlist != nullptr) {
158 teventlist->SetDirectory(nullptr);
159 return;
160 }
161 return;
162}
163
164// Specialization for TCollections
165template <class T,
166 std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TCollection *, T>::value> * = nullptr>
167void DetachRes(T res)
168{
169 if (res) {
170 TIter nxo(res);
171 TObject *obj = nullptr;
172 while ((obj = nxo())) {
173 DetachRes(obj);
174 }
175 }
176}
177
178//////////////////////////////////////////////////////////////////////////
179/// Generic function processing SendResult and Process overload
180
181template<class F>
183{
184 //send back result
185 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
186}
187
188template <class F>
190{
191
192 Long64_t start = 0;
193 Long64_t finish = 0;
194 TEntryList *enl = nullptr;
195 std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
196 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
197 reply = sn + errmsg;
198 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
199 return;
200 }
201
202 // create a TTreeReader that reads this range of entries
203 TTreeReader reader(fTree, enl);
204
205 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
206 if(status != TTreeReader::kEntryValid) {
207 reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
208 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
209 return;
210 }
211
212 //execute function
213 auto res = fProcFunc(reader);
214
215 //detach result from file if needed (currently needed for TH1, TTree, TEventList)
216 DetachRes(res);
217
218 //update the number of processed entries
219 fProcessedEntries += finish - start;
220
221 if(fCanReduce) {
223 fReducedResult = static_cast<decltype(fReducedResult)>(redfunc({res, fReducedResult})); //TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
224 } else {
225 fCanReduce = true;
226 fReducedResult = res;
227 }
228
229 if(fMaxNEntries == fProcessedEntries)
230 //we are done forever
231 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
232 else
233 //we are done for now
234 MPSend(GetSocket(), MPCode::kIdling);
235}
236
237#endif
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition MPSendRecv.h:32
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
unsigned int UInt_t
Definition RtypesCore.h:46
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
void DetachRes(T res)
Auxiliary templated functions If the user lambda returns a TH1F*, TTree*, TEventList*,...
Merge collection of TObjects.
Definition PoolUtils.h:35
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual void SetDirectory(TDirectory *dir)
Add reference to directory dir. dir can be 0.
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual void SetDirectory(TDirectory *dir)
Remove reference to this EventList from current directory and add reference to new directory dir.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
TH1 is the base class of all histogram classes in ROOT.
Definition TH1.h:59
virtual void SetDirectory(TDirectory *dir)
By default, when a histogram is created, it is added to the list of histogram objects in the current ...
Definition TH1.cxx:8905
Templated derivation of TMPWorkerTree handling generic function tree processing.
TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
bool fCanReduce
true if fReducedResult can be reduced with a new result, false until we have produced one result
void SendResult() override
Generic function processing SendResult and Process overload.
ROOT::TypeTraits::InvokeResult_t< F, std::reference_wrapper< TTreeReader > > fReducedResult
the results of the executions of fProcFunc merged together
~TMPWorkerTreeFunc() override
void Process(UInt_t code, MPCodeBufPair &msg) override
TMPWorkerTreeFunc(F procFunc, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
F fProcFunc
copy the function to be executed
Templated derivation of TMPWorkerTree handling selector tree processing.
void SendResult() override
Selector processing SendResult and Process overload.
~TMPWorkerTreeSel() override
TSelector & fSelector
reference to the selector to be used to process the tree.
TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
TMPWorkerTreeSel(TSelector &selector, const std::vector< std::string > &fileNames, TEntryList *entries, const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
void Process(UInt_t code, MPCodeBufPair &msg) override
Selector specialization.
This class works in conjunction with TTreeProcessorMP, reacting to messages received from it as speci...
Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl, std::string &errmsg)
Load the required tree and evaluate the processing range.
TMPWorkerTree(const TMPWorkerTree &)=delete
TTree * fTree
pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecut...
TFile * fFile
last open file
~TMPWorkerTree() override
void Init(int fd, UInt_t workerN) override
Init overload defining max entries.
void HandleInput(MPCodeBufPair &msg) override
Execute instructions received from a MP client.
void Setup()
Auxiliary method for common initialization.
ULong64_t fFirstEntry
first entry to be processed
ULong64_t EvalMaxEntries(ULong64_t maxEntries)
Max entries evaluation.
TTree * RetrieveTree(TFile *fp)
Retrieve a tree from an open file.
TEntryList * fEntryList
entrylist
TMPWorkerTree()
Class constructors.
TMPWorkerTree & operator=(const TMPWorkerTree &)=delete
bool fUseTreeCache
Control usage of the tree cache.
std::vector< std::string > fFileNames
the files to be processed by all workers
TFile * OpenFile(const std::string &fileName)
Handle file opening.
virtual void SendResult()
virtual void Process(UInt_t, MPCodeBufPair &)
Long64_t fCacheSize
Cache size.
std::string fTreeName
the name of the tree to be processed
void CloseFile()
Handle file closing.
void SetupTreeCache(TTree *tree)
Tree cache handling.
TTreeCache * fTreeCache
instance of the tree cache for the tree
bool fTreeCacheIsLearning
Whether cache is in learning phase.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition TMPWorker.h:25
Mother of all ROOT objects.
Definition TObject.h:41
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Definition TSelector.h:31
A cache to speed-up the reading of ROOT datasets.
Definition TTreeCache.h:32
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryValid
data read okay
EEntryStatus SetEntriesRange(Long64_t beginEntry, Long64_t endEntry)
Set the range of entries to be loaded by Next(); end will not be loaded.
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual void SetDirectory(TDirectory *dir)
Change the tree's directory.
Definition TTree.cxx:8966
#define F(x, y, z)
@ kIdling
We are ready for the next task.
Definition MPCode.h:35
@ kProcError
Tell the client there was an error while processing.
Definition MPCode.h:44
@ kProcResult
The message contains the result of the processing of a TTree.
Definition MPCode.h:42
double T(double x)