/// A pair of entry numbers (first, second) delimiting a range of TTree/TChain entries.
/// The contiguity test in ClustersAreSortedAndContiguous (`last_end != c.first`) and the
/// `currentStart < currentEnd` guard in MakeClusters suggest half-open ranges
/// [first, second) — NOTE(review): confirm.
/// NOTE(review): the leading numbers on this and later lines (e.g. "36") are line numbers
/// from a rendered source listing fused into the text; they are not part of the C++ code.
36using EntryRange = std::pair<Long64_t, Long64_t>;
/// Check that the entry ranges in `cls` are sorted and contiguous. Callers index `cls` with
/// one inner vector per file (`clusters[fileN]`). Every range must start exactly where the
/// previous one ended; the visible failure test is `last_end != c.first`.
/// NOTE(review): this listing is lossy. The declaration and initial value of `last_end`,
/// its update after each range, and the return statements sit on original lines that are
/// missing here (the embedded numbers jump 39 -> 42 and 44 -> 59). The starting value that
/// the first range is compared against therefore cannot be determined from this view.
39bool ClustersAreSortedAndContiguous(
const std::vector<std::vector<EntryRange>> &cls)
// Walk every range of every file, in order.
42 for (
const auto &fcl : cls) {
43 for (
const auto &
c : fcl) {
44 if (last_end !=
c.first)
/// Convert per-file clusters of entry numbers into per-file ranges of TEntryList positions.
///
/// `clusters` must be sorted and contiguous; this is asserted on entry.
///
/// For each cluster that contains entry-list entries, one range {elistRangeStart, elistEntry}
/// is emitted:
///  - elistRangeStart is the entry-list position at which the cluster was entered;
///  - elistEntry is the position reached once the list has been advanced past the cluster's
///    end or exhausted.
/// Clusters with no entry-list entries presumably produce no range; the skip statement is on a
/// missing line. The result has one vector per input file and is asserted to be sorted and
/// contiguous.
///
/// In the visible lines, `treeNames`, `fileNames` and `entriesPerFile` are used only to fill a
/// TChain. Its tree offsets translate tree-local entry-list entries into chain-wide ones.
/// `entryList` is taken by non-const reference, presumably because it is advanced while scanning.
///
/// NOTE(review): lossy listing. The following are missing from this view:
///  - the definitions of `Next`, `entry` and `elistEntry`;
///  - the branch structure around the TChain construction (original lines 71-82, 85-87, 92-99);
///  - the bodies of the early exits (lines 90, 106-107).
59std::vector<std::vector<EntryRange>>
60ConvertToElistClusters(std::vector<std::vector<EntryRange>> &&clusters,
TEntryList &entryList,
61 const std::vector<std::string> &treeNames,
const std::vector<std::string> &fileNames,
62 const std::vector<Long64_t> &entriesPerFile)
65 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
// An entry list without sub-lists (GetLists() == nullptr) is treated as holding global entry numbers.
67 const bool listHasGlobalEntryNumbers = entryList.
GetLists() ==
nullptr;
68 const auto nFiles = clusters.
size();
// Used to map (tree index, tree-local entry) to a chain-wide entry through GetTreeOffset().
70 std::unique_ptr<TChain> chain;
75 if (listHasGlobalEntryNumbers) {
// Register every file/tree with its already-known entry count; presumably this spares TChain
// from opening each file just to count entries.
83 for (
auto i = 0u; i < nFiles; ++i)
84 chain->Add((fileNames[i] +
"?#" + treeNames[i]).c_str(), entriesPerFile[i]);
// Local-to-global translation. A -1 from GetEntryAndTree (end of list) is presumably returned
// as-is (line 90 is missing); otherwise the owning tree's offset in the chain is added.
88 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
89 if (localEntry == -1ll)
91 return localEntry + ch->GetTreeOffset()[treenum];
// One output vector per input file.
100 std::vector<std::vector<EntryRange>> elistClusters;
102 for (
auto fileN = 0u; fileN < nFiles; ++fileN) {
103 std::vector<EntryRange> elistClustersForFile;
104 for (
const auto &
c : clusters[fileN]) {
// No entry-list entry falls in this cluster: the current entry is already past the cluster's
// end, or the list is exhausted. The statement that follows (line 106) is missing; it
// presumably skips the cluster.
105 if (entry >=
c.second || entry == -1ll)
// Remember where in the entry list this cluster's entries begin.
108 const Long64_t elistRangeStart = elistEntry;
// Advance through the entry list until the current entry leaves the cluster or the list runs out.
110 while (entry <
c.second && entry != -1ll)
111 entry = Next(elistEntry, entryList, chain.get());
112 elistClustersForFile.emplace_back(EntryRange{elistRangeStart, elistEntry});
114 elistClusters.emplace_back(std::move(elistClustersForFile));
// Post-conditions: same number of files as the input, and the output ranges are contiguous too.
117 R__ASSERT(elistClusters.size() == clusters.size());
118 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
121 return elistClusters;
/// The result type of MakeClusters: per-file entry ranges (first) and per-file entry counts (second).
125using ClustersAndEntries = std::pair<std::vector<std::vector<EntryRange>>, std::vector<Long64_t>>;
/// For each (treeNames[i], fileNames[i]) pair, compute the entry ranges to be processed as
/// tasks, together with the total number of entries of each tree.
///
/// Phase 1 walks each tree's clusters with TTree::GetClusterIterator and processes each cluster:
///  - it is shifted by `offset`, which is defined on missing lines; presumably `offset` is the
///    number of entries in the preceding files, which would make these chain-wide entry numbers;
///  - it is clipped to `range`, and empty intersections are dropped.
/// Scanning stops as soon as a clipped range ends exactly at `range.second`. Files after that
/// point are never opened and get no entry in the output.
///
/// Phase 2 folds each file's clusters into fewer, larger ranges. It is driven by
/// `nFolds = clusters / maxTasksPerFile` and `clusters % maxTasksPerFile`.
///
/// @param treeNames        tree name to read from each file (same ordering as fileNames)
/// @param fileNames        files to scan
/// @param maxTasksPerFile  target for the folding phase; used as a divisor, so it must be non-zero
/// @param range            requested range of entries; the default {0, max Long64_t} means "everything"
/// @return {per-file ranges, per-file entry counts} for the files that were scanned
/// @throws std::runtime_error if a file cannot be opened or the tree cannot be retrieved from it
/// @throws std::logic_error if `range.first >= offset && offset > 0`
///
/// NOTE(review): lossy listing. The following are missing from this view:
///  - the call that opens `f` (only its arguments survive on line 147);
///  - the null-tree test before line 155;
///  - the definition and update of `offset`;
///  - most of the folding logic (lines 219-221, 223-226, 231-238).
129ClustersAndEntries MakeClusters(
const std::vector<std::string> &treeNames,
130 const std::vector<std::string> &fileNames,
const unsigned int maxTasksPerFile,
131 const EntryRange &range = {0, std::numeric_limits<Long64_t>::max()})
136 const auto nFileNames = fileNames.size();
137 std::vector<std::vector<EntryRange>> clustersPerFile;
138 std::vector<Long64_t> entriesPerFile;
139 entriesPerFile.reserve(nFileNames);
// Set once a range has been clipped at range.second; it terminates both the cluster loop and the file loop.
141 bool rangeEndReached =
false;
142 for (
auto i = 0u; i < nFileNames && !rangeEndReached; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
// Arguments of the (missing) call that opens the file into `f`.
147 fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
148 if (!
f ||
f->IsZombie()) {
149 const auto msg =
"TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName +
"\"";
150 throw std::runtime_error(msg);
// Retrieve the tree. The test guarding the throw below is on missing lines (presumably `t == nullptr`).
152 auto *t =
f->Get<
TTree>(treeName.c_str());
155 const auto msg =
"TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
156 "\" from file \"" + fileName +
"\"";
157 throw std::runtime_error(msg);
// Iterate over cluster boundaries starting at entry 0; each step presumably yields the
// cluster [clusterStart, clusterEnd).
163 auto clusterIter = t->GetClusterIterator(0);
164 Long64_t clusterStart = 0ll, clusterEnd = 0ll;
165 const Long64_t entries = t->GetEntries();
167 std::vector<EntryRange> entryRanges;
168 while ((clusterStart = clusterIter()) < entries && !rangeEndReached) {
169 clusterEnd = clusterIter.GetNextEntry();
// Shift the cluster by `offset` and clip it to the requested range.
179 const auto currentStart = std::max(clusterStart + offset, range.first);
180 const auto currentEnd = std::min(clusterEnd + offset, range.second);
// Keep only non-empty intersections.
183 if (currentStart < currentEnd)
184 entryRanges.emplace_back(EntryRange{currentStart, currentEnd});
// Nothing beyond range.second is needed: stop scanning this file and the following ones.
185 if (currentEnd == range.second)
186 rangeEndReached =
true;
189 clustersPerFile.emplace_back(std::move(entryRanges));
191 entriesPerFile.emplace_back(entries);
// NOTE(review): the message says "larger than", but the condition also triggers on equality (>=).
// NOTE(review): std::string has no operator+ taking a Long64_t. Confirm which overload the
// `+ range.first` / `+ offset` concatenations resolve to, or use std::to_string.
// Where this check sits relative to the file loop cannot be determined from this view.
193 if (range.first >= offset && offset > 0)
194 throw std::logic_error(std::string(
"A range of entries was passed in the creation of the TTreeProcessorMT, ") +
195 "but the starting entry (" + range.first +
") is larger than the total number of " +
196 "entries (" + offset +
") in the dataset.");
// Phase 2: fold each scanned file's clusters; the output has one vector per scanned file.
213 std::vector<std::vector<EntryRange>> eventRangesPerFile(clustersPerFile.size());
214 auto clustersPerFileIt = clustersPerFile.begin();
215 auto eventRangesPerFileIt = eventRangesPerFile.begin();
216 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
217 const auto clustersInThisFileSize = clustersPerFileIt->size();
218 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
// Presumably the nFolds == 0 case (fewer clusters than maxTasksPerFile): keep the clusters as they are.
// The guarding condition is on a missing line.
222 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
// Clusters left over after an even split. While positive, they presumably make some merged
// ranges one cluster longer (lines 231-238 are missing).
227 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
228 const auto &clustersInThisFile = *clustersPerFileIt;
229 for (
auto i = 0ULL; i < clustersInThisFileSize; ++i) {
230 const auto start = clustersInThisFile[i].first;
235 if (nReminderClusters > 0) {
// Emit one merged range from the first cluster's start to the last merged cluster's end.
239 const auto end = clustersInThisFile[i].second;
240 eventRangesPerFileIt->emplace_back(EntryRange({
start, end}));
244 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
/// Fragment of MakeChain; its signature and several lines are missing from this listing.
/// According to its declaration summary, it constructs fChain, adds friends if needed, and
/// injects knowledge of offsets when available.
// Add one chain element per entry of nEntries, named "<file>?#<tree>", with its known entry count.
// NOTE(review): assumes fileNames and treeNames have at least nEntries.size() elements.
270 const auto nFilesToProcess = nEntries.size();
271 for (
auto i = 0u; i < nFilesToProcess; ++i) {
272 fChain->Add((fileNames[i] +
"?#" + treeNames[i]).c_str(), nEntries[i]);
// The friend chains in fFriends are presumably created on missing lines 273-277. This checks
// that one was created per friend; `nFriends` is defined on a missing line.
278 R__ASSERT(nFriends ==
fFriends.size() &&
"Created the wrong number of friends from the available information.");
// Per-friend loop; only the alias lookup (second element of each fFriendNames pair) survives here.
279 for (std::size_t i = 0ul; i < nFriends; i++) {
280 const auto &thisFriendAlias = friendInfo.
fFriendNames[i].second;
/// Fragment of GetTreeReader; parts of the signature and body are missing from this listing.
/// It builds a TTreeReader over fChain (plus fEntryList, if set) and restricts it with
/// SetEntriesRange(start, end). fChain is reused when possible.
287std::unique_ptr<TTreeReader>
290 const TEntryList &entryList,
const std::vector<Long64_t> &nEntries,
291 const std::set<std::string> &suppressErrorsForMissingBranches)
293 const bool hasEntryList = entryList.
GetN() > 0;
294 const bool usingLocalEntries = friendInfo.
fFriendNames.empty() && !hasEntryList;
// A new chain is needed if none exists yet. When local entry numbers are in use (no friends,
// no entry list), it is also needed if the requested first file/tree differs from the chain's
// first element: the element's title is compared with the file name, and its name with the
// tree name.
// NOTE(review): fileNames[0] and treeNames[0] are read without checking the vectors are non-empty.
295 const bool needNewChain =
296 fChain ==
nullptr || (usingLocalEntries && (fileNames[0] !=
fChain->GetListOfFiles()->At(0)->GetTitle() ||
297 treeNames[0] !=
fChain->GetListOfFiles()->At(0)->GetName()));
// Presumably guarded by `if (needNewChain)` (line 298 is missing).
299 MakeChain(treeNames, fileNames, friendInfo, nEntries);
// Keep a private copy of the entry list for the reader. The guarding condition (line 300) is
// missing; presumably it is hasEntryList.
301 fEntryList = std::make_unique<TEntryList>(entryList);
// NOTE(review): the meaning of the literal `false` argument is not visible here; a named
// constant would document it.
310 auto reader = std::make_unique<TTreeReader>(
fChain.get(),
fEntryList.get(),
false,
311 suppressErrorsForMissingBranches);
312 reader->SetEntriesRange(
start, end);
/// Fragment of FindTreeNames. According to its declaration summary, it retrieves the name of
/// the TTree in each input file and throws if one cannot be found. The following are on lines
/// missing from this listing:
///  - the loop over the files;
///  - the opening that yields `f` and `fname`;
///  - the condition guarding the first throw.
332 std::vector<std::string> treeNames;
// Raised when there is nothing to search (the condition is on a missing line).
335 throw std::runtime_error(
"Empty list of files and no tree name provided");
339 std::string treeName;
// Scan the file's keys for an object whose class name is exactly "TTree".
// NOTE(review): the comparison is exact, so keys of classes deriving from TTree do not match.
// Whether the first or the last match wins depends on missing line 346.
341 TIter next(
f->GetListOfKeys());
342 while (
auto *key =
static_cast<TKey *
>(next())) {
343 const char *className = key->GetClassName();
344 if (strcmp(className,
"TTree") == 0) {
345 treeName = key->GetName();
349 if (treeName.empty())
350 throw std::runtime_error(
"Cannot find any tree in file " + fname);
351 treeNames.emplace_back(std::move(treeName));
/// Fragment of the single-file constructor. The file list contains just `filename`; if
/// `treename` is empty, the tree name is discovered with FindTreeNames().
/// NOTE(review): FindTreeNames() runs inside the mem-initializer list. This relies on
/// fFileNames being declared, and therefore initialized, before fTreeNames; confirm in the
/// class definition.
366 const EntryRange &globalRange)
367 : fFileNames({std::string(filename)}),
368 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
369 fPool(nThreads), fGlobalRange(globalRange)
/// Apparently the body of CheckAndConvert. It throws if the list of file names is empty (the
/// guarding `if` is on a missing line); otherwise it copies each string_view into an owning
/// std::string.
377 throw std::runtime_error(
"The provided list of file names is empty");
379 std::vector<std::string> strings;
380 strings.reserve(views.size());
381 for (
const auto &
v : views)
382 strings.emplace_back(
v);
/// Fragment of the constructor taking several files. It produces one tree name per file:
/// either discovered with FindTreeNames(), or `treename` repeated fFileNames.size() times.
/// fFileNames itself is initialized on a missing line.
/// NOTE(review): relies on fFileNames being initialized before fTreeNames (declaration order); confirm.
399 UInt_t nThreads,
const EntryRange &globalRange)
401 fTreeNames(treename.empty() ? FindTreeNames()
402 : std::vector<std::string>(fFileNames.
size(), std::string(treename))),
403 fFriendInfo(), fPool(nThreads), fGlobalRange(globalRange)
/// Trailing parameter of another constructor overload; the rest of its signature and body is
/// missing from this listing.
415 const std::set<std::string> &suppressErrorsForMissingBranches)
/// Fragment of another constructor overload. It stores the global entry range and the set of
/// branch names that GetTreeReader later passes to TTreeReader as
/// suppressErrorsForMissingBranches.
433 const std::set<std::string> &suppressErrorsForMissingBranches)
438 fGlobalRange(globalRange),
439 fSuppressErrorsForMissingBranches(suppressErrorsForMissingBranches)
/// Fragment of Process. According to its declaration summary, it processes the entries of a
/// TTree in parallel.
///
/// Two strategies are visible:
///  - If there are friends, a non-empty entry list, or a non-default global range, the clusters
///    of all files are computed up front into allClusterAndEntries. This happens inside the `if`
///    at line 478, whose body is missing. Each file's precomputed clusters are then processed.
///  - Otherwise, each per-file task calls MakeClusters on its own file with the default range.
/// File tasks run through fPool.Foreach, and each file task again calls fPool.Foreach over its
/// clusters.
///
/// NOTE(review): lossy listing. The following are missing from this view:
///  - the value of maxTasksPerFile;
///  - the bodies of both processCluster lambdas;
///  - the `else` alternative of firstNonEmpty;
///  - the `else` before the final Foreach;
///  - most of the per-slot loop.
463 const unsigned int maxTasksPerFile =
471 const bool hasFriends = !
fFriendInfo.fFriendNames.empty();
472 const bool hasEntryList =
fEntryList.GetN() > 0;
// Any non-default global range (start > 0 or finite end) also forces the up-front computation.
473 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList ||
fGlobalRange.first > 0 ||
474 fGlobalRange.second != std::numeric_limits<Long64_t>::max();
475 ClustersAndEntries allClusterAndEntries{};
476 auto &allClusters = allClusterAndEntries.first;
477 const auto &allEntries = allClusterAndEntries.second;
478 if (shouldRetrieveAllClusters) {
// Task for one file when clusters were precomputed: run processCluster over that file's clusters.
485 auto processFileUsingGlobalClusters = [&](std::size_t fileIdx) {
486 auto processCluster = [&](
const EntryRange &
c) {
491 fPool.Foreach(processCluster, allClusters[fileIdx]);
// Task for one file when nothing was precomputed: compute this file's clusters here, then process them.
495 auto processFileRetrievingClusters = [&](std::size_t fileIdx) {
// Binding const references to these temporaries extends their lifetime to the end of the lambda body.
497 const auto &treeNames = std::vector<std::string>({
fTreeNames[fileIdx]});
498 const auto &fileNames = std::vector<std::string>({
fFileNames[fileIdx]});
499 const auto clustersAndEntries = MakeClusters(treeNames, fileNames, maxTasksPerFile);
500 const auto &clusters = clustersAndEntries.first[0];
501 const auto &entries = clustersAndEntries.second[0];
502 auto processCluster = [&](
const EntryRange &
c) {
507 fPool.Foreach(processCluster, clusters);
// With a non-zero global start, skip the leading files whose cluster lists are empty.
510 const auto firstNonEmpty =
511 fGlobalRange.first > 0u ? std::distance(allClusters.begin(), std::find_if(allClusters.begin(), allClusters.end(),
512 [](
auto &
c) { return !c.empty(); }))
// allEntries is presumably empty when clusters were not precomputed; then every file gets a task.
515 std::vector<std::size_t> fileIdxs(allEntries.empty() ?
fFileNames.size() : allEntries.size() - firstNonEmpty);
516 std::iota(fileIdxs.begin(), fileIdxs.end(), firstNonEmpty);
518 if (shouldRetrieveAllClusters)
519 fPool.Foreach(processFileUsingGlobalClusters, fileIdxs);
521 fPool.Foreach(processFileRetrievingClusters, fileIdxs);
// Per-slot pass over the thread-local tree views; the loop body is mostly missing from this view.
524 for (
unsigned int islot = 0; islot <
fTreeView.GetNSlots(); ++islot) {
526 if (view !=
nullptr) {
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
long long Long64_t
Portable signed long integer 8 bytes.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::set< std::string > &suppressErrorsForMissingBranches)
Get a TTreeReader for the current tree of this view.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
ROOT::TreeUtils::RFriendInfo fFriendInfo
const std::vector< std::string > fTreeNames
TTree names (always the same size and ordering as fFileNames).
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files; throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
std::set< std::string > fSuppressErrorsForMissingBranches
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u, const std::pair< Long64_t, Long64_t > &globalRange={0, std::numeric_limits< Long64_t >::max()})
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
std::pair< Long64_t, Long64_t > fGlobalRange
A chain is a collection of files containing TTree objects.
TDirectory::TContext keeps track of and restores the current directory.
virtual TList * GetLists() const
virtual Long64_t GetEntry(Long64_t index)
virtual Long64_t GetN() const
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Book space in a file, create I/O buffers to fill them, and (un)compress them.
@ kCanDelete
if object in a list can be deleted
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
A TTree represents a columnar dataset.
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
void ClearMustCleanupBits(TObjArray &arr)
Reset the kMustCleanup bit of a TObjArray of TBranch objects (e.g.
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Information about friend trees of a certain TTree or TChain object.
std::vector< std::pair< std::string, std::string > > fFriendNames
Pairs of names and aliases of each friend tree/chain.