38#include <unordered_map>
52static std::string &GetCodeToJit()
54 static std::string code;
58static bool ContainsLeaf(
const std::set<TLeaf *> &leaves,
TLeaf *leaf)
60 return (leaves.find(leaf) != leaves.end());
66static void UpdateList(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
67 const std::string &friendName)
70 if (!friendName.empty()) {
72 const auto friendBName = friendName +
"." + branchName;
73 if (bNamesReg.insert(friendBName).second)
74 bNames.push_back(friendBName);
77 if (bNamesReg.insert(branchName).second)
78 bNames.push_back(branchName);
83static void UpdateList(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
84 const std::string &friendName, std::set<TLeaf *> &foundLeaves,
TLeaf *leaf,
bool allowDuplicates)
86 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
91 UpdateList(bNamesReg, bNames, branchName, friendName);
93 foundLeaves.insert(leaf);
97 std::string prefix, std::string &friendName)
99 for (
auto sb : *
b->GetListOfBranches()) {
101 auto subBranchName = std::string(subBranch->
GetName());
102 auto fullName = prefix + subBranchName;
104 std::string newPrefix;
106 newPrefix = fullName +
".";
108 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName);
110 auto branchDirectlyFromTree = t.
GetBranch(fullName.c_str());
111 if (!branchDirectlyFromTree)
112 branchDirectlyFromTree = t.
FindBranch(fullName.c_str());
113 if (branchDirectlyFromTree)
114 UpdateList(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName);
117 UpdateList(bNamesReg, bNames, subBranchName, friendName);
121static void GetBranchNamesImpl(
TTree &t, std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
122 std::set<TTree *> &analysedTrees, std::string &friendName,
bool allowDuplicates)
124 std::set<TLeaf *> foundLeaves;
125 if (!analysedTrees.insert(&t).second) {
134 std::string err(
"GetBranchNames: error in opening the tree ");
136 throw std::runtime_error(err);
139 for (
auto b : *branches) {
141 const auto branchName = std::string(branch->
GetName());
142 if (branch->IsA() == TBranch::Class()) {
145 if (listOfLeaves->GetEntries() == 1) {
146 auto leaf =
static_cast<TLeaf *
>(listOfLeaves->UncheckedAt(0));
147 UpdateList(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
150 for (
auto leaf : *listOfLeaves) {
151 auto castLeaf =
static_cast<TLeaf *
>(leaf);
152 const auto leafName = std::string(leaf->
GetName());
153 const auto fullName = branchName +
"." + leafName;
154 UpdateList(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
156 }
else if (branch->IsA() == TBranchObject::Class()) {
158 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName);
159 UpdateList(bNamesReg, bNames, branchName, friendName);
164 bool dotIsImplied =
false;
167 throw std::runtime_error(
"GetBranchNames: unsupported branch type");
169 if (be->GetType() == 3 || be->GetType() == 4)
172 if (dotIsImplied || branchName.back() ==
'.')
173 ExploreBranch(t, bNamesReg, bNames, branch,
"", friendName);
175 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName);
177 UpdateList(bNamesReg, bNames, branchName, friendName);
190 for (
auto friendTreeObj : *friendTrees) {
195 if (alias !=
nullptr)
196 frName = std::string(alias);
198 frName = std::string(friendTree->GetName());
200 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
204static void ThrowIfNSlotsChanged(
unsigned int nSlots)
207 if (currentSlots != nSlots) {
208 std::string msg =
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
209 std::to_string(nSlots) +
", but when starting the event loop it was " +
210 std::to_string(currentSlots) +
".";
211 if (currentSlots > nSlots)
212 msg +=
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
214 msg +=
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
215 throw std::runtime_error(msg);
227struct MaxTreeSizeRAII {
230 MaxTreeSizeRAII() : fOldMaxTreeSize(
TTree::GetMaxTreeSize())
238struct DatasetLogInfo {
239 std::string fDataSet;
245std::string LogRangeProcessing(
const DatasetLogInfo &info)
247 std::stringstream msg;
248 msg <<
"Processing " << info.fDataSet <<
": entry range [" << info.fRangeStart <<
"," << info.fRangeEnd - 1
249 <<
"], using slot " << info.fSlot <<
" in thread " << std::this_thread::get_id() <<
'.';
253DatasetLogInfo TreeDatasetLogInfo(
const TTreeReader &
r,
unsigned int slot)
255 const auto tree =
r.GetTree();
256 const auto chain =
dynamic_cast<TChain *
>(
tree);
260 std::vector<std::string> treeNames;
261 std::vector<std::string> fileNames;
263 treeNames.emplace_back(
f->GetName());
264 fileNames.emplace_back(
f->GetTitle());
267 for (
const auto &t : treeNames) {
271 what +=
" in files {";
272 for (
const auto &
f : fileNames) {
277 const auto treeName =
tree->GetName();
278 what = std::string(
"tree \"") + treeName +
"\"";
279 const auto file =
tree->GetCurrentFile();
281 what += std::string(
" in file \"") +
file->GetName() +
"\"";
283 const auto entryRange =
r.GetEntriesRange();
284 const ULong64_t end = entryRange.second == -1ll ?
tree->GetEntries() : entryRange.second;
285 return {std::move(
what),
static_cast<ULong64_t>(entryRange.first), end, slot};
295struct RCallCleanUpTask {
301 : fLoopManager(lm), fArg(arg), fReader(reader)
304 ~RCallCleanUpTask() { fLoopManager.
CleanUpTask(fReader, fArg); }
315 std::set<std::string> bNamesSet;
317 std::set<TTree *> analysedTrees;
318 std::string emptyFrName =
"";
319 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
324 : fTree(std::shared_ptr<
TTree>(
tree, [](
TTree *) {})), fDefaultColumns(defaultBranches),
327 fDataBlockNotifier(fNSlots)
340 fDataSource(std::move(ds)), fDataBlockNotifier(fNSlots)
348 RSlotRAII(
RSlotStack &slotStack) : fSlotStack(slotStack), fSlot(slotStack.GetSlot()) {}
349 ~RSlotRAII() { fSlotStack.
ReturnSlot(fSlot); }
361 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
369 entryRanges.emplace_back(start, end);
374 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
375 RSlotRAII slotRAII(slotStack);
376 auto slot = slotRAII.fSlot;
379 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({
"an empty source", range.first, range.second, slot});
381 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
386 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
392 pool.
Foreach(genFunction, entryRanges);
408 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
419 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*
fTree, entryList,
fNSlots);
421 std::atomic<ULong64_t> entryCount(0ull);
423 tp->Process([
this, &slotStack, &entryCount](
TTreeReader &
r) ->
void {
424 RSlotRAII slotRAII(slotStack);
425 auto slot = slotRAII.fSlot;
428 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(
r, slot));
429 const auto entryRange =
r.GetEntriesRange();
430 const auto nEntries = entryRange.second - entryRange.first;
431 auto count = entryCount.fetch_add(nEntries);
438 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
449 if (0 ==
fTree->GetEntriesFast())
453 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(
r, 0u));
462 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
467 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
468 std::to_string(
r.GetEntryStatus()));
483 for (
const auto &range : ranges) {
484 const auto start = range.first;
485 const auto end = range.second;
494 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
512 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
513 RSlotRAII slotRAII(slotStack);
514 const auto slot = slotRAII.fSlot;
518 const auto start = range.first;
519 const auto end = range.second;
522 for (
auto entry = start; entry < end; ++entry) {
528 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
536 while (!ranges.empty()) {
537 pool.
Foreach(runOnRange, ranges);
557 actionPtr->Run(slot, entry);
559 namedFilterPtr->CheckFilters(slot, entry);
571 ptr->InitSlot(
r, slot);
573 ptr->InitSlot(
r, slot);
623 ptr->ResetChildrenCount();
625 ptr->ResetChildrenCount();
638 ptr->FinalizeSlot(slot);
640 ptr->FinaliseSlot(slot);
650 const std::string code = std::move(GetCodeToJit());
652 R__LOG_INFO(RDFLogChannel()) <<
"Nothing to jit and execute.";
660 R__LOG_INFO(RDFLogChannel()) <<
"Just-in-time compilation phase completed"
661 << (s.
RealTime() > 1
e-3 ?
" in " + std::to_string(s.
RealTime()) +
" seconds." :
".");
673 actionPtr->TriggerChildrenCount();
675 namedFilterPtr->TriggerChildrenCount();
683 MaxTreeSizeRAII ctxtmts;
770 fPtr->FillReport(rep);
776 GetCodeToJit().append(code);
781 if (everyNEvents == 0ull)
789 std::vector<std::string>
filters;
791 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
824 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
name);
826 thisNode->SetCounter(0);
unsigned long long ULong64_t
R__EXTERN TVirtualMutex * gROOTMutex
#define R__LOCKGUARD(mutex)
The head node of a RDF computation graph.
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
void SetupDataBlockCallbacks(TTreeReader *r, unsigned int slot)
bool fMustRunNamedFilters
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
const ULong64_t fNEmptyEntries
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
friend struct RCallCleanUpTask
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::vector< RRangeBase * > fBookedRanges
std::vector< TCallback > fCallbacks
Registered callbacks.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
RDFInternal::RDataBlockNotifier fDataBlockNotifier
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
const unsigned int fNSlots
std::vector< Callback_t > fDataBlockCallbacks
Registered callbacks to call at the beginning of each "data block".
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
unsigned int GetNSlots() const
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void AddDataBlockCallback(std::function< void(unsigned int)> &&callback)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
TNotifyLink< RDataBlockFlag > & GetChainNotifyLink(unsigned int slot)
void UnsetFlag(unsigned int slot)
void SetFlag(unsigned int slot)
bool CheckFlag(unsigned int slot) const
This is an helper class to allow to pick a slot resorting to a map indexed by thread ids.
void ReturnSlot(unsigned int slotNumber)
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
A TTree is a list of TBranches.
TObjArray * GetListOfLeaves()
A chain is a collection of files containing TTree objects.
TObjArray * GetListOfFiles() const
A List of entry numbers in a TTree or TChain.
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
virtual const char * GetName() const
Returns name of object.
Mother of all ROOT objects.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
@ kEntryNotFound
the tree entry number does not exist
A TTree represents a columnar dataset.
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
virtual TObjArray * GetListOfBranches()
virtual TTree * GetTree() const
virtual TList * GetListOfFriends() const
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
std::vector< std::string > ColumnNames_t
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
Long64_t InterpreterCalc(const std::string &code, const std::string &context)
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.