Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
21#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
22#include "ROOT/RLogger.hxx"
23#include "ROOT/RNTuple.hxx"
24#include "ROOT/RNTupleDS.hxx"
25#include "RtypesCore.h" // Long64_t
26#include "TStopwatch.h"
27#include "TBranchElement.h"
28#include "TBranchObject.h"
29#include "TChain.h"
30#include "TEntryList.h"
31#include "TFile.h"
32#include "TFriendElement.h"
33#include "TInterpreter.h"
34#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
35#include "TTreeReader.h"
36#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
37
38#include "ROOT/RTTreeDS.hxx"
39
40#ifdef R__USE_IMT
43#include "ROOT/RSlotStack.hxx"
44#endif
45
46#ifdef R__UNIX
47// Functions needed to perform EOS XRootD redirection in ChangeSpec
48#include "TEnv.h"
49#include "TSystem.h"
50#ifndef R__FBSD
51#include <sys/xattr.h>
52#else
53#include <sys/extattr.h>
54#endif
55#ifdef R__MACOSX
56/* On macOS getxattr takes two extra arguments that should be set to 0 */
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
58#endif
59#ifdef R__FBSD
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
61#endif
62#endif
63
64#include <algorithm>
65#include <atomic>
66#include <cassert>
67#include <functional>
68#include <iostream>
69#include <memory>
70#include <stdexcept>
71#include <string>
72#include <sstream>
73#include <thread>
74#include <unordered_map>
75#include <vector>
76#include <set>
77#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
78
79using namespace ROOT::Detail::RDF;
80using namespace ROOT::Internal::RDF;
81
82namespace {
83/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
84/// This allows different RLoopManager instances to share these data.
85/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
86/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
87/// much faster than jitting each RLoopManager's code separately.
88std::string &GetCodeToJit()
89{
90 static std::string code;
91 return code;
92}
93
94std::string &GetCodeToDeclare()
95{
96 static std::string code;
97 return code;
98}
99
100// Signature of all helper functions that are created by JIT helpers, see
101// Book*Jit and JitBuildAction in RDFInterfaceUtils.cxx
102using JitHelperFunc_t = void (*)(const std::vector<std::string> &, ROOT::Internal::RDF::RColumnRegister &,
103 ROOT::Detail::RDF::RLoopManager &, void *, std::shared_ptr<void> *);
104std::unordered_map<std::size_t, JitHelperFunc_t> &GetJitHelperFuncMap()
105{
106 static std::unordered_map<std::size_t, JitHelperFunc_t> map;
107 return map;
108}
109std::unordered_map<std::size_t, std::size_t> &GetJitFuncBodyToFuncIdMap()
110{
111 static std::unordered_map<std::size_t, std::size_t> map;
112 return map;
113}
114
116{
117 // This function uses the interpreter and writes to the caches.
119
120 // Step 1: Declare the DeferredJitCall functions to the interpreter
121 // We use ProcessLine to ensure meta functionality (e.g. autoloading) is
122 // processed when needed.
123 // If instead we used Declare, builds with runtime_cxxmodules=OFF would fail
124 // in jitted actions with custom helpers with errors like:
125 // error: 'MyHelperType' is an incomplete type
126 // return std::make_unique<Action_t>(Helper_t(std::move(*h)), bl, std::move(prevNode), colRegister);
127 // ^
129 gInterpreter->ProcessLine(codeToDeclare.c_str(), &interpErrorCode);
131 throw std::runtime_error(
132 "\nAn error occurred during just-in-time compilation in RLoopManager::Run. The lines above might "
133 "indicate the cause of the error.\nAll RDF objects that have not run their event loop yet should be "
134 "considered in an invalid state.\n");
135 }
136
137 // Step 2: Retrieve the declared functions as function pointers, cache them
138 // for later use in RunDeferredCalls
141 auto clinfo = gInterpreter->ClassInfo_Factory("R_rdf");
142 assert(gInterpreter->ClassInfo_IsValid(clinfo));
143
144 for (auto &codeAndId : funcBodyToFuncIdMap) {
145 if (auto it = funcIdToFuncPointersMap.find(codeAndId.second); it == funcIdToFuncPointersMap.end()) {
146 // fast fetch of the address via gInterpreter
147 // (faster than gInterpreter->Evaluate(function name, ret), ret->GetAsPointer())
148 // Retrieve the JIT helper function we registered via RegisterJitHelperCall
149 const std::string funcName = "jitNodeRegistrator_" + std::to_string(codeAndId.second);
150 auto declid = gInterpreter->GetFunction(clinfo, funcName.c_str());
151 if (!declid) {
152 // The interpreter failed to compile the helper. Without this check
153 // we would later dereference a null function pointer and crash.
154 gInterpreter->ClassInfo_Delete(clinfo);
155 throw std::runtime_error(
156 "\nAn error occurred during just-in-time compilation in RLoopManager::Run: failed to retrieve "
157 "the JIT helper function '" +
158 funcName +
159 "'. The lines above might indicate the cause of the error.\nAll RDF objects that have not run "
160 "their event loop yet should be considered in an invalid state.\n");
161 }
162 auto minfo = gInterpreter->MethodInfo_Factory(declid);
163 assert(gInterpreter->MethodInfo_IsValid(minfo));
164 auto mname = gInterpreter->MethodInfo_GetMangledName(minfo);
165 [[maybe_unused]] auto res = funcIdToFuncPointersMap.insert(
166 {codeAndId.second, reinterpret_cast<JitHelperFunc_t>(gInterpreter->FindSym(mname))});
167 assert(res.second);
168 gInterpreter->MethodInfo_Delete(minfo);
169 }
170 }
171 gInterpreter->ClassInfo_Delete(clinfo);
172}
173
174void ThrowIfNSlotsChanged(unsigned int nSlots)
175{
177 if (currentSlots != nSlots) {
178 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
179 std::to_string(nSlots) + ", but when starting the event loop it was " +
180 std::to_string(currentSlots) + ".";
181 if (currentSlots > nSlots)
182 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
183 else
184 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
185 throw std::runtime_error(msg);
186 }
187}
188
189/**
190\struct MaxTreeSizeRAII
191\brief Scope-bound change of `TTree::fgMaxTreeSize`.
192
193This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
194changes it to maximum at construction time and restores it back at destruction
195time. Needed for issue #6523 and should be reverted when #6640 will be solved.
196*/
197struct MaxTreeSizeRAII {
198 Long64_t fOldMaxTreeSize;
199
200 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
201 {
202 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
203 }
204
205 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
206};
207
208struct DatasetLogInfo {
209 std::string fDataSet;
210 ULong64_t fRangeStart;
211 ULong64_t fRangeEnd;
212 unsigned int fSlot;
213};
214
215std::string LogRangeProcessing(const DatasetLogInfo &info)
216{
217 std::stringstream msg;
218 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
219 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
220 return msg.str();
221}
222
223auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
224{
225 // We use a combination of column name and column type name as the key because in some cases we might end up
226 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
227 // df.Sum<vector<int>>("stdVectorBranch");
228 // df.Sum<RVecI>("stdVectorBranch");
229 return std::string(colName) + ':' + ti.name();
230}
231
232/// \brief Check if object of a certain type is in the directory
233///
234/// Attempts to read an object of the specified type via TDirectory::Get, wraps
235/// it in a std::unique_ptr to avoid leaking the object.
236template <typename T>
237bool IsObjectInDir(std::string_view objName, TDirectory &dir)
238{
239 std::unique_ptr<T> o{dir.Get<T>(objName.data())};
240 return o.get();
241}
242
243/// \brief Check if a generic object is in the directory
244///
245/// Checks if a generic object is in the directory, uses TDirectory::GetKey
246/// to avoid having to deal with memory management of the object being read
247/// without having its type.
248template <>
249bool IsObjectInDir<void>(std::string_view objName, TDirectory &dir)
250{
251 return dir.GetKey(objName.data());
252}
253} // anonymous namespace
254
255/**
256 * \brief Helper function to open a file (or the first file from a glob).
257 * This function is used at construction time of an RDataFrame, to check the
258 * concrete type of the dataset stored inside the file.
259 */
260std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
261{
262 // Follow same logic in TChain::Add to find the correct string to look for globbing:
263 // - If the extension ".root" is present in the file name, pass along the basename.
264 // - If not, use the "?" token to delimit the part of the string which represents the basename.
265 // - Otherwise, pass the full filename.
266 auto &&baseNameAndQuery = [&fileNameGlob]() {
267 constexpr std::string_view delim{".root"};
268 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
269 it != fileNameGlob.end()) {
270 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
271 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
272 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
273 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
274 else
275 return std::make_pair(fileNameGlob, std::string_view{});
276 }();
277 // Captured structured bindings variable are only valid since C++20
278 auto &&baseName = baseNameAndQuery.first;
279 auto &&query = baseNameAndQuery.second;
280
281 std::string fileToOpen{fileNameGlob};
282 if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
283 const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
284 if (expanded.empty())
285 throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
286 "' did not match any files."};
287
288 fileToOpen = expanded.front() + std::string{query};
289 }
290
291 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
292 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
293 if (!inFile || inFile->IsZombie())
294 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
295
296 return inFile;
297}
298
299namespace ROOT {
300namespace Detail {
301namespace RDF {
302
303/// A RAII object that calls RLoopManager::CleanUpTask at destruction
315
316} // namespace RDF
317} // namespace Detail
318} // namespace ROOT
319
320ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
321 : fDefaultColumns(defaultColumns),
322 fNSlots(RDFInternal::GetNSlots()),
323 fNewSampleNotifier(fNSlots),
324 fSampleInfos(fNSlots),
325 fDatasetColumnReaders(fNSlots)
326{
327}
328
330 : fDefaultColumns(defaultBranches),
331 fNSlots(RDFInternal::GetNSlots()),
332 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
333 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
334 fNewSampleNotifier(fNSlots),
335 fSampleInfos(fNSlots),
336 fDatasetColumnReaders(fNSlots)
337{
338 fDataSource->SetNSlots(fNSlots);
339}
340
342 : fEmptyEntryRange(0, nEmptyEntries),
343 fNSlots(RDFInternal::GetNSlots()),
344 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
345 fNewSampleNotifier(fNSlots),
346 fSampleInfos(fNSlots),
347 fDatasetColumnReaders(fNSlots)
348{
349}
350
351RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
352 : fDefaultColumns(defaultBranches),
353 fNSlots(RDFInternal::GetNSlots()),
354 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
355 fDataSource(std::move(ds)),
356 fNewSampleNotifier(fNSlots),
357 fSampleInfos(fNSlots),
358 fDatasetColumnReaders(fNSlots)
359{
360 fDataSource->SetNSlots(fNSlots);
361}
362
364 : fNSlots(RDFInternal::GetNSlots()),
365 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
366 fNewSampleNotifier(fNSlots),
367 fSampleInfos(fNSlots),
368 fDatasetColumnReaders(fNSlots)
369{
370 ChangeSpec(std::move(spec));
371}
372
373#ifdef R__UNIX
374namespace {
375std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
376{
377 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
378 // If so, we create a redirected sample ID with the full xroot URL.
379 TString expandedUrl(path.data());
381 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
382 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
383 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
384 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
385 if (len > 0) {
386 std::string xurl(len, 0);
387 std::string fileNameFromUrl{fileurl.GetFile()};
388 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
389 // Sometimes the `getxattr` call may return an invalid URL due
390 // to the POSIX attribute not being yet completely filled by EOS.
391 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
392 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
393 return xurl + '/' + datasetName.data();
394 }
395 }
396 }
397 }
398 }
399
400 return std::nullopt;
401}
402} // namespace
403#endif
404
405/**
406 * @brief Changes the internal TTree held by the RLoopManager.
407 *
408 * @warning This method may lead to potentially dangerous interactions if used
409 * after the construction of the RDataFrame. Changing the specification makes
410 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
411 * new specification refers to exactly the same number of columns, with the
412 * same names and types. The actual use case of this method is moving the
413 * processing of the same RDataFrame to a different range of entries of the
414 * same dataset (which may be stored in a different set of files).
415 *
416 * @param spec The specification of the dataset to be adopted.
417 */
419{
420 auto filesVec = spec.GetFileNameGlobs();
422 filesVec[0]); // we only need the first file, we assume all files are either TTree or RNTuple
423 auto datasetName = spec.GetTreeNames();
424
425 // Change the range of entries to be processed
426 fBeginEntry = spec.GetEntryRangeBegin();
427 fEndEntry = spec.GetEntryRangeEnd();
428
429 // Store the samples
430 fSamples = spec.MoveOutSamples();
431 fSampleMap.clear();
432
435
436 if (isTTree || isRNTuple) {
437
438 if (isTTree) {
439 // Create the internal main chain
441 for (auto &sample : fSamples) {
442 const auto &trees = sample.GetTreeNames();
443 const auto &files = sample.GetFileNameGlobs();
444 for (std::size_t i = 0ul; i < files.size(); ++i) {
445 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
446 // (see https://github.com/root-project/root/pull/8820 for why)
447 const auto fullpath = files[i] + "?#" + trees[i];
448 chain->Add(fullpath.c_str());
449 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
450 // change this easily because of backward compatibility: the sample ID
451 // is exposed to users via RSampleInfo and DefinePerSample).
452 const auto sampleId = files[i] + '/' + trees[i];
453 fSampleMap.insert({sampleId, &sample});
454#ifdef R__UNIX
455 // Also add redirected EOS xroot URL when available
457 fSampleMap.insert({redirectedSampleId.value(), &sample});
458#endif
459 }
460 }
461 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
462 } else if (isRNTuple) {
463
464 std::vector<std::string> fileNames;
465 std::set<std::string> rntupleNames;
466
467 for (auto &sample : fSamples) {
468 const auto &trees = sample.GetTreeNames();
469 const auto &files = sample.GetFileNameGlobs();
470 for (std::size_t i = 0ul; i < files.size(); ++i) {
471 const auto sampleId = files[i] + '/' + trees[i];
472 fSampleMap.insert({sampleId, &sample});
473 fileNames.push_back(files[i]);
474 rntupleNames.insert(trees[i]);
475
476#ifdef R__UNIX
477 // Also add redirected EOS xroot URL when available
479 fSampleMap.insert({redirectedSampleId.value(), &sample});
480#endif
481 }
482 }
483
484 if (rntupleNames.size() == 1) {
485 fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
486
487 } else {
488 throw std::runtime_error(
489 "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
490 }
491 }
492
493 fDataSource->SetNSlots(fNSlots);
494
495 for (unsigned int slot{}; slot < fNSlots; slot++) {
496 for (auto &v : fDatasetColumnReaders[slot])
497 v.second.reset();
498 }
499 } else {
500 std::string errMsg =
501 IsObjectInDir<void>(datasetName[0].data(), *inFile) ? "unsupported data format for" : "cannot find";
502 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName[0]) + "\" in file \"" +
503 inFile->GetName() + "\".");
504 }
505}
506
507/// Run event loop with no source files, in parallel.
509{
510#ifdef R__USE_IMT
511 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack = SlotStack();
512 // Working with an empty tree.
513 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
514 const auto nEmptyEntries = GetNEmptyEntries();
515 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
516 auto remainder = nEmptyEntries % (fNSlots * 2);
517 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
518 ULong64_t begin = fEmptyEntryRange.first;
519 while (begin < fEmptyEntryRange.second) {
520 ULong64_t end = begin + nEntriesPerSlot;
521 if (remainder > 0) {
522 ++end;
523 --remainder;
524 }
525 entryRanges.emplace_back(begin, end);
526 begin = end;
527 }
528
529 // Each task will generate a subrange of entries
530 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
532 auto slot = slotRAII.fSlot;
533 RCallCleanUpTask cleanup(*this, slot);
534 InitNodeSlots(nullptr, slot);
535 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
536 try {
538 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
540 }
541 } catch (...) {
542 // Error might throw in experiment frameworks like CMSSW
543 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
544 throw;
545 }
546 };
547
549 pool.Foreach(genFunction, entryRanges);
550
551#endif // not implemented otherwise
552}
553
554/// Run event loop with no source files, in sequence.
556{
557 InitNodeSlots(nullptr, 0);
559 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
560 RCallCleanUpTask cleanup(*this);
561 try {
566 }
567 } catch (...) {
568 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
569 throw;
570 }
571}
572
573#ifdef R__USE_IMT
574namespace {
575/// Return true on succesful entry read.
576///
577/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
578/// there can be other situations where the read is still valid. For now, these
579/// are:
580/// - If there was no match of the current entry in one or more friend trees
581/// according to their respective indexes.
582/// - If there was a missing branch at the start of a new tree in the dataset.
583///
584/// In such situations, although the entry is not complete, the processing
585/// should not be aborted and nodes of the computation graph will take action
586/// accordingly.
588{
589 treeReader.Next();
590 switch (treeReader.GetEntryStatus()) {
591 case TTreeReader::kEntryValid: return true;
592 case TTreeReader::kIndexedFriendNoMatch: return true;
594 default: return false;
595 }
596}
597} // namespace
598#endif
599
600namespace {
601struct DSRunRAII {
603 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
604 {
606 }
607 ~DSRunRAII() { fDS.Finalize(); }
608};
609} // namespace
610
613 unsigned int fSlot;
616 TTreeReader *treeReader = nullptr)
617 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
618 {
619 fLM.InitNodeSlots(fTreeReader, fSlot);
620 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
621 }
623};
624
625/// Run event loop over data accessed through a DataSource, in sequence.
627{
628 assert(fDataSource != nullptr);
629 // Shortcut if the entry range would result in not reading anything
630 if (fBeginEntry == fEndEntry)
631 return;
632 // Apply global entry range if necessary
633 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
634 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
635 // Initialize data source and book finalization
637 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
638 // sources that need it (currently only TTree).
639 RCallCleanUpTask cleanup(*this);
640
641 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
642 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
643 // TTree).
644 std::uint64_t processedEntries{};
645 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
646 do {
647
649
650 ranges = fDataSource->GetEntryRanges();
651
653
654 try {
655 for (const auto &range : ranges) {
656 const auto start = range.first;
657 const auto end = range.second;
658 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
659 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
660 if (fDataSource->SetEntry(0u, entry)) {
662 }
664 }
665 }
666 } catch (...) {
667 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
668 throw;
669 }
670
671 } while (!ranges.empty() && fNStopsReceived < fNChildren);
672
674
675 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
676 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
677 std::ostringstream buf{};
678 buf << "RDataFrame stopped processing after ";
679 buf << processedEntries;
680 buf << " entries, whereas an entry range (begin=";
681 buf << fBeginEntry;
682 buf << ",end=";
683 buf << fEndEntry;
684 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
685 buf << (fBeginEntry + processedEntries);
686 buf << ".";
687 Warning("RDataFrame::Run", "%s", buf.str().c_str());
688 }
689}
690
691/// Run event loop over data accessed through a DataSource, in parallel.
693{
694#ifdef R__USE_IMT
695 assert(fDataSource != nullptr);
696 // Shortcut if the entry range would result in not reading anything
697 if (fBeginEntry == fEndEntry)
698 return;
699 // Apply global entry range if necessary
700 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
701 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
702
704
706
707#endif // not implemented otherwise (never called)
708}
709
710/// Execute actions and make sure named filters are called for each event.
711/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
713{
714 // data-block callbacks run before the rest of the graph
716 for (auto &callback : fSampleCallbacks)
717 callback.second(slot, fSampleInfos[slot]);
719 }
720
721 for (auto *actionPtr : fBookedActions)
722 actionPtr->Run(slot, entry);
724 namedFilterPtr->CheckFilters(slot, entry);
725 for (auto &callback : fCallbacksEveryNEvents)
726 callback(slot);
727}
728
729/// Build TTreeReaderValues for all nodes
730/// This method loops over all filters, actions and other booked objects and
731/// calls their `InitSlot` method, to get them ready for running a task.
733{
735 for (auto *ptr : fBookedActions)
736 ptr->InitSlot(r, slot);
737 for (auto *ptr : fBookedFilters)
738 ptr->InitSlot(r, slot);
739 for (auto *ptr : fBookedDefines)
740 ptr->InitSlot(r, slot);
741 for (auto *ptr : fBookedVariations)
742 ptr->InitSlot(r, slot);
743
744 for (auto &callback : fCallbacksOnce)
745 callback(slot);
746}
747
749 if (r != nullptr) {
750 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
751 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
752 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
753 }
754 // Whatever the data source, initially set the "new data block" flag:
755 // - for TChains, this ensures that we don't skip the first data block because
756 // the correct tree is already loaded
757 // - for RDataSources and empty sources, which currently don't have data blocks, this
758 // ensures that we run once per task
760}
761
762void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
764 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
765}
766
768 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
769 auto *tree = r.GetTree()->GetTree();
770 R__ASSERT(tree != nullptr);
771 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
772 auto *file = tree->GetCurrentFile();
773 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
774
775 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
776 R__ASSERT(range.first >= 0);
777 if (range.second == -1) {
778 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
779 }
780 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
781 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
782 if (fSampleMap.empty()) {
783 fSampleInfos[slot] = RSampleInfo(id, range, nullptr, tree->GetEntries());
784 } else {
785 if (fSampleMap.find(id) == fSampleMap.end())
786 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
787 fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id], tree->GetEntries());
788 }
789}
790
791/// Create a slot stack with the desired number of slots or reuse a shared instance.
792/// When a LoopManager runs in isolation, it will create its own slot stack from the
793/// number of slots. When it runs as part of RunGraphs(), each loop manager will be
794/// assigned a shared slot stack, so dataframe helpers can be shared in a thread-safe
795/// manner.
796std::shared_ptr<ROOT::Internal::RSlotStack> RLoopManager::SlotStack() const
797{
798#ifdef R__USE_IMT
799 if (auto shared = fSlotStack.lock(); shared) {
800 return shared;
801 } else {
802 return std::make_shared<ROOT::Internal::RSlotStack>(fNSlots);
803 }
804#else
805 return nullptr;
806#endif
807}
808
809/// Initialize all nodes of the functional graph before running the event loop.
810/// This method is called once per event-loop and performs generic initialization
811/// operations that do not depend on the specific processing slot (i.e. operations
812/// that are common for all threads).
814{
816 for (auto *filter : fBookedFilters)
817 filter->InitNode();
818 for (auto *range : fBookedRanges)
819 range->InitNode();
820 for (auto *ptr : fBookedActions)
821 ptr->Initialize();
822}
823
824/// Perform clean-up operations. To be called at the end of each event loop.
826{
827 fMustRunNamedFilters = false;
828
829 // forget RActions and detach TResultProxies
830 for (auto *ptr : fBookedActions)
831 ptr->Finalize();
832
833 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
834 fBookedActions.clear();
835
836 // reset children counts
837 fNChildren = 0;
838 fNStopsReceived = 0;
839 for (auto *ptr : fBookedFilters)
840 ptr->ResetChildrenCount();
841 for (auto *ptr : fBookedRanges)
842 ptr->ResetChildrenCount();
843
845 fCallbacksOnce.clear();
846}
847
848/// Perform clean-up operations. To be called at the end of each task execution.
850{
851 if (r != nullptr)
852 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
853 for (auto *ptr : fBookedActions)
854 ptr->FinalizeSlot(slot);
855 for (auto *ptr : fBookedFilters)
856 ptr->FinalizeSlot(slot);
857 for (auto *ptr : fBookedDefines)
858 ptr->FinalizeSlot(slot);
859
860 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
861 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
862 // because the TTreeReader object changes at every task
863 for (auto &v : fDatasetColumnReaders[slot])
864 v.second.reset();
865 }
866}
867
868/// Add RDF nodes that require just-in-time compilation to the computation graph.
869/// This method also clears the contents of GetCodeToJit().
871{
872 {
874 if (GetCodeToJit().empty() && GetCodeToDeclare().empty()) {
876 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
877 return;
878 }
879 }
880
881 std::string codeToDeclare, code;
882 {
885 code.swap(GetCodeToJit());
886 };
887
888 TStopwatch s;
889 s.Start();
890 if (!codeToDeclare.empty()) {
892 }
893 if (!code.empty()) {
894 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
895 }
896 s.Stop();
897 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
898 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
899 : " in less than 1ms.");
900
902}
903
905{
906 if (!fJitHelperCalls.empty()) {
907 // funcMap is not thread-safe
909 TStopwatch s;
910 s.Start();
911 const auto &funcMap = GetJitHelperFuncMap();
912 for (auto &call : fJitHelperCalls) {
913 funcMap.at(call.fFunctionId)(call.fColNames, *call.fColRegister, *this, call.fJittedNode.get(),
914 &call.fExtraArgs);
915 }
916 s.Stop();
917 const auto realTime = s.RealTime();
918 R__LOG_INFO(RDFLogChannel()) << fJitHelperCalls.size() << " deferred calls completed"
919 << (realTime > 1e-3 ? " in " + std::to_string(realTime) + " seconds."
920 : " in less than 1ms.");
921 // Promoting to write lock to clear the vector
923 fJitHelperCalls.clear();
924 }
925}
926
927/// Trigger counting of number of children nodes for each node of the functional graph.
928/// This is done once before starting the event loop. Each action sends an `increase children count` signal
929/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
930/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
931/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
932/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
934{
935 for (auto *actionPtr : fBookedActions)
936 actionPtr->TriggerChildrenCount();
938 namedFilterPtr->TriggerChildrenCount();
939}
940
941/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
942/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
943/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
945{
946 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
947 MaxTreeSizeRAII ctxtmts;
948
949 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
950
952
953 if (jit)
954 Jit();
955
956 // Called here since in a RunGraphs run, multiple RLoopManager runs could be
957 // triggered from different threads.
959
960 InitNodes();
961
962 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
963 // we use RAII: even in case of an exception, the destructor of the object is invoked and
964 // all the cleanup takes place.
965 class NodesCleanerRAII {
967
968 public:
970 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
971 };
972
974
975 TStopwatch s;
976 s.Start();
977
978 switch (fLoopType) {
980 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
981 break;
984 case ELoopType::kNoFiles: RunEmptySource(); break;
986 }
987 s.Stop();
988
989 fNRuns++;
990
991 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
992 << s.RealTime() << "s elapsed).";
993}
994
995/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
1000
1006
1013
1015{
1016 fBookedFilters.emplace_back(filterPtr);
1017 if (filterPtr->HasName()) {
1018 fBookedNamedFilters.emplace_back(filterPtr);
1019 fMustRunNamedFilters = true;
1020 }
1021}
1022
1028
1030{
1031 fBookedRanges.emplace_back(rangePtr);
1032}
1033
1038
1040{
1041 fBookedDefines.emplace_back(ptr);
1042}
1043
1049
1054
1059
1060// dummy call, end of recursive chain of calls
1062{
1063 return true;
1064}
1065
1066/// Call `FillReport` on all booked filters
1068{
1069 for (const auto *fPtr : fBookedNamedFilters)
1070 fPtr->FillReport(rep);
1071}
1072
1073void RLoopManager::ToJitExec(const std::string &code) const
1074{
1076 GetCodeToJit().append(code);
1077}
1078
1080 std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegister,
1081 const std::vector<std::string> &colNames, std::shared_ptr<void> jittedNode,
1082 std::shared_ptr<void> argument)
1083{
1085 {
1087 auto match = funcBodyToFuncIdMap.find(fStringHasher(funcBody));
1088 if (match != funcBodyToFuncIdMap.end()) {
1089 R__WRITE_LOCKGUARD(ROOT::gCoreMutex); // modifying fJitHelperCalls
1090 std::string funcName = "jitNodeRegistrator_" + std::to_string(match->second);
1091 R__LOG_DEBUG(0, RDFLogChannel()) << "JIT helper " << funcName << " was already registered.";
1092 fJitHelperCalls.emplace_back(match->second, std::move(colRegister), colNames, jittedNode, argument);
1093 return;
1094 }
1095 }
1096
1097 {
1098 // Register lazily a JIT helper
1100 auto registratorId = funcBodyToFuncIdMap.size();
1101 std::string funcName = "jitNodeRegistrator_" + std::to_string(registratorId);
1103 assert(res.second);
1104
1105 std::string toDeclare = "namespace R_rdf {\n void " + funcName + funcBody + "\n}\n";
1106 R__LOG_DEBUG(0, RDFLogChannel()) << "Registering deferred JIT helper:\n" << toDeclare;
1107
1108 GetCodeToDeclare().append(toDeclare);
1110 }
1111}
1112
1113void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1114{
1115 if (everyNEvents == 0ull)
1116 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1117 else
1118 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1119}
1120
1121std::vector<std::string> RLoopManager::GetFiltersNames()
1122{
1123 std::vector<std::string> filters;
1124 for (auto *filter : fBookedFilters) {
1125 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1126 filters.push_back(name);
1127 }
1128 return filters;
1129}
1130
1131std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1132{
1133 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1134 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1135 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1136 return nodes;
1137}
1138
1139std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1140{
1141 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1142 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1143 std::copy(fRunActions.begin(), fRunActions.end(), it);
1144 return actions;
1145}
1146
1147std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1148 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1149{
1150 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1151 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1153 return duplicateRLoopManagerIt->second;
1154
1155 std::string name;
1156 if (fDataSource) {
1157 name = fDataSource->GetLabel();
1158 } else {
1159 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1160 }
1161 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1163 visitedMap[(void *)this] = thisNode;
1164 return thisNode;
1165}
1166
1167/// Return true if AddDataSourceColumnReaders was called for column name col.
1168bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
1169{
1170 const auto key = MakeDatasetColReadersKey(col, ti);
1171 assert(fDataSource != nullptr);
1172 // since data source column readers are always added for all slots at the same time,
1173 // if the reader is present for slot 0 we have it for all other slots as well.
1174 auto it = fDatasetColumnReaders[0].find(key);
1175 return (it != fDatasetColumnReaders[0].end() && it->second);
1176}
1177
1179 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1180 const std::type_info &ti)
1181{
1182 const auto key = MakeDatasetColReadersKey(col, ti);
1183 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1184 assert(readers.size() == fNSlots);
1185
1186 for (auto slot = 0u; slot < fNSlots; ++slot) {
1187 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1188 }
1189}
1190
1192 const std::type_info &ti, TTreeReader *treeReader)
1193{
1195 const auto key = MakeDatasetColReadersKey(col, ti);
1196 // if a reader for this column and this slot was already there, we are doing something wrong
1197 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1198 assert(fDataSource && "Missing RDataSource to add column reader.");
1199
1201
1202 return readers[key].get();
1203}
1204
1206RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1207{
1208 const auto key = MakeDatasetColReadersKey(col, ti);
1209 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1210 return it->second.get();
1211 else
1212 return nullptr;
1213}
1214
1216{
1217 if (callback)
1218 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1219}
1220
1221void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1222{
1223 fEmptyEntryRange = std::move(newRange);
1224}
1225
1227{
1228 fBeginEntry = begin;
1229 fEndEntry = end;
1230}
1231
1233{
1234 fTTreeLifeline = std::move(lifeline);
1235}
1236
1237std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1240{
1241 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1242 // Creating an RDataFrame with a non-existing file will throw early rather
1243 // than wait for the start of the graph execution.
1244 if (checkFile) {
1246 }
1247
1248 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1249 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1250 return lm;
1251}
1252
1253std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1254ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1255 const std::vector<std::string> &defaultColumns, bool checkFile)
1256{
1257 if (fileNameGlobs.size() == 0)
1258 throw std::invalid_argument("RDataFrame: empty list of input files.");
1259 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1260 // Creating an RDataFrame with a non-existing file will throw early rather
1261 // than wait for the start of the graph execution.
1262 if (checkFile) {
1264 }
1265 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1266 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1267 return lm;
1268}
1269
1270std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1273{
1274 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1275 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1276 return lm;
1277}
1278
1279std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1280ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1282{
1283 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1284 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1285 return lm;
1286}
1287
1288std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1291{
1292
1294
1296 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1299 }
1300
1301 std::string errMsg = IsObjectInDir<void>(datasetName, *inFile) ? "unsupported data format for" : "cannot find";
1302
1303 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName) + "\" in file \"" +
1304 inFile->GetName() + "\".");
1305}
1306
1307std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1308ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1310{
1311
1312 if (fileNameGlobs.size() == 0)
1313 throw std::invalid_argument("RDataFrame: empty list of input files.");
1314
1316
1318 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1321 }
1322
1323 std::string errMsg = IsObjectInDir<void>(datasetName, *inFile) ? "unsupported data format for" : "cannot find";
1324
1325 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName) + "\" in file \"" +
1326 inFile->GetName() + "\".");
1327}
1328
1329// outlined to pin virtual table
1331
1332void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1333{
1334 if (dataSource) {
1335 fDataSource = std::move(dataSource);
1336 fDataSource->SetNSlots(fNSlots);
1337 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1338 }
1339}
1340
1341void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1343 std::atomic<ULong64_t> &entryCount)
1344{
1345#ifdef R__USE_IMT
1347 const auto &slot = slotRAII.fSlot;
1348
1349 const auto &[start, end] = entryRange;
1350 const auto nEntries = end - start;
1351 entryCount.fetch_add(nEntries);
1352
1353 RCallCleanUpTask cleanup(*this, slot);
1354 RDSRangeRAII _{*this, slot, start};
1355
1356 fSampleInfos[slot] = ROOT::Internal::RDF::CreateSampleInfo(*fDataSource, slot, fSampleMap);
1357
1358 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1359
1360 try {
1361 for (auto entry = start; entry < end; ++entry) {
1362 if (fDataSource->SetEntry(slot, entry)) {
1363 RunAndCheckFilters(slot, entry);
1364 }
1365 }
1366 } catch (...) {
1367 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1368 throw;
1369 }
1370#else
1371 (void)entryRange;
1372 (void)slotStack;
1373 (void)entryCount;
1374#endif
1375}
1376
1378 std::atomic<ULong64_t> &entryCount)
1379{
1380#ifdef R__USE_IMT
1382 const auto &slot = slotRAII.fSlot;
1383
1384 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1385 const auto &[start, end] = entryRange;
1386 const auto nEntries = end - start;
1387 auto count = entryCount.fetch_add(nEntries);
1388
1389 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1390 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1391
1393 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1394 try {
1395 // recursive call to check filters and conditionally execute actions
1397 if (fNewSampleNotifier.CheckFlag(slot)) {
1398 UpdateSampleInfo(slot, treeReader);
1399 }
1400 RunAndCheckFilters(slot, count++);
1401 }
1402 } catch (...) {
1403 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1404 throw;
1405 }
1406 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1407 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1408 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1409 // something went wrong in the TTreeReader event loop
1410 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1411 std::to_string(treeReader.GetEntryStatus()));
1412 }
1413#else
1414 (void)treeReader;
1415 (void)slotStack;
1416 (void)entryCount;
1417#endif
1418}
1419
1422
1423ROOT::Detail::RDF::RLoopManager::DeferredJitCall &ROOT::Detail::RDF::RLoopManager::DeferredJitCall::operator=(
1424 ROOT::Detail::RDF::RLoopManager::DeferredJitCall &&) noexcept = default;
1425
1426ROOT::Detail::RDF::RLoopManager::DeferredJitCall::~DeferredJitCall() = default;
1427
1429 std::size_t id, std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegisterPtr,
1430 const std::vector<std::string> &colNamesArg, std::shared_ptr<void> jittedNode, std::shared_ptr<void> argPtr)
1431 : fFunctionId(id),
1432 fColRegister(std::move(colRegisterPtr)),
1433 fColNames(colNamesArg),
1434 fJittedNode(jittedNode),
1435 fExtraArgs(argPtr)
1436{
1437 assert(fJittedNode != nullptr);
1438}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:359
#define R__LOG_INFO(...)
Definition RLogger.hxx:358
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
Basic types used by ROOT and required by TInterpreter.
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:148
#define gInterpreter
R__EXTERN TSystem * gSystem
Definition TSystem.h:582
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void RegisterJitHelperCall(const std::string &funcBody, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > colRegister, const std::vector< std::string > &colnames, std::shared_ptr< void > jittedNode, std::shared_ptr< void > argument=nullptr)
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::hash< std::string > fStringHasher
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
std::vector< DeferredJitCall > fJitHelperCalls
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
A binder for user-defined columns, variations and aliases.
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
Describe directory structure in memory.
Definition TDirectory.h:45
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition TDirectory.h:222
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:503
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3787
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:138
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1289
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:89
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9462
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:43
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:678
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:671
unsigned int GetNSlots()
Definition RDFUtils.cxx:402
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:660
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:204
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:689
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:446
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:683
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:669
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
DeferredJitCall(std::size_t id, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > cols, const std::vector< std::string > &colNamesArg, std::shared_ptr< void > jittedNode, std::shared_ptr< void > arg)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.