Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
20#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
21#include "ROOT/RLogger.hxx"
22#include "RtypesCore.h" // Long64_t
23#include "TStopwatch.h"
24#include "TBranchElement.h"
25#include "TBranchObject.h"
26#include "TChain.h"
27#include "TEntryList.h"
28#include "TFile.h"
29#include "TFriendElement.h"
30#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
31#include "TTreeReader.h"
32#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
33
34#ifdef R__USE_IMT
37#include "ROOT/RSlotStack.hxx"
38#endif
39
40#ifdef R__HAS_ROOT7
41#include "ROOT/RNTuple.hxx"
42#include "ROOT/RNTupleDS.hxx"
43#endif
44
45#include <algorithm>
46#include <atomic>
47#include <cassert>
48#include <functional>
49#include <iostream>
50#include <memory>
51#include <stdexcept>
52#include <string>
53#include <sstream>
54#include <thread>
55#include <unordered_map>
56#include <vector>
57#include <set>
58#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
59
60using namespace ROOT::Detail::RDF;
61using namespace ROOT::Internal::RDF;
62
63namespace {
64/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
65/// This allows different RLoopManager instances to share these data.
66/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
67/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
68/// much faster than jitting each RLoopManager's code separately.
69std::string &GetCodeToJit()
70{
71 static std::string code;
72 return code;
73}
74
75bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
76{
77 return (leaves.find(leaf) != leaves.end());
78}
79
80///////////////////////////////////////////////////////////////////////////////
81/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
82/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
83void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
84 const std::string &friendName, bool allowDuplicates)
85{
86 if (!friendName.empty()) {
87 // In case of a friend tree, users might prepend its name/alias to the branch names
88 const auto friendBName = friendName + "." + branchName;
89 if (bNamesReg.insert(friendBName).second)
90 bNames.push_back(friendBName);
91 }
92
93 if (allowDuplicates || friendName.empty()) {
94 if (bNamesReg.insert(branchName).second)
95 bNames.push_back(branchName);
96 }
97}
98
99///////////////////////////////////////////////////////////////////////////////
100/// This overload makes sure that the TLeaf has not been already inserted.
101void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
102 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
103 bool allowDuplicates)
104{
105 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
106 if (!canAdd) {
107 return;
108 }
109
110 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
111
112 foundLeaves.insert(leaf);
113}
114
115void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
116 std::string prefix, std::string &friendName, bool allowDuplicates)
117{
118 for (auto sb : *b->GetListOfBranches()) {
119 TBranch *subBranch = static_cast<TBranch *>(sb);
120 auto subBranchName = std::string(subBranch->GetName());
121 auto fullName = prefix + subBranchName;
122
123 std::string newPrefix;
124 if (!prefix.empty())
125 newPrefix = fullName + ".";
126
127 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName, allowDuplicates);
128
129 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
130 if (!branchDirectlyFromTree)
131 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
132 if (branchDirectlyFromTree)
133 InsertBranchName(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName,
134 allowDuplicates);
135
136 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.GetBranch(subBranchName.c_str()))
137 InsertBranchName(bNamesReg, bNames, subBranchName, friendName, allowDuplicates);
138 }
139}
140
141void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
142 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
143{
144 std::set<TLeaf *> foundLeaves;
145 if (!analysedTrees.insert(&t).second) {
146 return;
147 }
148
149 const auto branches = t.GetListOfBranches();
150 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
151 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
152 // operations
153 if (!t.GetTree()) {
154 std::string err("GetBranchNames: error in opening the tree ");
155 err += t.GetName();
156 throw std::runtime_error(err);
157 }
158 if (branches) {
159 for (auto b : *branches) {
160 TBranch *branch = static_cast<TBranch *>(b);
161 const auto branchName = std::string(branch->GetName());
162 if (branch->IsA() == TBranch::Class()) {
163 // Leaf list
164 auto listOfLeaves = branch->GetListOfLeaves();
165 if (listOfLeaves->GetEntriesUnsafe() == 1) {
166 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
167 InsertBranchName(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
168 }
169
170 for (auto leaf : *listOfLeaves) {
171 auto castLeaf = static_cast<TLeaf *>(leaf);
172 const auto leafName = std::string(leaf->GetName());
173 const auto fullName = branchName + "." + leafName;
174 InsertBranchName(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
175 }
176 } else if (branch->IsA() == TBranchObject::Class()) {
177 // TBranchObject
178 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName, allowDuplicates);
179 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
180 } else {
181 // TBranchElement
182 // Check if there is explicit or implicit dot in the name
183
184 bool dotIsImplied = false;
185 auto be = dynamic_cast<TBranchElement *>(b);
186 if (!be)
187 throw std::runtime_error("GetBranchNames: unsupported branch type");
188 // TClonesArray (3) and STL collection (4)
189 if (be->GetType() == 3 || be->GetType() == 4)
190 dotIsImplied = true;
191
192 if (dotIsImplied || branchName.back() == '.')
193 ExploreBranch(t, bNamesReg, bNames, branch, "", friendName, allowDuplicates);
194 else
195 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName, allowDuplicates);
196
197 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
198 }
199 }
200 }
201
202 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
203 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
204 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
205 auto friendTrees = t.GetTree()->GetListOfFriends();
206
207 if (!friendTrees)
208 return;
209
210 for (auto friendTreeObj : *friendTrees) {
211 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
212
213 std::string frName;
214 auto alias = t.GetFriendAlias(friendTree);
215 if (alias != nullptr)
216 frName = std::string(alias);
217 else
218 frName = std::string(friendTree->GetName());
219
220 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
221 }
222}
223
224void ThrowIfNSlotsChanged(unsigned int nSlots)
225{
226 const auto currentSlots = RDFInternal::GetNSlots();
227 if (currentSlots != nSlots) {
228 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
229 std::to_string(nSlots) + ", but when starting the event loop it was " +
230 std::to_string(currentSlots) + ".";
231 if (currentSlots > nSlots)
232 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
233 else
234 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
235 throw std::runtime_error(msg);
236 }
237}
238
239/**
240\struct MaxTreeSizeRAII
241\brief Scope-bound change of `TTree::fgMaxTreeSize`.
242
243This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
244changes it to maximum at construction time and restores it back at destruction
245time. Needed for issue #6523 and should be reverted when #6640 will be solved.
246*/
247struct MaxTreeSizeRAII {
248 Long64_t fOldMaxTreeSize;
249
250 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
251 {
252 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
253 }
254
255 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
256};
257
258struct DatasetLogInfo {
259 std::string fDataSet;
260 ULong64_t fRangeStart;
261 ULong64_t fRangeEnd;
262 unsigned int fSlot;
263};
264
265std::string LogRangeProcessing(const DatasetLogInfo &info)
266{
267 std::stringstream msg;
268 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
269 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
270 return msg.str();
271}
272
273DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
274{
275 const auto tree = r.GetTree();
276 const auto chain = dynamic_cast<TChain *>(tree);
277 std::string what;
278 if (chain) {
279 auto files = chain->GetListOfFiles();
280 std::vector<std::string> treeNames;
281 std::vector<std::string> fileNames;
282 for (TObject *f : *files) {
283 treeNames.emplace_back(f->GetName());
284 fileNames.emplace_back(f->GetTitle());
285 }
286 what = "trees {";
287 for (const auto &t : treeNames) {
288 what += t + ",";
289 }
290 what.back() = '}';
291 what += " in files {";
292 for (const auto &f : fileNames) {
293 what += f + ",";
294 }
295 what.back() = '}';
296 } else {
297 assert(tree != nullptr); // to make clang-tidy happy
298 const auto treeName = tree->GetName();
299 what = std::string("tree \"") + treeName + "\"";
300 const auto file = tree->GetCurrentFile();
301 if (file)
302 what += std::string(" in file \"") + file->GetName() + "\"";
303 }
304 const auto entryRange = r.GetEntriesRange();
305 const ULong64_t end = entryRange.second == -1ll ? tree->GetEntries() : entryRange.second;
306 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
307}
308
309auto MakeDatasetColReadersKey(const std::string &colName, const std::type_info &ti)
310{
311 // We use a combination of column name and column type name as the key because in some cases we might end up
312 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
313 // df.Sum<vector<int>>("stdVectorBranch");
314 // df.Sum<RVecI>("stdVectorBranch");
315 return colName + ':' + ti.name();
316}
317} // anonymous namespace
318
319namespace ROOT {
320namespace Detail {
321namespace RDF {
322
323/// A RAII object that calls RLoopManager::CleanUpTask at destruction
326 unsigned int fArg;
328
329 RCallCleanUpTask(RLoopManager &lm, unsigned int arg = 0u, TTreeReader *reader = nullptr)
330 : fLoopManager(lm), fArg(arg), fReader(reader)
331 {
332 }
334};
335
336} // namespace RDF
337} // namespace Detail
338} // namespace ROOT
339
340///////////////////////////////////////////////////////////////////////////////
341/// Get all the branches names, including the ones of the friend trees
343{
344 std::set<std::string> bNamesSet;
345 ColumnNames_t bNames;
346 std::set<TTree *> analysedTrees;
347 std::string emptyFrName = "";
348 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
349 return bNames;
350}
351
352RLoopManager::RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
353 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
354 fNSlots(RDFInternal::GetNSlots()),
355 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
356 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots), fDatasetColumnReaders(fNSlots)
357{
358}
359
360RLoopManager::RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches)
361 : fTree(std::move(tree)),
362 fDefaultColumns(defaultBranches),
368{
369}
370
371RLoopManager::RLoopManager(ULong64_t nEmptyEntries)
372 : fEmptyEntryRange(0, nEmptyEntries),
378{
379}
380
381RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
382 : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
385{
386 fDataSource->SetNSlots(fNSlots);
387}
388
398
399/**
400 * @brief Changes the internal TTree held by the RLoopManager.
401 *
402 * @warning This method may lead to potentially dangerous interactions if used
403 * after the construction of the RDataFrame. Changing the specification makes
404 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
405 * new specification refers to exactly the same number of columns, with the
406 * same names and types. The actual use case of this method is moving the
407 * processing of the same RDataFrame to a different range of entries of the
408 * same dataset (which may be stored in a different set of files).
409 *
410 * @param spec The specification of the dataset to be adopted.
411 */
412void RLoopManager::ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
413{
414 // Change the range of entries to be processed
415 fBeginEntry = spec.GetEntryRangeBegin();
416 fEndEntry = spec.GetEntryRangeEnd();
417
418 // Store the samples
419 fSamples = spec.MoveOutSamples();
420 fSampleMap.clear();
421
422 // Create the internal main chain
424 for (auto &sample : fSamples) {
425 const auto &trees = sample.GetTreeNames();
426 const auto &files = sample.GetFileNameGlobs();
427 for (std::size_t i = 0ul; i < files.size(); ++i) {
428 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
429 // (see https://github.com/root-project/root/pull/8820 for why)
430 const auto fullpath = files[i] + "?#" + trees[i];
431 chain->Add(fullpath.c_str());
432 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
433 // change this easily because of backward compatibility: the sample ID
434 // is exposed to users via RSampleInfo and DefinePerSample).
435 const auto sampleId = files[i] + '/' + trees[i];
436 fSampleMap.insert({sampleId, &sample});
437 }
438 }
439 SetTree(std::move(chain));
440
441 // Create friends from the specification and connect them to the main chain
442 const auto &friendInfo = spec.GetFriendInfo();
444 for (std::size_t i = 0ul; i < fFriends.size(); i++) {
445 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
446 fTree->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
447 }
448}
449
450/// Run event loop with no source files, in parallel.
451void RLoopManager::RunEmptySourceMT()
452{
453#ifdef R__USE_IMT
455 // Working with an empty tree.
456 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
457 const auto nEmptyEntries = GetNEmptyEntries();
458 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
459 auto remainder = nEmptyEntries % (fNSlots * 2);
460 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
461 ULong64_t begin = fEmptyEntryRange.first;
462 while (begin < fEmptyEntryRange.second) {
463 ULong64_t end = begin + nEntriesPerSlot;
464 if (remainder > 0) {
465 ++end;
466 --remainder;
467 }
468 entryRanges.emplace_back(begin, end);
469 begin = end;
470 }
471
472 // Each task will generate a subrange of entries
473 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
474 ROOT::Internal::RSlotStackRAII slotRAII(slotStack);
475 auto slot = slotRAII.fSlot;
476 RCallCleanUpTask cleanup(*this, slot);
477 InitNodeSlots(nullptr, slot);
478 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
479 try {
480 UpdateSampleInfo(slot, range);
481 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
482 RunAndCheckFilters(slot, currEntry);
483 }
484 } catch (...) {
485 // Error might throw in experiment frameworks like CMSSW
486 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
487 throw;
488 }
489 };
490
492 pool.Foreach(genFunction, entryRanges);
493
494#endif // not implemented otherwise
495}
496
497/// Run event loop with no source files, in sequence.
498void RLoopManager::RunEmptySource()
499{
500 InitNodeSlots(nullptr, 0);
501 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(
502 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
503 RCallCleanUpTask cleanup(*this);
504 try {
506 for (ULong64_t currEntry = fEmptyEntryRange.first;
507 currEntry < fEmptyEntryRange.second && fNStopsReceived < fNChildren; ++currEntry) {
508 RunAndCheckFilters(0, currEntry);
509 }
510 } catch (...) {
511 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
512 throw;
513 }
514}
515
516/// Run event loop over one or multiple ROOT files, in parallel.
517void RLoopManager::RunTreeProcessorMT()
518{
519#ifdef R__USE_IMT
520 if (fEndEntry == fBeginEntry) // empty range => no work needed
521 return;
523 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
524 auto tp = (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
525 ? std::make_unique<ROOT::TTreeProcessorMT>(*fTree, fNSlots, std::make_pair(fBeginEntry, fEndEntry))
526 : std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots);
527
528 std::atomic<ULong64_t> entryCount(0ull);
529
530 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
531 ROOT::Internal::RSlotStackRAII slotRAII(slotStack);
532 auto slot = slotRAII.fSlot;
533 RCallCleanUpTask cleanup(*this, slot, &r);
534 InitNodeSlots(&r, slot);
535 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, slot));
536 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
537 const auto nEntries = entryRange.second - entryRange.first;
538 auto count = entryCount.fetch_add(nEntries);
539 try {
540 // recursive call to check filters and conditionally execute actions
541 while (r.Next()) {
542 if (fNewSampleNotifier.CheckFlag(slot)) {
543 UpdateSampleInfo(slot, r);
544 }
545 RunAndCheckFilters(slot, count++);
546 }
547 } catch (...) {
548 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
549 throw;
550 }
551 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
552 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
553 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
554 // something went wrong in the TTreeReader event loop
555 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
556 std::to_string(r.GetEntryStatus()));
557 }
558 });
559#endif // no-op otherwise (will not be called)
560}
561
562/// Run event loop over one or multiple ROOT files, in sequence.
563void RLoopManager::RunTreeReader()
564{
565 TTreeReader r(fTree.get(), fTree->GetEntryList());
566 if (0 == fTree->GetEntriesFast() || fBeginEntry == fEndEntry)
567 return;
568 // Apply the range if there is any
569 // In case of a chain with a total of N entries, calling SetEntriesRange(N + 1, ...) does not error out
570 // This is a bug, reported here: https://github.com/root-project/root/issues/10774
571 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
572 if (r.SetEntriesRange(fBeginEntry, fEndEntry) != TTreeReader::kEntryValid)
573 throw std::logic_error("Something went wrong in initializing the TTreeReader.");
574
575 RCallCleanUpTask cleanup(*this, 0u, &r);
576 InitNodeSlots(&r, 0);
577 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, 0u));
578
579 // recursive call to check filters and conditionally execute actions
580 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
581 try {
582 while (r.Next() && fNStopsReceived < fNChildren) {
583 if (fNewSampleNotifier.CheckFlag(0)) {
584 UpdateSampleInfo(/*slot*/0, r);
585 }
586 RunAndCheckFilters(0, r.GetCurrentEntry());
587 }
588 } catch (...) {
589 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
590 throw;
591 }
592 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
593 // something went wrong in the TTreeReader event loop
594 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
595 std::to_string(r.GetEntryStatus()));
596 }
597}
598
599/// Run event loop over data accessed through a DataSource, in sequence.
600void RLoopManager::RunDataSource()
601{
602 assert(fDataSource != nullptr);
603 fDataSource->Initialize();
604 auto ranges = fDataSource->GetEntryRanges();
605 while (!ranges.empty() && fNStopsReceived < fNChildren) {
606 InitNodeSlots(nullptr, 0u);
607 fDataSource->InitSlot(0u, 0ull);
608 RCallCleanUpTask cleanup(*this);
609 try {
610 for (const auto &range : ranges) {
611 const auto start = range.first;
612 const auto end = range.second;
613 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
614 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
615 if (fDataSource->SetEntry(0u, entry)) {
616 RunAndCheckFilters(0u, entry);
617 }
618 }
619 }
620 } catch (...) {
621 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
622 throw;
623 }
624 fDataSource->FinalizeSlot(0u);
625 ranges = fDataSource->GetEntryRanges();
626 }
627 fDataSource->Finalize();
628}
629
630/// Run event loop over data accessed through a DataSource, in parallel.
631void RLoopManager::RunDataSourceMT()
632{
633#ifdef R__USE_IMT
634 assert(fDataSource != nullptr);
637
638 // Each task works on a subrange of entries
639 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
640 ROOT::Internal::RSlotStackRAII slotRAII(slotStack);
641 const auto slot = slotRAII.fSlot;
642 InitNodeSlots(nullptr, slot);
643 RCallCleanUpTask cleanup(*this, slot);
644 fDataSource->InitSlot(slot, range.first);
645 const auto start = range.first;
646 const auto end = range.second;
647 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
648 try {
649 for (auto entry = start; entry < end; ++entry) {
650 if (fDataSource->SetEntry(slot, entry)) {
651 RunAndCheckFilters(slot, entry);
652 }
653 }
654 } catch (...) {
655 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
656 throw;
657 }
658 fDataSource->FinalizeSlot(slot);
659 };
660
661 fDataSource->Initialize();
662 auto ranges = fDataSource->GetEntryRanges();
663 while (!ranges.empty()) {
664 pool.Foreach(runOnRange, ranges);
665 ranges = fDataSource->GetEntryRanges();
666 }
667 fDataSource->Finalize();
668#endif // not implemented otherwise (never called)
669}
670
671/// Execute actions and make sure named filters are called for each event.
672/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
673void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
674{
675 // data-block callbacks run before the rest of the graph
676 if (fNewSampleNotifier.CheckFlag(slot)) {
677 for (auto &callback : fSampleCallbacks)
678 callback.second(slot, fSampleInfos[slot]);
679 fNewSampleNotifier.UnsetFlag(slot);
680 }
681
682 for (auto *actionPtr : fBookedActions)
683 actionPtr->Run(slot, entry);
684 for (auto *namedFilterPtr : fBookedNamedFilters)
685 namedFilterPtr->CheckFilters(slot, entry);
686 for (auto &callback : fCallbacksEveryNEvents)
687 callback(slot);
688}
689
690/// Build TTreeReaderValues for all nodes
691/// This method loops over all filters, actions and other booked objects and
692/// calls their `InitSlot` method, to get them ready for running a task.
693void RLoopManager::InitNodeSlots(TTreeReader *r, unsigned int slot)
694{
695 SetupSampleCallbacks(r, slot);
696 for (auto *ptr : fBookedActions)
697 ptr->InitSlot(r, slot);
698 for (auto *ptr : fBookedFilters)
699 ptr->InitSlot(r, slot);
700 for (auto *ptr : fBookedDefines)
701 ptr->InitSlot(r, slot);
702 for (auto *ptr : fBookedVariations)
703 ptr->InitSlot(r, slot);
704
705 for (auto &callback : fCallbacksOnce)
706 callback(slot);
707}
708
709void RLoopManager::SetupSampleCallbacks(TTreeReader *r, unsigned int slot) {
710 if (r != nullptr) {
711 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
712 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
713 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
714 }
715 // Whatever the data source, initially set the "new data block" flag:
716 // - for TChains, this ensures that we don't skip the first data block because
717 // the correct tree is already loaded
718 // - for RDataSources and empty sources, which currently don't have data blocks, this
719 // ensures that we run once per task
720 fNewSampleNotifier.SetFlag(slot);
721}
722
723void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
725 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
726}
727
728void RLoopManager::UpdateSampleInfo(unsigned int slot, TTreeReader &r) {
729 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
730 auto *tree = r.GetTree()->GetTree();
731 R__ASSERT(tree != nullptr);
732 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
733 auto *file = tree->GetCurrentFile();
734 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
735
736 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
737 R__ASSERT(range.first >= 0);
738 if (range.second == -1) {
739 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
740 }
741 const std::string &id = fname + '/' + treename;
742 fSampleInfos[slot] = fSampleMap.empty() ? RSampleInfo(id, range) : RSampleInfo(id, range, fSampleMap[id]);
743}
744
745/// Initialize all nodes of the functional graph before running the event loop.
746/// This method is called once per event-loop and performs generic initialization
747/// operations that do not depend on the specific processing slot (i.e. operations
748/// that are common for all threads).
749void RLoopManager::InitNodes()
750{
752 for (auto *filter : fBookedFilters)
753 filter->InitNode();
754 for (auto *range : fBookedRanges)
755 range->InitNode();
756 for (auto *ptr : fBookedActions)
757 ptr->Initialize();
758}
759
760/// Perform clean-up operations. To be called at the end of each event loop.
761void RLoopManager::CleanUpNodes()
762{
763 fMustRunNamedFilters = false;
764
765 // forget RActions and detach TResultProxies
766 for (auto *ptr : fBookedActions)
767 ptr->Finalize();
768
769 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
770 fBookedActions.clear();
771
772 // reset children counts
773 fNChildren = 0;
774 fNStopsReceived = 0;
775 for (auto *ptr : fBookedFilters)
776 ptr->ResetChildrenCount();
777 for (auto *ptr : fBookedRanges)
778 ptr->ResetChildrenCount();
779
781 fCallbacksOnce.clear();
782}
783
784/// Perform clean-up operations. To be called at the end of each task execution.
785void RLoopManager::CleanUpTask(TTreeReader *r, unsigned int slot)
786{
787 if (r != nullptr)
788 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
789 for (auto *ptr : fBookedActions)
790 ptr->FinalizeSlot(slot);
791 for (auto *ptr : fBookedFilters)
792 ptr->FinalizeSlot(slot);
793 for (auto *ptr : fBookedDefines)
794 ptr->FinalizeSlot(slot);
795
797 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
798 // because the TTreeReader object changes at every task
799 for (auto &v : fDatasetColumnReaders[slot])
800 v.second.reset();
801 }
802}
803
804/// Add RDF nodes that require just-in-time compilation to the computation graph.
805/// This method also clears the contents of GetCodeToJit().
806void RLoopManager::Jit()
807{
808 {
810 if (GetCodeToJit().empty()) {
811 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
812 return;
813 }
814 }
815
816 const std::string code = []() {
818 return std::move(GetCodeToJit());
819 }();
820
821 TStopwatch s;
822 s.Start();
823 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
824 s.Stop();
825 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
826 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
827 : " in less than 1ms.");
828}
829
830/// Trigger counting of number of children nodes for each node of the functional graph.
831/// This is done once before starting the event loop. Each action sends an `increase children count` signal
832/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
833/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
834/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
835/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
836void RLoopManager::EvalChildrenCounts()
837{
838 for (auto *actionPtr : fBookedActions)
839 actionPtr->TriggerChildrenCount();
840 for (auto *namedFilterPtr : fBookedNamedFilters)
841 namedFilterPtr->TriggerChildrenCount();
842}
843
844/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
845/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
846/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
847void RLoopManager::Run(bool jit)
848{
849 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
850 MaxTreeSizeRAII ctxtmts;
851
852 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
853
854 ThrowIfNSlotsChanged(GetNSlots());
855
856 if (jit)
857 Jit();
858
859 InitNodes();
860
861 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
862 // we use RAII: even in case of an exception, the destructor of the object is invoked and
863 // all the cleanup takes place.
864 class NodesCleanerRAII {
865 RLoopManager &fRLM;
866
867 public:
868 NodesCleanerRAII(RLoopManager &thisRLM) : fRLM(thisRLM) {}
869 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
870 };
871
872 NodesCleanerRAII runKeeper(*this);
873
874 TStopwatch s;
875 s.Start();
876
877 switch (fLoopType) {
881 case ELoopType::kNoFiles: RunEmptySource(); break;
884 }
885 s.Stop();
886
887 fNRuns++;
888
889 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
890 << s.RealTime() << "s elapsed).";
891}
892
893/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
894const ColumnNames_t &RLoopManager::GetDefaultColumnNames() const
895{
896 return fDefaultColumns;
897}
898
899TTree *RLoopManager::GetTree() const
900{
901 return fTree.get();
902}
903
904void RLoopManager::Register(RDFInternal::RActionBase *actionPtr)
905{
906 fBookedActions.emplace_back(actionPtr);
907 AddSampleCallback(actionPtr, actionPtr->GetSampleCallback());
908}
909
910void RLoopManager::Deregister(RDFInternal::RActionBase *actionPtr)
911{
914 fSampleCallbacks.erase(actionPtr);
915}
916
917void RLoopManager::Register(RFilterBase *filterPtr)
918{
919 fBookedFilters.emplace_back(filterPtr);
920 if (filterPtr->HasName()) {
921 fBookedNamedFilters.emplace_back(filterPtr);
923 }
924}
925
926void RLoopManager::Deregister(RFilterBase *filterPtr)
927{
930}
931
932void RLoopManager::Register(RRangeBase *rangePtr)
933{
934 fBookedRanges.emplace_back(rangePtr);
935}
936
937void RLoopManager::Deregister(RRangeBase *rangePtr)
938{
940}
941
942void RLoopManager::Register(RDefineBase *ptr)
943{
944 fBookedDefines.emplace_back(ptr);
945}
946
947void RLoopManager::Deregister(RDefineBase *ptr)
948{
950 fSampleCallbacks.erase(ptr);
951}
952
953void RLoopManager::Register(RDFInternal::RVariationBase *v)
954{
955 fBookedVariations.emplace_back(v);
956}
957
958void RLoopManager::Deregister(RDFInternal::RVariationBase *v)
959{
961}
962
963// dummy call, end of recursive chain of calls
964bool RLoopManager::CheckFilters(unsigned int, Long64_t)
965{
966 return true;
967}
968
969/// Call `FillReport` on all booked filters
970void RLoopManager::Report(ROOT::RDF::RCutFlowReport &rep) const
971{
972 for (const auto *fPtr : fBookedNamedFilters)
973 fPtr->FillReport(rep);
974}
975
976void RLoopManager::SetTree(std::shared_ptr<TTree> tree)
977{
978 fTree = std::move(tree);
979
980 TChain *ch = nullptr;
981 if ((ch = dynamic_cast<TChain *>(fTree.get())))
982 fNoCleanupNotifier.RegisterChain(*ch);
983}
984
985void RLoopManager::ToJitExec(const std::string &code) const
986{
988 GetCodeToJit().append(code);
989}
990
991void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
992{
993 if (everyNEvents == 0ull)
994 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
995 else
996 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
997}
998
999std::vector<std::string> RLoopManager::GetFiltersNames()
1000{
1001 std::vector<std::string> filters;
1002 for (auto *filter : fBookedFilters) {
1003 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1004 filters.push_back(name);
1005 }
1006 return filters;
1007}
1008
1009std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1010{
1011 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1012 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1013 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1014 return nodes;
1015}
1016
1017std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1018{
1019 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1020 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1021 std::copy(fRunActions.begin(), fRunActions.end(), it);
1022 return actions;
1023}
1024
1025std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1026 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1027{
1028 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1029 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1030 if (duplicateRLoopManagerIt != visitedMap.end())
1031 return duplicateRLoopManagerIt->second;
1032
1033 std::string name;
1034 if (fDataSource) {
1035 name = fDataSource->GetLabel();
1036 } else if (fTree) {
1037 name = fTree->GetName();
1038 if (name.empty())
1039 name = fTree->ClassName();
1040 } else {
1041 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1042 }
1043 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1045 visitedMap[(void *)this] = thisNode;
1046 return thisNode;
1047}
1048
1049////////////////////////////////////////////////////////////////////////////
1050/// Return all valid TTree::Branch names (caching results for subsequent calls).
1051/// Never use fBranchNames directy, always request it through this method.
1052const ColumnNames_t &RLoopManager::GetBranchNames()
1053{
1054 if (fValidBranchNames.empty() && fTree) {
1055 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
1056 }
1057 return fValidBranchNames;
1058}
1059
1060/// Return true if AddDataSourceColumnReaders was called for column name col.
1061bool RLoopManager::HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
1062{
1063 const auto key = MakeDatasetColReadersKey(col, ti);
1064 assert(fDataSource != nullptr);
1065 // since data source column readers are always added for all slots at the same time,
1066 // if the reader is present for slot 0 we have it for all other slots as well.
1067 return fDatasetColumnReaders[0].find(key) != fDatasetColumnReaders[0].end();
1068}
1069
1070void RLoopManager::AddDataSourceColumnReaders(const std::string &col,
1071 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1072 const std::type_info &ti)
1073{
1074 const auto key = MakeDatasetColReadersKey(col, ti);
1075 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1076 assert(readers.size() == fNSlots);
1077
1078 for (auto slot = 0u; slot < fNSlots; ++slot) {
1079 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1080 }
1081}
1082
1083// Differently from AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1084/// \brief Register a new RTreeColumnReader with this RLoopManager.
1085/// \return A shared pointer to the inserted column reader.
1086RColumnReaderBase *RLoopManager::AddTreeColumnReader(unsigned int slot, const std::string &col,
1087 std::unique_ptr<RColumnReaderBase> &&reader,
1088 const std::type_info &ti)
1089{
1090 auto &readers = fDatasetColumnReaders[slot];
1091 const auto key = MakeDatasetColReadersKey(col, ti);
1092 // if a reader for this column and this slot was already there, we are doing something wrong
1093 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1094 auto *rptr = reader.get();
1095 readers[key] = std::move(reader);
1096 return rptr;
1097}
1098
1100RLoopManager::GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
1101{
1102 const auto key = MakeDatasetColReadersKey(col, ti);
1103 auto it = fDatasetColumnReaders[slot].find(key);
1104 if (it != fDatasetColumnReaders[slot].end())
1105 return it->second.get();
1106 else
1107 return nullptr;
1108}
1109
1110void RLoopManager::AddSampleCallback(void *nodePtr, SampleCallback_t &&callback)
1111{
1112 if (callback)
1113 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1114}
1115
1116void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1117{
1118 fEmptyEntryRange = std::move(newRange);
1119}
1120
1121/**
1122 * \brief Helper function to open a file (or the first file from a glob).
1123 * This function is used at construction time of an RDataFrame, to check the
1124 * concrete type of the dataset stored inside the file.
1125 */
1126std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
1127{
1128 // Follow same logic in TChain::Add to find the correct string to look for globbing:
1129 // - If the extension ".root" is present in the file name, pass along the basename.
1130 // - If not, use the "?" token to delimit the part of the string which represents the basename.
1131 // - Otherwise, pass the full filename.
1132 auto &&baseNameAndQuery = [&fileNameGlob]() {
1133 constexpr std::string_view delim{".root"};
1134 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
1135 it != fileNameGlob.end()) {
1136 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
1137 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
1138 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
1139 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
1140 else
1141 return std::make_pair(fileNameGlob, std::string_view{});
1142 }();
1143 // Captured structured bindings variable are only valid since C++20
1144 auto &&baseName = baseNameAndQuery.first;
1145 auto &&query = baseNameAndQuery.second;
1146
1147 const auto nameHasWildcard = [&baseName]() {
1148 constexpr std::array<char, 4> wildCards{'[', ']', '*', '?'}; // Wildcards accepted by TChain::Add
1149 return std::any_of(wildCards.begin(), wildCards.end(),
1150 [&baseName](auto &&wc) { return baseName.find(wc) != std::string_view::npos; });
1151 }();
1152
1153 // Open first file in case of glob, suppose all files in the glob use the same data format
1154 std::string fileToOpen{nameHasWildcard
1155 ? ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName})[0] + std::string{query}
1156 : fileNameGlob};
1157
1158 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
1159 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
1160 if (!inFile || inFile->IsZombie())
1161 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
1162
1163 return inFile;
1164}
1165
1166std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1167ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
1168 const ROOT::RDF::ColumnNames_t &defaultColumns, bool checkFile)
1169{
1170 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1171 // Creating an RDataFrame with a non-existing file will throw early rather
1172 // than wait for the start of the graph execution.
1173 if (checkFile) {
1174 OpenFileWithSanityChecks(fileNameGlob);
1175 }
1176 std::string datasetNameInt{datasetName};
1177 std::string fileNameGlobInt{fileNameGlob};
1178 auto chain = ROOT::Internal::TreeUtils::MakeChainForMT(datasetNameInt.c_str());
1179 chain->Add(fileNameGlobInt.c_str());
1180 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1181 return lm;
1182}
1183
1184std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1185ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1186 const std::vector<std::string> &defaultColumns, bool checkFile)
1187{
1188 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1189 // Creating an RDataFrame with a non-existing file will throw early rather
1190 // than wait for the start of the graph execution.
1191 if (checkFile) {
1192 OpenFileWithSanityChecks(fileNameGlobs[0]);
1193 }
1194 std::string treeNameInt(datasetName);
1195 auto chain = ROOT::Internal::TreeUtils::MakeChainForMT(treeNameInt);
1196 for (auto &f : fileNameGlobs)
1197 chain->Add(f.c_str());
1198 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1199 return lm;
1200}
1201
1202#ifdef R__HAS_ROOT7
1203std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1204ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob,
1205 const ROOT::RDF::ColumnNames_t &defaultColumns)
1206{
1207 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlob);
1208 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1209 return lm;
1210}
1211
1212std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1213ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1214 const ROOT::RDF::ColumnNames_t &defaultColumns)
1215{
1216 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlobs);
1217 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1218 return lm;
1219}
1220
1221std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1222ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob,
1223 const ROOT::RDF::ColumnNames_t &defaultColumns)
1224{
1225
1226 auto inFile = OpenFileWithSanityChecks(fileNameGlob);
1227
1228 if (inFile->Get<TTree>(datasetName.data())) {
1229 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1230 } else if (inFile->Get<ROOT::Experimental::RNTuple>(datasetName.data())) {
1231 return CreateLMFromRNTuple(datasetName, fileNameGlob, defaultColumns);
1232 }
1233
1234 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1235 "\" in file \"" + inFile->GetName() + "\".");
1236}
1237
1238std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1239ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1240 const ROOT::RDF::ColumnNames_t &defaultColumns)
1241{
1242
1243 auto inFile = OpenFileWithSanityChecks(fileNameGlobs[0]);
1244
1245 if (inFile->Get<TTree>(datasetName.data())) {
1246 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1247 } else if (inFile->Get<ROOT::Experimental::RNTuple>(datasetName.data())) {
1248 return CreateLMFromRNTuple(datasetName, fileNameGlobs, defaultColumns);
1249 }
1250
1251 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1252 "\" in file \"" + inFile->GetName() + "\".");
1253}
1254#endif
true
Register systematic variations for multiple existing columns using auto-generated tags.
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:365
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
#define R__ASSERT(e)
Definition TError.h:118
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
char name[80]
Definition TGX11.cxx:110
Int_t i
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
virtual ROOT::RDF::SampleCallback_t GetSampleCallback()=0
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
static TClass * Class()
A TTree is a list of TBranches.
Definition TBranch.h:93
static TClass * Class()
TClass * IsA() const override
Definition TBranch.h:295
TObjArray * GetListOfLeaves()
Definition TBranch.h:247
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4110
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4841
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5294
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9187
virtual TObjArray * GetListOfBranches()
Definition TTree.h:491
virtual TTree * GetTree() const
Definition TTree.h:520
virtual TList * GetListOfFriends() const
Definition TTree.h:493
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6032
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
Retrieve the full path(s) to a TTree or the trees in a TChain.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::Experimental::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
Special implementation of ROOT::RRangeCast for TCollection, including a check that the cast target ty...
Definition TObject.h:384
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:301
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:189
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:345
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:595
R__EXTERN TVirtualRWMutex * gCoreMutex
static const char * what
Definition stlLoader.cc:5
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.