Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include <memory>
28
29#include "TROOT.h"
31
32using namespace ROOT;
33
34namespace {
35
36using EntryRange = std::pair<Long64_t, Long64_t>;
37
38// note that this routine assumes global entry numbers
39bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryRange>> &cls)
40{
41 Long64_t last_end = 0ll;
42 for (const auto &fcl : cls) {
43 for (const auto &c : fcl) {
44 if (last_end != c.first)
45 return false;
46 last_end = c.second;
47 }
48 }
49 return true;
50}
51
52/// Take a vector of vectors of EntryRanges (a vector per file), filter the entries according to entryList, and
53/// and return a new vector of vectors of EntryRanges where cluster start/end entry numbers have been converted to
54/// TEntryList-local entry numbers.
55///
56/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
57/// ascending order, i.e., for n > m:
58/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
59std::vector<std::vector<EntryRange>>
60ConvertToElistClusters(std::vector<std::vector<EntryRange>> &&clusters, TEntryList &entryList,
61 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
62 const std::vector<Long64_t> &entriesPerFile)
63{
64 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
65 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
66
67 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
68 const auto nFiles = clusters.size();
69
70 std::unique_ptr<TChain> chain;
71 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
72 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
73 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
74 NextFn_t Next;
75 if (listHasGlobalEntryNumbers) {
76 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
77 ++elEntry;
78 return elist.Next();
79 };
80 } else {
81 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
83 for (auto i = 0u; i < nFiles; ++i)
84 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
85 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
86 ++elEntry;
87 int treenum = -1;
88 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
89 if (localEntry == -1ll)
90 return localEntry;
91 return localEntry + ch->GetTreeOffset()[treenum];
92 };
93 }
94
95 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
96 // so we can be sure TEntryList::Next will return the correct thing
97 Long64_t elistEntry = 0ll;
98 Long64_t entry = entryList.GetEntry(elistEntry);
99
100 std::vector<std::vector<EntryRange>> elistClusters;
101
102 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
103 std::vector<EntryRange> elistClustersForFile;
104 for (const auto &c : clusters[fileN]) {
105 if (entry >= c.second || entry == -1ll) // no entrylist entries in this cluster
106 continue;
107 R__ASSERT(entry >= c.first); // current entry should never come before the cluster we are looking at
108 const Long64_t elistRangeStart = elistEntry;
109 // advance entry list until the entrylist entry goes beyond the end of the cluster
110 while (entry < c.second && entry != -1ll)
111 entry = Next(elistEntry, entryList, chain.get());
112 elistClustersForFile.emplace_back(EntryRange{elistRangeStart, elistEntry});
113 }
114 elistClusters.emplace_back(std::move(elistClustersForFile));
115 }
116
117 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
118 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
119
120 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
121 return elistClusters;
122}
123
124// EntryRanges and number of entries per file
125using ClustersAndEntries = std::pair<std::vector<std::vector<EntryRange>>, std::vector<Long64_t>>;
126
127////////////////////////////////////////////////////////////////////////
128/// Return a vector of cluster boundaries for the given tree and files.
129ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
130 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile,
131 const EntryRange &range = {0, std::numeric_limits<Long64_t>::max()})
132{
133 // Note that as a side-effect of opening all files that are going to be used in the
134 // analysis once, all necessary streamers will be loaded into memory.
136 const auto nFileNames = fileNames.size();
137 std::vector<std::vector<EntryRange>> clustersPerFile;
138 std::vector<Long64_t> entriesPerFile;
139 entriesPerFile.reserve(nFileNames);
140 Long64_t offset = 0ll;
141 bool rangeEndReached = false; // flag to break the outer loop
142 for (auto i = 0u; i < nFileNames && !rangeEndReached; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
145
146 std::unique_ptr<TFile> f(TFile::Open(
147 fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")); // need TFile::Open to load plugins if need be
148 if (!f || f->IsZombie()) {
149 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
150 throw std::runtime_error(msg);
151 }
152 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
153
154 if (!t) {
155 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
156 "\" from file \"" + fileName + "\"";
157 throw std::runtime_error(msg);
158 }
159
160 // Avoid calling TROOT::RecursiveRemove for this tree, it takes the read lock and we don't need it.
161 t->ResetBit(kMustCleanup);
163 auto clusterIter = t->GetClusterIterator(0);
164 Long64_t clusterStart = 0ll, clusterEnd = 0ll;
165 const Long64_t entries = t->GetEntries();
166 // Iterate over the clusters in the current file
167 std::vector<EntryRange> entryRanges;
168 while ((clusterStart = clusterIter()) < entries && !rangeEndReached) {
169 clusterEnd = clusterIter.GetNextEntry();
170 // Currently, if a user specified a range, the clusters will be only globally obtained
171 // Assume that there are 3 files with entries: [0, 100], [0, 150], [0, 200] (in this order)
172 // Since the cluster boundaries are obtained sequentially, applying the offsets, the boundaries
173 // would be: 0, 100, 250, 450. Now assume that the user provided the range (150, 300)
174 // Then, in the first iteration, nothing is going to be added to entryRanges since:
175 // std::max(0, 150) < std::min(100, max). Then, by the same logic only a subset of the second
176 // tree is added, i.e.: currentStart is now 200 and currentEnd is 250 (locally from 100 to 150).
177 // Lastly, the last tree would take entries from 250 to 300 (or from 0 to 50 locally).
178 // The current file's offset to start and end is added to make them (chain) global
179 const auto currentStart = std::max(clusterStart + offset, range.first);
180 const auto currentEnd = std::min(clusterEnd + offset, range.second);
181 // This is not satified if the desired start is larger than the last entry of some cluster
182 // In this case, this cluster is not going to be processes further
183 if (currentStart < currentEnd)
184 entryRanges.emplace_back(EntryRange{currentStart, currentEnd});
185 if (currentEnd == range.second) // if the desired end is reached, stop reading further
186 rangeEndReached = true;
187 }
188 offset += entries; // consistently keep track of the total number of entries
189 clustersPerFile.emplace_back(std::move(entryRanges));
190 // Keep track of the entries, even if their corresponding tree is out of the range, e.g. entryRanges is empty
191 entriesPerFile.emplace_back(entries);
192 }
193 if (range.first >= offset && offset > 0) // do not error out on an empty tree
194 throw std::logic_error(std::string("A range of entries was passed in the creation of the TTreeProcessorMT, ") +
195 "but the starting entry (" + range.first + ") is larger than the total number of " +
196 "entries (" + offset + ") in the dataset.");
197
198 // Here we "fuse" clusters together if the number of clusters is too big with respect to
199 // the number of slots, otherwise we can incur in an overhead which is big enough
200 // to make parallelisation detrimental to performance.
201 // For example, this is the case when, following a merging of many small files, a file
202 // contains a tree with many entries and with clusters of just a few entries each.
203 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
204 // of files (e.g. 1000 files): the large amount of files might result in a large amount
205 // of tasks, but the elevated concurrency level makes the little synchronization required by
206 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
207 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
208 //
209 // The criterion according to which we fuse clusters together is to have around
210 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
211 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
212
213 std::vector<std::vector<EntryRange>> eventRangesPerFile(clustersPerFile.size());
214 auto clustersPerFileIt = clustersPerFile.begin();
215 auto eventRangesPerFileIt = eventRangesPerFile.begin();
216 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
217 const auto clustersInThisFileSize = clustersPerFileIt->size();
218 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
219 // If the number of clusters is less than maxTasksPerFile
220 // we take the clusters as they are
221 if (nFolds == 0) {
222 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
223 continue;
224 }
225 // Otherwise, we have to merge clusters, distributing the reminder evenly
226 // onto the first clusters
227 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
228 const auto &clustersInThisFile = *clustersPerFileIt;
229 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
230 const auto start = clustersInThisFile[i].first;
231 // We lump together at least nFolds clusters, therefore
232 // we need to jump ahead of nFolds-1.
233 i += (nFolds - 1);
234 // We now add a cluster if we have some reminder left
235 if (nReminderClusters > 0) {
236 i += 1U;
237 nReminderClusters--;
238 }
239 const auto end = clustersInThisFile[i].second;
240 eventRangesPerFileIt->emplace_back(EntryRange({start, end}));
241 }
242 }
243
244 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
245}
246
247} // anonymous namespace
248
249namespace ROOT {
250
252
253namespace Internal {
254
255////////////////////////////////////////////////////////////////////////////////
256/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
257/// \param[in] treeNames Name of the tree for each file in `fileNames`.
258/// \param[in] fileNames Files to be opened.
259/// \param[in] friendInfo Information about TTree friends, if any.
260/// \param[in] nEntries Number of entries to be processed.
261/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
262void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
263 const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries)
264{
266 // Because of the range, we might have stopped reading entries earlier,
267 // hence the size of nEntries can be smaller than the number of all files
268 // TODO: pass "firstFileToProcess" index in case of a range,
269 // and do not add files to the chain, which are before the desired start entry of the range
270 const auto nFilesToProcess = nEntries.size();
271 for (auto i = 0u; i < nFilesToProcess; ++i) {
272 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
273 }
275
277 const auto nFriends = friendInfo.fFriendNames.size();
278 R__ASSERT(nFriends == fFriends.size() && "Created the wrong number of friends from the available information.");
279 for (std::size_t i = 0ul; i < nFriends; i++) {
280 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
281 fChain->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
282 }
283}
284
285//////////////////////////////////////////////////////////////////////////
286/// Get a TTreeReader for the current tree of this view.
287std::unique_ptr<TTreeReader>
288TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
289 const std::vector<std::string> &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo,
290 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
291 const std::vector<std::string> &suppressErrorsForMissingBranches)
292{
293 const bool hasEntryList = entryList.GetN() > 0;
294 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
295 const bool needNewChain =
296 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
297 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
298 if (needNewChain) {
299 MakeChain(treeNames, fileNames, friendInfo, nEntries);
300 if (hasEntryList) {
301 fEntryList = std::make_unique<TEntryList>(entryList);
302 if (fEntryList->GetLists() != nullptr) {
303 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
304 // sub-lists of the former...
305 fChain->SetEntryList(fEntryList.get());
306 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
307 }
308 }
309 }
310 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get(), /*warnAboutLongerFriends*/ false,
311 suppressErrorsForMissingBranches);
312 reader->SetEntriesRange(start, end);
313 return reader;
314}
315
316////////////////////////////////////////////////////////////////////////
317/// Clear the resources
319{
320 fChain.reset();
321 fEntryList.reset();
322 fFriends.clear();
323}
324
325} // namespace Internal
326} // namespace ROOT
327
328/////////////////////////////////////////////////////////////////////////////////////////////////
329/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
330std::vector<std::string> TTreeProcessorMT::FindTreeNames()
331{
332 std::vector<std::string> treeNames;
333
334 if (fFileNames.empty()) // This can never happen
335 throw std::runtime_error("Empty list of files and no tree name provided");
336
338 for (const auto &fname : fFileNames) {
339 std::string treeName;
340 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
341 TIter next(f->GetListOfKeys());
342 while (auto *key = static_cast<TKey *>(next())) {
343 const char *className = key->GetClassName();
344 if (strcmp(className, "TTree") == 0) {
345 treeName = key->GetName();
346 break;
347 }
348 }
349 if (treeName.empty())
350 throw std::runtime_error("Cannot find any tree in file " + fname);
351 treeNames.emplace_back(std::move(treeName));
352 }
353
354 return treeNames;
355}
356
357////////////////////////////////////////////////////////////////////////
358/// Constructor based on a file name.
359/// \param[in] filename Name of the file containing the tree to process.
360/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
361/// for a TTree key in the file and will use the first one it finds.
362/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
363/// the same as for TThreadExecutor.
364/// \param[in] /// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
365TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename, UInt_t nThreads,
366 const EntryRange &globalRange)
367 : fFileNames({std::string(filename)}),
368 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
369 fPool(nThreads), fGlobalRange(globalRange)
370{
372}
373
/// Turn a list of string views into a list of owning strings, preserving the order.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   const bool nothingProvided = views.empty();
   if (nothingProvided)
      throw std::runtime_error("The provided list of file names is empty");

   // the range constructor builds each std::string directly from the corresponding view
   return std::vector<std::string>(views.begin(), views.end());
}
385
386////////////////////////////////////////////////////////////////////////
387/// Constructor based on a collection of file names.
388/// \param[in] filenames Collection of the names of the files containing the tree to process.
389/// \param[in] treename Name of the tree to process. If not provided, the implementation will
390/// search filenames for a TTree key and will use the first one it finds in each file.
391/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
392/// the same as for TThreadExecutor.
393/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
394///
395/// If different files contain TTrees with different names and automatic TTree name detection is not an option
396/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
397/// it to the appropriate TTreeProcessorMT constructor.
398TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
399 UInt_t nThreads, const EntryRange &globalRange)
400 : fFileNames(CheckAndConvert(filenames)),
401 fTreeNames(treename.empty() ? FindTreeNames()
402 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
403 fFriendInfo(), fPool(nThreads), fGlobalRange(globalRange)
404{
406}
407
408////////////////////////////////////////////////////////////////////////
409/// Constructor based on a TTree and a TEntryList.
410/// \param[in] tree Tree or chain of files containing the tree to process.
411/// \param[in] entries List of entry numbers to process.
412/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
413/// the same as for TThreadExecutor.
415 const std::vector<std::string> &suppressErrorsForMissingBranches)
416 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
417 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)),
418 fEntryList(entries),
419 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree, /*retrieveEntries*/ true)),
420 fPool(nThreads),
421 fSuppressErrorsForMissingBranches(suppressErrorsForMissingBranches)
422{
424}
425
426////////////////////////////////////////////////////////////////////////
427/// Constructor based on a TTree.
428/// \param[in] tree Tree or chain of files containing the tree to process.
429/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
430/// the same as for TThreadExecutor.
431/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
432TTreeProcessorMT::TTreeProcessorMT(TTree &tree, UInt_t nThreads, const EntryRange &globalRange,
433 const std::vector<std::string> &suppressErrorsForMissingBranches)
434 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
435 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)),
436 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree, /*retrieveEntries*/ true)),
437 fPool(nThreads),
438 fGlobalRange(globalRange),
439 fSuppressErrorsForMissingBranches(suppressErrorsForMissingBranches)
440{
441}
442
443//////////////////////////////////////////////////////////////////////////////
444/// Process the entries of a TTree in parallel. The user-provided function
445/// receives a TTreeReader which can be used to iterate on a subrange of
446/// entries
447/// ~~~{.cpp}
448/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
449/// // Select branches to read
450/// while (readerSubRange.Next()) {
451/// // Use content of current entry
452/// }
453/// });
454/// ~~~
455/// The user needs to be aware that each of the subranges can potentially
456/// be processed in parallel. This means that the code of the user function
457/// should be thread safe.
458///
459/// \param[in] func User-defined function that processes a subrange of entries
460void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
461{
462 // compute number of tasks per file
463 const unsigned int maxTasksPerFile =
464 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
465
466 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
467 // so we do it here for all files.
468 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
469 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
470 // sub-entrylists.
471 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
472 const bool hasEntryList = fEntryList.GetN() > 0;
473 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList || fGlobalRange.first > 0 ||
474 fGlobalRange.second != std::numeric_limits<Long64_t>::max();
475 ClustersAndEntries allClusterAndEntries{};
476 auto &allClusters = allClusterAndEntries.first;
477 const auto &allEntries = allClusterAndEntries.second;
478 if (shouldRetrieveAllClusters) {
479 allClusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile, fGlobalRange);
480 if (hasEntryList)
481 allClusters = ConvertToElistClusters(std::move(allClusters), fEntryList, fTreeNames, fFileNames, allEntries);
482 }
483
484 // Per-file processing in case we retrieved all cluster info upfront
485 auto processFileUsingGlobalClusters = [&](std::size_t fileIdx) {
486 auto processCluster = [&](const EntryRange &c) {
489 func(*r);
490 };
491 fPool.Foreach(processCluster, allClusters[fileIdx]);
492 };
493
494 // Per-file processing that also retrieves cluster info for a file
495 auto processFileRetrievingClusters = [&](std::size_t fileIdx) {
496 // Evaluate clusters (with local entry numbers) and number of entries for this file
497 const auto &treeNames = std::vector<std::string>({fTreeNames[fileIdx]});
498 const auto &fileNames = std::vector<std::string>({fFileNames[fileIdx]});
499 const auto clustersAndEntries = MakeClusters(treeNames, fileNames, maxTasksPerFile);
500 const auto &clusters = clustersAndEntries.first[0];
501 const auto &entries = clustersAndEntries.second[0];
502 auto processCluster = [&](const EntryRange &c) {
503 auto r = fTreeView->GetTreeReader(c.first, c.second, treeNames, fileNames, fFriendInfo, fEntryList, {entries},
505 func(*r);
506 };
507 fPool.Foreach(processCluster, clusters);
508 };
509
510 const auto firstNonEmpty =
511 fGlobalRange.first > 0u ? std::distance(allClusters.begin(), std::find_if(allClusters.begin(), allClusters.end(),
512 [](auto &c) { return !c.empty(); }))
513 : 0u;
514
515 std::vector<std::size_t> fileIdxs(allEntries.empty() ? fFileNames.size() : allEntries.size() - firstNonEmpty);
516 std::iota(fileIdxs.begin(), fileIdxs.end(), firstNonEmpty);
517
518 if (shouldRetrieveAllClusters)
519 fPool.Foreach(processFileUsingGlobalClusters, fileIdxs);
520 else
521 fPool.Foreach(processFileRetrievingClusters, fileIdxs);
522
523 // make sure TChains and TFiles are cleaned up since they are not globally tracked
524 for (unsigned int islot = 0; islot < fTreeView.GetNSlots(); ++islot) {
525 ROOT::Internal::TTreeView *view = fTreeView.GetAtSlotRaw(islot);
526 if (view != nullptr) {
527 view->Reset();
528 }
529 }
530}
531
532////////////////////////////////////////////////////////////////////////
533/// \brief Retrieve the current value for the desired number of tasks per worker.
534/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
536{
538}
539
540////////////////////////////////////////////////////////////////////////
541/// \brief Set the hint for the desired number of tasks created per worker.
542/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
543///
544/// This allows to create a reasonable number of tasks even if any of the
545/// processed files features a bad clustering, for example with a lot of
546/// entries and just a few entries per cluster, or to limit the number of
547/// tasks spawned when a very large number of files and workers is used.
548void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
549{
550 fgTasksPerWorkerHint = tasksPerWorkerHint;
551}
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:69
#define gDirectory
Definition TDirectory.h:384
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
@ kMustCleanup
Definition TObject.h:368
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::string > &suppressErrorsForMissingBranches)
Get a TTreeReader for the current tree of this view.
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
ROOT::TreeUtils::RFriendInfo fFriendInfo
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
std::vector< std::string > fSuppressErrorsForMissingBranches
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u, const std::pair< Long64_t, Long64_t > &globalRange={0, std::numeric_limits< Long64_t >::max()})
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
std::pair< Long64_t, Long64_t > fGlobalRange
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TList * GetLists() const
Definition TEntryList.h:76
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Long64_t GetN() const
Definition TEntryList.h:78
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4086
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
@ kCanDelete
if object in a list can be deleted
Definition TObject.h:62
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:46
A TTree represents a columnar dataset.
Definition TTree.h:79
Different standalone functions to work with trees and tuples, not required to be a member of any cla...
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
void ClearMustCleanupBits(TObjArray &arr)
Reset the kMustCleanup bit of a TObjArray of TBranch objects (e.g.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Definition TROOT.cxx:501
Information about friend trees of a certain TTree or TChain object.
std::vector< std::pair< std::string, std::string > > fFriendNames
Pairs of names and aliases of each friend tree/chain.