50#include <unordered_map>
64static std::string &GetCodeToJit()
66 static std::string code;
70static bool ContainsLeaf(
const std::set<TLeaf *> &leaves,
TLeaf *leaf)
72 return (leaves.find(leaf) != leaves.end());
78static void InsertBranchName(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
79 const std::string &friendName,
bool allowDuplicates)
81 if (!friendName.empty()) {
83 const auto friendBName = friendName +
"." + branchName;
84 if (bNamesReg.insert(friendBName).second)
85 bNames.push_back(friendBName);
88 if (allowDuplicates || friendName.empty()) {
89 if (bNamesReg.insert(branchName).second)
90 bNames.push_back(branchName);
96static void InsertBranchName(std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
const std::string &branchName,
97 const std::string &friendName, std::set<TLeaf *> &foundLeaves,
TLeaf *leaf,
100 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
105 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
107 foundLeaves.insert(leaf);
111 std::string prefix, std::string &friendName,
bool allowDuplicates)
113 for (
auto sb : *
b->GetListOfBranches()) {
115 auto subBranchName = std::string(subBranch->
GetName());
116 auto fullName = prefix + subBranchName;
118 std::string newPrefix;
120 newPrefix = fullName +
".";
122 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName, allowDuplicates);
124 auto branchDirectlyFromTree = t.
GetBranch(fullName.c_str());
125 if (!branchDirectlyFromTree)
126 branchDirectlyFromTree = t.
FindBranch(fullName.c_str());
127 if (branchDirectlyFromTree)
128 InsertBranchName(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName,
132 InsertBranchName(bNamesReg, bNames, subBranchName, friendName, allowDuplicates);
136static void GetBranchNamesImpl(
TTree &t, std::set<std::string> &bNamesReg,
ColumnNames_t &bNames,
137 std::set<TTree *> &analysedTrees, std::string &friendName,
bool allowDuplicates)
139 std::set<TLeaf *> foundLeaves;
140 if (!analysedTrees.insert(&t).second) {
149 std::string err(
"GetBranchNames: error in opening the tree ");
151 throw std::runtime_error(err);
154 for (
auto b : *branches) {
156 const auto branchName = std::string(branch->
GetName());
157 if (branch->IsA() == TBranch::Class()) {
160 if (listOfLeaves->GetEntriesUnsafe() == 1) {
161 auto leaf =
static_cast<TLeaf *
>(listOfLeaves->UncheckedAt(0));
162 InsertBranchName(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
165 for (
auto leaf : *listOfLeaves) {
166 auto castLeaf =
static_cast<TLeaf *
>(leaf);
167 const auto leafName = std::string(leaf->
GetName());
168 const auto fullName = branchName +
"." + leafName;
169 InsertBranchName(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
171 }
else if (branch->IsA() == TBranchObject::Class()) {
173 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName, allowDuplicates);
174 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
179 bool dotIsImplied =
false;
182 throw std::runtime_error(
"GetBranchNames: unsupported branch type");
184 if (be->GetType() == 3 || be->GetType() == 4)
187 if (dotIsImplied || branchName.back() ==
'.')
188 ExploreBranch(t, bNamesReg, bNames, branch,
"", friendName, allowDuplicates);
190 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName, allowDuplicates);
192 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
205 for (
auto friendTreeObj : *friendTrees) {
210 if (alias !=
nullptr)
211 frName = std::string(alias);
213 frName = std::string(friendTree->GetName());
215 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
219static void ThrowIfNSlotsChanged(
unsigned int nSlots)
222 if (currentSlots != nSlots) {
223 std::string msg =
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
224 std::to_string(nSlots) +
", but when starting the event loop it was " +
225 std::to_string(currentSlots) +
".";
226 if (currentSlots > nSlots)
227 msg +=
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
229 msg +=
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
230 throw std::runtime_error(msg);
242struct MaxTreeSizeRAII {
245 MaxTreeSizeRAII() : fOldMaxTreeSize(
TTree::GetMaxTreeSize())
253struct DatasetLogInfo {
254 std::string fDataSet;
260std::string LogRangeProcessing(
const DatasetLogInfo &info)
262 std::stringstream msg;
263 msg <<
"Processing " << info.fDataSet <<
": entry range [" << info.fRangeStart <<
"," << info.fRangeEnd - 1
264 <<
"], using slot " << info.fSlot <<
" in thread " << std::this_thread::get_id() <<
'.';
268DatasetLogInfo TreeDatasetLogInfo(
const TTreeReader &
r,
unsigned int slot)
270 const auto tree =
r.GetTree();
271 const auto chain =
dynamic_cast<TChain *
>(
tree);
275 std::vector<std::string> treeNames;
276 std::vector<std::string> fileNames;
278 treeNames.emplace_back(
f->GetName());
279 fileNames.emplace_back(
f->GetTitle());
282 for (
const auto &t : treeNames) {
286 what +=
" in files {";
287 for (
const auto &
f : fileNames) {
292 assert(
tree !=
nullptr);
293 const auto treeName =
tree->GetName();
294 what = std::string(
"tree \"") + treeName +
"\"";
295 const auto file =
tree->GetCurrentFile();
297 what += std::string(
" in file \"") +
file->GetName() +
"\"";
299 const auto entryRange =
r.GetEntriesRange();
300 const ULong64_t end = entryRange.second == -1ll ?
tree->GetEntries() : entryRange.second;
301 return {std::move(
what),
static_cast<ULong64_t>(entryRange.first), end, slot};
331 std::set<std::string> bNamesSet;
333 std::set<TTree *> analysedTrees;
334 std::string emptyFrName =
"";
335 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
340 : fTree(std::shared_ptr<
TTree>(
tree, [](
TTree *) {})), fDefaultColumns(defaultBranches),
343 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
350 fSampleInfos(fNSlots)
357 fDataSource(std::move(ds)), fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
363 : fBeginEntry(spec.fEntryRange.fBegin), fEndEntry(spec.fEntryRange.fEnd), fNSlots(
RDFInternal::
GetNSlots()),
365 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots)
367 auto chain = std::make_shared<TChain>(spec.fTreeNames.size() == 1 ? spec.fTreeNames[0].c_str() :
"");
368 if (spec.fTreeNames.size() == 1) {
372 for (
const auto &
f : spec.fFileNameGlobs)
373 chain->
Add(
f.c_str());
377 for (
auto i = 0u; i < spec.fFileNameGlobs.size(); i++) {
378 const auto fullpath = spec.fFileNameGlobs[i] +
"?#" + spec.fTreeNames[i];
379 chain->Add(fullpath.c_str());
385 const auto &friendNames = spec.fFriendInfo.fFriendNames;
386 const auto &friendFileNames = spec.fFriendInfo.fFriendFileNames;
387 const auto &friendChainSubNames = spec.fFriendInfo.fFriendChainSubNames;
388 const auto nFriends = friendNames.size();
390 for (
auto i = 0u; i < nFriends; ++i) {
391 const auto &thisFriendName = friendNames[i].first;
392 const auto &thisFriendAlias = friendNames[i].second;
393 const auto &thisFriendFiles = friendFileNames[i];
394 const auto &thisFriendChainSubNames = friendChainSubNames[i];
397 auto frChain = std::make_unique<TChain>(thisFriendName.c_str());
398 const auto nFileNames = friendFileNames[i].size();
399 if (thisFriendChainSubNames.empty()) {
402 for (
auto j = 0u; j < nFileNames; ++j) {
403 frChain->Add(thisFriendFiles[j].c_str());
408 for (
auto j = 0u; j < nFileNames; ++j) {
409 frChain->Add((thisFriendFiles[j] +
"?#" + thisFriendChainSubNames[j]).c_str());
414 fTree->AddFriend(frChain.get(), thisFriendAlias.c_str());
416 fFriends.emplace_back(std::move(frChain));
436 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
444 entryRanges.emplace_back(start, end);
449 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
451 auto slot = slotRAII.
fSlot;
457 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
462 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
468 pool.
Foreach(genFunction, entryRanges);
485 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
496 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*
fTree, entryList,
fNSlots);
498 std::atomic<ULong64_t> entryCount(0ull);
500 tp->Process([
this, &slotStack, &entryCount](
TTreeReader &
r) ->
void {
502 auto slot = slotRAII.
fSlot;
506 const auto entryRange =
r.GetEntriesRange();
507 const auto nEntries = entryRange.second - entryRange.first;
508 auto count = entryCount.fetch_add(nEntries);
518 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
525 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
526 std::to_string(
r.GetEntryStatus()));
543 throw std::logic_error(
"Something went wrong in initializing the TTreeReader.");
558 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
563 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
564 std::to_string(
r.GetEntryStatus()));
579 for (
const auto &range : ranges) {
580 const auto start = range.first;
581 const auto end = range.second;
590 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
608 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
610 const auto slot = slotRAII.
fSlot;
614 const auto start = range.first;
615 const auto end = range.second;
618 for (
auto entry = start; entry < end; ++entry) {
624 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
632 while (!ranges.empty()) {
633 pool.
Foreach(runOnRange, ranges);
652 actionPtr->Run(slot, entry);
654 namedFilterPtr->CheckFilters(slot, entry);
666 ptr->InitSlot(
r, slot);
668 ptr->InitSlot(
r, slot);
670 ptr->InitSlot(
r, slot);
672 ptr->InitSlot(
r, slot);
694 "Empty source, range: {" + std::to_string(range.first) +
", " + std::to_string(range.second) +
"}", range);
699 auto *
tree =
r.GetTree()->GetTree();
702 auto *
file =
tree->GetCurrentFile();
703 const std::string fname =
file !=
nullptr ?
file->GetName() :
"#inmemorytree#";
706 std::pair<Long64_t, Long64_t> range =
r.GetEntriesRange();
708 if (range.second == -1) {
709 range.second =
tree->GetEntries();
746 ptr->ResetChildrenCount();
748 ptr->ResetChildrenCount();
761 ptr->FinalizeSlot(slot);
763 ptr->FinaliseSlot(slot);
765 ptr->FinaliseSlot(slot);
775 const std::string code = std::move(GetCodeToJit());
786 << (s.
RealTime() > 1
e-3 ?
" in " + std::to_string(s.
RealTime()) +
" seconds." :
".");
798 actionPtr->TriggerChildrenCount();
800 namedFilterPtr->TriggerChildrenCount();
808 MaxTreeSizeRAII ctxtmts;
918 fPtr->FillReport(rep);
924 GetCodeToJit().append(code);
929 if (everyNEvents == 0ull)
937 std::vector<std::string>
filters;
939 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
972 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
name);
974 thisNode->SetCounter(0);
unsigned long long ULong64_t
R__EXTERN TVirtualMutex * gROOTMutex
#define R__LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
bool fMustRunNamedFilters
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< std::unique_ptr< TTree > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
const ULong64_t fNEmptyEntries
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::vector< RRangeBase * > fBookedRanges
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::vector< RDFInternal::RCallback > fCallbacks
Registered callbacks.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
void SetTree(const std::shared_ptr< TTree > &tree)
const unsigned int fNSlots
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
virtual ROOT::RDF::SampleCallback_t GetSampleCallback()=0
void SetFlag(unsigned int slot)
bool CheckFlag(unsigned int slot) const
void UnsetFlag(unsigned int slot)
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This is a helper class that allows picking a slot by resorting to a map indexed by thread ids.
void ReturnSlot(unsigned int slotNumber)
This type includes all parts of RVariation that do not depend on the callable signature.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
A TTree is a list of TBranches.
TObjArray * GetListOfLeaves()
A chain is a collection of files containing TTree objects.
TObjArray * GetListOfFiles() const
A List of entry numbers in a TTree or TChain.
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
virtual const char * GetName() const
Returns name of object.
Mother of all ROOT objects.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
virtual TBranch * FindBranch(const char *name)
Return the branch that corresponds to the path 'branchname', which can include the name of the tree or...
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
virtual TObjArray * GetListOfBranches()
virtual TTree * GetTree() const
virtual TList * GetListOfFriends() const
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
ROOT::Experimental::RLogChannel & RDFLogChannel()
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::vector< std::string > ColumnNames_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with a RDataFrame computation graph via e....
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
TMatrixT< Element > & Add(TMatrixT< Element > &target, Element scalar, const TMatrixT< Element > &source)
Modify addition: target += scalar * source.
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
RLoopManager & fLoopManager
RSlotRAII(RSlotStack &slotStack)