Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
#include <algorithm>
#include <cmath>
#include <cstring>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <numeric>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

#include "TROOT.h"
#include "TDirectory.h"
#include "ROOT/TTreeProcessorMT.hxx"

32using namespace ROOT;
33
34namespace {
35
36using EntryRange = std::pair<Long64_t, Long64_t>;
37
38// note that this routine assumes global entry numbers
39bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryRange>> &cls)
40{
41 Long64_t last_end = 0ll;
42 for (const auto &fcl : cls) {
43 for (const auto &c : fcl) {
44 if (last_end != c.first)
45 return false;
46 last_end = c.second;
47 }
48 }
49 return true;
50}
51
52/// Take a vector of vectors of EntryRanges (a vector per file), filter the entries according to entryList, and
53/// and return a new vector of vectors of EntryRanges where cluster start/end entry numbers have been converted to
54/// TEntryList-local entry numbers.
55///
56/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
57/// ascending order, i.e., for n > m:
58/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
59std::vector<std::vector<EntryRange>>
60ConvertToElistClusters(std::vector<std::vector<EntryRange>> &&clusters, TEntryList &entryList,
61 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
62 const std::vector<Long64_t> &entriesPerFile)
63{
64 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
65 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
66
67 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
68 const auto nFiles = clusters.size();
69
70 std::unique_ptr<TChain> chain;
71 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
72 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
73 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
74 NextFn_t Next;
75 if (listHasGlobalEntryNumbers) {
76 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
77 ++elEntry;
78 return elist.Next();
79 };
80 } else {
81 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
83 for (auto i = 0u; i < nFiles; ++i)
84 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
85 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
86 ++elEntry;
87 int treenum = -1;
88 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
89 if (localEntry == -1ll)
90 return localEntry;
91 return localEntry + ch->GetTreeOffset()[treenum];
92 };
93 }
94
95 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
96 // so we can be sure TEntryList::Next will return the correct thing
97 Long64_t elistEntry = 0ll;
98 Long64_t entry = entryList.GetEntry(elistEntry);
99
100 std::vector<std::vector<EntryRange>> elistClusters;
101
102 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
103 std::vector<EntryRange> elistClustersForFile;
104 for (const auto &c : clusters[fileN]) {
105 if (entry >= c.second || entry == -1ll) // no entrylist entries in this cluster
106 continue;
107 R__ASSERT(entry >= c.first); // current entry should never come before the cluster we are looking at
108 const Long64_t elistRangeStart = elistEntry;
109 // advance entry list until the entrylist entry goes beyond the end of the cluster
110 while (entry < c.second && entry != -1ll)
111 entry = Next(elistEntry, entryList, chain.get());
112 elistClustersForFile.emplace_back(EntryRange{elistRangeStart, elistEntry});
113 }
114 elistClusters.emplace_back(std::move(elistClustersForFile));
115 }
116
117 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
118 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
119
120 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
121 return elistClusters;
122}
123
124// EntryRanges and number of entries per file
125using ClustersAndEntries = std::pair<std::vector<std::vector<EntryRange>>, std::vector<Long64_t>>;
126
127////////////////////////////////////////////////////////////////////////
128/// Return a vector of cluster boundaries for the given tree and files.
129ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
130 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile,
131 const EntryRange &range = {0, std::numeric_limits<Long64_t>::max()})
132{
133 // Note that as a side-effect of opening all files that are going to be used in the
134 // analysis once, all necessary streamers will be loaded into memory.
136 const auto nFileNames = fileNames.size();
137 std::vector<std::vector<EntryRange>> clustersPerFile;
138 std::vector<Long64_t> entriesPerFile;
139 entriesPerFile.reserve(nFileNames);
140 Long64_t offset = 0ll;
141 bool rangeEndReached = false; // flag to break the outer loop
142 for (auto i = 0u; i < nFileNames && !rangeEndReached; ++i) {
143 const auto &fileName = fileNames[i];
144 const auto &treeName = treeNames[i];
145
146 std::unique_ptr<TFile> f(TFile::Open(
147 fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")); // need TFile::Open to load plugins if need be
148 if (!f || f->IsZombie()) {
149 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
150 throw std::runtime_error(msg);
151 }
152 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
153
154 if (!t) {
155 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
156 "\" from file \"" + fileName + "\"";
157 throw std::runtime_error(msg);
158 }
159
160 // Avoid calling TROOT::RecursiveRemove for this tree, it takes the read lock and we don't need it.
161 t->ResetBit(kMustCleanup);
163 auto clusterIter = t->GetClusterIterator(0);
164 Long64_t clusterStart = 0ll, clusterEnd = 0ll;
165 const Long64_t entries = t->GetEntries();
166 // Iterate over the clusters in the current file
167 std::vector<EntryRange> entryRanges;
168 while ((clusterStart = clusterIter()) < entries && !rangeEndReached) {
169 clusterEnd = clusterIter.GetNextEntry();
170 // Currently, if a user specified a range, the clusters will be only globally obtained
171 // Assume that there are 3 files with entries: [0, 100], [0, 150], [0, 200] (in this order)
172 // Since the cluster boundaries are obtained sequentially, applying the offsets, the boundaries
173 // would be: 0, 100, 250, 450. Now assume that the user provided the range (150, 300)
174 // Then, in the first iteration, nothing is going to be added to entryRanges since:
175 // std::max(0, 150) < std::min(100, max). Then, by the same logic only a subset of the second
176 // tree is added, i.e.: currentStart is now 200 and currentEnd is 250 (locally from 100 to 150).
177 // Lastly, the last tree would take entries from 250 to 300 (or from 0 to 50 locally).
178 // The current file's offset to start and end is added to make them (chain) global
179 const auto currentStart = std::max(clusterStart + offset, range.first);
180 const auto currentEnd = std::min(clusterEnd + offset, range.second);
181 // This is not satified if the desired start is larger than the last entry of some cluster
182 // In this case, this cluster is not going to be processes further
183 if (currentStart < currentEnd)
184 entryRanges.emplace_back(EntryRange{currentStart, currentEnd});
185 if (currentEnd == range.second) // if the desired end is reached, stop reading further
186 rangeEndReached = true;
187 }
188 offset += entries; // consistently keep track of the total number of entries
189 clustersPerFile.emplace_back(std::move(entryRanges));
190 // Keep track of the entries, even if their corresponding tree is out of the range, e.g. entryRanges is empty
191 entriesPerFile.emplace_back(entries);
192 }
193 if (range.first >= offset && offset > 0) // do not error out on an empty tree
194 throw std::logic_error(std::string("A range of entries was passed in the creation of the TTreeProcessorMT, ") +
195 "but the starting entry (" + range.first + ") is larger than the total number of " +
196 "entries (" + offset + ") in the dataset.");
197
198 // Here we "fuse" clusters together if the number of clusters is too big with respect to
199 // the number of slots, otherwise we can incur in an overhead which is big enough
200 // to make parallelisation detrimental to performance.
201 // For example, this is the case when, following a merging of many small files, a file
202 // contains a tree with many entries and with clusters of just a few entries each.
203 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
204 // of files (e.g. 1000 files): the large amount of files might result in a large amount
205 // of tasks, but the elevated concurrency level makes the little synchronization required by
206 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
207 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
208 //
209 // The criterion according to which we fuse clusters together is to have around
210 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
211 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
212
213 std::vector<std::vector<EntryRange>> eventRangesPerFile(clustersPerFile.size());
214 auto clustersPerFileIt = clustersPerFile.begin();
215 auto eventRangesPerFileIt = eventRangesPerFile.begin();
216 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
217 const auto clustersInThisFileSize = clustersPerFileIt->size();
218 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
219 // If the number of clusters is less than maxTasksPerFile
220 // we take the clusters as they are
221 if (nFolds == 0) {
222 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
223 continue;
224 }
225 // Otherwise, we have to merge clusters, distributing the reminder evenly
226 // onto the first clusters
227 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
228 const auto &clustersInThisFile = *clustersPerFileIt;
229 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
230 const auto start = clustersInThisFile[i].first;
231 // We lump together at least nFolds clusters, therefore
232 // we need to jump ahead of nFolds-1.
233 i += (nFolds - 1);
234 // We now add a cluster if we have some reminder left
235 if (nReminderClusters > 0) {
236 i += 1U;
237 nReminderClusters--;
238 }
239 const auto end = clustersInThisFile[i].second;
240 eventRangesPerFileIt->emplace_back(EntryRange({start, end}));
241 }
242 }
243
244 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
245}
246
247} // anonymous namespace
248
249namespace ROOT {
250
252
253namespace Internal {
254
255////////////////////////////////////////////////////////////////////////////////
256/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
257/// \param[in] treeNames Name of the tree for each file in `fileNames`.
258/// \param[in] fileNames Files to be opened.
259/// \param[in] friendInfo Information about TTree friends, if any.
260/// \param[in] nEntries Number of entries to be processed.
261/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
262void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
263 const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries)
264{
266 // Because of the range, we might have stopped reading entries earlier,
267 // hence the size of nEntries can be smaller than the number of all files
268 // TODO: pass "firstFileToProcess" index in case of a range,
269 // and do not add files to the chain, which are before the desired start entry of the range
270 const auto nFilesToProcess = nEntries.size();
271 for (auto i = 0u; i < nFilesToProcess; ++i) {
272 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
273 }
275
277 const auto nFriends = friendInfo.fFriendNames.size();
278 R__ASSERT(nFriends == fFriends.size() && "Created the wrong number of friends from the available information.");
279 for (std::size_t i = 0ul; i < nFriends; i++) {
280 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
281 fChain->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
282 }
283}
284
285//////////////////////////////////////////////////////////////////////////
286/// Get a TTreeReader for the current tree of this view.
287std::unique_ptr<TTreeReader>
288TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
289 const std::vector<std::string> &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo,
290 const TEntryList &entryList, const std::vector<Long64_t> &nEntries)
291{
292 const bool hasEntryList = entryList.GetN() > 0;
293 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
294 const bool needNewChain =
295 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
296 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
297 if (needNewChain) {
298 MakeChain(treeNames, fileNames, friendInfo, nEntries);
299 if (hasEntryList) {
300 fEntryList = std::make_unique<TEntryList>(entryList);
301 if (fEntryList->GetLists() != nullptr) {
302 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
303 // sub-lists of the former...
304 fChain->SetEntryList(fEntryList.get());
305 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
306 }
307 }
308 }
309 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
310 reader->SetEntriesRange(start, end);
311 return reader;
312}
313
314////////////////////////////////////////////////////////////////////////
315/// Clear the resources
317{
318 fChain.reset();
319 fEntryList.reset();
320 fFriends.clear();
321}
322
323} // namespace Internal
324} // namespace ROOT
325
326/////////////////////////////////////////////////////////////////////////////////////////////////
327/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
328std::vector<std::string> TTreeProcessorMT::FindTreeNames()
329{
330 std::vector<std::string> treeNames;
331
332 if (fFileNames.empty()) // This can never happen
333 throw std::runtime_error("Empty list of files and no tree name provided");
334
336 for (const auto &fname : fFileNames) {
337 std::string treeName;
338 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
339 TIter next(f->GetListOfKeys());
340 while (auto *key = static_cast<TKey *>(next())) {
341 const char *className = key->GetClassName();
342 if (strcmp(className, "TTree") == 0) {
343 treeName = key->GetName();
344 break;
345 }
346 }
347 if (treeName.empty())
348 throw std::runtime_error("Cannot find any tree in file " + fname);
349 treeNames.emplace_back(std::move(treeName));
350 }
351
352 return treeNames;
353}
354
355////////////////////////////////////////////////////////////////////////
356/// Constructor based on a file name.
357/// \param[in] filename Name of the file containing the tree to process.
358/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
359/// for a TTree key in the file and will use the first one it finds.
360/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
361/// the same as for TThreadExecutor.
362/// \param[in] /// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
363TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename, UInt_t nThreads,
364 const EntryRange &globalRange)
365 : fFileNames({std::string(filename)}),
366 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
367 fPool(nThreads), fGlobalRange(globalRange)
368{
370}
371
////////////////////////////////////////////////////////////////////////
/// Turn a list of non-owning file-name views into owning strings.
/// \param[in] views File names to copy; must contain at least one element.
/// \return The file names as std::string, with the same contents and order as `views`.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (!views.empty())
      return std::vector<std::string>(views.begin(), views.end());

   throw std::runtime_error("The provided list of file names is empty");
}
383
384////////////////////////////////////////////////////////////////////////
385/// Constructor based on a collection of file names.
386/// \param[in] filenames Collection of the names of the files containing the tree to process.
387/// \param[in] treename Name of the tree to process. If not provided, the implementation will
388/// search filenames for a TTree key and will use the first one it finds in each file.
389/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
390/// the same as for TThreadExecutor.
391/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
392///
393/// If different files contain TTrees with different names and automatic TTree name detection is not an option
394/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
395/// it to the appropriate TTreeProcessorMT constructor.
396TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
397 UInt_t nThreads, const EntryRange &globalRange)
398 : fFileNames(CheckAndConvert(filenames)),
399 fTreeNames(treename.empty() ? FindTreeNames()
400 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
401 fFriendInfo(), fPool(nThreads), fGlobalRange(globalRange)
402{
404}
405
406////////////////////////////////////////////////////////////////////////
407/// Constructor based on a TTree and a TEntryList.
408/// \param[in] tree Tree or chain of files containing the tree to process.
409/// \param[in] entries List of entry numbers to process.
410/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
411/// the same as for TThreadExecutor.
413 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
414 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)),
415 fEntryList(entries),
416 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree, /*retrieveEntries*/ true)),
417 fPool(nThreads)
418{
420}
421
422////////////////////////////////////////////////////////////////////////
423/// Constructor based on a TTree.
424/// \param[in] tree Tree or chain of files containing the tree to process.
425/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
426/// the same as for TThreadExecutor.
427/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
428TTreeProcessorMT::TTreeProcessorMT(TTree &tree, UInt_t nThreads, const EntryRange &globalRange)
429 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
430 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)),
431 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree, /*retrieveEntries*/ true)),
432 fPool(nThreads),
433 fGlobalRange(globalRange)
434{
435}
436
437//////////////////////////////////////////////////////////////////////////////
438/// Process the entries of a TTree in parallel. The user-provided function
439/// receives a TTreeReader which can be used to iterate on a subrange of
440/// entries
441/// ~~~{.cpp}
442/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
443/// // Select branches to read
444/// while (readerSubRange.Next()) {
445/// // Use content of current entry
446/// }
447/// });
448/// ~~~
449/// The user needs to be aware that each of the subranges can potentially
450/// be processed in parallel. This means that the code of the user function
451/// should be thread safe.
452///
453/// \param[in] func User-defined function that processes a subrange of entries
454void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
455{
456 // compute number of tasks per file
457 const unsigned int maxTasksPerFile =
458 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
459
460 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
461 // so we do it here for all files.
462 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
463 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
464 // sub-entrylists.
465 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
466 const bool hasEntryList = fEntryList.GetN() > 0;
467 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList || fGlobalRange.first > 0 ||
468 fGlobalRange.second != std::numeric_limits<Long64_t>::max();
469 ClustersAndEntries allClusterAndEntries{};
470 auto &allClusters = allClusterAndEntries.first;
471 const auto &allEntries = allClusterAndEntries.second;
472 if (shouldRetrieveAllClusters) {
473 allClusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile, fGlobalRange);
474 if (hasEntryList)
475 allClusters = ConvertToElistClusters(std::move(allClusters), fEntryList, fTreeNames, fFileNames, allEntries);
476 }
477
478 // Per-file processing in case we retrieved all cluster info upfront
479 auto processFileUsingGlobalClusters = [&](std::size_t fileIdx) {
480 auto processCluster = [&](const EntryRange &c) {
481 auto r =
482 fTreeView->GetTreeReader(c.first, c.second, fTreeNames, fFileNames, fFriendInfo, fEntryList, allEntries);
483 func(*r);
484 };
485 fPool.Foreach(processCluster, allClusters[fileIdx]);
486 };
487
488 // Per-file processing that also retrieves cluster info for a file
489 auto processFileRetrievingClusters = [&](std::size_t fileIdx) {
490 // Evaluate clusters (with local entry numbers) and number of entries for this file
491 const auto &treeNames = std::vector<std::string>({fTreeNames[fileIdx]});
492 const auto &fileNames = std::vector<std::string>({fFileNames[fileIdx]});
493 const auto clustersAndEntries = MakeClusters(treeNames, fileNames, maxTasksPerFile);
494 const auto &clusters = clustersAndEntries.first[0];
495 const auto &entries = clustersAndEntries.second[0];
496 auto processCluster = [&](const EntryRange &c) {
497 auto r = fTreeView->GetTreeReader(c.first, c.second, treeNames, fileNames, fFriendInfo, fEntryList, {entries});
498 func(*r);
499 };
500 fPool.Foreach(processCluster, clusters);
501 };
502
503 const auto firstNonEmpty =
504 fGlobalRange.first > 0u ? std::distance(allClusters.begin(), std::find_if(allClusters.begin(), allClusters.end(),
505 [](auto &c) { return !c.empty(); }))
506 : 0u;
507
508 std::vector<std::size_t> fileIdxs(allEntries.empty() ? fFileNames.size() : allEntries.size() - firstNonEmpty);
509 std::iota(fileIdxs.begin(), fileIdxs.end(), firstNonEmpty);
510
511 if (shouldRetrieveAllClusters)
512 fPool.Foreach(processFileUsingGlobalClusters, fileIdxs);
513 else
514 fPool.Foreach(processFileRetrievingClusters, fileIdxs);
515
516 // make sure TChains and TFiles are cleaned up since they are not globally tracked
517 for (unsigned int islot = 0; islot < fTreeView.GetNSlots(); ++islot) {
518 ROOT::Internal::TTreeView *view = fTreeView.GetAtSlotRaw(islot);
519 if (view != nullptr) {
520 view->Reset();
521 }
522 }
523}
524
525////////////////////////////////////////////////////////////////////////
526/// \brief Retrieve the current value for the desired number of tasks per worker.
527/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
529{
531}
532
533////////////////////////////////////////////////////////////////////////
534/// \brief Set the hint for the desired number of tasks created per worker.
535/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
536///
537/// This allows to create a reasonable number of tasks even if any of the
538/// processed files features a bad clustering, for example with a lot of
539/// entries and just a few entries per cluster, or to limit the number of
540/// tasks spawned when a very large number of files and workers is used.
541void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
542{
543 fgTasksPerWorkerHint = tasksPerWorkerHint;
544}
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:80
#define gDirectory
Definition TDirectory.h:384
#define R__ASSERT(e)
Definition TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
@ kMustCleanup
Definition TObject.h:368
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries)
Get a TTreeReader for the current tree of this view.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
ROOT::TreeUtils::RFriendInfo fFriendInfo
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u, const std::pair< Long64_t, Long64_t > &globalRange={0, std::numeric_limits< Long64_t >::max()})
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
std::pair< Long64_t, Long64_t > fGlobalRange
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TList * GetLists() const
Definition TEntryList.h:76
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Long64_t GetN() const
Definition TEntryList.h:78
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4089
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
@ kCanDelete
if object in a list can be deleted
Definition TObject.h:62
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
Different standalone functions to work with trees and tuples, not required to be a member of any class.
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
void ClearMustCleanupBits(TObjArray &arr)
Reset the kMustCleanup bit of a TObjArray of TBranch objects (e.g.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Definition TROOT.cxx:501
Information about friend trees of a certain TTree or TChain object.
std::vector< std::pair< std::string, std::string > > fFriendNames
Pairs of names and aliases of each friend tree/chain.