41static bool ClustersAreSortedAndContiguous(
const std::vector<std::vector<EntryCluster>> &cls)
44 for (
const auto &fcl : cls) {
45 for (
const auto &
c : fcl) {
46 if (last_end !=
c.start)
61static std::vector<std::vector<EntryCluster>>
62ConvertToElistClusters(std::vector<std::vector<EntryCluster>> &&clusters,
TEntryList &entryList,
63 const std::vector<std::string> &treeNames,
const std::vector<std::string> &fileNames,
64 const std::vector<Long64_t> &entriesPerFile)
67 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
69 const bool listHasGlobalEntryNumbers = entryList.
GetLists() ==
nullptr;
70 const auto nFiles = clusters.size();
72 std::unique_ptr<TChain> chain;
77 if (listHasGlobalEntryNumbers) {
85 for (
auto i = 0u; i < nFiles; ++i)
86 chain->
Add((fileNames[i] +
"?#" + treeNames[i]).c_str(), entriesPerFile[i]);
90 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
91 if (localEntry == -1ll)
93 return localEntry + ch->GetTreeOffset()[treenum];
102 std::vector<std::vector<EntryCluster>> elistClusters;
104 for (
auto fileN = 0u; fileN < nFiles; ++fileN) {
105 std::vector<EntryCluster> elistClustersForFile;
106 for (
const auto &
c : clusters[fileN]) {
107 if (entry >=
c.end || entry == -1ll)
110 const Long64_t elistRangeStart = elistEntry;
112 while (entry <
c.end && entry != -1ll)
113 entry = Next(elistEntry, entryList, chain.get());
114 elistClustersForFile.emplace_back(EntryCluster{elistRangeStart, elistEntry});
116 elistClusters.emplace_back(std::move(elistClustersForFile));
119 R__ASSERT(elistClusters.size() == clusters.size());
120 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
123 return elistClusters;
127using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
131static ClustersAndEntries MakeClusters(
const std::vector<std::string> &treeNames,
132 const std::vector<std::string> &fileNames,
const unsigned int maxTasksPerFile)
137 const auto nFileNames = fileNames.size();
138 std::vector<std::vector<EntryCluster>> clustersPerFile;
139 std::vector<Long64_t> entriesPerFile;
140 entriesPerFile.reserve(nFileNames);
142 for (
auto i = 0u; i < nFileNames; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
147 fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
148 if (!
f ||
f->IsZombie()) {
149 const auto msg =
"TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName +
"\"";
150 throw std::runtime_error(msg);
152 auto *t =
f->Get<
TTree>(treeName.c_str());
155 const auto msg =
"TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
156 "\" from file \"" + fileName +
"\"";
157 throw std::runtime_error(msg);
162 auto clusterIter = t->GetClusterIterator(0);
164 const Long64_t entries = t->GetEntries();
166 std::vector<EntryCluster> clusters;
167 while ((start = clusterIter()) < entries) {
168 end = clusterIter.GetNextEntry();
170 clusters.emplace_back(EntryCluster{start + offset, end + offset});
173 clustersPerFile.emplace_back(std::move(clusters));
174 entriesPerFile.emplace_back(entries);
192 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
193 auto clustersPerFileIt = clustersPerFile.begin();
194 auto eventRangesPerFileIt = eventRangesPerFile.begin();
195 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
196 const auto clustersInThisFileSize = clustersPerFileIt->size();
197 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
201 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
206 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
207 const auto &clustersInThisFile = *clustersPerFileIt;
208 for (
auto i = 0ULL; i < clustersInThisFileSize; ++i) {
209 const auto start = clustersInThisFile[i].start;
214 if (nReminderClusters > 0) {
218 const auto end = clustersInThisFile[i].end;
219 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
223 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
235 std::vector<std::vector<Long64_t>> friendEntries;
236 const auto nFriends = friendNames.size();
237 for (
auto i = 0u; i < nFriends; ++i) {
238 std::vector<Long64_t> nEntries;
239 const auto &thisFriendName = friendNames[i].first;
240 const auto &thisFriendFiles = friendFileNames[i];
241 const auto &thisFriendChainSubNames = friendChainSubNames[i];
246 if (!thisFriendChainSubNames.empty()) {
248 for (
auto fileidx = 0u; fileidx < thisFriendFiles.size(); ++fileidx) {
249 std::unique_ptr<TFile> curfile(
250 TFile::Open(thisFriendFiles[fileidx].c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
251 if (!curfile || curfile->IsZombie())
252 throw std::runtime_error(
"TTreeProcessorMT::GetFriendEntries: Could not open file \"" +
253 thisFriendFiles[fileidx] +
"\"");
256 TTree *curtree = curfile->Get<
TTree>(thisFriendChainSubNames[fileidx].c_str());
258 throw std::runtime_error(
"TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
259 thisFriendChainSubNames[fileidx] +
"\" from file \"" +
260 thisFriendFiles[fileidx] +
"\"");
267 for (
const auto &fname : thisFriendFiles) {
268 std::unique_ptr<TFile>
f(
TFile::Open(fname.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
269 if (!
f ||
f->IsZombie())
270 throw std::runtime_error(
"TTreeProcessorMT::GetFriendEntries: Could not open file \"" + fname +
"\"");
273 throw std::runtime_error(
"TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
274 thisFriendName +
"\" from file \"" + fname +
"\"");
279 friendEntries.emplace_back(std::move(nEntries));
282 return friendEntries;
302 const std::vector<std::vector<Long64_t>> &friendEntries)
310 const auto nFiles = fileNames.size();
311 for (
auto i = 0u; i < nFiles; ++i) {
312 fChain->Add((fileNames[i] +
"?#" + treeNames[i]).c_str(), nEntries[i]);
317 const auto nFriends = friendNames.size();
318 for (
auto i = 0u; i < nFriends; ++i) {
319 const auto &thisFriendNameAlias = friendNames[i];
320 const auto &thisFriendName = thisFriendNameAlias.first;
321 const auto &thisFriendAlias = thisFriendNameAlias.second;
322 const auto &thisFriendFiles = friendFileNames[i];
323 const auto &thisFriendChainSubNames = friendChainSubNames[i];
324 const auto &thisFriendEntries = friendEntries[i];
328 const auto nFileNames = friendFileNames[i].size();
329 if (thisFriendChainSubNames.empty()) {
332 for (
auto j = 0u; j < nFileNames; ++j) {
333 frChain->Add(thisFriendFiles[j].c_str(), thisFriendEntries[j]);
338 for (
auto j = 0u; j < nFileNames; ++j) {
339 frChain->Add((thisFriendFiles[j] +
"?#" + thisFriendChainSubNames[j]).c_str(), thisFriendEntries[j]);
344 fChain->AddFriend(frChain.get(), thisFriendAlias.c_str());
345 fFriends.emplace_back(std::move(frChain));
351std::unique_ptr<TTreeReader>
354 const TEntryList &entryList,
const std::vector<Long64_t> &nEntries,
355 const std::vector<std::vector<Long64_t>> &friendEntries)
357 const bool hasEntryList = entryList.
GetN() > 0;
358 const bool usingLocalEntries = friendInfo.
fFriendNames.empty() && !hasEntryList;
359 const bool needNewChain =
360 fChain ==
nullptr || (usingLocalEntries && (fileNames[0] !=
fChain->GetListOfFiles()->At(0)->GetTitle() ||
361 treeNames[0] !=
fChain->GetListOfFiles()->At(0)->GetName()));
363 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
374 auto reader = std::make_unique<TTreeReader>(
fChain.get(),
fEntryList.get());
375 reader->SetEntriesRange(start, end);
395 std::vector<std::string> treeNames;
398 throw std::runtime_error(
"Empty list of files and no tree name provided");
402 std::string treeName;
404 TIter next(
f->GetListOfKeys());
405 while (
auto *key =
static_cast<TKey *
>(next())) {
406 const char *className = key->GetClassName();
407 if (strcmp(className,
"TTree") == 0) {
408 treeName = key->GetName();
412 if (treeName.empty())
413 throw std::runtime_error(
"Cannot find any tree in file " + fname);
414 treeNames.emplace_back(std::move(treeName));
428 : fFileNames({std::string(filename)}),
429 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
438 throw std::runtime_error(
"The provided list of file names is empty");
440 std::vector<std::string> strings;
441 strings.reserve(views.size());
442 for (
const auto &
v : views)
443 strings.emplace_back(
v);
461 fTreeNames(treename.empty() ? FindTreeNames()
462 : std::vector<std::string>(fFileNames.
size(), std::string(treename))),
463 fFriendInfo(), fPool(nThreads)
475 : fFileNames(Internal::
TreeUtils::GetFileNamesFromTree(
tree)),
476 fTreeNames(Internal::
TreeUtils::GetTreeFullPaths(
tree)), fEntryList(entries),
477 fFriendInfo(Internal::
TreeUtils::GetFriendInfo(
tree)), fPool(nThreads)
509 const unsigned int maxTasksPerFile =
519 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
520 ClustersAndEntries clusterAndEntries{};
521 if (shouldRetrieveAllClusters) {
524 clusterAndEntries.first = ConvertToElistClusters(std::move(clusterAndEntries.first),
fEntryList,
fTreeNames,
528 const auto &clusters = clusterAndEntries.first;
529 const auto &entries = clusterAndEntries.second;
532 const auto friendEntries = hasFriends ? GetFriendEntries(
fFriendInfo) : std::vector<std::vector<Long64_t>>{};
536 auto processFile = [&](std::size_t fileIdx) {
538 const auto &theseFiles = shouldRetrieveAllClusters ?
fFileNames : std::vector<std::string>({
fFileNames[fileIdx]});
540 const auto &theseTrees = shouldRetrieveAllClusters ?
fTreeNames : std::vector<std::string>({
fTreeNames[fileIdx]});
542 const auto theseClustersAndEntries =
543 shouldRetrieveAllClusters ? ClustersAndEntries{} : MakeClusters(theseTrees, theseFiles, maxTasksPerFile);
546 const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];
549 const auto &theseEntries =
550 shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});
552 auto processCluster = [&](
const EntryCluster &
c) {
554 theseEntries, friendEntries);
561 std::vector<std::size_t> fileIdxs(
fFileNames.size());
562 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
569 if (view !=
nullptr) {
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Get a TTreeReader for the current tree of this view.
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
T * GetAtSlotRaw(unsigned i) const
Access a particular slot which corresponds to a single thread.
unsigned GetNSlots() const
Return the number of currently available slots.
A class to process the entries of a TTree in parallel.
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
const Internal::TreeUtils::RFriendInfo fFriendInfo
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u)
Constructor based on a file name.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A chain is a collection of files containing TTree objects.
@ kWithoutGlobalRegistration
TDirectory::TContext keeps track of and restores the current directory.
A List of entry numbers in a TTree or TChain.
virtual TList * GetLists() const
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain. See also Next().
virtual Long64_t GetN() const
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
virtual void Add(TObject *obj)
@ kCanDelete
if object in a list can be deleted
@ kMustCleanup
if object destructor must call RecursiveRemove()
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
A TTree represents a columnar dataset.
virtual Long64_t GetEntries() const
Different standalone functions to work with trees and tuples, not required to be a member of any cla...
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Information about friend trees of a certain TTree or TChain object.
std::vector< NameAlias > fFriendNames
Pairs of names and aliases of friend trees/chains.
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
std::vector< std::vector< std::string > > fFriendChainSubNames
Names of the subtrees of a friend TChain.