Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
21#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
22#include "ROOT/RLogger.hxx"
23#include "ROOT/RNTuple.hxx"
24#include "ROOT/RNTupleDS.hxx"
25#include "RtypesCore.h" // Long64_t
26#include "TStopwatch.h"
27#include "TBranchElement.h"
28#include "TBranchObject.h"
29#include "TChain.h"
30#include "TEntryList.h"
31#include "TFile.h"
32#include "TFriendElement.h"
33#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
34#include "TTreeReader.h"
35#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
36
37#include "ROOT/RTTreeDS.hxx"
38
39#ifdef R__USE_IMT
42#include "ROOT/RSlotStack.hxx"
43#endif
44
45#ifdef R__UNIX
46// Functions needed to perform EOS XRootD redirection in ChangeSpec
47#include "TEnv.h"
48#include "TSystem.h"
49#ifndef R__FBSD
50#include <sys/xattr.h>
51#else
52#include <sys/extattr.h>
53#endif
54#ifdef R__MACOSX
55/* On macOS getxattr takes two extra arguments that should be set to 0 */
56#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
57#endif
58#ifdef R__FBSD
59#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
60#endif
61#endif
62
63#include <algorithm>
64#include <atomic>
65#include <cassert>
66#include <functional>
67#include <iostream>
68#include <memory>
69#include <stdexcept>
70#include <string>
71#include <sstream>
72#include <thread>
73#include <unordered_map>
74#include <vector>
75#include <set>
76#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
77
78using namespace ROOT::Detail::RDF;
79using namespace ROOT::Internal::RDF;
80
81namespace {
/// Accessor for the buffer of RDF code that is waiting for just-in-time compilation.
/// All RLoopManager instances append their code to this single string. The pending code can therefore be handed
/// to cling lazily and in one go, which is potentially much faster than jitting each RLoopManager's code on its own.
/// \return A mutable reference to a function-local static string, which starts out empty.
std::string &GetCodeToJit()
{
   static std::string pendingJitCode{};
   return pendingJitCode;
}
92
93void ThrowIfNSlotsChanged(unsigned int nSlots)
94{
96 if (currentSlots != nSlots) {
97 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
98 std::to_string(nSlots) + ", but when starting the event loop it was " +
99 std::to_string(currentSlots) + ".";
100 if (currentSlots > nSlots)
101 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
102 else
103 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
104 throw std::runtime_error(msg);
105 }
106}
107
108/**
109\struct MaxTreeSizeRAII
110\brief Scope-bound change of `TTree::fgMaxTreeSize`.
111
112This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
113changes it to maximum at construction time and restores it back at destruction
114time. Needed for issue #6523 and should be reverted when #6640 will be solved.
115*/
116struct MaxTreeSizeRAII {
117 Long64_t fOldMaxTreeSize;
118
119 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
120 {
121 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
122 }
123
124 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
125};
126
127struct DatasetLogInfo {
128 std::string fDataSet;
129 ULong64_t fRangeStart;
130 ULong64_t fRangeEnd;
131 unsigned int fSlot;
132};
133
134std::string LogRangeProcessing(const DatasetLogInfo &info)
135{
136 std::stringstream msg;
137 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
138 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
139 return msg.str();
140}
141
/// Build the key under which dataset column readers are stored: "<column name>:<ti.name()>".
/// The key combines the column name with the column type. In some cases the same column is read through concrete
/// readers of different types, e.g. std::vector and RVec here:
///   df.Sum<vector<int>>("stdVectorBranch");
///   df.Sum<RVecI>("stdVectorBranch");
std::string MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
{
   const char *const typeName = ti.name();
   std::string key;
   key.reserve(colName.size() + 1 + std::char_traits<char>::length(typeName));
   key.append(colName.data(), colName.size());
   key.push_back(':');
   key.append(typeName);
   return key;
}
150} // anonymous namespace
151
152/**
153 * \brief Helper function to open a file (or the first file from a glob).
154 * This function is used at construction time of an RDataFrame, to check the
155 * concrete type of the dataset stored inside the file.
156 */
157std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
158{
159 // Follow same logic in TChain::Add to find the correct string to look for globbing:
160 // - If the extension ".root" is present in the file name, pass along the basename.
161 // - If not, use the "?" token to delimit the part of the string which represents the basename.
162 // - Otherwise, pass the full filename.
163 auto &&baseNameAndQuery = [&fileNameGlob]() {
164 constexpr std::string_view delim{".root"};
165 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
166 it != fileNameGlob.end()) {
167 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
168 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
169 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
170 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
171 else
172 return std::make_pair(fileNameGlob, std::string_view{});
173 }();
174 // Captured structured bindings variable are only valid since C++20
175 auto &&baseName = baseNameAndQuery.first;
176 auto &&query = baseNameAndQuery.second;
177
178 std::string fileToOpen{fileNameGlob};
179 if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
180 const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
181 if (expanded.empty())
182 throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
183 "' did not match any files."};
184
185 fileToOpen = expanded.front() + std::string{query};
186 }
187
188 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
189 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
190 if (!inFile || inFile->IsZombie())
191 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
192
193 return inFile;
194}
195
196namespace ROOT {
197namespace Detail {
198namespace RDF {
199
200/// A RAII object that calls RLoopManager::CleanUpTask at destruction
212
213} // namespace RDF
214} // namespace Detail
215} // namespace ROOT
216
217ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
218 : fDefaultColumns(defaultColumns),
219 fNSlots(RDFInternal::GetNSlots()),
220 fNewSampleNotifier(fNSlots),
221 fSampleInfos(fNSlots),
222 fDatasetColumnReaders(fNSlots)
223{
224}
225
227 : fDefaultColumns(defaultBranches),
228 fNSlots(RDFInternal::GetNSlots()),
229 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
230 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
231 fNewSampleNotifier(fNSlots),
232 fSampleInfos(fNSlots),
233 fDatasetColumnReaders(fNSlots)
234{
235 fDataSource->SetNSlots(fNSlots);
236}
237
239 : fEmptyEntryRange(0, nEmptyEntries),
240 fNSlots(RDFInternal::GetNSlots()),
241 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
242 fNewSampleNotifier(fNSlots),
243 fSampleInfos(fNSlots),
244 fDatasetColumnReaders(fNSlots)
245{
246}
247
248RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
249 : fDefaultColumns(defaultBranches),
250 fNSlots(RDFInternal::GetNSlots()),
251 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
252 fDataSource(std::move(ds)),
253 fNewSampleNotifier(fNSlots),
254 fSampleInfos(fNSlots),
255 fDatasetColumnReaders(fNSlots)
256{
257 fDataSource->SetNSlots(fNSlots);
258}
259
261 : fNSlots(RDFInternal::GetNSlots()),
262 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
263 fNewSampleNotifier(fNSlots),
264 fSampleInfos(fNSlots),
265 fDatasetColumnReaders(fNSlots)
266{
267 ChangeSpec(std::move(spec));
268}
269
270#ifdef R__UNIX
271namespace {
272std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
273{
274 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
275 // If so, we create a redirected sample ID with the full xroot URL.
276 TString expandedUrl(path.data());
278 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
279 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
280 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
281 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
282 if (len > 0) {
283 std::string xurl(len, 0);
284 std::string fileNameFromUrl{fileurl.GetFile()};
285 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
286 // Sometimes the `getxattr` call may return an invalid URL due
287 // to the POSIX attribute not being yet completely filled by EOS.
288 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
289 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
290 return xurl + '/' + datasetName.data();
291 }
292 }
293 }
294 }
295 }
296
297 return std::nullopt;
298}
299} // namespace
300#endif
301
302/**
303 * @brief Changes the internal TTree held by the RLoopManager.
304 *
305 * @warning This method may lead to potentially dangerous interactions if used
306 * after the construction of the RDataFrame. Changing the specification makes
307 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
308 * new specification refers to exactly the same number of columns, with the
309 * same names and types. The actual use case of this method is moving the
310 * processing of the same RDataFrame to a different range of entries of the
311 * same dataset (which may be stored in a different set of files).
312 *
313 * @param spec The specification of the dataset to be adopted.
314 */
316{
317 auto filesVec = spec.GetFileNameGlobs();
319 filesVec[0]); // we only need the first file, we assume all files are either TTree or RNTuple
320 auto datasetName = spec.GetTreeNames();
321
322 // Change the range of entries to be processed
323 fBeginEntry = spec.GetEntryRangeBegin();
324 fEndEntry = spec.GetEntryRangeEnd();
325
326 // Store the samples
327 fSamples = spec.MoveOutSamples();
328 fSampleMap.clear();
329
330 const bool isTTree = inFile->Get<TTree>(datasetName[0].data());
331 const bool isRNTuple = inFile->Get<ROOT::RNTuple>(datasetName[0].data());
332
333 if (isTTree || isRNTuple) {
334
335 if (isTTree) {
336 // Create the internal main chain
338 for (auto &sample : fSamples) {
339 const auto &trees = sample.GetTreeNames();
340 const auto &files = sample.GetFileNameGlobs();
341 for (std::size_t i = 0ul; i < files.size(); ++i) {
342 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
343 // (see https://github.com/root-project/root/pull/8820 for why)
344 const auto fullpath = files[i] + "?#" + trees[i];
345 chain->Add(fullpath.c_str());
346 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
347 // change this easily because of backward compatibility: the sample ID
348 // is exposed to users via RSampleInfo and DefinePerSample).
349 const auto sampleId = files[i] + '/' + trees[i];
350 fSampleMap.insert({sampleId, &sample});
351#ifdef R__UNIX
352 // Also add redirected EOS xroot URL when available
354 fSampleMap.insert({redirectedSampleId.value(), &sample});
355#endif
356 }
357 }
358 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
359 } else if (isRNTuple) {
360
361 std::vector<std::string> fileNames;
362 std::set<std::string> rntupleNames;
363
364 for (auto &sample : fSamples) {
365 const auto &trees = sample.GetTreeNames();
366 const auto &files = sample.GetFileNameGlobs();
367 for (std::size_t i = 0ul; i < files.size(); ++i) {
368 const auto sampleId = files[i] + '/' + trees[i];
369 fSampleMap.insert({sampleId, &sample});
370 fileNames.push_back(files[i]);
371 rntupleNames.insert(trees[i]);
372
373#ifdef R__UNIX
374 // Also add redirected EOS xroot URL when available
376 fSampleMap.insert({redirectedSampleId.value(), &sample});
377#endif
378 }
379 }
380
381 if (rntupleNames.size() == 1) {
382 fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
383
384 } else {
385 throw std::runtime_error(
386 "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
387 }
388 }
389
390 fDataSource->SetNSlots(fNSlots);
391
392 for (unsigned int slot{}; slot < fNSlots; slot++) {
393 for (auto &v : fDatasetColumnReaders[slot])
394 v.second.reset();
395 }
396 } else {
397 throw std::invalid_argument(
398 "RDataFrame: unsupported data format for dataset. Make sure you use TTree or RNTuple.");
399 }
400}
401
402/// Run event loop with no source files, in parallel.
404{
405#ifdef R__USE_IMT
406 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack = SlotStack();
407 // Working with an empty tree.
408 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
409 const auto nEmptyEntries = GetNEmptyEntries();
410 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
411 auto remainder = nEmptyEntries % (fNSlots * 2);
412 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
413 ULong64_t begin = fEmptyEntryRange.first;
414 while (begin < fEmptyEntryRange.second) {
415 ULong64_t end = begin + nEntriesPerSlot;
416 if (remainder > 0) {
417 ++end;
418 --remainder;
419 }
420 entryRanges.emplace_back(begin, end);
421 begin = end;
422 }
423
424 // Each task will generate a subrange of entries
425 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
427 auto slot = slotRAII.fSlot;
428 RCallCleanUpTask cleanup(*this, slot);
429 InitNodeSlots(nullptr, slot);
430 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
431 try {
433 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
435 }
436 } catch (...) {
437 // Error might throw in experiment frameworks like CMSSW
438 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
439 throw;
440 }
441 };
442
444 pool.Foreach(genFunction, entryRanges);
445
446#endif // not implemented otherwise
447}
448
449/// Run event loop with no source files, in sequence.
451{
452 InitNodeSlots(nullptr, 0);
454 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
455 RCallCleanUpTask cleanup(*this);
456 try {
461 }
462 } catch (...) {
463 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
464 throw;
465 }
466}
467
468#ifdef R__USE_IMT
469namespace {
470/// Return true on successful entry read.
471///
472/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
473/// there can be other situations where the read is still valid. For now, these
474/// are:
475/// - If there was no match of the current entry in one or more friend trees
476/// according to their respective indexes.
477/// - If there was a missing branch at the start of a new tree in the dataset.
478///
479/// In such situations, although the entry is not complete, the processing
480/// should not be aborted and nodes of the computation graph will take action
481/// accordingly.
483{
484 treeReader.Next();
485 switch (treeReader.GetEntryStatus()) {
486 case TTreeReader::kEntryValid: return true;
487 case TTreeReader::kIndexedFriendNoMatch: return true;
489 default: return false;
490 }
491}
492} // namespace
493#endif
494
495namespace {
496struct DSRunRAII {
498 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
499 {
501 }
502 ~DSRunRAII() { fDS.Finalize(); }
503};
504} // namespace
505
508 unsigned int fSlot;
511 TTreeReader *treeReader = nullptr)
512 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
513 {
514 fLM.InitNodeSlots(fTreeReader, fSlot);
515 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
516 }
518};
519
520/// Run event loop over data accessed through a DataSource, in sequence.
522{
523 assert(fDataSource != nullptr);
524 // Shortcut if the entry range would result in not reading anything
525 if (fBeginEntry == fEndEntry)
526 return;
527 // Apply global entry range if necessary
528 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
529 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
530 // Initialize data source and book finalization
532 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
533 // sources that need it (currently only TTree).
534 RCallCleanUpTask cleanup(*this);
535
536 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
537 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
538 // TTree).
539 std::uint64_t processedEntries{};
540 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
541 do {
542
544
545 ranges = fDataSource->GetEntryRanges();
546
548
549 try {
550 for (const auto &range : ranges) {
551 const auto start = range.first;
552 const auto end = range.second;
553 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
554 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
555 if (fDataSource->SetEntry(0u, entry)) {
557 }
559 }
560 }
561 } catch (...) {
562 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
563 throw;
564 }
565
566 } while (!ranges.empty() && fNStopsReceived < fNChildren);
567
569
570 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
571 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
572 std::ostringstream buf{};
573 buf << "RDataFrame stopped processing after ";
574 buf << processedEntries;
575 buf << " entries, whereas an entry range (begin=";
576 buf << fBeginEntry;
577 buf << ",end=";
578 buf << fEndEntry;
579 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
580 buf << (fBeginEntry + processedEntries);
581 buf << ".";
582 Warning("RDataFrame::Run", "%s", buf.str().c_str());
583 }
584}
585
586/// Run event loop over data accessed through a DataSource, in parallel.
588{
589#ifdef R__USE_IMT
590 assert(fDataSource != nullptr);
591 // Shortcut if the entry range would result in not reading anything
592 if (fBeginEntry == fEndEntry)
593 return;
594 // Apply global entry range if necessary
595 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
596 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
597
599
601
602#endif // not implemented otherwise (never called)
603}
604
605/// Execute actions and make sure named filters are called for each event.
606/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
608{
609 // data-block callbacks run before the rest of the graph
611 for (auto &callback : fSampleCallbacks)
612 callback.second(slot, fSampleInfos[slot]);
614 }
615
616 for (auto *actionPtr : fBookedActions)
617 actionPtr->Run(slot, entry);
619 namedFilterPtr->CheckFilters(slot, entry);
620 for (auto &callback : fCallbacksEveryNEvents)
621 callback(slot);
622}
623
624/// Build TTreeReaderValues for all nodes
625/// This method loops over all filters, actions and other booked objects and
626/// calls their `InitSlot` method, to get them ready for running a task.
628{
630 for (auto *ptr : fBookedActions)
631 ptr->InitSlot(r, slot);
632 for (auto *ptr : fBookedFilters)
633 ptr->InitSlot(r, slot);
634 for (auto *ptr : fBookedDefines)
635 ptr->InitSlot(r, slot);
636 for (auto *ptr : fBookedVariations)
637 ptr->InitSlot(r, slot);
638
639 for (auto &callback : fCallbacksOnce)
640 callback(slot);
641}
642
644 if (r != nullptr) {
645 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
646 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
647 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
648 }
649 // Whatever the data source, initially set the "new data block" flag:
650 // - for TChains, this ensures that we don't skip the first data block because
651 // the correct tree is already loaded
652 // - for RDataSources and empty sources, which currently don't have data blocks, this
653 // ensures that we run once per task
655}
656
657void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
659 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
660}
661
663 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
664 auto *tree = r.GetTree()->GetTree();
665 R__ASSERT(tree != nullptr);
666 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
667 auto *file = tree->GetCurrentFile();
668 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
669
670 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
671 R__ASSERT(range.first >= 0);
672 if (range.second == -1) {
673 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
674 }
675 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
676 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
677 if (fSampleMap.empty()) {
679 } else {
680 if (fSampleMap.find(id) == fSampleMap.end())
681 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
683 }
684}
685
686/// Create a slot stack with the desired number of slots or reuse a shared instance.
687/// When a LoopManager runs in isolation, it will create its own slot stack from the
688/// number of slots. When it runs as part of RunGraphs(), each loop manager will be
689/// assigned a shared slot stack, so dataframe helpers can be shared in a thread-safe
690/// manner.
691std::shared_ptr<ROOT::Internal::RSlotStack> RLoopManager::SlotStack() const
692{
693#ifdef R__USE_IMT
694 if (auto shared = fSlotStack.lock(); shared) {
695 return shared;
696 } else {
697 return std::make_shared<ROOT::Internal::RSlotStack>(fNSlots);
698 }
699#else
700 return nullptr;
701#endif
702}
703
704/// Initialize all nodes of the functional graph before running the event loop.
705/// This method is called once per event-loop and performs generic initialization
706/// operations that do not depend on the specific processing slot (i.e. operations
707/// that are common for all threads).
709{
711 for (auto *filter : fBookedFilters)
712 filter->InitNode();
713 for (auto *range : fBookedRanges)
714 range->InitNode();
715 for (auto *ptr : fBookedActions)
716 ptr->Initialize();
717}
718
719/// Perform clean-up operations. To be called at the end of each event loop.
721{
722 fMustRunNamedFilters = false;
723
724 // forget RActions and detach TResultProxies
725 for (auto *ptr : fBookedActions)
726 ptr->Finalize();
727
728 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
729 fBookedActions.clear();
730
731 // reset children counts
732 fNChildren = 0;
733 fNStopsReceived = 0;
734 for (auto *ptr : fBookedFilters)
735 ptr->ResetChildrenCount();
736 for (auto *ptr : fBookedRanges)
737 ptr->ResetChildrenCount();
738
740 fCallbacksOnce.clear();
741}
742
743/// Perform clean-up operations. To be called at the end of each task execution.
745{
746 if (r != nullptr)
747 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
748 for (auto *ptr : fBookedActions)
749 ptr->FinalizeSlot(slot);
750 for (auto *ptr : fBookedFilters)
751 ptr->FinalizeSlot(slot);
752 for (auto *ptr : fBookedDefines)
753 ptr->FinalizeSlot(slot);
754
755 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
756 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
757 // because the TTreeReader object changes at every task
758 for (auto &v : fDatasetColumnReaders[slot])
759 v.second.reset();
760 }
761}
762
763/// Add RDF nodes that require just-in-time compilation to the computation graph.
764/// This method also clears the contents of GetCodeToJit().
766{
767 {
769 if (GetCodeToJit().empty()) {
770 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
771 return;
772 }
773 }
774
775 const std::string code = []() {
777 return std::move(GetCodeToJit());
778 }();
779
780 TStopwatch s;
781 s.Start();
782 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
783 s.Stop();
784 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
785 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
786 : " in less than 1ms.");
787}
788
789/// Trigger counting of number of children nodes for each node of the functional graph.
790/// This is done once before starting the event loop. Each action sends an `increase children count` signal
791/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, it increments its
792/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
793/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
794/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
796{
797 for (auto *actionPtr : fBookedActions)
798 actionPtr->TriggerChildrenCount();
800 namedFilterPtr->TriggerChildrenCount();
801}
802
803/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
804/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
805/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
807{
808 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
809 MaxTreeSizeRAII ctxtmts;
810
811 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
812
814
815 if (jit)
816 Jit();
817
818 InitNodes();
819
820 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
821 // we use RAII: even in case of an exception, the destructor of the object is invoked and
822 // all the cleanup takes place.
823 class NodesCleanerRAII {
825
826 public:
828 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
829 };
830
832
833 TStopwatch s;
834 s.Start();
835
836 switch (fLoopType) {
838 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
839 break;
842 case ELoopType::kNoFiles: RunEmptySource(); break;
844 }
845 s.Stop();
846
847 fNRuns++;
848
849 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
850 << s.RealTime() << "s elapsed).";
851}
852
853/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
858
864
871
873{
874 fBookedFilters.emplace_back(filterPtr);
875 if (filterPtr->HasName()) {
876 fBookedNamedFilters.emplace_back(filterPtr);
878 }
879}
880
886
891
896
898{
899 fBookedDefines.emplace_back(ptr);
900}
901
907
912
917
918// dummy call, end of recursive chain of calls
920{
921 return true;
922}
923
924/// Call `FillReport` on all booked filters
926{
927 for (const auto *fPtr : fBookedNamedFilters)
928 fPtr->FillReport(rep);
929}
930
931void RLoopManager::ToJitExec(const std::string &code) const
932{
934 GetCodeToJit().append(code);
935}
936
937void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
938{
939 if (everyNEvents == 0ull)
940 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
941 else
942 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
943}
944
945std::vector<std::string> RLoopManager::GetFiltersNames()
946{
947 std::vector<std::string> filters;
948 for (auto *filter : fBookedFilters) {
949 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
950 filters.push_back(name);
951 }
952 return filters;
953}
954
955std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
956{
957 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
958 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
959 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
960 return nodes;
961}
962
963std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
964{
965 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
966 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
967 std::copy(fRunActions.begin(), fRunActions.end(), it);
968 return actions;
969}
970
971std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
972 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
973{
974 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
975 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
977 return duplicateRLoopManagerIt->second;
978
979 std::string name;
980 if (fDataSource) {
981 name = fDataSource->GetLabel();
982 } else {
983 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
984 }
985 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
987 visitedMap[(void *)this] = thisNode;
988 return thisNode;
989}
990
991/// Return true if AddDataSourceColumnReaders was called for column name col.
992bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
993{
994 const auto key = MakeDatasetColReadersKey(col, ti);
995 assert(fDataSource != nullptr);
996 // since data source column readers are always added for all slots at the same time,
997 // if the reader is present for slot 0 we have it for all other slots as well.
998 auto it = fDatasetColumnReaders[0].find(key);
999 return (it != fDatasetColumnReaders[0].end() && it->second);
1000}
1001
// NOTE(review): source line 1002 (the start of the signature) is missing from this listing. Per the symbol index
// the declaration is `void AddDataSourceColumnReaders(std::string_view col,
// std::vector<std::unique_ptr<RColumnReaderBase>> &&readers, const std::type_info &ti)`.
// Register data source column readers for column `col` read as type `ti`, one per processing slot:
// readers[slot] is moved into fDatasetColumnReaders[slot] (the per-slot reader map) under a key built from col and ti.
// Preconditions, checked only by assert: a data source is set, no readers exist yet for (col, ti), and
// readers.size() == fNSlots.
// NOTE(review): in NDEBUG builds the size check disappears, and a `readers` vector shorter than fNSlots would be
// indexed out of range in the loop below.
1003 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1004 const std::type_info &ti)
1005{
1006 const auto key = MakeDatasetColReadersKey(col, ti);
1007 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1008 assert(readers.size() == fNSlots);
1009
1010 for (auto slot = 0u; slot < fNSlots; ++slot) {
1011 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1012 }
1013}
1014
// NOTE(review): source lines 1018 (the start of the signature) and 1022 are missing from this listing. Per the
// symbol index the declaration is `RColumnReaderBase *AddTreeColumnReader(unsigned int slot, std::string_view col,
// std::unique_ptr<RColumnReaderBase> &&reader, const std::type_info &ti)`. Line 1022 presumably binds `readers`
// to fDatasetColumnReaders[slot]. Confirm.
1015// Unlike AddDataSourceColumnReaders, this can be called from multiple threads concurrently
1016/// \brief Register a new RTreeColumnReader with this RLoopManager.
1017/// \return A non-owning pointer to the inserted column reader; ownership is transferred to the reader map.
1019 std::unique_ptr<RColumnReaderBase> &&reader,
1020 const std::type_info &ti)
1021{
1023 const auto key = MakeDatasetColReadersKey(col, ti);
1024 // if a reader for this column and this slot was already there, we are doing something wrong
1025 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
// Take the raw pointer before `reader` is moved into the map and left empty.
1026 auto *rptr = reader.get();
1027 readers[key] = std::move(reader);
1028 return rptr;
1029}
1030
// NOTE(review): source lines 1031 (the start of the signature), 1034 and 1040 are missing from this listing.
// Per the symbol index the declaration is `RColumnReaderBase *AddDataSourceColumnReader(unsigned int slot,
// std::string_view col, const std::type_info &ti, TTreeReader *treeReader)`. Line 1034 presumably binds
// `readers` to fDatasetColumnReaders[slot]. Line 1040 presumably stores the result of
// CreateColumnReader(*fDataSource, slot, col, ti, treeReader) in readers[key]. Confirm.
// Register the column reader for column `col` read as type `ti` in processing slot `slot`, and return a
// non-owning pointer to it. Requires a data source and no existing reader for this key (both checked by assert).
1032 const std::type_info &ti, TTreeReader *treeReader)
1033{
1035 const auto key = MakeDatasetColReadersKey(col, ti);
1036 // if a reader for this column and this slot was already there, we are doing something wrong
1037 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1038 assert(fDataSource && "Missing RDataSource to add column reader.");
1039
1041
1042 return readers[key].get();
1043}
1044
// NOTE(review): source line 1045 (the return type) is missing from this listing. Per the symbol index the
// function returns `RColumnReaderBase *`.
// Return the reader registered for column `col` read as type `ti` in processing slot `slot`. Returns nullptr
// if no entry exists or the stored pointer is empty. Uses find(), so the map is never modified. `slot` is used
// to index the per-slot vector without bounds checking.
1046RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1047{
1048 const auto key = MakeDatasetColReadersKey(col, ti);
1049 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1050 return it->second.get();
1051 else
1052 return nullptr;
1053}
1054
// NOTE(review): source line 1055 (the signature) is missing from this listing. Per the symbol index the
// declaration is `void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)`.
// Register `callback` as the data-block callback associated with the node at `nodePtr`.
// Empty callbacks are ignored.
// NOTE(review): fSampleCallbacks is an std::unordered_map, and insert() does not overwrite. If nodePtr already
// has a callback, the new one is silently dropped. Confirm this is intended.
1056{
1057 if (callback)
1058 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1059}
1060
1061void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1062{
1063 fEmptyEntryRange = std::move(newRange);
1064}
1065
// NOTE(review): source line 1066 (the signature) is missing from this listing. Per the symbol index the
// declaration is `void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)`.
// Store the given begin and end entries in fBeginEntry and fEndEntry. No validation is performed here.
1067{
1068 fBeginEntry = begin;
1069 fEndEntry = end;
1070}
1071
// NOTE(review): source line 1072 (the signature) is missing from this listing. Per the symbol index the
// declaration is `void SetTTreeLifeline(std::any lifeline)`.
// Move `lifeline` into fTTreeLifeline, so this RLoopManager keeps whatever object it holds alive.
// Presumably this keeps the input TTree (and its owner) alive for the RLoopManager's lifetime. Confirm.
1073{
1074 fTTreeLifeline = std::move(lifeline);
1075}
1076
// NOTE(review): source lines 1078-1079 (function name and parameters) and 1085 (the body of `if (checkFile)`)
// are missing from this listing. Per the symbol index the declaration is `CreateLMFromTTree(std::string_view
// datasetName, std::string_view fileNameGlob, const std::vector<std::string> &defaultColumns, bool checkFile=true)`.
// Line 1085 presumably calls OpenFileWithSanityChecks(fileNameGlob). Confirm.
/// Create an RLoopManager backed by an RTTreeDS that reads dataset `datasetName` from `fileNameGlob`,
/// using `defaultColumns` as the default column names. If `checkFile` is true, the input file is checked
/// immediately rather than at the start of the event loop.
1077std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1080{
1081 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1082 // Creating an RDataFrame with a non-existing file will throw early rather
1083 // than wait for the start of the graph execution.
1084 if (checkFile) {
1086 }
1087
1088 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1089 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1090 return lm;
1091}
1092
// NOTE(review): source line 1103 (the body of `if (checkFile)`) is missing from this listing. It presumably
// opens one of the input files with OpenFileWithSanityChecks. Confirm.
/// Create an RLoopManager backed by an RTTreeDS that reads dataset `datasetName` from the files matching
/// `fileNameGlobs`, using `defaultColumns` as the default column names.
/// \throws std::invalid_argument if `fileNameGlobs` is empty.
1093std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1094ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1095 const std::vector<std::string> &defaultColumns, bool checkFile)
1096{
1097 if (fileNameGlobs.size() == 0)
1098 throw std::invalid_argument("RDataFrame: empty list of input files.");
1099 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1100 // Creating an RDataFrame with a non-existing file will throw early rather
1101 // than wait for the start of the graph execution.
1102 if (checkFile) {
1104 }
1105 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1106 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1107 return lm;
1108}
1109
// NOTE(review): source lines 1111-1112 (function name and parameters) are missing from this listing. Per the
// symbol index the declaration is `CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob,
// const std::vector<std::string> &defaultColumns)`.
/// Create an RLoopManager backed by an RNTupleDS that reads RNTuple `datasetName` from `fileNameGlob`,
/// using `defaultColumns` as the default column names.
1110std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1113{
1114 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1115 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1116 return lm;
1117}
1118
// NOTE(review): source line 1121 (presumably the `defaultColumns` parameter) is missing from this listing.
// NOTE(review): unlike the CreateLMFromTTree overload taking a list of globs, there is no explicit check for
// an empty `fileNameGlobs` here. Whether RNTupleDS rejects an empty list cannot be told from this file.
/// Create an RLoopManager backed by an RNTupleDS that reads RNTuple `datasetName` from the files matching
/// `fileNameGlobs`.
1119std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1120ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1122{
1123 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1124 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1125 return lm;
1126}
1127
// NOTE(review): source lines 1129-1130 (function name and parameters), 1133 (presumably the declaration of
// `inFile`) and 1138 (the body of the RNTuple branch) are missing from this listing. Per the symbol index the
// declaration is `CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob,
// const std::vector<std::string> &defaultColumns)`. Line 1133 presumably opens the file with
// OpenFileWithSanityChecks(fileNameGlob), and line 1138 presumably delegates to CreateLMFromRNTuple. Confirm.
/// Create an RLoopManager by opening the input file and checking the format of dataset `datasetName`.
/// A TTree dataset is handed to CreateLMFromTTree with checkFile=false, because the file was already opened here.
/// \throws std::invalid_argument if the dataset is found neither as a TTree nor as an RNTuple.
1128std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1131{
1132
1134
// NOTE(review): std::string_view::data() is not guaranteed to be null-terminated, while Get() presumably expects
// a C string. If datasetName views part of a larger buffer, the name read here would run past its end.
// std::string(datasetName).c_str() would be safe.
1135 if (inFile->Get<TTree>(datasetName.data())) {
1136 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1137 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1139 }
1140
1141 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1142 "\" in file \"" + inFile->GetName() + "\".");
1143}
1144
// NOTE(review): source lines 1147 (presumably the `defaultColumns` parameter), 1153 (presumably the declaration
// of `inFile`) and 1158 (the body of the RNTuple branch) are missing from this listing. Line 1158 presumably
// delegates to CreateLMFromRNTuple. Confirm.
/// Create an RLoopManager by opening an input file and checking the format of dataset `datasetName`.
/// A TTree dataset is handed to CreateLMFromTTree with checkFile=false, because a file was already opened here.
/// \throws std::invalid_argument if `fileNameGlobs` is empty, or if the dataset is found neither as a TTree
///         nor as an RNTuple in the opened file.
1145std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1146ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1148{
1149
1150 if (fileNameGlobs.size() == 0)
1151 throw std::invalid_argument("RDataFrame: empty list of input files.");
1152
1154
// NOTE(review): std::string_view::data() is not guaranteed to be null-terminated, while Get() presumably expects
// a C string. If datasetName views part of a larger buffer, the name read here would run past its end.
// std::string(datasetName).c_str() would be safe.
1155 if (inFile->Get<TTree>(datasetName.data())) {
1156 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1157 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1159 }
1160
1161 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1162 "\" in file \"" + inFile->GetName() + "\".");
1163}
1164
1165// outlined to pin virtual table
1167
1168void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1169{
1170 if (dataSource) {
1171 fDataSource = std::move(dataSource);
1172 fDataSource->SetNSlots(fNSlots);
1173 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1174 }
1175}
1176
// NOTE(review): source lines 1178 (the `slotStack` parameter, referenced on line 1209) and 1182 (the declaration
// of `slotRAII`, referenced on line 1183) are missing from this listing. Per the symbol index the declaration is
// `void DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange, ROOT::Internal::RSlotStack &slotStack,
// std::atomic<ULong64_t> &entryCount)`.
/// The task run by every thread on the input entry range, for the generic RDataSource.
/// \param entryRange Entries to process: from `first` (inclusive) to `second` (exclusive, the loop runs while entry < end).
/// \param entryCount Shared counter, incremented by the number of entries in entryRange.
/// Exceptions raised while processing entries are reported on std::cerr and rethrown.
1177void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1179 std::atomic<ULong64_t> &entryCount)
1180{
1181#ifdef R__USE_IMT
1183 const auto &slot = slotRAII.fSlot;
1184
1185 const auto &[start, end] = entryRange;
1186 const auto nEntries = end - start;
1187 entryCount.fetch_add(nEntries);
1188
// NOTE(review): here RCallCleanUpTask is declared before RDSRangeRAII, so RDSRangeRAII is destroyed first.
// TTreeThreadTask declares them in the opposite order. Confirm the difference is intentional.
1189 RCallCleanUpTask cleanup(*this, slot);
1190 RDSRangeRAII _{*this, slot, start};
1191
1192 fSampleInfos[slot] = ROOT::Internal::RDF::CreateSampleInfo(*fDataSource, slot, fSampleMap);
1193
1194 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1195
1196 try {
1197 for (auto entry = start; entry < end; ++entry) {
// Entries for which the data source's SetEntry returns false are skipped.
1198 if (fDataSource->SetEntry(slot, entry)) {
1199 RunAndCheckFilters(slot, entry);
1200 }
1201 }
1202 } catch (...) {
1203 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1204 throw;
1205 }
// Only reached when the loop completes: if it throws, the exception propagates and this call is skipped.
1206 fDataSource->FinalizeSlot(slot);
1207#else
// Without IMT this task does nothing; the casts only silence unused-parameter warnings.
1208 (void)entryRange;
1209 (void)slotStack;
1210 (void)entryCount;
1211#endif
1212}
1213
// NOTE(review): source lines 1214 (the start of the signature), 1218 (the declaration of `slotRAII`, used on line
// 1219), 1229 (the start of the statement continued on line 1230) and 1233 (the loop header closed on line 1238)
// are missing from this listing. Per the symbol index the declaration is `void TTreeThreadTask(TTreeReader
// &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic<ULong64_t> &entryCount)`. By analogy with
// DataSourceThreadTask, line 1229 presumably begins `R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing(`.
// Confirm.
/// The task run by every thread on an entry range known by the input TTreeReader, for TTree-based input.
/// \param treeReader Reader whose entries range was already set by the caller (see the comment on line 1221).
/// \param entryCount Shared counter; this task reserves nEntries consecutive values from it.
/// \throws std::runtime_error if, after the loop, the TTreeReader status is not kEntryBeyondEnd and fewer stop
///         signals were received than there are child nodes.
/// Exceptions raised while processing entries are reported on std::cerr and rethrown.
1215 std::atomic<ULong64_t> &entryCount)
1216{
1217#ifdef R__USE_IMT
1219 const auto &slot = slotRAII.fSlot;
1220
1221 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1222 const auto &[start, end] = entryRange;
1223 const auto nEntries = end - start;
// `count` is the counter value before this task's increment. It is later passed to RunAndCheckFilters as the
// entry number, so entries are numbered from the shared counter rather than by their TTree entry number.
1224 auto count = entryCount.fetch_add(nEntries);
1225
1226 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1227 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1228
1230 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1231 try {
1232 // recursive call to check filters and conditionally execute actions
// Refresh the sample info for this slot whenever the new-sample flag for the slot is set.
1234 if (fNewSampleNotifier.CheckFlag(slot)) {
1235 UpdateSampleInfo(slot, treeReader);
1236 }
1237 RunAndCheckFilters(slot, count++);
1238 }
1239 } catch (...) {
1240 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1241 throw;
1242 }
1243 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1244 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1245 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1246 // something went wrong in the TTreeReader event loop
1247 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1248 std::to_string(treeReader.GetEntryStatus()));
1249 }
1250#else
// Without IMT this task does nothing; the casts only silence unused-parameter warnings.
1251 (void)treeReader;
1252 (void)slotStack;
1253 (void)entryCount;
1254#endif
1255}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
Basic types used by ROOT and required by TInterpreter.
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, std::string_view col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:67
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3765
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:138
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1287
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:89
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9416
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:42
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:660
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:653
unsigned int GetNSlots()
Definition RDFUtils.cxx:384
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:642
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:201
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:671
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:428
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:665
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
Namespace for new ROOT classes and functions.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.