1// Author: Enrico Guiraud, Danilo Piparo CERN 03/2017
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
14#include "ROOT/InternalTreeUtils.hxx" // RNoCleanupNotifier
21#include <functional>
22#include <limits>
23#include <map>
24#include <memory>
25#include <string>
26#include <unordered_map>
27#include <vector>
29// forward declarations
30class TTree;
31class TTreeReader;
32class TDirectory;
34namespace ROOT {
35namespace RDF {
36class RCutFlowReport;
37class RDataSource;
38} // ns RDF
40namespace Internal {
41namespace RDF {
42std::vector<std::string> GetBranchNames(TTree &t, bool allowDuplicates = true);
44class GraphNode;
45class RActionBase;
46class RVariationBase;
48namespace GraphDrawing {
50} // ns GraphDrawing
/// Type of the user callbacks wrapped by RCallback and ROneTimeCallback; the argument is the processing slot.
52using Callback_t = std::function<void(unsigned int)>;
54class RCallback {
57 std::vector<ULong64_t> fCounters;
60 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
61 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
62 {
63 }
65 void operator()(unsigned int slot)
66 {
67 auto &c = fCounters[slot];
68 ++c;
69 if (c == fEveryN) {
70 c = 0ull;
71 fFun(slot);
72 }
73 }
78 std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
81 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
83 void operator()(unsigned int slot)
84 {
85 if (fHasBeenCalled[slot] == 1)
86 return;
87 fFun(slot);
88 fHasBeenCalled[slot] = 1;
89 }
92} // namespace RDF
93} // namespace Internal
94} // namespace ROOT
96namespace ROOT {
97namespace Detail {
98namespace RDF {
101class RFilterBase;
102class RRangeBase;
103class RDefineBase;
/// The head node of an RDF computation graph.
/// This class is responsible for running the event loop.
108class RLoopManager : public RNodeBase {
109 using ColumnNames_t = std::vector<std::string>;
112 friend struct RCallCleanUpTask;
114 std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
115 std::vector<RDFInternal::RActionBase *> fRunActions; ///< Non-owning pointers to actions already run
/// Raw pointers to the booked filters. NOTE(review): presumably non-owning like fBookedActions and maintained by
/// Register/Deregister(RFilterBase *) — confirm.
116 std::vector<RFilterBase *> fBookedFilters;
117 std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
/// Raw pointers to the booked ranges. NOTE(review): presumably non-owning, see Register/Deregister(RRangeBase *).
118 std::vector<RRangeBase *> fBookedRanges;
/// Raw pointers to the booked defines. NOTE(review): presumably non-owning, see Register/Deregister(RDefineBase *).
119 std::vector<RDefineBase *> fBookedDefines;
/// Raw pointers to the booked variations. NOTE(review): presumably non-owning — confirm.
120 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
122 /// Shared pointer to the input TTree. It does not delete the pointee if the TTree/TChain was passed directly as an
123 /// argument to RDataFrame's ctor (in which case we let users retain ownership).
124 std::shared_ptr<TTree> fTree{nullptr};
/// Defaults to the largest representable Long64_t. NOTE(review): presumably the upper bound of the entries to
/// process (effectively "no limit" by default) — confirm against the event-loop implementation.
126 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
/// Keys are `fname + "/" + treename`, as in RSampleInfo::fID; values are pointers to the corresponding sample.
129 std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
130 /// Samples need to survive throughout the whole event loop, hence stored as an attribute
131 std::vector<ROOT::RDF::Experimental::RSample> fSamples;
133 std::vector<std::unique_ptr<TTree>> fFriends; ///< Friends of the fTree. Only used if we constructed fTree ourselves.
/// Number of processing slots, as returned by GetNSlots(). NOTE(review): the in-class default of 1 is presumably
/// overridden by the constructors — confirm.
136 const unsigned int fNSlots{1};
138 const ELoopType fLoopType; ///< The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
139 const std::unique_ptr<RDataSource> fDataSource; ///< Owning pointer to a data-source object. Null if no data-source
140 std::vector<RDFInternal::RCallback> fCallbacks; ///< Registered callbacks
141 /// Registered callbacks to invoke just once before running the loop
142 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
/// Registered callbacks to call at the beginning of each "data block".
/// The key is a pointer to the corresponding node in the computation graph (a RDefinePerSample or a RAction).
145 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
/// Sample metadata. NOTE(review): presumably one entry per slot, refreshed by UpdateSampleInfo() — confirm.
147 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
148 unsigned int fNRuns{0}; ///< Number of event loops run
150 /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
151 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
/// Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
158 void RunEmptySourceMT();
159 void RunEmptySource();
160 void RunTreeProcessorMT();
161 void RunTreeReader();
162 void RunDataSourceMT();
163 void RunDataSource();
164 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
165 void InitNodeSlots(TTreeReader *r, unsigned int slot);
166 void InitNodes();
167 void CleanUpNodes();
168 void CleanUpTask(TTreeReader *r, unsigned int slot);
169 void EvalChildrenCounts();
170 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
171 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
172 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
175 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
176 RLoopManager(ULong64_t nEmptyEntries);
177 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
179 RLoopManager(const RLoopManager &) = delete;
183 void Jit();
184 RLoopManager *GetLoopManagerUnchecked() final { return this; }
185 void Run(bool jit = true);
187 TTree *GetTree() const;
190 RDataSource *GetDataSource() const { return fDataSource.get(); }
191 void Register(RDFInternal::RActionBase *actionPtr);
192 void Deregister(RDFInternal::RActionBase *actionPtr);
193 void Register(RFilterBase *filterPtr);
194 void Deregister(RFilterBase *filterPtr);
195 void Register(RRangeBase *rangePtr);
196 void Deregister(RRangeBase *rangePtr);
197 void Register(RDefineBase *definePtr);
198 void Deregister(RDefineBase *definePtr);
201 bool CheckFilters(unsigned int, Long64_t) final;
202 unsigned int GetNSlots() const { return fNSlots; }
203 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
204 /// End of recursive chain of calls, does nothing
206 void SetTree(std::shared_ptr<TTree> tree);
207 void IncrChildrenCount() final { ++fNChildren; }
208 void StopProcessing() final { ++fNStopsReceived; }
209 void ToJitExec(const std::string &) const;
210 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
211 unsigned int GetNRuns() const { return fNRuns; }
212 bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const;
213 void AddDataSourceColumnReaders(const std::string &col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
214 const std::type_info &ti);
215 RColumnReaderBase *AddTreeColumnReader(unsigned int slot, const std::string &col,
216 std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti);
217 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const;
219 /// End of recursive chain of calls, does nothing
220 void AddFilterName(std::vector<std::string> &) final {}
221 /// For each booked filter, returns either the name or "Unnamed Filter"
222 std::vector<std::string> GetFiltersNames();
/// Return all graph edges known to RLoopManager.
/// This includes Filters and Ranges but not Defines.
226 std::vector<RNodeBase *> GetGraphEdges() const;
228 /// Return all actions, either booked or already run
229 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
231 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
232 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
236 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
239} // ns RDF
240} // ns Detail
241} // namespace ROOT
