Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RLoopManager.cxx
Go to the documentation of this file.
1#include "RConfigure.h" // R__USE_IMT
9#include "ROOT/RLogger.hxx"
10#include "RtypesCore.h" // Long64_t
11#include "TStopwatch.h"
12#include "TBranchElement.h"
13#include "TBranchObject.h"
14#include "TChain.h"
15#include "TEntryList.h"
16#include "TFile.h"
17#include "TFriendElement.h"
18#include "TInterpreter.h"
19#include "TROOT.h" // IsImplicitMTEnabled
20#include "TTreeReader.h"
21#include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
22
23#ifdef R__USE_IMT
26#endif
27
28#include <algorithm>
29#include <atomic>
30#include <exception>
31#include <functional>
32#include <iostream>
33#include <memory>
34#include <stdexcept>
35#include <string>
36#include <sstream>
37#include <thread>
38#include <unordered_map>
39#include <vector>
40#include <set>
41#include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
42
43using namespace ROOT::Detail::RDF;
44using namespace ROOT::Internal::RDF;
45
46namespace {
47/// A helper function that returns all RDF code that is currently scheduled for just-in-time compilation.
48/// This allows different RLoopManager instances to share these data.
49/// We want RLoopManagers to be able to add their code to a global "code to execute via cling",
50/// so that, lazily, we can jit everything that's needed by all RDFs in one go, which is potentially
51/// much faster than jitting each RLoopManager's code separately.
52static std::string &GetCodeToJit()
53{
54 static std::string code;
55 return code;
56}
57
58static bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
59{
60 return (leaves.find(leaf) != leaves.end());
61}
62
63///////////////////////////////////////////////////////////////////////////////
64/// This overload does not perform any check on the duplicates.
65/// It is used for TBranch objects.
66static void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
67 const std::string &friendName)
68{
69
70 if (!friendName.empty()) {
71 // In case of a friend tree, users might prepend its name/alias to the branch names
72 const auto friendBName = friendName + "." + branchName;
73 if (bNamesReg.insert(friendBName).second)
74 bNames.push_back(friendBName);
75 }
76
77 if (bNamesReg.insert(branchName).second)
78 bNames.push_back(branchName);
79}
80
81///////////////////////////////////////////////////////////////////////////////
82/// This overloads makes sure that the TLeaf has not been already inserted.
83static void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
84 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf, bool allowDuplicates)
85{
86 const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
87 if (!canAdd) {
88 return;
89 }
90
91 UpdateList(bNamesReg, bNames, branchName, friendName);
92
93 foundLeaves.insert(leaf);
94}
95
96static void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
97 std::string prefix, std::string &friendName)
98{
99 for (auto sb : *b->GetListOfBranches()) {
100 TBranch *subBranch = static_cast<TBranch *>(sb);
101 auto subBranchName = std::string(subBranch->GetName());
102 auto fullName = prefix + subBranchName;
103
104 std::string newPrefix;
105 if (!prefix.empty())
106 newPrefix = fullName + ".";
107
108 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName);
109
110 auto branchDirectlyFromTree = t.GetBranch(fullName.c_str());
111 if (!branchDirectlyFromTree)
112 branchDirectlyFromTree = t.FindBranch(fullName.c_str()); // try harder
113 if (branchDirectlyFromTree)
114 UpdateList(bNamesReg, bNames, std::string(branchDirectlyFromTree->GetFullName()), friendName);
115
116 if (t.GetBranch(subBranchName.c_str()))
117 UpdateList(bNamesReg, bNames, subBranchName, friendName);
118 }
119}
120
121static void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
122 std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
123{
124 std::set<TLeaf *> foundLeaves;
125 if (!analysedTrees.insert(&t).second) {
126 return;
127 }
128
129 const auto branches = t.GetListOfBranches();
130 // Getting the branches here triggered the read of the first file of the chain if t is a chain.
131 // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
132 // operations
133 if (!t.GetTree()) {
134 std::string err("GetBranchNames: error in opening the tree ");
135 err += t.GetName();
136 throw std::runtime_error(err);
137 }
138 if (branches) {
139 for (auto b : *branches) {
140 TBranch *branch = static_cast<TBranch *>(b);
141 const auto branchName = std::string(branch->GetName());
142 if (branch->IsA() == TBranch::Class()) {
143 // Leaf list
144 auto listOfLeaves = branch->GetListOfLeaves();
145 if (listOfLeaves->GetEntries() == 1) {
146 auto leaf = static_cast<TLeaf *>(listOfLeaves->UncheckedAt(0));
147 UpdateList(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
148 }
149
150 for (auto leaf : *listOfLeaves) {
151 auto castLeaf = static_cast<TLeaf *>(leaf);
152 const auto leafName = std::string(leaf->GetName());
153 const auto fullName = branchName + "." + leafName;
154 UpdateList(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
155 }
156 } else if (branch->IsA() == TBranchObject::Class()) {
157 // TBranchObject
158 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName);
159 UpdateList(bNamesReg, bNames, branchName, friendName);
160 } else {
161 // TBranchElement
162 // Check if there is explicit or implicit dot in the name
163
164 bool dotIsImplied = false;
165 auto be = dynamic_cast<TBranchElement *>(b);
166 if (!be)
167 throw std::runtime_error("GetBranchNames: unsupported branch type");
168 // TClonesArray (3) and STL collection (4)
169 if (be->GetType() == 3 || be->GetType() == 4)
170 dotIsImplied = true;
171
172 if (dotIsImplied || branchName.back() == '.')
173 ExploreBranch(t, bNamesReg, bNames, branch, "", friendName);
174 else
175 ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName);
176
177 UpdateList(bNamesReg, bNames, branchName, friendName);
178 }
179 }
180 }
181
182 // The list of friends needs to be accessed via GetTree()->GetListOfFriends()
183 // (and not via GetListOfFriends() directly), otherwise when `t` is a TChain we
184 // might not recover the list correctly (https://github.com/root-project/root/issues/6741).
185 auto friendTrees = t.GetTree()->GetListOfFriends();
186
187 if (!friendTrees)
188 return;
189
190 for (auto friendTreeObj : *friendTrees) {
191 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
192
193 std::string frName;
194 auto alias = t.GetFriendAlias(friendTree);
195 if (alias != nullptr)
196 frName = std::string(alias);
197 else
198 frName = std::string(friendTree->GetName());
199
200 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
201 }
202}
203
204static void ThrowIfNSlotsChanged(unsigned int nSlots)
205{
206 const auto currentSlots = RDFInternal::GetNSlots();
207 if (currentSlots != nSlots) {
208 std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
209 std::to_string(nSlots) + ", but when starting the event loop it was " +
210 std::to_string(currentSlots) + ".";
211 if (currentSlots > nSlots)
212 msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
213 else
214 msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
215 throw std::runtime_error(msg);
216 }
217}
218
219/**
220\struct MaxTreeSizeRAII
221\brief Scope-bound change of `TTree::fgMaxTreeSize`.
222
223This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
224changes it to maximum at construction time and restores it back at destruction
225time. Needed for issue #6523 and should be reverted when #6640 will be solved.
226*/
227struct MaxTreeSizeRAII {
228 Long64_t fOldMaxTreeSize;
229
230 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
231 {
232 TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
233 }
234
235 ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
236};
237
238struct DatasetLogInfo {
239 std::string fDataSet;
240 ULong64_t fRangeStart;
241 ULong64_t fRangeEnd;
242 unsigned int fSlot;
243};
244
245std::string LogRangeProcessing(const DatasetLogInfo &info)
246{
247 std::stringstream msg;
248 msg << "Processing " << info.fDataSet << ": entry range [" << info.fRangeStart << "," << info.fRangeEnd - 1
249 << "], using slot " << info.fSlot << " in thread " << std::this_thread::get_id() << '.';
250 return msg.str();
251}
252
253DatasetLogInfo TreeDatasetLogInfo(const TTreeReader &r, unsigned int slot)
254{
255 const auto tree = r.GetTree();
256 const auto chain = dynamic_cast<TChain *>(tree);
257 std::string what;
258 if (chain) {
259 auto files = chain->GetListOfFiles();
260 std::vector<std::string> treeNames;
261 std::vector<std::string> fileNames;
262 for (TObject *f : *files) {
263 treeNames.emplace_back(f->GetName());
264 fileNames.emplace_back(f->GetTitle());
265 }
266 what = "trees {";
267 for (const auto &t : treeNames) {
268 what += t + ",";
269 }
270 what.back() = '}';
271 what += " in files {";
272 for (const auto &f : fileNames) {
273 what += f + ",";
274 }
275 what.back() = '}';
276 } else {
277 const auto treeName = tree->GetName();
278 what = std::string("tree \"") + treeName + "\"";
279 const auto file = tree->GetCurrentFile();
280 if (file)
281 what += std::string(" in file \"") + file->GetName() + "\"";
282 }
283 const auto entryRange = r.GetEntriesRange();
284 const ULong64_t end = entryRange.second == -1ll ? tree->GetEntries() : entryRange.second;
285 return {std::move(what), static_cast<ULong64_t>(entryRange.first), end, slot};
286}
287
288} // anonymous namespace
289
290namespace ROOT {
291namespace Detail {
292namespace RDF {
293
294/// A RAII object that calls RLoopManager::CleanUpTask at destruction
295struct RCallCleanUpTask {
296 RLoopManager &fLoopManager;
297 unsigned int fArg;
298 TTreeReader *fReader;
299
300 RCallCleanUpTask(RLoopManager &lm, unsigned int arg = 0u, TTreeReader *reader = nullptr)
301 : fLoopManager(lm), fArg(arg), fReader(reader)
302 {
303 }
304 ~RCallCleanUpTask() { fLoopManager.CleanUpTask(fReader, fArg); }
305};
306
307} // namespace RDF
308} // namespace Detail
309} // namespace ROOT
310
311///////////////////////////////////////////////////////////////////////////////
312/// Get all the branches names, including the ones of the friend trees
314{
315 std::set<std::string> bNamesSet;
316 ColumnNames_t bNames;
317 std::set<TTree *> analysedTrees;
318 std::string emptyFrName = "";
319 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
320 return bNames;
321}
322
324 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
325 fNSlots(RDFInternal::GetNSlots()),
326 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles),
327 fDataBlockNotifier(fNSlots)
328{
329}
330
332 : fNEmptyEntries(nEmptyEntries), fNSlots(RDFInternal::GetNSlots()),
333 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles), fDataBlockNotifier(fNSlots)
334{
335}
336
337RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
338 : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
339 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
340 fDataSource(std::move(ds)), fDataBlockNotifier(fNSlots)
341{
342 fDataSource->SetNSlots(fNSlots);
343}
344
345struct RSlotRAII {
346 RSlotStack &fSlotStack;
347 unsigned int fSlot;
348 RSlotRAII(RSlotStack &slotStack) : fSlotStack(slotStack), fSlot(slotStack.GetSlot()) {}
349 ~RSlotRAII() { fSlotStack.ReturnSlot(fSlot); }
350};
351
352/// Run event loop with no source files, in parallel.
354{
355#ifdef R__USE_IMT
356 RSlotStack slotStack(fNSlots);
357 // Working with an empty tree.
358 // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
359 const auto nEntriesPerSlot = fNEmptyEntries / (fNSlots * 2);
360 auto remainder = fNEmptyEntries % (fNSlots * 2);
361 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
362 ULong64_t start = 0;
363 while (start < fNEmptyEntries) {
364 ULong64_t end = start + nEntriesPerSlot;
365 if (remainder > 0) {
366 ++end;
367 --remainder;
368 }
369 entryRanges.emplace_back(start, end);
370 start = end;
371 }
372
373 // Each task will generate a subrange of entries
374 auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
375 RSlotRAII slotRAII(slotStack);
376 auto slot = slotRAII.fSlot;
377 RCallCleanUpTask cleanup(*this, slot);
378 InitNodeSlots(nullptr, slot);
379 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({"an empty source", range.first, range.second, slot});
380 try {
381 for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
382 RunAndCheckFilters(slot, currEntry);
383 }
384 } catch (...) {
385 // Error might throw in experiment frameworks like CMSSW
386 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
387 throw;
388 }
389 };
390
392 pool.Foreach(genFunction, entryRanges);
393
394#endif // not implemented otherwise
395}
396
397/// Run event loop with no source files, in sequence.
399{
400 InitNodeSlots(nullptr, 0);
401 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({"an empty source", 0, fNEmptyEntries, 0u});
402 RCallCleanUpTask cleanup(*this);
403 try {
404 for (ULong64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
405 RunAndCheckFilters(0, currEntry);
406 }
407 } catch (...) {
408 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
409 throw;
410 }
411}
412
413/// Run event loop over one or multiple ROOT files, in parallel.
415{
416#ifdef R__USE_IMT
417 RSlotStack slotStack(fNSlots);
418 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
419 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots);
420
421 std::atomic<ULong64_t> entryCount(0ull);
422
423 tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
424 RSlotRAII slotRAII(slotStack);
425 auto slot = slotRAII.fSlot;
426 RCallCleanUpTask cleanup(*this, slot, &r);
427 InitNodeSlots(&r, slot);
428 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, slot));
429 const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
430 const auto nEntries = entryRange.second - entryRange.first;
431 auto count = entryCount.fetch_add(nEntries);
432 try {
433 // recursive call to check filters and conditionally execute actions
434 while (r.Next()) {
435 RunAndCheckFilters(slot, count++);
436 }
437 } catch (...) {
438 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
439 throw;
440 }
441 });
442#endif // no-op otherwise (will not be called)
443}
444
445/// Run event loop over one or multiple ROOT files, in sequence.
447{
448 TTreeReader r(fTree.get(), fTree->GetEntryList());
449 if (0 == fTree->GetEntriesFast())
450 return;
451 RCallCleanUpTask cleanup(*this, 0u, &r);
452 InitNodeSlots(&r, 0);
453 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing(TreeDatasetLogInfo(r, 0u));
454
455 // recursive call to check filters and conditionally execute actions
456 // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
457 try {
458 while (r.Next() && fNStopsReceived < fNChildren) {
459 RunAndCheckFilters(0, r.GetCurrentEntry());
460 }
461 } catch (...) {
462 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
463 throw;
464 }
465 if (r.GetEntryStatus() != TTreeReader::kEntryNotFound && fNStopsReceived < fNChildren) {
466 // something went wrong in the TTreeReader event loop
467 throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
468 std::to_string(r.GetEntryStatus()));
469 }
470}
471
472/// Run event loop over data accessed through a DataSource, in sequence.
474{
475 R__ASSERT(fDataSource != nullptr);
476 fDataSource->Initialise();
477 auto ranges = fDataSource->GetEntryRanges();
478 while (!ranges.empty() && fNStopsReceived < fNChildren) {
479 InitNodeSlots(nullptr, 0u);
480 fDataSource->InitSlot(0u, 0ull);
481 RCallCleanUpTask cleanup(*this);
482 try {
483 for (const auto &range : ranges) {
484 const auto start = range.first;
485 const auto end = range.second;
486 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, 0u});
487 for (auto entry = start; entry < end && fNStopsReceived < fNChildren; ++entry) {
488 if (fDataSource->SetEntry(0u, entry)) {
489 RunAndCheckFilters(0u, entry);
490 }
491 }
492 }
493 } catch (...) {
494 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
495 throw;
496 }
497 fDataSource->FinaliseSlot(0u);
498 ranges = fDataSource->GetEntryRanges();
499 }
500 fDataSource->Finalise();
501}
502
503/// Run event loop over data accessed through a DataSource, in parallel.
505{
506#ifdef R__USE_IMT
507 R__ASSERT(fDataSource != nullptr);
508 RSlotStack slotStack(fNSlots);
510
511 // Each task works on a subrange of entries
512 auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
513 RSlotRAII slotRAII(slotStack);
514 const auto slot = slotRAII.fSlot;
515 InitNodeSlots(nullptr, slot);
516 RCallCleanUpTask cleanup(*this, slot);
517 fDataSource->InitSlot(slot, range.first);
518 const auto start = range.first;
519 const auto end = range.second;
520 R__LOG_INFO(RDFLogChannel()) << LogRangeProcessing({fDataSource->GetLabel(), start, end, slot});
521 try {
522 for (auto entry = start; entry < end; ++entry) {
523 if (fDataSource->SetEntry(slot, entry)) {
524 RunAndCheckFilters(slot, entry);
525 }
526 }
527 } catch (...) {
528 std::cerr << "RDataFrame::Run: event loop was interrupted\n";
529 throw;
530 }
531 fDataSource->FinaliseSlot(slot);
532 };
533
534 fDataSource->Initialise();
535 auto ranges = fDataSource->GetEntryRanges();
536 while (!ranges.empty()) {
537 pool.Foreach(runOnRange, ranges);
538 ranges = fDataSource->GetEntryRanges();
539 }
540 fDataSource->Finalise();
541#endif // not implemented otherwise (never called)
542}
543
544/// Execute actions and make sure named filters are called for each event.
545/// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
546void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
547{
548 // data-block callbacks run before the rest of the graph
549 if (fDataBlockNotifier.CheckFlag(slot)) {
550 for (auto &callback : fDataBlockCallbacks) {
551 callback(slot);
552 }
554 }
555
556 for (auto &actionPtr : fBookedActions)
557 actionPtr->Run(slot, entry);
558 for (auto &namedFilterPtr : fBookedNamedFilters)
559 namedFilterPtr->CheckFilters(slot, entry);
560 for (auto &callback : fCallbacks)
561 callback(slot);
562}
563
564/// Build TTreeReaderValues for all nodes
565/// This method loops over all filters, actions and other booked objects and
566/// calls their `InitSlot` method, to get them ready for running a task.
568{
570 for (auto &ptr : fBookedActions)
571 ptr->InitSlot(r, slot);
572 for (auto &ptr : fBookedFilters)
573 ptr->InitSlot(r, slot);
574 for (auto &callback : fCallbacksOnce)
575 callback(slot);
576}
577
579 if (r != nullptr) {
580 // we need to set a notifier so that we run the callbacks every time we switch to a new TTree
581 // `PrependLink` inserts this notifier into the TTree/TChain's linked list of notifiers
582 fDataBlockNotifier.GetChainNotifyLink(slot).PrependLink(*r->GetTree());
583 }
584 // Whatever the data source, initially set the "new data block" flag:
585 // - for TChains, this ensures that we don't skip the first data block because
586 // the correct tree is already loaded
587 // - for RDataSources and empty sources, which currently don't have data blocks, this
588 // ensures that we run once per task
590}
591
592/// Initialize all nodes of the functional graph before running the event loop.
593/// This method is called once per event-loop and performs generic initialization
594/// operations that do not depend on the specific processing slot (i.e. operations
595/// that are common for all threads).
597{
599 for (auto &filter : fBookedFilters)
600 filter->InitNode();
601 for (auto &range : fBookedRanges)
602 range->InitNode();
603 for (auto &ptr : fBookedActions)
604 ptr->Initialize();
605}
606
607/// Perform clean-up operations. To be called at the end of each event loop.
609{
610 fMustRunNamedFilters = false;
611
612 // forget RActions and detach TResultProxies
613 for (auto &ptr : fBookedActions)
614 ptr->Finalize();
615
616 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
617 fBookedActions.clear();
618
619 // reset children counts
620 fNChildren = 0;
621 fNStopsReceived = 0;
622 for (auto &ptr : fBookedFilters)
623 ptr->ResetChildrenCount();
624 for (auto &ptr : fBookedRanges)
625 ptr->ResetChildrenCount();
626
627 fCallbacks.clear();
628 fCallbacksOnce.clear();
629 fDataBlockCallbacks.clear();
630}
631
632/// Perform clean-up operations. To be called at the end of each task execution.
633void RLoopManager::CleanUpTask(TTreeReader *r, unsigned int slot)
634{
635 if (r != nullptr)
636 fDataBlockNotifier.GetChainNotifyLink(slot).RemoveLink(*r->GetTree());
637 for (auto &ptr : fBookedActions)
638 ptr->FinalizeSlot(slot);
639 for (auto &ptr : fBookedFilters)
640 ptr->FinaliseSlot(slot);
641}
642
643/// Add RDF nodes that require just-in-time compilation to the computation graph.
644/// This method also clears the contents of GetCodeToJit().
646{
647 // TODO this should be a read lock unless we find GetCodeToJit non-empty
649
650 const std::string code = std::move(GetCodeToJit());
651 if (code.empty()) {
652 R__LOG_INFO(RDFLogChannel()) << "Nothing to jit and execute.";
653 return;
654 }
655
656 TStopwatch s;
657 s.Start();
658 RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
659 s.Stop();
660 R__LOG_INFO(RDFLogChannel()) << "Just-in-time compilation phase completed"
661 << (s.RealTime() > 1e-3 ? " in " + std::to_string(s.RealTime()) + " seconds." : ".");
662}
663
664/// Trigger counting of number of children nodes for each node of the functional graph.
665/// This is done once before starting the event loop. Each action sends an `increase children count` signal
666/// upstream, which is propagated until RLoopManager. Each time a node receives the signal, in increments its
667/// children counter. Each node only propagates the signal once, even if it receives it multiple times.
668/// Named filters also send an `increase children count` signal, just like actions, as they always execute during
669/// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
671{
672 for (auto &actionPtr : fBookedActions)
673 actionPtr->TriggerChildrenCount();
674 for (auto &namedFilterPtr : fBookedNamedFilters)
675 namedFilterPtr->TriggerChildrenCount();
676}
677
678/// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
679/// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
681{
682 // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
683 MaxTreeSizeRAII ctxtmts;
684
685 R__LOG_INFO(RDFLogChannel()) << "Starting event loop number " << fNRuns << '.';
686
687 ThrowIfNSlotsChanged(GetNSlots());
688
689 Jit();
690
691 InitNodes();
692
693 TStopwatch s;
694 s.Start();
695 switch (fLoopType) {
699 case ELoopType::kNoFiles: RunEmptySource(); break;
702 }
703 s.Stop();
704
705 CleanUpNodes();
706
707 fNRuns++;
708
709 R__LOG_INFO(RDFLogChannel()) << "Finished event loop number " << fNRuns - 1 << " (" << s.CpuTime() << "s CPU, "
710 << s.RealTime() << "s elapsed).";
711}
712
713/// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
715{
716 return fDefaultColumns;
717}
718
720{
721 return fTree.get();
722}
723
725{
726 fBookedActions.emplace_back(actionPtr);
727}
728
730{
731 RDFInternal::Erase(actionPtr, fRunActions);
732 RDFInternal::Erase(actionPtr, fBookedActions);
733}
734
736{
737 fBookedFilters.emplace_back(filterPtr);
738 if (filterPtr->HasName()) {
739 fBookedNamedFilters.emplace_back(filterPtr);
741 }
742}
743
745{
746 RDFInternal::Erase(filterPtr, fBookedFilters);
747 RDFInternal::Erase(filterPtr, fBookedNamedFilters);
748}
749
751{
752 fBookedRanges.emplace_back(rangePtr);
753}
754
756{
757 RDFInternal::Erase(rangePtr, fBookedRanges);
758}
759
760// dummy call, end of recursive chain of calls
762{
763 return true;
764}
765
766/// Call `FillReport` on all booked filters
768{
769 for (const auto &fPtr : fBookedNamedFilters)
770 fPtr->FillReport(rep);
771}
772
773void RLoopManager::ToJitExec(const std::string &code) const
774{
776 GetCodeToJit().append(code);
777}
778
779void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
780{
781 if (everyNEvents == 0ull)
782 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
783 else
784 fCallbacks.emplace_back(everyNEvents, std::move(f), fNSlots);
785}
786
787std::vector<std::string> RLoopManager::GetFiltersNames()
788{
789 std::vector<std::string> filters;
790 for (auto &filter : fBookedFilters) {
791 auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
792 filters.push_back(name);
793 }
794 return filters;
795}
796
797std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
798{
799 std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
800 auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
801 std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
802 return nodes;
803}
804
805std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
806{
807 std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
808 auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
809 std::copy(fRunActions.begin(), fRunActions.end(), it);
810 return actions;
811}
812
813std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph()
814{
815 std::string name;
816 if (fDataSource) {
817 name = fDataSource->GetLabel();
818 } else if (fTree) {
819 name = fTree->GetName();
820 } else {
821 name = std::to_string(fNEmptyEntries);
822 }
823
824 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(name);
825 thisNode->SetRoot();
826 thisNode->SetCounter(0);
827 return thisNode;
828}
829
830////////////////////////////////////////////////////////////////////////////
831/// Return all valid TTree::Branch names (caching results for subsequent calls).
832/// Never use fBranchNames directy, always request it through this method.
834{
835 if (fValidBranchNames.empty() && fTree) {
836 fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
837 }
838 return fValidBranchNames;
839}
840
841bool RLoopManager::HasDSValuePtrs(const std::string &col) const
842{
843 return fDSValuePtrMap.find(col) != fDSValuePtrMap.end();
844}
845
846void RLoopManager::AddDSValuePtrs(const std::string &col, const std::vector<void *> ptrs)
847{
848 fDSValuePtrMap[col] = ptrs;
849}
850
851void RLoopManager::AddDataBlockCallback(std::function<void(unsigned int)> &&callback)
852{
853 if (callback)
854 fDataBlockCallbacks.emplace_back(std::move(callback));
855}
ROOT::R::TRInterface & r
Definition Object.C:4
#define R__LOG_INFO(...)
Definition RLogger.hxx:364
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
long long Long64_t
Definition RtypesCore.h:73
unsigned long long ULong64_t
Definition RtypesCore.h:74
#define R__ASSERT(e)
Definition TError.h:120
const char * filters[]
char name[80]
Definition TGX11.cxx:110
R__EXTERN TVirtualMutex * gROOTMutex
Definition TROOT.h:63
#define R__LOCKGUARD(mutex)
The head node of a RDF computation graph.
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
void SetupDataBlockCallbacks(TTreeReader *r, unsigned int slot)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::vector< RRangeBase * > fBookedRanges
std::vector< TCallback > fCallbacks
Registered callbacks.
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
std::vector< RFilterBase * > fBookedFilters
RDFInternal::RDataBlockNotifier fDataBlockNotifier
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directy, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
std::vector< Callback_t > fDataBlockCallbacks
Registered callbacks to call at the beginning of each "data block".
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager This includes Filters and Ranges but not Defines.
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
bool HasDSValuePtrs(const std::string &col) const
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
const ColumnNames_t fDefaultColumns
void Book(RDFInternal::RActionBase *actionPtr)
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes This method loops over all filters, actions and other booked ob...
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void AddDataBlockCallback(std::function< void(unsigned int)> &&callback)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
unsigned int fNStopsReceived
Number of times that a children node signaled to stop processing entries.
Definition RNodeBase.hxx:45
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition RNodeBase.hxx:44
TNotifyLink< RDataBlockFlag > & GetChainNotifyLink(unsigned int slot)
bool CheckFlag(unsigned int slot) const
This is an helper class to allow to pick a slot resorting to a map indexed by thread ids.
void ReturnSlot(unsigned int slotNumber)
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A Branch for the case of an object.
A TTree is a list of TBranches.
Definition TBranch.h:89
TObjArray * GetListOfLeaves()
Definition TBranch.h:243
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TObjArray * GetListOfFiles() const
Definition TChain.h:107
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition TLeaf.h:57
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:37
Stopwatch class.
Definition TStopwatch.h:28
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
@ kEntryNotFound
the tree entry number does not exist
A TTree represents a columnar dataset.
Definition TTree.h:79
virtual TBranch * FindBranch(const char *name)
Return the branch that correspond to the path 'branchname', which can include the name of the tree or...
Definition TTree.cxx:4828
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition TTree.cxx:5275
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition TTree.cxx:9156
virtual TObjArray * GetListOfBranches()
Definition TTree.h:485
virtual TTree * GetTree() const
Definition TTree.h:514
virtual TList * GetListOfFriends() const
Definition TTree.h:487
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition TTree.cxx:6013
std::vector< std::string > ColumnNames_t
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
unsigned int GetNSlots()
Definition RDFUtils.cxx:286
Long64_t InterpreterCalc(const std::string &code, const std::string &context)
Definition RDFUtils.cxx:330
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition TROOT.cxx:556
Definition file.py:1
Definition tree.py:1
static const char * what
Definition stlLoader.cc:6