Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
21#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
22#include "ROOT/RLogger.hxx"
23#include "ROOT/RNTuple.hxx"
24#include "ROOT/RNTupleDS.hxx"
25#include "RtypesCore.h" // Long64_t
26#include "TStopwatch.h"
27#include "TBranchElement.h"
28#include "TBranchObject.h"
29#include "TChain.h"
30#include "TEntryList.h"
31#include "TFile.h"
32#include "TFriendElement.h"
33#include "TInterpreter.h"
34#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
35#include "TTreeReader.h"
36#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
37
38#include "ROOT/RTTreeDS.hxx"
39
40#ifdef R__USE_IMT
43#include "ROOT/RSlotStack.hxx"
44#endif
45
46#ifdef R__UNIX
47// Functions needed to perform EOS XRootD redirection in ChangeSpec
48#include "TEnv.h"
49#include "TSystem.h"
50#ifndef R__FBSD
51#include <sys/xattr.h>
52#else
53#include <sys/extattr.h>
54#endif
55#ifdef R__MACOSX
56/* On macOS getxattr takes two extra arguments that should be set to 0 */
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
58#endif
59#ifdef R__FBSD
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
61#endif
62#endif
63
64#include <algorithm>
65#include <atomic>
66#include <cassert>
67#include <functional>
68#include <iostream>
69#include <memory>
70#include <stdexcept>
71#include <string>
72#include <sstream>
73#include <thread>
74#include <unordered_map>
75#include <vector>
76#include <set>
77#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
78
79using namespace ROOT::Detail::RDF;
80using namespace ROOT::Internal::RDF;
81
82namespace {
83/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
84/// This allows different RLoopManager instances to share these data.
85/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
86/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
87/// much faster than jitting each RLoopManager's code separately.
88std::string &GetCodeToJit()
89{
90 static std::string code;
91 return code;
92}
93
94std::string &GetCodeToDeclare()
95{
96 static std::string code;
97 return code;
98}
99
100// Signature of all helper functions that are created by JIT helpers, see
101// Book*Jit and JitBuildAction in RDFInterfaceUtils.cxx
102using JitHelperFunc_t = void (*)(const std::vector<std::string> &, ROOT::Internal::RDF::RColumnRegister &,
103 ROOT::Detail::RDF::RLoopManager &, void *, std::shared_ptr<void> *);
104std::unordered_map<std::size_t, JitHelperFunc_t> &GetJitHelperFuncMap()
105{
106 static std::unordered_map<std::size_t, JitHelperFunc_t> map;
107 return map;
108}
109std::unordered_map<std::size_t, std::size_t> &GetJitFuncBodyToFuncIdMap()
110{
111 static std::unordered_map<std::size_t, std::size_t> map;
112 return map;
113}
114
116{
117 // This function uses the interpreter and writes to the caches.
119
120 // Step 1: Declare the DeferredJitCall functions to the interpreter
121 // We use ProcessLine to ensure meta functionality (e.g. autoloading) is
122 // processed when needed.
123 // If instead we used Declare, builds with runtime_cxxmodules=OFF would fail
124 // in jitted actions with custom helpers with errors like:
125 // error: 'MyHelperType' is an incomplete type
126 // return std::make_unique<Action_t>(Helper_t(std::move(*h)), bl, std::move(prevNode), colRegister);
127 // ^
128 gInterpreter->ProcessLine(codeToDeclare.c_str());
129
130 // Step 2: Retrieve the declared functions as function pointers, cache them
131 // for later use in RunDeferredCalls
134 auto clinfo = gInterpreter->ClassInfo_Factory("R_rdf");
135 assert(gInterpreter->ClassInfo_IsValid(clinfo));
136
137 for (auto &codeAndId : funcBodyToFuncIdMap) {
138 if (auto it = funcIdToFuncPointersMap.find(codeAndId.second); it == funcIdToFuncPointersMap.end()) {
139 // fast fetch of the address via gInterpreter
140 // (faster than gInterpreter->Evaluate(function name, ret), ret->GetAsPointer())
141 // Retrieve the JIT helper function we registered via RegisterJitHelperCall
142 auto declid =
143 gInterpreter->GetFunction(clinfo, ("jitNodeRegistrator_" + std::to_string(codeAndId.second)).c_str());
144 assert(declid);
145 auto minfo = gInterpreter->MethodInfo_Factory(declid);
146 assert(gInterpreter->MethodInfo_IsValid(minfo));
147 auto mname = gInterpreter->MethodInfo_GetMangledName(minfo);
148 [[maybe_unused]] auto res = funcIdToFuncPointersMap.insert(
149 {codeAndId.second, reinterpret_cast<JitHelperFunc_t>(gInterpreter->FindSym(mname))});
150 assert(res.second);
151 gInterpreter->MethodInfo_Delete(minfo);
152 }
153 }
154 gInterpreter->ClassInfo_Delete(clinfo);
155}
156
157void ThrowIfNSlotsChanged(unsigned int nSlots)
158{
160 if (currentSlots != nSlots) {
161 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
162 std::to_string(nSlots) + ", but when starting the event loop it was " +
163 std::to_string(currentSlots) + ".";
164 if (currentSlots > nSlots)
165 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
166 else
167 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
168 throw std::runtime_error(msg);
169 }
170}
171
172/**
173\struct MaxTreeSizeRAII
174\brief Scope-bound change of `TTree::fgMaxTreeSize`.
175
176This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
177changes it to maximum at construction time and restores it back at destruction
178time. Needed for issue #6523 and should be reverted when #6640 will be solved.
179*/
180struct MaxTreeSizeRAII {
181 Long64_t fOldMaxTreeSize;
182
183 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
184 {
185 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
186 }
187
188 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
189};
190
191struct DatasetLogInfo {
192 std::string fDataSet;
193 ULong64_t fRangeStart;
194 ULong64_t fRangeEnd;
195 unsigned int fSlot;
196};
197
198std::string LogRangeProcessing(const DatasetLogInfo &info)
199{
200 std::stringstream msg;
201 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
202 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
203 return msg.str();
204}
205
206auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
207{
208 // We use a combination of column name and column type name as the key because in some cases we might end up
209 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
210 // df.Sum<vector<int>>("stdVectorBranch");
211 // df.Sum<RVecI>("stdVectorBranch");
212 return std::string(colName) + ':' + ti.name();
213}
214} // anonymous namespace
215
216/**
217 * \brief Helper function to open a file (or the first file from a glob).
218 * This function is used at construction time of an RDataFrame, to check the
219 * concrete type of the dataset stored inside the file.
220 */
221std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
222{
223 // Follow same logic in TChain::Add to find the correct string to look for globbing:
224 // - If the extension ".root" is present in the file name, pass along the basename.
225 // - If not, use the "?" token to delimit the part of the string which represents the basename.
226 // - Otherwise, pass the full filename.
227 auto &&baseNameAndQuery = [&fileNameGlob]() {
228 constexpr std::string_view delim{".root"};
229 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
230 it != fileNameGlob.end()) {
231 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
232 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
233 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
234 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
235 else
236 return std::make_pair(fileNameGlob, std::string_view{});
237 }();
238 // Captured structured bindings variable are only valid since C++20
239 auto &&baseName = baseNameAndQuery.first;
240 auto &&query = baseNameAndQuery.second;
241
242 std::string fileToOpen{fileNameGlob};
243 if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
244 const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
245 if (expanded.empty())
246 throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
247 "' did not match any files."};
248
249 fileToOpen = expanded.front() + std::string{query};
250 }
251
252 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
253 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
254 if (!inFile || inFile->IsZombie())
255 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
256
257 return inFile;
258}
259
260namespace ROOT {
261namespace Detail {
262namespace RDF {
263
264/// A RAII object that calls RLoopManager::CleanUpTask at destruction
276
277} // namespace RDF
278} // namespace Detail
279} // namespace ROOT
280
281ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
282 : fDefaultColumns(defaultColumns),
283 fNSlots(RDFInternal::GetNSlots()),
284 fNewSampleNotifier(fNSlots),
285 fSampleInfos(fNSlots),
286 fDatasetColumnReaders(fNSlots)
287{
288}
289
291 : fDefaultColumns(defaultBranches),
292 fNSlots(RDFInternal::GetNSlots()),
293 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
294 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
295 fNewSampleNotifier(fNSlots),
296 fSampleInfos(fNSlots),
297 fDatasetColumnReaders(fNSlots)
298{
299 fDataSource->SetNSlots(fNSlots);
300}
301
303 : fEmptyEntryRange(0, nEmptyEntries),
304 fNSlots(RDFInternal::GetNSlots()),
305 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
306 fNewSampleNotifier(fNSlots),
307 fSampleInfos(fNSlots),
308 fDatasetColumnReaders(fNSlots)
309{
310}
311
312RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
313 : fDefaultColumns(defaultBranches),
314 fNSlots(RDFInternal::GetNSlots()),
315 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
316 fDataSource(std::move(ds)),
317 fNewSampleNotifier(fNSlots),
318 fSampleInfos(fNSlots),
319 fDatasetColumnReaders(fNSlots)
320{
321 fDataSource->SetNSlots(fNSlots);
322}
323
325 : fNSlots(RDFInternal::GetNSlots()),
326 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
327 fNewSampleNotifier(fNSlots),
328 fSampleInfos(fNSlots),
329 fDatasetColumnReaders(fNSlots)
330{
331 ChangeSpec(std::move(spec));
332}
333
334#ifdef R__UNIX
335namespace {
336std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
337{
338 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
339 // If so, we create a redirected sample ID with the full xroot URL.
340 TString expandedUrl(path.data());
342 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
343 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
344 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
345 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
346 if (len > 0) {
347 std::string xurl(len, 0);
348 std::string fileNameFromUrl{fileurl.GetFile()};
349 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
350 // Sometimes the `getxattr` call may return an invalid URL due
351 // to the POSIX attribute not being yet completely filled by EOS.
352 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
353 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
354 return xurl + '/' + datasetName.data();
355 }
356 }
357 }
358 }
359 }
360
361 return std::nullopt;
362}
363} // namespace
364#endif
365
366/**
367 * @brief Changes the internal TTree held by the RLoopManager.
368 *
369 * @warning This method may lead to potentially dangerous interactions if used
370 * after the construction of the RDataFrame. Changing the specification makes
371 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
372 * new specification refers to exactly the same number of columns, with the
373 * same names and types. The actual use case of this method is moving the
374 * processing of the same RDataFrame to a different range of entries of the
375 * same dataset (which may be stored in a different set of files).
376 *
377 * @param spec The specification of the dataset to be adopted.
378 */
380{
381 auto filesVec = spec.GetFileNameGlobs();
383 filesVec[0]); // we only need the first file, we assume all files are either TTree or RNTuple
384 auto datasetName = spec.GetTreeNames();
385
386 // Change the range of entries to be processed
387 fBeginEntry = spec.GetEntryRangeBegin();
388 fEndEntry = spec.GetEntryRangeEnd();
389
390 // Store the samples
391 fSamples = spec.MoveOutSamples();
392 fSampleMap.clear();
393
394 const bool isTTree = inFile->Get<TTree>(datasetName[0].data());
395 const bool isRNTuple = inFile->Get<ROOT::RNTuple>(datasetName[0].data());
396
397 if (isTTree || isRNTuple) {
398
399 if (isTTree) {
400 // Create the internal main chain
402 for (auto &sample : fSamples) {
403 const auto &trees = sample.GetTreeNames();
404 const auto &files = sample.GetFileNameGlobs();
405 for (std::size_t i = 0ul; i < files.size(); ++i) {
406 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
407 // (see https://github.com/root-project/root/pull/8820 for why)
408 const auto fullpath = files[i] + "?#" + trees[i];
409 chain->Add(fullpath.c_str());
410 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
411 // change this easily because of backward compatibility: the sample ID
412 // is exposed to users via RSampleInfo and DefinePerSample).
413 const auto sampleId = files[i] + '/' + trees[i];
414 fSampleMap.insert({sampleId, &sample});
415#ifdef R__UNIX
416 // Also add redirected EOS xroot URL when available
418 fSampleMap.insert({redirectedSampleId.value(), &sample});
419#endif
420 }
421 }
422 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
423 } else if (isRNTuple) {
424
425 std::vector<std::string> fileNames;
426 std::set<std::string> rntupleNames;
427
428 for (auto &sample : fSamples) {
429 const auto &trees = sample.GetTreeNames();
430 const auto &files = sample.GetFileNameGlobs();
431 for (std::size_t i = 0ul; i < files.size(); ++i) {
432 const auto sampleId = files[i] + '/' + trees[i];
433 fSampleMap.insert({sampleId, &sample});
434 fileNames.push_back(files[i]);
435 rntupleNames.insert(trees[i]);
436
437#ifdef R__UNIX
438 // Also add redirected EOS xroot URL when available
440 fSampleMap.insert({redirectedSampleId.value(), &sample});
441#endif
442 }
443 }
444
445 if (rntupleNames.size() == 1) {
446 fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
447
448 } else {
449 throw std::runtime_error(
450 "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
451 }
452 }
453
454 fDataSource->SetNSlots(fNSlots);
455
456 for (unsigned int slot{}; slot < fNSlots; slot++) {
457 for (auto &v : fDatasetColumnReaders[slot])
458 v.second.reset();
459 }
460 } else {
461 throw std::invalid_argument(
462 "RDataFrame: unsupported data format for dataset. Make sure you use TTree or RNTuple.");
463 }
464}
465
466/// Run event loop with no source files, in parallel.
468{
469#ifdef R__USE_IMT
470 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack = SlotStack();
471 // Working with an empty tree.
472 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
473 const auto nEmptyEntries = GetNEmptyEntries();
474 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
475 auto remainder = nEmptyEntries % (fNSlots * 2);
476 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
477 ULong64_t begin = fEmptyEntryRange.first;
478 while (begin < fEmptyEntryRange.second) {
479 ULong64_t end = begin + nEntriesPerSlot;
480 if (remainder > 0) {
481 ++end;
482 --remainder;
483 }
484 entryRanges.emplace_back(begin, end);
485 begin = end;
486 }
487
488 // Each task will generate a subrange of entries
489 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
491 auto slot = slotRAII.fSlot;
492 RCallCleanUpTask cleanup(*this, slot);
493 InitNodeSlots(nullptr, slot);
494 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
495 try {
497 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
499 }
500 } catch (...) {
501 // Error might throw in experiment frameworks like CMSSW
502 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
503 throw;
504 }
505 };
506
508 pool.Foreach(genFunction, entryRanges);
509
510#endif // not implemented otherwise
511}
512
513/// Run event loop with no source files, in sequence.
515{
516 InitNodeSlots(nullptr, 0);
518 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
519 RCallCleanUpTask cleanup(*this);
520 try {
525 }
526 } catch (...) {
527 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
528 throw;
529 }
530}
531
532#ifdef R__USE_IMT
533namespace {
534/// Return true on succesful entry read.
535///
536/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
537/// there can be other situations where the read is still valid. For now, these
538/// are:
539/// - If there was no match of the current entry in one or more friend trees
540/// according to their respective indexes.
541/// - If there was a missing branch at the start of a new tree in the dataset.
542///
543/// In such situations, although the entry is not complete, the processing
544/// should not be aborted and nodes of the computation graph will take action
545/// accordingly.
547{
548 treeReader.Next();
549 switch (treeReader.GetEntryStatus()) {
550 case TTreeReader::kEntryValid: return true;
551 case TTreeReader::kIndexedFriendNoMatch: return true;
553 default: return false;
554 }
555}
556} // namespace
557#endif
558
559namespace {
560struct DSRunRAII {
562 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
563 {
565 }
566 ~DSRunRAII() { fDS.Finalize(); }
567};
568} // namespace
569
572 unsigned int fSlot;
575 TTreeReader *treeReader = nullptr)
576 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
577 {
578 fLM.InitNodeSlots(fTreeReader, fSlot);
579 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
580 }
582};
583
584/// Run event loop over data accessed through a DataSource, in sequence.
586{
587 assert(fDataSource != nullptr);
588 // Shortcut if the entry range would result in not reading anything
589 if (fBeginEntry == fEndEntry)
590 return;
591 // Apply global entry range if necessary
592 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
593 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
594 // Initialize data source and book finalization
596 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
597 // sources that need it (currently only TTree).
598 RCallCleanUpTask cleanup(*this);
599
600 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
601 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
602 // TTree).
603 std::uint64_t processedEntries{};
604 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
605 do {
606
608
609 ranges = fDataSource->GetEntryRanges();
610
612
613 try {
614 for (const auto &range : ranges) {
615 const auto start = range.first;
616 const auto end = range.second;
617 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
618 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
619 if (fDataSource->SetEntry(0u, entry)) {
621 }
623 }
624 }
625 } catch (...) {
626 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
627 throw;
628 }
629
630 } while (!ranges.empty() && fNStopsReceived < fNChildren);
631
633
634 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
635 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
636 std::ostringstream buf{};
637 buf << "RDataFrame stopped processing after ";
638 buf << processedEntries;
639 buf << " entries, whereas an entry range (begin=";
640 buf << fBeginEntry;
641 buf << ",end=";
642 buf << fEndEntry;
643 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
644 buf << (fBeginEntry + processedEntries);
645 buf << ".";
646 Warning("RDataFrame::Run", "%s", buf.str().c_str());
647 }
648}
649
650/// Run event loop over data accessed through a DataSource, in parallel.
652{
653#ifdef R__USE_IMT
654 assert(fDataSource != nullptr);
655 // Shortcut if the entry range would result in not reading anything
656 if (fBeginEntry == fEndEntry)
657 return;
658 // Apply global entry range if necessary
659 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
660 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
661
663
665
666#endif // not implemented otherwise (never called)
667}
668
669/// Execute actions and make sure named filters are called for each event.
670/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
672{
673 // data-block callbacks run before the rest of the graph
675 for (auto &callback : fSampleCallbacks)
676 callback.second(slot, fSampleInfos[slot]);
678 }
679
680 for (auto *actionPtr : fBookedActions)
681 actionPtr->Run(slot, entry);
683 namedFilterPtr->CheckFilters(slot, entry);
684 for (auto &callback : fCallbacksEveryNEvents)
685 callback(slot);
686}
687
688/// Build TTreeReaderValues for all nodes
689/// This method loops over all filters, actions and other booked objects and
690/// calls their `InitSlot` method, to get them ready for running a task.
692{
694 for (auto *ptr : fBookedActions)
695 ptr->InitSlot(r, slot);
696 for (auto *ptr : fBookedFilters)
697 ptr->InitSlot(r, slot);
698 for (auto *ptr : fBookedDefines)
699 ptr->InitSlot(r, slot);
700 for (auto *ptr : fBookedVariations)
701 ptr->InitSlot(r, slot);
702
703 for (auto &callback : fCallbacksOnce)
704 callback(slot);
705}
706
708 if (r != nullptr) {
709 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
710 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
711 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
712 }
713 // Whatever the data source, initially set the "new data block" flag:
714 // - for TChains, this ensures that we don't skip the first data block because
715 // the correct tree is already loaded
716 // - for RDataSources and empty sources, which currently don't have data blocks, this
717 // ensures that we run once per task
719}
720
721void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
723 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
724}
725
727 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
728 auto *tree = r.GetTree()->GetTree();
729 R__ASSERT(tree != nullptr);
730 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
731 auto *file = tree->GetCurrentFile();
732 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
733
734 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
735 R__ASSERT(range.first >= 0);
736 if (range.second == -1) {
737 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
738 }
739 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
740 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
741 if (fSampleMap.empty()) {
743 } else {
744 if (fSampleMap.find(id) == fSampleMap.end())
745 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
747 }
748}
749
750/// Create a slot stack with the desired number of slots or reuse a shared instance.
751/// When a LoopManager runs in isolation, it will create its own slot stack from the
752/// number of slots. When it runs as part of RunGraphs(), each loop manager will be
753/// assigned a shared slot stack, so dataframe helpers can be shared in a thread-safe
754/// manner.
755std::shared_ptr<ROOT::Internal::RSlotStack> RLoopManager::SlotStack() const
756{
757#ifdef R__USE_IMT
758 if (auto shared = fSlotStack.lock(); shared) {
759 return shared;
760 } else {
761 return std::make_shared<ROOT::Internal::RSlotStack>(fNSlots);
762 }
763#else
764 return nullptr;
765#endif
766}
767
768/// Initialize all nodes of the functional graph before running the event loop.
769/// This method is called once per event-loop and performs generic initialization
770/// operations that do not depend on the specific processing slot (i.e. operations
771/// that are common for all threads).
773{
775 for (auto *filter : fBookedFilters)
776 filter->InitNode();
777 for (auto *range : fBookedRanges)
778 range->InitNode();
779 for (auto *ptr : fBookedActions)
780 ptr->Initialize();
781}
782
783/// Perform clean-up operations. To be called at the end of each event loop.
785{
786 fMustRunNamedFilters = false;
787
788 // forget RActions and detach TResultProxies
789 for (auto *ptr : fBookedActions)
790 ptr->Finalize();
791
792 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
793 fBookedActions.clear();
794
795 // reset children counts
796 fNChildren = 0;
797 fNStopsReceived = 0;
798 for (auto *ptr : fBookedFilters)
799 ptr->ResetChildrenCount();
800 for (auto *ptr : fBookedRanges)
801 ptr->ResetChildrenCount();
802
804 fCallbacksOnce.clear();
805}
806
807/// Perform clean-up operations. To be called at the end of each task execution.
809{
810 if (r != nullptr)
811 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
812 for (auto *ptr : fBookedActions)
813 ptr->FinalizeSlot(slot);
814 for (auto *ptr : fBookedFilters)
815 ptr->FinalizeSlot(slot);
816 for (auto *ptr : fBookedDefines)
817 ptr->FinalizeSlot(slot);
818
819 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
820 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
821 // because the TTreeReader object changes at every task
822 for (auto &v : fDatasetColumnReaders[slot])
823 v.second.reset();
824 }
825}
826
827/// Add RDF nodes that require just-in-time compilation to the computation graph.
828/// This method also clears the contents of GetCodeToJit().
830{
831 {
833 if (GetCodeToJit().empty() && GetCodeToDeclare().empty()) {
835 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
836 return;
837 }
838 }
839
840 std::string codeToDeclare, code;
841 {
844 code.swap(GetCodeToJit());
845 };
846
847 TStopwatch s;
848 s.Start();
849 if (!codeToDeclare.empty()) {
851 }
852 if (!code.empty()) {
853 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
854 }
855 s.Stop();
856 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
857 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
858 : " in less than 1ms.");
859
861}
862
864{
865 if (!fJitHelperCalls.empty()) {
866 // funcMap is not thread-safe
868 TStopwatch s;
869 s.Start();
870 const auto &funcMap = GetJitHelperFuncMap();
871 for (auto &call : fJitHelperCalls) {
872 funcMap.at(call.fFunctionId)(call.fColNames, *call.fColRegister, *this, call.fJittedNode.get(),
873 &call.fExtraArgs);
874 }
875 s.Stop();
876 const auto realTime = s.RealTime();
877 R__LOG_INFO(RDFLogChannel()) << fJitHelperCalls.size() << " deferred calls completed"
878 << (realTime > 1e-3 ? " in " + std::to_string(realTime) + " seconds."
879 : " in less than 1ms.");
880 // Promoting to write lock to clear the vector
882 fJitHelperCalls.clear();
883 }
884}
885
886/// Trigger counting of number of children nodes for each node of the functional graph.
887/// This is done once before starting the event loop. Each action sends an `increase children count` signal
888/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
889/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
890/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
891/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
893{
894 for (auto *actionPtr : fBookedActions)
895 actionPtr->TriggerChildrenCount();
897 namedFilterPtr->TriggerChildrenCount();
898}
899
900/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
901/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
902/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
904{
905 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
906 MaxTreeSizeRAII ctxtmts;
907
908 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
909
911
912 if (jit)
913 Jit();
914
915 // Called here since in a RunGraphs run, multiple RLoopManager runs could be
916 // triggered from different threads.
918
919 InitNodes();
920
921 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
922 // we use RAII: even in case of an exception, the destructor of the object is invoked and
923 // all the cleanup takes place.
924 class NodesCleanerRAII {
926
927 public:
929 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
930 };
931
933
934 TStopwatch s;
935 s.Start();
936
937 switch (fLoopType) {
939 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
940 break;
943 case ELoopType::kNoFiles: RunEmptySource(); break;
945 }
946 s.Stop();
947
948 fNRuns++;
949
950 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
951 << s.RealTime() << "s elapsed).";
952}
953
954/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
959
965
972
974{
975 fBookedFilters.emplace_back(filterPtr);
976 if (filterPtr->HasName()) {
977 fBookedNamedFilters.emplace_back(filterPtr);
979 }
980}
981
987
992
997
999{
1000 fBookedDefines.emplace_back(ptr);
1001}
1002
1008
1013
1018
1019// dummy call, end of recursive chain of calls
1021{
1022 return true;
1023}
1024
1025/// Call `FillReport` on all booked filters
1027{
1028 for (const auto *fPtr : fBookedNamedFilters)
1029 fPtr->FillReport(rep);
1030}
1031
1032void RLoopManager::ToJitExec(const std::string &code) const
1033{
1035 GetCodeToJit().append(code);
1036}
1037
1039 std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegister,
1040 const std::vector<std::string> &colNames, std::shared_ptr<void> jittedNode,
1041 std::shared_ptr<void> argument)
1042{
1044 {
1046 auto match = funcBodyToFuncIdMap.find(fStringHasher(funcBody));
1047 if (match != funcBodyToFuncIdMap.end()) {
1048 R__WRITE_LOCKGUARD(ROOT::gCoreMutex); // modifying fJitHelperCalls
1049 std::string funcName = "jitNodeRegistrator_" + std::to_string(match->second);
1050 R__LOG_DEBUG(0, RDFLogChannel()) << "JIT helper " << funcName << " was already registered.";
1051 fJitHelperCalls.emplace_back(match->second, std::move(colRegister), colNames, jittedNode, argument);
1052 return;
1053 }
1054 }
1055
1056 {
1057 // Register lazily a JIT helper
1059 auto registratorId = funcBodyToFuncIdMap.size();
1060 std::string funcName = "jitNodeRegistrator_" + std::to_string(registratorId);
1062 assert(res.second);
1063
1064 std::string toDeclare = "namespace R_rdf {\n void " + funcName + funcBody + "\n}\n";
1065 R__LOG_DEBUG(0, RDFLogChannel()) << "Registering deferred JIT helper:\n" << toDeclare;
1066
1067 GetCodeToDeclare().append(toDeclare);
1069 }
1070}
1071
1072void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1073{
1074 if (everyNEvents == 0ull)
1075 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1076 else
1077 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1078}
1079
1080std::vector<std::string> RLoopManager::GetFiltersNames()
1081{
1082 std::vector<std::string> filters;
1083 for (auto *filter : fBookedFilters) {
1084 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1085 filters.push_back(name);
1086 }
1087 return filters;
1088}
1089
1090std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1091{
1092 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1093 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1094 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1095 return nodes;
1096}
1097
1098std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1099{
1100 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1101 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1102 std::copy(fRunActions.begin(), fRunActions.end(), it);
1103 return actions;
1104}
1105
1106std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1107 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1108{
1109 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1110 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1112 return duplicateRLoopManagerIt->second;
1113
1114 std::string name;
1115 if (fDataSource) {
1116 name = fDataSource->GetLabel();
1117 } else {
1118 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1119 }
1120 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1122 visitedMap[(void *)this] = thisNode;
1123 return thisNode;
1124}
1125
1126/// Return true if AddDataSourceColumnReaders was called for column name col.
1127bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
1128{
1129 const auto key = MakeDatasetColReadersKey(col, ti);
1130 assert(fDataSource != nullptr);
1131 // since data source column readers are always added for all slots at the same time,
1132 // if the reader is present for slot 0 we have it for all other slots as well.
1133 auto it = fDatasetColumnReaders[0].find(key);
1134 return (it != fDatasetColumnReaders[0].end() && it->second);
1135}
1136
1138 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1139 const std::type_info &ti)
1140{
1141 const auto key = MakeDatasetColReadersKey(col, ti);
1142 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1143 assert(readers.size() == fNSlots);
1144
1145 for (auto slot = 0u; slot < fNSlots; ++slot) {
1146 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1147 }
1148}
1149
1151 const std::type_info &ti, TTreeReader *treeReader)
1152{
1154 const auto key = MakeDatasetColReadersKey(col, ti);
1155 // if a reader for this column and this slot was already there, we are doing something wrong
1156 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1157 assert(fDataSource && "Missing RDataSource to add column reader.");
1158
1160
1161 return readers[key].get();
1162}
1163
1165RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1166{
1167 const auto key = MakeDatasetColReadersKey(col, ti);
1168 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1169 return it->second.get();
1170 else
1171 return nullptr;
1172}
1173
1175{
1176 if (callback)
1177 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1178}
1179
1180void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1181{
1182 fEmptyEntryRange = std::move(newRange);
1183}
1184
1186{
1187 fBeginEntry = begin;
1188 fEndEntry = end;
1189}
1190
1192{
1193 fTTreeLifeline = std::move(lifeline);
1194}
1195
1196std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1199{
1200 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1201 // Creating an RDataFrame with a non-existing file will throw early rather
1202 // than wait for the start of the graph execution.
1203 if (checkFile) {
1205 }
1206
1207 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1208 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1209 return lm;
1210}
1211
1212std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1213ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1214 const std::vector<std::string> &defaultColumns, bool checkFile)
1215{
1216 if (fileNameGlobs.size() == 0)
1217 throw std::invalid_argument("RDataFrame: empty list of input files.");
1218 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1219 // Creating an RDataFrame with a non-existing file will throw early rather
1220 // than wait for the start of the graph execution.
1221 if (checkFile) {
1223 }
1224 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1225 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1226 return lm;
1227}
1228
1229std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1232{
1233 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1234 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1235 return lm;
1236}
1237
1238std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1239ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1241{
1242 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1243 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1244 return lm;
1245}
1246
1247std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1250{
1251
1253
1254 if (inFile->Get<TTree>(datasetName.data())) {
1255 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1256 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1258 }
1259
1260 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1261 "\" in file \"" + inFile->GetName() + "\".");
1262}
1263
1264std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1265ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1267{
1268
1269 if (fileNameGlobs.size() == 0)
1270 throw std::invalid_argument("RDataFrame: empty list of input files.");
1271
1273
1274 if (inFile->Get<TTree>(datasetName.data())) {
1275 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1276 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1278 }
1279
1280 throw std::invalid_argument("RDataFrame: unsupported data format for dataset \"" + std::string(datasetName) +
1281 "\" in file \"" + inFile->GetName() + "\".");
1282}
1283
1284// outlined to pin virtual table
1286
1287void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1288{
1289 if (dataSource) {
1290 fDataSource = std::move(dataSource);
1291 fDataSource->SetNSlots(fNSlots);
1292 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1293 }
1294}
1295
1296void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1298 std::atomic<ULong64_t> &entryCount)
1299{
1300#ifdef R__USE_IMT
1302 const auto &slot = slotRAII.fSlot;
1303
1304 const auto &[start, end] = entryRange;
1305 const auto nEntries = end - start;
1306 entryCount.fetch_add(nEntries);
1307
1308 RCallCleanUpTask cleanup(*this, slot);
1309 RDSRangeRAII _{*this, slot, start};
1310
1311 fSampleInfos[slot] = ROOT::Internal::RDF::CreateSampleInfo(*fDataSource, slot, fSampleMap);
1312
1313 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1314
1315 try {
1316 for (auto entry = start; entry < end; ++entry) {
1317 if (fDataSource->SetEntry(slot, entry)) {
1318 RunAndCheckFilters(slot, entry);
1319 }
1320 }
1321 } catch (...) {
1322 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1323 throw;
1324 }
1325 fDataSource->FinalizeSlot(slot);
1326#else
1327 (void)entryRange;
1328 (void)slotStack;
1329 (void)entryCount;
1330#endif
1331}
1332
1334 std::atomic<ULong64_t> &entryCount)
1335{
1336#ifdef R__USE_IMT
1338 const auto &slot = slotRAII.fSlot;
1339
1340 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1341 const auto &[start, end] = entryRange;
1342 const auto nEntries = end - start;
1343 auto count = entryCount.fetch_add(nEntries);
1344
1345 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1346 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1347
1349 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1350 try {
1351 // recursive call to check filters and conditionally execute actions
1353 if (fNewSampleNotifier.CheckFlag(slot)) {
1354 UpdateSampleInfo(slot, treeReader);
1355 }
1356 RunAndCheckFilters(slot, count++);
1357 }
1358 } catch (...) {
1359 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1360 throw;
1361 }
1362 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1363 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1364 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1365 // something went wrong in the TTreeReader event loop
1366 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1367 std::to_string(treeReader.GetEntryStatus()));
1368 }
1369#else
1370 (void)treeReader;
1371 (void)slotStack;
1372 (void)entryCount;
1373#endif
1374}
1375
1378
1379ROOT::Detail::RDF::RLoopManager::DeferredJitCall &ROOT::Detail::RDF::RLoopManager::DeferredJitCall::operator=(
1380 ROOT::Detail::RDF::RLoopManager::DeferredJitCall &&) noexcept = default;
1381
1382ROOT::Detail::RDF::RLoopManager::DeferredJitCall::~DeferredJitCall() = default;
1383
1385 std::size_t id, std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegisterPtr,
1386 const std::vector<std::string> &colNamesArg, std::shared_ptr<void> jittedNode, std::shared_ptr<void> argPtr)
1387 : fFunctionId(id),
1388 fColRegister(std::move(colRegisterPtr)),
1389 fColNames(colNamesArg),
1390 fJittedNode(jittedNode),
1391 fExtraArgs(argPtr)
1392{
1393 assert(fJittedNode != nullptr);
1394}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
Basic types used by ROOT and required by TInterpreter.
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
#define gInterpreter
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void RegisterJitHelperCall(const std::string &funcBody, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > colRegister, const std::vector< std::string > &colnames, std::shared_ptr< void > jittedNode, std::shared_ptr< void > argument=nullptr)
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::hash< std::string > fStringHasher
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
std::vector< DeferredJitCall > fJitHelperCalls
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
A binder for user-defined columns, variations and aliases.
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:67
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:503
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3764
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:138
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1285
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:89
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9462
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:43
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:673
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:666
unsigned int GetNSlots()
Definition RDFUtils.cxx:397
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:655
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:204
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:684
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:441
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:678
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
DeferredJitCall(std::size_t id, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > cols, const std::vector< std::string > &colNamesArg, std::shared_ptr< void > jittedNode, std::shared_ptr< void > arg)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.