Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
20#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
21#include "ROOT/RLogger.hxx"
22#include "RtypesCore.h" // Long64_t
23#include "TStopwatch.h"
24#include "TBranchElement.h"
25#include "TBranchObject.h"
26#include "TChain.h"
27#include "TEntryList.h"
28#include "TFile.h"
29#include "TFriendElement.h"
30#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
31#include "TTreeReader.h"
32#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
33
34#ifdef R__USE_IMT
37#include "ROOT/RSlotStack.hxx"
38#endif
39
40#ifdef R__HAS_ROOT7
41#include "ROOT/RNTuple.hxx"
42#include "ROOT/RNTupleDS.hxx"
43#endif
44
45#ifdef R__UNIX
46// Functions needed to perform EOS XRootD redirection in ChangeSpec
47#include <optional>
48#include "TEnv.h"
49#include "TSystem.h"
50#ifndef R__FBSD
51#include <sys/xattr.h>
52#else
53#include <sys/extattr.h>
54#endif
55#ifdef R__MACOSX
56/* On macOS getxattr takes two extra arguments that should be set to 0 */
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
58#endif
59#ifdef R__FBSD
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
61#endif
62#endif
63
64#include <algorithm>
65#include <atomic>
66#include <cassert>
67#include <functional>
68#include <iostream>
69#include <memory>
70#include <stdexcept>
71#include <string>
72#include <sstream>
73#include <thread>
74#include <unordered_map>
75#include <vector>
76#include <set>
77#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
78
79using namespace ROOT::Detail::RDF;
80using namespace ROOT::Internal::RDF;
81
82namespace {
/// Access the single buffer that collects RDF code waiting for just-in-time compilation.
/// All RLoopManager instances append to this same buffer, so that every pending snippet can be
/// handed to cling in one go later on -- potentially much faster than jitting the code of each
/// RLoopManager separately.
/// The buffer is a function-local static: it is created on first use and shared by every caller.
std::string &GetCodeToJit()
{
   static std::string pendingCode{};
   return pendingCode;
}
93
94bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
95{
96 return (leaves.find(leaf) != leaves.end());
97}
98
99///////////////////////////////////////////////////////////////////////////////
100/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
101/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
102void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
103 const std::string &friendName, bool allowDuplicates)
104{
105 if (!friendName.empty()) {
106 // In case of a friend tree, users might prepend its name/alias to the branch names
107 const auto friendBName = friendName + "." + branchName;
108 if (bNamesReg.insert(friendBName).second)
109 bNames.push_back(friendBName);
110 }
111
112 if (allowDuplicates || friendName.empty()) {
113 if (bNamesReg.insert(branchName).second)
114 bNames.push_back(branchName);
115 }
116}
117
118///////////////////////////////////////////////////////////////////////////////
119/// This overload makes sure that the TLeaf has not been already inserted.
120void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
121 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
122 bool allowDuplicates)
123{
125 if (!canAdd) {
126 return;
127 }
128
130
131 foundLeaves.insert(leaf);
132}
133
134void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
135 std::string prefix, std::string &friendName, bool allowDuplicates)
136{
137 for (auto sb : *b->GetListOfBranches()) {
138 TBranch *subBranch = static_cast<TBranch *>(sb);
139 auto subBranchName = std::string(subBranch->GetName());
140 auto fullName = prefix + subBranchName;
141
142 std::string newPrefix;
143 if (!prefix.empty())
144 newPrefix = fullName + ".";
145
147
148 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
150 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
154
155 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.GetBranch(subBranchName.c_str()))
157 }
158}
159
160void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
161 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
162{
163 std::set<TLeaf *> foundLeaves;
164 if (!analysedTrees.insert(&t).second) {
165 return;
166 }
167
168 const auto branches = t.GetListOfBranches();
169 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
170 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
171 // operations
172 if (!t.GetTree()) {
173 std::string err("GetBranchNames: error in opening the tree ");
174 err += t.GetName();
175 throw std::runtime_error(err);
176 }
177 if (branches) {
178 for (auto b : *branches) {
179 TBranch *branch = static_cast<TBranch *>(b);
180 const auto branchName = std::string(branch->GetName());
181 if (branch->IsA() == TBranch::Class()) {
182 // Leaf list
183 auto listOfLeaves = branch->GetListOfLeaves();
184 if (listOfLeaves->GetEntriesUnsafe() == 1) {
185 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
187 }
188
189 for (auto leaf : *listOfLeaves) {
190 auto castLeaf = static_cast<TLeaf *>(leaf);
191 const auto leafName = std::string(leaf->GetName());
192 const auto fullName = branchName + "." + leafName;
194 }
195 } else if (branch->IsA() == TBranchObject::Class()) {
196 // TBranchObject
199 } else {
200 // TBranchElement
201 // Check if there is explicit or implicit dot in the name
202
203 bool dotIsImplied = false;
204 auto be = dynamic_cast<TBranchElement *>(b);
205 if (!be)
206 throw std::runtime_error("GetBranchNames: unsupported branch type");
207 // TClonesArray (3) and STL collection (4)
208 if (be->GetType() == 3 || be->GetType() == 4)
209 dotIsImplied = true;
210
211 if (dotIsImplied || branchName.back() == '.')
213 else
215
217 }
218 }
219 }
220
221 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
222 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
223 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
224 auto friendTrees = t.GetTree()->GetListOfFriends();
225
226 if (!friendTrees)
227 return;
228
229 for (auto friendTreeObj : *friendTrees) {
230 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
231
232 std::string frName;
234 if (alias != nullptr)
235 frName = std::string(alias);
236 else
237 frName = std::string(friendTree->GetName());
238
240 }
241}
242
243void ThrowIfNSlotsChanged(unsigned int nSlots)
244{
246 if (currentSlots != nSlots) {
247 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
248 std::to_string(nSlots) + ", but when starting the event loop it was " +
249 std::to_string(currentSlots) + ".";
250 if (currentSlots > nSlots)
251 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
252 else
253 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
254 throw std::runtime_error(msg);
255 }
256}
257
258/**
259\struct MaxTreeSizeRAII
260\brief Scope-bound change of `TTree::fgMaxTreeSize`.
261
262This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
263changes it to maximum at construction time and restores it back at destruction
264time. Needed for issue #6523 and should be reverted when #6640 will be solved.
265*/
266struct MaxTreeSizeRAII {
267 Long64_t fOldMaxTreeSize;
268
269 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
270 {
271 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
272 }
273
274 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
275};
276
277struct DatasetLogInfo {
278 std::string fDataSet;
279 ULong64_t fRangeStart;
280 ULong64_t fRangeEnd;
281 unsigned int fSlot;
282};
283
284std::string LogRangeProcessing(const DatasetLogInfo &info)
285{
286 std::stringstream msg;
287 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
288 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
289 return msg.str();
290}
291
292DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
293{
294 const auto tree = r.GetTree();
295 const auto chain = dynamic_cast<TChain *>(tree);
296 std::string what;
297 if (chain) {
298 auto files = chain->GetListOfFiles();
299 std::vector<std::string> treeNames;
300 std::vector<std::string> fileNames;
301 for (TObject *f : *files) {
302 treeNames.emplace_back(f->GetName());
303 fileNames.emplace_back(f->GetTitle());
304 }
305 what = "trees {";
306 for (const auto &t : treeNames) {
307 what += t + ",";
308 }
309 what.back() = '}';
310 what += " in files {";
311 for (const auto &f : fileNames) {
312 what += f + ",";
313 }
314 what.back() = '}';
315 } else {
316 assert(tree != nullptr); // to make clang-tidy happy
317 const auto treeName = tree->GetName();
318 what = std::string("tree \"") + treeName + "\"";
319 const auto file = tree->GetCurrentFile();
320 if (file)
321 what += std::string(" in file \"") + file->GetName() + "\"";
322 }
323 const auto entryRange = r.GetEntriesRange();
324 const ULong64_t end = entryRange.second == -1ll ? tree->GetEntries() : entryRange.second;
325 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
326}
327
/// Build the lookup key for a dataset column reader as "<column name>:<type name>".
/// The type is part of the key because the same column may be read with different concrete types,
/// e.g. std::vector and RVec:
///    df.Sum<vector<int>>("stdVectorBranch");
///    df.Sum<RVecI>("stdVectorBranch");
auto MakeDatasetColReadersKey(const std::string &colName, const std::type_info &ti)
{
   std::string key = colName;
   key += ':';
   key += ti.name();
   return key;
}
336} // anonymous namespace
337
338namespace ROOT {
339namespace Detail {
340namespace RDF {
341
342/// A RAII object that calls RLoopManager::CleanUpTask at destruction
354
355} // namespace RDF
356} // namespace Detail
357} // namespace ROOT
358
359///////////////////////////////////////////////////////////////////////////////
360/// Get all the branches names, including the ones of the friend trees
362{
363 std::set<std::string> bNamesSet;
365 std::set<TTree *> analysedTrees;
366 std::string emptyFrName = "";
368 return bNames;
369}
370
372 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})),
373 fDefaultColumns(defaultBranches),
374 fNSlots(RDFInternal::GetNSlots()),
375 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
376 fNewSampleNotifier(fNSlots),
377 fSampleInfos(fNSlots),
378 fDatasetColumnReaders(fNSlots)
379{
380}
381
/// Construct a loop manager that takes ownership of `tree` and reads its default columns from `defaultBranches`.
/// The number of slots is queried via RDFInternal::GetNSlots(). The loop type is kROOTFilesMT when implicit
/// multi-threading is enabled at construction time, kROOTFiles otherwise.
/// The per-slot containers (sample notifier, sample infos, dataset column readers) are sized with fNSlots.
382RLoopManager::RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches)
383 : fTree(std::move(tree)),
384 fDefaultColumns(defaultBranches),
385 fNSlots(RDFInternal::GetNSlots()),
386 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
387 fNewSampleNotifier(fNSlots),
388 fSampleInfos(fNSlots),
389 fDatasetColumnReaders(fNSlots)
390{
391}
392
394 : fEmptyEntryRange(0, nEmptyEntries),
395 fNSlots(RDFInternal::GetNSlots()),
396 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
397 fNewSampleNotifier(fNSlots),
398 fSampleInfos(fNSlots),
399 fDatasetColumnReaders(fNSlots)
400{
401}
402
/// Construct a loop manager that reads its entries from the data source `ds`, taking ownership of it.
/// The loop type is kDataSourceMT when implicit multi-threading is enabled at construction time,
/// kDataSource otherwise.
/// The data source is told the number of slots (fNSlots) that the event loop will use.
403RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
404 : fDefaultColumns(defaultBranches),
405 fNSlots(RDFInternal::GetNSlots()),
406 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
407 fDataSource(std::move(ds)),
408 fNewSampleNotifier(fNSlots),
409 fSampleInfos(fNSlots),
410 fDatasetColumnReaders(fNSlots)
411{
412 fDataSource->SetNSlots(fNSlots);
413}
414
416 : fNSlots(RDFInternal::GetNSlots()),
417 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
418 fNewSampleNotifier(fNSlots),
419 fSampleInfos(fNSlots),
420 fDatasetColumnReaders(fNSlots)
421{
422 ChangeSpec(std::move(spec));
423}
424
425#ifdef R__UNIX
426namespace {
427std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
428{
429 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
430 // If so, we create a redirected sample ID with the full xroot URL.
431 TString expandedUrl(path.data());
433 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
434 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
435 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
436 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
437 if (len > 0) {
438 std::string xurl(len, 0);
439 std::string fileNameFromUrl{fileurl.GetFile()};
440 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
441 // Sometimes the `getxattr` call may return an invalid URL due
442 // to the POSIX attribute not being yet completely filled by EOS.
443 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
444 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
445 return xurl + '/' + datasetName.data();
446 }
447 }
448 }
449 }
450 }
451
452 return std::nullopt;
453}
454} // namespace
455#endif
456
457/**
458 * @brief Changes the internal TTree held by the RLoopManager.
459 *
460 * @warning This method may lead to potentially dangerous interactions if used
461 * after the construction of the RDataFrame. Changing the specification makes
462 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
463 * new specification refers to exactly the same number of columns, with the
464 * same names and types. The actual use case of this method is moving the
465 * processing of the same RDataFrame to a different range of entries of the
466 * same dataset (which may be stored in a different set of files).
467 *
468 * @param spec The specification of the dataset to be adopted.
469 */
471{
472 // Change the range of entries to be processed
473 fBeginEntry = spec.GetEntryRangeBegin();
474 fEndEntry = spec.GetEntryRangeEnd();
475
476 // Store the samples
477 fSamples = spec.MoveOutSamples();
478 fSampleMap.clear();
479
480 // Create the internal main chain
482 for (auto &sample : fSamples) {
483 const auto &trees = sample.GetTreeNames();
484 const auto &files = sample.GetFileNameGlobs();
485 for (std::size_t i = 0ul; i < files.size(); ++i) {
486 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
487 // (see https://github.com/root-project/root/pull/8820 for why)
488 const auto fullpath = files[i] + "?#" + trees[i];
489 chain->Add(fullpath.c_str());
490 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
491 // change this easily because of backward compatibility: the sample ID
492 // is exposed to users via RSampleInfo and DefinePerSample).
493 const auto sampleId = files[i] + '/' + trees[i];
494 fSampleMap.insert({sampleId, &sample});
495#ifdef R__UNIX
496 // Also add redirected EOS xroot URL when available
498 fSampleMap.insert({redirectedSampleId.value(), &sample});
499#endif
500 }
501 }
502 SetTree(std::move(chain));
503
504 // Create friends from the specification and connect them to the main chain
505 const auto &friendInfo = spec.GetFriendInfo();
507 for (std::size_t i = 0ul; i < fFriends.size(); i++) {
508 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
509 fTree->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
510 }
511}
512
513/// Run event loop with no source files, in parallel.
515{
516#ifdef R__USE_IMT
518 // Working with an empty tree.
519 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
520 const auto nEmptyEntries = GetNEmptyEntries();
521 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
522 auto remainder = nEmptyEntries % (fNSlots * 2);
523 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
524 ULong64_t begin = fEmptyEntryRange.first;
525 while (begin < fEmptyEntryRange.second) {
526 ULong64_t end = begin + nEntriesPerSlot;
527 if (remainder > 0) {
528 ++end;
529 --remainder;
530 }
531 entryRanges.emplace_back(begin, end);
532 begin = end;
533 }
534
535 // Each task will generate a subrange of entries
536 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
538 auto slot = slotRAII.fSlot;
539 RCallCleanUpTask cleanup(*this, slot);
540 InitNodeSlots(nullptr, slot);
541 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
542 try {
544 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
546 }
547 } catch (...) {
548 // Error might throw in experiment frameworks like CMSSW
549 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
550 throw;
551 }
552 };
553
555 pool.Foreach(genFunction, entryRanges);
556
557#endif // not implemented otherwise
558}
559
560/// Run event loop with no source files, in sequence.
562{
563 InitNodeSlots(nullptr, 0);
565 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
566 RCallCleanUpTask cleanup(*this);
567 try {
572 }
573 } catch (...) {
574 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
575 throw;
576 }
577}
578
579namespace {
580/// Return true on successful entry read.
581///
582/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
583/// there can be other situations where the read is still valid. For now, these
584/// are:
585/// - If there was no match of the current entry in one or more friend trees
586/// according to their respective indexes.
587/// - If there was a missing branch at the start of a new tree in the dataset.
588///
589/// In such situations, although the entry is not complete, the processing
590/// should not be aborted and nodes of the computation graph will take action
591/// accordingly.
593{
594 treeReader.Next();
595 switch (treeReader.GetEntryStatus()) {
596 case TTreeReader::kEntryValid: return true;
597 case TTreeReader::kIndexedFriendNoMatch: return true;
599 default: return false;
600 }
601}
602} // namespace
603
604/// Run event loop over one or multiple ROOT files, in parallel.
606{
607#ifdef R__USE_IMT
608 if (fEndEntry == fBeginEntry) // empty range => no work needed
609 return;
611 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
612 auto tp =
613 (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
614 ? std::make_unique<ROOT::TTreeProcessorMT>(*fTree, fNSlots, std::make_pair(fBeginEntry, fEndEntry),
616 : std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots, fSuppressErrorsForMissingBranches);
617
618 std::atomic<ULong64_t> entryCount(0ull);
619
620 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
622 auto slot = slotRAII.fSlot;
623 RCallCleanUpTask cleanup(*this, slot, &r);
626 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
627 const auto nEntries = entryRange.second - entryRange.first;
628 auto count = entryCount.fetch_add(nEntries);
629 try {
630 // recursive call to check filters and conditionally execute actions
631 while (validTTreeReaderRead(r)) {
634 }
635 RunAndCheckFilters(slot, count++);
636 }
637 } catch (...) {
638 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
639 throw;
640 }
641 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
642 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
643 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
644 // something went wrong in the TTreeReader event loop
645 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
646 std::to_string(r.GetEntryStatus()));
647 }
648 });
649#endif // no-op otherwise (will not be called)
650}
651
652/// Run event loop over one or multiple ROOT files, in sequence.
654{
655 TTreeReader r(fTree.get(), fTree->GetEntryList(), /*warnAboutLongerFriends*/ true,
657 if (0 == fTree->GetEntriesFast() || fBeginEntry == fEndEntry)
658 return;
659 // Apply the range if there is any
660 // In case of a chain with a total of N entries, calling SetEntriesRange(N + 1, ...) does not error out
661 // This is a bug, reported here: https://github.com/root-project/root/issues/10774
662 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
663 if (r.SetEntriesRange(fBeginEntry, fEndEntry) != TTreeReader::kEntryValid)
664 throw std::logic_error("Something went wrong in initializing the TTreeReader.");
665
666 RCallCleanUpTask cleanup(*this, 0u, &r);
667 InitNodeSlots(&r, 0);
669
670 // recursive call to check filters and conditionally execute actions
671 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
672 try {
675 UpdateSampleInfo(/*slot*/0, r);
676 }
677 RunAndCheckFilters(0, r.GetCurrentEntry());
678 }
679 } catch (...) {
680 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
681 throw;
682 }
683 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
684 // something went wrong in the TTreeReader event loop
685 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
686 std::to_string(r.GetEntryStatus()));
687 }
688}
689
690/// Run event loop over data accessed through a DataSource, in sequence.
692{
693 assert(fDataSource != nullptr);
694 fDataSource->Initialize();
695 auto ranges = fDataSource->GetEntryRanges();
696 while (!ranges.empty() && fNStopsReceived < fNChildren) {
697 InitNodeSlots(nullptr, 0u);
698 fDataSource->InitSlot(0u, 0ull);
699 RCallCleanUpTask cleanup(*this);
700 try {
701 for (const auto &range : ranges) {
702 const auto start = range.first;
703 const auto end = range.second;
704 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
705 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
706 if (fDataSource->SetEntry(0u, entry)) {
708 }
709 }
710 }
711 } catch (...) {
712 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
713 throw;
714 }
715 fDataSource->FinalizeSlot(0u);
716 ranges = fDataSource->GetEntryRanges();
717 }
718 fDataSource->Finalize();
719}
720
721/// Run event loop over data accessed through a DataSource, in parallel.
723{
724#ifdef R__USE_IMT
725 assert(fDataSource != nullptr);
728
729 // Each task works on a subrange of entries
730 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
732 const auto slot = slotRAII.fSlot;
733 InitNodeSlots(nullptr, slot);
734 RCallCleanUpTask cleanup(*this, slot);
735 fDataSource->InitSlot(slot, range.first);
736 const auto start = range.first;
737 const auto end = range.second;
738 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
739 try {
740 for (auto entry = start; entry < end; ++entry) {
741 if (fDataSource->SetEntry(slot, entry)) {
743 }
744 }
745 } catch (...) {
746 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
747 throw;
748 }
749 fDataSource->FinalizeSlot(slot);
750 };
751
752 fDataSource->Initialize();
753 auto ranges = fDataSource->GetEntryRanges();
754 while (!ranges.empty()) {
755 pool.Foreach(runOnRange, ranges);
756 ranges = fDataSource->GetEntryRanges();
757 }
758 fDataSource->Finalize();
759#endif // not implemented otherwise (never called)
760}
761
762/// Execute actions and make sure named filters are called for each event.
763/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
765{
766 // data-block callbacks run before the rest of the graph
768 for (auto &callback : fSampleCallbacks)
769 callback.second(slot, fSampleInfos[slot]);
771 }
772
773 for (auto *actionPtr : fBookedActions)
774 actionPtr->Run(slot, entry);
776 namedFilterPtr->CheckFilters(slot, entry);
777 for (auto &callback : fCallbacksEveryNEvents)
778 callback(slot);
779}
780
781/// Build TTreeReaderValues for all nodes
782/// This method loops over all filters, actions and other booked objects and
783/// calls their `InitSlot` method, to get them ready for running a task.
785{
787 for (auto *ptr : fBookedActions)
788 ptr->InitSlot(r, slot);
789 for (auto *ptr : fBookedFilters)
790 ptr->InitSlot(r, slot);
791 for (auto *ptr : fBookedDefines)
792 ptr->InitSlot(r, slot);
793 for (auto *ptr : fBookedVariations)
794 ptr->InitSlot(r, slot);
795
796 for (auto &callback : fCallbacksOnce)
797 callback(slot);
798}
799
801 if (r != nullptr) {
802 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
803 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
804 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
805 }
806 // Whatever the data source, initially set the "new data block" flag:
807 // - for TChains, this ensures that we don't skip the first data block because
808 // the correct tree is already loaded
809 // - for RDataSources and empty sources, which currently don't have data blocks, this
810 // ensures that we run once per task
812}
813
814void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
816 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
817}
818
820 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
821 auto *tree = r.GetTree()->GetTree();
822 R__ASSERT(tree != nullptr);
823 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
824 auto *file = tree->GetCurrentFile();
825 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
826
827 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
828 R__ASSERT(range.first >= 0);
829 if (range.second == -1) {
830 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
831 }
832 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
833 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
834 if (fSampleMap.empty()) {
836 } else {
837 if (fSampleMap.find(id) == fSampleMap.end())
838 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
840 }
841}
842
843/// Initialize all nodes of the functional graph before running the event loop.
844/// This method is called once per event-loop and performs generic initialization
845/// operations that do not depend on the specific processing slot (i.e. operations
846/// that are common for all threads).
848{
850 for (auto *filter : fBookedFilters)
851 filter->InitNode();
852 for (auto *range : fBookedRanges)
853 range->InitNode();
854 for (auto *ptr : fBookedActions)
855 ptr->Initialize();
856}
857
858/// Perform clean-up operations. To be called at the end of each event loop.
860{
861 fMustRunNamedFilters = false;
862
863 // forget RActions and detach TResultProxies
864 for (auto *ptr : fBookedActions)
865 ptr->Finalize();
866
867 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
868 fBookedActions.clear();
869
870 // reset children counts
871 fNChildren = 0;
872 fNStopsReceived = 0;
873 for (auto *ptr : fBookedFilters)
874 ptr->ResetChildrenCount();
875 for (auto *ptr : fBookedRanges)
876 ptr->ResetChildrenCount();
877
879 fCallbacksOnce.clear();
880}
881
882/// Perform clean-up operations. To be called at the end of each task execution.
884{
885 if (r != nullptr)
886 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
887 for (auto *ptr : fBookedActions)
888 ptr->FinalizeSlot(slot);
889 for (auto *ptr : fBookedFilters)
890 ptr->FinalizeSlot(slot);
891 for (auto *ptr : fBookedDefines)
892 ptr->FinalizeSlot(slot);
893
895 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
896 // because the TTreeReader object changes at every task
897 for (auto &v : fDatasetColumnReaders[slot])
898 v.second.reset();
899 }
900}
901
902/// Add RDF nodes that require just-in-time compilation to the computation graph.
903/// This method also clears the contents of GetCodeToJit().
905{
906 {
908 if (GetCodeToJit().empty()) {
909 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
910 return;
911 }
912 }
913
914 const std::string code = []() {
916 return std::move(GetCodeToJit());
917 }();
918
919 TStopwatch s;
920 s.Start();
921 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
922 s.Stop();
923 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
924 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
925 : " in less than 1ms.");
926}
927
928/// Trigger counting of number of children nodes for each node of the functional graph.
929/// This is done once before starting the event loop. Each action sends an `increase children count` signal
930/// upstream, which is propagated up to RLoopManager. Each time a node receives the signal, it increments its
931/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
932/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
933/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
935{
936 for (auto *actionPtr : fBookedActions)
937 actionPtr->TriggerChildrenCount();
939 namedFilterPtr->TriggerChildrenCount();
940}
941
942/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
943/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
944/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
946{
947 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
948 MaxTreeSizeRAII ctxtmts;
949
950 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
951
953
954 if (jit)
955 Jit();
956
957 InitNodes();
958
959 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
960 // we use RAII: even in case of an exception, the destructor of the object is invoked and
961 // all the cleanup takes place.
962 class NodesCleanerRAII {
964
965 public:
967 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
968 };
969
971
972 TStopwatch s;
973 s.Start();
974
975 switch (fLoopType) {
979 case ELoopType::kNoFiles: RunEmptySource(); break;
982 }
983 s.Stop();
984
985 fNRuns++;
986
987 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
988 << s.RealTime() << "s elapsed).";
989}
990
991/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
996
998{
999 return fTree.get();
1000}
1001
1007
1014
1016{
1017 fBookedFilters.emplace_back(filterPtr);
1018 if (filterPtr->HasName()) {
1019 fBookedNamedFilters.emplace_back(filterPtr);
1020 fMustRunNamedFilters = true;
1021 }
1022}
1023
1029
1031{
1032 fBookedRanges.emplace_back(rangePtr);
1033}
1034
1039
1041{
1042 fBookedDefines.emplace_back(ptr);
1043}
1044
1050
1055
1060
1061// dummy call, end of recursive chain of calls
1063{
1064 return true;
1065}
1066
1067/// Call `FillReport` on all booked filters
1069{
1070 for (const auto *fPtr : fBookedNamedFilters)
1071 fPtr->FillReport(rep);
1072}
1073
1074void RLoopManager::SetTree(std::shared_ptr<TTree> tree)
1075{
1076 fTree = std::move(tree);
1077
1078 TChain *ch = nullptr;
1079 if ((ch = dynamic_cast<TChain *>(fTree.get())))
1081}
1082
1083void RLoopManager::ToJitExec(const std::string &code) const
1084{
1086 GetCodeToJit().append(code);
1087}
1088
1089void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1090{
1091 if (everyNEvents == 0ull)
1092 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1093 else
1094 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1095}
1096
1097std::vector<std::string> RLoopManager::GetFiltersNames()
1098{
1099 std::vector<std::string> filters;
1100 for (auto *filter : fBookedFilters) {
1101 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1102 filters.push_back(name);
1103 }
1104 return filters;
1105}
1106
1107std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1108{
1109 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1110 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1111 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1112 return nodes;
1113}
1114
1115std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1116{
1117 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1118 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1119 std::copy(fRunActions.begin(), fRunActions.end(), it);
1120 return actions;
1121}
1122
1123std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1124 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1125{
1126 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1127 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1129 return duplicateRLoopManagerIt->second;
1130
1131 std::string name;
1132 if (fDataSource) {
1133 name = fDataSource->GetLabel();
1134 } else if (fTree) {
1135 name = fTree->GetName();
1136 if (name.empty())
1137 name = fTree->ClassName();
1138 } else {
1139 name = "Empty source\nEntries: " + std::to_string(GetNEmptyEntries());
1140 }
1141 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1143 visitedMap[(void *)this] = thisNode;
1144 return thisNode;
1145}
1146
1147////////////////////////////////////////////////////////////////////////////
1148/// Return all valid TTree::Branch names (caching results for subsequent calls).
1149/// Never use fValidBranchNames directly, always request it through this method.
1151{
1152 if (fValidBranchNames.empty() && fTree) {
1153 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
1154 }
1155 return fValidBranchNames;
1156}
1157
1158/// Return true if AddDataSourceColumnReaders was called for column name col.
1159bool RLoopManager::HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
1160{
1161 const auto key = MakeDatasetColReadersKey(col, ti);
1162 assert(fDataSource != nullptr);
1163 // since data source column readers are always added for all slots at the same time,
1164 // if the reader is present for slot 0 we have it for all other slots as well.
1165 return fDatasetColumnReaders[0].find(key) != fDatasetColumnReaders[0].end();
1166}
1167
1169 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1170 const std::type_info &ti)
1171{
1172 const auto key = MakeDatasetColReadersKey(col, ti);
1173 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1174 assert(readers.size() == fNSlots);
1175
1176 for (auto slot = 0u; slot < fNSlots; ++slot) {
1177 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1178 }
1179}
1180
1181// Differently from AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1182/// \brief Register a new RTreeColumnReader with this RLoopManager.
1183/// \return A non-owning pointer to the inserted column reader.
1184RColumnReaderBase *RLoopManager::AddTreeColumnReader(unsigned int slot, const std::string &col,
1185 std::unique_ptr<RColumnReaderBase> &&reader,
1186 const std::type_info &ti)
1187{
1189 const auto key = MakeDatasetColReadersKey(col, ti);
1190 // if a reader for this column and this slot was already there, we are doing something wrong
1191 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1192 auto *rptr = reader.get();
1193 readers[key] = std::move(reader);
1194 return rptr;
1195}
1196
1198RLoopManager::GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
1199{
1200 const auto key = MakeDatasetColReadersKey(col, ti);
1201 auto it = fDatasetColumnReaders[slot].find(key);
1202 if (it != fDatasetColumnReaders[slot].end())
1203 return it->second.get();
1204 else
1205 return nullptr;
1206}
1207
1209{
1210 if (callback)
1211 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1212}
1213
1214void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1215{
1216 fEmptyEntryRange = std::move(newRange);
1217}
1218
1220{
1221 fBeginEntry = begin;
1222 fEndEntry = end;
1223}
1224
1225/**
1226 * \brief Helper function to open a file (or the first file from a glob).
1227 * This function is used at construction time of an RDataFrame, to check the
1228 * concrete type of the dataset stored inside the file.
1229 */
1230std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
1231{
1232 // Follow same logic in TChain::Add to find the correct string to look for globbing:
1233 // - If the extension ".root" is present in the file name, pass along the basename.
1234 // - If not, use the "?" token to delimit the part of the string which represents the basename.
1235 // - Otherwise, pass the full filename.
1236 auto &&baseNameAndQuery = [&fileNameGlob]() {
1237 constexpr std::string_view delim{".root"};
1238 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
1239 it != fileNameGlob.end()) {
1240 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
1241 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
1242 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
1243 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
1244 else
1245 return std::make_pair(fileNameGlob, std::string_view{});
1246 }();
1247 // Captured structured bindings variable are only valid since C++20
1248 auto &&baseName = baseNameAndQuery.first;
1249 auto &&query = baseNameAndQuery.second;
1250
1251 const auto nameHasWildcard = [&baseName]() {
1252 constexpr std::array<char, 4> wildCards{'[', ']', '*', '?'}; // Wildcards accepted by TChain::Add
1253 return std::any_of(wildCards.begin(), wildCards.end(),
1254 [&baseName](auto &&wc) { return baseName.find(wc) != std::string_view::npos; });
1255 }();
1256
1257 // Open first file in case of glob, suppose all files in the glob use the same data format
1258 std::string fileToOpen{nameHasWildcard
1259 ? ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName})[0] + std::string{query}
1260 : fileNameGlob};
1261
1262 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
1263 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
1264 if (!inFile || inFile->IsZombie())
1265 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
1266
1267 return inFile;
1268}
1269
1270std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1273{
1274 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1275 // Creating an RDataFrame with a non-existing file will throw early rather
1276 // than wait for the start of the graph execution.
1277 if (checkFile) {
1279 }
1280 std::string datasetNameInt{datasetName};
1281 std::string fileNameGlobInt{fileNameGlob};
1283 chain->Add(fileNameGlobInt.c_str());
1284 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1285 return lm;
1286}
1287
1288std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1289ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1290 const std::vector<std::string> &defaultColumns, bool checkFile)
1291{
1292 if (fileNameGlobs.size() == 0)
1293 throw std::invalid_argument("RDataFrame: empty list of input files.");
1294 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1295 // Creating an RDataFrame with a non-existing file will throw early rather
1296 // than wait for the start of the graph execution.
1297 if (checkFile) {
1299 }
1300 std::string treeNameInt(datasetName);
1302 for (auto &f : fileNameGlobs)
1303 chain->Add(f.c_str());
1304 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1305 return lm;
1306}
1307
1308#ifdef R__HAS_ROOT7
1309std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1310ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob,
1312{
1313 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlob);
1314 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1315 return lm;
1316}
1317
1318std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1319ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1321{
1322 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlobs);
1323 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1324 return lm;
1325}
1326
1327std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1328ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob,
1330{
1331
1333
1334 if (inFile->Get<TTree>(datasetName.data())) {
1335 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1336 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1338 }
1339
1340 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1341 "\" in file \"" + inFile->GetName() + "\".");
1342}
1343
1344std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1345ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1347{
1348
1349 if (fileNameGlobs.size() == 0)
1350 throw std::invalid_argument("RDataFrame: empty list of input files.");
1351
1353
1354 if (inFile->Get<TTree>(datasetName.data())) {
1355 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1356 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1358 }
1359
1360 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1361 "\" in file \"" + inFile->GetName() + "\".");
1362}
1363#endif
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:365
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< std::string > fSuppressErrorsForMissingBranches
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A Branch for the case of an object.
static TClass * Class()
A TTree is a list of TBranches.
Definition TBranch.h:93
static TClass * Class()
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4094
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:139
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1274
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4841
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5294
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9197
virtual TObjArray * GetListOfBranches()
Definition TTree.h:528
virtual TTree * GetTree() const
Definition TTree.h:557
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6032
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::Experimental::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:301
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:189
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:345
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
R__EXTERN TVirtualRWMutex * gCoreMutex
static const char * what
Definition stlLoader.cc:5
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.