Logo ROOT  
Reference Guide
RLoopManager.cxx
Go to the documentation of this file.
1 #include "RConfigure.h" // R__USE_IMT
2 #include "ROOT/RDataSource.hxx"
3 #include "ROOT/RDF/GraphNode.hxx"
9 #include "RtypesCore.h" // Long64_t
10 #include "TBranchElement.h"
11 #include "TBranchObject.h"
12 #include "TEntryList.h"
13 #include "TFriendElement.h"
14 #include "TInterpreter.h"
15 #include "TROOT.h" // IsImplicitMTEnabled
16 #include "TTreeReader.h"
17 #include "TTree.h" // For MaxTreeSizeRAII. Revert when #6640 will be solved.
18 
19 #ifdef R__USE_IMT
20 #include "ROOT/TThreadExecutor.hxx"
22 #endif
23 
24 #include <algorithm>
25 #include <atomic>
26 #include <exception>
27 #include <functional>
28 #include <iostream>
29 #include <memory>
30 #include <stdexcept>
31 #include <string>
32 #include <unordered_map>
33 #include <vector>
34 #include <set>
35 #include <limits> // For MaxTreeSizeRAII. Revert when #6640 will be solved.
36 
37 using namespace ROOT::Detail::RDF;
38 using namespace ROOT::Internal::RDF;
39 
40 namespace {
/// Accessor for the buffer holding all RDF code currently scheduled for just-in-time compilation.
/// The buffer is a function-local static, so every RLoopManager instance appends to the same string.
/// Collecting the code in one place lets us lazily jit everything needed by all RDFs in one go,
/// which is potentially much faster than jitting each RLoopManager's code separately.
/// \return a mutable reference to the shared code buffer.
static std::string &GetCodeToJit()
{
   static std::string sPendingCode;
   return sPendingCode;
}
51 
52 static bool ContainsLeaf(const std::set<TLeaf *> &leaves, TLeaf *leaf)
53 {
54  return (leaves.find(leaf) != leaves.end());
55 }
56 
57 ///////////////////////////////////////////////////////////////////////////////
58 /// This overload does not perform any check on the duplicates.
59 /// It is used for TBranch objects.
60 static void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
61  const std::string &friendName)
62 {
63 
64  if (!friendName.empty()) {
65  // In case of a friend tree, users might prepend its name/alias to the branch names
66  const auto friendBName = friendName + "." + branchName;
67  if (bNamesReg.insert(friendBName).second)
68  bNames.push_back(friendBName);
69  }
70 
71  if (bNamesReg.insert(branchName).second)
72  bNames.push_back(branchName);
73 }
74 
75 ///////////////////////////////////////////////////////////////////////////////
76 /// This overloads makes sure that the TLeaf has not been already inserted.
77 static void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames, const std::string &branchName,
78  const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf, bool allowDuplicates)
79 {
80  const bool canAdd = allowDuplicates ? true : !ContainsLeaf(foundLeaves, leaf);
81  if (!canAdd) {
82  return;
83  }
84 
85  UpdateList(bNamesReg, bNames, branchName, friendName);
86 
87  foundLeaves.insert(leaf);
88 }
89 
90 static void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b,
91  std::string prefix, std::string &friendName)
92 {
93  for (auto sb : *b->GetListOfBranches()) {
94  TBranch *subBranch = static_cast<TBranch *>(sb);
95  auto subBranchName = std::string(subBranch->GetName());
96  auto fullName = prefix + subBranchName;
97 
98  std::string newPrefix;
99  if (!prefix.empty())
100  newPrefix = fullName + ".";
101 
102  ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName);
103 
104  if (t.GetBranch(fullName.c_str()) || t.FindBranch(fullName.c_str()))
105  UpdateList(bNamesReg, bNames, fullName, friendName);
106 
107  if (t.GetBranch(subBranchName.c_str()))
108  UpdateList(bNamesReg, bNames, subBranchName, friendName);
109  }
110 }
111 
112 static void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
113  std::set<TTree *> &analysedTrees, std::string &friendName, bool allowDuplicates)
114 {
115  std::set<TLeaf *> foundLeaves;
116  if (!analysedTrees.insert(&t).second) {
117  return;
118  }
119 
120  const auto branches = t.GetListOfBranches();
121  // Getting the branches here triggered the read of the first file of the chain if t is a chain.
122  // We check if a tree has been successfully read, otherwise we throw (see ROOT-9984) to avoid further
123  // operations
124  if (!t.GetTree()) {
125  std::string err("GetBranchNames: error in opening the tree ");
126  err += t.GetName();
127  throw std::runtime_error(err);
128  }
129  if (branches) {
130  for (auto b : *branches) {
131  TBranch *branch = static_cast<TBranch *>(b);
132  const auto branchName = std::string(branch->GetName());
133  if (branch->IsA() == TBranch::Class()) {
134  // Leaf list
135  auto listOfLeaves = branch->GetListOfLeaves();
136  if (listOfLeaves->GetEntries() == 1) {
137  auto leaf = static_cast<TLeaf *>(listOfLeaves->At(0));
138  const auto leafName = std::string(leaf->GetName());
139  if (leafName == branchName) {
140  UpdateList(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
141  }
142  }
143 
144  for (auto leaf : *listOfLeaves) {
145  auto castLeaf = static_cast<TLeaf *>(leaf);
146  const auto leafName = std::string(leaf->GetName());
147  const auto fullName = branchName + "." + leafName;
148  UpdateList(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
149  }
150  } else if (branch->IsA() == TBranchObject::Class()) {
151  // TBranchObject
152  ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName);
153  UpdateList(bNamesReg, bNames, branchName, friendName);
154  } else {
155  // TBranchElement
156  // Check if there is explicit or implicit dot in the name
157 
158  bool dotIsImplied = false;
159  auto be = dynamic_cast<TBranchElement *>(b);
160  if (!be)
161  throw std::runtime_error("GetBranchNames: unsupported branch type");
162  // TClonesArray (3) and STL collection (4)
163  if (be->GetType() == 3 || be->GetType() == 4)
164  dotIsImplied = true;
165 
166  if (dotIsImplied || branchName.back() == '.')
167  ExploreBranch(t, bNamesReg, bNames, branch, "", friendName);
168  else
169  ExploreBranch(t, bNamesReg, bNames, branch, branchName + ".", friendName);
170 
171  UpdateList(bNamesReg, bNames, branchName, friendName);
172  }
173  }
174  }
175 
176  auto friendTrees = t.GetListOfFriends();
177 
178  if (!friendTrees)
179  return;
180 
181  for (auto friendTreeObj : *friendTrees) {
182  auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
183 
184  std::string frName;
185  auto alias = t.GetFriendAlias(friendTree);
186  if (alias != nullptr)
187  frName = std::string(alias);
188  else
189  frName = std::string(friendTree->GetName());
190 
191  GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
192  }
193 }
194 
195 static void ThrowIfNSlotsChanged(unsigned int nSlots)
196 {
197  const auto currentSlots = RDFInternal::GetNSlots();
198  if (currentSlots != nSlots) {
199  std::string msg = "RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
200  std::to_string(nSlots) + ", but when starting the event loop it was " +
201  std::to_string(currentSlots) + ".";
202  if (currentSlots > nSlots)
203  msg += " Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
204  else
205  msg += " Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
206  throw std::runtime_error(msg);
207  }
208 }
209 
210 /**
211 \struct MaxTreeSizeRAII
212 \brief Scope-bound change of `TTree::fgMaxTreeSize`.
213 
214 This RAII object stores the current value result of `TTree::GetMaxTreeSize`,
215 changes it to maximum at construction time and restores it back at destruction
216 time. Needed for issue #6523 and should be reverted when #6640 will be solved.
217 */
218 struct MaxTreeSizeRAII {
219  Long64_t fOldMaxTreeSize;
220 
221  MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
222  {
223  TTree::SetMaxTreeSize(std::numeric_limits<Long64_t>::max());
224  }
225 
226  ~MaxTreeSizeRAII() { TTree::SetMaxTreeSize(fOldMaxTreeSize); }
227 };
228 
229 } // anonymous namespace
230 
231 ///////////////////////////////////////////////////////////////////////////////
232 /// Get all the branches names, including the ones of the friend trees
234 {
235  std::set<std::string> bNamesSet;
236  ColumnNames_t bNames;
237  std::set<TTree *> analysedTrees;
238  std::string emptyFrName = "";
239  GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
240  return bNames;
241 }
242 
244  : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
245  fNSlots(RDFInternal::GetNSlots()),
246  fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles)
247 {
248 }
249 
251  : fNEmptyEntries(nEmptyEntries), fNSlots(RDFInternal::GetNSlots()),
252  fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles)
253 {
254 }
255 
256 RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches)
257  : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
258  fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
259  fDataSource(std::move(ds))
260 {
261  fDataSource->SetNSlots(fNSlots);
262 }
263 
264 // ROOT-9559: we cannot handle indexed friends
266 {
267  auto friends = fTree->GetListOfFriends();
268  if (!friends)
269  return;
270  for (auto friendElObj : *friends) {
271  auto friendEl = static_cast<TFriendElement *>(friendElObj);
272  auto friendTree = friendEl->GetTree();
273  if (friendTree && friendTree->GetTreeIndex()) {
274  std::string err = fTree->GetName();
275  err += " has a friend, \"";
276  err += friendTree->GetName();
277  err += "\", which has an index. This is not supported.";
278  throw std::runtime_error(err);
279  }
280  }
281 }
282 
283 struct RSlotRAII {
284  RSlotStack &fSlotStack;
285  unsigned int fSlot;
286  RSlotRAII(RSlotStack &slotStack) : fSlotStack(slotStack), fSlot(slotStack.GetSlot()) {}
287  ~RSlotRAII() { fSlotStack.ReturnSlot(fSlot); }
288 };
289 
290 /// Run event loop with no source files, in parallel.
292 {
293 #ifdef R__USE_IMT
294  RSlotStack slotStack(fNSlots);
295  // Working with an empty tree.
296  // Evenly partition the entries according to fNSlots. Produce around 2 tasks per slot.
297  const auto nEntriesPerSlot = fNEmptyEntries / (fNSlots * 2);
298  auto remainder = fNEmptyEntries % (fNSlots * 2);
299  std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
300  ULong64_t start = 0;
301  while (start < fNEmptyEntries) {
302  ULong64_t end = start + nEntriesPerSlot;
303  if (remainder > 0) {
304  ++end;
305  --remainder;
306  }
307  entryRanges.emplace_back(start, end);
308  start = end;
309  }
310 
311  // Each task will generate a subrange of entries
312  auto genFunction = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
313  RSlotRAII slotRAII(slotStack);
314  auto slot = slotRAII.fSlot;
315  InitNodeSlots(nullptr, slot);
316  try {
317  for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
318  RunAndCheckFilters(slot, currEntry);
319  }
320  } catch (...) {
321  CleanUpTask(slot);
322  // Error might throw in experiment frameworks like CMSSW
323  std::cerr << "RDataFrame::Run: event loop was interrupted\n";
324  throw;
325  }
326  CleanUpTask(slot);
327  };
328 
330  pool.Foreach(genFunction, entryRanges);
331 
332 #endif // not implemented otherwise
333 }
334 
335 /// Run event loop with no source files, in sequence.
337 {
338  InitNodeSlots(nullptr, 0);
339  try {
340  for (ULong64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
341  RunAndCheckFilters(0, currEntry);
342  }
343  } catch (...) {
344  CleanUpTask(0u);
345  std::cerr << "RDataFrame::Run: event loop was interrupted\n";
346  throw;
347  }
348  CleanUpTask(0u);
349 }
350 
351 /// Run event loop over one or multiple ROOT files, in parallel.
353 {
354 #ifdef R__USE_IMT
356  RSlotStack slotStack(fNSlots);
357  const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
358  auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList, fNSlots);
359 
360  std::atomic<ULong64_t> entryCount(0ull);
361 
362  tp->Process([this, &slotStack, &entryCount](TTreeReader &r) -> void {
363  RSlotRAII slotRAII(slotStack);
364  auto slot = slotRAII.fSlot;
365  InitNodeSlots(&r, slot);
366  const auto entryRange = r.GetEntriesRange(); // we trust TTreeProcessorMT to call SetEntriesRange
367  const auto nEntries = entryRange.second - entryRange.first;
368  auto count = entryCount.fetch_add(nEntries);
369  try {
370  // recursive call to check filters and conditionally execute actions
371  while (r.Next()) {
372  RunAndCheckFilters(slot, count++);
373  }
374  } catch (...) {
375  CleanUpTask(slot);
376  std::cerr << "RDataFrame::Run: event loop was interrupted\n";
377  throw;
378  }
379  CleanUpTask(slot);
380  });
381 #endif // no-op otherwise (will not be called)
382 }
383 
384 /// Run event loop over one or multiple ROOT files, in sequence.
386 {
388  TTreeReader r(fTree.get(), fTree->GetEntryList());
389  if (0 == fTree->GetEntriesFast())
390  return;
391  InitNodeSlots(&r, 0);
392 
393  // recursive call to check filters and conditionally execute actions
394  // in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
395  try {
396  while (r.Next() && fNStopsReceived < fNChildren) {
397  RunAndCheckFilters(0, r.GetCurrentEntry());
398  }
399  } catch (...) {
400  CleanUpTask(0u);
401  std::cerr << "RDataFrame::Run: event loop was interrupted\n";
402  throw;
403  }
404  if (r.GetEntryStatus() != TTreeReader::kEntryNotFound && fNStopsReceived < fNChildren) {
405  // something went wrong in the TTreeReader event loop
406  throw std::runtime_error("An error was encountered while processing the data. TTreeReader status code is: " +
407  std::to_string(r.GetEntryStatus()));
408  }
409  CleanUpTask(0u);
410 }
411 
412 /// Run event loop over data accessed through a DataSource, in sequence.
414 {
415  R__ASSERT(fDataSource != nullptr);
416  fDataSource->Initialise();
417  auto ranges = fDataSource->GetEntryRanges();
418  while (!ranges.empty() && fNStopsReceived < fNChildren) {
419  InitNodeSlots(nullptr, 0u);
420  fDataSource->InitSlot(0u, 0ull);
421  try {
422  for (const auto &range : ranges) {
423  auto end = range.second;
424  for (auto entry = range.first; entry < end && fNStopsReceived < fNChildren; ++entry) {
425  if (fDataSource->SetEntry(0u, entry)) {
426  RunAndCheckFilters(0u, entry);
427  }
428  }
429  }
430  } catch (...) {
431  CleanUpTask(0u);
432  std::cerr << "RDataFrame::Run: event loop was interrupted\n";
433  throw;
434  }
435  CleanUpTask(0u);
436  fDataSource->FinaliseSlot(0u);
437  ranges = fDataSource->GetEntryRanges();
438  }
439  fDataSource->Finalise();
440 }
441 
442 /// Run event loop over data accessed through a DataSource, in parallel.
444 {
445 #ifdef R__USE_IMT
446  R__ASSERT(fDataSource != nullptr);
447  RSlotStack slotStack(fNSlots);
449 
450  // Each task works on a subrange of entries
451  auto runOnRange = [this, &slotStack](const std::pair<ULong64_t, ULong64_t> &range) {
452  RSlotRAII slotRAII(slotStack);
453  const auto slot = slotRAII.fSlot;
454  InitNodeSlots(nullptr, slot);
455  fDataSource->InitSlot(slot, range.first);
456  const auto end = range.second;
457  try {
458  for (auto entry = range.first; entry < end; ++entry) {
459  if (fDataSource->SetEntry(slot, entry)) {
460  RunAndCheckFilters(slot, entry);
461  }
462  }
463  } catch (...) {
464  CleanUpTask(slot);
465  std::cerr << "RDataFrame::Run: event loop was interrupted\n";
466  throw;
467  }
468  CleanUpTask(slot);
469  fDataSource->FinaliseSlot(slot);
470  };
471 
472  fDataSource->Initialise();
473  auto ranges = fDataSource->GetEntryRanges();
474  while (!ranges.empty()) {
475  pool.Foreach(runOnRange, ranges);
476  ranges = fDataSource->GetEntryRanges();
477  }
478  fDataSource->Finalise();
479 #endif // not implemented otherwise (never called)
480 }
481 
482 /// Execute actions and make sure named filters are called for each event.
483 /// Named filters must be called even if the analysis logic would not require it, lest they report confusing results.
484 void RLoopManager::RunAndCheckFilters(unsigned int slot, Long64_t entry)
485 {
486  for (auto &actionPtr : fBookedActions)
487  actionPtr->Run(slot, entry);
488  for (auto &namedFilterPtr : fBookedNamedFilters)
489  namedFilterPtr->CheckFilters(slot, entry);
490  for (auto &callback : fCallbacks)
491  callback(slot);
492 }
493 
494 /// Build TTreeReaderValues for all nodes
495 /// This method loops over all filters, actions and other booked objects and
496 /// calls their `InitSlot` method, to get them ready for running a task.
497 void RLoopManager::InitNodeSlots(TTreeReader *r, unsigned int slot)
498 {
499  for (auto &ptr : fBookedActions)
500  ptr->InitSlot(r, slot);
501  for (auto &ptr : fBookedFilters)
502  ptr->InitSlot(r, slot);
503  for (auto &callback : fCallbacksOnce)
504  callback(slot);
505 }
506 
507 /// Initialize all nodes of the functional graph before running the event loop.
508 /// This method is called once per event-loop and performs generic initialization
509 /// operations that do not depend on the specific processing slot (i.e. operations
510 /// that are common for all threads).
512 {
514  for (auto &filter : fBookedFilters)
515  filter->InitNode();
516  for (auto &range : fBookedRanges)
517  range->InitNode();
518  for (auto &ptr : fBookedActions)
519  ptr->Initialize();
520 }
521 
522 /// Perform clean-up operations. To be called at the end of each event loop.
524 {
525  fMustRunNamedFilters = false;
526 
527  // forget RActions and detach TResultProxies
528  for (auto &ptr : fBookedActions)
529  ptr->Finalize();
530 
531  fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
532  fBookedActions.clear();
533 
534  // reset children counts
535  fNChildren = 0;
536  fNStopsReceived = 0;
537  for (auto &ptr : fBookedFilters)
538  ptr->ResetChildrenCount();
539  for (auto &ptr : fBookedRanges)
540  ptr->ResetChildrenCount();
541 
542  fCallbacks.clear();
543  fCallbacksOnce.clear();
544 }
545 
546 /// Perform clean-up operations. To be called at the end of each task execution.
547 void RLoopManager::CleanUpTask(unsigned int slot)
548 {
549  for (auto &ptr : fBookedActions)
550  ptr->FinalizeSlot(slot);
551  for (auto &ptr : fBookedFilters)
552  ptr->FinaliseSlot(slot);
553 }
554 
555 /// Add RDF nodes that require just-in-time compilation to the computation graph.
556 /// This method also clears the contents of GetCodeToJit().
558 {
560 
561  const std::string code = std::move(GetCodeToJit());
562  if (code.empty())
563  return;
564 
565  RDFInternal::InterpreterCalc(code, "RLoopManager::Run");
566 }
567 
568 /// Trigger counting of number of children nodes for each node of the functional graph.
569 /// This is done once before starting the event loop. Each action sends an `increase children count` signal
570 /// upstream, which is propagated until RLoopManager. Each time a node receives the signal, it increments its
571 /// children counter. Each node only propagates the signal once, even if it receives it multiple times.
572 /// Named filters also send an `increase children count` signal, just like actions, as they always execute during
573 /// the event loop so the graph branch they belong to must count as active even if it does not end in an action.
575 {
576  for (auto &actionPtr : fBookedActions)
577  actionPtr->TriggerChildrenCount();
578  for (auto &namedFilterPtr : fBookedNamedFilters)
579  namedFilterPtr->TriggerChildrenCount();
580 }
581 
582 /// Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
583 /// Also perform a few setup and clean-up operations (jit actions if necessary, clear booked actions after the loop...).
585 {
586  // Change value of TTree::GetMaxTreeSize only for this scope. Revert when #6640 will be solved.
587  MaxTreeSizeRAII ctxtmts;
588 
589  ThrowIfNSlotsChanged(GetNSlots());
590 
591  Jit();
592 
593  InitNodes();
594 
595  switch (fLoopType) {
599  case ELoopType::kNoFiles: RunEmptySource(); break;
600  case ELoopType::kROOTFiles: RunTreeReader(); break;
601  case ELoopType::kDataSource: RunDataSource(); break;
602  }
603 
604  CleanUpNodes();
605 
606  fNRuns++;
607 }
608 
609 /// Return the list of default columns -- empty if none was provided when constructing the RDataFrame
611 {
612  return fDefaultColumns;
613 }
614 
616 {
617  return fTree.get();
618 }
619 
621 {
622  fBookedActions.emplace_back(actionPtr);
623 }
624 
626 {
627  RDFInternal::Erase(actionPtr, fRunActions);
628  RDFInternal::Erase(actionPtr, fBookedActions);
629 }
630 
632 {
633  fBookedFilters.emplace_back(filterPtr);
634  if (filterPtr->HasName()) {
635  fBookedNamedFilters.emplace_back(filterPtr);
636  fMustRunNamedFilters = true;
637  }
638 }
639 
641 {
642  RDFInternal::Erase(filterPtr, fBookedFilters);
643  RDFInternal::Erase(filterPtr, fBookedNamedFilters);
644 }
645 
647 {
648  fBookedRanges.emplace_back(rangePtr);
649 }
650 
652 {
653  RDFInternal::Erase(rangePtr, fBookedRanges);
654 }
655 
656 // dummy call, end of recursive chain of calls
658 {
659  return true;
660 }
661 
662 /// Call `FillReport` on all booked filters
664 {
665  for (const auto &fPtr : fBookedNamedFilters)
666  fPtr->FillReport(rep);
667 }
668 
669 void RLoopManager::ToJitExec(const std::string &code) const
670 {
672  GetCodeToJit().append(code);
673 }
674 
675 void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f)
676 {
677  if (everyNEvents == 0ull)
678  fCallbacksOnce.emplace_back(std::move(f), fNSlots);
679  else
680  fCallbacks.emplace_back(everyNEvents, std::move(f), fNSlots);
681 }
682 
683 std::vector<std::string> RLoopManager::GetFiltersNames()
684 {
685  std::vector<std::string> filters;
686  for (auto &filter : fBookedFilters) {
687  auto name = (filter->HasName() ? filter->GetName() : "Unnamed Filter");
688  filters.push_back(name);
689  }
690  return filters;
691 }
692 
693 std::vector<RNodeBase *> RLoopManager::GetGraphEdges() const
694 {
695  std::vector<RNodeBase *> nodes(fBookedFilters.size() + fBookedRanges.size());
696  auto it = std::copy(fBookedFilters.begin(), fBookedFilters.end(), nodes.begin());
697  std::copy(fBookedRanges.begin(), fBookedRanges.end(), it);
698  return nodes;
699 }
700 
701 std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions() const
702 {
703  std::vector<RDFInternal::RActionBase *> actions(fBookedActions.size() + fRunActions.size());
704  auto it = std::copy(fBookedActions.begin(), fBookedActions.end(), actions.begin());
705  std::copy(fRunActions.begin(), fRunActions.end(), it);
706  return actions;
707 }
708 
709 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph()
710 {
711  std::string name;
712  if (fDataSource) {
713  name = fDataSource->GetLabel();
714  } else if (fTree) {
715  name = fTree->GetName();
716  } else {
717  name = std::to_string(fNEmptyEntries);
718  }
719 
720  auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(name);
721  thisNode->SetRoot();
722  thisNode->SetCounter(0);
723  return thisNode;
724 }
725 
726 ////////////////////////////////////////////////////////////////////////////
727 /// Return all valid TTree::Branch names (caching results for subsequent calls).
728 /// Never use fValidBranchNames directly, always request it through this method.
730 {
731  if (fValidBranchNames.empty() && fTree) {
732  fValidBranchNames = RDFInternal::GetBranchNames(*fTree, /*allowRepetitions=*/true);
733  }
734  return fValidBranchNames;
735 }
736 
737 bool RLoopManager::HasDSValuePtrs(const std::string &col) const
738 {
739  return fDSValuePtrMap.find(col) != fDSValuePtrMap.end();
740 }
741 
742 void RLoopManager::AddDSValuePtrs(const std::string &col, const std::vector<void *> ptrs)
743 {
744  fDSValuePtrMap[col] = ptrs;
745 }
ROOT::Detail::RDF::RFilterBase
Definition: RFilterBase.hxx:36
ROOT::Internal::RDF::InterpreterCalc
Long64_t InterpreterCalc(const std::string &code, const std::string &context)
Definition: RDFUtils.cxx:310
ROOT::Detail::RDF::RLoopManager::Run
void Run()
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
Definition: RLoopManager.cxx:584
filters
const char * filters[]
Definition: TGFileBrowser.cxx:61
ROOT::Detail::RDF::RLoopManager::fNSlots
const unsigned int fNSlots
Definition: RLoopManager.hxx:109
ROOT::Detail::RDF::RLoopManager::CleanUpTask
void CleanUpTask(unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
Definition: RLoopManager.cxx:547
TTree::FindBranch
virtual TBranch * FindBranch(const char *name)
Return the branch that corresponds to the path 'branchname', which can include the name of the tree or...
Definition: TTree.cxx:4774
f
#define f(i)
Definition: RSha256.hxx:104
ROOT::Detail::RDF::RLoopManager::fDSValuePtrMap
std::map< std::string, std::vector< void * > > fDSValuePtrMap
Registry of per-slot value pointers for booked data-source columns.
Definition: RLoopManager.hxx:119
ROOT::Detail::RDF::RLoopManager::fBookedRanges
std::vector< RRangeBase * > fBookedRanges
Definition: RLoopManager.hxx:102
TBranchElement
A Branch for the case of an object.
Definition: TBranchElement.h:39
ROOT::Detail::RDF::RLoopManager::fBookedFilters
std::vector< RFilterBase * > fBookedFilters
Definition: RLoopManager.hxx:100
TTree::GetListOfFriends
virtual TList * GetListOfFriends() const
Definition: TTree.h:485
tree
Definition: tree.py:1
ROOT::Detail::RDF::RLoopManager::fMustRunNamedFilters
bool fMustRunNamedFilters
Definition: RLoopManager.hxx:110
r
ROOT::R::TRInterface & r
Definition: Object.C:4
Long64_t
long long Long64_t
Definition: RtypesCore.h:73
ROOT::Detail::RDF::RLoopManager::EvalChildrenCounts
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
Definition: RLoopManager.cxx:574
TTree
A TTree represents a columnar dataset.
Definition: TTree.h:79
ROOT::TThreadExecutor
This class provides a simple interface to execute the same task multiple times in parallel,...
Definition: TThreadExecutor.hxx:35
extract_docstrings.ds
ds
Definition: extract_docstrings.py:40
ROOT::Detail::RDF::RLoopManager::RunAndCheckFilters
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
Definition: RLoopManager.cxx:484
RFilterBase.hxx
ROOT::Detail::RDF::RRangeBase
Definition: RRangeBase.hxx:32
ROOT::Detail::RDF::RLoopManager::InitNodes
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
Definition: RLoopManager.cxx:511
ROOT::TThreadExecutor::Foreach
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
Definition: TThreadExecutor.hxx:117
ROOT::Detail::RDF::RLoopManager::fCallbacks
std::vector< TCallback > fCallbacks
Registered callbacks.
Definition: RLoopManager.hxx:114
TFriendElement
A TFriendElement TF describes a TTree object TF in a file.
Definition: TFriendElement.h:33
ROOT::Detail::RDF::RLoopManager::fRunActions
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
Definition: RLoopManager.hxx:99
ROOT::Detail::RDF::RLoopManager::fNRuns
unsigned int fNRuns
Number of event loops run.
Definition: RLoopManager.hxx:116
ROOT::Detail::RDF::RLoopManager::ELoopType::kDataSourceMT
@ kDataSourceMT
ROOT::Detail::RDF::RLoopManager::ELoopType
ELoopType
Definition: RLoopManager.hxx:58
TFriendElement.h
TTree.h
ROOT::Detail::RDF::RLoopManager::ToJitExec
void ToJitExec(const std::string &) const
Definition: RLoopManager.cxx:669
ROOT::Detail::RDF::RLoopManager::AddDSValuePtrs
void AddDSValuePtrs(const std::string &col, const std::vector< void * > ptrs)
Definition: RLoopManager.cxx:742
b
#define b(i)
Definition: RSha256.hxx:100
ROOT::Detail::RDF::RLoopManager::fDataSource
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object. Null if no data-source.
Definition: RLoopManager.hxx:112
ROOT::Detail::RDF
Definition: GraphUtils.hxx:28
ROOT::Detail::RDF::ColumnNames_t
std::vector< std::string > ColumnNames_t
Definition: RLoopManager.hxx:53
ROOT::Internal::RDF::RActionBase
Definition: RActionBase.hxx:39
ROOT::Detail::RDF::RLoopManager::GetGraph
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph()
Definition: RLoopManager.cxx:709
TTree::GetListOfBranches
virtual TObjArray * GetListOfBranches()
Definition: TTree.h:483
TTree::SetMaxTreeSize
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
Definition: TTree.cxx:9028
RDataSource.hxx
ROOT::Detail::RDF::RLoopManager::fDefaultColumns
const ColumnNames_t fDefaultColumns
Definition: RLoopManager.hxx:107
TROOT.h
TTreeProcessorMT.hxx
TTree::GetTree
virtual TTree * GetTree() const
Definition: TTree.h:512
TBranch
A TTree is a list of TBranches.
Definition: TBranch.h:89
TTree::GetBranch
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
Definition: TTree.cxx:5221
ROOT::Detail::RDF::RLoopManager::fCallbacksOnce
std::vector< TOneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
Definition: RLoopManager.hxx:115
TTreeReader.h
ROOT::Detail::RDF::RLoopManager::fBookedNamedFilters
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
Definition: RLoopManager.hxx:101
ROOT::Detail::RDF::RLoopManager::fNEmptyEntries
const ULong64_t fNEmptyEntries
Definition: RLoopManager.hxx:108
TBranchElement.h
ROOT::Detail::RDF::RLoopManager::RunEmptySourceMT
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
Definition: RLoopManager.cxx:291
ROOT::Detail::RDF::RLoopManager::GetBranchNames
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
Definition: RLoopManager.cxx:729
ROOT::Detail::RDF::RLoopManager::fBookedActions
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
Definition: RLoopManager.hxx:98
gROOTMutex
R__EXTERN TVirtualMutex * gROOTMutex
Definition: TROOT.h:61
TLeaf
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
Definition: TLeaf.h:57
ROOT::Detail::RDF::RLoopManager::GetTree
TTree * GetTree() const
Definition: RLoopManager.cxx:615
ROOT::Detail::RDF::RLoopManager::fValidBranchNames
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
Definition: RLoopManager.hxx:122
TFriendElement::GetTree
virtual TTree * GetTree()
Return pointer to friend TTree.
Definition: TFriendElement.cxx:209
ROOT::Detail::RDF::RFilterBase::HasName
bool HasName() const
Definition: RFilterBase.cxx:25
ROOT::Detail::RDF::RLoopManager::RLoopManager
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
Definition: RLoopManager.cxx:243
ROOT::Detail::RDF::RLoopManager::RunEmptySource
void RunEmptySource()
Run event loop with no source files, in sequence.
Definition: RLoopManager.cxx:336
ROOT::Detail::RDF::RLoopManager::GetDefaultColumnNames
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
Definition: RLoopManager.cxx:610
TThreadExecutor.hxx
ROOT::R::function
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
ROOT::Detail::RDF::RLoopManager::ELoopType::kDataSource
@ kDataSource
ROOT::Internal::RDF::RSlotStack
This is a helper class that allows picking a slot by resorting to a map indexed by thread ids.
Definition: RSlotStack.hxx:26
ROOT::Detail::RDF::RLoopManager::RunTreeReader
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
Definition: RLoopManager.cxx:385
TTreeReader
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
ROOT::Internal::RDF::RSlotStack::ReturnSlot
void ReturnSlot(unsigned int slotNumber)
Definition: RSlotStack.cxx:23
R__LOCKGUARD
#define R__LOCKGUARD(mutex)
Definition: TVirtualMutex.h:104
ROOT::Detail::RDF::RLoopManager::Book
void Book(RDFInternal::RActionBase *actionPtr)
Definition: RLoopManager.cxx:620
ROOT::Detail::RDF::RLoopManager::GetAllActions
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
Definition: RLoopManager.cxx:701
ROOT::Detail::RDF::RNodeBase::fNChildren
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
Definition: RNodeBase.hxx:44
ROOT::Detail::RDF::RLoopManager::ELoopType::kROOTFiles
@ kROOTFiles
TTreeReader::kEntryNotFound
@ kEntryNotFound
the tree entry number does not exist
Definition: TTreeReader.h:129
ROOT::Detail::RDF::RLoopManager::fTree
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
Definition: RLoopManager.hxx:106
ROOT::Detail::RDF::RLoopManager::CheckFilters
bool CheckFilters(unsigned int, Long64_t) final
Definition: RLoopManager.cxx:657
RRangeBase.hxx
TEntryList
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
ROOT::Detail::RDF::RLoopManager::ELoopType::kNoFilesMT
@ kNoFilesMT
ULong64_t
unsigned long long ULong64_t
Definition: RtypesCore.h:74
ROOT::Detail::RDF::RLoopManager::RunDataSource
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
Definition: RLoopManager.cxx:413
ROOT::Internal::RDF::GetNSlots
unsigned int GetNSlots()
Definition: RDFUtils.cxx:268
ROOT::IsImplicitMTEnabled
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
Definition: TROOT.cxx:556
ROOT::Detail::RDF::RLoopManager::CleanUpNodes
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
Definition: RLoopManager.cxx:523
RSlotStack.hxx
RtypesCore.h
R__ASSERT
#define R__ASSERT(e)
Definition: TError.h:120
TInterpreter.h
ROOT::RDF::RCutFlowReport
Definition: RCutFlowReport.hxx:47
RActionBase.hxx
TEntryList.h
ROOT::Internal::RDF
Definition: RArrowDS.hxx:15
ROOT::Detail::RDF::RLoopManager::Jit
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
Definition: RLoopManager.cxx:557
ROOT::Detail::RDF::RLoopManager::GetGraphEdges
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
Definition: RLoopManager.cxx:693
name
char name[80]
Definition: TGX11.cxx:110
ROOT::Detail::RDF::RLoopManager::RunTreeProcessorMT
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
Definition: RLoopManager.cxx:352
ROOT::Detail::RDF::RLoopManager::ELoopType::kROOTFilesMT
@ kROOTFilesMT
TBranchObject.h
TTree::GetFriendAlias
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
Definition: TTree.cxx:5943
ROOT::Detail::RDF::RLoopManager::RegisterCallback
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
Definition: RLoopManager.cxx:675
ROOT::Detail::RDF::RLoopManager::GetFiltersNames
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
Definition: RLoopManager.cxx:683
ROOT::Detail::RDF::RLoopManager::Report
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
Definition: RLoopManager.cxx:663
ROOT::Detail::RDF::RLoopManager::RunDataSourceMT
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
Definition: RLoopManager.cxx:443
RLoopManager.hxx
ROOT::Detail::RDF::RLoopManager::InitNodeSlots
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
Definition: RLoopManager.cxx:497
TNamed::GetName
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
ROOT::Detail::RDF::RLoopManager::fLoopType
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
Definition: RLoopManager.hxx:111
Class
void Class()
Definition: Class.C:29
ROOT::Detail::RDF::RLoopManager::Deregister
void Deregister(RDFInternal::RActionBase *actionPtr)
Definition: RLoopManager.cxx:625
TBranch::GetListOfLeaves
TObjArray * GetListOfLeaves()
Definition: TBranch.h:243
ROOT::Detail::RDF::RNodeBase::fNStopsReceived
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
Definition: RNodeBase.hxx:45
ROOT::Internal::RDF::GetBranchNames
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
Definition: RLoopManager.cxx:233
ROOT::Detail::RDF::RLoopManager::GetNSlots
unsigned int GetNSlots() const
Definition: RLoopManager.hxx:161
ROOT
VSD Structures.
Definition: StringConv.hxx:21
GraphNode.hxx
ROOT::Detail::RDF::RLoopManager::ELoopType::kNoFiles
@ kNoFiles
ROOT::Detail::RDF::RLoopManager::HasDSValuePtrs
bool HasDSValuePtrs(const std::string &col) const
Definition: RLoopManager.cxx:737
ROOT::Detail::RDF::RLoopManager::CheckIndexedFriends
void CheckIndexedFriends()
Definition: RLoopManager.cxx:265