Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
21#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
22#include "ROOT/RLogger.hxx"
23#include "ROOT/RNTuple.hxx"
24#include "ROOT/RNTupleDS.hxx"
25#include "RtypesCore.h" // Long64_t
26#include "TStopwatch.h"
27#include "TBranchElement.h"
28#include "TBranchObject.h"
29#include "TChain.h"
30#include "TEntryList.h"
31#include "TFile.h"
32#include "TFriendElement.h"
33#include "TInterpreter.h"
34#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
35#include "TTreeReader.h"
36#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
37
38#include "ROOT/RTTreeDS.hxx"
39
40#ifdef R__USE_IMT
43#include "ROOT/RSlotStack.hxx"
44#endif
45
46#ifdef R__UNIX
47// Functions needed to perform EOS XRootD redirection in ChangeSpec
48#include "TEnv.h"
49#include "TSystem.h"
50#ifndef R__FBSD
51#include <sys/xattr.h>
52#else
53#include <sys/extattr.h>
54#endif
55#ifdef R__MACOSX
56/* On macOS getxattr takes two extra arguments that should be set to 0 */
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
58#endif
59#ifdef R__FBSD
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
61#endif
62#endif
63
64#include <algorithm>
65#include <atomic>
66#include <cassert>
67#include <functional>
68#include <iostream>
69#include <memory>
70#include <stdexcept>
71#include <string>
72#include <sstream>
73#include <thread>
74#include <unordered_map>
75#include <vector>
76#include <set>
77#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
78
79using namespace ROOT::Detail::RDF;
80using namespace ROOT::Internal::RDF;
81
82namespace {
83/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
84/// This allows different RLoopManager instances to share these data.
85/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
86/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
87/// much faster than jitting each RLoopManager's code separately.
88std::string &GetCodeToJit()
89{
90 static std::string code;
91 return code;
92}
93
94std::string &GetCodeToDeclare()
95{
96 static std::string code;
97 return code;
98}
99
100// Signature of all helper functions that are created by JIT helpers, see
101// Book*Jit and JitBuildAction in RDFInterfaceUtils.cxx
102using JitHelperFunc_t = void (*)(const std::vector<std::string> &, ROOT::Internal::RDF::RColumnRegister &,
103 ROOT::Detail::RDF::RLoopManager &, void *, std::shared_ptr<void> *);
104std::unordered_map<std::size_t, JitHelperFunc_t> &GetJitHelperFuncMap()
105{
106 static std::unordered_map<std::size_t, JitHelperFunc_t> map;
107 return map;
108}
109std::unordered_map<std::size_t, std::size_t> &GetJitFuncBodyToFuncIdMap()
110{
111 static std::unordered_map<std::size_t, std::size_t> map;
112 return map;
113}
114
116{
117 // This function uses the interpreter and writes to the caches.
119
120 // Step 1: Declare the DeferredJitCall functions to the interpreter
121 // We use ProcessLine to ensure meta functionality (e.g. autoloading) is
122 // processed when needed.
123 // If instead we used Declare, builds with runtime_cxxmodules=OFF would fail
124 // in jitted actions with custom helpers with errors like:
125 // error: 'MyHelperType' is an incomplete type
126 // return std::make_unique<Action_t>(Helper_t(std::move(*h)), bl, std::move(prevNode), colRegister);
127 // ^
128 gInterpreter->ProcessLine(codeToDeclare.c_str());
129
130 // Step 2: Retrieve the declared functions as function pointers, cache them
131 // for later use in RunDeferredCalls
134 auto clinfo = gInterpreter->ClassInfo_Factory("R_rdf");
135 assert(gInterpreter->ClassInfo_IsValid(clinfo));
136
137 for (auto &codeAndId : funcBodyToFuncIdMap) {
138 if (auto it = funcIdToFuncPointersMap.find(codeAndId.second); it == funcIdToFuncPointersMap.end()) {
139 // fast fetch of the address via gInterpreter
140 // (faster than gInterpreter->Evaluate(function name, ret), ret->GetAsPointer())
141 // Retrieve the JIT helper function we registered via RegisterJitHelperCall
142 auto declid =
143 gInterpreter->GetFunction(clinfo, ("jitNodeRegistrator_" + std::to_string(codeAndId.second)).c_str());
144 assert(declid);
145 auto minfo = gInterpreter->MethodInfo_Factory(declid);
146 assert(gInterpreter->MethodInfo_IsValid(minfo));
147 auto mname = gInterpreter->MethodInfo_GetMangledName(minfo);
148 [[maybe_unused]] auto res = funcIdToFuncPointersMap.insert(
149 {codeAndId.second, reinterpret_cast<JitHelperFunc_t>(gInterpreter->FindSym(mname))});
150 assert(res.second);
151 gInterpreter->MethodInfo_Delete(minfo);
152 }
153 }
154 gInterpreter->ClassInfo_Delete(clinfo);
155}
156
157void ThrowIfNSlotsChanged(unsigned int nSlots)
158{
160 if (currentSlots != nSlots) {
161 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
162 std::to_string(nSlots) + ", but when starting the event loop it was " +
163 std::to_string(currentSlots) + ".";
164 if (currentSlots > nSlots)
165 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
166 else
167 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
168 throw std::runtime_error(msg);
169 }
170}
171
172/**
173\struct MaxTreeSizeRAII
174\brief Scope-bound change of `TTree::fgMaxTreeSize`.
175
176This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
177changes it to maximum at construction time and restores it back at destruction
178time. Needed for issue #6523 and should be reverted when #6640 will be solved.
179*/
180struct MaxTreeSizeRAII {
181 Long64_t fOldMaxTreeSize;
182
183 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
184 {
185 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
186 }
187
188 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
189};
190
191struct DatasetLogInfo {
192 std::string fDataSet;
193 ULong64_t fRangeStart;
194 ULong64_t fRangeEnd;
195 unsigned int fSlot;
196};
197
198std::string LogRangeProcessing(const DatasetLogInfo &info)
199{
200 std::stringstream msg;
201 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
202 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
203 return msg.str();
204}
205
206auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
207{
208 // We use a combination of column name and column type name as the key because in some cases we might end up
209 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
210 // df.Sum<vector<int>>("stdVectorBranch");
211 // df.Sum<RVecI>("stdVectorBranch");
212 return std::string(colName) + ':' + ti.name();
213}
214} // anonymous namespace
215
216/**
217 * \brief Helper function to open a file (or the first file from a glob).
218 * This function is used at construction time of an RDataFrame, to check the
219 * concrete type of the dataset stored inside the file.
220 */
221std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
222{
223 // Follow same logic in TChain::Add to find the correct string to look for globbing:
224 // - If the extension ".root" is present in the file name, pass along the basename.
225 // - If not, use the "?" token to delimit the part of the string which represents the basename.
226 // - Otherwise, pass the full filename.
227 auto &&baseNameAndQuery = [&fileNameGlob]() {
228 constexpr std::string_view delim{".root"};
229 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
230 it != fileNameGlob.end()) {
231 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
232 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
233 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
234 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
235 else
236 return std::make_pair(fileNameGlob, std::string_view{});
237 }();
238 // Captured structured bindings variable are only valid since C++20
239 auto &&baseName = baseNameAndQuery.first;
240 auto &&query = baseNameAndQuery.second;
241
242 std::string fileToOpen{fileNameGlob};
243 if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
244 const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
245 if (expanded.empty())
246 throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
247 "' did not match any files."};
248
249 fileToOpen = expanded.front() + std::string{query};
250 }
251
252 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
253 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
254 if (!inFile || inFile->IsZombie())
255 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
256
257 return inFile;
258}
259
260namespace ROOT {
261namespace Detail {
262namespace RDF {
263
264/// A RAII object that calls RLoopManager::CleanUpTask at destruction
276
277} // namespace RDF
278} // namespace Detail
279} // namespace ROOT
280
281ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
282 : fDefaultColumns(defaultColumns),
283 fNSlots(RDFInternal::GetNSlots()),
284 fNewSampleNotifier(fNSlots),
285 fSampleInfos(fNSlots),
286 fDatasetColumnReaders(fNSlots)
287{
288}
289
291 : fDefaultColumns(defaultBranches),
292 fNSlots(RDFInternal::GetNSlots()),
293 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
294 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
295 fNewSampleNotifier(fNSlots),
296 fSampleInfos(fNSlots),
297 fDatasetColumnReaders(fNSlots)
298{
299 fDataSource->SetNSlots(fNSlots);
300}
301
303 : fEmptyEntryRange(0, nEmptyEntries),
304 fNSlots(RDFInternal::GetNSlots()),
305 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
306 fNewSampleNotifier(fNSlots),
307 fSampleInfos(fNSlots),
308 fDatasetColumnReaders(fNSlots)
309{
310}
311
312RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
313 : fDefaultColumns(defaultBranches),
314 fNSlots(RDFInternal::GetNSlots()),
315 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
316 fDataSource(std::move(ds)),
317 fNewSampleNotifier(fNSlots),
318 fSampleInfos(fNSlots),
319 fDatasetColumnReaders(fNSlots)
320{
321 fDataSource->SetNSlots(fNSlots);
322}
323
325 : fNSlots(RDFInternal::GetNSlots()),
326 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
327 fNewSampleNotifier(fNSlots),
328 fSampleInfos(fNSlots),
329 fDatasetColumnReaders(fNSlots)
330{
331 ChangeSpec(std::move(spec));
332}
333
334#ifdef R__UNIX
335namespace {
336std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
337{
338 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
339 // If so, we create a redirected sample ID with the full xroot URL.
340 TString expandedUrl(path.data());
342 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
343 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
344 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
345 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
346 if (len > 0) {
347 std::string xurl(len, 0);
348 std::string fileNameFromUrl{fileurl.GetFile()};
349 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
350 // Sometimes the `getxattr` call may return an invalid URL due
351 // to the POSIX attribute not being yet completely filled by EOS.
352 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
353 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
354 return xurl + '/' + datasetName.data();
355 }
356 }
357 }
358 }
359 }
360
361 return std::nullopt;
362}
363} // namespace
364#endif
365
366/**
367 * @brief Changes the internal TTree held by the RLoopManager.
368 *
369 * @warning This method may lead to potentially dangerous interactions if used
370 * after the construction of the RDataFrame. Changing the specification makes
371 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
372 * new specification refers to exactly the same number of columns, with the
373 * same names and types. The actual use case of this method is moving the
374 * processing of the same RDataFrame to a different range of entries of the
375 * same dataset (which may be stored in a different set of files).
376 *
377 * @param spec The specification of the dataset to be adopted.
378 */
380{
381 auto filesVec = spec.GetFileNameGlobs();
383 filesVec[0]); // we only need the first file, we assume all files are either TTree or RNTuple
384 auto datasetName = spec.GetTreeNames();
385
386 // Change the range of entries to be processed
387 fBeginEntry = spec.GetEntryRangeBegin();
388 fEndEntry = spec.GetEntryRangeEnd();
389
390 // Store the samples
391 fSamples = spec.MoveOutSamples();
392 fSampleMap.clear();
393
394 const bool isTTree = inFile->Get<TTree>(datasetName[0].data());
395 const bool isRNTuple = inFile->Get<ROOT::RNTuple>(datasetName[0].data());
396
397 if (isTTree || isRNTuple) {
398
399 if (isTTree) {
400 // Create the internal main chain
402 for (auto &sample : fSamples) {
403 const auto &trees = sample.GetTreeNames();
404 const auto &files = sample.GetFileNameGlobs();
405 for (std::size_t i = 0ul; i < files.size(); ++i) {
406 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
407 // (see https://github.com/root-project/root/pull/8820 for why)
408 const auto fullpath = files[i] + "?#" + trees[i];
409 chain->Add(fullpath.c_str());
410 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
411 // change this easily because of backward compatibility: the sample ID
412 // is exposed to users via RSampleInfo and DefinePerSample).
413 const auto sampleId = files[i] + '/' + trees[i];
414 fSampleMap.insert({sampleId, &sample});
415#ifdef R__UNIX
416 // Also add redirected EOS xroot URL when available
418 fSampleMap.insert({redirectedSampleId.value(), &sample});
419#endif
420 }
421 }
422 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
423 } else if (isRNTuple) {
424
425 std::vector<std::string> fileNames;
426 std::set<std::string> rntupleNames;
427
428 for (auto &sample : fSamples) {
429 const auto &trees = sample.GetTreeNames();
430 const auto &files = sample.GetFileNameGlobs();
431 for (std::size_t i = 0ul; i < files.size(); ++i) {
432 const auto sampleId = files[i] + '/' + trees[i];
433 fSampleMap.insert({sampleId, &sample});
434 fileNames.push_back(files[i]);
435 rntupleNames.insert(trees[i]);
436
437#ifdef R__UNIX
438 // Also add redirected EOS xroot URL when available
440 fSampleMap.insert({redirectedSampleId.value(), &sample});
441#endif
442 }
443 }
444
445 if (rntupleNames.size() == 1) {
446 fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
447
448 } else {
449 throw std::runtime_error(
450 "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
451 }
452 }
453
454 fDataSource->SetNSlots(fNSlots);
455
456 for (unsigned int slot{}; slot < fNSlots; slot++) {
457 for (auto &v : fDatasetColumnReaders[slot])
458 v.second.reset();
459 }
460 } else {
461 std::string errMsg = inFile->Get(datasetName[0].data()) ? "unsupported data format for" : "cannot find";
462 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName[0]) + "\" in file \"" +
463 inFile->GetName() + "\".");
464 }
465}
466
467/// Run event loop with no source files, in parallel.
469{
470#ifdef R__USE_IMT
471 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack = SlotStack();
472 // Working with an empty tree.
473 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
474 const auto nEmptyEntries = GetNEmptyEntries();
475 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
476 auto remainder = nEmptyEntries % (fNSlots * 2);
477 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
478 ULong64_t begin = fEmptyEntryRange.first;
479 while (begin < fEmptyEntryRange.second) {
480 ULong64_t end = begin + nEntriesPerSlot;
481 if (remainder > 0) {
482 ++end;
483 --remainder;
484 }
485 entryRanges.emplace_back(begin, end);
486 begin = end;
487 }
488
489 // Each task will generate a subrange of entries
490 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
492 auto slot = slotRAII.fSlot;
493 RCallCleanUpTask cleanup(*this, slot);
494 InitNodeSlots(nullptr, slot);
495 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
496 try {
498 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
500 }
501 } catch (...) {
502 // Error might throw in experiment frameworks like CMSSW
503 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
504 throw;
505 }
506 };
507
509 pool.Foreach(genFunction, entryRanges);
510
511#endif // not implemented otherwise
512}
513
514/// Run event loop with no source files, in sequence.
516{
517 InitNodeSlots(nullptr, 0);
519 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
520 RCallCleanUpTask cleanup(*this);
521 try {
526 }
527 } catch (...) {
528 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
529 throw;
530 }
531}
532
533#ifdef R__USE_IMT
534namespace {
535/// Return true on succesful entry read.
536///
537/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
538/// there can be other situations where the read is still valid. For now, these
539/// are:
540/// - If there was no match of the current entry in one or more friend trees
541/// according to their respective indexes.
542/// - If there was a missing branch at the start of a new tree in the dataset.
543///
544/// In such situations, although the entry is not complete, the processing
545/// should not be aborted and nodes of the computation graph will take action
546/// accordingly.
548{
549 treeReader.Next();
550 switch (treeReader.GetEntryStatus()) {
551 case TTreeReader::kEntryValid: return true;
552 case TTreeReader::kIndexedFriendNoMatch: return true;
554 default: return false;
555 }
556}
557} // namespace
558#endif
559
560namespace {
561struct DSRunRAII {
563 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
564 {
566 }
567 ~DSRunRAII() { fDS.Finalize(); }
568};
569} // namespace
570
573 unsigned int fSlot;
576 TTreeReader *treeReader = nullptr)
577 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
578 {
579 fLM.InitNodeSlots(fTreeReader, fSlot);
580 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
581 }
583};
584
585/// Run event loop over data accessed through a DataSource, in sequence.
587{
588 assert(fDataSource != nullptr);
589 // Shortcut if the entry range would result in not reading anything
590 if (fBeginEntry == fEndEntry)
591 return;
592 // Apply global entry range if necessary
593 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
594 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
595 // Initialize data source and book finalization
597 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
598 // sources that need it (currently only TTree).
599 RCallCleanUpTask cleanup(*this);
600
601 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
602 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
603 // TTree).
604 std::uint64_t processedEntries{};
605 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
606 do {
607
609
610 ranges = fDataSource->GetEntryRanges();
611
613
614 try {
615 for (const auto &range : ranges) {
616 const auto start = range.first;
617 const auto end = range.second;
618 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
619 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
620 if (fDataSource->SetEntry(0u, entry)) {
622 }
624 }
625 }
626 } catch (...) {
627 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
628 throw;
629 }
630
631 } while (!ranges.empty() && fNStopsReceived < fNChildren);
632
634
635 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
636 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
637 std::ostringstream buf{};
638 buf << "RDataFrame stopped processing after ";
639 buf << processedEntries;
640 buf << " entries, whereas an entry range (begin=";
641 buf << fBeginEntry;
642 buf << ",end=";
643 buf << fEndEntry;
644 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
645 buf << (fBeginEntry + processedEntries);
646 buf << ".";
647 Warning("RDataFrame::Run", "%s", buf.str().c_str());
648 }
649}
650
651/// Run event loop over data accessed through a DataSource, in parallel.
653{
654#ifdef R__USE_IMT
655 assert(fDataSource != nullptr);
656 // Shortcut if the entry range would result in not reading anything
657 if (fBeginEntry == fEndEntry)
658 return;
659 // Apply global entry range if necessary
660 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
661 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
662
664
666
667#endif // not implemented otherwise (never called)
668}
669
670/// Execute actions and make sure named filters are called for each event.
671/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
673{
674 // data-block callbacks run before the rest of the graph
676 for (auto &callback : fSampleCallbacks)
677 callback.second(slot, fSampleInfos[slot]);
679 }
680
681 for (auto *actionPtr : fBookedActions)
682 actionPtr->Run(slot, entry);
684 namedFilterPtr->CheckFilters(slot, entry);
685 for (auto &callback : fCallbacksEveryNEvents)
686 callback(slot);
687}
688
689/// Build TTreeReaderValues for all nodes
690/// This method loops over all filters, actions and other booked objects and
691/// calls their `InitSlot` method, to get them ready for running a task.
693{
695 for (auto *ptr : fBookedActions)
696 ptr->InitSlot(r, slot);
697 for (auto *ptr : fBookedFilters)
698 ptr->InitSlot(r, slot);
699 for (auto *ptr : fBookedDefines)
700 ptr->InitSlot(r, slot);
701 for (auto *ptr : fBookedVariations)
702 ptr->InitSlot(r, slot);
703
704 for (auto &callback : fCallbacksOnce)
705 callback(slot);
706}
707
709 if (r != nullptr) {
710 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
711 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
712 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
713 }
714 // Whatever the data source, initially set the "new data block" flag:
715 // - for TChains, this ensures that we don't skip the first data block because
716 // the correct tree is already loaded
717 // - for RDataSources and empty sources, which currently don't have data blocks, this
718 // ensures that we run once per task
720}
721
722void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
724 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
725}
726
728 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
729 auto *tree = r.GetTree()->GetTree();
730 R__ASSERT(tree != nullptr);
731 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
732 auto *file = tree->GetCurrentFile();
733 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
734
735 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
736 R__ASSERT(range.first >= 0);
737 if (range.second == -1) {
738 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
739 }
740 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
741 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
742 if (fSampleMap.empty()) {
743 fSampleInfos[slot] = RSampleInfo(id, range, nullptr, tree->GetEntries());
744 } else {
745 if (fSampleMap.find(id) == fSampleMap.end())
746 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
747 fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id], tree->GetEntries());
748 }
749}
750
751/// Create a slot stack with the desired number of slots or reuse a shared instance.
752/// When a LoopManager runs in isolation, it will create its own slot stack from the
753/// number of slots. When it runs as part of RunGraphs(), each loop manager will be
754/// assigned a shared slot stack, so dataframe helpers can be shared in a thread-safe
755/// manner.
756std::shared_ptr<ROOT::Internal::RSlotStack> RLoopManager::SlotStack() const
757{
758#ifdef R__USE_IMT
759 if (auto shared = fSlotStack.lock(); shared) {
760 return shared;
761 } else {
762 return std::make_shared<ROOT::Internal::RSlotStack>(fNSlots);
763 }
764#else
765 return nullptr;
766#endif
767}
768
769/// Initialize all nodes of the functional graph before running the event loop.
770/// This method is called once per event-loop and performs generic initialization
771/// operations that do not depend on the specific processing slot (i.e. operations
772/// that are common for all threads).
774{
776 for (auto *filter : fBookedFilters)
777 filter->InitNode();
778 for (auto *range : fBookedRanges)
779 range->InitNode();
780 for (auto *ptr : fBookedActions)
781 ptr->Initialize();
782}
783
784/// Perform clean-up operations. To be called at the end of each event loop.
786{
787 fMustRunNamedFilters = false;
788
789 // forget RActions and detach TResultProxies
790 for (auto *ptr : fBookedActions)
791 ptr->Finalize();
792
793 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
794 fBookedActions.clear();
795
796 // reset children counts
797 fNChildren = 0;
798 fNStopsReceived = 0;
799 for (auto *ptr : fBookedFilters)
800 ptr->ResetChildrenCount();
801 for (auto *ptr : fBookedRanges)
802 ptr->ResetChildrenCount();
803
805 fCallbacksOnce.clear();
806}
807
808/// Perform clean-up operations. To be called at the end of each task execution.
810{
811 if (r != nullptr)
812 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
813 for (auto *ptr : fBookedActions)
814 ptr->FinalizeSlot(slot);
815 for (auto *ptr : fBookedFilters)
816 ptr->FinalizeSlot(slot);
817 for (auto *ptr : fBookedDefines)
818 ptr->FinalizeSlot(slot);
819
820 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
821 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
822 // because the TTreeReader object changes at every task
823 for (auto &v : fDatasetColumnReaders[slot])
824 v.second.reset();
825 }
826}
827
828/// Add RDF nodes that require just-in-time compilation to the computation graph.
829/// This method also clears the contents of GetCodeToJit().
831{
832 {
834 if (GetCodeToJit().empty() && GetCodeToDeclare().empty()) {
836 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
837 return;
838 }
839 }
840
841 std::string codeToDeclare, code;
842 {
845 code.swap(GetCodeToJit());
846 };
847
848 TStopwatch s;
849 s.Start();
850 if (!codeToDeclare.empty()) {
852 }
853 if (!code.empty()) {
854 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
855 }
856 s.Stop();
857 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
858 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
859 : " in less than 1ms.");
860
862}
863
865{
866 if (!fJitHelperCalls.empty()) {
867 // funcMap is not thread-safe
869 TStopwatch s;
870 s.Start();
871 const auto &funcMap = GetJitHelperFuncMap();
872 for (auto &call : fJitHelperCalls) {
873 funcMap.at(call.fFunctionId)(call.fColNames, *call.fColRegister, *this, call.fJittedNode.get(),
874 &call.fExtraArgs);
875 }
876 s.Stop();
877 const auto realTime = s.RealTime();
878 R__LOG_INFO(RDFLogChannel()) << fJitHelperCalls.size() << " deferred calls completed"
879 << (realTime > 1e-3 ? " in " + std::to_string(realTime) + " seconds."
880 : " in less than 1ms.");
881 // Promoting to write lock to clear the vector
883 fJitHelperCalls.clear();
884 }
885}
886
887/// Trigger counting of number of children nodes for each node of the functional graph.
888/// This is done once before starting the event loop. Each action sends an `increase children count` signal
889/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
890/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
891/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
892/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
894{
895 for (auto *actionPtr : fBookedActions)
896 actionPtr->TriggerChildrenCount();
898 namedFilterPtr->TriggerChildrenCount();
899}
900
901/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
902/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
903/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
905{
906 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
907 MaxTreeSizeRAII ctxtmts;
908
909 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
910
912
913 if (jit)
914 Jit();
915
916 // Called here since in a RunGraphs run, multiple RLoopManager runs could be
917 // triggered from different threads.
919
920 InitNodes();
921
922 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
923 // we use RAII: even in case of an exception, the destructor of the object is invoked and
924 // all the cleanup takes place.
925 class NodesCleanerRAII {
927
928 public:
930 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
931 };
932
934
935 TStopwatch s;
936 s.Start();
937
938 switch (fLoopType) {
940 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
941 break;
944 case ELoopType::kNoFiles: RunEmptySource(); break;
946 }
947 s.Stop();
948
949 fNRuns++;
950
951 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
952 << s.RealTime() << "s elapsed).";
953}
954
955/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
960
966
973
975{
976 fBookedFilters.emplace_back(filterPtr);
977 if (filterPtr->HasName()) {
978 fBookedNamedFilters.emplace_back(filterPtr);
980 }
981}
982
988
993
998
1000{
1001 fBookedDefines.emplace_back(ptr);
1002}
1003
1009
1014
1019
1020// dummy call, end of recursive chain of calls
1022{
1023 return true;
1024}
1025
1026/// Call `FillReport` on all booked filters
1028{
1029 for (const auto *fPtr : fBookedNamedFilters)
1030 fPtr->FillReport(rep);
1031}
1032
1033void RLoopManager::ToJitExec(const std::string &code) const
1034{
1036 GetCodeToJit().append(code);
1037}
1038
1040 std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegister,
1041 const std::vector<std::string> &colNames, std::shared_ptr<void> jittedNode,
1042 std::shared_ptr<void> argument)
1043{
1045 {
1047 auto match = funcBodyToFuncIdMap.find(fStringHasher(funcBody));
1048 if (match != funcBodyToFuncIdMap.end()) {
1049 R__WRITE_LOCKGUARD(ROOT::gCoreMutex); // modifying fJitHelperCalls
1050 std::string funcName = "jitNodeRegistrator_" + std::to_string(match->second);
1051 R__LOG_DEBUG(0, RDFLogChannel()) << "JIT helper " << funcName << " was already registered.";
1052 fJitHelperCalls.emplace_back(match->second, std::move(colRegister), colNames, jittedNode, argument);
1053 return;
1054 }
1055 }
1056
1057 {
1058 // Register lazily a JIT helper
1060 auto registratorId = funcBodyToFuncIdMap.size();
1061 std::string funcName = "jitNodeRegistrator_" + std::to_string(registratorId);
1063 assert(res.second);
1064
1065 std::string toDeclare = "namespace R_rdf {\n void " + funcName + funcBody + "\n}\n";
1066 R__LOG_DEBUG(0, RDFLogChannel()) << "Registering deferred JIT helper:\n" << toDeclare;
1067
1068 GetCodeToDeclare().append(toDeclare);
1070 }
1071}
1072
1073void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1074{
1075 if (everyNEvents == 0ull)
1076 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1077 else
1078 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1079}
1080
1081std::vector<std::string> RLoopManager::GetFiltersNames()
1082{
1083 std::vector<std::string> filters;
1084 for (auto *filter : fBookedFilters) {
1085 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1086 filters.push_back(name);
1087 }
1088 return filters;
1089}
1090
1091std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1092{
1093 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1094 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1095 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1096 return nodes;
1097}
1098
1099std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1100{
1101 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1102 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1103 std::copy(fRunActions.begin(), fRunActions.end(), it);
1104 return actions;
1105}
1106
1107std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1108 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1109{
1110 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1111 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1113 return duplicateRLoopManagerIt->second;
1114
1115 std::string name;
1116 if (fDataSource) {
1117 name = fDataSource->GetLabel();
1118 } else {
1119 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1120 }
1121 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1123 visitedMap[(void *)this] = thisNode;
1124 return thisNode;
1125}
1126
1127/// Return true if AddDataSourceColumnReaders was called for column name col.
1128bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
1129{
1130 const auto key = MakeDatasetColReadersKey(col, ti);
1131 assert(fDataSource != nullptr);
1132 // since data source column readers are always added for all slots at the same time,
1133 // if the reader is present for slot 0 we have it for all other slots as well.
1134 auto it = fDatasetColumnReaders[0].find(key);
1135 return (it != fDatasetColumnReaders[0].end() && it->second);
1136}
1137
1139 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1140 const std::type_info &ti)
1141{
1142 const auto key = MakeDatasetColReadersKey(col, ti);
1143 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1144 assert(readers.size() == fNSlots);
1145
1146 for (auto slot = 0u; slot < fNSlots; ++slot) {
1147 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1148 }
1149}
1150
1152 const std::type_info &ti, TTreeReader *treeReader)
1153{
1155 const auto key = MakeDatasetColReadersKey(col, ti);
1156 // if a reader for this column and this slot was already there, we are doing something wrong
1157 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1158 assert(fDataSource && "Missing RDataSource to add column reader.");
1159
1161
1162 return readers[key].get();
1163}
1164
1166RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1167{
1168 const auto key = MakeDatasetColReadersKey(col, ti);
1169 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1170 return it->second.get();
1171 else
1172 return nullptr;
1173}
1174
1176{
1177 if (callback)
1178 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1179}
1180
1181void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1182{
1183 fEmptyEntryRange = std::move(newRange);
1184}
1185
1187{
1188 fBeginEntry = begin;
1189 fEndEntry = end;
1190}
1191
1193{
1194 fTTreeLifeline = std::move(lifeline);
1195}
1196
1197std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1200{
1201 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1202 // Creating an RDataFrame with a non-existing file will throw early rather
1203 // than wait for the start of the graph execution.
1204 if (checkFile) {
1206 }
1207
1208 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1209 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1210 return lm;
1211}
1212
1213std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1214ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1215 const std::vector<std::string> &defaultColumns, bool checkFile)
1216{
1217 if (fileNameGlobs.size() == 0)
1218 throw std::invalid_argument("RDataFrame: empty list of input files.");
1219 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1220 // Creating an RDataFrame with a non-existing file will throw early rather
1221 // than wait for the start of the graph execution.
1222 if (checkFile) {
1224 }
1225 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1226 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1227 return lm;
1228}
1229
1230std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1233{
1234 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1235 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1236 return lm;
1237}
1238
1239std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1240ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1242{
1243 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1244 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1245 return lm;
1246}
1247
1248std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1251{
1252
1254
1255 if (inFile->Get<TTree>(datasetName.data())) {
1256 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1257 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1259 }
1260
1261 std::string errMsg = inFile->Get(datasetName.data()) ? "unsupported data format for" : "cannot find";
1262
1263 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName) + "\" in file \"" +
1264 inFile->GetName() + "\".");
1265}
1266
1267std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1268ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1270{
1271
1272 if (fileNameGlobs.size() == 0)
1273 throw std::invalid_argument("RDataFrame: empty list of input files.");
1274
1276
1277 if (inFile->Get<TTree>(datasetName.data())) {
1278 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1279 } else if (inFile->Get<ROOT::RNTuple>(datasetName.data())) {
1281 }
1282
1283 std::string errMsg = inFile->Get(datasetName.data()) ? "unsupported data format for" : "cannot find";
1284
1285 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName) + "\" in file \"" +
1286 inFile->GetName() + "\".");
1287}
1288
1289// outlined to pin virtual table
1291
1292void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1293{
1294 if (dataSource) {
1295 fDataSource = std::move(dataSource);
1296 fDataSource->SetNSlots(fNSlots);
1297 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1298 }
1299}
1300
1301void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1303 std::atomic<ULong64_t> &entryCount)
1304{
1305#ifdef R__USE_IMT
1307 const auto &slot = slotRAII.fSlot;
1308
1309 const auto &[start, end] = entryRange;
1310 const auto nEntries = end - start;
1311 entryCount.fetch_add(nEntries);
1312
1313 RCallCleanUpTask cleanup(*this, slot);
1314 RDSRangeRAII _{*this, slot, start};
1315
1316 fSampleInfos[slot] = ROOT::Internal::RDF::CreateSampleInfo(*fDataSource, slot, fSampleMap);
1317
1318 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1319
1320 try {
1321 for (auto entry = start; entry < end; ++entry) {
1322 if (fDataSource->SetEntry(slot, entry)) {
1323 RunAndCheckFilters(slot, entry);
1324 }
1325 }
1326 } catch (...) {
1327 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1328 throw;
1329 }
1330 fDataSource->FinalizeSlot(slot);
1331#else
1332 (void)entryRange;
1333 (void)slotStack;
1334 (void)entryCount;
1335#endif
1336}
1337
1339 std::atomic<ULong64_t> &entryCount)
1340{
1341#ifdef R__USE_IMT
1343 const auto &slot = slotRAII.fSlot;
1344
1345 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1346 const auto &[start, end] = entryRange;
1347 const auto nEntries = end - start;
1348 auto count = entryCount.fetch_add(nEntries);
1349
1350 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1351 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1352
1354 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1355 try {
1356 // recursive call to check filters and conditionally execute actions
1358 if (fNewSampleNotifier.CheckFlag(slot)) {
1359 UpdateSampleInfo(slot, treeReader);
1360 }
1361 RunAndCheckFilters(slot, count++);
1362 }
1363 } catch (...) {
1364 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1365 throw;
1366 }
1367 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1368 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1369 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1370 // something went wrong in the TTreeReader event loop
1371 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1372 std::to_string(treeReader.GetEntryStatus()));
1373 }
1374#else
1375 (void)treeReader;
1376 (void)slotStack;
1377 (void)entryCount;
1378#endif
1379}
1380
1383
1384ROOT::Detail::RDF::RLoopManager::DeferredJitCall &ROOT::Detail::RDF::RLoopManager::DeferredJitCall::operator=(
1385 ROOT::Detail::RDF::RLoopManager::DeferredJitCall &&) noexcept = default;
1386
1387ROOT::Detail::RDF::RLoopManager::DeferredJitCall::~DeferredJitCall() = default;
1388
1390 std::size_t id, std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegisterPtr,
1391 const std::vector<std::string> &colNamesArg, std::shared_ptr<void> jittedNode, std::shared_ptr<void> argPtr)
1392 : fFunctionId(id),
1393 fColRegister(std::move(colRegisterPtr)),
1394 fColNames(colNamesArg),
1395 fJittedNode(jittedNode),
1396 fExtraArgs(argPtr)
1397{
1398 assert(fJittedNode != nullptr);
1399}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
Basic types used by ROOT and required by TInterpreter.
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:157
#define gInterpreter
R__EXTERN TSystem * gSystem
Definition TSystem.h:582
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void RegisterJitHelperCall(const std::string &funcBody, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > colRegister, const std::vector< std::string > &colnames, std::shared_ptr< void > jittedNode, std::shared_ptr< void > argument=nullptr)
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::hash< std::string > fStringHasher
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
std::vector< DeferredJitCall > fJitHelperCalls
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
A binder for user-defined columns, variations and aliases.
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:68
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:503
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3765
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:138
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1285
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:89
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9462
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:43
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:673
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:666
unsigned int GetNSlots()
Definition RDFUtils.cxx:397
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:655
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:204
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:684
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:441
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:678
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:600
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
DeferredJitCall(std::size_t id, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > cols, const std::vector< std::string > &colNamesArg, std::shared_ptr< void > jittedNode, std::shared_ptr< void > arg)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.