Logo ROOT   6.10/09
Reference Guide
TDFNodes.cxx
Go to the documentation of this file.
1 // Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 #include "RConfigure.h" // R__USE_IMT
12 #include "ROOT/TDFNodes.hxx"
13 #include "ROOT/TSpinMutex.hxx"
15 #ifdef R__USE_IMT
16 #include "ROOT/TThreadExecutor.hxx"
17 #endif
18 #include "RtypesCore.h" // Long64_t
19 #include "TROOT.h" // IsImplicitMTEnabled
20 #include "TTreeReader.h"
21 
22 #include <cassert>
23 #include <mutex>
24 #include <numeric> // std::accumulate
25 #include <string>
26 class TDirectory;
27 class TTree;
28 using namespace ROOT::Detail::TDF;
29 using namespace ROOT::Internal::TDF;
30 
31 namespace ROOT {
32 namespace Internal {
33 namespace TDF {
34 
35 TActionBase::TActionBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int nSlots)
36  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fNSlots(nSlots)
37 {
38 }
39 
40 } // end NS TDF
41 } // end NS Internal
42 } // end NS ROOT
43 
44 TCustomColumnBase::TCustomColumnBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, std::string_view name,
45  unsigned int nSlots)
46  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fName(name), fNSlots(nSlots){};
47 
48 ColumnNames_t TCustomColumnBase::GetTmpBranches() const
49 {
50  return fTmpBranches;
51 }
52 
53 std::string TCustomColumnBase::GetName() const
54 {
55  return fName;
56 }
57 
59 {
60  return fImplPtr;
61 }
62 
63 TFilterBase::TFilterBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, std::string_view name,
64  unsigned int nSlots)
65  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fLastCheckedEntry(nSlots, -1), fLastResult(nSlots),
66  fAccepted(nSlots), fRejected(nSlots), fName(name), fNSlots(nSlots)
67 {
68 }
69 
71 {
72  return fImplPtr;
73 }
74 
75 ColumnNames_t TFilterBase::GetTmpBranches() const
76 {
77  return fTmpBranches;
78 }
79 
81 {
82  return !fName.empty();
83 };
84 
86 {
87  if (fName.empty()) // PrintReport is no-op for unnamed filters
88  return;
89  const auto accepted = std::accumulate(fAccepted.begin(), fAccepted.end(), 0ULL);
90  const auto all = accepted + std::accumulate(fRejected.begin(), fRejected.end(), 0ULL);
91  double perc = accepted;
92  if (all > 0) perc /= all;
93  perc *= 100.;
94  Printf("%-10s: pass=%-10lld all=%-10lld -- %8.3f %%", fName.c_str(), accepted, all, perc);
95 }
96 
97 // This is an helper class to allow to pick a slot without resorting to a map
98 // indexed by thread ids.
99 // WARNING: this class does not work as a regular stack. The size is
100 // fixed at construction time and no blocking is foreseen.
101 class TSlotStack {
102 private:
103  unsigned int fCursor;
104  std::vector<unsigned int> fBuf;
105  ROOT::TSpinMutex fMutex;
106 
107 public:
108  TSlotStack() = delete;
109  TSlotStack(unsigned int size) : fCursor(size), fBuf(size) { std::iota(fBuf.begin(), fBuf.end(), 0U); }
110  void Push(unsigned int slotNumber);
111  unsigned int Pop();
112 };
113 
114 void TSlotStack::Push(unsigned int slotNumber)
115 {
116  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
117  fBuf[fCursor++] = slotNumber;
118  assert(fCursor <= fBuf.size() && "TSlotStack assumes that at most a fixed number of values can be present in the "
119  "stack. fCursor is greater than the size of the internal buffer. This violates "
120  "such assumption.");
121 }
122 
123 unsigned int TSlotStack::Pop()
124 {
125  assert(fCursor > 0 &&
126  "TSlotStack assumes that a value can be always popped. fCursor is <=0 and this violates such assumption.");
127  std::lock_guard<ROOT::TSpinMutex> guard(fMutex);
128  return fBuf[--fCursor];
129 }
130 
131 TLoopManager::TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
132  : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultBranches(defaultBranches),
134 {
135 }
136 
138 {
139 }
140 
141 void TLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
142 {
143  for (auto &actionPtr : fBookedActions) actionPtr->Run(slot, entry);
144  for (auto &namedFilterPtr : fBookedNamedFilters) namedFilterPtr->CheckFilters(slot, entry);
145 }
146 
147 /// Perform clean-up operations. To be called at the end of each task execution.
148 void TLoopManager::CleanUpTask(unsigned int slot)
149 {
150  for (auto &ptr : fBookedActions) ptr->ClearValueReaders(slot);
151  for (auto &ptr : fBookedFilters) ptr->ClearValueReaders(slot);
152  for (auto &pair : fBookedBranches) pair.second->ClearValueReaders(slot);
153 }
154 
156 {
157 #ifdef R__USE_IMT
159  TSlotStack slotStack(fNSlots);
160  InitNodes();
161 
162  if (fNEmptyEntries > 0) {
163  // Working with an empty tree.
164  // Evenly partition the entries according to fNSlots
165  const auto nEntriesPerSlot = fNEmptyEntries / fNSlots;
166  auto remainder = fNEmptyEntries % fNSlots;
167  std::vector<std::pair<Long64_t, Long64_t>> entryRanges;
168  Long64_t start = 0;
169  while (start < fNEmptyEntries) {
170  Long64_t end = start + nEntriesPerSlot;
171  if (remainder > 0) {
172  ++end;
173  --remainder;
174  }
175  entryRanges.emplace_back(start, end);
176  start = end;
177  }
178 
179  // Each task will generate a subrange of entries
180  auto genFunction = [this, &slotStack](const std::pair<Long64_t, Long64_t> &range) {
181  auto slot = slotStack.Pop();
182  InitAllNodes(nullptr, slot);
183  for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
184  RunAndCheckFilters(slot, currEntry);
185  }
186  CleanUpTask(slot);
187  slotStack.Push(slot);
188  };
189 
191  pool.Foreach(genFunction, entryRanges);
192  } else {
193  using ttpmt_t = ROOT::TTreeProcessorMT;
194  std::unique_ptr<ttpmt_t> tp;
195  tp.reset(new ttpmt_t(*fTree));
196 
197  tp->Process([this, &slotStack](TTreeReader &r) -> void {
198  auto slot = slotStack.Pop();
199  InitAllNodes(&r, slot);
200  // recursive call to check filters and conditionally execute actions
201  while (r.Next()) {
203  }
204  CleanUpTask(slot);
205  slotStack.Push(slot);
206  });
207  }
208  } else {
209 #endif // R__USE_IMT
210  InitNodes();
211  if (fNEmptyEntries > 0) {
212  InitAllNodes(nullptr, 0);
213  for (Long64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
214  RunAndCheckFilters(0, currEntry);
215  }
216  } else {
217  TTreeReader r(fTree.get());
218  InitAllNodes(&r, 0);
219 
220  // recursive call to check filters and conditionally execute actions
221  // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
222  while (r.Next() && fNStopsReceived < fNChildren) {
223  RunAndCheckFilters(0, r.GetCurrentEntry());
224  }
225  }
226 #ifdef R__USE_IMT
227  }
228 #endif // R__USE_IMT
229 
230  fHasRunAtLeastOnce = true;
231  // forget actions
232  fBookedActions.clear();
233  // make all TResultProxies ready
234  for (auto readiness : fResProxyReadiness) {
235  *readiness.get() = true;
236  }
237  // forget TResultProxies
238  fResProxyReadiness.clear();
239 }
240 
241 /// Build TTreeReaderValues for all nodes
242 ///
243 /// This method loops over all filters, actions and other booked objects and
244 /// calls their `BuildReaderValues` methods. It is called once per node per slot, before
245 /// running the event loop. It also informs each node of the TTreeReader that
246 /// a particular slot will be using.
247 void TLoopManager::InitAllNodes(TTreeReader *r, unsigned int slot)
248 {
249  // booked branches must be initialized first
250  // because actions and filters might need to point to the values encapsulate
251  for (auto &bookedBranch : fBookedBranches) bookedBranch.second->Init(r, slot);
252  for (auto &ptr : fBookedActions) ptr->Init(r, slot);
253  for (auto &ptr : fBookedFilters) ptr->Init(r, slot);
254 }
255 
256 /// Initialize all nodes of the functional graph before running the event loop.
257 /// This method is called once per event-loop and performs generic initialization
258 /// operations that do not depend on the specific processing slot (i.e. operations
259 /// that are common for all threads).
261 {
262  for (auto &namedFilterPtr : fBookedNamedFilters) namedFilterPtr->ResetReportCount();
263 }
264 
266 {
267  return this;
268 }
269 
270 const ColumnNames_t &TLoopManager::GetDefaultBranches() const
271 {
272  return fDefaultBranches;
273 }
274 
275 TTree *TLoopManager::GetTree() const
276 {
277  return fTree.get();
278 }
279 
281 {
282  auto it = fBookedBranches.find(name);
283  return it == fBookedBranches.end() ? nullptr : it->second.get();
284 }
285 
287 {
288  return fDirPtr;
289 }
290 
291 void TLoopManager::Book(const ActionBasePtr_t &actionPtr)
292 {
293  fBookedActions.emplace_back(actionPtr);
294 }
295 
296 void TLoopManager::Book(const FilterBasePtr_t &filterPtr)
297 {
298  fBookedFilters.emplace_back(filterPtr);
299  if (filterPtr->HasName()) {
300  fBookedNamedFilters.emplace_back(filterPtr);
301  }
302 }
303 
305 {
306  fBookedBranches[branchPtr->GetName()] = branchPtr;
307 }
308 
309 void TLoopManager::Book(const std::shared_ptr<bool> &readinessPtr)
310 {
311  fResProxyReadiness.emplace_back(readinessPtr);
312 }
313 
314 void TLoopManager::Book(const RangeBasePtr_t &rangePtr)
315 {
316  fBookedRanges.emplace_back(rangePtr);
317 }
318 
319 // dummy call, end of recursive chain of calls
320 bool TLoopManager::CheckFilters(int, unsigned int)
321 {
322  return true;
323 }
324 
325 /// Call `PrintReport` on all booked filters
327 {
328  for (const auto &fPtr : fBookedNamedFilters) fPtr->PrintReport();
329 }
330 
331 TRangeBase::TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop,
332  unsigned int stride, unsigned int nSlots)
333  : fImplPtr(implPtr), fTmpBranches(tmpBranches), fStart(start), fStop(stop), fStride(stride), fNSlots(nSlots)
334 {
335 }
336 
338 {
339  return fImplPtr;
340 }
341 
342 ColumnNames_t TRangeBase::GetTmpBranches() const
343 {
344  return fTmpBranches;
345 }
void Foreach(F func, unsigned nTimes)
Execute func (with no arguments) nTimes in parallel.
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:48
FilterBaseVec_t fBookedFilters
Definition: TDFNodes.hxx:53
long long Long64_t
Definition: RtypesCore.h:69
TTreeReader is a simple, robust and fast interface to read values from a TTree, TChain or TNtuple...
Definition: TTreeReader.h:43
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
Long64_t GetCurrentEntry() const
Returns the index of the current entry being read.
Definition: TTreeReader.h:209
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:70
TRangeBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches, unsigned int start, unsigned int stop, unsigned int stride, unsigned int nSlots)
Definition: TDFNodes.cxx:331
TTree()
Default constructor and I/O constructor.
Definition: TTree.cxx:652
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:337
TLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
Definition: TDFNodes.cxx:131
std::map< std::string, TmpBranchBasePtr_t > fBookedBranches
Definition: TDFNodes.hxx:55
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
Definition: TDFNodes.cxx:260
STL namespace.
A spin mutex class which respects the STL interface for mutexes.
Definition: TSpinMutex.hxx:40
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: TDFNodes.hxx:64
void InitAllNodes(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes.
Definition: TDFNodes.cxx:247
void Book(const ActionBasePtr_t &actionPtr)
Definition: TDFNodes.cxx:291
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
Definition: TDFNodes.cxx:148
const ColumnNames_t fTmpBranches
Definition: TDFNodes.hxx:215
std::vector< ULong64_t > fAccepted
Definition: TDFNodes.hxx:386
std::vector< ULong64_t > fRejected
Definition: TDFNodes.hxx:387
std::shared_ptr< TFilterBase > FilterBasePtr_t
Definition: TDFNodes.hxx:44
TCustomColumnBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name, unsigned int nSlots)
Definition: TDFNodes.cxx:44
const ColumnNames_t & GetDefaultBranches() const
Definition: TDFNodes.cxx:270
void Report() const
Call PrintReport on all booked filters.
Definition: TDFNodes.cxx:326
bool CheckFilters(int, unsigned int)
Definition: TDFNodes.cxx:320
const ColumnNames_t fTmpBranches
Definition: TDFNodes.hxx:383
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:342
ActionBaseVec_t fBookedActions
Definition: TDFNodes.hxx:52
This class provides a simple interface to execute the same task multiple times in parallel...
const unsigned int fNSlots
Number of thread slots used by this node, inherited from parent node.
Definition: TDFNodes.hxx:283
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Definition: TDFNodes.cxx:141
TRandom2 r(17)
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition: TDFNodes.hxx:65
std::shared_ptr< TCustomColumnBase > TmpBranchBasePtr_t
Definition: TDFNodes.hxx:42
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:381
const unsigned int fNSlots
Definition: TDFNodes.hxx:62
FilterBaseVec_t fBookedNamedFilters
Definition: TDFNodes.hxx:54
#define Printf
Definition: TGeoToOCC.h:18
TFilterBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name, unsigned int nSlots)
Definition: TDFNodes.cxx:63
Describe directory structure in memory.
Definition: TDirectory.h:34
const unsigned int fNSlots
Number of thread slots used by this node.
Definition: TDFNodes.hxx:216
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:487
const ColumnNames_t fDefaultBranches
Definition: TDFNodes.hxx:60
ColumnNames_t GetTmpBranches() const
Definition: TDFNodes.cxx:75
TCustomColumnBase * GetBookedBranch(const std::string &name) const
Definition: TDFNodes.cxx:280
std::shared_ptr< TDFInternal::TActionBase > ActionBasePtr_t
Definition: TDFNodes.hxx:39
unsigned int GetNSlots() const
Definition: TDFNodes.hxx:92
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:277
TLoopManager * GetImplPtr() const
Definition: TDFNodes.cxx:58
Bool_t Next()
Move to the next entry (or index of the TEntryList if that is set).
Definition: TTreeReader.h:161
std::shared_ptr< TTree > fTree
Definition: TDFNodes.hxx:59
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:552
unsigned int GetNSlots()
Definition: TDFUtils.cxx:125
Definition: tree.py:1
std::vector< std::shared_ptr< bool > > fResProxyReadiness
Definition: TDFNodes.hxx:57
TLoopManager * fImplPtr
A raw pointer to the TLoopManager at the root of this functional graph.
Definition: TDFNodes.hxx:212
A class to process the entries of a TTree in parallel.
::TDirectory * GetDirectory() const
Definition: TDFNodes.cxx:286
std::shared_ptr< TRangeBase > RangeBasePtr_t
Definition: TDFNodes.hxx:47