Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
20#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
21#include "ROOT/RLogger.hxx"
22#include "RtypesCore.h" // Long64_t
23#include "TStopwatch.h"
24#include "TBranchElement.h"
25#include "TBranchObject.h"
26#include "TChain.h"
27#include "TEntryList.h"
28#include "TFile.h"
29#include "TFriendElement.h"
30#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
31#include "TTreeReader.h"
32#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
33
34#ifdef R__USE_IMT
37#include "ROOT/RSlotStack.hxx"
38#endif
39
40#ifdef R__HAS_ROOT7
41#include "ROOT/RNTuple.hxx"
42#include "ROOT/RNTupleDS.hxx"
43#endif
44
45#ifdef R__UNIX
46// Functions needed to perform EOS XRootD redirection in ChangeSpec
47#include "TEnv.h"
48#include "TSystem.h"
49#ifndef R__FBSD
50#include <sys/xattr.h>
51#else
52#include <sys/extattr.h>
53#endif
54#ifdef R__MACOSX
55/* On macOS getxattr takes two extra arguments that should be set to 0 */
56#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
57#endif
58#ifdef R__FBSD
59#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
60#endif
61#endif
62
63#include <algorithm>
64#include <atomic>
65#include <cassert>
66#include <functional>
67#include <iostream>
68#include <memory>
69#include <stdexcept>
70#include <string>
71#include <sstream>
72#include <thread>
73#include <unordered_map>
74#include <vector>
75#include <set>
76#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
77
78using namespace ROOT::Detail::RDF;
79using namespace ROOT::Internal::RDF;
80
81namespace {
/// Accessor for the buffer of RDF code that is currently scheduled for just-in-time compilation.
///
/// The buffer is one function-local static string, so all RLoopManager instances append to and
/// read from the same object. Collecting the code of every RDataFrame in a single place allows
/// it to be passed to cling lazily, in one go, which is potentially much faster than jitting
/// each RLoopManager's code separately.
///
/// \return A mutable reference to the shared code buffer.
std::string &GetCodeToJit()
{
   static std::string sPendingCode;
   return sPendingCode;
}
92
93bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
94{
95 return (leaves.find(leaf) != leaves.end());
96}
97
98///////////////////////////////////////////////////////////////////////////////
99/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
100/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
101void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
102 const std::string &friendName, bool allowDuplicates)
103{
104 if (!friendName.empty()) {
105 // In case of a friend tree, users might prepend its name/alias to the branch names
106 const auto friendBName = friendName + "." + branchName;
107 if (bNamesReg.insert(friendBName).second)
108 bNames.push_back(friendBName);
109 }
110
111 if (allowDuplicates || friendName.empty()) {
112 if (bNamesReg.insert(branchName).second)
113 bNames.push_back(branchName);
114 }
115}
116
117///////////////////////////////////////////////////////////////////////////////
118/// This overload makes sure that the TLeaf has not been already inserted.
119void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
120 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
121 bool allowDuplicates)
122{
124 if (!canAdd) {
125 return;
126 }
127
129
130 foundLeaves.insert(leaf);
131}
132
133void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
134 std::string prefix, std::string &friendName, bool allowDuplicates)
135{
136 for (auto sb : *b->GetListOfBranches()) {
137 TBranch *subBranch = static_cast<TBranch *>(sb);
138 auto subBranchName = std::string(subBranch->GetName());
139 auto fullName = prefix + subBranchName;
140
141 std::string newPrefix;
142 if (!prefix.empty())
143 newPrefix = fullName + ".";
144
146
147 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
149 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
153
154 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.GetBranch(subBranchName.c_str()))
156 }
157}
158
159void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
160 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
161{
162 std::set<TLeaf *> foundLeaves;
163 if (!analysedTrees.insert(&t).second) {
164 return;
165 }
166
167 const auto branches = t.GetListOfBranches();
168 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
169 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
170 // operations
171 if (!t.GetTree()) {
172 std::string err("GetBranchNames: error in opening the tree ");
173 err += t.GetName();
174 throw std::runtime_error(err);
175 }
176 if (branches) {
177 for (auto b : *branches) {
178 TBranch *branch = static_cast<TBranch *>(b);
179 const auto branchName = std::string(branch->GetName());
180 if (branch->IsA() == TBranch::Class()) {
181 // Leaf list
182 auto listOfLeaves = branch->GetListOfLeaves();
183 if (listOfLeaves->GetEntriesUnsafe() == 1) {
184 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
186 }
187
188 for (auto leaf : *listOfLeaves) {
189 auto castLeaf = static_cast<TLeaf *>(leaf);
190 const auto leafName = std::string(leaf->GetName());
191 const auto fullName = branchName + "." + leafName;
193 }
194 } else if (branch->IsA() == TBranchObject::Class()) {
195 // TBranchObject
198 } else {
199 // TBranchElement
200 // Check if there is explicit or implicit dot in the name
201
202 bool dotIsImplied = false;
203 auto be = dynamic_cast<TBranchElement *>(b);
204 if (!be)
205 throw std::runtime_error("GetBranchNames: unsupported branch type");
206 // TClonesArray (3) and STL collection (4)
207 if (be->GetType() == 3 || be->GetType() == 4)
208 dotIsImplied = true;
209
210 if (dotIsImplied || branchName.back() == '.')
212 else
214
216 }
217 }
218 }
219
220 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
221 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
222 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
223 auto friendTrees = t.GetTree()->GetListOfFriends();
224
225 if (!friendTrees)
226 return;
227
228 for (auto friendTreeObj : *friendTrees) {
229 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
230
231 std::string frName;
233 if (alias != nullptr)
234 frName = std::string(alias);
235 else
236 frName = std::string(friendTree->GetName());
237
239 }
240}
241
242void ThrowIfNSlotsChanged(unsigned int nSlots)
243{
245 if (currentSlots != nSlots) {
246 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
247 std::to_string(nSlots) + ", but when starting the event loop it was " +
248 std::to_string(currentSlots) + ".";
249 if (currentSlots > nSlots)
250 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
251 else
252 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
253 throw std::runtime_error(msg);
254 }
255}
256
257/**
258\struct MaxTreeSizeRAII
259\brief Scope-bound change of `TTree::fgMaxTreeSize`.
260
261This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
262changes it to maximum at construction time and restores it back at destruction
263time. Needed for issue #6523 and should be reverted when #6640 will be solved.
264*/
265struct MaxTreeSizeRAII {
266 Long64_t fOldMaxTreeSize;
267
268 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
269 {
270 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
271 }
272
273 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
274};
275
276struct DatasetLogInfo {
277 std::string fDataSet;
278 ULong64_t fRangeStart;
279 ULong64_t fRangeEnd;
280 unsigned int fSlot;
281};
282
283std::string LogRangeProcessing(const DatasetLogInfo &info)
284{
285 std::stringstream msg;
286 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
287 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
288 return msg.str();
289}
290
291DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
292{
295 std::string what;
296 if (chain) {
297 auto files = chain->GetListOfFiles();
298 std::vector<std::string> treeNames;
299 std::vector<std::string> fileNames;
300 for (TObject *f : *files) {
301 treeNames.emplace_back(f->GetName());
302 fileNames.emplace_back(f->GetTitle());
303 }
304 what = "trees {";
305 for (const auto &t : treeNames) {
306 what += t + ",";
307 }
308 what.back() = '}';
309 what += " in files {";
310 for (const auto &f : fileNames) {
311 what += f + ",";
312 }
313 what.back() = '}';
314 } else {
315 assert(tree != nullptr); // to make clang-tidy happy
317 what = std::string("tree \"") + treeName + "\"";
319 if (file)
320 what += std::string(" in file \"") + file->GetName() + "\"";
321 }
322 const auto entryRange = r.GetEntriesRange();
324 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
325}
326
/// Compose the key under which a dataset column reader is stored.
///
/// \param colName Name of the column.
/// \param ti      Type information of the type the column is read as.
/// \return A `std::string` of the form `<colName>:<ti.name()>`.
auto MakeDatasetColReadersKey(const std::string &colName, const std::type_info &ti)
{
   // The key combines the column name with the column type name because in some cases we might
   // end up with concrete readers that use different types for the same column, e.g. std::vector
   // and RVec here:
   //    df.Sum<vector<int>>("stdVectorBranch");
   //    df.Sum<RVecI>("stdVectorBranch");
   std::string key(colName);
   key += ':';
   key += ti.name();
   return key;
}
335} // anonymous namespace
336
337namespace ROOT {
338namespace Detail {
339namespace RDF {
340
341/// A RAII object that calls RLoopManager::CleanUpTask at destruction
353
354} // namespace RDF
355} // namespace Detail
356} // namespace ROOT
357
358///////////////////////////////////////////////////////////////////////////////
359/// Get all the branches names, including the ones of the friend trees
361{
362 std::set<std::string> bNamesSet;
364 std::set<TTree *> analysedTrees;
365 std::string emptyFrName = "";
367 return bNames;
368}
369
371 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})),
372 fDefaultColumns(defaultBranches),
373 fNSlots(RDFInternal::GetNSlots()),
374 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
375 fNewSampleNotifier(fNSlots),
376 fSampleInfos(fNSlots),
377 fDatasetColumnReaders(fNSlots)
378{
379}
380
381RLoopManager::RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches)
382 : fTree(std::move(tree)),
383 fDefaultColumns(defaultBranches),
384 fNSlots(RDFInternal::GetNSlots()),
385 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
386 fNewSampleNotifier(fNSlots),
387 fSampleInfos(fNSlots),
388 fDatasetColumnReaders(fNSlots)
389{
390}
391
393 : fEmptyEntryRange(0, nEmptyEntries),
394 fNSlots(RDFInternal::GetNSlots()),
395 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
396 fNewSampleNotifier(fNSlots),
397 fSampleInfos(fNSlots),
398 fDatasetColumnReaders(fNSlots)
399{
400}
401
402RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
403 : fDefaultColumns(defaultBranches),
404 fNSlots(RDFInternal::GetNSlots()),
405 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
406 fDataSource(std::move(ds)),
407 fNewSampleNotifier(fNSlots),
408 fSampleInfos(fNSlots),
409 fDatasetColumnReaders(fNSlots)
410{
411 fDataSource->SetNSlots(fNSlots);
412}
413
415 : fNSlots(RDFInternal::GetNSlots()),
416 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
417 fNewSampleNotifier(fNSlots),
418 fSampleInfos(fNSlots),
419 fDatasetColumnReaders(fNSlots)
420{
421 ChangeSpec(std::move(spec));
422}
423
424#ifdef R__UNIX
425namespace {
426std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
427{
428 // Mimic the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
429 // If so, we create a redirected sample ID with the full xroot URL.
430 TString expandedUrl(path.data());
432 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
433 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
434 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
435 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
436 if (len > 0) {
437 std::string xurl(len, 0);
438 std::string fileNameFromUrl{fileurl.GetFile()};
439 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
440 // Sometimes the `getxattr` call may return an invalid URL due
441 // to the POSIX attribute not being yet completely filled by EOS.
442 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
443 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
444 return xurl + '/' + datasetName.data();
445 }
446 }
447 }
448 }
449 }
450
451 return std::nullopt;
452}
453} // namespace
454#endif
455
456/**
457 * @brief Changes the internal TTree held by the RLoopManager.
458 *
459 * @warning This method may lead to potentially dangerous interactions if used
460 * after the construction of the RDataFrame. Changing the specification makes
461 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
462 * new specification refers to exactly the same number of columns, with the
463 * same names and types. The actual use case of this method is moving the
464 * processing of the same RDataFrame to a different range of entries of the
465 * same dataset (which may be stored in a different set of files).
466 *
467 * @param spec The specification of the dataset to be adopted.
468 */
470{
471 // Change the range of entries to be processed
472 fBeginEntry = spec.GetEntryRangeBegin();
473 fEndEntry = spec.GetEntryRangeEnd();
474
475 // Store the samples
476 fSamples = spec.MoveOutSamples();
477 fSampleMap.clear();
478
479 // Create the internal main chain
481 for (auto &sample : fSamples) {
482 const auto &trees = sample.GetTreeNames();
483 const auto &files = sample.GetFileNameGlobs();
484 for (std::size_t i = 0ul; i < files.size(); ++i) {
485 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
486 // (see https://github.com/root-project/root/pull/8820 for why)
487 const auto fullpath = files[i] + "?#" + trees[i];
488 chain->Add(fullpath.c_str());
489 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
490 // change this easily because of backward compatibility: the sample ID
491 // is exposed to users via RSampleInfo and DefinePerSample).
492 const auto sampleId = files[i] + '/' + trees[i];
493 fSampleMap.insert({sampleId, &sample});
494#ifdef R__UNIX
495 // Also add redirected EOS xroot URL when available
497 fSampleMap.insert({redirectedSampleId.value(), &sample});
498#endif
499 }
500 }
501 SetTree(std::move(chain));
502
503 // Create friends from the specification and connect them to the main chain
504 const auto &friendInfo = spec.GetFriendInfo();
506 for (std::size_t i = 0ul; i < fFriends.size(); i++) {
507 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
508 fTree->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
509 }
510}
511
512/// Run event loop with no source files, in parallel.
514{
515#ifdef R__USE_IMT
517 // Working with an empty tree.
518 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
519 const auto nEmptyEntries = GetNEmptyEntries();
520 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
521 auto remainder = nEmptyEntries % (fNSlots * 2);
522 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
523 ULong64_t begin = fEmptyEntryRange.first;
524 while (begin < fEmptyEntryRange.second) {
525 ULong64_t end = begin + nEntriesPerSlot;
526 if (remainder > 0) {
527 ++end;
528 --remainder;
529 }
530 entryRanges.emplace_back(begin, end);
531 begin = end;
532 }
533
534 // Each task will generate a subrange of entries
535 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
537 auto slot = slotRAII.fSlot;
538 RCallCleanUpTask cleanup(*this, slot);
539 InitNodeSlots(nullptr, slot);
540 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
541 try {
543 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
545 }
546 } catch (...) {
547 // Error might throw in experiment frameworks like CMSSW
548 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
549 throw;
550 }
551 };
552
554 pool.Foreach(genFunction, entryRanges);
555
556#endif // not implemented otherwise
557}
558
559/// Run event loop with no source files, in sequence.
561{
562 InitNodeSlots(nullptr, 0);
564 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
565 RCallCleanUpTask cleanup(*this);
566 try {
571 }
572 } catch (...) {
573 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
574 throw;
575 }
576}
577
578namespace {
579/// Return true on successful entry read.
580///
581/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
582/// there can be other situations where the read is still valid. For now, these
583/// are:
584/// - If there was no match of the current entry in one or more friend trees
585/// according to their respective indexes.
586/// - If there was a missing branch at the start of a new tree in the dataset.
587///
588/// In such situations, although the entry is not complete, the processing
589/// should not be aborted and nodes of the computation graph will take action
590/// accordingly.
592{
593 treeReader.Next();
594 switch (treeReader.GetEntryStatus()) {
595 case TTreeReader::kEntryValid: return true;
596 case TTreeReader::kIndexedFriendNoMatch: return true;
598 default: return false;
599 }
600}
601} // namespace
602
603/// Run event loop over one or multiple ROOT files, in parallel.
605{
606#ifdef R__USE_IMT
607 if (fEndEntry == fBeginEntry) // empty range => no work needed
608 return;
610 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
611 auto tp =
612 (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
613 ? std::make_unique<ROOT::TTreeProcessorMT>(*fTree, fNSlots, std::make_pair(fBeginEntry, fEndEntry),
615 : std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots, fSuppressErrorsForMissingBranches);
616
617 std::atomic<ULong64_t> entryCount(0ull);
618
619 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
621 auto slot = slotRAII.fSlot;
622 RCallCleanUpTask cleanup(*this, slot, &r);
625 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
626 const auto nEntries = entryRange.second - entryRange.first;
627 auto count = entryCount.fetch_add(nEntries);
628 try {
629 // recursive call to check filters and conditionally execute actions
630 while (validTTreeReaderRead(r)) {
633 }
634 RunAndCheckFilters(slot, count++);
635 }
636 } catch (...) {
637 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
638 throw;
639 }
640 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
641 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
642 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
643 // something went wrong in the TTreeReader event loop
644 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
645 std::to_string(r.GetEntryStatus()));
646 }
647 });
648
649 auto &&processedEntries = entryCount.load();
650 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
652 Warning("RDataFrame::Run",
653 "RDataFrame stopped processing after %lld entries, whereas an entry range (begin=%lld,end=%lld) was "
654 "requested. Consider adjusting the end value of the entry range to a maximum of %lld.",
656 }
657#endif // no-op otherwise (will not be called)
658}
659
660/// Run event loop over one or multiple ROOT files, in sequence.
662{
663 TTreeReader r(fTree.get(), fTree->GetEntryList(), /*warnAboutLongerFriends*/ true,
665 if (0 == fTree->GetEntriesFast() || fBeginEntry == fEndEntry)
666 return;
667 // Apply the range if there is any
668 // In case of a chain with a total of N entries, calling SetEntriesRange(N + 1, ...) does not error out
669 // This is a bug, reported here: https://github.com/root-project/root/issues/10774
670 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
671 if (r.SetEntriesRange(fBeginEntry, fEndEntry) != TTreeReader::kEntryValid)
672 throw std::logic_error("Something went wrong in initializing the TTreeReader.");
673
674 RCallCleanUpTask cleanup(*this, 0u, &r);
675 InitNodeSlots(&r, 0);
677
678 // recursive call to check filters and conditionally execute actions
679 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
681 try {
684 UpdateSampleInfo(/*slot*/0, r);
685 }
686 RunAndCheckFilters(0, r.GetCurrentEntry());
688 }
689 } catch (...) {
690 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
691 throw;
692 }
693 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
694 // something went wrong in the TTreeReader event loop
695 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
696 std::to_string(r.GetEntryStatus()));
697 }
698
699 if (fEndEntry != std::numeric_limits<Long64_t>::max() && (fEndEntry - fBeginEntry) > processedEntries) {
700 Warning("RDataFrame::Run",
701 "RDataFrame stopped processing after %lld entries, whereas an entry range (begin=%lld,end=%lld) was "
702 "requested. Consider adjusting the end value of the entry range to a maximum of %lld.",
704 }
705}
706
707namespace {
708struct DSRunRAII {
710 DSRunRAII(ROOT::RDF::RDataSource &ds) : fDS(ds) { fDS.Initialize(); }
711 ~DSRunRAII() { fDS.Finalize(); }
712};
713} // namespace
714
715/// Run event loop over data accessed through a DataSource, in sequence.
717{
718 assert(fDataSource != nullptr);
719 DSRunRAII _{*fDataSource};
720 auto ranges = fDataSource->GetEntryRanges();
721 while (!ranges.empty() && fNStopsReceived < fNChildren) {
722 InitNodeSlots(nullptr, 0u);
723 fDataSource->InitSlot(0u, 0ull);
724 RCallCleanUpTask cleanup(*this);
725 try {
726 for (const auto &range : ranges) {
727 const auto start = range.first;
728 const auto end = range.second;
729 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
730 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
731 if (fDataSource->SetEntry(0u, entry)) {
733 }
734 }
735 }
736 } catch (...) {
737 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
738 throw;
739 }
740 fDataSource->FinalizeSlot(0u);
741 ranges = fDataSource->GetEntryRanges();
742 }
743}
744
745/// Run event loop over data accessed through a DataSource, in parallel.
747{
748#ifdef R__USE_IMT
749 assert(fDataSource != nullptr);
752
753 // Each task works on a subrange of entries
754 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
756 const auto slot = slotRAII.fSlot;
757 InitNodeSlots(nullptr, slot);
758 RCallCleanUpTask cleanup(*this, slot);
759 fDataSource->InitSlot(slot, range.first);
760 const auto start = range.first;
761 const auto end = range.second;
762 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
763 try {
764 for (auto entry = start; entry < end; ++entry) {
765 if (fDataSource->SetEntry(slot, entry)) {
767 }
768 }
769 } catch (...) {
770 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
771 throw;
772 }
773 fDataSource->FinalizeSlot(slot);
774 };
775
776 DSRunRAII _{*fDataSource};
777 auto ranges = fDataSource->GetEntryRanges();
778 while (!ranges.empty()) {
779 pool.Foreach(runOnRange, ranges);
780 ranges = fDataSource->GetEntryRanges();
781 }
782#endif // not implemented otherwise (never called)
783}
784
785/// Execute actions and make sure named filters are called for each event.
786/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
788{
789 // data-block callbacks run before the rest of the graph
791 for (auto &callback : fSampleCallbacks)
792 callback.second(slot, fSampleInfos[slot]);
794 }
795
796 for (auto *actionPtr : fBookedActions)
797 actionPtr->Run(slot, entry);
799 namedFilterPtr->CheckFilters(slot, entry);
800 for (auto &callback : fCallbacksEveryNEvents)
801 callback(slot);
802}
803
804/// Build TTreeReaderValues for all nodes
805/// This method loops over all filters, actions and other booked objects and
806/// calls their `InitSlot` method, to get them ready for running a task.
808{
810 for (auto *ptr : fBookedActions)
811 ptr->InitSlot(r, slot);
812 for (auto *ptr : fBookedFilters)
813 ptr->InitSlot(r, slot);
814 for (auto *ptr : fBookedDefines)
815 ptr->InitSlot(r, slot);
816 for (auto *ptr : fBookedVariations)
817 ptr->InitSlot(r, slot);
818
819 for (auto &callback : fCallbacksOnce)
820 callback(slot);
821}
822
824 if (r != nullptr) {
825 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
826 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
827 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
828 }
829 // Whatever the data source, initially set the "new data block" flag:
830 // - for TChains, this ensures that we don't skip the first data block because
831 // the correct tree is already loaded
832 // - for RDataSources and empty sources, which currently don't have data blocks, this
833 // ensures that we run once per task
835}
836
837void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
839 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
840}
841
843 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
844 auto *tree = r.GetTree()->GetTree();
845 R__ASSERT(tree != nullptr);
846 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
847 auto *file = tree->GetCurrentFile();
848 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
849
850 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
851 R__ASSERT(range.first >= 0);
852 if (range.second == -1) {
853 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
854 }
855 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
856 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
857 if (fSampleMap.empty()) {
859 } else {
860 if (fSampleMap.find(id) == fSampleMap.end())
861 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
863 }
864}
865
866/// Initialize all nodes of the functional graph before running the event loop.
867/// This method is called once per event-loop and performs generic initialization
868/// operations that do not depend on the specific processing slot (i.e. operations
869/// that are common for all threads).
871{
873 for (auto *filter : fBookedFilters)
874 filter->InitNode();
875 for (auto *range : fBookedRanges)
876 range->InitNode();
877 for (auto *ptr : fBookedActions)
878 ptr->Initialize();
879}
880
881/// Perform clean-up operations. To be called at the end of each event loop.
883{
884 fMustRunNamedFilters = false;
885
886 // forget RActions and detach TResultProxies
887 for (auto *ptr : fBookedActions)
888 ptr->Finalize();
889
890 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
891 fBookedActions.clear();
892
893 // reset children counts
894 fNChildren = 0;
895 fNStopsReceived = 0;
896 for (auto *ptr : fBookedFilters)
897 ptr->ResetChildrenCount();
898 for (auto *ptr : fBookedRanges)
899 ptr->ResetChildrenCount();
900
902 fCallbacksOnce.clear();
903}
904
905/// Perform clean-up operations. To be called at the end of each task execution.
907{
908 if (r != nullptr)
909 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
910 for (auto *ptr : fBookedActions)
911 ptr->FinalizeSlot(slot);
912 for (auto *ptr : fBookedFilters)
913 ptr->FinalizeSlot(slot);
914 for (auto *ptr : fBookedDefines)
915 ptr->FinalizeSlot(slot);
916
918 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
919 // because the TTreeReader object changes at every task
920 for (auto &v : fDatasetColumnReaders[slot])
921 v.second.reset();
922 }
923}
924
925/// Add RDF nodes that require just-in-time compilation to the computation graph.
926/// This method also clears the contents of GetCodeToJit().
928{
929 {
931 if (GetCodeToJit().empty()) {
932 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
933 return;
934 }
935 }
936
937 const std::string code = []() {
939 return std::move(GetCodeToJit());
940 }();
941
942 TStopwatch s;
943 s.Start();
944 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
945 s.Stop();
946 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
947 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
948 : " in less than 1ms.");
949}
950
951/// Trigger counting of number of children nodes for each node of the functional graph.
952/// This is done once before starting the event loop. Each action sends an `increase children count` signal
953/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, it increments its
954/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
955/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
956/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
958{
959 for (auto *actionPtr : fBookedActions)
960 actionPtr->TriggerChildrenCount();
962 namedFilterPtr->TriggerChildrenCount();
963}
964
965/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
966/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
967/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
969{
970 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
971 MaxTreeSizeRAII ctxtmts;
972
973 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
974
976
977 if (jit)
978 Jit();
979
980 InitNodes();
981
982 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
983 // we use RAII: even in case of an exception, the destructor of the object is invoked and
984 // all the cleanup takes place.
985 class NodesCleanerRAII {
987
988 public:
990 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
991 };
992
994
995 TStopwatch s;
996 s.Start();
997
998 switch (fLoopType) {
1002 case ELoopType::kNoFiles: RunEmptySource(); break;
1003 case ELoopType::kROOTFiles: RunTreeReader(); break;
1005 }
1006 s.Stop();
1007
1008 fNRuns++;
1009
1010 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
1011 << s.RealTime() << "s elapsed).";
1012}
1013
1014/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
1019
1021{
1022 return fTree.get();
1023}
1024
1030
1037
1039{
1040 fBookedFilters.emplace_back(filterPtr);
1041 if (filterPtr->HasName()) {
1042 fBookedNamedFilters.emplace_back(filterPtr);
1043 fMustRunNamedFilters = true;
1044 }
1045}
1046
1052
1054{
1055 fBookedRanges.emplace_back(rangePtr);
1056}
1057
1062
1064{
1065 fBookedDefines.emplace_back(ptr);
1066}
1067
1073
1078
1083
1084// dummy call, end of recursive chain of calls
1086{
1087 return true;
1088}
1089
1090/// Call `FillReport` on all booked named filters
1092{
1093 for (const auto *fPtr : fBookedNamedFilters)
1094 fPtr->FillReport(rep);
1095}
1096
1097void RLoopManager::SetTree(std::shared_ptr<TTree> tree)
1098{
1099 fTree = std::move(tree);
1100
1101 TChain *ch = nullptr;
1102 if ((ch = dynamic_cast<TChain *>(fTree.get())))
1104}
1105
1106void RLoopManager::ToJitExec(const std::string &code) const
1107{
1109 GetCodeToJit().append(code);
1110}
1111
1112void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1113{
1114 if (everyNEvents == 0ull)
1115 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1116 else
1117 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1118}
1119
1120std::vector<std::string> RLoopManager::GetFiltersNames()
1121{
1122 std::vector<std::string> filters;
1123 for (auto *filter : fBookedFilters) {
1124 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1125 filters.push_back(name);
1126 }
1127 return filters;
1128}
1129
1130std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1131{
1132 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1133 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1134 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1135 return nodes;
1136}
1137
1138std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1139{
1140 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1141 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1142 std::copy(fRunActions.begin(), fRunActions.end(), it);
1143 return actions;
1144}
1145
1146std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1147 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1148{
1149 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1150 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1152 return duplicateRLoopManagerIt->second;
1153
1154 std::string name;
1155 if (fDataSource) {
1156 name = fDataSource->GetLabel();
1157 } else if (fTree) {
1158 name = fTree->GetName();
1159 if (name.empty())
1160 name = fTree->ClassName();
1161 } else {
1162 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1163 }
1164 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1166 visitedMap[(void *)this] = thisNode;
1167 return thisNode;
1168}
1169
1170////////////////////////////////////////////////////////////////////////////
1171/// Return all valid TTree::Branch names (caching results for subsequent calls).
1172/// Never use fValidBranchNames directly, always request it through this method.
1174{
1175 if (fValidBranchNames.empty() && fTree) {
1176 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
1177 }
1178 return fValidBranchNames;
1179}
1180
1181/// Return true if AddDataSourceColumnReaders was called for column name col.
1182bool RLoopManager::HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
1183{
1184 const auto key = MakeDatasetColReadersKey(col, ti);
1185 assert(fDataSource != nullptr);
1186 // since data source column readers are always added for all slots at the same time,
1187 // if the reader is present for slot 0 we have it for all other slots as well.
1188 return fDatasetColumnReaders[0].find(key) != fDatasetColumnReaders[0].end();
1189}
1190
1192 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1193 const std::type_info &ti)
1194{
1195 const auto key = MakeDatasetColReadersKey(col, ti);
1196 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1197 assert(readers.size() == fNSlots);
1198
1199 for (auto slot = 0u; slot < fNSlots; ++slot) {
1200 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1201 }
1202}
1203
1204// Differently from AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1205/// \brief Register a new RTreeColumnReader with this RLoopManager.
1206/// \return A non-owning pointer to the inserted column reader.
1207RColumnReaderBase *RLoopManager::AddTreeColumnReader(unsigned int slot, const std::string &col,
1208 std::unique_ptr<RColumnReaderBase> &&reader,
1209 const std::type_info &ti)
1210{
1212 const auto key = MakeDatasetColReadersKey(col, ti);
1213 // if a reader for this column and this slot was already there, we are doing something wrong
1214 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1215 auto *rptr = reader.get();
1216 readers[key] = std::move(reader);
1217 return rptr;
1218}
1219
1221RLoopManager::GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
1222{
1223 const auto key = MakeDatasetColReadersKey(col, ti);
1224 auto it = fDatasetColumnReaders[slot].find(key);
1225 if (it != fDatasetColumnReaders[slot].end())
1226 return it->second.get();
1227 else
1228 return nullptr;
1229}
1230
1232{
1233 if (callback)
1234 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1235}
1236
1237void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1238{
1239 fEmptyEntryRange = std::move(newRange);
1240}
1241
1243{
1244 fBeginEntry = begin;
1245 fEndEntry = end;
1246}
1247
1248/**
1249 * \brief Helper function to open a file (or the first file from a glob).
1250 * This function is used at construction time of an RDataFrame, to check the
1251 * concrete type of the dataset stored inside the file.
1252 */
1253std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
1254{
1255 // Follow same logic in TChain::Add to find the correct string to look for globbing:
1256 // - If the extension ".root" is present in the file name, pass along the basename.
1257 // - If not, use the "?" token to delimit the part of the string which represents the basename.
1258 // - Otherwise, pass the full filename.
1259 auto &&baseNameAndQuery = [&fileNameGlob]() {
1260 constexpr std::string_view delim{".root"};
1261 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
1262 it != fileNameGlob.end()) {
1263 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
1264 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
1265 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
1266 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
1267 else
1268 return std::make_pair(fileNameGlob, std::string_view{});
1269 }();
1270 // Captured structured bindings variable are only valid since C++20
1271 auto &&baseName = baseNameAndQuery.first;
1272 auto &&query = baseNameAndQuery.second;
1273
1274 const auto nameHasWildcard = [&baseName]() {
1275 constexpr std::array<char, 4> wildCards{'[', ']', '*', '?'}; // Wildcards accepted by TChain::Add
1276 return std::any_of(wildCards.begin(), wildCards.end(),
1277 [&baseName](auto &&wc) { return baseName.find(wc) != std::string_view::npos; });
1278 }();
1279
1280 // Open first file in case of glob, suppose all files in the glob use the same data format
1281 std::string fileToOpen{nameHasWildcard
1282 ? ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName})[0] + std::string{query}
1283 : fileNameGlob};
1284
1285 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
1286 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
1287 if (!inFile || inFile->IsZombie())
1288 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
1289
1290 return inFile;
1291}
1292
1293std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1296{
1297 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1298 // Creating an RDataFrame with a non-existing file will throw early rather
1299 // than wait for the start of the graph execution.
1300 if (checkFile) {
1302 }
1303 std::string datasetNameInt{datasetName};
1304 std::string fileNameGlobInt{fileNameGlob};
1306 chain->Add(fileNameGlobInt.c_str());
1307 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1308 return lm;
1309}
1310
1311std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1312ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1313 const std::vector<std::string> &defaultColumns, bool checkFile)
1314{
1315 if (fileNameGlobs.size() == 0)
1316 throw std::invalid_argument("RDataFrame: empty list of input files.");
1317 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1318 // Creating an RDataFrame with a non-existing file will throw early rather
1319 // than wait for the start of the graph execution.
1320 if (checkFile) {
1322 }
1323 std::string treeNameInt(datasetName);
1325 for (auto &f : fileNameGlobs)
1326 chain->Add(f.c_str());
1327 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1328 return lm;
1329}
1330
1331#ifdef R__HAS_ROOT7
1332std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1333ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob,
1335{
1336 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlob);
1337 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1338 return lm;
1339}
1340
1341std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1342ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1344{
1345 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlobs);
1346 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1347 return lm;
1348}
1349
1350std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1351ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob,
1353{
1354
1356
1357 if (inFile->Get<TTree>(datasetName.data())) {
1358 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1359 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1361 }
1362
1363 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1364 "\" in file \"" + inFile->GetName() + "\".");
1365}
1366
1367std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1368ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1370{
1371
1372 if (fileNameGlobs.size() == 0)
1373 throw std::invalid_argument("RDataFrame: empty list of input files.");
1374
1376
1377 if (inFile->Get<TTree>(datasetName.data())) {
1378 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1379 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1381 }
1382
1383 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1384 "\" in file \"" + inFile->GetName() + "\".");
1385}
1386#endif
1387
1388// outlined to pin virtual table
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< std::string > fSuppressErrorsForMissingBranches
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void Initialize()
Convenience method called before starting an event-loop.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A Branch for the case of an object.
static TClass * Class()
A TTree is a list of TBranches.
Definition TBranch.h:93
static TClass * Class()
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4130
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:139
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1274
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:45
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4834
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5287
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9201
virtual TObjArray * GetListOfBranches()
Definition TTree.h:528
virtual TTree * GetTree() const
Definition TTree.h:557
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6025
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:301
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:187
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:345
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
R__EXTERN TVirtualRWMutex * gCoreMutex
static const char * what
Definition stlLoader.cc:5
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.