53#include <sys/extattr.h>
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
74#include <unordered_map>
90 static std::string code;
137 for (
auto sb : *
b->GetListOfBranches()) {
173 std::string err(
"GetBranchNames: error in opening the tree ");
175 throw std::runtime_error(err);
206 throw std::runtime_error(
"GetBranchNames: unsupported branch type");
208 if (
be->GetType() == 3 ||
be->GetType() == 4)
234 if (
alias !=
nullptr)
247 std::string
msg =
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
248 std::to_string(
nSlots) +
", but when starting the event loop it was " +
251 msg +=
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
253 msg +=
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
254 throw std::runtime_error(
msg);
266struct MaxTreeSizeRAII {
269 MaxTreeSizeRAII() : fOldMaxTreeSize(
TTree::GetMaxTreeSize())
277struct DatasetLogInfo {
278 std::string fDataSet;
286 std::stringstream
msg;
287 msg <<
"Processing " <<
info.fDataSet <<
": entry range [" <<
info.fRangeStart <<
"," <<
info.fRangeEnd - 1
288 <<
"], using slot " <<
info.fSlot <<
" in thread " << std::this_thread::get_id() <<
'.';
294 const auto tree =
r.GetTree();
310 what +=
" in files {";
319 const auto file =
tree->GetCurrentFile();
321 what += std::string(
" in file \"") + file->GetName() +
"\"";
372 : fTree(std::shared_ptr<
TTree>(tree, [](
TTree *) {})),
376 fNewSampleNotifier(fNSlots),
377 fSampleInfos(fNSlots),
378 fDatasetColumnReaders(fNSlots)
383 : fTree(std::
move(tree)),
387 fNewSampleNotifier(fNSlots),
388 fSampleInfos(fNSlots),
389 fDatasetColumnReaders(fNSlots)
397 fNewSampleNotifier(fNSlots),
398 fSampleInfos(fNSlots),
399 fDatasetColumnReaders(fNSlots)
407 fDataSource(std::
move(
ds)),
408 fNewSampleNotifier(fNSlots),
409 fSampleInfos(fNSlots),
410 fDatasetColumnReaders(fNSlots)
418 fNewSampleNotifier(fNSlots),
419 fSampleInfos(fNSlots),
420 fDatasetColumnReaders(fNSlots)
433 if (
gEnv->
GetValue(
"TFile.CrossProtocolRedirects", 1) == 1) {
485 for (std::size_t i = 0
ul; i <
files.size(); ++i) {
502 SetTree(std::move(
chain));
507 for (std::size_t i = 0
ul; i < fFriends.size(); i++) {
523 std::vector<std::pair<ULong64_t, ULong64_t>>
entryRanges;
549 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
574 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
599 default:
return false;
638 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
645 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
646 std::to_string(
r.GetEntryStatus()));
664 throw std::logic_error(
"Something went wrong in initializing the TTreeReader.");
680 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
685 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
686 std::to_string(
r.GetEntryStatus()));
701 for (
const auto &
range : ranges) {
702 const auto start =
range.first;
703 const auto end =
range.second;
712 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
736 const auto start =
range.first;
737 const auto end =
range.second;
746 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
754 while (!ranges.empty()) {
788 ptr->InitSlot(
r,
slot);
790 ptr->InitSlot(
r,
slot);
792 ptr->InitSlot(
r,
slot);
794 ptr->InitSlot(
r,
slot);
816 "Empty source, range: {" + std::to_string(
range.first) +
", " + std::to_string(
range.second) +
"}",
range);
821 auto *tree =
r.GetTree()->GetTree();
824 auto *file = tree->GetCurrentFile();
825 const std::string
fname = file !=
nullptr ? file->GetName() :
"#inmemorytree#";
827 std::pair<Long64_t, Long64_t>
range =
r.GetEntriesRange();
829 if (
range.second == -1) {
830 range.second = tree->GetEntries();
838 throw std::runtime_error(
"Full sample identifier '" +
id +
"' cannot be found in the available samples.");
874 ptr->ResetChildrenCount();
876 ptr->ResetChildrenCount();
888 ptr->FinalizeSlot(
slot);
890 ptr->FinalizeSlot(
slot);
892 ptr->FinalizeSlot(
slot);
914 const std::string code = []() {
925 :
" in less than 1ms.");
1071 fPtr->FillReport(
rep);
1076 fTree = std::move(tree);
1099 std::vector<std::string>
filters;
1101 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
1124 std::unordered_map<
void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &
visitedMap)
1141 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1169 std::vector<std::unique_ptr<RColumnReaderBase>> &&
readers,
1170 const std::type_info &
ti)
1185 std::unique_ptr<RColumnReaderBase> &&
reader,
1186 const std::type_info &
ti)
1203 return it->second.get();
1237 constexpr std::string_view
delim{
".root"};
1245 return std::make_pair(
fileNameGlob, std::string_view{});
1252 constexpr std::array<char, 4>
wildCards{
'[',
']',
'*',
'?'};
1254 [&
baseName](
auto &&
wc) { return baseName.find(wc) != std::string_view::npos; });
1265 throw std::invalid_argument(
"RDataFrame: could not open file \"" +
fileToOpen +
"\".");
1270std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1288std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1293 throw std::invalid_argument(
"RDataFrame: empty list of input files.");
1309std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1318std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1327std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1340 throw std::invalid_argument(
"RDataFrame: unsupported data format for dataset \"" + std::string(
datasetName) +
1341 "\" in file \"" +
inFile->GetName() +
"\".");
1344std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1350 throw std::invalid_argument(
"RDataFrame: empty list of input files.");
1360 throw std::invalid_argument(
"RDataFrame: unsupported data format for dataset \"" + std::string(
datasetName) +
1361 "\" in file \"" +
inFile->GetName() +
"\".");
#define R__LOG_DEBUG(DEBUGLEVEL,...)
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
unsigned long long ULong64_t
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
R__EXTERN TSystem * gSystem
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
The head node of a RDF computation graph.
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void EvalChildrenCounts()
Trigger counting of the number of child nodes for each node of the functional graph.
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
ULong64_t GetNEmptyEntries() const
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
const ColumnNames_t & GetBranchNames()
Return all valid TTree::Branch names (caching results for subsequent calls).
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
bool fMustRunNamedFilters
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
void SetTree(std::shared_ptr< TTree > tree)
std::shared_ptr< TTree > fTree
Shared pointer to the input TTree.
std::vector< RDefineBase * > fBookedDefines
void RunTreeReader()
Run event loop over one or multiple ROOT files, in sequence.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, const std::string &col, const std::type_info &ti) const
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< std::string > fSuppressErrorsForMissingBranches
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
RColumnReaderBase * AddTreeColumnReader(unsigned int slot, const std::string &col, std::unique_ptr< RColumnReaderBase > &&reader, const std::type_info &ti)
Register a new RTreeColumnReader with this RLoopManager.
const ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
void AddDataSourceColumnReaders(const std::string &col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
ColumnNames_t fValidBranchNames
Cache of the tree/chain branch names. Never access directly, always use GetBranchNames().
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
const unsigned int fNSlots
void Register(RDFInternal::RActionBase *actionPtr)
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
const std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
const ColumnNames_t fDefaultColumns
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
void RunTreeProcessorMT()
Run event loop over one or multiple ROOT files, in parallel.
void Deregister(RDFInternal::RActionBase *actionPtr)
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(const std::string &col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
void SetFlag(unsigned int slot)
bool CheckFlag(unsigned int slot) const
void UnsetFlag(unsigned int slot)
TNotifyLink< RNewSampleFlag > & GetChainNotifyLink(unsigned int slot)
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe stack of N indexes (0 to size - 1).
void RegisterChain(TChain &c)
The dataset specification for RDataFrame.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
Representation of an RNTuple data set in a ROOT file.
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
A Branch for the case of an object.
A TTree is a list of TBranches.
A chain is a collection of files containing TTree objects.
TDirectory::TContext keeps track of and restores the current directory.
A List of entry numbers in a TTree or TChain.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
A TFriendElement TF describes a TTree object TF in a file.
A TLeaf describes individual elements of a TBranch See TBranch structure in TTree.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
A TTree represents a columnar dataset.
virtual TBranch * FindBranch(const char *name)
Return the branch that corresponds to the path 'branchname', which can include the name of the tree or...
virtual TBranch * GetBranch(const char *name)
Return pointer to the branch with the given name in this tree or its friends.
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
virtual TObjArray * GetListOfBranches()
virtual TTree * GetTree() const
virtual const char * GetFriendAlias(TTree *) const
If the 'tree' is a friend, this method returns its alias name.
This class represents a WWW compatible URL.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::Experimental::RLogChannel & RDFLogChannel()
std::vector< std::string > GetBranchNames(TTree &t, bool allowDuplicates=true)
Get all the branches names, including the ones of the friend trees.
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Long64_t InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow forward declaring tb...
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
R__EXTERN TVirtualRWMutex * gCoreMutex
A RAII object that calls RLoopManager::CleanUpTask at destruction.
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
RLoopManager & fLoopManager
A RAII object to pop and push slot numbers from a RSlotStack object.