Logo ROOT  
Reference Guide
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
/** \class ROOT::TTreeProcessorMT
    \ingroup Parallelism
    \brief A class to process the entries of a TTree in parallel.

By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
passes a function whose only parameter is a TTreeReader. The function iterates
over a subrange of entries by using that TTreeReader.

The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
each corresponding to a cluster (or to a group of consecutive clusters) in the TTree. This is
possible thanks to the use of a ROOT::TThreadedObject, so that each thread works with its own
TFile and TTree objects.
*/
26
#include "ROOT/TTreeProcessorMT.hxx"
#include "TROOT.h"

#include <algorithm>
#include <limits>
#include <numeric>
#include <string>
#include <utility>

30using namespace ROOT;
31
32namespace {
33
34using EntryRange = std::pair<Long64_t, Long64_t>;
35
36// note that this routine assumes global entry numbers
37static bool ClustersAreSortedAndContiguous(const std::vector<std::vector<EntryRange>> &cls)
38{
39 Long64_t last_end = 0ll;
40 for (const auto &fcl : cls) {
41 for (const auto &c : fcl) {
42 if (last_end != c.first)
43 return false;
44 last_end = c.second;
45 }
46 }
47 return true;
48}
49
50/// Take a vector of vectors of EntryRanges (a vector per file), filter the entries according to entryList, and
51/// and return a new vector of vectors of EntryRanges where cluster start/end entry numbers have been converted to
52/// TEntryList-local entry numbers.
53///
54/// This routine assumes that entry numbers in the TEntryList (and, if present, in the sub-entrylists) are in
55/// ascending order, i.e., for n > m:
56/// elist.GetEntry(n) + tree_offset_for_entry_from_elist(n) > elist.GetEntry(m) + tree_offset_for_entry_from_elist(m)
57static std::vector<std::vector<EntryRange>>
58ConvertToElistClusters(std::vector<std::vector<EntryRange>> &&clusters, TEntryList &entryList,
59 const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
60 const std::vector<Long64_t> &entriesPerFile)
61{
62 R__ASSERT(entryList.GetN() > 0); // wasteful to call this function if it has nothing to do
63 R__ASSERT(ClustersAreSortedAndContiguous(clusters));
64
65 const bool listHasGlobalEntryNumbers = entryList.GetLists() == nullptr;
66 const auto nFiles = clusters.size();
67
68 std::unique_ptr<TChain> chain;
69 using NextFn_t = Long64_t (*)(Long64_t &, TEntryList &, TChain *);
70 // A function that advances TEntryList and returns global entry numbers or -1 if we reached the end
71 // (might or might not need a TChain depending on whether listHasGlobalEntryNumbers)
72 NextFn_t Next;
73 if (listHasGlobalEntryNumbers) {
74 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *) {
75 ++elEntry;
76 return elist.Next();
77 };
78 } else {
79 // we need `chain` to be able to convert local entry numbers to global entry numbers in `Next`
81 for (auto i = 0u; i < nFiles; ++i)
82 chain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), entriesPerFile[i]);
83 Next = [](Long64_t &elEntry, TEntryList &elist, TChain *ch) {
84 ++elEntry;
85 int treenum = -1;
86 Long64_t localEntry = elist.GetEntryAndTree(elEntry, treenum);
87 if (localEntry == -1ll)
88 return localEntry;
89 return localEntry + ch->GetTreeOffset()[treenum];
90 };
91 }
92
93 // the call to GetEntry also serves the purpose to reset TEntryList::fLastIndexQueried,
94 // so we can be sure TEntryList::Next will return the correct thing
95 Long64_t elistEntry = 0ll;
96 Long64_t entry = entryList.GetEntry(elistEntry);
97
98 std::vector<std::vector<EntryRange>> elistClusters;
99
100 for (auto fileN = 0u; fileN < nFiles; ++fileN) {
101 std::vector<EntryRange> elistClustersForFile;
102 for (const auto &c : clusters[fileN]) {
103 if (entry >= c.second || entry == -1ll) // no entrylist entries in this cluster
104 continue;
105 R__ASSERT(entry >= c.first); // current entry should never come before the cluster we are looking at
106 const Long64_t elistRangeStart = elistEntry;
107 // advance entry list until the entrylist entry goes beyond the end of the cluster
108 while (entry < c.second && entry != -1ll)
109 entry = Next(elistEntry, entryList, chain.get());
110 elistClustersForFile.emplace_back(EntryRange{elistRangeStart, elistEntry});
111 }
112 elistClusters.emplace_back(std::move(elistClustersForFile));
113 }
114
115 R__ASSERT(elistClusters.size() == clusters.size()); // same number of files
116 R__ASSERT(ClustersAreSortedAndContiguous(elistClusters));
117
118 entryList.GetEntry(0ll); // reset TEntryList internal state, lest we incur in ROOT-10807
119 return elistClusters;
120}
121
122// EntryRanges and number of entries per file
123using ClustersAndEntries = std::pair<std::vector<std::vector<EntryRange>>, std::vector<Long64_t>>;
124
125////////////////////////////////////////////////////////////////////////
126/// Return a vector of cluster boundaries for the given tree and files.
127static ClustersAndEntries MakeClusters(const std::vector<std::string> &treeNames,
128 const std::vector<std::string> &fileNames, const unsigned int maxTasksPerFile,
129 const EntryRange &range = {0, std::numeric_limits<Long64_t>::max()})
130{
131 // Note that as a side-effect of opening all files that are going to be used in the
132 // analysis once, all necessary streamers will be loaded into memory.
134 const auto nFileNames = fileNames.size();
135 std::vector<std::vector<EntryRange>> clustersPerFile;
136 std::vector<Long64_t> entriesPerFile;
137 entriesPerFile.reserve(nFileNames);
138 Long64_t offset = 0ll;
139 bool rangeEndReached = false; // flag to break the outer loop
140 for (auto i = 0u; i < nFileNames && !rangeEndReached; ++i) {
141 const auto &fileName = fileNames[i];
142 const auto &treeName = treeNames[i];
143
144 std::unique_ptr<TFile> f(TFile::Open(
145 fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION")); // need TFile::Open to load plugins if need be
146 if (!f || f->IsZombie()) {
147 const auto msg = "TTreeProcessorMT::Process: an error occurred while opening file \"" + fileName + "\"";
148 throw std::runtime_error(msg);
149 }
150 auto *t = f->Get<TTree>(treeName.c_str()); // t will be deleted by f
151
152 if (!t) {
153 const auto msg = "TTreeProcessorMT::Process: an error occurred while getting tree \"" + treeName +
154 "\" from file \"" + fileName + "\"";
155 throw std::runtime_error(msg);
156 }
157
158 // Avoid calling TROOT::RecursiveRemove for this tree, it takes the read lock and we don't need it.
159 t->ResetBit(kMustCleanup);
161 auto clusterIter = t->GetClusterIterator(0);
162 Long64_t clusterStart = 0ll, clusterEnd = 0ll;
163 const Long64_t entries = t->GetEntries();
164 // Iterate over the clusters in the current file
165 std::vector<EntryRange> entryRanges;
166 while ((clusterStart = clusterIter()) < entries && !rangeEndReached) {
167 clusterEnd = clusterIter.GetNextEntry();
168 // Currently, if a user specified a range, the clusters will be only globally obtained
169 // Assume that there are 3 files with entries: [0, 100], [0, 150], [0, 200] (in this order)
170 // Since the cluster boundaries are obtained sequentially, applying the offsets, the boundaries
171 // would be: 0, 100, 250, 450. Now assume that the user provided the range (150, 300)
172 // Then, in the first iteration, nothing is going to be added to entryRanges since:
173 // std::max(0, 150) < std::min(100, max). Then, by the same logic only a subset of the second
174 // tree is added, i.e.: currentStart is now 200 and currentEnd is 250 (locally from 100 to 150).
175 // Lastly, the last tree would take entries from 250 to 300 (or from 0 to 50 locally).
176 // The current file's offset to start and end is added to make them (chain) global
177 const auto currentStart = std::max(clusterStart + offset, range.first);
178 const auto currentEnd = std::min(clusterEnd + offset, range.second);
179 // This is not satified if the desired start is larger than the last entry of some cluster
180 // In this case, this cluster is not going to be processes further
181 if (currentStart < currentEnd)
182 entryRanges.emplace_back(EntryRange{currentStart, currentEnd});
183 if (currentEnd == range.second) // if the desired end is reached, stop reading further
184 rangeEndReached = true;
185 }
186 offset += entries; // consistently keep track of the total number of entries
187 clustersPerFile.emplace_back(std::move(entryRanges));
188 // Keep track of the entries, even if their corresponding tree is out of the range, e.g. entryRanges is empty
189 entriesPerFile.emplace_back(entries);
190 }
191 if (range.first >= offset && offset > 0) // do not error out on an empty tree
192 throw std::logic_error(std::string("A range of entries was passed in the creation of the TTreeProcessorMT, ") +
193 "but the starting entry (" + range.first + ") is larger than the total number of " +
194 "entries (" + offset + ") in the dataset.");
195
196 // Here we "fuse" clusters together if the number of clusters is too big with respect to
197 // the number of slots, otherwise we can incur in an overhead which is big enough
198 // to make parallelisation detrimental to performance.
199 // For example, this is the case when, following a merging of many small files, a file
200 // contains a tree with many entries and with clusters of just a few entries each.
201 // Another problematic case is a high number of slots (e.g. 256) coupled with a high number
202 // of files (e.g. 1000 files): the large amount of files might result in a large amount
203 // of tasks, but the elevated concurrency level makes the little synchronization required by
204 // task initialization very expensive. In this case it's better to simply process fewer, larger tasks.
205 // Cluster-merging can help reduce the number of tasks down to a minumum of one task per file.
206 //
207 // The criterion according to which we fuse clusters together is to have around
208 // TTreeProcessorMT::GetTasksPerWorkerHint() clusters per slot.
209 // Concretely, for each file we will cap the number of tasks to ceil(GetTasksPerWorkerHint() * nWorkers / nFiles).
210
211 std::vector<std::vector<EntryRange>> eventRangesPerFile(clustersPerFile.size());
212 auto clustersPerFileIt = clustersPerFile.begin();
213 auto eventRangesPerFileIt = eventRangesPerFile.begin();
214 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
215 const auto clustersInThisFileSize = clustersPerFileIt->size();
216 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
217 // If the number of clusters is less than maxTasksPerFile
218 // we take the clusters as they are
219 if (nFolds == 0) {
220 *eventRangesPerFileIt = std::move(*clustersPerFileIt);
221 continue;
222 }
223 // Otherwise, we have to merge clusters, distributing the reminder evenly
224 // onto the first clusters
225 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
226 const auto &clustersInThisFile = *clustersPerFileIt;
227 for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
228 const auto start = clustersInThisFile[i].first;
229 // We lump together at least nFolds clusters, therefore
230 // we need to jump ahead of nFolds-1.
231 i += (nFolds - 1);
232 // We now add a cluster if we have some reminder left
233 if (nReminderClusters > 0) {
234 i += 1U;
235 nReminderClusters--;
236 }
237 const auto end = clustersInThisFile[i].second;
238 eventRangesPerFileIt->emplace_back(EntryRange({start, end}));
239 }
240 }
241
242 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
243}
244
245////////////////////////////////////////////////////////////////////////
246/// Return a vector containing the number of entries of each file of each friend TChain
247static std::vector<std::vector<Long64_t>> GetFriendEntries(const ROOT::TreeUtils::RFriendInfo &friendInfo)
248{
249
250 const auto &friendNames = friendInfo.fFriendNames;
251 const auto &friendFileNames = friendInfo.fFriendFileNames;
252 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
253
254 std::vector<std::vector<Long64_t>> friendEntries;
255 const auto nFriends = friendNames.size();
256 for (auto i = 0u; i < nFriends; ++i) {
257 std::vector<Long64_t> nEntries;
258 const auto &thisFriendName = friendNames[i].first;
259 const auto &thisFriendFiles = friendFileNames[i];
260 const auto &thisFriendChainSubNames = friendChainSubNames[i];
261 // If this friend has chain sub names, it means it's a TChain.
262 // In this case, we need to traverse all files that make up the TChain,
263 // retrieve the correct sub tree from each file and store the number of
264 // entries for that sub tree.
265 if (!thisFriendChainSubNames.empty()) {
266 // Traverse together filenames and respective treenames
267 for (auto fileidx = 0u; fileidx < thisFriendFiles.size(); ++fileidx) {
268 std::unique_ptr<TFile> curfile(
269 TFile::Open(thisFriendFiles[fileidx].c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
270 if (!curfile || curfile->IsZombie())
271 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" +
272 thisFriendFiles[fileidx] + "\"");
273 // thisFriendChainSubNames[fileidx] stores the name of the current
274 // subtree in the TChain stored in the current file.
275 TTree *curtree = curfile->Get<TTree>(thisFriendChainSubNames[fileidx].c_str());
276 if (!curtree)
277 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
278 thisFriendChainSubNames[fileidx] + "\" from file \"" +
279 thisFriendFiles[fileidx] + "\"");
280 nEntries.emplace_back(curtree->GetEntries());
281 }
282 // Otherwise, if there are no sub names for the current friend, it means
283 // it's a TTree. We can safely use `thisFriendName` as the name of the tree
284 // to retrieve from the file in `thisFriendFiles`
285 } else {
286 for (const auto &fname : thisFriendFiles) {
287 std::unique_ptr<TFile> f(TFile::Open(fname.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
288 if (!f || f->IsZombie())
289 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not open file \"" + fname + "\"");
290 TTree *t = f->Get<TTree>(thisFriendName.c_str());
291 if (!t)
292 throw std::runtime_error("TTreeProcessorMT::GetFriendEntries: Could not retrieve TTree \"" +
293 thisFriendName + "\" from file \"" + fname + "\"");
294 nEntries.emplace_back(t->GetEntries());
295 }
296 }
297 // Store the vector with entries for each file in the current tree/chain.
298 friendEntries.emplace_back(std::move(nEntries));
299 }
300
301 return friendEntries;
302}
303
304} // anonymous namespace
305
306namespace ROOT {
307
309
310namespace Internal {
311
312////////////////////////////////////////////////////////////////////////////////
313/// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
314/// \param[in] treeNames Name of the tree for each file in `fileNames`.
315/// \param[in] fileNames Files to be opened.
316/// \param[in] friendInfo Information about TTree friends, if any.
317/// \param[in] nEntries Number of entries to be processed.
318/// \param[in] friendEntries Number of entries in each friend. Expected to have same ordering as friendInfo.
319void TTreeView::MakeChain(const std::vector<std::string> &treeNames, const std::vector<std::string> &fileNames,
320 const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
321 const std::vector<std::vector<Long64_t>> &friendEntries)
322{
323
324 const auto &friendNames = friendInfo.fFriendNames;
325 const auto &friendFileNames = friendInfo.fFriendFileNames;
326 const auto &friendChainSubNames = friendInfo.fFriendChainSubNames;
327
329 // Because of the range, we might have stopped reading entries earlier,
330 // hence the size of nEntries can be smaller than the number of all files
331 // TODO: pass "firstFileToProcess" index in case of a range,
332 // and do not add files to the chain, which are before the desired start entry of the range
333 const auto nFilesToProcess = nEntries.size();
334 for (auto i = 0u; i < nFilesToProcess; ++i) {
335 fChain->Add((fileNames[i] + "?#" + treeNames[i]).c_str(), nEntries[i]);
336 }
338
339 fFriends.clear();
340 const auto nFriends = friendNames.size();
341 for (auto i = 0u; i < nFriends; ++i) {
342 const auto &thisFriendNameAlias = friendNames[i];
343 const auto &thisFriendName = thisFriendNameAlias.first;
344 const auto &thisFriendAlias = thisFriendNameAlias.second;
345 const auto &thisFriendFiles = friendFileNames[i];
346 const auto &thisFriendChainSubNames = friendChainSubNames[i];
347 const auto &thisFriendEntries = friendEntries[i];
348
349 // Build a friend chain
350 auto frChain = ROOT::Internal::TreeUtils::MakeChainForMT(thisFriendName);
351 const auto nFileNames = friendFileNames[i].size();
352 if (thisFriendChainSubNames.empty()) {
353 // If there are no chain subnames, the friend was a TTree. It's safe
354 // to add to the chain the filename directly.
355 for (auto j = 0u; j < nFileNames; ++j) {
356 frChain->Add(thisFriendFiles[j].c_str(), thisFriendEntries[j]);
357 }
358 } else {
359 // Otherwise, the new friend chain needs to be built using the nomenclature
360 // "filename/treename" as argument to `TChain::Add`
361 for (auto j = 0u; j < nFileNames; ++j) {
362 frChain->Add((thisFriendFiles[j] + "?#" + thisFriendChainSubNames[j]).c_str(), thisFriendEntries[j]);
363 }
364 }
365
366 // Make it friends with the main chain
367 fChain->AddFriend(frChain.get(), thisFriendAlias.c_str());
368 fFriends.emplace_back(std::move(frChain));
369 }
370}
371
372//////////////////////////////////////////////////////////////////////////
373/// Get a TTreeReader for the current tree of this view.
374std::unique_ptr<TTreeReader>
375TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::vector<std::string> &treeNames,
376 const std::vector<std::string> &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo,
377 const TEntryList &entryList, const std::vector<Long64_t> &nEntries,
378 const std::vector<std::vector<Long64_t>> &friendEntries)
379{
380 const bool hasEntryList = entryList.GetN() > 0;
381 const bool usingLocalEntries = friendInfo.fFriendNames.empty() && !hasEntryList;
382 const bool needNewChain =
383 fChain == nullptr || (usingLocalEntries && (fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle() ||
384 treeNames[0] != fChain->GetListOfFiles()->At(0)->GetName()));
385 if (needNewChain) {
386 MakeChain(treeNames, fileNames, friendInfo, nEntries, friendEntries);
387 if (hasEntryList) {
388 fEntryList.reset(new TEntryList(entryList));
389 if (fEntryList->GetLists() != nullptr) {
390 // need to associate the TEntryList to the TChain for the latter to set entry the fTreeNumbers of the
391 // sub-lists of the former...
392 fChain->SetEntryList(fEntryList.get());
393 fEntryList->ResetBit(TObject::kCanDelete); // ...but we want to retain ownership
394 }
395 }
396 }
397 auto reader = std::make_unique<TTreeReader>(fChain.get(), fEntryList.get());
398 reader->SetEntriesRange(start, end);
399 return reader;
400}
401
402////////////////////////////////////////////////////////////////////////
403/// Clear the resources
405{
406 fChain.reset();
407 fEntryList.reset();
408 fFriends.clear();
409}
410
411} // namespace Internal
412} // namespace ROOT
413
414/////////////////////////////////////////////////////////////////////////////////////////////////
415/// Retrieve the names of the TTrees in each of the input files, throw if a TTree cannot be found.
416std::vector<std::string> TTreeProcessorMT::FindTreeNames()
417{
418 std::vector<std::string> treeNames;
419
420 if (fFileNames.empty()) // This can never happen
421 throw std::runtime_error("Empty list of files and no tree name provided");
422
424 for (const auto &fname : fFileNames) {
425 std::string treeName;
426 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
427 TIter next(f->GetListOfKeys());
428 while (auto *key = static_cast<TKey *>(next())) {
429 const char *className = key->GetClassName();
430 if (strcmp(className, "TTree") == 0) {
431 treeName = key->GetName();
432 break;
433 }
434 }
435 if (treeName.empty())
436 throw std::runtime_error("Cannot find any tree in file " + fname);
437 treeNames.emplace_back(std::move(treeName));
438 }
439
440 return treeNames;
441}
442
443////////////////////////////////////////////////////////////////////////
444/// Constructor based on a file name.
445/// \param[in] filename Name of the file containing the tree to process.
446/// \param[in] treename Name of the tree to process. If not provided, the implementation will search
447/// for a TTree key in the file and will use the first one it finds.
448/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
449/// the same as for TThreadExecutor.
450/// \param[in] /// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
452 const EntryRange &globalRange)
453 : fFileNames({std::string(filename)}),
454 fTreeNames(treename.empty() ? FindTreeNames() : std::vector<std::string>{std::string(treename)}), fFriendInfo(),
455 fPool(nThreads), fGlobalRange(globalRange)
456{
458}
459
/// Copy a non-empty list of file-name views into owning strings, preserving their order.
/// \throws std::runtime_error if `views` is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (!views.empty())
      return std::vector<std::string>(views.begin(), views.end());

   throw std::runtime_error("The provided list of file names is empty");
}
471
472////////////////////////////////////////////////////////////////////////
473/// Constructor based on a collection of file names.
474/// \param[in] filenames Collection of the names of the files containing the tree to process.
475/// \param[in] treename Name of the tree to process. If not provided, the implementation will
476/// search filenames for a TTree key and will use the first one it finds in each file.
477/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
478/// the same as for TThreadExecutor.
479/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
480///
481/// If different files contain TTrees with different names and automatic TTree name detection is not an option
482/// (for example, because some of the files contain multiple TTrees) please manually create a TChain and pass
483/// it to the appropriate TTreeProcessorMT constructor.
484TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename,
485 UInt_t nThreads, const EntryRange &globalRange)
486 : fFileNames(CheckAndConvert(filenames)),
487 fTreeNames(treename.empty() ? FindTreeNames()
488 : std::vector<std::string>(fFileNames.size(), std::string(treename))),
489 fFriendInfo(), fPool(nThreads), fGlobalRange(globalRange)
490{
492}
493
494////////////////////////////////////////////////////////////////////////
495/// Constructor based on a TTree and a TEntryList.
496/// \param[in] tree Tree or chain of files containing the tree to process.
497/// \param[in] entries List of entry numbers to process.
498/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
499/// the same as for TThreadExecutor.
500TTreeProcessorMT::TTreeProcessorMT(TTree &tree, const TEntryList &entries, UInt_t nThreads)
501 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
502 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)), fEntryList(entries),
503 fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree)), fPool(nThreads)
504{
506}
507
508////////////////////////////////////////////////////////////////////////
509/// Constructor based on a TTree.
510/// \param[in] tree Tree or chain of files containing the tree to process.
511/// \param[in] nThreads Number of threads to create in the underlying thread-pool. The semantics of this argument are
512/// the same as for TThreadExecutor.
513/// \param[in] globalRange Global entry range to process, {begin (inclusive), end (exclusive)}.
514TTreeProcessorMT::TTreeProcessorMT(TTree &tree, UInt_t nThreads, const EntryRange &globalRange)
515 : fFileNames(Internal::TreeUtils::GetFileNamesFromTree(tree)),
516 fTreeNames(Internal::TreeUtils::GetTreeFullPaths(tree)), fFriendInfo(Internal::TreeUtils::GetFriendInfo(tree)),
517 fPool(nThreads), fGlobalRange(globalRange)
518{
519}
520
521//////////////////////////////////////////////////////////////////////////////
522/// Process the entries of a TTree in parallel. The user-provided function
523/// receives a TTreeReader which can be used to iterate on a subrange of
524/// entries
525/// ~~~{.cpp}
526/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
527/// // Select branches to read
528/// while (readerSubRange.Next()) {
529/// // Use content of current entry
530/// }
531/// });
532/// ~~~
533/// The user needs to be aware that each of the subranges can potentially
534/// be processed in parallel. This means that the code of the user function
535/// should be thread safe.
536///
537/// \param[in] func User-defined function that processes a subrange of entries
539{
540 // compute number of tasks per file
541 const unsigned int maxTasksPerFile =
542 std::ceil(float(GetTasksPerWorkerHint() * fPool.GetPoolSize()) / float(fFileNames.size()));
543
544 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
545 // so we do it here for all files.
546 // Otherwise we can do it later, concurrently for each file, and clusters will contain local entry numbers.
547 // TODO: in practice we could also find clusters per-file in the case of no friends and a TEntryList with
548 // sub-entrylists.
549 const bool hasFriends = !fFriendInfo.fFriendNames.empty();
550 const bool hasEntryList = fEntryList.GetN() > 0;
551 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList || fGlobalRange.first > 0 ||
552 fGlobalRange.second != std::numeric_limits<Long64_t>::max();
553 ClustersAndEntries allClusterAndEntries{};
554 auto &allClusters = allClusterAndEntries.first;
555 const auto &allEntries = allClusterAndEntries.second;
556 if (shouldRetrieveAllClusters) {
557 allClusterAndEntries = MakeClusters(fTreeNames, fFileNames, maxTasksPerFile, fGlobalRange);
558 if (hasEntryList)
559 allClusters = ConvertToElistClusters(std::move(allClusters), fEntryList, fTreeNames, fFileNames, allEntries);
560 }
561
562 // Per-file processing in case we retrieved all cluster info upfront
563 auto processFileUsingGlobalClusters = [&](std::size_t fileIdx) {
564 auto processCluster = [&](const EntryRange &c) {
566 allEntries, GetFriendEntries(fFriendInfo));
567 func(*r);
568 };
569 fPool.Foreach(processCluster, allClusters[fileIdx]);
570 };
571
572 // Per-file processing that also retrieves cluster info for a file
573 auto processFileRetrievingClusters = [&](std::size_t fileIdx) {
574 // Evaluate clusters (with local entry numbers) and number of entries for this file
575 const auto &treeNames = std::vector<std::string>({fTreeNames[fileIdx]});
576 const auto &fileNames = std::vector<std::string>({fFileNames[fileIdx]});
577 const auto clustersAndEntries = MakeClusters(treeNames, fileNames, maxTasksPerFile);
578 const auto &clusters = clustersAndEntries.first[0];
579 const auto &entries = clustersAndEntries.second[0];
580 auto processCluster = [&](const EntryRange &c) {
581 auto r = fTreeView->GetTreeReader(c.first, c.second, treeNames, fileNames, fFriendInfo, fEntryList, {entries},
582 std::vector<std::vector<Long64_t>>{});
583 func(*r);
584 };
585 fPool.Foreach(processCluster, clusters);
586 };
587
588 const auto firstNonEmpty =
589 fGlobalRange.first > 0u ? std::distance(allClusters.begin(), std::find_if(allClusters.begin(), allClusters.end(),
590 [](auto &c) { return !c.empty(); }))
591 : 0u;
592
593 std::vector<std::size_t> fileIdxs(allEntries.empty() ? fFileNames.size() : allEntries.size() - firstNonEmpty);
594 std::iota(fileIdxs.begin(), fileIdxs.end(), firstNonEmpty);
595
596 if (shouldRetrieveAllClusters)
597 fPool.Foreach(processFileUsingGlobalClusters, fileIdxs);
598 else
599 fPool.Foreach(processFileRetrievingClusters, fileIdxs);
600
601 // make sure TChains and TFiles are cleaned up since they are not globally tracked
602 for (unsigned int islot = 0; islot < fTreeView.GetNSlots(); ++islot) {
603 ROOT::Internal::TTreeView *view = fTreeView.GetAtSlotRaw(islot);
604 if (view != nullptr) {
605 view->Reset();
606 }
607 }
608}
609
610////////////////////////////////////////////////////////////////////////
611/// \brief Retrieve the current value for the desired number of tasks per worker.
612/// \return The desired number of tasks to be created per worker. TTreeProcessorMT uses this value as an hint.
614{
616}
617
618////////////////////////////////////////////////////////////////////////
619/// \brief Set the hint for the desired number of tasks created per worker.
620/// \param[in] tasksPerWorkerHint Desired number of tasks per worker.
621///
622/// This allows to create a reasonable number of tasks even if any of the
623/// processed files features a bad clustering, for example with a lot of
624/// entries and just a few entries per cluster, or to limit the number of
625/// tasks spawned when a very large number of files and workers is used.
626void TTreeProcessorMT::SetTasksPerWorkerHint(unsigned int tasksPerWorkerHint)
627{
628 fgTasksPerWorkerHint = tasksPerWorkerHint;
629}
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
long long Long64_t
Definition: RtypesCore.h:80
#define gDirectory
Definition: TDirectory.h:348
#define R__ASSERT(e)
Definition: TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
@ kMustCleanup
Definition: TObject.h:370
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
void MakeChain(const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
std::unique_ptr< TChain > fChain
Chain on which to operate.
std::vector< std::unique_ptr< TChain > > fFriends
Friends of the tree/chain, if present.
std::unique_ptr< TEntryList > fEntryList
TEntryList for fChain, if present.
std::unique_ptr< TTreeReader > GetTreeReader(Long64_t start, Long64_t end, const std::vector< std::string > &treeName, const std::vector< std::string > &fileNames, const ROOT::TreeUtils::RFriendInfo &friendInfo, const TEntryList &entryList, const std::vector< Long64_t > &nEntries, const std::vector< std::vector< Long64_t > > &friendEntries)
Get a TTreeReader for the current tree of this view.
void Reset()
Clear the resources.
ROOT::Internal::TreeUtils::RNoCleanupNotifier fNoCleanupNotifier
unsigned GetPoolSize() const
Returns the number of worker threads in the task arena.
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute a function without arguments several times in parallel, dividing the execution in nChunks.
const std::vector< std::string > fTreeNames
TTree names (always same size and ordering as fFileNames)
const std::vector< std::string > fFileNames
Names of the files.
static unsigned int fgTasksPerWorkerHint
ROOT::TThreadExecutor fPool
! Thread pool for processing.
TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
static void SetTasksPerWorkerHint(unsigned int m)
Set the hint for the desired number of tasks created per worker.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > fTreeView
Thread-local TreeViews.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
TTreeProcessorMT(std::string_view filename, std::string_view treename="", UInt_t nThreads=0u, const std::pair< Long64_t, Long64_t > &globalRange={0, std::numeric_limits< Long64_t >::max()})
const ROOT::TreeUtils::RFriendInfo fFriendInfo
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
std::pair< Long64_t, Long64_t > fGlobalRange
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
TDirectory::TContext keeps track and restore the current directory.
Definition: TDirectory.h:89
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual TList * GetLists() const
Definition: TEntryList.h:76
virtual Long64_t GetEntry(Long64_t index)
Return the number of the entry #index of this TEntryList in the TTree or TChain See also Next().
Definition: TEntryList.cxx:753
virtual Long64_t GetN() const
Definition: TEntryList.h:78
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:4053
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:28
@ kCanDelete
if object in a list can be deleted
Definition: TObject.h:62
A simple, robust and fast interface to read values from ROOT columnar datasets such as TTree,...
Definition: TTreeReader.h:44
A TTree represents a columnar dataset.
Definition: TTree.h:79
virtual Long64_t GetEntries() const
Definition: TTree.h:459
Different standalone functions to work with trees and tuples, not reqiuired to be a member of any cla...
std::vector< std::string > GetTreeFullPaths(const TTree &tree)
Retrieve the full path(s) to a TTree or the trees in a TChain.
std::vector< std::string > GetFileNamesFromTree(const TTree &tree)
Get and store the file names associated with the input tree.
ROOT::TreeUtils::RFriendInfo GetFriendInfo(const TTree &tree)
Get and store the names, aliases and file names of the direct friends of the tree.
RVec< PromoteType< T > > ceil(const RVec< T > &v)
Definition: RVec.hxx:1813
basic_string_view< char > string_view
std::unique_ptr< TChain > MakeChainForMT(const std::string &name="", const std::string &title="")
Create a TChain object with options that avoid common causes of thread contention.
void ClearMustCleanupBits(TObjArray &arr)
Reset the kMustCleanup bit of a TObjArray of TBranch objects (e.g.
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:167
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
void EnableThreadSafety()
Enables the global mutex to make ROOT thread safe/aware.
Definition: TROOT.cxx:493
Definition: tree.py:1
Information about friend trees of a certain TTree or TChain object.
Definition: RFriendInfo.hxx:34
std::vector< std::pair< std::string, std::string > > fFriendNames
Pairs of names and aliases of friend trees/chains.
Definition: RFriendInfo.hxx:37
std::vector< std::vector< std::string > > fFriendChainSubNames
Names of the subtrees of a friend TChain.
Definition: RFriendInfo.hxx:50
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
Definition: RFriendInfo.hxx:42