Logo ROOT   6.16/01
Reference Guide
TTreeProcessorMT.cxx
Go to the documentation of this file.
1// @(#)root/thread:$Id$
2// Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3
4/*************************************************************************
5 * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class ROOT::TTreeProcessorMT
13 \ingroup Parallelism
14 \brief A class to process the entries of a TTree in parallel.
15
16By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18passes a function whose only parameter is a TTreeReader. The function iterates
19on a subrange of entries by using that TTreeReader.
20
21The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22each corresponding to a cluster in the TTree. This is possible thanks to the use
23of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24objects.
25*/
26
27#include "TROOT.h"
30
31using namespace ROOT;
32
33namespace ROOT {
34
36
37namespace Internal {
38////////////////////////////////////////////////////////////////////////
39/// Return a vector of cluster boundaries for the given tree and files.
40// EntryClusters and number of entries per file
41using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
42static ClustersAndEntries MakeClusters(const std::string &treeName, const std::vector<std::string> &fileNames)
43{
44 // Note that as a side-effect of opening all files that are going to be used in the
45 // analysis once, all necessary streamers will be loaded into memory.
47 const auto nFileNames = fileNames.size();
48 std::vector<std::vector<EntryCluster>> clustersPerFile;
49 std::vector<Long64_t> entriesPerFile; entriesPerFile.reserve(nFileNames);
50 Long64_t offset = 0ll;
51 for (const auto &fileName : fileNames) {
52 auto fileNameC = fileName.c_str();
53 std::unique_ptr<TFile> f(TFile::Open(fileNameC)); // need TFile::Open to load plugins if need be
54 if (!f || f->IsZombie()) {
55 Error("TTreeProcessorMT::Process",
56 "An error occurred while opening file %s: skipping it.",
57 fileNameC);
58 clustersPerFile.emplace_back(std::vector<EntryCluster>());
59 entriesPerFile.emplace_back(0ULL);
60 continue;
61 }
62 TTree *t = nullptr; // not a leak, t will be deleted by f
63 f->GetObject(treeName.c_str(), t);
64
65 if (!t) {
66 Error("TTreeProcessorMT::Process",
67 "An error occurred while getting tree %s from file %s: skipping this file.",
68 treeName.c_str(), fileNameC);
69 clustersPerFile.emplace_back(std::vector<EntryCluster>());
70 entriesPerFile.emplace_back(0ULL);
71 continue;
72 }
73
74 auto clusterIter = t->GetClusterIterator(0);
75 Long64_t start = 0ll, end = 0ll;
76 const Long64_t entries = t->GetEntries();
77 // Iterate over the clusters in the current file
78 std::vector<EntryCluster> clusters;
79 while ((start = clusterIter()) < entries) {
80 end = clusterIter.GetNextEntry();
81 // Add the current file's offset to start and end to make them (chain) global
82 clusters.emplace_back(EntryCluster{start + offset, end + offset});
83 }
84 offset += entries;
85 clustersPerFile.emplace_back(std::move(clusters));
86 entriesPerFile.emplace_back(entries);
87 }
88
89 // Here we "fuse" together clusters if the number of clusters is to big with respect to
90 // the number of slots, otherwise we can incurr in an overhead which is so big to make
91 // the parallelisation detrimental for performance.
92 // For example, this is the case when following a merging of many small files a file
93 // contains a tree with many entries and with clusters of just a few entries.
94 // The criterion according to which we fuse clusters together is to have at most
95 // TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters per file per slot.
96 // For example: given 2 files and 16 workers, at most
97 // 16 * 2 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters will be created, at most
98 // 16 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() per file.
99
101 std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
102 auto clustersPerFileIt = clustersPerFile.begin();
103 auto eventRangesPerFileIt = eventRangesPerFile.begin();
104 for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
105 const auto clustersInThisFileSize = clustersPerFileIt->size();
106 const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
107 // If the number of clusters is less than maxTasksPerFile
108 // we take the clusters as they are
109 if (nFolds == 0) {
110 std::for_each(clustersPerFileIt->begin(), clustersPerFileIt->end(),
111 [&eventRangesPerFileIt](const EntryCluster &clust) { eventRangesPerFileIt->emplace_back(clust); });
112 continue;
113 }
114 // Otherwise, we have to merge clusters, distributing the reminder evenly
115 // onto the first clusters
116 auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
117 const auto clustersInThisFile = *clustersPerFileIt;
118 for(auto i = 0ULL; i < (clustersInThisFileSize-1); ++i) {
119 const auto start = clustersInThisFile[i].start;
120 // We lump together at least nFolds clusters, therefore
121 // we need to jump ahead of nFolds-1.
122 i += (nFolds - 1);
123 // We now add a cluster if we have some reminder left
124 if (nReminderClusters > 0) {
125 i += 1U;
126 nReminderClusters--;
127 }
128 const auto end = clustersInThisFile[i].end;
129 eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
130 }
131 }
132
133 return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
134}
135
136////////////////////////////////////////////////////////////////////////
137/// Return a vector containing the number of entries of each file of each friend TChain
138static std::vector<std::vector<Long64_t>> GetFriendEntries(const std::vector<std::pair<std::string, std::string>> &friendNames,
139 const std::vector<std::vector<std::string>> &friendFileNames)
140{
141 std::vector<std::vector<Long64_t>> friendEntries;
142 const auto nFriends = friendNames.size();
143 for (auto i = 0u; i < nFriends; ++i) {
144 std::vector<Long64_t> nEntries;
145 const auto &thisFriendName = friendNames[i].first;
146 const auto &thisFriendFiles = friendFileNames[i];
147 for (const auto &fname : thisFriendFiles) {
148 std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
149 TTree *t = nullptr; // owned by TFile
150 f->GetObject(thisFriendName.c_str(), t);
151 nEntries.emplace_back(t->GetEntries());
152 }
153 friendEntries.emplace_back(std::move(nEntries));
154 }
155
156 return friendEntries;
157}
158
159////////////////////////////////////////////////////////////////////////
160/// Return the full path of the tree
161static std::string GetTreeFullPath(const TTree &tree)
162{
163 // Case 1: this is a TChain: we get the name out of the first TChainElement
164 if (0 == strcmp("TChain", tree.ClassName())) {
165 auto &chain = dynamic_cast<const TChain&>(tree);
166 auto files = chain.GetListOfFiles();
167 if (files && 0 != files->GetEntries()) {
168 return files->At(0)->GetName();
169 }
170 }
171
172 // Case 2: this is a TTree: we get the full path of it
173 if (auto motherDir = tree.GetDirectory()) {
174 std::string fullPath(motherDir->GetPath());
175 fullPath += "/";
176 fullPath += tree.GetName();
177 return fullPath;
178 }
179
180 // We do our best and return the name of the tree
181 return tree.GetName();
182}
183
184} // End NS Internal
185} // End NS ROOT
186
187////////////////////////////////////////////////////////////////////////////////
188/// Get and store the names, aliases and file names of the friends of the tree.
189/// \param[in] tree The main tree whose friends to
190///
191/// Note that "friends of friends" and circular references in the lists of friends are not supported.
192Internal::FriendInfo TTreeProcessorMT::GetFriendInfo(TTree &tree)
193{
194 std::vector<Internal::NameAlias> friendNames;
195 std::vector<std::vector<std::string>> friendFileNames;
196
197 const auto friends = tree.GetListOfFriends();
198 if (!friends)
199 return Internal::FriendInfo();
200
201 for (auto fr : *friends) {
202 const auto frTree = static_cast<TFriendElement *>(fr)->GetTree();
203
204 // Check if friend tree has an alias
205 const auto realName = frTree->GetName();
206 const auto alias = tree.GetFriendAlias(frTree);
207 if (alias) {
208 friendNames.emplace_back(std::make_pair(realName, std::string(alias)));
209 } else {
210 friendNames.emplace_back(std::make_pair(realName, ""));
211 }
212
213 // Store the file names of the friend tree
214 friendFileNames.emplace_back();
215 auto &fileNames = friendFileNames.back();
216 const bool isChain = tree.IsA() == TChain::Class();
217 if (isChain) {
218 const auto frChain = static_cast<TChain *>(frTree);
219 for (auto f : *(frChain->GetListOfFiles())) {
220 fileNames.emplace_back(f->GetTitle());
221 }
222 } else {
223 const auto f = frTree->GetCurrentFile();
224 if (!f)
225 throw std::runtime_error("Friend trees with no associated file are not supported.");
226 fileNames.emplace_back(f->GetName());
227 }
228 }
229
230 return Internal::FriendInfo{std::move(friendNames), std::move(friendFileNames)};
231}
232
233////////////////////////////////////////////////////////////////////////////////
234/// Retrieve the name of the first TTree in the first input file, else throw.
236{
237 std::string treeName;
238
239 if (fFileNames.empty())
240 throw std::runtime_error("Empty list of files and no tree name provided");
241
243 std::unique_ptr<TFile> f(TFile::Open(fFileNames[0].c_str()));
244 TIter next(f->GetListOfKeys());
245 while (TKey *key = (TKey *)next()) {
246 const char *className = key->GetClassName();
247 if (strcmp(className, "TTree") == 0) {
248 treeName = key->GetName();
249 break;
250 }
251 }
252 if (treeName.empty())
253 throw std::runtime_error("Cannot find any tree in file " + fFileNames[0]);
254
255 return treeName;
256}
257
258////////////////////////////////////////////////////////////////////////
259/// Constructor based on a file name.
260/// \param[in] filename Name of the file containing the tree to process.
261/// \param[in] treename Name of the tree to process. If not provided,
262/// the implementation will automatically search for a
263/// tree in the file.
265 : fFileNames({std::string(filename)}), fTreeName(treename.empty() ? FindTreeName() : treename), fFriendInfo() {}
266
/// Convert a non-empty collection of string views into owning strings.
/// \param[in] views The views to copy.
/// \return One std::string per input view, in the same order.
/// \throws std::runtime_error if the collection is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> & views)
{
   const bool nothingToConvert = views.empty();
   if (nothingToConvert)
      throw std::runtime_error("The provided list of file names is empty");

   // Each element is constructed in place from the corresponding view
   std::vector<std::string> owned(views.begin(), views.end());
   return owned;
}
278
279////////////////////////////////////////////////////////////////////////
280/// Constructor based on a collection of file names.
281/// \param[in] filenames Collection of the names of the files containing the tree to process.
282/// \param[in] treename Name of the tree to process. If not provided,
283/// the implementation will automatically search for a
284/// tree in the collection of files.
285TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename)
286 : fFileNames(CheckAndConvert(filenames)), fTreeName(treename.empty() ? FindTreeName() : treename), fFriendInfo() {}
287
288std::vector<std::string> GetFilesFromTree(TTree &tree)
289{
290 std::vector<std::string> filenames;
291
292 const bool isChain = tree.IsA() == TChain::Class();
293 if (isChain) {
294 TObjArray *filelist = static_cast<TChain &>(tree).GetListOfFiles();
295 const auto nFiles = filelist->GetEntries();
296 if (nFiles == 0)
297 throw std::runtime_error("The provided chain of files is empty");
298 filenames.reserve(nFiles);
299 for (auto f : *filelist)
300 filenames.emplace_back(f->GetTitle());
301 } else {
302 TFile *f = tree.GetCurrentFile();
303 if (!f) {
304 const auto msg = "The specified TTree is not linked to any file, in-memory-only trees are not supported.";
305 throw std::runtime_error(msg);
306 }
307
308 filenames.emplace_back(f->GetName());
309 }
310
311 return filenames;
312}
313
314////////////////////////////////////////////////////////////////////////
315/// Constructor based on a TTree and a TEntryList.
316/// \param[in] tree Tree or chain of files containing the tree to process.
317/// \param[in] entries List of entry numbers to process.
319 : fFileNames(GetFilesFromTree(tree)), fTreeName(ROOT::Internal::GetTreeFullPath(tree)), fEntryList(entries),
320 fFriendInfo(GetFriendInfo(tree)) {}
321
322////////////////////////////////////////////////////////////////////////
323/// Constructor based on a TTree.
324/// \param[in] tree Tree or chain of files containing the tree to process.
326
327//////////////////////////////////////////////////////////////////////////////
328/// Process the entries of a TTree in parallel. The user-provided function
329/// receives a TTreeReader which can be used to iterate on a subrange of
330/// entries
331/// ~~~{.cpp}
332/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
333/// // Select branches to read
334/// while (readerSubRange.Next()) {
335/// // Use content of current entry
336/// }
337/// });
338/// ~~~
339/// The user needs to be aware that each of the subranges can potentially
340/// be processed in parallel. This means that the code of the user function
341/// should be thread safe.
342///
343/// \param[in] func User-defined function that processes a subrange of entries
345{
346 const std::vector<Internal::NameAlias> &friendNames = fFriendInfo.fFriendNames;
347 const std::vector<std::vector<std::string>> &friendFileNames = fFriendInfo.fFriendFileNames;
348
349 // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
350 // so we do it here for all files.
351 const bool hasFriends = !friendNames.empty();
352 const bool hasEntryList = fEntryList.GetN() > 0;
353 const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
354 const auto clustersAndEntries =
356 const auto &clusters = clustersAndEntries.first;
357 const auto &entries = clustersAndEntries.second;
358
359 // Retrieve number of entries for each file for each friend tree
360 const auto friendEntries =
361 hasFriends ? Internal::GetFriendEntries(friendNames, friendFileNames) : std::vector<std::vector<Long64_t>>{};
362
363 TThreadExecutor pool;
364 // Parent task, spawns tasks that process each of the entry clusters for each input file
366 auto processFile = [&](std::size_t fileIdx) {
367 // theseFiles contains either all files or just the single file to process
368 const auto &theseFiles = shouldRetrieveAllClusters ? fFileNames : std::vector<std::string>({fFileNames[fileIdx]});
369 // Evaluate clusters (with local entry numbers) and number of entries for this file, if needed
370 const auto theseClustersAndEntries =
371 shouldRetrieveAllClusters ? Internal::ClustersAndEntries{} : Internal::MakeClusters(fTreeName, theseFiles);
372
373 // All clusters for the file to process, either with global or local entry numbers
374 const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];
375
376 // Either all number of entries or just the ones for this file
377 const auto &theseEntries =
378 shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});
379
380 auto processCluster = [&](const Internal::EntryCluster &c) {
381 std::unique_ptr<TTreeReader> reader;
382 std::unique_ptr<TEntryList> elist;
383 std::tie(reader, elist) = treeView->GetTreeReader(c.start, c.end, fTreeName, theseFiles, fFriendInfo,
384 fEntryList, theseEntries, friendEntries);
385 func(*reader);
386 };
387
388 pool.Foreach(processCluster, thisFileClusters);
389 };
390
391 std::vector<std::size_t> fileIdxs(fFileNames.size());
392 std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);
393
394 // Enable this IMT use case (activate its locks)
396
397 pool.Foreach(processFile, fileIdxs);
398}
399
400////////////////////////////////////////////////////////////////////////
401/// \brief Sets the maximum number of tasks created per file, per worker.
402/// \return The maximum number of tasks created per file, per worker
404{
406}
407
408////////////////////////////////////////////////////////////////////////
409/// \brief Sets the maximum number of tasks created per file, per worker.
410/// \param[in] maxTasksPerFile Name of the file containing the tree to process.
411///
412/// This allows to create a reasonable number of tasks even if any of the
413/// processed files features a bad clustering, for example with a lot of
414/// entries and just a few entries per cluster.
415void TTreeProcessorMT::SetMaxTasksPerFilePerWorker(unsigned int maxTasksPerFile)
416{
417 fgMaxTasksPerFilePerWorker = maxTasksPerFile;
418}
void Class()
Definition: Class.C:29
SVector< double, 2 > v
Definition: Dict.h:5
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
long long Long64_t
Definition: RtypesCore.h:69
#define gDirectory
Definition: TDirectory.h:213
void Error(const char *location, const char *msgfmt,...)
std::vector< std::string > GetFilesFromTree(TTree &tree)
std::vector< std::string > CheckAndConvert(const std::vector< std::string_view > &views)
This class provides a simple interface to execute the same task multiple times in parallel,...
void Foreach(F func, unsigned nTimes, unsigned nChunks=0)
Execute func (with no arguments) nTimes in parallel.
A class to process the entries of a TTree in parallel.
static unsigned int GetMaxTasksPerFilePerWorker()
Sets the maximum number of tasks created per file, per worker.
const std::string fTreeName
Name of the tree.
const std::vector< std::string > fFileNames
Names of the files.
static void SetMaxTasksPerFilePerWorker(unsigned int m)
Sets the maximum number of tasks created per file, per worker.
static unsigned int fgMaxTasksPerFilePerWorker
TTreeProcessorMT(std::string_view filename, std::string_view treename="")
Constructor based on a file name.
const TEntryList fEntryList
User-defined selection of entry numbers to be processed, empty if none was provided.
void Process(std::function< void(TTreeReader &)> func)
Process the entries of a TTree in parallel.
ROOT::TThreadedObject< ROOT::Internal::TTreeView > treeView
! Thread-local TreeViews
const Internal::FriendInfo fFriendInfo
std::string FindTreeName()
Retrieve the name of the first TTree in the first input file, else throw.
A chain is a collection of files containing TTree objects.
Definition: TChain.h:33
TObjArray * GetListOfFiles() const
Definition: TChain.h:107
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual Long64_t GetN() const
Definition: TEntryList.h:75
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3975
A TFriendElement TF describes a TTree object TF in a file.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:24
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
An array of TObjects.
Definition: TObjArray.h:37
Int_t GetEntries() const
Return the number of objects in array (i.e.
Definition: TObjArray.cxx:522
TObject * At(Int_t idx) const
Definition: TObjArray.h:165
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
A simple, robust and fast interface to read values from ROOT colmnar datasets such as TTree,...
Definition: TTreeReader.h:44
std::pair< std::vector< std::vector< EntryCluster > >, std::vector< Long64_t > > ClustersAndEntries
Return a vector of cluster boundaries for the given tree and files.
static std::vector< std::vector< Long64_t > > GetFriendEntries(const std::vector< std::pair< std::string, std::string > > &friendNames, const std::vector< std::vector< std::string > > &friendFileNames)
Return a vector containing the number of entries of each file of each friend TChain.
static ClustersAndEntries MakeClusters(const std::string &treeName, const std::vector< std::string > &fileNames)
static std::string GetTreeFullPath(const TTree &tree)
Return the full path of the tree.
void function(const Char_t *name_, T fun, const Char_t *docstring=0)
Definition: RExports.h:151
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
UInt_t GetImplicitMTPoolSize()
Returns the size of the pool used for implicit multi-threading.
Definition: TROOT.cxx:614
basic_string_view< char > string_view
Definition: RStringView.hxx:35
Definition: tree.py:1
std::vector< std::vector< std::string > > fFriendFileNames
Names of the files where each friend is stored.
std::vector< Internal::NameAlias > fFriendNames
Pairs of names and aliases of friend trees/chains.