Logo ROOT  
Reference Guide
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
14#include "ROOT/InternalTreeUtils.hxx" // RNoCleanupNotifier
20
21#include <functional>
22#include <limits>
23#include <map>
24#include <memory>
25#include <string>
26#include <unordered_map>
27#include <vector>
28
29// forward declarations
30class TTree;
31class TTreeReader;
32class TDirectory;
33
34namespace ROOT {
35namespace RDF {
36class RCutFlowReport;
37class RDataSource;
38} // ns RDF
39
40namespace Internal {
41namespace RDF {
/// Get all the branch names of `t`, including the ones of its friend trees.
/// NOTE(review): the exact meaning of `allowDuplicates` is not visible here (presumably whether a name that appears
/// more than once, e.g. in the main tree and in a friend, is kept more than once) — confirm in the implementation.
42std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
43
44class GraphNode;
45class RActionBase;
46class RVariationBase;
47
48namespace GraphDrawing {
// NOTE(review): original line 49 (the declaration(s) inside this namespace) is missing from this extract.
50} // ns GraphDrawing
51
/// Signature of the callables wrapped by RCallback and ROneTimeCallback: they are invoked with the index of the
/// processing slot on which they fire.
52using Callback_t = std::function<void(unsigned int)>;
53
/// Wraps a Callback_t and invokes it once every `everyN` calls of operator(), counting calls separately per slot.
// NOTE(review): the declarations of fFun and fEveryN (original lines 55-56) are missing from this extract;
// per the constructor, fFun holds the moved-in callable and fEveryN the period `everyN`.
54class RCallback {
 /// One call counter per slot; a slot's counter is reset to 0 each time the callback fires on that slot.
57 std::vector<ULong64_t> fCounters;
58
59public:
 /// \param everyN number of calls on a given slot between two invocations of the callback
 /// \param f callable to invoke; it is moved into this object
 /// \param nSlots number of slots, i.e. the number of counters (all start at 0)
60 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
61 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
62 {
63 }
64
 /// Count one call on `slot`. When the slot's counter reaches fEveryN, reset it and invoke the callback with `slot`.
 /// Each call only touches the counter element belonging to `slot`. `slot` must be < nSlots (indexing is unchecked).
 /// NOTE(review): the counter is incremented before the comparison, so with everyN == 0 it only equals fEveryN
 /// again after wrapping around — the callback effectively never fires in that case.
65 void operator()(unsigned int slot)
66 {
67 auto &c = fCounters[slot];
68 ++c;
69 if (c == fEveryN) {
70 c = 0ull;
71 fFun(slot);
72 }
73 }
74};
75
// NOTE(review): the opening of class ROneTimeCallback (original lines 76-77, including the declaration of the
// fFun member initialized by the constructor below) is missing from this extract.
/// Per-slot flags: set to 1 once the callback has run on that slot, 0 otherwise.
78 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
79
80public:
 /// \param f callable to invoke at most once per slot; it is moved into this object
 /// \param nSlots number of slots, i.e. the number of flags (all start at 0)
81 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
82
83 void operator()(unsigned int slot)
84 {
85 if (fHasBeenCalled[slot] == 1)
86 return;
87 fFun(slot);
88 fHasBeenCalled[slot] = 1;
89 }
90};
91
92} // namespace RDF
93} // namespace Internal
94} // namespace ROOT
95
96namespace ROOT {
97namespace Detail {
98namespace RDF {
100
101class RFilterBase;
102class RRangeBase;
103class RDefineBase;
105
106/// The head node of a RDF computation graph.
107/// This class is responsible for running the event loop.
108class RLoopManager : public RNodeBase {
// NOTE(review): this extract comes from a documentation-page scrape. Several declarations are missing; the gaps
// in the leading line numbers show where. According to the tooltip index, the missing declarations include the
// members fDefaultColumns, fValidBranchNames, fNoCleanupNotifier and fNewSampleNotifier, and the methods
// GetBranchNames, GetDefaultColumnNames, GetDirectory, GetNEmptyEntries, PartialReport and the deleted operator=.
109 using ColumnNames_t = std::vector<std::string>;
 /// Kind of event loop: over ROOT files, with no input files, or through an RDataSource.
 /// NOTE(review): the *MT enumerators presumably select the parallel Run*MT methods declared below — confirm in Run().
110 enum class ELoopType { kROOTFiles, kROOTFilesMT, kNoFiles, kNoFilesMT, kDataSource, kDataSourceMT };
111
 /// RAII helper that calls CleanUpTask at destruction (per the tooltip index); friendship is needed because
 /// CleanUpTask is private.
112 friend struct RCallCleanUpTask;
113
114 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
115 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
 /// All booked filters, named or not (fBookedNamedFilters holds the named subset).
116 std::vector<RFilterBase *> fBookedFilters;
117 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
118 std::vector<RRangeBase *> fBookedRanges;
119 std::vector<RDefineBase *> fBookedDefines;
120 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
121
122 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
123 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
124 std::shared_ptr<TTree> fTree{nullptr};
 /// Defaults to the largest Long64_t.
 /// NOTE(review): presumably the entry at which the event loop stops — confirm in the implementation.
126 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
127 std::vector<std::unique_ptr<TTree>> fFriends; ///< Friends of the fTree. Only used if we constructed fTree ourselves.
 /// Number of processing slots (defaults to 1); returned by GetNSlots().
130 const unsigned int fNSlots{1};
132 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
133 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
134 std::vector<RDFInternal::RCallback> fCallbacks; ///< Registered callbacks
135 /// Registered callbacks to invoke just once before running the loop
136 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
137 /// Registered callbacks to call at the beginning of each "data block".
138 /// The key is the pointer of the corresponding node in the computation graph (a RDefinePerSample or a RAction).
139 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
 /// NOTE(review): presumably one RSampleInfo per slot, kept up to date by UpdateSampleInfo() — confirm.
141 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
142 unsigned int fNRuns{0}; ///< Number of event loops run
143
144 /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
145 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
146
147 /// Cache of the tree/chain branch names. Never access directly; always use GetBranchNames().
// NOTE(review): the member documented above (fValidBranchNames, per the tooltip index) is missing from this extract.
149
151
 /// Run event loop with no source files, in parallel.
152 void RunEmptySourceMT();
 /// Run event loop with no source files, in sequence.
153 void RunEmptySource();
 /// Run event loop over one or multiple ROOT files, in parallel.
154 void RunTreeProcessorMT();
 /// Run event loop over one or multiple ROOT files, in sequence.
155 void RunTreeReader();
 /// Run event loop over data accessed through a DataSource, in parallel.
156 void RunDataSourceMT();
 /// Run event loop over data accessed through a DataSource, in sequence.
157 void RunDataSource();
 /// Execute actions and make sure named filters are called for each event.
158 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
 /// Build TTreeReaderValues for all nodes of the computation graph, for the given slot.
159 void InitNodeSlots(TTreeReader *r, unsigned int slot);
 /// Initialize all nodes of the functional graph before running the event loop.
160 void InitNodes();
 /// Perform clean-up operations. To be called at the end of each event loop.
161 void CleanUpNodes();
 /// Perform clean-up operations. To be called at the end of each task execution.
162 void CleanUpTask(TTreeReader *r, unsigned int slot);
 /// Trigger counting of the number of child nodes for each node of the functional graph.
163 void EvalChildrenCounts();
164 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
165 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
166 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
167
168public:
169 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
170 RLoopManager(ULong64_t nEmptyEntries);
171 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
173 RLoopManager(const RLoopManager &) = delete;
175
 /// Add RDF nodes that require just-in-time compilation to the computation graph.
177 void Jit();
178 RLoopManager *GetLoopManagerUnchecked() final { return this; }
 /// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
179 void Run(bool jit = true);
181 TTree *GetTree() const;
 /// Non-owning access to the data source; null if no data source was provided.
184 RDataSource *GetDataSource() const { return fDataSource.get(); }
185 void Register(RDFInternal::RActionBase *actionPtr);
186 void Deregister(RDFInternal::RActionBase *actionPtr);
187 void Register(RFilterBase *filterPtr);
188 void Deregister(RFilterBase *filterPtr);
189 void Register(RRangeBase *rangePtr);
190 void Deregister(RRangeBase *rangePtr);
191 void Register(RDefineBase *definePtr);
192 void Deregister(RDefineBase *definePtr);
195 bool CheckFilters(unsigned int, Long64_t) final;
196 unsigned int GetNSlots() const { return fNSlots; }
 /// Call FillReport on all booked filters.
197 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
198 /// End of recursive chain of calls, does nothing
// NOTE(review): the declaration documented above (original line 199; per the tooltip index,
// `void PartialReport(ROOT::RDF::RCutFlowReport &) const final`) is missing from this extract.
// That doc comment does not describe SetTree.
200 void SetTree(std::shared_ptr<TTree> tree);
 /// Increment fNChildren, the number of graph nodes hanging from this object.
201 void IncrChildrenCount() final { ++fNChildren; }
 /// Increment fNStopsReceived, the number of times a child node signaled to stop processing entries.
202 void StopProcessing() final { ++fNStopsReceived; }
203 void ToJitExec(const std::string &) const;
204 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
205 unsigned int GetNRuns() const { return fNRuns; }
 /// Return true if AddDataSourceColumnReaders was called for column name `col`.
206 bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const;
207 void AddDataSourceColumnReaders(const std::string &col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
208 const std::type_info &ti);
 /// Register a new RTreeColumnReader with this RLoopManager.
209 RColumnReaderBase *AddTreeColumnReader(unsigned int slot, const std::string &col,
210 std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti);
211 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const;
212
213 /// End of recursive chain of calls, does nothing
214 void AddFilterName(std::vector<std::string> &) final {}
215 /// For each booked filter, returns either the name or "Unnamed Filter"
216 std::vector<std::string> GetFiltersNames();
217
218 /// Return all graph edges known to RLoopManager.
219 /// This includes Filters and Ranges but not Defines.
220 std::vector<RNodeBase *> GetGraphEdges() const;
221
222 /// Return all actions, either booked or already run
223 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
224
225 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
226 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
227
229
 /// Register a callback to be invoked at the beginning of each "data block" (stored in fSampleCallbacks,
 /// keyed by the pointer of the corresponding graph node).
230 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
231};
232
233} // ns RDF
234} // ns Detail
235} // namespace ROOT
236
237#endif
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
long long Long64_t
Definition: RtypesCore.h:80
unsigned long long ULong64_t
Definition: RtypesCore.h:81
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of the number of child nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
ULong64_t GetNEmptyEntries() const
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
RLoopManager & operator=(const RLoopManager &)=delete
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
unsigned int GetNRuns() const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
::TDirectory * GetDirectory() const
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< std::unique_ptr< TTree > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
void AddFilterName(std::vector< std::string > &) final
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::vector< RDFInternal::RCallback > fCallbacks
Registered callbacks.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly; always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
Base class for non-leaf nodes of the computational graph.
Definition: RNodeBase.hxx:43
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition: RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: RNodeBase.hxx:46
Helper class that provides the operation graph nodes.
RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
std::vector< ULong64_t > fCounters
void operator()(unsigned int slot)
void operator()(unsigned int slot)
ROneTimeCallback(Callback_t &&f, unsigned int nSlots)
This type includes all parts of RVariation that do not depend on the callable signature.
A dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition: TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
A TTree represents a columnar dataset.
Definition: TTree.h:79
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
std::function< void(unsigned int)> Callback_t
void(off) SmallVectorTemplateBase< T
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
Definition: RSampleInfo.hxx:84
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:167
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
Definition: tree.py:1
A RAII object that calls RLoopManager::CleanUpTask at destruction.