Logo ROOT  
Reference Guide
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include "TROOT.h"
29
30using namespace ROOT;
31
32namespace {
33
34/// A cluster of entries
35struct EntryCluster {
36 Long64_t start;
37 Long64_t end;
38};
39
40// note that this routine assumes global entry numbers
41static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryCluster>> &cls)
42{
43 Long64_t last_end = 0ll;
44 for (const auto &fcl : cls) {
45 for (const auto &c : fcl) {
46 if (last_end != c.start)
47 return false;
48 last_end = c.end;
49 }
50 }
51 return true;
52}
53
54/// Take a vector of vectors of EntryClusters (a vector per file), filter the entries according to entryList, and
55/// and return a new vector of vectors of EntryClusters where cluster start/end entry numbers have been converted to
56/// TEntryList-local entry numbers.
57///
58/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
59/// ascending order, i.e., for n > m:
60/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
61static std::vector<std::vector<EntryCluster>>
62ConvertToElistClusters(std::vector<std::vector<EntryCluster>> &&clusters, TEntryList &entryList,
63 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
64 const std::vector<Long64_t> &entriesPerFile)
65{
66 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
67 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
68
69 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
70 const auto nFiles = clusters.size();
71
72 std::unique_ptr<TChain> chain;
73 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
74 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
75 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
76 NextFn_t Next;
77 if (listHasGlobalEntryNumbers) {
78 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
79 ++elEntry;
80 return elist.Next();
81 };
82 } else {
83 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
85 for (auto i = 0u; i < nFiles; ++i)
86 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
87 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
88 ++elEntry;
89 int treenum = -1;
90 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
91 if (localEntry == -1ll)
92 return localEntry;
93 return localEntry + ch->GetTreeOffset()[treenum];
94 };
95 }
96
97 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
98 // so we can be sure TEntryList::Next will return the correct thing
99 Long64_t elistEntry = 0ll;
100 Long64_t entry = entryList.GetEntry(elistEntry);
101
102 std::vector<std::vector<EntryCluster>> elistClusters;
103
104 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
105 std::vector<EntryCluster> elistClustersForFile;
106 for (const auto &c : clusters[fileN]) {
107 if (entry >= c.end || entry == -1ll) // no entrylist entries in this cluster
108 continue;
109 R__ASSERT(entry >= c.start); // current entry should never come before the cluster we are looking at
110 const Long64_t elistRangeStart = elistEntry;
111 // advance entry list until the entrylist entry goes beyond the end of the cluster
112 while (entry < c.end && entry != -1ll)
113 entry = Next(elistEntry, entryList, chain.get());
114 elistClustersForFile.emplace_back(EntryCluster{elistRangeStart, elistEntry});
115 }
116 elistClusters.emplace_back(std::move(elistClustersForFile));
117 }
118
119 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
120 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
121
122 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
123 return elistClusters;
124}
125
126// EntryClusters and number of entries per file
127using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
128
129////////////////////////////////////////////////////////////////////////
130/// Return a vector of cluster boundaries for the given tree and files.
131static ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
132 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile)
133{
134 // Note that as a side-effect of opening all files that are going to be used in the
135 // analysis once, all necessary streamers will be loaded into memory.
137 const auto nFileNames = fileNames.size();
138 std::vector<std::vector<EntryCluster>> clustersPerFile;
139 std::vector<Long64_t> entriesPerFile;
140 entriesPerFile.reserve(nFileNames);
141 Long64_t offset = 0ll;
142 for (auto i = 0u; i < nFileNames; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
145
146 std::unique_ptr<TFile> f(TFile::Open(
147 fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")); // need TFile::Open to load plugins if need be
148 if (!f || f->IsZombie()) {
149 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
150 throw std::runtime_error(msg);
151 }
152 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
153
154 if (!t) {
155 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
156 "\" from file \"" + fileName + "\"";
157 throw std::runtime_error(msg);
158 }
159
160 // Avoid calling TROOT::RecursiveRemove for this tree, it takes the read lock and we don't need it.
161 t->ResetBit(kMustCleanup);
163 auto clusterIter = t->GetClusterIterator(0);
164 Long64_t start = 0ll, end = 0ll;
165 const Long64_t entries = t->GetEntries();
166 // Iterate over the clusters in the current file
167 std::vector<EntryCluster> clusters;
168 while ((start = clusterIter()) < entries) {
169 end = clusterIter.GetNextEntry();
170 // Add the current file's offset to start and end to make them (chain) global
171 clusters.emplace_back(EntryCluster{start + offset, end + offset});
172 }
173 offset += entries;
174 clustersPerFile.emplace_back(std::move(clusters));
175 entriesPerFile.emplace_back(entries);
176 }
177
178 // Here we "fuse" clusters together if the number of clusters is too big with respect to
179 // the number of slots, otherwise we can incur in an overhead which is big enough
180 // to make parallelisation detrimental to performance.
181 // For example, this is the case when, following a merging of many small files, a file
182 // contains a tree with many entries and with clusters of just a few entries each.
183 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
184 // of files (e.g. 1000 files): the large amount of files might result in a large amount
185 // of tasks, but the elevated concurrency level makes the little synchronization required by
186 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
187 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
188 //
189 // The criterion according to which we fuse clusters together is to have around
190 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
191 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
192
193 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
194 auto clustersPerFileIt = clustersPerFile.begin();
195 auto eventRangesPerFileIt = eventRangesPerFile.begin();
196 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
197 const auto clustersInThisFileSize = clustersPerFileIt->size();
198 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
199 // If the number of clusters is less than maxTasksPerFile
200 // we take the clusters as they are
201 if (nFolds == 0) {
202 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
203 continue;
204 }
205 // Otherwise, we have to merge clusters, distributing the reminder evenly
206 // onto the first clusters
207 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
208 const auto &clustersInThisFile = *clustersPerFileIt;
209 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
210 const auto start = clustersInThisFile[i].start;
211 // We lump together at least nFolds clusters, therefore
212 // we need to jump ahead of nFolds-1.
213 i += (nFolds - 1);
214 // We now add a cluster if we have some reminder left
215 if (nReminderClusters > 0) {
216 i += 1U;
217 nReminderClusters--;
218 }
219 const auto end = clustersInThisFile[i].end;
220 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
221 }
222 }
223
224 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
225}
226
227////////////////////////////////////////////////////////////////////////
228/// Return a vector containing the number of entries of each file of each friend TChain
229static std::vector<std::vector<Long64_t>> GetFriendEntries(const Internal::TreeUtils::RFriendInfo &friendInfo)
230{
231
232 const auto &friendNames = friendInfo.fFriendNames;
233 const auto &friendFileNames = friendInfo.fFriendFileNames;
234 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
235
236 std::vector<std::vector<Long64_t>> friendEntries;
237 const auto nFriends = friendNames.size();
238 for (auto i = 0u; i < nFriends; ++i) {
239 std::vector<Long64_t> nEntries;
240 const auto &thisFriendName = friendNames[i].first;
241 const auto &thisFriendFiles = friendFileNames[i];
242 const auto &thisFriendChainSubNames = friendChainSubNames[i];
243 // If this friend has chain sub names, it means it's a TChain.
244 // In this case, we need to traverse all files that make up the TChain,
245 // retrieve the correct sub tree from each file and store the number of
246 // entries for that sub tree.
247 if (!thisFriendChainSubNames.empty()) {
248 // Traverse together filenames and respective treenames
249 for (auto fileidx = 0u; fileidx < thisFriendFiles.size(); ++fileidx) {
250 std::unique_ptr<TFile> curfile(
251 TFile::Open(thisFriendFiles[fileidx].c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
252 if (!curfile || curfile->IsZombie())
253 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" +
254 thisFriendFiles[fileidx] + "\"");
255 // thisFriendChainSubNames[fileidx] stores the name of the current
256 // subtree in the TChain stored in the current file.
257 TTree *curtree = curfile->Get<TTree>(thisFriendChainSubNames[fileidx].c_str());
258 if (!curtree)
259 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
260 thisFriendChainSubNames[fileidx] + "\" from file \"" +
261 thisFriendFiles[fileidx] + "\"");
262 nEntries.emplace_back(curtree->GetEntries());
263 }
264 // Otherwise, if there are no sub names for the current friend, it means
265 // it's a TTree. We can safely use `thisFriendName` as the name of the tree
266 // to retrieve from the file in `thisFriendFiles`
267 } else {
268 for (const auto &fname : thisFriendFiles) {
269 std::unique_ptr<TFile> f(TFile::Open(fname.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
270 if (!f || f->IsZombie())
271 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" + fname + "\"");
272 TTree *t = f->Get<TTree>(thisFriendName.c_str());
273 if (!t)
274 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
275 thisFriendName + "\" from file \"" + fname + "\"");
276 nEntries.emplace_back(t->GetEntries());
277 }
278 }
279 // Store the vector with entries for each file in the current tree/chain.
280 friendEntries.emplace_back(std::move(nEntries));
281 }
282
283 return friendEntries;
284}
285
286} // anonymous namespace
287
288namespace ROOT {
289
291
292namespace Internal {
293
294////////////////////////////////////////////////////////////////////////////////
295/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
296/// \param[in] treeNames Name of the tree for each file in `fileNames`.
297/// \param[in] fileNames Files to be opened.
298/// \param[in] friendInfo Information about TTree friends, if any.
299/// \param[in] nEntries Number of entries to be processed.
300/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
301void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
302 const TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
303 const std::vector<std::vector<Long64_t>> &friendEntries)
304{
305
306 const auto &friendNames = friendInfo.fFriendNames;
307 const auto &friendFileNames = friendInfo.fFriendFileNames;
308 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
309
311 const auto nFiles = fileNames.size();
312 for (auto i = 0u; i < nFiles; ++i) {
313 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
314 }
315 fChain->ResetBit(TObject::kMustCleanup);
317
318 fFriends.clear();
319 const auto nFriends = friendNames.size();
320 for (auto i = 0u; i < nFriends; ++i) {
321 const auto &thisFriendNameAlias = friendNames[i];
322 const auto &thisFriendName = thisFriendNameAlias.first;
323 const auto &thisFriendAlias = thisFriendNameAlias.second;
324 const auto &thisFriendFiles = friendFileNames[i];
325 const auto &thisFriendChainSubNames = friendChainSubNames[i];
326 const auto &thisFriendEntries = friendEntries[i];
327
328 // Build a friend chain
329 auto frChain = std::make_unique<TChain>(thisFriendName.c_str(), "", TChain::kWithoutGlobalRegistration);
330 const auto nFileNames = friendFileNames[i].size();
331 if (thisFriendChainSubNames.empty()) {
332 // If there are no chain subnames, the friend was a TTree. It's safe
333 // to add to the chain the filename directly.
334 for (auto j = 0u; j < nFileNames; ++j) {
335 frChain->Add(thisFriendFiles[j].c_str(), thisFriendEntries[j]);
336 }
337 } else {
338 // Otherwise, the new friend chain needs to be built using the nomenclature
339 // "filename/treename" as argument to `TChain::Add`
340 for (auto j = 0u; j < nFileNames; ++j) {
341 frChain->Add((thisFriendFiles[j] + "?#" + thisFriendChainSubNames[j]).c_str(), thisFriendEntries[j]);
342 }
343 }
344
345 // Make it friends with the main chain
346 fChain->AddFriend(frChain.get(), thisFriendAlias.c_str());
347 fFriends.emplace_back(std::move(frChain));
348 }
349}
350
351//////////////////////////////////////////////////////////////////////////
352/// Get a TTreeReader for the current tree of this view.
353std::unique_ptr<TTreeReader>
354TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
355 const std::vector<std::string> &fileNames, const TreeUtils::RFriendInfo &friendInfo,
356 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
357 const std::vector<std::vector<Long64_t>> &friendEntries)
358{
359 const bool hasEntryList = entryList.GetN() > 0;
360 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
361 const bool needNewChain =
362 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
363 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
364 if (needNewChain) {
365 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
366 if (hasEntryList) {
367 fEntryList.reset(new TEntryList(entryList));
368 if (fEntryList->GetLists() != nullptr) {
369 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
370 // sub-lists of the former...
371 fChain->SetEntryList(fEntryList.get());
372 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
373 }
374 }
375 }
376 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
377 reader->SetEntriesRange(start, end);
378 return reader;
379}
380
381////////////////////////////////////////////////////////////////////////
382/// Clear the resources
384{
385 fChain.reset();
386 fEntryList.reset();
387 fFriends.clear();
388}
389
390} // namespace Internal
391} // namespace ROOT
392
393/////////////////////////////////////////////////////////////////////////////////////////////////
394/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
395std::vector<std::string> TTreeProcessorMT::FindTreeNames()
396{
397 std::vector<std::string> treeNames;
398
399 if (fFileNames.empty()) // This can never happen
400 throw std::runtime_error("Empty list of files and no tree name provided");
401
403 for (const auto &fname : fFileNames) {
404 std::string treeName;
405 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
406 TIter next(f->GetListOfKeys());
407 while (auto *key = static_cast<TKey *>(next())) {
408 const char *className = key->GetClassName();
409 if (strcmp(className, "TTree") == 0) {
410 treeName = key->GetName();
411 break;
412 }
413 }
414 if (treeName.empty())
415 throw std::runtime_error("Cannot find any tree in file " + fname);
416 treeNames.emplace_back(std::move(treeName));
417 }
418
419 return treeNames;
420}
421
422////////////////////////////////////////////////////////////////////////
423/// Constructor based on a file name.
424/// \param[in] filename Name of the file containing the tree to process.
425/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
426/// for a TTree key in the file and will use the first one it finds.
427/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
428/// the same as for TThreadExecutor.
430 : fFileNames({std::string(filename)}),
431 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
432 fPool(nThreads)
433{
435}
436
/// Turn a non-empty list of string views into a list of owning strings.
/// \param[in] views The names to copy.
/// \return A vector with one std::string per input view, in the same order.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (views.empty())
      throw std::runtime_error("The provided list of file names is empty");

   // The range constructor direct-initializes each std::string from the corresponding std::string_view
   return std::vector<std::string>(views.begin(), views.end());
}
448
449////////////////////////////////////////////////////////////////////////
450/// Constructor based on a collection of file names.
451/// \param[in] filenames Collection of the names of the files containing the tree to process.
452/// \param[in] treename Name of the tree to process. If not provided, the implementation will
453/// search filenames for a TTree key and will use the first one it finds in each file.
454/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
455/// the same as for TThreadExecutor.
456///
457/// If different files contain TTrees with different names and automatic TTree name detection is not an option
458/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
459/// it to the appropriate TTreeProcessorMT constructor.
460TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
461 UInt_t nThreads)
462 : fFileNames(CheckAndConvert(filenames)),
463 fTreeNames(treename.empty() ? FindTreeNames()
464 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
465 fFriendInfo(), fPool(nThreads)
466{
468}
469
470////////////////////////////////////////////////////////////////////////
471/// Constructor based on a TTree and a TEntryList.
472/// \param[in] tree Tree or chain of files containing the tree to process.
473/// \param[in] entries List of entry numbers to process.
474/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
475/// the same as for TThreadExecutor.
477 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
478 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)), fEntryList(entries),
479 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree)), fPool(nThreads)
480{
482}
483
484////////////////////////////////////////////////////////////////////////
485/// Constructor based on a TTree.
486/// \param[in] tree Tree or chain of files containing the tree to process.
487/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
488/// the same as for TThreadExecutor.
490
491//////////////////////////////////////////////////////////////////////////////
492/// Process the entries of a TTree in parallel. The user-provided function
493/// receives a TTreeReader which can be used to iterate on a subrange of
494/// entries
495/// ~~~{.cpp}
496/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
497/// // Select branches to read
498/// while (readerSubRange.Next()) {
499/// // Use content of current entry
500/// }
501/// });
502/// ~~~
503/// The user needs to be aware that each of the subranges can potentially
504/// be processed in parallel. This means that the code of the user function
505/// should be thread safe.
506///
507/// \param[in] func User-defined function that processes a subrange of entries
509{
510 // compute number of tasks per file
511 const unsigned int maxTasksPerFile =
512 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
513
514 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
515 // so we do it here for all files.
516 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
517 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
518 // sub-entrylists.
519 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
520 const bool hasEntryList = fEntryList.GetN() > 0;
521 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
522 ClustersAndEntries allClusterAndEntries{};
523 auto &allClusters = allClusterAndEntries.first;
524 const auto &allEntries = allClusterAndEntries.second;
525 if (shouldRetrieveAllClusters) {
526 allClusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile);
527 if (hasEntryList)
528 allClusters = ConvertToElistClusters(std::move(allClusters), fEntryList, fTreeNames, fFileNames, allEntries);
529 }
530
531 // Per-file processing in case we retrieved all cluster info upfront
532 auto processFileUsingGlobalClusters = [&](std::size_t fileIdx) {
533 auto processCluster = [&](const EntryCluster &c) {
534 auto r = fTreeView->GetTreeReader(c.start, c.end, fTreeNames, fFileNames, fFriendInfo, fEntryList, allEntries,
535 GetFriendEntries(fFriendInfo));
536 func(*r);
537 };
538 fPool.Foreach(processCluster, allClusters[fileIdx]);
539 };
540
541 // Per-file processing that also retrieves cluster info for a file
542 auto processFileRetrievingClusters = [&](std::size_t fileIdx) {
543 // Evaluate clusters (with local entry numbers) and number of entries for this file
544 const auto &treeNames = std::vector<std::string>({fTreeNames[fileIdx]});
545 const auto &fileNames = std::vector<std::string>({fFileNames[fileIdx]});
546 const auto clustersAndEntries = MakeClusters(treeNames, fileNames, maxTasksPerFile);
547 const auto &clusters = clustersAndEntries.first[0];
548 const auto &entries = clustersAndEntries.second[0];
549 auto processCluster = [&](const EntryCluster &c) {
550 auto r = fTreeView->GetTreeReader(c.start, c.end, treeNames, fileNames, fFriendInfo, fEntryList, {entries},
551 std::vector<std::vector<Long64_t>>{});
552 func(*r);
553 };
554 fPool.Foreach(processCluster, clusters);
555 };
556
557 std::vector<std::size_t> fileIdxs(fFileNames.size());
558 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
559
560 if (shouldRetrieveAllClusters)
561 fPool.Foreach(processFileUsingGlobalClusters, fileIdxs);
562 else
563 fPool.Foreach(processFileRetrievingClusters, fileIdxs);
564
565 // make sure TChains and TFiles are cleaned up since they are not globally tracked
566 for (unsigned int islot = 0; islot < fTreeView.GetNSlots(); ++islot) {
567 ROOT::Internal::TTreeView *view = fTreeView.GetAtSlotRaw(islot);
568 if (view != nullptr) {
569 view->Reset();
570 }
571 }
572}
573
574////////////////////////////////////////////////////////////////////////
575/// \brief Retrieve the current value for the desired number of tasks per worker.
576/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as a hint.
578{
580}
581
582////////////////////////////////////////////////////////////////////////
583/// \brief Set the hint for the desired number of tasks created per worker.
584/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
585///
586/// This allows to create a reasonable number of tasks even if any of the
587/// processed files features a bad clustering, for example with a lot of
588/// entries and just a few entries per cluster, or to limit the number of
589/// tasks spawned when a very large number of files and workers is used.
590void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
591{
592 fgTasksPerWorkerHint = tasksPerWorkerHint;
593}
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition: RtypesCore.h:80
#define gDirectory
Definition: TDirectory.h:348
#define R__ASSERT(e)
Definition: TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
@ kMustCleanup
Definition: TObject.h:355
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Get a TTreeReader for the current tree of this view.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
A class to process the entries of a TTree in parallel.
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
const Internal::TreeUtils::RFriendInfo fFriendInfo
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u)
Constructor based on a file name.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
@ kWithoutGlobalRegistration
Definition: TChain.h:70
TDirectory::TContext keeps track and restore the current directory.
Definition: TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual TList * GetLists() const
Definition: TEntryList.h:76
virtual Long64_t GetEntry(Int_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
Definition: TEntryList.cxx:753
virtual Long64_t GetN() const
Definition: TEntryList.h:78
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:4019
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:28
@ kCanDelete
if object in a list can be deleted
Definition: TObject.h:58
@ kMustCleanup
if object destructor must call RecursiveRemove()
Definition: TObject.h:60
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
A TTree represents a columnar dataset.
Definition: TTree.h:79
virtual Long64_t GetEntries() const
Definition: TTree.h:459
Different standalone functions to work with trees and tuples, not required to be a member of any cla...
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
Retrieve the full path(s) to a TTree or the trees in a TChain.
RFriendInfo GetFriendInfo(const TTree &tree)
Get and store the names, aliases and file names of the direct friends of the tree.
std::vector< std::string > GetFileNamesFromTree(const TTree &tree)
Get and store the file names associated with the input tree.
RVec< PromoteType< T > > ceil(const RVec< T > &v)
Definition: RVec.hxx:1774
basic_string_view< char > string_view
void ClearMustCleanupBits(TObjArray &arr)
Reset the kMustCleanup bit of a TObjArray of TBranch objects (e.g.
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:167
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition: TROOT.cxx:493
Definition: tree.py:1
Information about friend trees of a certain TTree or TChain object.
std::vector< NameAlias > fFriendNames
Pairs of names and aliases of friend trees/chains.
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
std::vector< std::vector< std::string > > fFriendChainSubNames
Names of the subtrees of a friend TChain.