Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#include "RConfigure.h" // R__USE_IMT
10#include "ROOT/RDataSource.hxx"
12#include "ROOT/InternalTreeUtils.hxx" // GetTreeFullPaths
15#include "ROOT/RDF/RDefineReader.hxx" // RDefinesWithReaders
21#include "ROOT/RDF/RVariationReader.hxx" // RVariationsWithReaders
22#include "ROOT/RLogger.hxx"
23#include "ROOT/RNTuple.hxx"
24#include "ROOT/RNTupleDS.hxx"
25#include "RtypesCore.h" // Long64_t
26#include "TStopwatch.h"
27#include "TBranchElement.h"
28#include "TBranchObject.h"
29#include "TChain.h"
30#include "TEntryList.h"
31#include "TFile.h"
32#include "TFriendElement.h"
33#include "TInterpreter.h"
34#include "TROOT.h" // IsImplicitMTEnabled, gCoreMutex, R__*_LOCKGUARD
35#include "TTreeReader.h"
36#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
37
38#include "ROOT/RTTreeDS.hxx"
39
40#ifdef R__USE_IMT
43#include "ROOT/RSlotStack.hxx"
44#endif
45
46#ifdef R__UNIX
47// Functions needed to perform EOS XRootD redirection in ChangeSpec
48#include "TEnv.h"
49#include "TSystem.h"
50#ifndef R__FBSD
51#include <sys/xattr.h>
52#else
53#include <sys/extattr.h>
54#endif
55#ifdef R__MACOSX
56/* On macOS getxattr takes two extra arguments that should be set to 0 */
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
58#endif
59#ifdef R__FBSD
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
61#endif
62#endif
63
64#include <algorithm>
65#include <atomic>
66#include <cassert>
67#include <functional>
68#include <iostream>
69#include <memory>
70#include <stdexcept>
71#include <string>
72#include <sstream>
73#include <thread>
74#include <unordered_map>
75#include <vector>
76#include <set>
77#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
78
79using namespace ROOT::Detail::RDF;
80using namespace ROOT::Internal::RDF;
81
82namespace {
83/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
84/// This allows different RLoopManager instances to share these data.
85/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
86/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
87/// much faster than jitting each RLoopManager's code separately.
88std::string &GetCodeToJit()
89{
90 static std::string code;
91 return code;
92}
93
94std::string &GetCodeToDeclare()
95{
96 static std::string code;
97 return code;
98}
99
100// Signature of all helper functions that are created by JIT helpers, see
101// Book*Jit and JitBuildAction in RDFInterfaceUtils.cxx
102using JitHelperFunc_t = void (*)(const std::vector<std::string> &, ROOT::Internal::RDF::RColumnRegister &,
103 ROOT::Detail::RDF::RLoopManager &, void *, std::shared_ptr<void> *);
104std::unordered_map<std::size_t, JitHelperFunc_t> &GetJitHelperFuncMap()
105{
106 static std::unordered_map<std::size_t, JitHelperFunc_t> map;
107 return map;
108}
109std::unordered_map<std::size_t, std::size_t> &GetJitFuncBodyToFuncIdMap()
110{
111 static std::unordered_map<std::size_t, std::size_t> map;
112 return map;
113}
114
116{
117 // This function uses the interpreter and writes to the caches.
119
120 // Step 1: Declare the DeferredJitCall functions to the interpreter
121 // We use ProcessLine to ensure meta functionality (e.g. autoloading) is
122 // processed when needed.
123 // If instead we used Declare, builds with runtime_cxxmodules=OFF would fail
124 // in jitted actions with custom helpers with errors like:
125 // error: 'MyHelperType' is an incomplete type
126 // return std::make_unique<Action_t>(Helper_t(std::move(*h)), bl, std::move(prevNode), colRegister);
127 // ^
128 gInterpreter->ProcessLine(codeToDeclare.c_str());
129
130 // Step 2: Retrieve the declared functions as function pointers, cache them
131 // for later use in RunDeferredCalls
134 auto clinfo = gInterpreter->ClassInfo_Factory("R_rdf");
135 assert(gInterpreter->ClassInfo_IsValid(clinfo));
136
137 for (auto &codeAndId : funcBodyToFuncIdMap) {
138 if (auto it = funcIdToFuncPointersMap.find(codeAndId.second); it == funcIdToFuncPointersMap.end()) {
139 // fast fetch of the address via gInterpreter
140 // (faster than gInterpreter->Evaluate(function name, ret), ret->GetAsPointer())
141 // Retrieve the JIT helper function we registered via RegisterJitHelperCall
142 auto declid =
143 gInterpreter->GetFunction(clinfo, ("jitNodeRegistrator_" + std::to_string(codeAndId.second)).c_str());
144 assert(declid);
145 auto minfo = gInterpreter->MethodInfo_Factory(declid);
146 assert(gInterpreter->MethodInfo_IsValid(minfo));
147 auto mname = gInterpreter->MethodInfo_GetMangledName(minfo);
148 [[maybe_unused]] auto res = funcIdToFuncPointersMap.insert(
149 {codeAndId.second, reinterpret_cast<JitHelperFunc_t>(gInterpreter->FindSym(mname))});
150 assert(res.second);
151 gInterpreter->MethodInfo_Delete(minfo);
152 }
153 }
154 gInterpreter->ClassInfo_Delete(clinfo);
155}
156
157void ThrowIfNSlotsChanged(unsigned int nSlots)
158{
160 if (currentSlots != nSlots) {
161 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
162 std::to_string(nSlots) + ", but when starting the event loop it was " +
163 std::to_string(currentSlots) + ".";
164 if (currentSlots > nSlots)
165 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
166 else
167 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
168 throw std::runtime_error(msg);
169 }
170}
171
172/**
173\struct MaxTreeSizeRAII
174\brief Scope-bound change of `TTree::fgMaxTreeSize`.
175
176This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
177changes it to maximum at construction time and restores it back at destruction
178time. Needed for issue #6523 and should be reverted when #6640 will be solved.
179*/
180struct MaxTreeSizeRAII {
181 Long64_t fOldMaxTreeSize;
182
183 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
184 {
185 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
186 }
187
188 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
189};
190
191struct DatasetLogInfo {
192 std::string fDataSet;
193 ULong64_t fRangeStart;
194 ULong64_t fRangeEnd;
195 unsigned int fSlot;
196};
197
198std::string LogRangeProcessing(const DatasetLogInfo &info)
199{
200 std::stringstream msg;
201 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
202 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
203 return msg.str();
204}
205
206auto MakeDatasetColReadersKey(std::string_view colName, const std::type_info &ti)
207{
208 // We use a combination of column name and column type name as the key because in some cases we might end up
209 // with concrete readers that use different types for the same column, e.g. std::vector and RVec here:
210 // df.Sum<vector<int>>("stdVectorBranch");
211 // df.Sum<RVecI>("stdVectorBranch");
212 return std::string(colName) + ':' + ti.name();
213}
214
215/// \brief Check if object of a certain type is in the directory
216///
217/// Attempts to read an object of the specified type via TDirectory::Get, wraps
218/// it in a std::unique_ptr to avoid leaking the object.
219template <typename T>
220bool IsObjectInDir(std::string_view objName, TDirectory &dir)
221{
222 std::unique_ptr<T> o{dir.Get<T>(objName.data())};
223 return o.get();
224}
225
226/// \brief Check if a generic object is in the directory
227///
228/// Checks if a generic object is in the directory, uses TDirectory::GetKey
229/// to avoid having to deal with memory management of the object being read
230/// without having its type.
231template <>
232bool IsObjectInDir<void>(std::string_view objName, TDirectory &dir)
233{
234 return dir.GetKey(objName.data());
235}
236} // anonymous namespace
237
238/**
239 * \brief Helper function to open a file (or the first file from a glob).
240 * This function is used at construction time of an RDataFrame, to check the
241 * concrete type of the dataset stored inside the file.
242 */
243std::unique_ptr<TFile> OpenFileWithSanityChecks(std::string_view fileNameGlob)
244{
245 // Follow same logic in TChain::Add to find the correct string to look for globbing:
246 // - If the extension ".root" is present in the file name, pass along the basename.
247 // - If not, use the "?" token to delimit the part of the string which represents the basename.
248 // - Otherwise, pass the full filename.
249 auto &&baseNameAndQuery = [&fileNameGlob]() {
250 constexpr std::string_view delim{".root"};
251 if (auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
252 it != fileNameGlob.end()) {
253 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
254 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
255 } else if (auto &&lastQuestionMark = fileNameGlob.find_last_of('?'); lastQuestionMark != std::string_view::npos)
256 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
257 else
258 return std::make_pair(fileNameGlob, std::string_view{});
259 }();
260 // Captured structured bindings variable are only valid since C++20
261 auto &&baseName = baseNameAndQuery.first;
262 auto &&query = baseNameAndQuery.second;
263
264 std::string fileToOpen{fileNameGlob};
265 if (baseName.find_first_of("[]*?") != std::string_view::npos) { // Wildcards accepted by TChain::Add
266 const auto expanded = ROOT::Internal::TreeUtils::ExpandGlob(std::string{baseName});
267 if (expanded.empty())
268 throw std::invalid_argument{"RDataFrame: The glob expression '" + std::string{baseName} +
269 "' did not match any files."};
270
271 fileToOpen = expanded.front() + std::string{query};
272 }
273
274 ::TDirectory::TContext ctxt; // Avoid changing gDirectory;
275 std::unique_ptr<TFile> inFile{TFile::Open(fileToOpen.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")};
276 if (!inFile || inFile->IsZombie())
277 throw std::invalid_argument("RDataFrame: could not open file \"" + fileToOpen + "\".");
278
279 return inFile;
280}
281
282namespace ROOT {
283namespace Detail {
284namespace RDF {
285
286/// A RAII object that calls RLoopManager::CleanUpTask at destruction
298
299} // namespace RDF
300} // namespace Detail
301} // namespace ROOT
302
303ROOT::Detail::RDF::RLoopManager::RLoopManager(const ROOT::Detail::RDF::ColumnNames_t &defaultColumns)
304 : fDefaultColumns(defaultColumns),
305 fNSlots(RDFInternal::GetNSlots()),
306 fNewSampleNotifier(fNSlots),
307 fSampleInfos(fNSlots),
308 fDatasetColumnReaders(fNSlots)
309{
310}
311
313 : fDefaultColumns(defaultBranches),
314 fNSlots(RDFInternal::GetNSlots()),
315 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
316 fDataSource(std::make_unique<ROOT::Internal::RDF::RTTreeDS>(ROOT::Internal::RDF::MakeAliasedSharedPtr(tree))),
317 fNewSampleNotifier(fNSlots),
318 fSampleInfos(fNSlots),
319 fDatasetColumnReaders(fNSlots)
320{
321 fDataSource->SetNSlots(fNSlots);
322}
323
325 : fEmptyEntryRange(0, nEmptyEntries),
326 fNSlots(RDFInternal::GetNSlots()),
327 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles),
328 fNewSampleNotifier(fNSlots),
329 fSampleInfos(fNSlots),
330 fDatasetColumnReaders(fNSlots)
331{
332}
333
334RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
335 : fDefaultColumns(defaultBranches),
336 fNSlots(RDFInternal::GetNSlots()),
337 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
338 fDataSource(std::move(ds)),
339 fNewSampleNotifier(fNSlots),
340 fSampleInfos(fNSlots),
341 fDatasetColumnReaders(fNSlots)
342{
343 fDataSource->SetNSlots(fNSlots);
344}
345
347 : fNSlots(RDFInternal::GetNSlots()),
348 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
349 fNewSampleNotifier(fNSlots),
350 fSampleInfos(fNSlots),
351 fDatasetColumnReaders(fNSlots)
352{
353 ChangeSpec(std::move(spec));
354}
355
356#ifdef R__UNIX
357namespace {
358std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
359{
360 // Mimick the redirection done in TFile::Open to see if the path points to a FUSE-mounted EOS path.
361 // If so, we create a redirected sample ID with the full xroot URL.
362 TString expandedUrl(path.data());
364 if (gEnv->GetValue("TFile.CrossProtocolRedirects", 1) == 1) {
365 TUrl fileurl(expandedUrl, /* default is file */ kTRUE);
366 if (strcmp(fileurl.GetProtocol(), "file") == 0) {
367 ssize_t len = getxattr(fileurl.GetFile(), "eos.url.xroot", nullptr, 0);
368 if (len > 0) {
369 std::string xurl(len, 0);
370 std::string fileNameFromUrl{fileurl.GetFile()};
371 if (getxattr(fileNameFromUrl.c_str(), "eos.url.xroot", &xurl[0], len) == len) {
372 // Sometimes the `getxattr` call may return an invalid URL due
373 // to the POSIX attribute not being yet completely filled by EOS.
374 if (auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of("/") + 1);
375 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
376 return xurl + '/' + datasetName.data();
377 }
378 }
379 }
380 }
381 }
382
383 return std::nullopt;
384}
385} // namespace
386#endif
387
388/**
389 * @brief Changes the internal TTree held by the RLoopManager.
390 *
391 * @warning This method may lead to potentially dangerous interactions if used
392 * after the construction of the RDataFrame. Changing the specification makes
393 * sense *if and only if* the schema of the dataset is *unchanged*, i.e. the
394 * new specification refers to exactly the same number of columns, with the
395 * same names and types. The actual use case of this method is moving the
396 * processing of the same RDataFrame to a different range of entries of the
397 * same dataset (which may be stored in a different set of files).
398 *
399 * @param spec The specification of the dataset to be adopted.
400 */
402{
403 auto filesVec = spec.GetFileNameGlobs();
405 filesVec[0]); // we only need the first file, we assume all files are either TTree or RNTuple
406 auto datasetName = spec.GetTreeNames();
407
408 // Change the range of entries to be processed
409 fBeginEntry = spec.GetEntryRangeBegin();
410 fEndEntry = spec.GetEntryRangeEnd();
411
412 // Store the samples
413 fSamples = spec.MoveOutSamples();
414 fSampleMap.clear();
415
418
419 if (isTTree || isRNTuple) {
420
421 if (isTTree) {
422 // Create the internal main chain
424 for (auto &sample : fSamples) {
425 const auto &trees = sample.GetTreeNames();
426 const auto &files = sample.GetFileNameGlobs();
427 for (std::size_t i = 0ul; i < files.size(); ++i) {
428 // We need to use `<filename>?#<treename>` as an argument to TChain::Add
429 // (see https://github.com/root-project/root/pull/8820 for why)
430 const auto fullpath = files[i] + "?#" + trees[i];
431 chain->Add(fullpath.c_str());
432 // ...but instead we use `<filename>/<treename>` as a sample ID (cannot
433 // change this easily because of backward compatibility: the sample ID
434 // is exposed to users via RSampleInfo and DefinePerSample).
435 const auto sampleId = files[i] + '/' + trees[i];
436 fSampleMap.insert({sampleId, &sample});
437#ifdef R__UNIX
438 // Also add redirected EOS xroot URL when available
440 fSampleMap.insert({redirectedSampleId.value(), &sample});
441#endif
442 }
443 }
444 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
445 } else if (isRNTuple) {
446
447 std::vector<std::string> fileNames;
448 std::set<std::string> rntupleNames;
449
450 for (auto &sample : fSamples) {
451 const auto &trees = sample.GetTreeNames();
452 const auto &files = sample.GetFileNameGlobs();
453 for (std::size_t i = 0ul; i < files.size(); ++i) {
454 const auto sampleId = files[i] + '/' + trees[i];
455 fSampleMap.insert({sampleId, &sample});
456 fileNames.push_back(files[i]);
457 rntupleNames.insert(trees[i]);
458
459#ifdef R__UNIX
460 // Also add redirected EOS xroot URL when available
462 fSampleMap.insert({redirectedSampleId.value(), &sample});
463#endif
464 }
465 }
466
467 if (rntupleNames.size() == 1) {
468 fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
469
470 } else {
471 throw std::runtime_error(
472 "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
473 }
474 }
475
476 fDataSource->SetNSlots(fNSlots);
477
478 for (unsigned int slot{}; slot < fNSlots; slot++) {
479 for (auto &v : fDatasetColumnReaders[slot])
480 v.second.reset();
481 }
482 } else {
483 std::string errMsg =
484 IsObjectInDir<void>(datasetName[0].data(), *inFile) ? "unsupported data format for" : "cannot find";
485 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName[0]) + "\" in file \"" +
486 inFile->GetName() + "\".");
487 }
488}
489
490/// Run event loop with no source files, in parallel.
492{
493#ifdef R__USE_IMT
494 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack = SlotStack();
495 // Working with an empty tree.
496 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
497 const auto nEmptyEntries = GetNEmptyEntries();
498 const auto nEntriesPerSlot = nEmptyEntries / (fNSlots * 2);
499 auto remainder = nEmptyEntries % (fNSlots * 2);
500 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
501 ULong64_t begin = fEmptyEntryRange.first;
502 while (begin < fEmptyEntryRange.second) {
503 ULong64_t end = begin + nEntriesPerSlot;
504 if (remainder > 0) {
505 ++end;
506 --remainder;
507 }
508 entryRanges.emplace_back(begin, end);
509 begin = end;
510 }
511
512 // Each task will generate a subrange of entries
513 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
515 auto slot = slotRAII.fSlot;
516 RCallCleanUpTask cleanup(*this, slot);
517 InitNodeSlots(nullptr, slot);
518 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
519 try {
521 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
523 }
524 } catch (...) {
525 // Error might throw in experiment frameworks like CMSSW
526 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
527 throw;
528 }
529 };
530
532 pool.Foreach(genFunction, entryRanges);
533
534#endif // not implemented otherwise
535}
536
537/// Run event loop with no source files, in sequence.
539{
540 InitNodeSlots(nullptr, 0);
542 {"an empty source", fEmptyEntryRange.first, fEmptyEntryRange.second, 0u});
543 RCallCleanUpTask cleanup(*this);
544 try {
549 }
550 } catch (...) {
551 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
552 throw;
553 }
554}
555
556#ifdef R__USE_IMT
557namespace {
558/// Return true on succesful entry read.
559///
560/// TTreeReader encodes successful reads in the `kEntryValid` enum value, but
561/// there can be other situations where the read is still valid. For now, these
562/// are:
563/// - If there was no match of the current entry in one or more friend trees
564/// according to their respective indexes.
565/// - If there was a missing branch at the start of a new tree in the dataset.
566///
567/// In such situations, although the entry is not complete, the processing
568/// should not be aborted and nodes of the computation graph will take action
569/// accordingly.
571{
572 treeReader.Next();
573 switch (treeReader.GetEntryStatus()) {
574 case TTreeReader::kEntryValid: return true;
575 case TTreeReader::kIndexedFriendNoMatch: return true;
577 default: return false;
578 }
579}
580} // namespace
581#endif
582
583namespace {
584struct DSRunRAII {
586 DSRunRAII(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
587 {
589 }
590 ~DSRunRAII() { fDS.Finalize(); }
591};
592} // namespace
593
596 unsigned int fSlot;
599 TTreeReader *treeReader = nullptr)
600 : fLM(lm), fSlot(slot), fTreeReader(treeReader)
601 {
602 fLM.InitNodeSlots(fTreeReader, fSlot);
603 fLM.GetDataSource()->InitSlot(fSlot, firstEntry);
604 }
606};
607
608/// Run event loop over data accessed through a DataSource, in sequence.
610{
611 assert(fDataSource != nullptr);
612 // Shortcut if the entry range would result in not reading anything
613 if (fBeginEntry == fEndEntry)
614 return;
615 // Apply global entry range if necessary
616 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
617 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
618 // Initialize data source and book finalization
620 // Ensure cleanup task is always called at the end. Notably, this also resets the column readers for those data
621 // sources that need it (currently only TTree).
622 RCallCleanUpTask cleanup(*this);
623
624 // Main event loop. We start with an empty vector of ranges because we need to initialize the nodes and the data
625 // source before the first call to GetEntryRanges, since it could trigger reading (currently only happens with
626 // TTree).
627 std::uint64_t processedEntries{};
628 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
629 do {
630
632
633 ranges = fDataSource->GetEntryRanges();
634
636
637 try {
638 for (const auto &range : ranges) {
639 const auto start = range.first;
640 const auto end = range.second;
641 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
642 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
643 if (fDataSource->SetEntry(0u, entry)) {
645 }
647 }
648 }
649 } catch (...) {
650 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
651 throw;
652 }
653
654 } while (!ranges.empty() && fNStopsReceived < fNChildren);
655
657
658 if (fEndEntry != std::numeric_limits<Long64_t>::max() &&
659 static_cast<std::uint64_t>(fEndEntry - fBeginEntry) > processedEntries) {
660 std::ostringstream buf{};
661 buf << "RDataFrame stopped processing after ";
662 buf << processedEntries;
663 buf << " entries, whereas an entry range (begin=";
664 buf << fBeginEntry;
665 buf << ",end=";
666 buf << fEndEntry;
667 buf << ") was requested. Consider adjusting the end value of the entry range to a maximum of ";
668 buf << (fBeginEntry + processedEntries);
669 buf << ".";
670 Warning("RDataFrame::Run", "%s", buf.str().c_str());
671 }
672}
673
674/// Run event loop over data accessed through a DataSource, in parallel.
676{
677#ifdef R__USE_IMT
678 assert(fDataSource != nullptr);
679 // Shortcut if the entry range would result in not reading anything
680 if (fBeginEntry == fEndEntry)
681 return;
682 // Apply global entry range if necessary
683 if (fBeginEntry != 0 || fEndEntry != std::numeric_limits<Long64_t>::max())
684 fDataSource->SetGlobalEntryRange(std::make_pair<std::uint64_t, std::uint64_t>(fBeginEntry, fEndEntry));
685
687
689
690#endif // not implemented otherwise (never called)
691}
692
693/// Execute actions and make sure named filters are called for each event.
694/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
696{
697 // data-block callbacks run before the rest of the graph
699 for (auto &callback : fSampleCallbacks)
700 callback.second(slot, fSampleInfos[slot]);
702 }
703
704 for (auto *actionPtr : fBookedActions)
705 actionPtr->Run(slot, entry);
707 namedFilterPtr->CheckFilters(slot, entry);
708 for (auto &callback : fCallbacksEveryNEvents)
709 callback(slot);
710}
711
712/// Build TTreeReaderValues for all nodes
713/// This method loops over all filters, actions and other booked objects and
714/// calls their `InitSlot` method, to get them ready for running a task.
716{
718 for (auto *ptr : fBookedActions)
719 ptr->InitSlot(r, slot);
720 for (auto *ptr : fBookedFilters)
721 ptr->InitSlot(r, slot);
722 for (auto *ptr : fBookedDefines)
723 ptr->InitSlot(r, slot);
724 for (auto *ptr : fBookedVariations)
725 ptr->InitSlot(r, slot);
726
727 for (auto &callback : fCallbacksOnce)
728 callback(slot);
729}
730
732 if (r != nullptr) {
733 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
734 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
735 fNewSampleNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
736 }
737 // Whatever the data source, initially set the "new data block" flag:
738 // - for TChains, this ensures that we don't skip the first data block because
739 // the correct tree is already loaded
740 // - for RDataSources and empty sources, which currently don't have data blocks, this
741 // ensures that we run once per task
743}
744
745void RLoopManager::UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range) {
747 "Empty source, range: {" + std::to_string(range.first) + ", " + std::to_string(range.second) + "}", range);
748}
749
751 // one GetTree to retrieve the TChain, another to retrieve the underlying TTree
752 auto *tree = r.GetTree()->GetTree();
753 R__ASSERT(tree != nullptr);
754 const std::string treename = ROOT::Internal::TreeUtils::GetTreeFullPaths(*tree)[0];
755 auto *file = tree->GetCurrentFile();
756 const std::string fname = file != nullptr ? file->GetName() : "#inmemorytree#";
757
758 std::pair<Long64_t, Long64_t> range = r.GetEntriesRange();
759 R__ASSERT(range.first >= 0);
760 if (range.second == -1) {
761 range.second = tree->GetEntries(); // convert '-1', i.e. 'until the end', to the actual entry number
762 }
763 // If the tree is stored in a subdirectory, treename will be the full path to it starting with the root directory '/'
764 const std::string &id = fname + (treename.rfind('/', 0) == 0 ? "" : "/") + treename;
765 if (fSampleMap.empty()) {
766 fSampleInfos[slot] = RSampleInfo(id, range, nullptr, tree->GetEntries());
767 } else {
768 if (fSampleMap.find(id) == fSampleMap.end())
769 throw std::runtime_error("Full sample identifier '" + id + "' cannot be found in the available samples.");
770 fSampleInfos[slot] = RSampleInfo(id, range, fSampleMap[id], tree->GetEntries());
771 }
772}
773
774/// Create a slot stack with the desired number of slots or reuse a shared instance.
775/// When a LoopManager runs in isolation, it will create its own slot stack from the
776/// number of slots. When it runs as part of RunGraphs(), each loop manager will be
777/// assigned a shared slot stack, so dataframe helpers can be shared in a thread-safe
778/// manner.
779std::shared_ptr<ROOT::Internal::RSlotStack> RLoopManager::SlotStack() const
780{
781#ifdef R__USE_IMT
782 if (auto shared = fSlotStack.lock(); shared) {
783 return shared;
784 } else {
785 return std::make_shared<ROOT::Internal::RSlotStack>(fNSlots);
786 }
787#else
788 return nullptr;
789#endif
790}
791
792/// Initialize all nodes of the functional graph before running the event loop.
793/// This method is called once per event-loop and performs generic initialization
794/// operations that do not depend on the specific processing slot (i.e. operations
795/// that are common for all threads).
797{
799 for (auto *filter : fBookedFilters)
800 filter->InitNode();
801 for (auto *range : fBookedRanges)
802 range->InitNode();
803 for (auto *ptr : fBookedActions)
804 ptr->Initialize();
805}
806
807/// Perform clean-up operations. To be called at the end of each event loop.
809{
810 fMustRunNamedFilters = false;
811
812 // forget RActions and detach TResultProxies
813 for (auto *ptr : fBookedActions)
814 ptr->Finalize();
815
816 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
817 fBookedActions.clear();
818
819 // reset children counts
820 fNChildren = 0;
821 fNStopsReceived = 0;
822 for (auto *ptr : fBookedFilters)
823 ptr->ResetChildrenCount();
824 for (auto *ptr : fBookedRanges)
825 ptr->ResetChildrenCount();
826
828 fCallbacksOnce.clear();
829}
830
831/// Perform clean-up operations. To be called at the end of each task execution.
833{
834 if (r != nullptr)
835 fNewSampleNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
836 for (auto *ptr : fBookedActions)
837 ptr->FinalizeSlot(slot);
838 for (auto *ptr : fBookedFilters)
839 ptr->FinalizeSlot(slot);
840 for (auto *ptr : fBookedDefines)
841 ptr->FinalizeSlot(slot);
842
843 if (auto ds = GetDataSource(); ds && ds->GetLabel() == "TTreeDS") {
844 // we are reading from a tree/chain and we need to re-create the RTreeColumnReaders at every task
845 // because the TTreeReader object changes at every task
846 for (auto &v : fDatasetColumnReaders[slot])
847 v.second.reset();
848 }
849}
850
851/// Add RDF nodes that require just-in-time compilation to the computation graph.
852/// This method also clears the contents of GetCodeToJit().
854{
855 {
857 if (GetCodeToJit().empty() && GetCodeToDeclare().empty()) {
859 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
860 return;
861 }
862 }
863
864 std::string codeToDeclare, code;
865 {
868 code.swap(GetCodeToJit());
869 };
870
871 TStopwatch s;
872 s.Start();
873 if (!codeToDeclare.empty()) {
875 }
876 if (!code.empty()) {
877 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
878 }
879 s.Stop();
880 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
881 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds."
882 : " in less than 1ms.");
883
885}
886
888{
889 if (!fJitHelperCalls.empty()) {
890 // funcMap is not thread-safe
892 TStopwatch s;
893 s.Start();
894 const auto &funcMap = GetJitHelperFuncMap();
895 for (auto &call : fJitHelperCalls) {
896 funcMap.at(call.fFunctionId)(call.fColNames, *call.fColRegister, *this, call.fJittedNode.get(),
897 &call.fExtraArgs);
898 }
899 s.Stop();
900 const auto realTime = s.RealTime();
901 R__LOG_INFO(RDFLogChannel()) << fJitHelperCalls.size() << " deferred calls completed"
902 << (realTime > 1e-3 ? " in " + std::to_string(realTime) + " seconds."
903 : " in less than 1ms.");
904 // Promoting to write lock to clear the vector
906 fJitHelperCalls.clear();
907 }
908}
909
910/// Trigger counting of number of children nodes for each node of the functional graph.
911/// This is done once before starting the event loop. Each action sends an `increase children count` signal
912/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
913/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
914/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
915/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
917{
918 for (auto *actionPtr : fBookedActions)
919 actionPtr->TriggerChildrenCount();
921 namedFilterPtr->TriggerChildrenCount();
922}
923
924/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
925/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
926/// The jitting phase is skipped if the `jit` parameter is `false` (unsafe, use with care).
928{
929 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
930 MaxTreeSizeRAII ctxtmts;
931
932 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
933
935
936 if (jit)
937 Jit();
938
939 // Called here since in a RunGraphs run, multiple RLoopManager runs could be
940 // triggered from different threads.
942
943 InitNodes();
944
945 // Exceptions can occur during the event loop. In order to ensure proper cleanup of nodes
946 // we use RAII: even in case of an exception, the destructor of the object is invoked and
947 // all the cleanup takes place.
948 class NodesCleanerRAII {
950
951 public:
953 ~NodesCleanerRAII() { fRLM.CleanUpNodes(); }
954 };
955
957
958 TStopwatch s;
959 s.Start();
960
961 switch (fLoopType) {
963 throw std::runtime_error("RDataFrame: executing the computation graph without a data source, aborting.");
964 break;
967 case ELoopType::kNoFiles: RunEmptySource(); break;
969 }
970 s.Stop();
971
972 fNRuns++;
973
974 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
975 << s.RealTime() << "s elapsed).";
976}
977
978/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
983
989
996
998{
999 fBookedFilters.emplace_back(filterPtr);
1000 if (filterPtr->HasName()) {
1001 fBookedNamedFilters.emplace_back(filterPtr);
1002 fMustRunNamedFilters = true;
1003 }
1004}
1005
1011
1013{
1014 fBookedRanges.emplace_back(rangePtr);
1015}
1016
1021
1023{
1024 fBookedDefines.emplace_back(ptr);
1025}
1026
1032
1037
1042
1043// dummy call, end of recursive chain of calls
1045{
1046 return true;
1047}
1048
1049/// Call `FillReport` on all booked filters
1051{
1052 for (const auto *fPtr : fBookedNamedFilters)
1053 fPtr->FillReport(rep);
1054}
1055
1056void RLoopManager::ToJitExec(const std::string &code) const
1057{
1059 GetCodeToJit().append(code);
1060}
1061
1063 std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegister,
1064 const std::vector<std::string> &colNames, std::shared_ptr<void> jittedNode,
1065 std::shared_ptr<void> argument)
1066{
1068 {
1070 auto match = funcBodyToFuncIdMap.find(fStringHasher(funcBody));
1071 if (match != funcBodyToFuncIdMap.end()) {
1072 R__WRITE_LOCKGUARD(ROOT::gCoreMutex); // modifying fJitHelperCalls
1073 std::string funcName = "jitNodeRegistrator_" + std::to_string(match->second);
1074 R__LOG_DEBUG(0, RDFLogChannel()) << "JIT helper " << funcName << " was already registered.";
1075 fJitHelperCalls.emplace_back(match->second, std::move(colRegister), colNames, jittedNode, argument);
1076 return;
1077 }
1078 }
1079
1080 {
1081 // Register lazily a JIT helper
1083 auto registratorId = funcBodyToFuncIdMap.size();
1084 std::string funcName = "jitNodeRegistrator_" + std::to_string(registratorId);
1086 assert(res.second);
1087
1088 std::string toDeclare = "namespace R_rdf {\n void " + funcName + funcBody + "\n}\n";
1089 R__LOG_DEBUG(0, RDFLogChannel()) << "Registering deferred JIT helper:\n" << toDeclare;
1090
1091 GetCodeToDeclare().append(toDeclare);
1093 }
1094}
1095
1096void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
1097{
1098 if (everyNEvents == 0ull)
1099 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
1100 else
1101 fCallbacksEveryNEvents.emplace_back(everyNEvents, std::move(f), fNSlots);
1102}
1103
1104std::vector<std::string> RLoopManager::GetFiltersNames()
1105{
1106 std::vector<std::string> filters;
1107 for (auto *filter : fBookedFilters) {
1108 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
1109 filters.push_back(name);
1110 }
1111 return filters;
1112}
1113
1114std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
1115{
1116 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
1117 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
1118 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
1119 return nodes;
1120}
1121
1122std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
1123{
1124 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
1125 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
1126 std::copy(fRunActions.begin(), fRunActions.end(), it);
1127 return actions;
1128}
1129
1130std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph(
1131 std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1132{
1133 // If there is already a node for the RLoopManager return it. If there is not, return a new one.
1134 auto duplicateRLoopManagerIt = visitedMap.find((void *)this);
1136 return duplicateRLoopManagerIt->second;
1137
1138 std::string name;
1139 if (fDataSource) {
1140 name = fDataSource->GetLabel();
1141 } else {
1142 name = "Empty source\\nEntries: " + std::to_string(GetNEmptyEntries());
1143 }
1144 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1146 visitedMap[(void *)this] = thisNode;
1147 return thisNode;
1148}
1149
1150/// Return true if AddDataSourceColumnReaders was called for column name col.
1151bool RLoopManager::HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
1152{
1153 const auto key = MakeDatasetColReadersKey(col, ti);
1154 assert(fDataSource != nullptr);
1155 // since data source column readers are always added for all slots at the same time,
1156 // if the reader is present for slot 0 we have it for all other slots as well.
1157 auto it = fDatasetColumnReaders[0].find(key);
1158 return (it != fDatasetColumnReaders[0].end() && it->second);
1159}
1160
1162 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1163 const std::type_info &ti)
1164{
1165 const auto key = MakeDatasetColReadersKey(col, ti);
1166 assert(fDataSource != nullptr && !HasDataSourceColumnReaders(col, ti));
1167 assert(readers.size() == fNSlots);
1168
1169 for (auto slot = 0u; slot < fNSlots; ++slot) {
1170 fDatasetColumnReaders[slot][key] = std::move(readers[slot]);
1171 }
1172}
1173
1175 const std::type_info &ti, TTreeReader *treeReader)
1176{
1178 const auto key = MakeDatasetColReadersKey(col, ti);
1179 // if a reader for this column and this slot was already there, we are doing something wrong
1180 assert(readers.find(key) == readers.end() || readers[key] == nullptr);
1181 assert(fDataSource && "Missing RDataSource to add column reader.");
1182
1184
1185 return readers[key].get();
1186}
1187
1189RLoopManager::GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
1190{
1191 const auto key = MakeDatasetColReadersKey(col, ti);
1192 if (auto it = fDatasetColumnReaders[slot].find(key); it != fDatasetColumnReaders[slot].end() && it->second)
1193 return it->second.get();
1194 else
1195 return nullptr;
1196}
1197
1199{
1200 if (callback)
1201 fSampleCallbacks.insert({nodePtr, std::move(callback)});
1202}
1203
1204void RLoopManager::SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange)
1205{
1206 fEmptyEntryRange = std::move(newRange);
1207}
1208
1210{
1211 fBeginEntry = begin;
1212 fEndEntry = end;
1213}
1214
1216{
1217 fTTreeLifeline = std::move(lifeline);
1218}
1219
1220std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1223{
1224 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1225 // Creating an RDataFrame with a non-existing file will throw early rather
1226 // than wait for the start of the graph execution.
1227 if (checkFile) {
1229 }
1230
1231 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1232 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1233 return lm;
1234}
1235
1236std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1237ROOT::Detail::RDF::CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1238 const std::vector<std::string> &defaultColumns, bool checkFile)
1239{
1240 if (fileNameGlobs.size() == 0)
1241 throw std::invalid_argument("RDataFrame: empty list of input files.");
1242 // Introduce the same behaviour as in CreateLMFromFile for consistency.
1243 // Creating an RDataFrame with a non-existing file will throw early rather
1244 // than wait for the start of the graph execution.
1245 if (checkFile) {
1247 }
1248 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1249 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1250 return lm;
1251}
1252
1253std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1256{
1257 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1258 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1259 return lm;
1260}
1261
1262std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1263ROOT::Detail::RDF::CreateLMFromRNTuple(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1265{
1266 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1267 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1268 return lm;
1269}
1270
1271std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1274{
1275
1277
1279 return CreateLMFromTTree(datasetName, fileNameGlob, defaultColumns, /*checkFile=*/false);
1282 }
1283
1284 std::string errMsg = IsObjectInDir<void>(datasetName, *inFile) ? "unsupported data format for" : "cannot find";
1285
1286 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName) + "\" in file \"" +
1287 inFile->GetName() + "\".");
1288}
1289
1290std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1291ROOT::Detail::RDF::CreateLMFromFile(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
1293{
1294
1295 if (fileNameGlobs.size() == 0)
1296 throw std::invalid_argument("RDataFrame: empty list of input files.");
1297
1299
1301 return CreateLMFromTTree(datasetName, fileNameGlobs, defaultColumns, /*checkFile=*/false);
1304 }
1305
1306 std::string errMsg = IsObjectInDir<void>(datasetName, *inFile) ? "unsupported data format for" : "cannot find";
1307
1308 throw std::invalid_argument("RDataFrame: " + errMsg + " dataset \"" + std::string(datasetName) + "\" in file \"" +
1309 inFile->GetName() + "\".");
1310}
1311
1312// outlined to pin virtual table
1314
1315void ROOT::Detail::RDF::RLoopManager::SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource)
1316{
1317 if (dataSource) {
1318 fDataSource = std::move(dataSource);
1319 fDataSource->SetNSlots(fNSlots);
1320 fLoopType = ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource;
1321 }
1322}
1323
1324void ROOT::Detail::RDF::RLoopManager::DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange,
1326 std::atomic<ULong64_t> &entryCount)
1327{
1328#ifdef R__USE_IMT
1330 const auto &slot = slotRAII.fSlot;
1331
1332 const auto &[start, end] = entryRange;
1333 const auto nEntries = end - start;
1334 entryCount.fetch_add(nEntries);
1335
1336 RCallCleanUpTask cleanup(*this, slot);
1337 RDSRangeRAII _{*this, slot, start};
1338
1339 fSampleInfos[slot] = ROOT::Internal::RDF::CreateSampleInfo(*fDataSource, slot, fSampleMap);
1340
1341 R__LOG_DEBUG(0, RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
1342
1343 try {
1344 for (auto entry = start; entry < end; ++entry) {
1345 if (fDataSource->SetEntry(slot, entry)) {
1346 RunAndCheckFilters(slot, entry);
1347 }
1348 }
1349 } catch (...) {
1350 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1351 throw;
1352 }
1353 fDataSource->FinalizeSlot(slot);
1354#else
1355 (void)entryRange;
1356 (void)slotStack;
1357 (void)entryCount;
1358#endif
1359}
1360
1362 std::atomic<ULong64_t> &entryCount)
1363{
1364#ifdef R__USE_IMT
1366 const auto &slot = slotRAII.fSlot;
1367
1368 const auto entryRange = treeReader.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
1369 const auto &[start, end] = entryRange;
1370 const auto nEntries = end - start;
1371 auto count = entryCount.fetch_add(nEntries);
1372
1373 RDSRangeRAII _{*this, slot, static_cast<ULong64_t>(start), &treeReader};
1374 RCallCleanUpTask cleanup(*this, slot, &treeReader);
1375
1377 {fDataSource->GetLabel(), static_cast<ULong64_t>(start), static_cast<ULong64_t>(end), slot});
1378 try {
1379 // recursive call to check filters and conditionally execute actions
1381 if (fNewSampleNotifier.CheckFlag(slot)) {
1382 UpdateSampleInfo(slot, treeReader);
1383 }
1384 RunAndCheckFilters(slot, count++);
1385 }
1386 } catch (...) {
1387 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
1388 throw;
1389 }
1390 // fNStopsReceived < fNChildren is always true at the moment as we don't support event loop early quitting in
1391 // multi-thread runs, but it costs nothing to be safe and future-proof in case we add support for that later.
1392 if (treeReader.GetEntryStatus() != TTreeReader::kEntryBeyondEnd && fNStopsReceived < fNChildren) {
1393 // something went wrong in the TTreeReader event loop
1394 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
1395 std::to_string(treeReader.GetEntryStatus()));
1396 }
1397#else
1398 (void)treeReader;
1399 (void)slotStack;
1400 (void)entryCount;
1401#endif
1402}
1403
1406
1407ROOT::Detail::RDF::RLoopManager::DeferredJitCall &ROOT::Detail::RDF::RLoopManager::DeferredJitCall::operator=(
1408 ROOT::Detail::RDF::RLoopManager::DeferredJitCall &&) noexcept = default;
1409
1410ROOT::Detail::RDF::RLoopManager::DeferredJitCall::~DeferredJitCall() = default;
1411
1413 std::size_t id, std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegisterPtr,
1414 const std::vector<std::string> &colNamesArg, std::shared_ptr<void> jittedNode, std::shared_ptr<void> argPtr)
1415 : fFunctionId(id),
1416 fColRegister(std::move(colRegisterPtr)),
1417 fColNames(colNamesArg),
1418 fJittedNode(jittedNode),
1419 fExtraArgs(argPtr)
1420{
1421 assert(fJittedNode != nullptr);
1422}
#define R__LOG_DEBUG(DEBUGLEVEL,...)
Definition RLogger.hxx:360
#define R__LOG_INFO(...)
Definition RLogger.hxx:359
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
Basic types used by ROOT and required by TInterpreter.
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
const char * filters[]
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:148
#define gInterpreter
R__EXTERN TSystem * gSystem
Definition TSystem.h:582
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
#define _(A, B)
Definition cfortran.h:108
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void RegisterJitHelperCall(const std::string &funcBody, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > colRegister, const std::vector< std::string > &colnames, std::shared_ptr< void > jittedNode, std::shared_ptr< void > argument=nullptr)
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
std::hash< std::string > fStringHasher
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
std::vector< DeferredJitCall > fJitHelperCalls
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:47
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:46
A binder for user-defined columns, variations and aliases.
bool CheckFlag(unsigned int slot) const
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual void InitSlot(unsigned int, ULong64_t)
Convenience method called at the start of the data processing associated to a slot.
virtual void FinalizeSlot(unsigned int)
Convenience method called at the end of the data processing associated to a slot.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
Describe directory structure in memory.
Definition TDirectory.h:45
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition TDirectory.h:222
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:503
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3787
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
Basic string class.
Definition TString.h:138
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Definition TSystem.cxx:1285
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
Definition TTree.h:89
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9462
This class represents a WWW compatible URL.
Definition TUrl.h:33
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
Definition RDFUtils.cxx:43
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
Definition RDFUtils.cxx:678
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
Definition RDFUtils.cxx:671
unsigned int GetNSlots()
Definition RDFUtils.cxx:402
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
Definition RDFUtils.cxx:660
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Definition Utils.hxx:204
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
Definition RDFUtils.cxx:689
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
Definition RDFUtils.cxx:446
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
Definition RDFUtils.cxx:683
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:675
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
DeferredJitCall(std::size_t id, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > cols, const std::vector< std::string > &colNamesArg, std::shared_ptr< void > jittedNode, std::shared_ptr< void > arg)
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
A RAII object to pop and push slot numbers from a RSlotStack object.