Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
16
17#include <functional>
18#include <map>
19#include <memory>
20#include <string>
21#include <vector>
22
23// forward declarations
24class TTree;
25class TTreeReader;
26class TDirectory;
27
28namespace ROOT {
// Forward declarations of public RDF types referenced (by reference, pointer or std::unique_ptr)
// by RLoopManager further down in this header.
29namespace RDF {
30class RCutFlowReport;
31class RDataSource;
32} // ns RDF
33
34namespace Internal {
35namespace RDF {
/// Get all the branch names of `t`, including the ones of its friend trees.
/// NOTE(review): the exact effect of `allowDuplicates` is not visible here; presumably it decides
/// whether repeated names (e.g. coming from friend trees) are kept — confirm against the definition.
36std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
37
// Forward declarations of internal node types used by RLoopManager through pointers only.
38class RActionBase;
39class GraphNode;
40
41namespace GraphDrawing {
// NOTE(review): header line 42 is missing from this listing; presumably it holds the forward
// declaration(s) needed by RLoopManager::GetGraph(), which returns a GraphDrawing::GraphNode.
43} // ns GraphDrawing
44} // ns RDF
45} // ns Internal
46
47namespace Detail {
48namespace RDF {
50
// Forward declarations of the node types that RLoopManager books and deregisters by pointer.
51class RFilterBase;
52class RRangeBase;
/// A list of column (branch) names, e.g. the default branches passed to the RLoopManager constructors.
54using ColumnNames_t = std::vector<std::string>;
55
56/// The head node of an RDF computation graph.
57/// This class is responsible for running the event loop.
58class RLoopManager : public RNodeBase {
/// Signature of user callbacks: they receive the index of the processing slot that triggers them.
60 using Callback_t = std::function<void(unsigned int)>;
/// Periodic per-slot callback. Each slot owns an independent counter; every fEveryN calls of
/// operator() on a given slot, fFun(slot) is invoked and that slot's counter restarts from zero.
/// NOTE(review): the declarations of fFun and fEveryN (initialized by the constructor below) are
/// missing from this extracted listing.
61 class TCallback {
64 std::vector<ULong64_t> fCounters;
65
66 public:
/// @param everyN number of calls on a slot between two consecutive invocations of f on that slot
/// @param f      the callback; it is moved into this object
/// @param nSlots number of per-slot counters to allocate, all starting at zero
67 TCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
68 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
69 {
70 }
71
/// Count one call for `slot`; when the slot's counter reaches fEveryN, reset it and call fFun(slot).
/// Because the counter is incremented before the comparison, an fEveryN of 0 never fires.
/// NOTE(review): `slot` is not bounds-checked; presumably callers guarantee slot < nSlots.
72 void operator()(unsigned int slot)
73 {
74 auto &c = fCounters[slot];
75 ++c;
76 if (c == fEveryN) {
77 c = 0ull;
78 fFun(slot);
79 }
80 }
81 };
82
// NOTE(review): the `class TOneTimeCallback {` header and its fFun member declaration are missing
// from this extracted listing; the lines below are the rest of that class. It invokes its callback
// at most once per slot, tracking which slots already ran it in fHasBeenCalled.
85 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
86
87 public:
/// @param f      the callback; it is moved into this object
/// @param nSlots number of per-slot "already called" flags to allocate, all starting at 0
88 TOneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
89
/// Invoke fFun(slot) unless it has already run for this slot.
/// The flag is set only after fFun returns, so a callback that throws is attempted again next time.
90 void operator()(unsigned int slot)
91 {
92 if (fHasBeenCalled[slot] == 1)
93 return;
94 fFun(slot);
95 fHasBeenCalled[slot] = 1;
96 }
97 };
98
// Granted access to private members (presumably so it can call CleanUpTask — confirm).
99 friend struct RCallCleanUpTask;
100
101 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
102 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
103 std::vector<RFilterBase *> fBookedFilters; ///< Booked filters (raw pointers; presumably non-owning like fBookedActions)
104 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
105 std::vector<RRangeBase *> fBookedRanges; ///< Booked ranges (raw pointers; presumably non-owning like fBookedActions)
106
107 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
108 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
109 std::shared_ptr<TTree> fTree{nullptr};
/// Number of processing slots, as returned by GetNSlots(); defaults to 1.
112 const unsigned int fNSlots{1};
114 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
115 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
116 std::map<std::string, std::string> fAliasColumnNameMap; ///< ColumnNameAlias-columnName pairs
117 std::vector<TCallback> fCallbacks; ///< Registered callbacks
118 std::vector<TOneTimeCallback> fCallbacksOnce; ///< Registered callbacks to invoke just once before running the loop
119 std::vector<Callback_t> fDataBlockCallbacks; ///< Registered callbacks to call at the beginning of each "data block"
121 unsigned int fNRuns{0}; ///< Number of event loops run
122
123 /// Registry of per-slot value pointers for booked data-source columns
124 std::map<std::string, std::vector<void *>> fDSValuePtrMap;
125
126 /// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
// NOTE(review): the member documented by the comment above is missing from this extracted listing.
128
/// Run event loop with no source files, in parallel.
130 void RunEmptySourceMT();
/// Run event loop with no source files, in sequence.
131 void RunEmptySource();
/// Run event loop over one or multiple ROOT files, in parallel.
132 void RunTreeProcessorMT();
/// Run event loop over one or multiple ROOT files, in sequence.
133 void RunTreeReader();
/// Run event loop over data accessed through a DataSource, in parallel.
134 void RunDataSourceMT();
/// Run event loop over data accessed through a DataSource, in sequence.
135 void RunDataSource();
/// Execute actions and make sure named filters are called for each event.
136 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
/// Build TTreeReaderValues for all nodes, for the given processing slot.
137 void InitNodeSlots(TTreeReader *r, unsigned int slot);
/// Initialize all nodes of the functional graph before running the event loop.
138 void InitNodes();
/// Perform clean-up operations. To be called at the end of each event loop.
139 void CleanUpNodes();
/// Perform clean-up operations. To be called at the end of each task execution.
140 void CleanUpTask(TTreeReader *r, unsigned int slot);
/// Trigger counting of number of children nodes for each node of the functional graph.
141 void EvalChildrenCounts();
// NOTE(review): presumably wires fDataBlockCallbacks to the reader of `slot` — confirm in the definition.
142 void SetupDataBlockCallbacks(TTreeReader *r, unsigned int slot);
143
144public:
/// Event loop over a TTree/TChain; `defaultBranches` are the default columns.
145 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
/// Event loop with no input source (presumably over `nEmptyEntries` empty entries — confirm).
146 RLoopManager(ULong64_t nEmptyEntries);
/// Event loop over a data source; ownership of `ds` is transferred to this object (see fDataSource).
147 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
/// Not copyable.
148 RLoopManager(const RLoopManager &) = delete;
150
/// Add RDF nodes that require just-in-time compilation to the computation graph.
152 void Jit();
/// The loop manager is the head of the graph, so it returns itself.
153 RLoopManager *GetLoopManagerUnchecked() final { return this; }
/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
154 void Run();
156 TTree *GetTree() const;
/// Non-owning access to the data source; null if no data source was provided.
159 RDataSource *GetDataSource() const { return fDataSource.get(); }
160 void Book(RDFInternal::RActionBase *actionPtr);
161 void Deregister(RDFInternal::RActionBase *actionPtr);
162 void Book(RFilterBase *filterPtr);
163 void Deregister(RFilterBase *filterPtr);
164 void Book(RRangeBase *rangePtr);
165 void Deregister(RRangeBase *rangePtr);
166 bool CheckFilters(unsigned int, Long64_t) final;
/// Number of processing slots.
167 unsigned int GetNSlots() const { return fNSlots; }
/// Call FillReport on all booked filters.
168 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
169 /// End of recursive chain of calls, does nothing
// NOTE(review): the comment above documents PartialReport, whose declaration is missing from this
// listing; it does not describe SetTree below.
/// Replace the stored input tree pointer (shared ownership).
171 void SetTree(const std::shared_ptr<TTree> &tree) { fTree = tree; }
/// Increment the number of children nodes (fNChildren).
172 void IncrChildrenCount() final { ++fNChildren; }
/// Record that a child node asked to stop processing (increments fNStopsReceived).
173 void StopProcessing() final { ++fNStopsReceived; }
174 void ToJitExec(const std::string &) const;
/// Map `alias` to `colName`, overwriting any previous mapping for the same alias.
175 void AddColumnAlias(const std::string &alias, const std::string &colName) { fAliasColumnNameMap[alias] = colName; }
176 const std::map<std::string, std::string> &GetAliasMap() const { return fAliasColumnNameMap; }
177 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
/// Number of event loops run so far.
178 unsigned int GetNRuns() const { return fNRuns; }
179 bool HasDSValuePtrs(const std::string &col) const;
180 const std::map<std::string, std::vector<void *>> &GetDSValuePtrs() const { return fDSValuePtrMap; }
// NOTE(review): `ptrs` is taken by const value, which forces a copy; `const std::vector<void *> &`
// (or a by-value sink that is moved from) would avoid it.
181 void AddDSValuePtrs(const std::string &col, const std::vector<void *> ptrs);
182
183 /// End of recursive chain of calls, does nothing
184 void AddFilterName(std::vector<std::string> &) {}
185 /// For each booked filter, returns either the name or "Unnamed Filter"
186 std::vector<std::string> GetFiltersNames();
187
188 /// Return all graph edges known to RLoopManager
189 /// This includes Filters and Ranges but not Defines.
190 std::vector<RNodeBase *> GetGraphEdges() const;
191
192 /// Return all actions, either booked or already run
193 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
194
/// Returns a copy of the list of booked (not yet run) actions.
195 std::vector<RDFInternal::RActionBase *> GetBookedActions() { return fBookedActions; }
196 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> GetGraph();
197
199
// NOTE(review): presumably appends `callback` to fDataBlockCallbacks — confirm in the definition.
200 void AddDataBlockCallback(std::function<void(unsigned int)> &&callback);
201};
202
203} // ns RDF
204} // ns Detail
205} // ns ROOT
206
207#endif
ROOT::R::TRInterface & r
Definition Object.C:4
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
long long Long64_t
Definition RtypesCore.h:73
unsigned long long ULong64_t
Definition RtypesCore.h:74
typedef void((*Func_t)())
TCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
TOneTimeCallback(Callback_t &&f, unsigned int nSlots)
The head node of an RDF computation graph.
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
const std::map< std::string, std::string > & GetAliasMap() const
std::function< void(unsigned int)> Callback_t
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
RLoopManager & operator=(const RLoopManager &)=delete
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
::TDirectory * GetDirectory() const
void SetupDataBlockCallbacks(TTreeReader *r, unsigned int slot)
std::vector< RDFInternal::RActionBase * > GetBookedActions()
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
void AddFilterName(std::vector< std::string > &)
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::map< std::string, std::string > fAliasColumnNameMap
ColumnNameAlias-columnName pairs.
std::vector< TCallback > fCallbacks
Registered callbacks.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
RDFInternal::RDataBlockNotifier fDataBlockNotifier
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddColumnAlias(const std::string &alias, const std::string &colName)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
const std::map< std::string, std::vector< void * > > & GetDSValuePtrs() const
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
void SetTree(const std::shared_ptr< TTree > &tree)
std::vector< Callback_t > fDataBlockCallbacks
Registered callbacks to call at the beginning of each "data block".
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void AddDataBlockCallback(std::function< void(unsigned int)> &&callback)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
Base class for non-leaf nodes of the computational graph.
Definition RNodeBase.hxx:41
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:45
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:44
Helper class that provides the operation graph nodes.
Class used to create the operation graph to be printed in the dot representation.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
std::vector< std::string > ColumnNames_t
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition tree.py:1