Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
14#include "ROOT/InternalTreeUtils.hxx" // RNoCleanupNotifier
20
21#include <functional>
22#include <limits>
23#include <map>
24#include <memory>
25#include <string>
26#include <unordered_map>
27#include <vector>
28
29// forward declarations
30class TTree;
31class TTreeReader;
32class TDirectory;
33
34namespace ROOT {
35namespace RDF {
36class RCutFlowReport;
37class RDataSource;
38} // ns RDF
39
40namespace Internal {
41namespace RDF {
/// Get all the branches names, including the ones of the friend trees.
/// \param t The tree to inspect.
/// \param allowDuplicates Defaults to true. NOTE(review): presumably controls whether repeated names are kept in
///        the result; the implementation is not visible in this header, confirm there.
42std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
43
44class GraphNode;
45class RActionBase;
46class RVariationBase;
47
48namespace GraphDrawing {
50} // ns GraphDrawing
51
/// Type of the callables stored by RCallback and ROneTimeCallback; both invoke it with the `slot` argument of their
/// own call operator.
52using Callback_t = std::function<void(unsigned int)>;
53
/// Wraps a callable so that it is invoked once every `everyN` calls, counted independently for each slot.
/// NOTE(review): the declarations of fFun and fEveryN (initialised in the constructor below) are not shown in this
/// listing.
54class RCallback {
 /// One call counter per slot; sized to nSlots and zero-initialised by the constructor.
57 std::vector<ULong64_t> fCounters;
58
59public:
 /// \param everyN Number of calls (per slot) between two invocations of the callable.
 /// \param f The callable; it is moved into this object.
 /// \param nSlots Number of per-slot counters to allocate.
60 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
61 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
62 {
63 }
64
 /// Increment the counter of `slot`; when it reaches fEveryN, reset it to zero and invoke the callable with `slot`.
 /// `slot` indexes fCounters without a bounds check.
65 void operator()(unsigned int slot)
66 {
67 auto &c = fCounters[slot];
 // The counter is incremented before the comparison, so the callable fires on the fEveryN-th call.
 // NOTE(review): as a consequence, with fEveryN == 0 the comparison can only succeed after the counter wraps.
68 ++c;
69 if (c == fEveryN) {
70 c = 0ull;
71 fFun(slot);
72 }
73 }
74};
75
78 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
79
80public:
 /// Move the callable into this object and create one "already called" flag per slot, all initialised to 0.
81 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
82
/// Invoke the callable with `slot` the first time this operator is called for that slot; later calls for the same
/// slot return immediately. `slot` indexes fHasBeenCalled without a bounds check.
83 void operator()(unsigned int slot)
84 {
85 if (fHasBeenCalled[slot] == 1)
86 return;
 // The flag is set only after the callable returns.
87 fFun(slot);
88 fHasBeenCalled[slot] = 1;
89 }
90};
91
92} // namespace RDF
93} // namespace Internal
94} // namespace ROOT
95
96namespace ROOT {
97namespace Detail {
98namespace RDF {
100
101class RFilterBase;
102class RRangeBase;
103class RDefineBase;
105
106/// The head node of a RDF computation graph.
107/// This class is responsible for running the event loop.
108class RLoopManager : public RNodeBase {
109 using ColumnNames_t = std::vector<std::string>;
111
112 friend struct RCallCleanUpTask;
113
114 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
115 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
116 std::vector<RFilterBase *> fBookedFilters;
117 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
118 std::vector<RRangeBase *> fBookedRanges;
119 std::vector<RDefineBase *> fBookedDefines;
120 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
121
122 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
123 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
124 std::shared_ptr<TTree> fTree{nullptr};
126 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
127
128 /// Keys are `fname + "/" + treename` as RSampleInfo::fID; Values are pointers to the corresponding sample
129 std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
130 /// Samples need to survive throughout the whole event loop, hence stored as an attribute
131 std::vector<ROOT::RDF::Experimental::RSample> fSamples;
132
133 /// Friends of the fTree. Only used if we constructed fTree ourselves.
134 std::vector<std::unique_ptr<TChain>> fFriends;
136 /// Range of entries created when no data source is specified.
137 std::pair<ULong64_t, ULong64_t> fEmptyEntryRange{};
138 const unsigned int fNSlots{1};
140 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
141 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
142 /// Registered callbacks to be executed every N events.
143 /// The registration happens via the RegisterCallback method.
144 std::vector<RDFInternal::RCallback> fCallbacksEveryNEvents;
145 /// Registered callbacks to invoke just once before running the loop.
146 /// The registration happens via the RegisterCallback method.
147 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
148 /// Registered callbacks to call at the beginning of each "data block".
149 /// The key is the pointer of the corresponding node in the computation graph (a RDefinePerSample or a RAction).
150 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
152 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
153 unsigned int fNRuns{0}; ///< Number of event loops run
154
155 /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
156 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
157
158 /// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
160
162
 /// Run event loop with no source files, in parallel.
163 void RunEmptySourceMT();
 /// Run event loop with no source files, in sequence.
164 void RunEmptySource();
 /// Run event loop over one or multiple ROOT files, in parallel.
165 void RunTreeProcessorMT();
 /// Run event loop over one or multiple ROOT files, in sequence.
166 void RunTreeReader();
 /// Run event loop over data accessed through a DataSource, in parallel.
167 void RunDataSourceMT();
 /// Run event loop over data accessed through a DataSource, in sequence.
168 void RunDataSource();
 /// Execute actions and make sure named filters are called for each event.
169 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
 /// Build TTreeReaderValues for all nodes.
170 void InitNodeSlots(TTreeReader *r, unsigned int slot);
 /// Initialize all nodes of the functional graph before running the event loop.
171 void InitNodes();
 /// Perform clean-up operations. To be called at the end of each event loop.
172 void CleanUpNodes();
 /// Perform clean-up operations. To be called at the end of each task execution.
173 void CleanUpTask(TTreeReader *r, unsigned int slot);
 /// Trigger counting of number of children nodes for each node of the functional graph.
174 void EvalChildrenCounts();
175 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
176 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
177 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
178
179public:
180 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
181 RLoopManager(ULong64_t nEmptyEntries);
182 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
184 RLoopManager(const RLoopManager &) = delete;
186
 /// Add RDF nodes that require just-in-time compilation to the computation graph.
188 void Jit();
 /// The loop manager is itself the head node, so this returns `this`.
189 RLoopManager *GetLoopManagerUnchecked() final { return this; }
 /// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
190 void Run(bool jit = true);
192 TTree *GetTree() const;
 /// Non-owning access to the data source; null if no data source is set (see fDataSource).
195 RDataSource *GetDataSource() const { return fDataSource.get(); }
196 void Register(RDFInternal::RActionBase *actionPtr);
197 void Deregister(RDFInternal::RActionBase *actionPtr);
198 void Register(RFilterBase *filterPtr);
199 void Deregister(RFilterBase *filterPtr);
200 void Register(RRangeBase *rangePtr);
201 void Deregister(RRangeBase *rangePtr);
202 void Register(RDefineBase *definePtr);
203 void Deregister(RDefineBase *definePtr);
206 bool CheckFilters(unsigned int, Long64_t) final;
207 unsigned int GetNSlots() const { return fNSlots; }
 /// Call FillReport on all booked filters.
208 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
209 /// End of recursive chain of calls, does nothing
 // NOTE(review): the declaration documented by the comment above is omitted from this listing (the member index
 // attaches this description to PartialReport); it does not describe SetTree below.
211 void SetTree(std::shared_ptr<TTree> tree);
212 void IncrChildrenCount() final { ++fNChildren; }
213 void StopProcessing() final { ++fNStopsReceived; }
214 void ToJitExec(const std::string &) const;
215 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
216 unsigned int GetNRuns() const { return fNRuns; }
 /// Return true if AddDataSourceColumnReaders was called for column name col.
217 bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const;
218 void AddDataSourceColumnReaders(const std::string &col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
219 const std::type_info &ti);
 /// Register a new RTreeColumnReader with this RLoopManager.
220 RColumnReaderBase *AddTreeColumnReader(unsigned int slot, const std::string &col,
221 std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti);
222 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const;
223
224 /// End of recursive chain of calls, does nothing
225 void AddFilterName(std::vector<std::string> &) final {}
226 /// For each booked filter, returns either the name or "Unnamed Filter"
227 std::vector<std::string> GetFiltersNames();
228
229 /// Return all graph edges known to RLoopManager.
230 /// This includes Filters and Ranges but not Defines.
231 std::vector<RNodeBase *> GetGraphEdges() const;
232
233 /// Return all actions, either booked or already run
234 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
235
236 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
237 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
238
240
241 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
242
243 void SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange);
245};
246
247} // ns RDF
248} // ns Detail
249} // namespace ROOT
250
251#endif
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
RLoopManager & operator=(const RLoopManager &)=delete
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
::TDirectory * GetDirectory() const
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
void AddFilterName(std::vector< std::string > &) final
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
Base class for non-leaf nodes of the computational graph.
Definition RNodeBase.hxx:43
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
Helper class that provides the operation graph nodes.
RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
std::vector< ULong64_t > fCounters
void operator()(unsigned int slot)
ROneTimeCallback(Callback_t &&f, unsigned int nSlots)
This type includes all parts of RVariation that do not depend on the callable signature.
A dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
std::function< void(unsigned int)> Callback_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
Definition tree.py:1
A RAII object that calls RLoopManager::CleanUpTask at destruction.