Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
20#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
21#include "ROOT/RLogger.hxx"
22#include "RtypesCore.h" // Long64_t
23#include "TStopwatch.h"
24#include "TBranchElement.h"
25#include "TBranchObject.h"
26#include "TChain.h"
27#include "TEntryList.h"
28#include "TFile.h"
29#include "TFriendElement.h"
30#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
31#include "TTreeReader.h"
32#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
33
34#ifdef R__USE_IMT
37#include "ROOT/RSlotStack.hxx"
38#endif
39
40#ifdef R__HAS_ROOT7
41#include "ROOT/RNTuple.hxx"
42#include "ROOT/RNTupleDS.hxx"
43#endif
44
45#include <algorithm>
46#include <atomic>
47#include <cassert>
48#include <functional>
49#include <iostream>
50#include <memory>
51#include <stdexcept>
52#include <string>
53#include <sstream>
54#include <thread>
55#include <unordered_map>
56#include <vector>
57#include <set>
58#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
59
60using namespace ROOT::Detail::RDF;
61using namespace ROOT::Internal::RDF;
62
63namespace {
64/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
65/// This allows different RLoopManager instances to share these data.
66/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
67/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
68/// much faster than jitting each RLoopManager's code separately.
69std::string &GetCodeToJit()
70{
71 static std::string code;
72 return code;
73}
74
75bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
76{
77 return (leaves.find(leaf) != leaves.end());
78}
79
80///////////////////////////////////////////////////////////////////////////////
81/// This overload does not check whether the leaf/branch is already in bNamesReg. In case this is a friend leaf/branch,
82/// `allowDuplicates` controls whether we add both `friendname.bname` and `bname` or just the shorter version.
83void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
84 const std::string &friendName, bool allowDuplicates)
85{
86 if (!friendName.empty()) {
87 // In case of a friend tree, users might prepend its name/alias to the branch names
88 const auto friendBName = friendName + "." + branchName;
89 if (bNamesReg.insert(friendBName).second)
90 bNames.push_back(friendBName);
91 }
92
93 if (allowDuplicates || friendName.empty()) {
94 if (bNamesReg.insert(branchName).second)
95 bNames.push_back(branchName);
96 }
97}
98
99///////////////////////////////////////////////////////////////////////////////
100/// This overload makes sure that the TLeaf has not been already inserted.
101void InsertBranchName(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
102 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
103 bool allowDuplicates)
104{
105 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
106 if (!canAdd) {
107 return;
108 }
109
110 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
111
112 foundLeaves.insert(leaf);
113}
114
115void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
116 std::string prefix, std::string &friendName, bool allowDuplicates)
117{
118 for (auto sb : *b->GetListOfBranches()) {
119 TBranch *subBranch = static_cast<TBranch *>(sb);
120 auto subBranchName = std::string(subBranch->GetName());
121 auto fullName = prefix + subBranchName;
122
123 std::string newPrefix;
124 if (!prefix.empty())
125 newPrefix = fullName + ".";
126
127 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName, allowDuplicates);
128
129 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
130 if (!branchDirectlyFromTree)
131 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
132 if (branchDirectlyFromTree)
133 InsertBranchName(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName,
134 allowDuplicates);
135
136 if (bNamesReg.find(subBranchName) == bNamesReg.end() && t.GetBranch(subBranchName.c_str()))
137 InsertBranchName(bNamesReg, bNames, subBranchName, friendName, allowDuplicates);
138 }
139}
140
141void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
142 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
143{
144 std::set<TLeaf *> foundLeaves;
145 if (!analysedTrees.insert(&t).second) {
146 return;
147 }
148
149 const auto branches = t.GetListOfBranches();
150 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
151 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
152 // operations
153 if (!t.GetTree()) {
154 std::string err("GetBranchNames: error in opening the tree ");
155 err += t.GetName();
156 throw std::runtime_error(err);
157 }
158 if (branches) {
159 for (auto b : *branches) {
160 TBranch *branch = static_cast<TBranch *>(b);
161 const auto branchName = std::string(branch->GetName());
162 if (branch->IsA() == TBranch::Class()) {
163 // Leaf list
164 auto listOfLeaves = branch->GetListOfLeaves();
165 if (listOfLeaves->GetEntriesUnsafe() == 1) {
166 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
167 InsertBranchName(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
168 }
169
170 for (auto leaf : *listOfLeaves) {
171 auto castLeaf = static_cast<TLeaf *>(leaf);
172 const auto leafName = std::string(leaf->GetName());
173 const auto fullName = branchName + "." + leafName;
174 InsertBranchName(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
175 }
176 } else if (branch->IsA() == TBranchObject::Class()) {
177 // TBranchObject
178 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName, allowDuplicates);
179 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
180 } else {
181 // TBranchElement
182 // Check if there is explicit or implicit dot in the name
183
184 bool dotIsImplied = false;
185 auto be = dynamic_cast<TBranchElement *>(b);
186 if (!be)
187 throw std::runtime_error("GetBranchNames: unsupported branch type");
188 // TClonesArray (3) and STL collection (4)
189 if (be->GetType() == 3 || be->GetType() == 4)
190 dotIsImplied = true;
191
192 if (dotIsImplied || branchName.back() == '.')
193 ExploreBranch(t, bNamesReg, bNames, branch, "", friendName, allowDuplicates);
194 else
195 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName, allowDuplicates);
196
197 InsertBranchName(bNamesReg, bNames, branchName, friendName, allowDuplicates);
198 }
199 }
200 }
201
202 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
203 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
204 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
205 auto friendTrees = t.GetTree()->GetListOfFriends();
206
207 if (!friendTrees)
208 return;
209
210 for (auto friendTreeObj : *friendTrees) {
211 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
212
213 std::string frName;
214 auto alias = t.GetFriendAlias(friendTree);
215 if (alias != nullptr)
216 frName = std::string(alias);
217 else
218 frName = std::string(friendTree->GetName());
219
220 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
221 }
222}
223
224void ThrowIfNSlotsChanged(unsigned int nSlots)
225{
226 const auto currentSlots = RDFInternal::GetNSlots();
227 if (currentSlots != nSlots) {
228 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
229 std::to_string(nSlots) + ", but when starting the event loop it was " +
230 std::to_string(currentSlots) + ".";
231 if (currentSlots > nSlots)
232 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
233 else
234 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
235 throw std::runtime_error(msg);
236 }
237}
238
239/**
240\struct MaxTreeSizeRAII
241\brief Scope-bound change of `TTree::fgMaxTreeSize`.
242
243This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
244changes it to maximum at construction time and restores it back at destruction
245time. Needed for issue #6523 and should be reverted when #6640 will be solved.
246*/
247struct MaxTreeSizeRAII {
248 Long64_t fOldMaxTreeSize;
249
250 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
251 {
252 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
253 }
254
255 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
256};
257
258struct DatasetLogInfo {
259 std::string fDataSet;
260 ULong64_t fRangeStart;
261 ULong64_t fRangeEnd;
262 unsigned int fSlot;
263};
264
265std::string LogRangeProcessing(const DatasetLogInfo &info)
266{
267 std::stringstream msg;
268 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
269 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
270 return msg.str();
271}
272
273DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
274{
275 const auto tree = r.GetTree();
276 const auto chain = dynamic_cast<TChain *>(tree);
277 std::string what;
278 if (chain) {
279 auto files = chain->GetListOfFiles();
280 std::vector<std::string> treeNames;
281 std::vector<std::string> fileNames;
282 for (TObject *f : *files) {
283 treeNames.emplace_back(f->GetName());
284 fileNames.emplace_back(f->GetTitle());
285 }
286 what = "trees {";
287 for (const auto &t : treeNames) {
288 what += t + ",";
289 }
290 what.back() = '}';
291 what += " in files {";
292 for (const auto &f : fileNames) {
293 what += f + ",";
294 }
295 what.back() = '}';
296 } else {
297 assert(tree != nullptr); // to make clang-tidy happy
298 const auto treeName = tree->GetName();
299 what = std::string("tree \"") + treeName + "\"";
300 const auto file = tree->GetCurrentFile();
301 if (file)
302 what += std::string(" in file \"") + file->GetName() + "\"";
303 }
304 const auto entryRange = r.GetEntriesRange();
305 const ULong64_t end = entryRange.second == -1ll ? tree->GetEntries() : entryRange.second;
306 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
307}
308
309auto MakeDatasetColReadersKey(const std::string &colName, const std::type_info &ti)
310{
311 // We use a combination of column name and column type name as the key because in some cases we might end up
312 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
313 // df.Sum<vector<int>>("stdVectorBranch");
314 // df.Sum<RVecI>("stdVectorBranch");
315 return colName + ':' + ti.name();
316}
317} // anonymous namespace
318
319namespace ROOT {
320namespace Detail {
321namespace RDF {
322
323/// A RAII object that calls RLoopManager::CleanUpTask at destruction
326 unsigned int fArg;
328
329 RCallCleanUpTask(RLoopManager &lm, unsigned int arg = 0u, TTreeReader *reader = nullptr)
330 : fLoopManager(lm), fArg(arg), fReader(reader)
331 {
332 }
334};
335
336} // namespace RDF
337} // namespace Detail
338} // namespace ROOT
339
340///////////////////////////////////////////////////////////////////////////////
341/// Get all the branches names, including the ones of the friend trees
343{
344 std::set<std::string> bNamesSet;
345 ColumnNames_t bNames;
346 std::set<TTree *> analysedTrees;
347 std::string emptyFrName = "";
348 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
349 return bNames;
350}
351
352RLoopManager::RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
353 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
354 fNSlots(RDFInternal::GetNSlots()),
355 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
356 fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots), fDatasetColumnReaders(fNSlots)
357{
358}
359
360RLoopManager::RLoopManager(std::unique_ptr<TTree> tree, const ColumnNames_t &defaultBranches)
361 : fTree(std::move(tree)),
362 fDefaultColumns(defaultBranches),
363 fNSlots(RDFInternal::GetNSlots()),
364 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
365 fNewSampleNotifier(fNSlots),
366 fSampleInfos(fNSlots),
367 fDatasetColumnReaders(fNSlots)
368{
369}
370
372 : fEmptyEntryRange(0, nEmptyEntries),
373 fNSlots(RDFInternal::GetNSlots()),
374 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
375 fNewSampleNotifier(fNSlots),
376 fSampleInfos(fNSlots),
377 fDatasetColumnReaders(fNSlots)
378{
379}
380
381RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
382 : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
383 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
384 fDataSource(std::move(ds)), fNewSampleNotifier(fNSlots), fSampleInfos(fNSlots), fDatasetColumnReaders(fNSlots)
385{
386 fDataSource->SetNSlots(fNSlots);
387}
388
390 : fNSlots(RDFInternal::GetNSlots()),
391 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
392 fNewSampleNotifier(fNSlots),
393 fSampleInfos(fNSlots),
394 fDatasetColumnReaders(fNSlots)
395{
396 ChangeSpec(std::move(spec));
397}
398
399/**
400 * @brief Changes the internal TTree held by the RLoopManager.
401 *
402 * @warning This method may lead to potentially dangerous interactions if used
403 * after the construction of the RDataFrame. Changing the specification makes
404 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
405 * new specification refers to exactly the same number of columns, with the
406 * same names and types. The actual use case of this method is moving the
407 * processing of the same RDataFrame to a different range of entries of the
408 * same dataset (which may be stored in a different set of files).
409 *
410 * @param spec The specification of the dataset to be adopted.
411 */
413{
414 // Change the range of entries to be processed
415 fBeginEntry = spec.GetEntryRangeBegin();
416 fEndEntry = spec.GetEntryRangeEnd();
417
418 // Store the samples
419 fSamples = spec.MoveOutSamples();
420 fSampleMap.clear();
421
422 // Create the internal main chain
424 for (auto &sample : fSamples) {
425 const auto &trees = sample.GetTreeNames();
426 const auto &files = sample.GetFileNameGlobs();
427 for (std::size_t i = 0ul; i < files.size(); ++i) {
428 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
429 // (see https://github.com/root-project/root/pull/8820 for why)
430 const auto fullpath = files[i] + "?#" + trees[i];
431 chain->Add(fullpath.c_str());
432 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
433 // change this easily because of backward compatibility: the sample ID
434 // is exposed to users via RSampleInfo and DefinePerSample).
435 const auto sampleId = files[i] + '/' + trees[i];
436 fSampleMap.insert({sampleId, &sample});
437 }
438 }
439 SetTree(std::move(chain));
440
441 // Create friends from the specification and connect them to the main chain
442 const auto &friendInfo = spec.GetFriendInfo();
444 for (std::size_t i = 0ul; i < fFriends.size(); i++) {
445 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
446 fTree->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
447 }
448}
449
450/// Run event loop with no source files, in parallel.
452{
453#ifdef R__USE_IMT
455 // Working with an empty tree.
456 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
457 const auto nEmptyEntries = GetNEmptyEntries();
458 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
459 auto remainder = nEmptyEntries % (fNSlots * 2);
460 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
461 ULong64_t begin = fEmptyEntryRange.first;
462 while (begin < fEmptyEntryRange.second) {
463 ULong64_t end = begin + nEntriesPerSlot;
464 if (remainder > 0) {
465 ++end;
466 --remainder;
467 }
468 entryRanges.emplace_back(begin, end);
469 begin = end;
470 }
471
472 // Each task will generate a subrange of entries
473 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
474 ROOT::Internal::RSlotStackRAII slotRAII(slotStack);
475 auto slot = slotRAII.fSlot;
476 RCallCleanUpTask cleanup(*this, slot);
477 InitNodeSlots(nullptr, slot);
478 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
479 try {
480 UpdateSampleInfo(slot, range);
481 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
482 RunAndCheckFilters(slot, currEntry);
483 }
484 } catch (...) {
485 // Error might throw in experiment frameworks like CMSSW
486 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
487 throw;
488 }
489 };
490
492 pool.Foreach(genFunction, entryRanges);
493
494#endif // not implemented otherwise
495}
496
497/// Run event loop with no source files, in sequence.
499{
500 InitNodeSlots(nullptr, 0);
501 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(
502 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
503 RCallCleanUpTask cleanup(*this);
504 try {
506 for (ULong64_t currEntry = fEmptyEntryRange.first;
507 currEntry < fEmptyEntryRange.second && fNStopsReceived < fNChildren; ++currEntry) {
508 RunAndCheckFilters(0, currEntry);
509 }
510 } catch (...) {
511 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
512 throw;
513 }
514}
515
516/// Run event loop over one or multiple ROOT files, in parallel.
518{
519#ifdef R__USE_IMT
520 if (fEndEntry == fBeginEntry) // empty range => no work needed
521 return;
523 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
524 auto tp = (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
525 ? std::make_unique<ROOT::TTreeProcessorMT>(*fTree, fNSlots, std::make_pair(fBeginEntry, fEndEntry))
526 : std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots);
527
528 std::atomic<ULong64_t> entryCount(0ull);
529
530 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
531 ROOT::Internal::RSlotStackRAII slotRAII(slotStack);
532 auto slot = slotRAII.fSlot;
533 RCallCleanUpTask cleanup(*this, slot, &r);
534 InitNodeSlots(&r, slot);
535 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, slot));
536 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
537 const auto nEntries = entryRange.second - entryRange.first;
538 auto count = entryCount.fetch_add(nEntries);
539 try {
540 // recursive call to check filters and conditionally execute actions
541 while (r.Next()) {
542 if (fNewSampleNotifier.CheckFlag(slot)) {
543 UpdateSampleInfo(slot, r);
544 }
545 RunAndCheckFilters(slot, count++);
546 }
547 } catch (...) {
548 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
549 throw;
550 }
551 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
552 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
553 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
554 // something went wrong in the TTreeReader event loop
555 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
556 std::to_string(r.GetEntryStatus()));
557 }
558 });
559#endif // no-op otherwise (will not be called)
560}
561
562/// Run event loop over one or multiple ROOT files, in sequence.
564{
565 TTreeReader r(fTree.get(), fTree->GetEntryList());
566 if (0 == fTree->GetEntriesFast() || fBeginEntry == fEndEntry)
567 return;
568 // Apply the range if there is any
569 // In case of a chain with a total of N entries, calling SetEntriesRange(N + 1, ...) does not error out
570 // This is a bug, reported here: https://github.com/root-project/root/issues/10774
571 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
572 if (r.SetEntriesRange(fBeginEntry, fEndEntry) != TTreeReader::kEntryValid)
573 throw std::logic_error("Something went wrong in initializing the TTreeReader.");
574
575 RCallCleanUpTask cleanup(*this, 0u, &r);
576 InitNodeSlots(&r, 0);
577 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, 0u));
578
579 // recursive call to check filters and conditionally execute actions
580 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
581 try {
582 while (r.Next() && fNStopsReceived < fNChildren) {
584 UpdateSampleInfo(/*slot*/0, r);
585 }
586 RunAndCheckFilters(0, r.GetCurrentEntry());
587 }
588 } catch (...) {
589 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
590 throw;
591 }
592 if (r.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
593 // something went wrong in the TTreeReader event loop
594 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
595 std::to_string(r.GetEntryStatus()));
596 }
597}
598
599/// Run event loop over data accessed through a DataSource, in sequence.
601{
602 assert(fDataSource != nullptr);
603 fDataSource->Initialize();
604 auto ranges = fDataSource->GetEntryRanges();
605 while (!ranges.empty() && fNStopsReceived < fNChildren) {
606 InitNodeSlots(nullptr, 0u);
607 fDataSource->InitSlot(0u, 0ull);
608 RCallCleanUpTask cleanup(*this);
609 try {
610 for (const auto &range : ranges) {
611 const auto start = range.first;
612 const auto end = range.second;
613 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
614 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
615 if (fDataSource->SetEntry(0u, entry)) {
616 RunAndCheckFilters(0u, entry);
617 }
618 }
619 }
620 } catch (...) {
621 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
622 throw;
623 }
624 fDataSource->FinalizeSlot(0u);
625 ranges = fDataSource->GetEntryRanges();
626 }
627 fDataSource->Finalize();
628}
629
630/// Run event loop over data accessed through a DataSource, in parallel.
632{
633#ifdef R__USE_IMT
634 assert(fDataSource != nullptr);
637
638 // Each task works on a subrange of entries
639 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
640 ROOT::Internal::RSlotStackRAII slotRAII(slotStack);
641 const auto slot = slotRAII.fSlot;
642 InitNodeSlots(nullptr, slot);
643 RCallCleanUpTask cleanup(*this, slot);
644 fDataSource->InitSlot(slot, range.first);
645 const auto start = range.first;
646 const auto end = range.second;
647 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
648 try {
649 for (auto entry = start; entry < end; ++entry) {
650 if (fDataSource->SetEntry(slot, entry)) {
651 RunAndCheckFilters(slot, entry);
652 }
653 }
654 } catch (...) {
655 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
656 throw;
657 }
658 fDataSource->FinalizeSlot(slot);
659 };
660
661 fDataSource->Initialize();
662 auto ranges = fDataSource->GetEntryRanges();
663 while (!ranges.empty()) {
664 pool.Foreach(runOnRange, ranges);
665 ranges = fDataSource->GetEntryRanges();
666 }
667 fDataSource->Finalize();
668#endif // not implemented otherwise (never called)
669}
670
671/// Execute actions and make sure named filters are called for each event.
672/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
673void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
674{
675 // data-block callbacks run before the rest of the graph
676 if (fNewSampleNotifier.CheckFlag(slot)) {
677 for (auto &callback : fSampleCallbacks)
678 callback.second(slot, fSampleInfos[slot]);
680 }
681
682 for (auto *actionPtr : fBookedActions)
683 actionPtr->Run(slot, entry);
684 for (auto *namedFilterPtr : fBookedNamedFilters)
685 namedFilterPtr->CheckFilters(slot, entry);
686 for (auto &callback : fCallbacksEveryNEvents)
687 callback(slot);
688}
689
690/// Build TTreeReaderValues for all nodes
691/// This method loops over all filters, actions and other booked objects and
692/// calls their `InitSlot` method, to get them ready for running a task.
694{
695 SetupSampleCallbacks(r, slot);
696 for (auto *ptr : fBookedActions)
697 ptr->InitSlot(r, slot);
698 for (auto *ptr : fBookedFilters)
699 ptr->InitSlot(r, slot);
700 for (auto *ptr : fBookedDefines)
701 ptr->InitSlot(r, slot);
702 for (auto *ptr : fBookedVariations)
703 ptr->InitSlot(r, slot);
704
705 for (auto &callback : fCallbacksOnce)
706 callback(slot);
707}
708
710 if (r != nullptr) {
711 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
712 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
713 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
714 }
715 // Whatever the data source, initially set the "new data block" flag:
716 // - for TChains, this ensures that we don't skip the first data block because
717 // the correct tree is already loaded
718 // - for RDataSources and empty sources, which currently don't have data blocks, this
719 // ensures that we run once per task
721}
722
723void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
725 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
726}
727
729 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
730 auto *tree = r.GetTree()->GetTree();
731 R__ASSERT(tree != nullptr);
732 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
733 auto *file = tree->GetCurrentFile();
734 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
735
736 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
737 R__ASSERT(range.first >= 0);
738 if (range.second == -1) {
739 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
740 }
741 const std::string &id = fname + '/' + treename;
742 fSampleInfos[slot] = fSampleMap.empty() ? RSampleInfo(id, range) : RSampleInfo(id, range, fSampleMap[id]);
743}
744
745/// Initialize all nodes of the functional graph before running the event loop.
746/// This method is called once per event-loop and performs generic initialization
747/// operations that do not depend on the specific processing slot (i.e. operations
748/// that are common for all threads).
750{
752 for (auto *filter : fBookedFilters)
753 filter->InitNode();
754 for (auto *range : fBookedRanges)
755 range->InitNode();
756 for (auto *ptr : fBookedActions)
757 ptr->Initialize();
758}
759
760/// Perform clean-up operations. To be called at the end of each event loop.
762{
763 fMustRunNamedFilters = false;
764
765 // forget RActions and detach TResultProxies
766 for (auto *ptr : fBookedActions)
767 ptr->Finalize();
768
769 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
770 fBookedActions.clear();
771
772 // reset children counts
773 fNChildren = 0;
774 fNStopsReceived = 0;
775 for (auto *ptr : fBookedFilters)
776 ptr->ResetChildrenCount();
777 for (auto *ptr : fBookedRanges)
778 ptr->ResetChildrenCount();
779
781 fCallbacksOnce.clear();
782}
783
784/// Perform clean-up operations. To be called at the end of each task execution.
785void RLoopManager::CleanUpTask(TTreeReader *r, unsigned int slot)
786{
787 if (r != nullptr)
788 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
789 for (auto *ptr : fBookedActions)
790 ptr->FinalizeSlot(slot);
791 for (auto *ptr : fBookedFilters)
792 ptr->FinalizeSlot(slot);
793 for (auto *ptr : fBookedDefines)
794 ptr->FinalizeSlot(slot);
795
797 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
798 // because the TTreeReader object changes at every task
799 for (auto &v : fDatasetColumnReaders[slot])
800 v.second.reset();
801 }
802}
803
804/// Add RDF nodes that require just-in-time compilation to the computation graph.
805/// This method also clears the contents of GetCodeToJit().
807{
808 {
810 if (GetCodeToJit().empty()) {
811 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
812 return;
813 }
814 }
815
816 const std::string code = []() {
818 return std::move(GetCodeToJit());
819 }();
820
821 TStopwatch s;
822 s.Start();
823 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
824 s.Stop();
825 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
826 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
827 : " in less than 1ms.");
828}
829
830/// Trigger counting of number of children nodes for each node of the functional graph.
831/// This is done once before starting the event loop. Each action sends an `increase children count` signal
832/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
833/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
834/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
835/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
837{
838 for (auto *actionPtr : fBookedActions)
839 actionPtr->TriggerChildrenCount();
840 for (auto *namedFilterPtr : fBookedNamedFilters)
841 namedFilterPtr->TriggerChildrenCount();
842}
843
844/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
845/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
846/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
847void RLoopManager::Run(bool jit)
848{
849 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
850 MaxTreeSizeRAII ctxtmts;
851
852 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
853
854 ThrowIfNSlotsChanged(GetNSlots());
855
856 if (jit)
857 Jit();
858
859 InitNodes();
860
861 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
862 // we use RAII: even in case of an exception, the destructor of the object is invoked and
863 // all the cleanup takes place.
864 class NodesCleanerRAII {
865 RLoopManager &fRLM;
866
867 public:
868 NodesCleanerRAII(RLoopManager &thisRLM) : fRLM(thisRLM) {}
869 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
870 };
871
872 NodesCleanerRAII runKeeper(*this);
873
874 TStopwatch s;
875 s.Start();
876
877 switch (fLoopType) {
881 case ELoopType::kNoFiles: RunEmptySource(); break;
884 }
885 s.Stop();
886
887 fNRuns++;
888
889 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
890 << s.RealTime() << "s elapsed).";
891}
892
893/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
895{
896 return fDefaultColumns;
897}
898
900{
901 return fTree.get();
902}
903
905{
906 fBookedActions.emplace_back(actionPtr);
907 AddSampleCallback(actionPtr, actionPtr->GetSampleCallback());
908}
909
911{
914 fSampleCallbacks.erase(actionPtr);
915}
916
918{
919 fBookedFilters.emplace_back(filterPtr);
920 if (filterPtr->HasName()) {
921 fBookedNamedFilters.emplace_back(filterPtr);
923 }
924}
925
927{
930}
931
933{
934 fBookedRanges.emplace_back(rangePtr);
935}
936
938{
940}
941
943{
944 fBookedDefines.emplace_back(ptr);
945}
946
948{
950 fSampleCallbacks.erase(ptr);
951}
952
954{
955 fBookedVariations.emplace_back(v);
956}
957
959{
961}
962
963// dummy call, end of recursive chain of calls
965{
966 return true;
967}
968
969/// Call `FillReport` on all booked filters
971{
972 for (const auto *fPtr : fBookedNamedFilters)
973 fPtr->FillReport(rep);
974}
975
976void RLoopManager::SetTree(std::shared_ptr<TTree> tree)
977{
978 fTree = std::move(tree);
979
980 TChain *ch = nullptr;
981 if ((ch = dynamic_cast<TChain *>(fTree.get())))
983}
984
985void RLoopManager::ToJitExec(const std::string &code) const
986{
988 GetCodeToJit().append(code);
989}
990
991void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
992{
993 if (everyNEvents == 0ull)
994 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
995 else
996 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
997}
998
999std::vector<std::string> RLoopManager::GetFiltersNames()
1000{
1001 std::vector<std::string> filters;
1002 for (auto *filter : fBookedFilters) {
1003 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1004 filters.push_back(name);
1005 }
1006 return filters;
1007}
1008
1009std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1010{
1011 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1012 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1013 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1014 return nodes;
1015}
1016
1017std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1018{
1019 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1020 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1021 std::copy(fRunActions.begin(), fRunActions.end(), it);
1022 return actions;
1023}
1024
1025std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1026 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1027{
1028 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1029 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1030 if (duplicateRLoopManagerIt != visitedMap.end())
1031 return duplicateRLoopManagerIt->second;
1032
1033 std::string name;
1034 if (fDataSource) {
1035 name = fDataSource->GetLabel();
1036 } else if (fTree) {
1037 name = fTree->GetName();
1038 if (name.empty())
1039 name = fTree->ClassName();
1040 } else {
1041 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1042 }
1043 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1045 visitedMap[(void *)this] = thisNode;
1046 return thisNode;
1047}
1048
1049////////////////////////////////////////////////////////////////////////////
1050/// Return all valid TTree::Branch names (caching results for subsequent calls).
1051/// Never use fBranchNames directy, always request it through this method.
1053{
1054 if (fValidBranchNames.empty() && fTree) {
1055 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
1056 }
1057 return fValidBranchNames;
1058}
1059
1060/// Return true if AddDataSourceColumnReaders was called for column name col.
1061bool RLoopManager::HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
1062{
1063 const auto key = MakeDatasetColReadersKey(col, ti);
1064 assert(fDataSource != nullptr);
1065 // since data source column readers are always added for all slots at the same time,
1066 // if the reader is present for slot 0 we have it for all other slots as well.
1067 return fDatasetColumnReaders[0].find(key) != fDatasetColumnReaders[0].end();
1068}
1069
1071 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1072 const std::type_info &ti)
1073{
1074 const auto key = MakeDatasetColReadersKey(col, ti);
1075 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1076 assert(readers.size() == fNSlots);
1077
1078 for (auto slot = 0u; slot < fNSlots; ++slot) {
1079 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1080 }
1081}
1082
1083// Differently from AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1084/// \brief Register a new RTreeColumnReader with this RLoopManager.
1085/// \return A shared pointer to the inserted column reader.
1086RColumnReaderBase *RLoopManager::AddTreeColumnReader(unsigned int slot, const std::string &col,
1087 std::unique_ptr<RColumnReaderBase> &&reader,
1088 const std::type_info &ti)
1089{
1090 auto &readers = fDatasetColumnReaders[slot];
1091 const auto key = MakeDatasetColReadersKey(col, ti);
1092 // if a reader for this column and this slot was already there, we are doing something wrong
1093 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1094 auto *rptr = reader.get();
1095 readers[key] = std::move(reader);
1096 return rptr;
1097}
1098
1100RLoopManager::GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
1101{
1102 const auto key = MakeDatasetColReadersKey(col, ti);
1103 auto it = fDatasetColumnReaders[slot].find(key);
1104 if (it != fDatasetColumnReaders[slot].end())
1105 return it->second.get();
1106 else
1107 return nullptr;
1108}
1109
1111{
1112 if (callback)
1113 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1114}
1115
1116void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1117{
1118 fEmptyEntryRange = std::move(newRange);
1119}
1120
1121/**
1122 * \brief Helper function to open a file (or the first file from a glob).
1123 * This function is used at construction time of an RDataFrame, to check the
1124 * concrete type of the dataset stored inside the file.
1125 */
1126std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
1127{
1128 // Follow same logic in TChain::Add to find the correct string to look for globbing:
1129 // - If the extension ".root" is present in the file name, pass along the basename.
1130 // - If not, use the "?" token to delimit the part of the string which represents the basename.
1131 // - Otherwise, pass the full filename.
1132 auto &&baseNameAndQuery = [&fileNameGlob]() {
1133 constexpr std::string_view delim{".root"};
1134 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
1135 it != fileNameGlob.end()) {
1136 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
1137 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
1138 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
1139 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
1140 else
1141 return std::make_pair(fileNameGlob, std::string_view{});
1142 }();
1143 // Captured structured bindings variable are only valid since C++20
1144 auto &&baseName = baseNameAndQuery.first;
1145 auto &&query = baseNameAndQuery.second;
1146
1147 const auto nameHasWildcard = [&baseName]() {
1148 constexpr std::array<char, 4> wildCards{'[', ']', '*', '?'}; // Wildcards accepted by TChain::Add
1149 return std::any_of(wildCards.begin(), wildCards.end(),
1150 [&baseName](auto &&wc) { return baseName.find(wc) != std::string_view::npos; });
1151 }();
1152
1153 // Open first file in case of glob, suppose all files in the glob use the same data format
1154 std::string fileToOpen{nameHasWildcard
1155 ? ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName})[0] + std::string{query}
1156 : fileNameGlob};
1157
1158 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
1159 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
1160 if (!inFile || inFile->IsZombie())
1161 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
1162
1163 return inFile;
1164}
1165
1166std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1167ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
1168 const ROOT::RDF::ColumnNames_t &defaultColumns, bool checkFile)
1169{
1170 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1171 // Creating an RDataFrame with a non-existing file will throw early rather
1172 // than wait for the start of the graph execution.
1173 if (checkFile) {
1174 OpenFileWithSanityChecks(fileNameGlob);
1175 }
1176 std::string datasetNameInt{datasetName};
1177 std::string fileNameGlobInt{fileNameGlob};
1178 auto chain = ROOT::Internal::TreeUtils::MakeChainForMT(datasetNameInt.c_str());
1179 chain->Add(fileNameGlobInt.c_str());
1180 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1181 return lm;
1182}
1183
1184std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1185ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1186 const std::vector<std::string> &defaultColumns, bool checkFile)
1187{
1188 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1189 // Creating an RDataFrame with a non-existing file will throw early rather
1190 // than wait for the start of the graph execution.
1191 if (checkFile) {
1192 OpenFileWithSanityChecks(fileNameGlobs[0]);
1193 }
1194 std::string treeNameInt(datasetName);
1195 auto chain = ROOT::Internal::TreeUtils::MakeChainForMT(treeNameInt);
1196 for (auto &f : fileNameGlobs)
1197 chain->Add(f.c_str());
1198 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(chain), defaultColumns);
1199 return lm;
1200}
1201
1202#ifdef R__HAS_ROOT7
1203std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1204ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob,
1205 const ROOT::RDF::ColumnNames_t &defaultColumns)
1206{
1207 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlob);
1208 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1209 return lm;
1210}
1211
1212std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1213ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1214 const ROOT::RDF::ColumnNames_t &defaultColumns)
1215{
1216 auto dataSource = std::make_unique<ROOT::Experimental::RNTupleDS>(datasetName, fileNameGlobs);
1217 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1218 return lm;
1219}
1220
1221std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1222ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob,
1223 const ROOT::RDF::ColumnNames_t &defaultColumns)
1224{
1225
1226 auto inFile = OpenFileWithSanityChecks(fileNameGlob);
1227
1228 if (inFile->Get<TTree>(datasetName.data())) {
1229 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1230 } else if (inFile->Get<ROOT::Experimental::RNTuple>(datasetName.data())) {
1231 return CreateLMFromRNTuple(datasetName, fileNameGlob, defaultColumns);
1232 }
1233
1234 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1235 "\" in file \"" + inFile->GetName() + "\".");
1236}
1237
1238std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1239ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1240 const ROOT::RDF::ColumnNames_t &defaultColumns)
1241{
1242
1243 auto inFile = OpenFileWithSanityChecks(fileNameGlobs[0]);
1244
1245 if (inFile->Get<TTree>(datasetName.data())) {
1246 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1247 } else if (inFile->Get<ROOT::Experimental::RNTuple>(datasetName.data())) {
1248 return CreateLMFromRNTuple(datasetName, fileNameGlobs, defaultColumns);
1249 }
1250
1251 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1252 "\" in file \"" + inFile->GetName() + "\".");
1253}
1254#endif
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:365
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
#define R__ASSERT(e)
Definition TError.h:118
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
char name[80]
Definition TGX11.cxx:110
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the fTree. Only used if we constructed fTree ourselves.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
virtual ROOT::RDF::SampleCallback_t GetSampleCallback()=0
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
static TClass * Class()
A TTree is a list of TBranches.
Definition TBranch.h:93
static TClass * Class()
TClass * IsA() const override
Definition TBranch.h:295
TObjArray * GetListOfLeaves()
Definition TBranch.h:247
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TObjArray * GetListOfFiles() const
Definition TChain.h:111
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4089
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4841
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5294
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9187
virtual TObjArray * GetListOfBranches()
Definition TTree.h:488
virtual TTree * GetTree() const
Definition TTree.h:517
virtual TList * GetListOfFriends() const
Definition TTree.h:490
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6032
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::Experimental::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:37
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:301
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:189
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:345
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
std::vector< std::string > ColumnNames_t
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:570
R__EXTERN TVirtualRWMutex * gCoreMutex
static const char * what
Definition stlLoader.cc:5
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.