Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.hxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
2
3/*************************************************************************
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#ifndef ROOT_RLOOPMANAGER
12#define ROOT_RLOOPMANAGER
13
14#include "ROOT/InternalTreeUtils.hxx" // RNoCleanupNotifier
20
21#include <functional>
22#include <limits>
23#include <map>
24#include <memory>
25#include <string>
26#include <unordered_map>
27#include <vector>
28
29// forward declarations
30class TTree;
31class TTreeReader;
32class TDirectory;
33
34namespace ROOT {
35namespace RDF {
36class RCutFlowReport;
37class RDataSource;
38} // ns RDF
39
40namespace Internal {
41namespace RDF {
42std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
43
44class GraphNode;
45class RActionBase;
46class RVariationBase;
47
48namespace GraphDrawing {
50} // ns GraphDrawing
51
52using Callback_t = std::function<void(unsigned int)>;
53
54class RCallback {
57 std::vector<ULong64_t> fCounters;
58
59public:
60 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
61 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
62 {
63 }
64
65 void operator()(unsigned int slot)
66 {
67 auto &c = fCounters[slot];
68 ++c;
69 if (c == fEveryN) {
70 c = 0ull;
71 fFun(slot);
72 }
73 }
74};
75
78 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
79
80public:
81 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
82
83 void operator()(unsigned int slot)
84 {
85 if (fHasBeenCalled[slot] == 1)
86 return;
87 fFun(slot);
88 fHasBeenCalled[slot] = 1;
89 }
90};
91
92} // namespace RDF
93} // namespace Internal
94} // namespace ROOT
95
96namespace ROOT {
97namespace Detail {
98namespace RDF {
100
101class RFilterBase;
102class RRangeBase;
103class RDefineBase;
105
106/// The head node of a RDF computation graph.
107/// This class is responsible of running the event loop.
108class RLoopManager : public RNodeBase {
109 using ColumnNames_t = std::vector<std::string>;
111
112 friend struct RCallCleanUpTask;
113
114 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
115 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
116 std::vector<RFilterBase *> fBookedFilters;
117 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
118 std::vector<RRangeBase *> fBookedRanges;
119 std::vector<RDefineBase *> fBookedDefines;
120 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
121
122 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
123 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
124 std::shared_ptr<TTree> fTree{nullptr};
126 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
127
128 /// Keys are `fname + "/" + treename` as RSampleInfo::fID; Values are pointers to the corresponding sample
129 std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
130 /// Samples need to survive throughout the whole event loop, hence stored as an attribute
131 std::vector<ROOT::RDF::Experimental::RSample> fSamples;
132
133 /// Friends of the fTree. Only used if we constructed fTree ourselves.
134 std::vector<std::unique_ptr<TChain>> fFriends;
136 /// Range of entries created when no data source is specified.
137 std::pair<ULong64_t, ULong64_t> fEmptyEntryRange{};
138 const unsigned int fNSlots{1};
140 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
141 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
142 /// Registered callbacks to be executed every N events.
143 /// The registration happens via the RegisterCallback method.
144 std::vector<RDFInternal::RCallback> fCallbacksEveryNEvents;
145 /// Registered callbacks to invoke just once before running the loop.
146 /// The registration happens via the RegisterCallback method.
147 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
148 /// Registered callbacks to call at the beginning of each "data block".
149 /// The key is the pointer of the corresponding node in the computation graph (a RDefinePerSample or a RAction).
150 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
152 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
153 unsigned int fNRuns{0}; ///< Number of event loops run
154
155 /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
156 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
157
158 /// Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
160
162
163 void RunEmptySourceMT();
164 void RunEmptySource();
165 void RunTreeProcessorMT();
166 void RunTreeReader();
167 void RunDataSourceMT();
168 void RunDataSource();
169 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
170 void InitNodeSlots(TTreeReader *r, unsigned int slot);
171 void InitNodes();
172 void CleanUpNodes();
173 void CleanUpTask(TTreeReader *r, unsigned int slot);
174 void EvalChildrenCounts();
175 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
176 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
177 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
178
179public:
180 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
181 RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches);
182 RLoopManager(ULong64_t nEmptyEntries);
183 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
185 RLoopManager(const RLoopManager &) = delete;
187
189 void Jit();
190 RLoopManager *GetLoopManagerUnchecked() final { return this; }
191 void Run(bool jit = true);
193 TTree *GetTree() const;
196 RDataSource *GetDataSource() const { return fDataSource.get(); }
197 void Register(RDFInternal::RActionBase *actionPtr);
198 void Deregister(RDFInternal::RActionBase *actionPtr);
199 void Register(RFilterBase *filterPtr);
200 void Deregister(RFilterBase *filterPtr);
201 void Register(RRangeBase *rangePtr);
202 void Deregister(RRangeBase *rangePtr);
203 void Register(RDefineBase *definePtr);
204 void Deregister(RDefineBase *definePtr);
207 bool CheckFilters(unsigned int, Long64_t) final;
208 unsigned int GetNSlots() const { return fNSlots; }
209 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
210 /// End of recursive chain of calls, does nothing
212 void SetTree(std::shared_ptr<TTree> tree);
213 void IncrChildrenCount() final { ++fNChildren; }
214 void StopProcessing() final { ++fNStopsReceived; }
215 void ToJitExec(const std::string &) const;
216 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
217 unsigned int GetNRuns() const { return fNRuns; }
218 bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const;
219 void AddDataSourceColumnReaders(const std::string &col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
220 const std::type_info &ti);
221 RColumnReaderBase *AddTreeColumnReader(unsigned int slot, const std::string &col,
222 std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti);
223 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const;
224
225 /// End of recursive chain of calls, does nothing
226 void AddFilterName(std::vector<std::string> &) final {}
227 /// For each booked filter, returns either the name or "Unnamed Filter"
228 std::vector<std::string> GetFiltersNames();
229
230 /// Return all graph edges known to RLoopManager
231 /// This includes Filters and Ranges but not Defines.
232 std::vector<RNodeBase *> GetGraphEdges() const;
233
234 /// Return all actions, either booked or already run
235 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
236
237 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
238 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
239
241
242 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
243
244 void SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange);
246};
247
248/// \brief Create an RLoopManager that reads a TChain.
249/// \param[in] datasetName Name of the TChain
250/// \param[in] fileNameGlob File name (or glob) in which the TChain is stored.
251/// \param[in] defaultColumns List of default columns, see
252/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
253/// \return the RLoopManager instance.
254std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
255CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
256 const std::vector<std::string> &defaultColumns, bool checkFile = true);
257
258/// \brief Create an RLoopManager that reads a TChain.
259/// \param[in] datasetName Name of the TChain
260/// \param[in] fileNameGlobs List of file names (potentially globs).
261/// \param[in] defaultColumns List of default columns, see
262/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
263/// \return the RLoopManager instance.
264std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
265CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
266 const std::vector<std::string> &defaultColumns, bool checkFile = true);
267
268#ifdef R__HAS_ROOT7
269/// \brief Create an RLoopManager that reads an RNTuple.
270/// \param[in] datasetName Name of the RNTuple
271/// \param[in] fileNameGlob File name (or glob) in which the RNTuple is stored.
272/// \param[in] defaultColumns List of default columns, see
273/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
274/// \return the RLoopManager instance.
275std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
276 std::string_view fileNameGlob,
277 const std::vector<std::string> &defaultColumns);
278
279/// \brief Create an RLoopManager that reads multiple RNTuples chained vertically.
280/// \param[in] datasetName Name of the RNTuple
281/// \param[in] fileNameGlobs List of file names (potentially globs).
282/// \param[in] defaultColumns List of default columns, see
283/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
284/// \return the RLoopManager instance.
285std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
286 const std::vector<std::string> &fileNameGlobs,
287 const std::vector<std::string> &defaultColumns);
288
289/// \brief Create an RLoopManager opening a file and checking the data format of the dataset.
290/// \param[in] datasetName Name of the dataset in the file.
291/// \param[in] fileNameGlob File name (or glob) in which the dataset is stored.
292/// \param[in] defaultColumns List of default columns, see
293/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
294/// \throws std::invalid_argument if the file could not be opened.
295/// \return an RLoopManager of the appropriate data source.
296std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
297 std::string_view fileNameGlob,
298 const std::vector<std::string> &defaultColumns);
299
300/// \brief Create an RLoopManager that reads many files. The first is opened to infer the data source type.
301/// \param[in] datasetName Name of the dataset.
302/// \param[in] fileNameGlobs List of file names (potentially globs).
303/// \param[in] defaultColumns List of default columns, see
304/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
305/// \throws std::invalid_argument if the file could not be opened.
306/// \return an RLoopManager of the appropriate data source.
307std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
308 const std::vector<std::string> &fileNameGlobs,
309 const std::vector<std::string> &defaultColumns);
310#endif
311
312} // namespace RDF
313} // namespace Detail
314} // namespace ROOT
315
316#endif
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
RLoopManager & operator=(const RLoopManager &)=delete
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
::TDirectory * GetDirectory() const
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
void AddFilterName(std::vector< std::string > &) final
End of recursive chain of calls, does nothing.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
void PartialReport(ROOT::RDF::RCutFlowReport &) const final
End of recursive chain of calls, does nothing.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RLoopManager(const RLoopManager &)=delete
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
RLoopManager * GetLoopManagerUnchecked() final
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
Base class for non-leaf nodes of the computational graph.
Definition RNodeBase.hxx:43
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
Helper class that provides the operation graph nodes.
RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
std::vector< ULong64_t > fCounters
void operator()(unsigned int slot)
ROneTimeCallback(Callback_t &&f, unsigned int nSlots)
This type includes all parts of RVariation that do not depend on the callable signature.
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
Describe directory structure in memory.
Definition TDirectory.h:45
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
std::function< void(unsigned int)> Callback_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
A RAII object that calls RLoopManager::CleanUpTask at destruction.