53#include <sys/extattr.h>
57#define getxattr(path, name, value, size) getxattr(path, name, value, size, 0u, 0)
60#define getxattr(path, name, value, size) extattr_get_file(path, EXTATTR_NAMESPACE_USER, name, value, size)
74#include <unordered_map>
88std::string &GetCodeToJit()
90 static std::string code;
94std::string &GetCodeToDeclare()
96 static std::string code;
104std::unordered_map<std::size_t, JitHelperFunc_t> &GetJitHelperFuncMap()
106 static std::unordered_map<std::size_t, JitHelperFunc_t>
map;
109std::unordered_map<std::size_t, std::size_t> &GetJitFuncBodyToFuncIdMap()
111 static std::unordered_map<std::size_t, std::size_t>
map;
115void DeclareAndRetrieveDeferredJitCalls(
const std::string &codeToDeclare)
129 gInterpreter->ProcessLine(codeToDeclare.c_str(), &interpErrorCode);
131 throw std::runtime_error(
132 "\nAn error occurred during just-in-time compilation in RLoopManager::Run. The lines above might "
133 "indicate the cause of the error.\nAll RDF objects that have not run their event loop yet should be "
134 "considered in an invalid state.\n");
139 auto &funcIdToFuncPointersMap = GetJitHelperFuncMap();
140 auto &funcBodyToFuncIdMap = GetJitFuncBodyToFuncIdMap();
144 for (
auto &codeAndId : funcBodyToFuncIdMap) {
145 if (
auto it = funcIdToFuncPointersMap.find(codeAndId.second); it == funcIdToFuncPointersMap.end()) {
149 const std::string funcName =
"jitNodeRegistrator_" + std::to_string(codeAndId.second);
150 auto declid =
gInterpreter->GetFunction(clinfo, funcName.c_str());
155 throw std::runtime_error(
156 "\nAn error occurred during just-in-time compilation in RLoopManager::Run: failed to retrieve "
157 "the JIT helper function '" +
159 "'. The lines above might indicate the cause of the error.\nAll RDF objects that have not run "
160 "their event loop yet should be considered in an invalid state.\n");
164 auto mname =
gInterpreter->MethodInfo_GetMangledName(minfo);
165 [[maybe_unused]]
auto res = funcIdToFuncPointersMap.insert(
166 {codeAndId.second,
reinterpret_cast<JitHelperFunc_t
>(
gInterpreter->FindSym(mname))});
174void ThrowIfNSlotsChanged(
unsigned int nSlots)
177 if (currentSlots != nSlots) {
178 std::string msg =
"RLoopManager::Run: when the RDataFrame was constructed the number of slots required was " +
179 std::to_string(nSlots) +
", but when starting the event loop it was " +
180 std::to_string(currentSlots) +
".";
181 if (currentSlots > nSlots)
182 msg +=
" Maybe EnableImplicitMT() was called after the RDataFrame was constructed?";
184 msg +=
" Maybe DisableImplicitMT() was called after the RDataFrame was constructed?";
185 throw std::runtime_error(msg);
197struct MaxTreeSizeRAII {
200 MaxTreeSizeRAII() : fOldMaxTreeSize(TTree::GetMaxTreeSize())
208struct DatasetLogInfo {
209 std::string fDataSet;
215std::string LogRangeProcessing(
const DatasetLogInfo &info)
217 std::stringstream msg;
218 msg <<
"Processing " << info.fDataSet <<
": entry range [" << info.fRangeStart <<
"," << info.fRangeEnd - 1
219 <<
"], using slot " << info.fSlot <<
" in thread " << std::this_thread::get_id() <<
'.';
223auto MakeDatasetColReadersKey(std::string_view colName,
const std::type_info &ti)
229 return std::string(colName) +
':' + ti.name();
237bool IsObjectInDir(std::string_view objName,
TDirectory &dir)
239 std::unique_ptr<T> o{dir.
Get<T>(objName.data())};
249bool IsObjectInDir<void>(std::string_view objName,
TDirectory &dir)
251 return dir.
GetKey(objName.data());
266 auto &&baseNameAndQuery = [&fileNameGlob]() {
267 constexpr std::string_view delim{
".root"};
268 if (
auto &&it = std::find_end(fileNameGlob.begin(), fileNameGlob.end(), delim.begin(), delim.end());
269 it != fileNameGlob.end()) {
270 auto &&distanceToEndOfDelim = std::distance(fileNameGlob.begin(), it + delim.length());
271 return std::make_pair(fileNameGlob.substr(0, distanceToEndOfDelim), fileNameGlob.substr(distanceToEndOfDelim));
272 }
else if (
auto &&lastQuestionMark = fileNameGlob.find_last_of(
'?'); lastQuestionMark != std::string_view::npos)
273 return std::make_pair(fileNameGlob.substr(0, lastQuestionMark), fileNameGlob.substr(lastQuestionMark));
275 return std::make_pair(fileNameGlob, std::string_view{});
278 auto &&baseName = baseNameAndQuery.first;
279 auto &&query = baseNameAndQuery.second;
281 std::string fileToOpen{fileNameGlob};
282 if (baseName.find_first_of(
"[]*?") != std::string_view::npos) {
284 if (expanded.empty())
285 throw std::invalid_argument{
"RDataFrame: The glob expression '" + std::string{baseName} +
286 "' did not match any files."};
288 fileToOpen = expanded.front() + std::string{query};
292 std::unique_ptr<TFile> inFile{
TFile::Open(fileToOpen.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION")};
293 if (!inFile || inFile->IsZombie())
294 throw std::invalid_argument(
"RDataFrame: could not open file \"" + fileToOpen +
"\".");
375std::optional<std::string> GetRedirectedSampleId(std::string_view path, std::string_view datasetName)
379 TString expandedUrl(path.data());
380 gSystem->ExpandPathName(expandedUrl);
381 if (
gEnv->GetValue(
"TFile.CrossProtocolRedirects", 1) == 1) {
383 if (strcmp(fileurl.GetProtocol(),
"file") == 0) {
384 ssize_t len = getxattr(fileurl.GetFile(),
"eos.url.xroot",
nullptr, 0);
386 std::string xurl(len, 0);
387 std::string fileNameFromUrl{fileurl.GetFile()};
388 if (getxattr(fileNameFromUrl.c_str(),
"eos.url.xroot", &xurl[0], len) == len) {
391 if (
auto baseName = fileNameFromUrl.substr(fileNameFromUrl.find_last_of(
"/") + 1);
392 std::equal(baseName.crbegin(), baseName.crend(), xurl.crbegin())) {
393 return xurl +
'/' + datasetName.data();
420 auto filesVec = spec.GetFileNameGlobs();
423 auto datasetName = spec.GetTreeNames();
433 const bool isTTree = IsObjectInDir<TTree>(datasetName[0], *inFile);
434 const bool isRNTuple = IsObjectInDir<ROOT::RNTuple>(datasetName[0], *inFile);
436 if (isTTree || isRNTuple) {
442 const auto &trees = sample.GetTreeNames();
443 const auto &files = sample.GetFileNameGlobs();
444 for (std::size_t i = 0ul; i < files.size(); ++i) {
447 const auto fullpath = files[i] +
"?#" + trees[i];
448 chain->Add(fullpath.c_str());
452 const auto sampleId = files[i] +
'/' + trees[i];
456 if (
auto redirectedSampleId = GetRedirectedSampleId(files[i], trees[i]))
457 fSampleMap.insert({redirectedSampleId.value(), &sample});
461 fDataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(std::move(chain), spec.GetFriendInfo());
462 }
else if (isRNTuple) {
464 std::vector<std::string> fileNames;
465 std::set<std::string> rntupleNames;
468 const auto &trees = sample.GetTreeNames();
469 const auto &files = sample.GetFileNameGlobs();
470 for (std::size_t i = 0ul; i < files.size(); ++i) {
471 const auto sampleId = files[i] +
'/' + trees[i];
473 fileNames.push_back(files[i]);
474 rntupleNames.insert(trees[i]);
478 if (
auto redirectedSampleId = GetRedirectedSampleId(files[i], trees[i]))
479 fSampleMap.insert({redirectedSampleId.value(), &sample});
484 if (rntupleNames.size() == 1) {
485 fDataSource = std::make_unique<ROOT::RDF::RNTupleDS>(*rntupleNames.begin(), fileNames);
488 throw std::runtime_error(
489 "More than one RNTuple name was found, please make sure to use RNTuples with the same name.");
495 for (
unsigned int slot{}; slot <
fNSlots; slot++) {
501 IsObjectInDir<void>(datasetName[0].data(), *inFile) ?
"unsupported data format for" :
"cannot find";
502 throw std::invalid_argument(
"RDataFrame: " + errMsg +
" dataset \"" + std::string(datasetName[0]) +
"\" in file \"" +
503 inFile->GetName() +
"\".");
511 std::shared_ptr<ROOT::Internal::RSlotStack> slotStack =
SlotStack();
515 const auto nEntriesPerSlot = nEmptyEntries / (
fNSlots * 2);
516 auto remainder = nEmptyEntries % (
fNSlots * 2);
517 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
525 entryRanges.emplace_back(begin, end);
530 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
532 auto slot = slotRAII.
fSlot;
538 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
543 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
549 pool.
Foreach(genFunction, entryRanges);
568 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
594 default:
return false;
603 DSRunRAII(
ROOT::RDF::RDataSource &ds,
const std::set<std::string> &suppressErrorsForMissingColumns) : fDS(ds)
620 fLM.GetDataSource()->InitSlot(
fSlot, firstEntry);
644 std::uint64_t processedEntries{};
645 std::vector<std::pair<ULong64_t, ULong64_t>> ranges{};
655 for (
const auto &range : ranges) {
656 const auto start = range.first;
657 const auto end = range.second;
667 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
675 if (
fEndEntry != std::numeric_limits<Long64_t>::max() &&
677 std::ostringstream buf{};
678 buf <<
"RDataFrame stopped processing after ";
679 buf << processedEntries;
680 buf <<
" entries, whereas an entry range (begin=";
684 buf <<
") was requested. Consider adjusting the end value of the entry range to a maximum of ";
687 Warning(
"RDataFrame::Run",
"%s", buf.str().c_str());
722 actionPtr->Run(slot, entry);
724 namedFilterPtr->CheckFilters(slot, entry);
736 ptr->InitSlot(
r, slot);
738 ptr->InitSlot(
r, slot);
740 ptr->InitSlot(
r, slot);
742 ptr->InitSlot(
r, slot);
764 "Empty source, range: {" + std::to_string(range.first) +
", " + std::to_string(range.second) +
"}", range);
769 auto *tree =
r.GetTree()->GetTree();
772 auto *file = tree->GetCurrentFile();
773 const std::string fname = file !=
nullptr ? file->GetName() :
"#inmemorytree#";
775 std::pair<Long64_t, Long64_t> range =
r.GetEntriesRange();
777 if (range.second == -1) {
778 range.second = tree->GetEntries();
781 const std::string &
id = fname + (treename.rfind(
'/', 0) == 0 ?
"" :
"/") + treename;
786 throw std::runtime_error(
"Full sample identifier '" +
id +
"' cannot be found in the available samples.");
799 if (
auto shared =
fSlotStack.lock(); shared) {
802 return std::make_shared<ROOT::Internal::RSlotStack>(
fNSlots);
840 ptr->ResetChildrenCount();
842 ptr->ResetChildrenCount();
854 ptr->FinalizeSlot(slot);
856 ptr->FinalizeSlot(slot);
858 ptr->FinalizeSlot(slot);
874 if (GetCodeToJit().empty() && GetCodeToDeclare().empty()) {
881 std::string codeToDeclare, code;
884 codeToDeclare.swap(GetCodeToDeclare());
885 code.swap(GetCodeToJit());
890 if (!codeToDeclare.empty()) {
891 DeclareAndRetrieveDeferredJitCalls(codeToDeclare);
899 :
" in less than 1ms.");
911 const auto &funcMap = GetJitHelperFuncMap();
913 funcMap.at(call.fFunctionId)(call.fColNames, *call.fColRegister, *
this, call.fJittedNode.get(),
919 << (realTime > 1
e-3 ?
" in " + std::to_string(realTime) +
" seconds."
920 :
" in less than 1ms.");
936 actionPtr->TriggerChildrenCount();
938 namedFilterPtr->TriggerChildrenCount();
947 MaxTreeSizeRAII ctxtmts;
965 class NodesCleanerRAII {
969 NodesCleanerRAII(
RLoopManager &thisRLM) : fRLM(thisRLM) {}
973 NodesCleanerRAII runKeeper(*
this);
980 throw std::runtime_error(
"RDataFrame: executing the computation graph without a data source, aborting.");
1070 fPtr->FillReport(rep);
1076 GetCodeToJit().append(code);
1080 std::unique_ptr<ROOT::Internal::RDF::RColumnRegister> colRegister,
1081 const std::vector<std::string> &colNames, std::shared_ptr<void> jittedNode,
1082 std::shared_ptr<void> argument)
1084 auto &funcBodyToFuncIdMap = GetJitFuncBodyToFuncIdMap();
1087 auto match = funcBodyToFuncIdMap.find(
fStringHasher(funcBody));
1088 if (match != funcBodyToFuncIdMap.end()) {
1090 std::string funcName =
"jitNodeRegistrator_" + std::to_string(match->second);
1092 fJitHelperCalls.emplace_back(match->second, std::move(colRegister), colNames, jittedNode, argument);
1100 auto registratorId = funcBodyToFuncIdMap.size();
1101 std::string funcName =
"jitNodeRegistrator_" + std::to_string(registratorId);
1102 [[maybe_unused]]
auto res = funcBodyToFuncIdMap.insert({
fStringHasher(funcBody), registratorId});
1105 std::string toDeclare =
"namespace R_rdf {\n void " + funcName + funcBody +
"\n}\n";
1108 GetCodeToDeclare().append(toDeclare);
1109 fJitHelperCalls.emplace_back(registratorId, std::move(colRegister), colNames, jittedNode, argument);
1115 if (everyNEvents == 0ull)
1123 std::vector<std::string>
filters;
1125 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
1148 std::unordered_map<
void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap)
1151 auto duplicateRLoopManagerIt = visitedMap.find((
void *)
this);
1152 if (duplicateRLoopManagerIt != visitedMap.end())
1153 return duplicateRLoopManagerIt->second;
1161 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(
1163 visitedMap[(
void *)
this] = thisNode;
1170 const auto key = MakeDatasetColReadersKey(col, ti);
1179 std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
1180 const std::type_info &ti)
1182 const auto key = MakeDatasetColReadersKey(col, ti);
1184 assert(readers.size() ==
fNSlots);
1186 for (
auto slot = 0u; slot <
fNSlots; ++slot) {
1192 const std::type_info &ti,
TTreeReader *treeReader)
1195 const auto key = MakeDatasetColReadersKey(col, ti);
1197 assert(readers.find(key) == readers.end() || readers[key] ==
nullptr);
1198 assert(
fDataSource &&
"Missing RDataSource to add column reader.");
1202 return readers[key].get();
1208 const auto key = MakeDatasetColReadersKey(col, ti);
1210 return it->second.get();
1237std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1248 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlob);
1249 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1253std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1255 const std::vector<std::string> &defaultColumns,
bool checkFile)
1257 if (fileNameGlobs.size() == 0)
1258 throw std::invalid_argument(
"RDataFrame: empty list of input files.");
1265 auto dataSource = std::make_unique<ROOT::Internal::RDF::RTTreeDS>(datasetName, fileNameGlobs);
1266 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1270std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1274 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlob);
1275 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1279std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1283 auto dataSource = std::make_unique<ROOT::RDF::RNTupleDS>(datasetName, fileNameGlobs);
1284 auto lm = std::make_shared<ROOT::Detail::RDF::RLoopManager>(std::move(dataSource), defaultColumns);
1288std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1295 if (IsObjectInDir<TTree>(datasetName, *inFile)) {
1297 }
else if (IsObjectInDir<ROOT::RNTuple>(datasetName, *inFile)) {
1301 std::string errMsg = IsObjectInDir<void>(datasetName, *inFile) ?
"unsupported data format for" :
"cannot find";
1303 throw std::invalid_argument(
"RDataFrame: " + errMsg +
" dataset \"" + std::string(datasetName) +
"\" in file \"" +
1304 inFile->GetName() +
"\".");
1307std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
1312 if (fileNameGlobs.size() == 0)
1313 throw std::invalid_argument(
"RDataFrame: empty list of input files.");
1317 if (IsObjectInDir<TTree>(datasetName, *inFile)) {
1319 }
else if (IsObjectInDir<ROOT::RNTuple>(datasetName, *inFile)) {
1323 std::string errMsg = IsObjectInDir<void>(datasetName, *inFile) ?
"unsupported data format for" :
"cannot find";
1325 throw std::invalid_argument(
"RDataFrame: " + errMsg +
" dataset \"" + std::string(datasetName) +
"\" in file \"" +
1326 inFile->GetName() +
"\".");
1343 std::atomic<ULong64_t> &entryCount)
1347 const auto &slot = slotRAII.
fSlot;
1349 const auto &[
start, end] = entryRange;
1350 const auto nEntries = end -
start;
1351 entryCount.fetch_add(nEntries);
1361 for (
auto entry =
start; entry < end; ++entry) {
1367 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
1378 std::atomic<ULong64_t> &entryCount)
1382 const auto &slot = slotRAII.
fSlot;
1385 const auto &[
start, end] = entryRange;
1386 const auto nEntries = end -
start;
1387 auto count = entryCount.fetch_add(nEntries);
1396 while (validTTreeReaderRead(treeReader)) {
1403 std::cerr <<
"RDataFrame::Run: event loop was interrupted\n";
1410 throw std::runtime_error(
"An error was encountered while processing the data. TTreeReader status code is: " +
1430 const std::
vector<std::
string> &colNamesArg, std::shared_ptr<
void> jittedNode, std::shared_ptr<
void> argPtr)
#define R__LOG_DEBUG(DEBUGLEVEL,...)
std::unique_ptr< TFile > OpenFileWithSanityChecks(std::string_view fileNameGlob)
Helper function to open a file (or the first file from a glob).
Basic types used by ROOT and required by TInterpreter.
long long Long64_t
Portable signed long integer 8 bytes.
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
#define R__WRITE_LOCKGUARD(mutex)
#define R__READ_LOCKGUARD(mutex)
The head node of a RDF computation graph.
RColumnReaderBase * AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti, TTreeReader *treeReader)
void UpdateSampleInfo(unsigned int slot, const std::pair< ULong64_t, ULong64_t > &range)
unsigned int fNRuns
Number of event loops run.
bool CheckFilters(unsigned int, Long64_t) final
void RegisterJitHelperCall(const std::string &funcBody, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > colRegister, const std::vector< std::string > &colnames, std::shared_ptr< void > jittedNode, std::shared_ptr< void > argument=nullptr)
void EvalChildrenCounts()
Trigger counting of number of children nodes for each node of the functional graph.
friend struct ROOT::Internal::RDF::RDSRangeRAII
void CleanUpNodes()
Perform clean-up operations. To be called at the end of each event loop.
void RunEmptySource()
Run event loop with no source files, in sequence.
void SetEmptyEntryRange(std::pair< ULong64_t, ULong64_t > &&newRange)
void Report(ROOT::RDF::RCutFlowReport &rep) const final
Call FillReport on all booked filters.
void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback)
std::vector< RFilterBase * > fBookedNamedFilters
Contains a subset of fBookedFilters, i.e. only the named filters.
void RunEmptySourceMT()
Run event loop with no source files, in parallel.
ULong64_t GetNEmptyEntries() const
std::hash< std::string > fStringHasher
std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > fSampleMap
Keys are fname + "/" + treename as RSampleInfo::fID; Values are pointers to the corresponding sample.
void AddDataSourceColumnReaders(std::string_view col, std::vector< std::unique_ptr< RColumnReaderBase > > &&readers, const std::type_info &ti)
std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > GetGraph(std::unordered_map< void *, std::shared_ptr< ROOT::Internal::RDF::GraphDrawing::GraphNode > > &visitedMap) final
void ToJitExec(const std::string &) const
std::vector< RDFInternal::RActionBase * > GetAllActions() const
Return all actions, either booked or already run.
std::vector< ROOT::RDF::RSampleInfo > fSampleInfos
std::set< std::string > fSuppressErrorsForMissingBranches
bool fMustRunNamedFilters
void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec)
Changes the internal TTree held by the RLoopManager.
std::weak_ptr< ROOT::Internal::RSlotStack > fSlotStack
Pointer to a shared slot stack in case this instance runs concurrently with others:
std::vector< RDefineBase * > fBookedDefines
void TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data s...
friend struct RCallCleanUpTask
std::vector< RDFInternal::RActionBase * > fRunActions
Non-owning pointers to actions already run.
RLoopManager(const ColumnNames_t &defaultColumns={})
std::vector< RRangeBase * > fBookedRanges
std::vector< ROOT::RDF::Experimental::RSample > fSamples
Samples need to survive throughout the whole event loop, hence stored as an attribute.
std::vector< std::string > ColumnNames_t
void RunAndCheckFilters(unsigned int slot, Long64_t entry)
Execute actions and make sure named filters are called for each event.
void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end)
std::vector< RFilterBase * > fBookedFilters
void Run(bool jit=true)
Start the event loop with a different mechanism depending on IMT/no IMT, data source/no data source.
std::unordered_map< void *, ROOT::RDF::SampleCallback_t > fSampleCallbacks
Registered callbacks to call at the beginning of each "data block".
std::vector< RDFInternal::RActionBase * > fBookedActions
Non-owning pointers to actions to be run.
ColumnNames_t fDefaultColumns
void SetupSampleCallbacks(TTreeReader *r, unsigned int slot)
void CleanUpTask(TTreeReader *r, unsigned int slot)
Perform clean-up operations. To be called at the end of each task execution.
std::vector< RDFInternal::RCallback > fCallbacksEveryNEvents
Registered callbacks to be executed every N events.
std::vector< std::unordered_map< std::string, std::unique_ptr< RColumnReaderBase > > > fDatasetColumnReaders
Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
void Register(RDFInternal::RActionBase *actionPtr)
std::vector< DeferredJitCall > fJitHelperCalls
const ColumnNames_t & GetDefaultColumnNames() const
Return the list of default columns – empty if none was provided when constructing the RDataFrame.
std::vector< RDFInternal::RVariationBase * > fBookedVariations
std::vector< RNodeBase * > GetGraphEdges() const
Return all graph edges known to RLoopManager. This includes Filters and Ranges but not Defines.
RDataSource * GetDataSource() const
unsigned int GetNSlots() const
void RunDataSourceMT()
Run event loop over data accessed through a DataSource, in parallel.
std::vector< std::string > GetFiltersNames()
For each booked filter, returns either the name or "Unnamed Filter".
RDFInternal::RNewSampleNotifier fNewSampleNotifier
std::pair< ULong64_t, ULong64_t > fEmptyEntryRange
Range of entries created when no data source is specified.
std::unique_ptr< RDataSource > fDataSource
Owning pointer to a data-source object.
void DataSourceThreadTask(const std::pair< ULong64_t, ULong64_t > &entryRange, ROOT::Internal::RSlotStack &slotStack, std::atomic< ULong64_t > &entryCount)
The task run by every thread on the input entry range, for the generic RDataSource.
void InitNodeSlots(TTreeReader *r, unsigned int slot)
Build TTreeReaderValues for all nodes. This method loops over all filters, actions and other booked ob...
std::vector< RDFInternal::ROneTimeCallback > fCallbacksOnce
Registered callbacks to invoke just once before running the loop.
void SetDataSource(std::unique_ptr< ROOT::RDF::RDataSource > dataSource)
void RegisterCallback(ULong64_t everyNEvents, std::function< void(unsigned int)> &&f)
void SetTTreeLifeline(std::any lifeline)
void RunDataSource()
Run event loop over data accessed through a DataSource, in sequence.
void Jit()
Add RDF nodes that require just-in-time compilation to the computation graph.
RColumnReaderBase * GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const
std::shared_ptr< ROOT::Internal::RSlotStack > SlotStack() const
Create a slot stack with the desired number of slots or reuse a shared instance.
void Deregister(RDFInternal::RActionBase *actionPtr)
ELoopType fLoopType
The kind of event loop that is going to be run (e.g. on ROOT files, on no files).
void InitNodes()
Initialize all nodes of the functional graph before running the event loop.
bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const
Return true if AddDataSourceColumnReaders was called for column name col.
unsigned int fNStopsReceived
Number of times that a child node signaled to stop processing entries.
unsigned int fNChildren
Number of nodes of the functional graph hanging from this object.
virtual ROOT::RDF::SampleCallback_t GetSampleCallback()=0
A binder for user-defined columns, variations and aliases.
This type includes all parts of RVariation that do not depend on the callable signature.
A thread-safe list of N indexes (0 to size - 1).
The dataset specification for RDataFrame.
RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
virtual void Finalize()
Convenience method called after concluding an event-loop.
virtual std::string GetLabel()
Return a string representation of the datasource type.
This type represents a sample identifier, to be used in conjunction with RDataFrame features such as ...
This class provides a simple interface to execute the same task multiple times in parallel threads,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
TDirectory::TContext keeps track of and restores the current directory.
Describe directory structure in memory.
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
virtual TKey * GetKey(const char *, Short_t=9999) const
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
std::pair< Long64_t, Long64_t > GetEntriesRange() const
Get the begin and end entry numbers.
@ kIndexedFriendNoMatch
A friend with TTreeIndex doesn't have an entry for this index.
@ kMissingBranchWhenSwitchingTree
A branch was not found when switching to the next TTree in the chain.
@ kEntryBeyondEnd
last entry loop has reached its end
@ kEntryValid
data read okay
EEntryStatus GetEntryStatus() const
bool Next()
Move to the next entry (or index of the TEntryList if that is set).
A TTree represents a columnar dataset.
static void SetMaxTreeSize(Long64_t maxsize=100000000000LL)
Set the maximum size in bytes of a Tree file (static function).
This class represents a WWW compatible URL.
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
Retrieve the full path(s) to a TTree or the trees in a TChain.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns, bool checkFile=true)
Create an RLoopManager that reads a TChain.
ROOT::RLogChannel & RDFLogChannel()
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromFile(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager opening a file and checking the data format of the dataset.
std::shared_ptr< ROOT::Detail::RDF::RLoopManager > CreateLMFromRNTuple(std::string_view datasetName, std::string_view fileNameGlob, const std::vector< std::string > &defaultColumns)
Create an RLoopManager that reads an RNTuple.
Special implementation of ROOT::RRangeCast for TCollection, including a check that the cast target ty...
void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun)
ROOT::RDF::RSampleInfo CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, const std::unordered_map< std::string, ROOT::RDF::Experimental::RSample * > &sampleMap)
void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set< std::string > &suppressErrorsForMissingColumns)
void Erase(const T &that, std::vector< T > &v)
Erase the element `that` from vector `v`.
std::unique_ptr< ROOT::Detail::RDF::RColumnReaderBase > CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *treeReader)
void InterpreterCalc(const std::string &code, const std::string &context="")
Jit code in the interpreter with TInterpreter::Calc, throw in case of errors.
void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm)
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::string > ExpandGlob(const std::string &glob)
Expands input glob into a collection of full paths to files.
auto MakeAliasedSharedPtr(T *rawPtr)
std::function< void(unsigned int, const ROOT::RDF::RSampleInfo &)> SampleCallback_t
The type of a data-block callback, registered with an RDataFrame computation graph via e....
std::vector< std::string > ColumnNames_t
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
extern TVirtualRWMutex * gCoreMutex
RCallCleanUpTask(RLoopManager &lm, unsigned int arg=0u, TTreeReader *reader=nullptr)
RLoopManager & fLoopManager
std::vector< std::string > fColNames
std::shared_ptr< void > fJittedNode
std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > fColRegister
DeferredJitCall(std::size_t id, std::unique_ptr< ROOT::Internal::RDF::RColumnRegister > cols, const std::vector< std::string > &colNamesArg, std::shared_ptr< void > jittedNode, std::shared_ptr< void > arg)
std::shared_ptr< void > fExtraArgs
ROOT::Detail::RDF::RLoopManager & fLM
RDSRangeRAII(ROOT::Detail::RDF::RLoopManager &lm, unsigned int slot, ULong64_t firstEntry, TTreeReader *treeReader=nullptr)
TTreeReader * fTreeReader
A RAII object to pop and push slot numbers from a RSlotStack object.