Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
20#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
21#include "ROOT/RLogger.hxx"
22#include "RtypesCore.h" // Long64_t
23#include "TStopwatch.h"
24#include "TBranchElement.h"
25#include "TBranchObject.h"
26#include "TChain.h"
27#include "TEntryList.h"
28#include "TFile.h"
29#include "TFriendElement.h"
30#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
31#include "TTreeReader.h"
32#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
33
34#include "ROOT/RTTreeDS.hxx"
35
36#ifdef R__USE_IMT
39#include "ROOT/RSlotStack.hxx"
40#endif
41
42#ifdef R__HAS_ROOT7
43#include "ROOT/RNTuple.hxx"
44#include "ROOT/RNTupleDS.hxx"
45#endif
46
47#ifdef R__UNIX
48// Functions needed to perform EOS XRootD redirection in ChangeSpec
49#include "TEnv.h"
50#include "TSystem.h"
51#ifndef R__FBSD
52#include <sys/xattr.h>
53#else
54#include <sys/extattr.h>
55#endif
56#ifdef R__MACOSX
57/* On macOS getxattr takes two extra arguments that should be set to 0 */
58#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
59#endif
60#ifdef R__FBSD
61#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
62#endif
63#endif
64
65#include <algorithm>
66#include <atomic>
67#include <cassert>
68#include <functional>
69#include <iostream>
70#include <memory>
71#include <stdexcept>
72#include <string>
73#include <sstream>
74#include <thread>
75#include <unordered_map>
76#include <vector>
77#include <set>
78#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
79
80using namespace ROOT::Detail::RDF;
81using namespace ROOT::Internal::RDF;
82
83namespace {
/// Return the shared buffer of RDF code that is currently scheduled for just-in-time compilation.
///
/// The buffer is a function-local static, so every call returns a reference to the same string.
/// RLoopManager instances append their code to this "code to execute via cling" so that, lazily,
/// everything needed by all RDFs can be jitted in one go, which is potentially much faster than
/// jitting each RLoopManager's code separately.
///
/// \return A mutable reference to the shared code string.
std::string &GetCodeToJit()
{
   static std::string code;
   return code;
}
94
95bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
96{
97 return (leaves.find(leaf) != leaves.end());
98}
99
100///////////////////////////////////////////////////////////////////////////////
101/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
102/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
103void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
104 const std::string &friendName, bool allowDuplicates)
105{
106 if (!friendName.empty()) {
107 // In case of a friend tree, users might prepend its name/alias to the branch names
108 const auto friendBName = friendName + "." + branchName;
109 if (bNamesReg.insert(friendBName).second)
110 bNames.push_back(friendBName);
111 }
112
113 if (allowDuplicates || friendName.empty()) {
114 if (bNamesReg.insert(branchName).second)
115 bNames.push_back(branchName);
116 }
117}
118
119///////////////////////////////////////////////////////////////////////////////
120/// This overload makes sure that the TLeaf has not been already inserted.
/// In addition to the name bookkeeping arguments it receives the set of leaves seen so far
/// (`foundLeaves`) and the `leaf` under consideration. When the function does not return
/// early, `leaf` is added to `foundLeaves`.
// NOTE(review): this listing jumps from source line 124 to 126 and from 129 to 131, so the
// definition of `canAdd` and the statement that preceded `foundLeaves.insert(leaf)` are not
// visible here. Restore them from the upstream file before relying on this block.
121void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
122 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
123 bool allowDuplicates)
124{
// Leave every container untouched when `canAdd` (defined on the missing line) is false.
126 if (!canAdd) {
127 return;
128 }
129
131
// Record the leaf, presumably so that later calls can detect it was already handled.
132 foundLeaves.insert(leaf);
133}
134
135void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
136 std::string prefix, std::string &friendName, bool allowDuplicates)
137{
138 for (auto sb : *b->GetListOfBranches()) {
139 TBranch *subBranch = static_cast<TBranch *>(sb);
140 auto subBranchName = std::string(subBranch->GetName());
141 auto fullName = prefix + subBranchName;
142
143 std::string newPrefix;
144 if (!prefix.empty())
145 newPrefix = fullName + ".";
146
148
149 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
151 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
155
156 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.GetBranch(subBranchName.c_str()))
158 }
159}
160
161void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
162 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
163{
164 std::set<TLeaf *> foundLeaves;
165 if (!analysedTrees.insert(&t).second) {
166 return;
167 }
168
169 const auto branches = t.GetListOfBranches();
170 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
171 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
172 // operations
173 if (!t.GetTree()) {
174 std::string err("GetBranchNames: error in opening the tree ");
175 err += t.GetName();
176 throw std::runtime_error(err);
177 }
178 if (branches) {
179 for (auto b : *branches) {
180 TBranch *branch = static_cast<TBranch *>(b);
181 const auto branchName = std::string(branch->GetName());
182 if (branch->IsA() == TBranch::Class()) {
183 // Leaf list
184 auto listOfLeaves = branch->GetListOfLeaves();
185 if (listOfLeaves->GetEntriesUnsafe() == 1) {
186 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
188 }
189
190 for (auto leaf : *listOfLeaves) {
191 auto castLeaf = static_cast<TLeaf *>(leaf);
192 const auto leafName = std::string(leaf->GetName());
193 const auto fullName = branchName + "." + leafName;
195 }
196 } else if (branch->IsA() == TBranchObject::Class()) {
197 // TBranchObject
200 } else {
201 // TBranchElement
202 // Check if there is explicit or implicit dot in the name
203
204 bool dotIsImplied = false;
205 auto be = dynamic_cast<TBranchElement *>(b);
206 if (!be)
207 throw std::runtime_error("GetBranchNames: unsupported branch type");
208 // TClonesArray (3) and STL collection (4)
209 if (be->GetType() == 3 || be->GetType() == 4)
210 dotIsImplied = true;
211
212 if (dotIsImplied || branchName.back() == '.')
214 else
216
218 }
219 }
220 }
221
222 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
223 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
224 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
225 auto friendTrees = t.GetTree()->GetListOfFriends();
226
227 if (!friendTrees)
228 return;
229
230 for (auto friendTreeObj : *friendTrees) {
231 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
232
233 std::string frName;
235 if (alias != nullptr)
236 frName = std::string(alias);
237 else
238 frName = std::string(friendTree->GetName());
239
241 }
242}
243
/// Throw if the number of slots differs from the one the RDataFrame was constructed with.
///
/// \param nSlots Number of slots that was required when the RDataFrame was constructed
///               (as stated by the error message below).
/// \throws std::runtime_error if `currentSlots` differs from `nSlots`. The message reports both
///         values and suggests that EnableImplicitMT() (more slots now) or DisableImplicitMT()
///         (fewer slots now) was called after construction.
// NOTE(review): source line 246, which defines `currentSlots`, is missing from this listing.
244void ThrowIfNSlotsChanged(unsigned int nSlots)
245{
247 if (currentSlots != nSlots) {
248 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
249 std::to_string(nSlots) + ", but when starting the event loop it was " +
250 std::to_string(currentSlots) + ".";
// Tailor the hint to the direction in which the slot count changed.
251 if (currentSlots > nSlots)
252 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
253 else
254 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
255 throw std::runtime_error(msg);
256 }
257}
258
259/**
260\struct MaxTreeSizeRAII
261\brief Scope-bound change of `TTree::fgMaxTreeSize`.
262
263This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
264changes it to maximum at construction time and restores it back at destruction
265time. Needed for issue #6523 and should be reverted when #6640 will be solved.
266*/
267struct MaxTreeSizeRAII {
268 Long64_t fOldMaxTreeSize;
269
270 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
271 {
272 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
273 }
274
275 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
276};
277
278struct DatasetLogInfo {
279 std::string fDataSet;
280 ULong64_t fRangeStart;
281 ULong64_t fRangeEnd;
282 unsigned int fSlot;
283};
284
285std::string LogRangeProcessing(const DatasetLogInfo &info)
286{
287 std::stringstream msg;
288 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
289 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
290 return msg.str();
291}
292
/// Describe the tree(s) a TTreeReader is attached to, for use in the range-processing log message.
///
/// If `chain` is non-null the description lists the names and the titles of the elements of its
/// list of files, as `trees {a,b} in files {x,y}`. Otherwise it names the single tree and, when
/// the tree has a current file, that file.
///
/// \param r Reader whose tree and entry range are described.
/// \param slot Slot number, copied into the returned structure.
/// \return The description, the first entry of the reader's range, `end`, and `slot`.
// NOTE(review): source lines 295, 296, 318 and 325 are truncated in this listing, so the
// definitions of `tree`, `chain`, `treeName` and `end` are not readable here. Restore them from
// the upstream file before relying on this block.
293DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
294{
295 const auto r.GetTree();
296 const auto tree);
297 std::string what;
298 if (chain) {
299 auto files = chain->GetListOfFiles();
300 std::vector<std::string> treeNames;
301 std::vector<std::string> fileNames;
// Each element of the list contributes its name to the tree names and its title to the file names.
302 for (TObject *f : *files) {
303 treeNames.emplace_back(f->GetName());
304 fileNames.emplace_back(f->GetTitle());
305 }
306 what = "trees {";
307 for (const auto &t : treeNames) {
308 what += t + ",";
309 }
// Turn the trailing comma into the closing brace.
// NOTE(review): with an empty list there is no trailing comma and this overwrites the '{'.
310 what.back() = '}';
311 what += " in files {";
312 for (const auto &f : fileNames) {
313 what += f + ",";
314 }
// Same trailing-comma replacement as above, with the same caveat for an empty list.
315 what.back() = '}';
316 } else {
317 assert(tree != nullptr); // to make clang-tidy happy
318 const auto tree->GetName();
319 what = std::string("tree \"") + treeName + "\"";
320 const auto file = tree->GetCurrentFile();
// The file is mentioned only when the tree reports one.
321 if (file)
322 what += std::string(" in file \"") + file->GetName() + "\"";
323 }
324 const auto entryRange = r.GetEntriesRange();
325 const entryRange.second;
326 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
327}
328
/// Build the key under which the reader of a dataset column is stored.
///
/// The key is `<column name>:<type name>`, with the type name taken from `std::type_info::name`.
///
/// \param colName Name of the column.
/// \param ti Type information of the type the column is read as.
/// \return The key as a `std::string`.
auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
{
   // We use a combination of column name and column type name as the key because in some cases we might end up
   // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
   // df.Sum<vector<int>>("stdVectorBranch");
   // df.Sum<RVecI>("stdVectorBranch");
   std::string key(colName);
   key += ':';
   key += ti.name();
   return key;
}
337} // anonymous namespace
338
339namespace ROOT {
340namespace Detail {
341namespace RDF {
342
343/// A RAII object that calls RLoopManager::CleanUpTask at destruction
355
356} // namespace RDF
357} // namespace Detail
358} // namespace ROOT
359
360///////////////////////////////////////////////////////////////////////////////
361/// Get all the branches names, including the ones of the friend trees
363{
364 std::set<std::string> bNamesSet;
366 std::set<TTree *> analysedTrees;
367 std::string emptyFrName = "";
369 return bNames;
370}
371
372ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
373 : fDefaultColumns(defaultColumns),
374 fNSlots(RDFInternal::GetNSlots()),
375 fNewSampleNotifier(fNSlots),
376 fSampleInfos(fNSlots),
377 fDatasetColumnReaders(fNSlots)
378{
379}
380
382 : fDefaultColumns(defaultBranches),
383 fNSlots(RDFInternal::GetNSlots()),
384 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
385 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
386 fNewSampleNotifier(fNSlots),
387 fSampleInfos(fNSlots),
388 fDatasetColumnReaders(fNSlots)
389{
390 fDataSource->SetNSlots(fNSlots);
391}
392
394 : fEmptyEntryRange(0, nEmptyEntries),
395 fNSlots(RDFInternal::GetNSlots()),
396 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
397 fNewSampleNotifier(fNSlots),
398 fSampleInfos(fNSlots),
399 fDatasetColumnReaders(fNSlots)
400{
401}
402
403RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
404 : fDefaultColumns(defaultBranches),
405 fNSlots(RDFInternal::GetNSlots()),
406 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
407 fDataSource(std::move(ds)),
408 fNewSampleNotifier(fNSlots),
409 fSampleInfos(fNSlots),
410 fDatasetColumnReaders(fNSlots)
411{
412 fDataSource->SetNSlots(fNSlots);
413}
414
416 : fNSlots(RDFInternal::GetNSlots()),
417 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
418 fNewSampleNotifier(fNSlots),
419 fSampleInfos(fNSlots),
420 fDatasetColumnReaders(fNSlots)
421{
422 ChangeSpec(std::move(spec));
423}
424
425#ifdef R__UNIX
426namespace {
/// Compute the sample identifier a file would have if it were addressed through its EOS xroot URL.
///
/// \param path Path of the file to inspect.
/// \param datasetName Name appended to the xroot URL, after a '/', to form the identifier.
/// \return `<xroot url>/<datasetName>` when all of the following hold, `std::nullopt` otherwise:
///         the "TFile.CrossProtocolRedirects" setting is 1 (also its default here), the protocol
///         of the URL is "file", the "eos.url.xroot" extended attribute can be read in full, and
///         the attribute value ends with the base name of the file.
// NOTE(review): source line 432 is missing from this listing (the numbering jumps from 431 to
// 433), so whatever is done to `expandedUrl` right after its construction is not visible here.
// NOTE(review): `path.data()` and `datasetName.data()` are used as C strings below, but a
// std::string_view is not guaranteed to be null-terminated; confirm that every caller passes a
// view over a null-terminated string.
427std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
428{
429 // Mimic the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
430 // If so, we create a redirected sample ID with the full xroot URL.
431 TString expandedUrl(path.data());
433 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
434 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
435 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
// First call with a null buffer: only asks for the length of the attribute value.
436 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
437 if (len > 0) {
438 std::string xurl(len, 0);
439 std::string fileNameFromUrl{fileurl.GetFile()};
// Second call fills the buffer; the value is trusted only if the full length was read.
440 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
441 // Sometimes the `getxattr` call may return an invalid URL due
442 // to the POSIX attribute not being yet completely filled by EOS.
// The check below compares base name and URL back to front, i.e. tests for a common suffix.
// NOTE(review): nothing here ensures `xurl` is at least as long as `baseName`, which the
// three-iterator std::equal requires; verify or add a length check.
443 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
444 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
445 return xurl + '/' + datasetName.data();
446 }
447 }
448 }
449 }
450 }
451
452 return std::nullopt;
453}
454} // namespace
455#endif
456
457/**
458 * @brief Changes the internal TTree held by the RLoopManager.
459 *
460 * @warning This method may lead to potentially dangerous interactions if used
461 * after the construction of the RDataFrame. Changing the specification makes
462 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
463 * new specification refers to exactly the same number of columns, with the
464 * same names and types. The actual use case of this method is moving the
465 * processing of the same RDataFrame to a different range of entries of the
466 * same dataset (which may be stored in a different set of files).
467 *
468 * @param spec The specification of the dataset to be adopted.
469 */
471{
472 // Change the range of entries to be processed
473 fBeginEntry = spec.GetEntryRangeBegin();
474 fEndEntry = spec.GetEntryRangeEnd();
475
476 // Store the samples
477 fSamples = spec.MoveOutSamples();
478 fSampleMap.clear();
479
480 // Create the internal main chain
482 for (auto &sample : fSamples) {
483 const auto &trees = sample.GetTreeNames();
484 const auto &files = sample.GetFileNameGlobs();
485 for (std::size_t i = 0ul; i < files.size(); ++i) {
486 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
487 // (see https://github.com/root-project/root/pull/8820 for why)
488 const auto fullpath = files[i] + "?#" + trees[i];
489 chain->Add(fullpath.c_str());
490 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
491 // change this easily because of backward compatibility: the sample ID
492 // is exposed to users via RSampleInfo and DefinePerSample).
493 const auto sampleId = files[i] + '/' + trees[i];
494 fSampleMap.insert({sampleId, &sample});
495#ifdef R__UNIX
496 // Also add redirected EOS xroot URL when available
498 fSampleMap.insert({redirectedSampleId.value(), &sample});
499#endif
500 }
501 }
502 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
503 fDataSource->SetNSlots(fNSlots);
504 for (unsigned int slot{}; slot < fNSlots; slot++) {
505 for (auto &v : fDatasetColumnReaders[slot])
506 v.second.reset();
507 }
508}
509
510/// Run event loop with no source files, in parallel.
512{
513#ifdef R__USE_IMT
515 // Working with an empty tree.
516 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
517 const auto nEmptyEntries = GetNEmptyEntries();
518 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
519 auto remainder = nEmptyEntries % (fNSlots * 2);
520 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
521 ULong64_t begin = fEmptyEntryRange.first;
522 while (begin < fEmptyEntryRange.second) {
523 ULong64_t end = begin + nEntriesPerSlot;
524 if (remainder > 0) {
525 ++end;
526 --remainder;
527 }
528 entryRanges.emplace_back(begin, end);
529 begin = end;
530 }
531
532 // Each task will generate a subrange of entries
533 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
535 auto slot = slotRAII.fSlot;
536 RCallCleanUpTask cleanup(*this, slot);
537 InitNodeSlots(nullptr, slot);
538 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
539 try {
541 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
543 }
544 } catch (...) {
545 // Error might throw in experiment frameworks like CMSSW
546 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
547 throw;
548 }
549 };
550
552 pool.Foreach(genFunction, entryRanges);
553
554#endif // not implemented otherwise
555}
556
557/// Run event loop with no source files, in sequence.
559{
560 InitNodeSlots(nullptr, 0);
562 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
563 RCallCleanUpTask cleanup(*this);
564 try {
569 }
570 } catch (...) {
571 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
572 throw;
573 }
574}
575
576namespace {
577/// Return true on successful entry read.
578///
579/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
580/// there can be other situations where the read is still valid. For now, these
581/// are:
582/// - If there was no match of the current entry in one or more friend trees
583/// according to their respective indexes.
584/// - If there was a missing branch at the start of a new tree in the dataset.
585///
586/// In such situations, although the entry is not complete, the processing
587/// should not be aborted and nodes of the computation graph will take action
588/// accordingly.
590{
591 treeReader.Next();
592 switch (treeReader.GetEntryStatus()) {
593 case TTreeReader::kEntryValid: return true;
594 case TTreeReader::kIndexedFriendNoMatch: return true;
596 default: return false;
597 }
598}
599} // namespace
600
601/// Run event loop over one or multiple ROOT files, in parallel.
603{
604#ifdef R__USE_IMT
605 if (fEndEntry == fBeginEntry) // empty range => no work needed
606 return;
608 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
609 auto tp =
610 (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
611 ? std::make_unique<ROOT::TTreeProcessorMT>(*fTree, fNSlots, std::make_pair(fBeginEntry, fEndEntry),
613 : std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots, fSuppressErrorsForMissingBranches);
614
615 std::atomic<ULong64_t> entryCount(0ull);
616
617 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
619 auto slot = slotRAII.fSlot;
620 RCallCleanUpTask cleanup(*this, slot, &r);
623 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
624 const auto nEntries = entryRange.second - entryRange.first;
625 auto count = entryCount.fetch_add(nEntries);
626 try {
627 // recursive call to check filters and conditionally execute actions
628 while (validTTreeReaderRead(r)) {
631 }
632 RunAndCheckFilters(slot, count++);
633 }
634 } catch (...) {
635 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
636 throw;
637 }
638 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
639 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
640 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
641 // something went wrong in the TTreeReader event loop
642 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
643 std::to_string(r.GetEntryStatus()));
644 }
645 });
646
647 auto &&processedEntries = entryCount.load();
648 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
650 Warning("RDataFrame::Run",
651 "RDataFrame stopped processing after %lld entries, whereas an entry range (begin=%lld,end=%lld) was "
652 "requested. Consider adjusting the end value of the entry range to a maximum of %lld.",
654 }
655#endif // no-op otherwise (will not be called)
656}
657
658/// Run event loop over one or multiple ROOT files, in sequence.
660{
661 TTreeReader r(fTree.get(), fTree->GetEntryList(), /*warnAboutLongerFriends*/ true,
663 if (0 == fTree->GetEntriesFast() || fBeginEntry == fEndEntry)
664 return;
665 // Apply the range if there is any
666 // In case of a chain with a total of N entries, calling SetEntriesRange(N + 1, ...) does not error out
667 // This is a bug, reported here: https://github.com/root-project/root/issues/10774
668 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
669 if (r.SetEntriesRange(fBeginEntry, fEndEntry) != TTreeReader::kEntryValid)
670 throw std::logic_error("Something went wrong in initializing the TTreeReader.");
671
672 RCallCleanUpTask cleanup(*this, 0u, &r);
673 InitNodeSlots(&r, 0);
675
676 // recursive call to check filters and conditionally execute actions
677 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
679 try {
682 UpdateSampleInfo(/*slot*/0, r);
683 }
684 RunAndCheckFilters(0, r.GetCurrentEntry());
686 }
687 } catch (...) {
688 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
689 throw;
690 }
691 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
692 // something went wrong in the TTreeReader event loop
693 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
694 std::to_string(r.GetEntryStatus()));
695 }
696
697 if (fEndEntry != std::numeric_limits<Long64_t>::max() && (fEndEntry - fBeginEntry) > processedEntries) {
698 Warning("RDataFrame::Run",
699 "RDataFrame stopped processing after %lld entries, whereas an entry range (begin=%lld,end=%lld) was "
700 "requested. Consider adjusting the end value of the entry range to a maximum of %lld.",
702 }
703}
704
705namespace {
706struct DSRunRAII {
708 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
709 {
711 }
712 ~DSRunRAII() { fDS.Finalize(); }
713};
714} // namespace
715
718 unsigned int fSlot;
721 TTreeReader *treeReader = nullptr)
722 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
723 {
724 fLM.InitNodeSlots(fTreeReader, fSlot);
725 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
726 }
728};
729
730/// Run event loop over data accessed through a DataSource, in sequence.
732{
733 assert(fDataSource != nullptr);
734 // Shortcut if the entry range would result in not reading anything
735 if (fBeginEntry == fEndEntry)
736 return;
737 // Apply global entry range if necessary
738 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
739 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
740 // Initialize data source and book finalization
742 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
743 // sources that need it (currently only TTree).
744 RCallCleanUpTask cleanup(*this);
745
746 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
747 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
748 // TTree).
749 std::uint64_t processedEntries{};
750 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
751 do {
752
754
755 ranges = fDataSource->GetEntryRanges();
756
758
759 try {
760 for (const auto &range : ranges) {
761 const auto start = range.first;
762 const auto end = range.second;
763 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
764 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
765 if (fDataSource->SetEntry(0u, entry)) {
767 }
769 }
770 }
771 } catch (...) {
772 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
773 throw;
774 }
775
776 } while (!ranges.empty() && fNStopsReceived < fNChildren);
777
779
780 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
781 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
782 std::ostringstream buf{};
783 buf << "RDataFrame stopped processing after ";
784 buf << processedEntries;
785 buf << " entries, whereas an entry range (begin=";
786 buf << fBeginEntry;
787 buf << ",end=";
788 buf << fEndEntry;
789 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
790 buf << (fBeginEntry + processedEntries);
791 buf << ".";
792 Warning("RDataFrame::Run", "%s", buf.str().c_str());
793 }
794}
795
796/// Run event loop over data accessed through a DataSource, in parallel.
798{
799#ifdef R__USE_IMT
800 assert(fDataSource != nullptr);
801 // Shortcut if the entry range would result in not reading anything
802 if (fBeginEntry == fEndEntry)
803 return;
804 // Apply global entry range if necessary
805 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
806 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
807
809
811
812#endif // not implemented otherwise (never called)
813}
814
815/// Execute actions and make sure named filters are called for each event.
816/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
818{
819 // data-block callbacks run before the rest of the graph
821 for (auto &callback : fSampleCallbacks)
822 callback.second(slot, fSampleInfos[slot]);
824 }
825
826 for (auto *actionPtr : fBookedActions)
827 actionPtr->Run(slot, entry);
829 namedFilterPtr->CheckFilters(slot, entry);
830 for (auto &callback : fCallbacksEveryNEvents)
831 callback(slot);
832}
833
834/// Build TTreeReaderValues for all nodes
835/// This method loops over all filters, actions and other booked objects and
836/// calls their `InitSlot` method, to get them ready for running a task.
838{
840 for (auto *ptr : fBookedActions)
841 ptr->InitSlot(r, slot);
842 for (auto *ptr : fBookedFilters)
843 ptr->InitSlot(r, slot);
844 for (auto *ptr : fBookedDefines)
845 ptr->InitSlot(r, slot);
846 for (auto *ptr : fBookedVariations)
847 ptr->InitSlot(r, slot);
848
849 for (auto &callback : fCallbacksOnce)
850 callback(slot);
851}
852
854 if (r != nullptr) {
855 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
856 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
857 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
858 }
859 // Whatever the data source, initially set the "new data block" flag:
860 // - for TChains, this ensures that we don't skip the first data block because
861 // the correct tree is already loaded
862 // - for RDataSources and empty sources, which currently don't have data blocks, this
863 // ensures that we run once per task
865}
866
/// Update the sample information of a slot for a range of entries of an empty source.
///
/// \param slot Processing slot the information refers to.
/// \param range Range of entries; its two bounds are written into the label
///              "Empty source, range: {<first>, <second>}".
// NOTE(review): source line 868 is missing from this listing, so the beginning of the statement
// that receives the label and `range` is not visible here.
867void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
869 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
870}
871
873 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
874 auto *tree = r.GetTree()->GetTree();
875 R__ASSERT(tree != nullptr);
876 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
877 auto *file = tree->GetCurrentFile();
878 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
879
880 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
881 R__ASSERT(range.first >= 0);
882 if (range.second == -1) {
883 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
884 }
885 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
886 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
887 if (fSampleMap.empty()) {
889 } else {
890 if (fSampleMap.find(id) == fSampleMap.end())
891 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
893 }
894}
895
896/// Initialize all nodes of the functional graph before running the event loop.
897/// This method is called once per event-loop and performs generic initialization
898/// operations that do not depend on the specific processing slot (i.e. operations
899/// that are common for all threads).
901{
903 for (auto *filter : fBookedFilters)
904 filter->InitNode();
905 for (auto *range : fBookedRanges)
906 range->InitNode();
907 for (auto *ptr : fBookedActions)
908 ptr->Initialize();
909}
910
911/// Perform clean-up operations. To be called at the end of each event loop.
913{
914 fMustRunNamedFilters = false;
915
916 // forget RActions and detach TResultProxies
917 for (auto *ptr : fBookedActions)
918 ptr->Finalize();
919
920 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
921 fBookedActions.clear();
922
923 // reset children counts
924 fNChildren = 0;
925 fNStopsReceived = 0;
926 for (auto *ptr : fBookedFilters)
927 ptr->ResetChildrenCount();
928 for (auto *ptr : fBookedRanges)
929 ptr->ResetChildrenCount();
930
932 fCallbacksOnce.clear();
933}
934
935/// Perform clean-up operations. To be called at the end of each task execution.
937{
938 if (r != nullptr)
939 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
940 for (auto *ptr : fBookedActions)
941 ptr->FinalizeSlot(slot);
942 for (auto *ptr : fBookedFilters)
943 ptr->FinalizeSlot(slot);
944 for (auto *ptr : fBookedDefines)
945 ptr->FinalizeSlot(slot);
946
947 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
948 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
949 // because the TTreeReader object changes at every task
950 for (auto &v : fDatasetColumnReaders[slot])
951 v.second.reset();
952 }
953}
954
955/// Add RDF nodes that require just-in-time compilation to the computation graph.
956/// This method also clears the contents of GetCodeToJit().
958{
959 {
961 if (GetCodeToJit().empty()) {
962 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
963 return;
964 }
965 }
966
967 const std::string code = []() {
969 return std::move(GetCodeToJit());
970 }();
971
972 TStopwatch s;
973 s.Start();
974 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
975 s.Stop();
976 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
977 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
978 : " in less than 1ms.");
979}
980
981/// Trigger counting of number of children nodes for each node of the functional graph.
982/// This is done once before starting the event loop. Each action sends an `increase children count` signal
983/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, it increments its
984/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
985/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
986/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
988{
989 for (auto *actionPtr : fBookedActions)
990 actionPtr->TriggerChildrenCount();
992 namedFilterPtr->TriggerChildrenCount();
993}
994
995/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
996/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
997/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
999{
1000 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
1001 MaxTreeSizeRAII ctxtmts;
1002
1003 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
1004
1006
1007 if (jit)
1008 Jit();
1009
1010 InitNodes();
1011
1012 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
1013 // we use RAII: even in case of an exception, the destructor of the object is invoked and
1014 // all the cleanup takes place.
1015 class NodesCleanerRAII {
1017
1018 public:
1020 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
1021 };
1022
1024
1025 TStopwatch s;
1026 s.Start();
1027
1028 switch (fLoopType) {
1030 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
1031 break;
1035 case ELoopType::kNoFiles: RunEmptySource(); break;
1036 case ELoopType::kROOTFiles: RunTreeReader(); break;
1038 }
1039 s.Stop();
1040
1041 fNRuns++;
1042
1043 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
1044 << s.RealTime() << "s elapsed).";
1045}
1046
1047/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
1052
1054{
1055 return fTree.get();
1056}
1057
1063
1070
1072{
1073 fBookedFilters.emplace_back(filterPtr);
1074 if (filterPtr->HasName()) {
1075 fBookedNamedFilters.emplace_back(filterPtr);
1076 fMustRunNamedFilters = true;
1077 }
1078}
1079
1085
1087{
1088 fBookedRanges.emplace_back(rangePtr);
1089}
1090
1095
1097{
1098 fBookedDefines.emplace_back(ptr);
1099}
1100
1106
1111
1116
1117// dummy call, end of recursive chain of calls
1119{
1120 return true;
1121}
1122
1123/// Call `FillReport` on all booked filters
1125{
1126 for (const auto *fPtr : fBookedNamedFilters)
1127 fPtr->FillReport(rep);
1128}
1129
1130void RLoopManager::SetTree(std::shared_ptr<TTree> tree)
1131{
1132 fTree = std::move(tree);
1133
1134 TChain *ch = nullptr;
1135 if ((ch = dynamic_cast<TChain *>(fTree.get())))
1137}
1138
1139void RLoopManager::ToJitExec(const std::string &code) const
1140{
1142 GetCodeToJit().append(code);
1143}
1144
1145void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1146{
1147 if (everyNEvents == 0ull)
1148 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1149 else
1150 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1151}
1152
1153std::vector<std::string> RLoopManager::GetFiltersNames()
1154{
1155 std::vector<std::string> filters;
1156 for (auto *filter : fBookedFilters) {
1157 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1158 filters.push_back(name);
1159 }
1160 return filters;
1161}
1162
1163std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1164{
1165 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1166 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1167 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1168 return nodes;
1169}
1170
1171std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1172{
1173 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1174 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1175 std::copy(fRunActions.begin(), fRunActions.end(), it);
1176 return actions;
1177}
1178
1179std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1180 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1181{
1182 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1183 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1185 return duplicateRLoopManagerIt->second;
1186
1187 std::string name;
1188 if (fDataSource) {
1189 name = fDataSource->GetLabel();
1190 } else if (fTree) {
1191 name = fTree->GetName();
1192 if (name.empty())
1193 name = fTree->ClassName();
1194 } else {
1195 name = "Empty source\nEntries: " + std::to_string(GetNEmptyEntries());
1196 }
1197 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1199 visitedMap[(void *)this] = thisNode;
1200 return thisNode;
1201}
1202
1203////////////////////////////////////////////////////////////////////////////
1204/// Return all valid TTree::Branch names (caching results for subsequent calls).
1205/// Never use fValidBranchNames directly, always request it through this method.
1207{
1208 if (fValidBranchNames.empty() && fTree) {
1209 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
1210 }
1211 if (fValidBranchNames.empty() && fDataSource) {
1212 fValidBranchNames = fDataSource->GetColumnNames();
1213 }
1214 return fValidBranchNames;
1215}
1216
1217/// Return true if AddDataSourceColumnReaders was called for column name col.
1218bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
1219{
1220 const auto key = MakeDatasetColReadersKey(col, ti);
1221 assert(fDataSource != nullptr);
1222 // since data source column readers are always added for all slots at the same time,
1223 // if the reader is present for slot 0 we have it for all other slots as well.
1224 auto it = fDatasetColumnReaders[0].find(key);
1225 return (it != fDatasetColumnReaders[0].end() && it->second);
1226}
1227
1229 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1230 const std::type_info &ti)
1231{
1232 const auto key = MakeDatasetColReadersKey(col, ti);
1233 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1234 assert(readers.size() == fNSlots);
1235
1236 for (auto slot = 0u; slot < fNSlots; ++slot) {
1237 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1238 }
1239}
1240
1241// Differently from AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1242/// \brief Register a new RTreeColumnReader with this RLoopManager.
1243/// \return A shared pointer to the inserted column reader.
1245 std::unique_ptr<RColumnReaderBase> &&reader,
1246 const std::type_info &ti)
1247{
1249 const auto key = MakeDatasetColReadersKey(col, ti);
1250 // if a reader for this column and this slot was already there, we are doing something wrong
1251 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1252 auto *rptr = reader.get();
1253 readers[key] = std::move(reader);
1254 return rptr;
1255}
1256
1258 const std::type_info &ti, TTreeReader *treeReader)
1259{
1261 const auto key = MakeDatasetColReadersKey(col, ti);
1262 // if a reader for this column and this slot was already there, we are doing something wrong
1263 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1264 assert(fDataSource && "Missing RDataSource to add column reader.");
1265
1267
1268 return readers[key].get();
1269}
1270
1272RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1273{
1274 const auto key = MakeDatasetColReadersKey(col, ti);
1275 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1276 return it->second.get();
1277 else
1278 return nullptr;
1279}
1280
1282{
1283 if (callback)
1284 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1285}
1286
1287void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1288{
1289 fEmptyEntryRange = std::move(newRange);
1290}
1291
1293{
1294 fBeginEntry = begin;
1295 fEndEntry = end;
1296}
1297
1299{
1300 fTTreeLifeline = std::move(lifeline);
1301}
1302
1303/**
1304 * \brief Helper function to open a file (or the first file from a glob).
1305 * This function is used at construction time of an RDataFrame, to check the
1306 * concrete type of the dataset stored inside the file.
1307 */
1308std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
1309{
1310 // Follow same logic in TChain::Add to find the correct string to look for globbing:
1311 // - If the extension ".root" is present in the file name, pass along the basename.
1312 // - If not, use the "?" token to delimit the part of the string which represents the basename.
1313 // - Otherwise, pass the full filename.
1314 auto &&baseNameAndQuery = [&fileNameGlob]() {
1315 constexpr std::string_view delim{".root"};
1316 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
1317 it != fileNameGlob.end()) {
1318 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
1319 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
1320 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
1321 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
1322 else
1323 return std::make_pair(fileNameGlob, std::string_view{});
1324 }();
1325 // Captured structured bindings variable are only valid since C++20
1326 auto &&baseName = baseNameAndQuery.first;
1327 auto &&query = baseNameAndQuery.second;
1328
1329 const auto nameHasWildcard = [&baseName]() {
1330 constexpr std::array<char, 4> wildCards{'[', ']', '*', '?'}; // Wildcards accepted by TChain::Add
1331 return std::any_of(wildCards.begin(), wildCards.end(),
1332 [&baseName](auto &&wc) { return baseName.find(wc) != std::string_view::npos; });
1333 }();
1334
1335 // Open first file in case of glob, suppose all files in the glob use the same data format
1336 std::string fileToOpen{nameHasWildcard
1337 ? ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName})[0] + std::string{query}
1338 : fileNameGlob};
1339
1340 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
1341 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
1342 if (!inFile || inFile->IsZombie())
1343 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
1344
1345 return inFile;
1346}
1347
1348std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1351{
1352 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1353 // Creating an RDataFrame with a non-existing file will throw early rather
1354 // than wait for the start of the graph execution.
1355 if (checkFile) {
1357 }
1358
1359 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1360 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1361 return lm;
1362}
1363
1364std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1365ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1366 const std::vector<std::string> &defaultColumns, bool checkFile)
1367{
1368 if (fileNameGlobs.size() == 0)
1369 throw std::invalid_argument("RDataFrame: empty list of input files.");
1370 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1371 // Creating an RDataFrame with a non-existing file will throw early rather
1372 // than wait for the start of the graph execution.
1373 if (checkFile) {
1375 }
1376 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1377 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1378 return lm;
1379}
1380
1381#ifdef R__HAS_ROOT7
1382std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1383ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob,
1385{
1386 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1387 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1388 return lm;
1389}
1390
1391std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1392ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1394{
1395 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1396 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1397 return lm;
1398}
1399
1400std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1401ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob,
1403{
1404
1406
1407 if (inFile->Get<TTree>(datasetName.data())) {
1408 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1409 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1411 }
1412
1413 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1414 "\" in file \"" + inFile->GetName() + "\".");
1415}
1416
1417std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1418ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1420{
1421
1422 if (fileNameGlobs.size() == 0)
1423 throw std::invalid_argument("RDataFrame: empty list of input files.");
1424
1426
1427 if (inFile->Get<TTree>(datasetName.data())) {
1428 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1429 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1431 }
1432
1433 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1434 "\" in file \"" + inFile->GetName() + "\".");
1435}
1436#endif
1437
1438// outlined to pin virtual table
1440
1441void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1442{
1443 if (dataSource) {
1444 fDataSource = std::move(dataSource);
1445 fDataSource->SetNSlots(fNSlots);
1446 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1447 }
1448}
1449
1450void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1452 std::atomic<ULong64_t> &entryCount)
1453{
1454#ifdef R__USE_IMT
1456 const auto &slot = slotRAII.fSlot;
1457
1458 const auto &[start, end] = entryRange;
1459 const auto nEntries = end - start;
1460 entryCount.fetch_add(nEntries);
1461
1462 RCallCleanUpTask cleanup(*this, slot);
1463 RDSRangeRAII _{*this, slot, start};
1464
1465 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1466
1467 try {
1468 for (auto entry = start; entry < end; ++entry) {
1469 if (fDataSource->SetEntry(slot, entry)) {
1470 RunAndCheckFilters(slot, entry);
1471 }
1472 }
1473 } catch (...) {
1474 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1475 throw;
1476 }
1477 fDataSource->FinalizeSlot(slot);
1478#else
1479 (void)entryRange;
1480 (void)slotStack;
1481 (void)entryCount;
1482#endif
1483}
1484
1486 std::atomic<ULong64_t> &entryCount)
1487{
1488#ifdef R__USE_IMT
1490 const auto &slot = slotRAII.fSlot;
1491
1492 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1493 const auto &[start, end] = entryRange;
1494 const auto nEntries = end - start;
1495 auto count = entryCount.fetch_add(nEntries);
1496
1497 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1498 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1499
1501 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1502 try {
1503 // recursive call to check filters and conditionally execute actions
1505 if (fNewSampleNotifier.CheckFlag(slot)) {
1506 UpdateSampleInfo(slot, treeReader);
1507 }
1508 RunAndCheckFilters(slot, count++);
1509 }
1510 } catch (...) {
1511 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1512 throw;
1513 }
1514 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1515 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1516 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1517 // something went wrong in the TTreeReader event loop
1518 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1519 std::to_string(treeReader.GetEntryStatus()));
1520 }
1521#else
1522 (void)treeReader;
1523 (void)slotStack;
1524 (void)entryCount;
1525#endif
1526}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:69
unsigned long long ULong64_t
Definition RtypesCore.h:70
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, std::string_view col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A Branch for the case of an object.
static TClass * Class()
A TTree is a list of TBranches.
Definition TBranch.h:93
static TClass * Class()
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4130
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
Mother of all ROOT objects.
Definition TObject.h:41
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:139
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1285
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4842
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5295
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9302
virtual TObjArray * GetListOfBranches()
Definition TTree.h:529
virtual TTree * GetTree() const
Definition TTree.h:558
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6035
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:41
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:581
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:305
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:563
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:192
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:574
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:592
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:349
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:586
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
R__EXTERN TVirtualRWMutex * gCoreMutex
static const char * what
Definition stlLoader.cc:5
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.