Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster (or to a group of adjacent clusters) in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include "TROOT.h"
29
30using namespace ROOT;
31
32namespace {
33
34using EntryRange = std::pair<Long64_t, Long64_t>;
35
36// note that this routine assumes global entry numbers
37static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryRange>> &cls)
38{
39 Long64_t last_end = 0ll;
40 for (const auto &fcl : cls) {
41 for (const auto &c : fcl) {
42 if (last_end != c.first)
43 return false;
44 last_end = c.second;
45 }
46 }
47 return true;
48}
49
50/// Take a vector of vectors of EntryRanges (a vector per file), filter the entries according to entryList, and
51/// and return a new vector of vectors of EntryRanges where cluster start/end entry numbers have been converted to
52/// TEntryList-local entry numbers.
53///
54/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
55/// ascending order, i.e., for n > m:
56/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
57static std::vector<std::vector<EntryRange>>
58ConvertToElistClusters(std::vector<std::vector<EntryRange>> &&clusters, TEntryList &entryList,
59 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
60 const std::vector<Long64_t> &entriesPerFile)
61{
62 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
63 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
64
65 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
66 const auto nFiles = clusters.size();
67
68 std::unique_ptr<TChain> chain;
69 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
70 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
71 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
72 NextFn_t Next;
73 if (listHasGlobalEntryNumbers) {
74 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
75 ++elEntry;
76 return elist.Next();
77 };
78 } else {
79 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
81 for (auto i = 0u; i < nFiles; ++i)
82 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
83 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
84 ++elEntry;
85 int treenum = -1;
86 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
87 if (localEntry == -1ll)
88 return localEntry;
89 return localEntry + ch->GetTreeOffset()[treenum];
90 };
91 }
92
93 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
94 // so we can be sure TEntryList::Next will return the correct thing
95 Long64_t elistEntry = 0ll;
96 Long64_t entry = entryList.GetEntry(elistEntry);
97
98 std::vector<std::vector<EntryRange>> elistClusters;
99
100 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
101 std::vector<EntryRange> elistClustersForFile;
102 for (const auto &c : clusters[fileN]) {
103 if (entry >= c.second || entry == -1ll) // no entrylist entries in this cluster
104 continue;
105 R__ASSERT(entry >= c.first); // current entry should never come before the cluster we are looking at
106 const Long64_t elistRangeStart = elistEntry;
107 // advance entry list until the entrylist entry goes beyond the end of the cluster
108 while (entry < c.second && entry != -1ll)
109 entry = Next(elistEntry, entryList, chain.get());
110 elistClustersForFile.emplace_back(EntryRange{elistRangeStart, elistEntry});
111 }
112 elistClusters.emplace_back(std::move(elistClustersForFile));
113 }
114
115 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
116 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
117
118 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
119 return elistClusters;
120}
121
122// EntryRanges and number of entries per file
123using ClustersAndEntries = std::pair<std::vector<std::vector<EntryRange>>, std::vector<Long64_t>>;
124
125////////////////////////////////////////////////////////////////////////
126/// Return a vector of cluster boundaries for the given tree and files.
127static ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
128 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile,
129 const EntryRange &range = {0, std::numeric_limits<Long64_t>::max()})
130{
131 // Note that as a side-effect of opening all files that are going to be used in the
132 // analysis once, all necessary streamers will be loaded into memory.
134 const auto nFileNames = fileNames.size();
135 std::vector<std::vector<EntryRange>> clustersPerFile;
136 std::vector<Long64_t> entriesPerFile;
137 entriesPerFile.reserve(nFileNames);
138 Long64_t offset = 0ll;
139 bool rangeEndReached = false; // flag to break the outer loop
140 for (auto i = 0u; i < nFileNames && !rangeEndReached; ++i) {
141 const auto &fileName = fileNames[i];
142 const auto &treeName = treeNames[i];
143
144 std::unique_ptr<TFile> f(TFile::Open(
145 fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")); // need TFile::Open to load plugins if need be
146 if (!f || f->IsZombie()) {
147 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
148 throw std::runtime_error(msg);
149 }
150 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
151
152 if (!t) {
153 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
154 "\" from file \"" + fileName + "\"";
155 throw std::runtime_error(msg);
156 }
157
158 // Avoid calling TROOT::RecursiveRemove for this tree, it takes the read lock and we don't need it.
159 t->ResetBit(kMustCleanup);
161 auto clusterIter = t->GetClusterIterator(0);
162 Long64_t clusterStart = 0ll, clusterEnd = 0ll;
163 const Long64_t entries = t->GetEntries();
164 // Iterate over the clusters in the current file
165 std::vector<EntryRange> entryRanges;
166 while ((clusterStart = clusterIter()) < entries && !rangeEndReached) {
167 clusterEnd = clusterIter.GetNextEntry();
168 // Currently, if a user specified a range, the clusters will be only globally obtained
169 // Assume that there are 3 files with entries: [0, 100], [0, 150], [0, 200] (in this order)
170 // Since the cluster boundaries are obtained sequentially, applying the offsets, the boundaries
171 // would be: 0, 100, 250, 450. Now assume that the user provided the range (150, 300)
172 // Then, in the first iteration, nothing is going to be added to entryRanges since:
173 // std::max(0, 150) < std::min(100, max). Then, by the same logic only a subset of the second
174 // tree is added, i.e.: currentStart is now 200 and currentEnd is 250 (locally from 100 to 150).
175 // Lastly, the last tree would take entries from 250 to 300 (or from 0 to 50 locally).
176 // The current file's offset to start and end is added to make them (chain) global
177 const auto currentStart = std::max(clusterStart + offset, range.first);
178 const auto currentEnd = std::min(clusterEnd + offset, range.second);
179 // This is not satified if the desired start is larger than the last entry of some cluster
180 // In this case, this cluster is not going to be processes further
181 if (currentStart < currentEnd)
182 entryRanges.emplace_back(EntryRange{currentStart, currentEnd});
183 if (currentEnd == range.second) // if the desired end is reached, stop reading further
184 rangeEndReached = true;
185 }
186 offset += entries; // consistently keep track of the total number of entries
187 clustersPerFile.emplace_back(std::move(entryRanges));
188 // Keep track of the entries, even if their corresponding tree is out of the range, e.g. entryRanges is empty
189 entriesPerFile.emplace_back(entries);
190 }
191 if (range.first >= offset && offset > 0) // do not error out on an empty tree
192 throw std::logic_error(std::string("A range of entries was passed in the creation of the TTreeProcessorMT, ") +
193 "but the starting entry (" + range.first + ") is larger than the total number of " +
194 "entries (" + offset + ") in the dataset.");
195
196 // Here we "fuse" clusters together if the number of clusters is too big with respect to
197 // the number of slots, otherwise we can incur in an overhead which is big enough
198 // to make parallelisation detrimental to performance.
199 // For example, this is the case when, following a merging of many small files, a file
200 // contains a tree with many entries and with clusters of just a few entries each.
201 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
202 // of files (e.g. 1000 files): the large amount of files might result in a large amount
203 // of tasks, but the elevated concurrency level makes the little synchronization required by
204 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
205 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
206 //
207 // The criterion according to which we fuse clusters together is to have around
208 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
209 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
210
211 std::vector<std::vector<EntryRange>> eventRangesPerFile(clustersPerFile.size());
212 auto clustersPerFileIt = clustersPerFile.begin();
213 auto eventRangesPerFileIt = eventRangesPerFile.begin();
214 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
215 const auto clustersInThisFileSize = clustersPerFileIt->size();
216 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
217 // If the number of clusters is less than maxTasksPerFile
218 // we take the clusters as they are
219 if (nFolds == 0) {
220 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
221 continue;
222 }
223 // Otherwise, we have to merge clusters, distributing the reminder evenly
224 // onto the first clusters
225 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
226 const auto &clustersInThisFile = *clustersPerFileIt;
227 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
228 const auto start = clustersInThisFile[i].first;
229 // We lump together at least nFolds clusters, therefore
230 // we need to jump ahead of nFolds-1.
231 i += (nFolds - 1);
232 // We now add a cluster if we have some reminder left
233 if (nReminderClusters > 0) {
234 i += 1U;
235 nReminderClusters--;
236 }
237 const auto end = clustersInThisFile[i].second;
238 eventRangesPerFileIt->emplace_back(EntryRange({start, end}));
239 }
240 }
241
242 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
243}
244
245} // anonymous namespace
246
247namespace ROOT {
248
250
251namespace Internal {
252
253////////////////////////////////////////////////////////////////////////////////
254/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
255/// \param[in] treeNames Name of the tree for each file in `fileNames`.
256/// \param[in] fileNames Files to be opened.
257/// \param[in] friendInfo Information about TTree friends, if any.
258/// \param[in] nEntries Number of entries to be processed.
259/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
260void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
261 const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries)
262{
264 // Because of the range, we might have stopped reading entries earlier,
265 // hence the size of nEntries can be smaller than the number of all files
266 // TODO: pass "firstFileToProcess" index in case of a range,
267 // and do not add files to the chain, which are before the desired start entry of the range
268 const auto nFilesToProcess = nEntries.size();
269 for (auto i = 0u; i < nFilesToProcess; ++i) {
270 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
271 }
273
275 const auto nFriends = friendInfo.fFriendNames.size();
276 R__ASSERT(nFriends == fFriends.size() && "Created the wrong number of friends from the available information.");
277 for (std::size_t i = 0ul; i < nFriends; i++) {
278 const auto &thisFriendAlias = friendInfo.fFriendNames[i].second;
279 fChain->AddFriend(fFriends[i].get(), thisFriendAlias.c_str());
280 }
281}
282
283//////////////////////////////////////////////////////////////////////////
284/// Get a TTreeReader for the current tree of this view.
285std::unique_ptr<TTreeReader>
286TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
287 const std::vector<std::string> &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo,
288 const TEntryList &entryList, const std::vector<Long64_t> &nEntries)
289{
290 const bool hasEntryList = entryList.GetN() > 0;
291 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
292 const bool needNewChain =
293 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
294 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
295 if (needNewChain) {
296 MakeChain(treeNames, fileNames, friendInfo, nEntries);
297 if (hasEntryList) {
298 fEntryList.reset(new TEntryList(entryList));
299 if (fEntryList->GetLists() != nullptr) {
300 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
301 // sub-lists of the former...
302 fChain->SetEntryList(fEntryList.get());
303 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
304 }
305 }
306 }
307 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
308 reader->SetEntriesRange(start, end);
309 return reader;
310}
311
312////////////////////////////////////////////////////////////////////////
313/// Clear the resources
315{
316 fChain.reset();
317 fEntryList.reset();
318 fFriends.clear();
319}
320
321} // namespace Internal
322} // namespace ROOT
323
324/////////////////////////////////////////////////////////////////////////////////////////////////
325/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
326std::vector<std::string> TTreeProcessorMT::FindTreeNames()
327{
328 std::vector<std::string> treeNames;
329
330 if (fFileNames.empty()) // This can never happen
331 throw std::runtime_error("Empty list of files and no tree name provided");
332
334 for (const auto &fname : fFileNames) {
335 std::string treeName;
336 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
337 TIter next(f->GetListOfKeys());
338 while (auto *key = static_cast<TKey *>(next())) {
339 const char *className = key->GetClassName();
340 if (strcmp(className, "TTree") == 0) {
341 treeName = key->GetName();
342 break;
343 }
344 }
345 if (treeName.empty())
346 throw std::runtime_error("Cannot find any tree in file " + fname);
347 treeNames.emplace_back(std::move(treeName));
348 }
349
350 return treeNames;
351}
352
353////////////////////////////////////////////////////////////////////////
354/// Constructor based on a file name.
355/// \param[in] filename Name of the file containing the tree to process.
356/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
357/// for a TTree key in the file and will use the first one it finds.
358/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
359/// the same as for TThreadExecutor.
360/// \param[in] /// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
361TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename, UInt_t nThreads,
362 const EntryRange &globalRange)
363 : fFileNames({std::string(filename)}),
364 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
365 fPool(nThreads), fGlobalRange(globalRange)
366{
368}
369
/// Copy a non-empty list of file-name views into owning strings, preserving order.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (!views.empty())
      return std::vector<std::string>(views.begin(), views.end());

   throw std::runtime_error("The provided list of file names is empty");
}
381
382////////////////////////////////////////////////////////////////////////
383/// Constructor based on a collection of file names.
384/// \param[in] filenames Collection of the names of the files containing the tree to process.
385/// \param[in] treename Name of the tree to process. If not provided, the implementation will
386/// search filenames for a TTree key and will use the first one it finds in each file.
387/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
388/// the same as for TThreadExecutor.
389/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
390///
391/// If different files contain TTrees with different names and automatic TTree name detection is not an option
392/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
393/// it to the appropriate TTreeProcessorMT constructor.
394TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
395 UInt_t nThreads, const EntryRange &globalRange)
396 : fFileNames(CheckAndConvert(filenames)),
397 fTreeNames(treename.empty() ? FindTreeNames()
398 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
399 fFriendInfo(), fPool(nThreads), fGlobalRange(globalRange)
400{
402}
403
404////////////////////////////////////////////////////////////////////////
405/// Constructor based on a TTree and a TEntryList.
406/// \param[in] tree Tree or chain of files containing the tree to process.
407/// \param[in] entries List of entry numbers to process.
408/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
409/// the same as for TThreadExecutor.
411 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
412 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)),
413 fEntryList(entries),
414 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree, /*retrieveEntries*/ true)),
415 fPool(nThreads)
416{
418}
419
420////////////////////////////////////////////////////////////////////////
421/// Constructor based on a TTree.
422/// \param[in] tree Tree or chain of files containing the tree to process.
423/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
424/// the same as for TThreadExecutor.
425/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
426TTreeProcessorMT::TTreeProcessorMT(TTree &tree, UInt_t nThreads, const EntryRange &globalRange)
427 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
428 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)),
429 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree, /*retrieveEntries*/ true)),
430 fPool(nThreads),
431 fGlobalRange(globalRange)
432{
433}
434
435//////////////////////////////////////////////////////////////////////////////
436/// Process the entries of a TTree in parallel. The user-provided function
437/// receives a TTreeReader which can be used to iterate on a subrange of
438/// entries
439/// ~~~{.cpp}
440/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
441/// // Select branches to read
442/// while (readerSubRange.Next()) {
443/// // Use content of current entry
444/// }
445/// });
446/// ~~~
447/// The user needs to be aware that each of the subranges can potentially
448/// be processed in parallel. This means that the code of the user function
449/// should be thread safe.
450///
451/// \param[in] func User-defined function that processes a subrange of entries
452void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
453{
454 // compute number of tasks per file
455 const unsigned int maxTasksPerFile =
456 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
457
458 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
459 // so we do it here for all files.
460 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
461 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
462 // sub-entrylists.
463 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
464 const bool hasEntryList = fEntryList.GetN() > 0;
465 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList || fGlobalRange.first > 0 ||
466 fGlobalRange.second != std::numeric_limits<Long64_t>::max();
467 ClustersAndEntries allClusterAndEntries{};
468 auto &allClusters = allClusterAndEntries.first;
469 const auto &allEntries = allClusterAndEntries.second;
470 if (shouldRetrieveAllClusters) {
471 allClusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile, fGlobalRange);
472 if (hasEntryList)
473 allClusters = ConvertToElistClusters(std::move(allClusters), fEntryList, fTreeNames, fFileNames, allEntries);
474 }
475
476 // Per-file processing in case we retrieved all cluster info upfront
477 auto processFileUsingGlobalClusters = [&](std::size_t fileIdx) {
478 auto processCluster = [&](const EntryRange &c) {
479 auto r =
480 fTreeView->GetTreeReader(c.first, c.second, fTreeNames, fFileNames, fFriendInfo, fEntryList, allEntries);
481 func(*r);
482 };
483 fPool.Foreach(processCluster, allClusters[fileIdx]);
484 };
485
486 // Per-file processing that also retrieves cluster info for a file
487 auto processFileRetrievingClusters = [&](std::size_t fileIdx) {
488 // Evaluate clusters (with local entry numbers) and number of entries for this file
489 const auto &treeNames = std::vector<std::string>({fTreeNames[fileIdx]});
490 const auto &fileNames = std::vector<std::string>({fFileNames[fileIdx]});
491 const auto clustersAndEntries = MakeClusters(treeNames, fileNames, maxTasksPerFile);
492 const auto &clusters = clustersAndEntries.first[0];
493 const auto &entries = clustersAndEntries.second[0];
494 auto processCluster = [&](const EntryRange &c) {
495 auto r = fTreeView->GetTreeReader(c.first, c.second, treeNames, fileNames, fFriendInfo, fEntryList, {entries});
496 func(*r);
497 };
498 fPool.Foreach(processCluster, clusters);
499 };
500
501 const auto firstNonEmpty =
502 fGlobalRange.first > 0u ? std::distance(allClusters.begin(), std::find_if(allClusters.begin(), allClusters.end(),
503 [](auto &c) { return !c.empty(); }))
504 : 0u;
505
506 std::vector<std::size_t> fileIdxs(allEntries.empty() ? fFileNames.size() : allEntries.size() - firstNonEmpty);
507 std::iota(fileIdxs.begin(), fileIdxs.end(), firstNonEmpty);
508
509 if (shouldRetrieveAllClusters)
510 fPool.Foreach(processFileUsingGlobalClusters, fileIdxs);
511 else
512 fPool.Foreach(processFileRetrievingClusters, fileIdxs);
513
514 // make sure TChains and TFiles are cleaned up since they are not globally tracked
515 for (unsigned int islot = 0; islot < fTreeView.GetNSlots(); ++islot) {
516 ROOT::Internal::TTreeView *view = fTreeView.GetAtSlotRaw(islot);
517 if (view != nullptr) {
518 view->Reset();
519 }
520 }
521}
522
523////////////////////////////////////////////////////////////////////////
524/// \brief Retrieve the current value for the desired number of tasks per worker.
525/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
527{
529}
530
531////////////////////////////////////////////////////////////////////////
532/// \brief Set the hint for the desired number of tasks created per worker.
533/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
534///
535/// This allows to create a reasonable number of tasks even if any of the
536/// processed files features a bad clustering, for example with a lot of
537/// entries and just a few entries per cluster, or to limit the number of
538/// tasks spawned when a very large number of files and workers is used.
539void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
540{
541 fgTasksPerWorkerHint = tasksPerWorkerHint;
542}
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition RtypesCore.h:80
#define gDirectory
Definition TDirectory.h:386
#define R__ASSERT(e)
Definition TError.h:117
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
@ kMustCleanup
Definition TObject.h:370
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
void Reset()
Clear the resources.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries)
Get a TTreeReader for the current tree of this view.
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
ROOT::TreeUtils::RFriendInfo fFriendInfo
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
std::vector< std::string > FindTreeNames()
Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u, const std::pair< Long64_t, Long64_t > &globalRange={0, std::numeric_limits< Long64_t >::max()})
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
std::pair< Long64_t, Long64_t > fGlobalRange
A chain is a collection of files containing TTree objects.
Definition TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual TList * GetLists() const
Definition TEntryList.h:76
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
virtual Long64_t GetN() const
Definition TEntryList.h:78
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4053
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
@ kCanDelete
if object in a list can be deleted
Definition TObject.h:62
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition TTreeReader.h:44
A TTree represents a columnar dataset.
Definition TTree.h:79
Different standalone functions to work with trees and tuples, not required to be a member of any cla...
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
std::vector< std::unique_ptr< TChain > > MakeFriends(const ROOT::TreeUtils::RFriendInfo &finfo)
Create friends from the main TTree.
void ClearMustCleanupBits(TObjArray &arr)
Reset the kMustCleanup bit of a TObjArray of TBranch objects (e.g.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
void EnableThreadSafety()
Enable support for multi-threading within the ROOT code in particular, enables the global mutex to ma...
Definition TROOT.cxx:493
Definition tree.py:1
Information about friend trees of a certain TTree or TChain object.
std::vector< std::pair< std::string, std::string > > fFriendNames
Pairs of names and aliases of each friend tree/chain.