Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ReadSpeed.cxx
Go to the documentation of this file.
1// Author: Enrico Guiraud, David Poulton 2022
2
3/*************************************************************************
4 * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers. *
5 * All rights reserved. *
6 * *
7 * For the licensing terms see $ROOTSYS/LICENSE. *
8 * For the list of contributors see $ROOTSYS/README/CREDITS. *
9 *************************************************************************/
10
11#include "ReadSpeed.hxx"
12
13#include <ROOT/TSeq.hxx>
14
15#ifdef R__USE_IMT
17#include <ROOT/TTreeProcessorMT.hxx> // for TTreeProcessorMT::GetTasksPerWorkerHint
18#include <ROOT/RSlotStack.hxx>
19#endif
20
21#include <ROOT/InternalTreeUtils.hxx> // for ROOT::Internal::TreeUtils::GetTopLevelBranchNames
22#include <TBranch.h>
23#include <TStopwatch.h>
24#include <TTree.h>
25
26#include <algorithm>
27#include <cassert>
28#include <cmath> // std::ceil
29#include <memory>
30#include <numeric> // std::accumulate
31#include <stdexcept>
32#include <exception>
33#include <set>
34#include <iostream>
35
36using namespace ReadSpeed;
37
// Return the names of the branches of tree `treeName` in file `fileName` selected by `regexes`.
// Throws std::runtime_error if the file cannot be opened or the tree cannot be retrieved.
// If no branch was selected, or if some regex selected no branch at all, an error is printed
// to std::cerr and std::terminate() is called.
// NOTE(review): listing lines 48 and 71 are absent from this extract. `unfilteredBranchNames`
// is presumably defined on line 48 and `filterBranchName` presumably passed as the copy_if
// predicate on line 71 -- confirm against the original source.
38std::vector<std::string> ReadSpeed::GetMatchingBranchNames(const std::string &fileName, const std::string &treeName,
39 const std::vector<ReadSpeedRegex> &regexes)
40{
41 const auto f = std::unique_ptr<TFile>(TFile::Open(fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
42 if (f == nullptr || f->IsZombie())
43 throw std::runtime_error("Could not open file '" + fileName + '\'');
44 std::unique_ptr<TTree> t(f->Get<TTree>(treeName.c_str()));
45 if (t == nullptr)
46 throw std::runtime_error("Could not retrieve tree '" + treeName + "' from file '" + fileName + '\'');
47
// Regexes that matched at least one branch; compared against `regexes` below to report unused ones.
49 std::set<ReadSpeedRegex> usedRegexes;
50 std::vector<std::string> branchNames;
51
// Predicate: true if bName is matched by any regex; records the matching regex in usedRegexes.
// `regexes` is captured by value, i.e. the whole vector is copied into the closure.
52 auto filterBranchName = [regexes, &usedRegexes](const std::string &bName) {
// Shortcut: a lone ".*" accepts every branch name before std::regex_match is ever called.
53 if (regexes.size() == 1 && regexes[0].text == ".*") {
54 usedRegexes.insert(regexes[0]);
55 return true;
56 }
57
// std::regex_match requires the whole branch name to match, not just a substring.
58 const auto matchBranch = [&usedRegexes, bName](const ReadSpeedRegex &regex) {
59 bool match = std::regex_match(bName, regex.regex);
60
61 if (match)
62 usedRegexes.insert(regex);
63
64 return match;
65 };
66
// find_if stops at the first regex that matches, so the regexes after it are not tried
// (and therefore not marked as used) for this branch name.
67 const auto iterator = std::find_if(regexes.begin(), regexes.end(), matchBranch);
68 return iterator != regexes.end();
69 };
70 std::copy_if(unfilteredBranchNames.begin(), unfilteredBranchNames.end(), std::back_inserter(branchNames),
72
73 if (branchNames.empty()) {
// NOTE(review): the closing quote after fileName looks missing in this message; the
// message built further down closes it with "'" -- confirm whether that is intended.
74 std::cerr << "Provided branch regexes didn't match any branches in tree '" + treeName + "' from file '" +
75 fileName + ".\n";
76 std::terminate();
77 }
// Fewer used regexes than supplied regexes means at least one pattern selected nothing.
78 if (usedRegexes.size() != regexes.size()) {
79 std::string errString = "The following regexes didn't match any branches in tree '" + treeName + "' from file '" +
80 fileName + "', this is probably unintended:\n";
81 for (const auto &regex : regexes) {
82 if (usedRegexes.find(regex) == usedRegexes.end())
83 errString += '\t' + regex.text + '\n';
84 }
85 std::cerr << errString;
86 std::terminate();
87 }
88
89 return branchNames;
90}
91
92std::vector<std::vector<std::string>> GetPerFileBranchNames(const Data &d)
93{
94 auto treeIdx = 0;
95 std::vector<std::vector<std::string>> fileBranchNames;
96
97 std::vector<ReadSpeedRegex> regexes;
98 if (d.fUseRegex)
99 std::transform(d.fBranchNames.begin(), d.fBranchNames.end(), std::back_inserter(regexes), [](std::string text) {
100 return ReadSpeedRegex{text, std::regex(text)};
101 });
102
103 for (const auto &fName : d.fFileNames) {
104 std::vector<std::string> branchNames;
105 if (d.fUseRegex)
106 branchNames = GetMatchingBranchNames(fName, d.fTreeNames[treeIdx], regexes);
107 else
108 branchNames = d.fBranchNames;
109
110 fileBranchNames.push_back(branchNames);
111
112 if (d.fTreeNames.size() > 1)
113 ++treeIdx;
114 }
115
116 return fileBranchNames;
117}
118
// Add up fUncompressedBytesRead and fCompressedBytesRead over all elements of bytesData.
// Both sums start from 0ull, so an empty input yields zero for each total.
// NOTE(review): the return statement (listing line 127) is absent from this extract;
// presumably the two totals are returned as a ByteData -- confirm against the original source.
// NOTE(review): the ';' after the closing brace is redundant.
119ByteData SumBytes(const std::vector<ByteData> &bytesData) {
120 const auto uncompressedBytes =
121 std::accumulate(bytesData.begin(), bytesData.end(), 0ull,
122 [](ULong64_t sum, const ByteData &o) { return sum + o.fUncompressedBytesRead; });
123 const auto compressedBytes =
124 std::accumulate(bytesData.begin(), bytesData.end(), 0ull,
125 [](ULong64_t sum, const ByteData &o) { return sum + o.fCompressedBytesRead; });
126
128};
129
130// Read the branches listed in branchNames from tree treeName in the already-open file f.
// Returns a ByteData built from two counters: the sum of the values returned by
// TBranch::GetEntry, and the growth of f->GetBytesRead() while the entries were read.
// Throws std::runtime_error if the tree or one of the branches cannot be retrieved, or if
// range.fEnd is larger than the number of entries in the tree.
// NOTE(review): listing lines 132, 153 and 159 are absent from this extract. They presumably
// hold the `range` parameter, the handling of the default range and the declaration of
// `bytesRead` -- confirm against the original source.
131ByteData ReadSpeed::ReadTree(TFile *f, const std::string &treeName, const std::vector<std::string> &branchNames,
133{
134 std::unique_ptr<TTree> t(f->Get<TTree>(treeName.c_str()));
135 if (t == nullptr)
136 throw std::runtime_error("Could not retrieve tree '" + treeName + "' from file '" + f->GetName() + '\'');
137
// Switch every branch off first; only the requested ones are switched back on below.
138 t->SetBranchStatus("*", 0);
139
140 std::vector<TBranch *> branches;
141 for (const auto &bName : branchNames) {
142 auto *b = t->GetBranch(bName.c_str());
143 if (b == nullptr)
144 throw std::runtime_error("Could not retrieve branch '" + bName + "' from tree '" + t->GetName() +
145 "' in file '" + t->GetCurrentFile()->GetName() + '\'');
146
147 b->SetStatus(1);
148 branches.push_back(b);
149 }
150
151 const auto nEntries = t->GetEntries();
// A start of -1 is treated as a special value; the statement executed in that case is
// missing from this extract (presumably it selects the whole tree -- TODO confirm).
152 if (range.fStart == -1ll)
154 else if (range.fEnd > nEntries)
155 throw std::runtime_error("Range end (" + std::to_string(range.fEnd) + ") is beyond the end of tree '" +
156 t->GetName() + "' in file '" + t->GetCurrentFile()->GetName() + "' with " +
157 std::to_string(nEntries) + " entries.");
158
// Snapshot the file-level byte counter so that only the bytes read by the loop below are reported.
160 const ULong64_t fileStartBytes = f->GetBytesRead();
// Entries are read one at a time, branch by branch, over the half-open range [fStart, fEnd).
161 for (auto e = range.fStart; e < range.fEnd; ++e)
162 for (auto *b : branches)
163 bytesRead += b->GetEntry(e);
164
165 const ULong64_t fileBytesRead = f->GetBytesRead() - fileStartBytes;
166 return {bytesRead, fileBytesRead};
167}
168
// NOTE(review): the signature (listing line 169) and listing lines 173-174 and 176-177 are
// absent from this extract. This is presumably the body of EvalThroughputST(const Data &d);
// the missing lines presumably declare `sw`, `fileBranchNames` and the two byte counters --
// confirm against the original source.
// Visible behavior: every file in d.fFileNames is opened in turn, its tree is read with
// ReadTree, and the byte counts are accumulated across files.
// Throws std::runtime_error if a file cannot be opened.
170{
171 auto treeIdx = 0;
172 auto fileIdx = 0;
175
178
179 for (const auto &fileName : d.fFileNames) {
180 auto f = std::unique_ptr<TFile>(TFile::Open(fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
181 if (f == nullptr || f->IsZombie())
182 throw std::runtime_error("Could not open file '" + fileName + '\'');
183
// The stopwatch is started after the file has been opened and stopped at the end of the
// iteration, so opening the file is outside the timed section. Start(false) is meant to
// resume timing without resetting the accumulated time (see TStopwatch::Start).
184 sw.Start(false);
185
186 const auto byteData = ReadTree(f.get(), d.fTreeNames[treeIdx], fileBranchNames[fileIdx]);
187 uncompressedBytesRead += byteData.fUncompressedBytesRead;
188 compressedBytesRead += byteData.fCompressedBytesRead;
189
// With a single tree name, index 0 is reused for every file.
190 if (d.fTreeNames.size() > 1)
191 ++treeIdx;
192 ++fileIdx;
193
194 sw.Stop();
195 }
196
// The two 0. values sit in the positions that the multi-thread variant further down fills
// with clsw.RealTime() and clsw.CpuTime().
197 return {sw.RealTime(), sw.CpuTime(), 0., 0., uncompressedBytesRead, compressedBytesRead, 0};
198}
199
200// Return a vector of EntryRanges per file, i.e. a vector of vectors of EntryRanges with outer size equal to
201// d.fFileNames.
202std::vector<std::vector<EntryRange>> ReadSpeed::GetClusters(const Data &d)
203{
204 const auto nFiles = d.fFileNames.size();
205 std::vector<std::vector<EntryRange>> ranges(nFiles);
206 for (auto fileIdx = 0u; fileIdx < nFiles; ++fileIdx) {
207 const auto &fileName = d.fFileNames[fileIdx];
208 std::unique_ptr<TFile> f(TFile::Open(fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
209 if (f == nullptr || f->IsZombie())
210 throw std::runtime_error("There was a problem opening file '" + fileName + '\'');
211 const auto &treeName = d.fTreeNames.size() > 1 ? d.fTreeNames[fileIdx] : d.fTreeNames[0];
212 auto *t = f->Get<TTree>(treeName.c_str()); // TFile owns this TTree
213 if (t == nullptr)
214 throw std::runtime_error("There was a problem retrieving TTree '" + treeName + "' from file '" + fileName +
215 '\'');
216
217 const auto nEntries = t->GetEntries();
218 auto it = t->GetClusterIterator(0);
219 Long64_t start = 0;
220 std::vector<EntryRange> rangesInFile;
221 while ((start = it.Next()) < nEntries)
222 rangesInFile.emplace_back(EntryRange{start, it.GetNextEntry()});
223 ranges[fileIdx] = std::move(rangesInFile);
224 }
225 return ranges;
226}
227
228// Mimic the logic of TTreeProcessorMT::MakeClusters: merge entry ranges together such that we
229// run around TTreeProcessorMT::GetTasksPerWorkerHint tasks per worker thread.
230// TODO it would be better to expose TTreeProcessorMT's actual logic and call the exact same method from here
// The result has one inner vector per element of `clusters`.
// NOTE(review): listing lines 237, 238, 240, 244, 249 and 259 are absent from this extract.
// They presumably hold the loop over files and the definitions/updates of `mergedClustersIt`,
// `nFolds` and `nReminderClusters` -- confirm against the original source.
231std::vector<std::vector<EntryRange>>
232ReadSpeed::MergeClusters(std::vector<std::vector<EntryRange>> &&clusters, unsigned int maxTasksPerFile)
233{
234 std::vector<std::vector<EntryRange>> mergedClusters(clusters.size());
235
236 auto clustersIt = clusters.begin();
239 const auto nClustersInThisFile = clustersIt->size();
241 // If the number of clusters is less than maxTasksPerFile
242 // we take the clusters as they are
243 if (nFolds == 0) {
245 continue;
246 }
247 // Otherwise, we have to merge clusters, distributing the remainder evenly
248 // between the first clusters
250 const auto &clustersInThisFile = *clustersIt;
// `i` is advanced inside the body as well, so each iteration consumes a whole group of
// consecutive clusters: the merged range starts at the first one and ends at the last one.
251 for (auto i = 0ULL; i < nClustersInThisFile; ++i) {
252 const auto start = clustersInThisFile[i].fStart;
253 // We lump together at least nFolds clusters, therefore
254 // we need to jump ahead of nFolds-1.
255 i += (nFolds - 1);
256 // We now add a cluster if we have some remainder left
257 if (nReminderClusters > 0) {
258 i += 1U;
260 }
261 const auto end = clustersInThisFile[i].fEnd;
262 mergedClustersIt->emplace_back(EntryRange({start, end}));
263 }
264 assert(nReminderClusters == 0 && "This should never happen, cluster-merging logic is broken.");
265 }
266
267 return mergedClusters;
268}
269
// NOTE(review): the signature (listing line 270) and listing lines 273-274, 278, 283, 290,
// 292, 302, 325 and 336 are absent from this extract. This is presumably the body of
// EvalThroughputMT(const Data &d, unsigned nThreads); the missing lines presumably define
// `actualThreads`, `pool`, `clsw`, `sw`, `rangesPerFile`, `fileBranchNames` and `slotRAII`
// -- confirm against the original source.
// Visible behavior when R__USE_IMT is defined: entry ranges are read in parallel through
// nested MapReduce calls (outer over files, inner over the ranges of one file) and the
// per-range byte counts are combined with SumBytes.
271{
272#ifdef R__USE_IMT
275 if (actualThreads != nThreads)
276 std::cerr << "Running with " << actualThreads << " threads even though " << nThreads << " were requested.\n";
277
// `clsw` is started here and stopped a few lines below, and is reported separately from
// `sw` in the returned value.
279 clsw.Start();
// Tasks per file = ceil(tasks-per-worker hint * number of threads / number of files).
280 const unsigned int maxTasksPerFile =
281 std::ceil(float(ROOT::TTreeProcessorMT::GetTasksPerWorkerHint() * actualThreads) / float(d.fFileNames.size()));
282
284 clsw.Stop();
285
// NOTE(review): the initial value 0u is an unsigned int, and std::accumulate accumulates in
// the type of its initial value, so the sum is carried as unsigned int rather than size_t.
286 const size_t nranges =
287 std::accumulate(rangesPerFile.begin(), rangesPerFile.end(), 0u, [](size_t s, auto &r) { return s + r.size(); });
288 std::cout << "Total number of tasks: " << nranges << '\n';
289
291
// Per-slot cache, indexed by slotRAII.fSlot: the index of the last file handled in that
// slot and the corresponding open TFile, so a file is only reopened when the slot moves
// on to a different file.
293 std::vector<int> lastFileIdxs(actualThreads, -1);
294 std::vector<std::unique_ptr<TFile>> lastTFiles(actualThreads);
295
// Reads all entry ranges of one file and returns the summed byte counts.
296 auto processFile = [&](int fileIdx) {
297 const auto &fileName = d.fFileNames[fileIdx];
298 const auto &treeName = d.fTreeNames.size() > 1 ? d.fTreeNames[fileIdx] : d.fTreeNames[0];
299 const auto &branchNames = fileBranchNames[fileIdx];
300
// Reads a single entry range, reusing the TFile cached for the current slot when possible.
301 auto readRange = [&](const EntryRange &range) -> ByteData {
303 auto slotIndex = slotRAII.fSlot;
304 auto &file = lastTFiles[slotIndex];
305 auto &lastIndex = lastFileIdxs[slotIndex];
306
307 if (lastIndex != fileIdx) {
308 file.reset(TFile::Open(fileName.c_str(), "READ_WITHOUT_GLOBALREGISTRATION"));
309 lastIndex = fileIdx;
310 }
311
312 if (file == nullptr || file->IsZombie())
313 throw std::runtime_error("Could not open file '" + fileName + '\'');
314
315 auto result = ReadTree(file.get(), treeName, branchNames, range);
316
317 return result;
318 };
319
320 const auto byteData = pool.MapReduce(readRange, rangesPerFile[fileIdx], SumBytes);
321
322 return byteData;
323 };
324
// `sw` measures only the parallel read over all files.
326 sw.Start();
327 const auto totalByteData = pool.MapReduce(processFile, ROOT::TSeqUL(d.fFileNames.size()), SumBytes);
328 sw.Stop();
329
330 return {sw.RealTime(),
331 sw.CpuTime(),
332 clsw.RealTime(),
333 clsw.CpuTime(),
334 totalByteData.fUncompressedBytesRead,
335 totalByteData.fCompressedBytesRead,
337#else
// Without R__USE_IMT nothing is measured: the parameters are explicitly marked as unused
// and a value-initialized result is returned.
338 (void)d;
339 (void)nThreads;
340 return {};
341#endif // R__USE_IMT
342}
343
// NOTE(review): the signature (listing line 344) and listing line 364 (the body of the
// R__USE_IMT branch) are absent from this extract. This is presumably the body of
// EvalThroughput(const Data &d, unsigned nThreads) -- confirm against the original source.
// Visible behavior: the input description is validated first; each failed check prints a
// message to std::cerr and calls std::terminate() instead of throwing.
345{
346 if (d.fTreeNames.empty()) {
347 std::cerr << "Please provide at least one tree name\n";
348 std::terminate();
349 }
350 if (d.fFileNames.empty()) {
351 std::cerr << "Please provide at least one file name\n";
352 std::terminate();
353 }
354 if (d.fBranchNames.empty()) {
355 std::cerr << "Please provide at least one branch name\n";
356 std::terminate();
357 }
// Either a single tree name is given, or exactly one tree name per file.
358 if (d.fTreeNames.size() != 1 && d.fTreeNames.size() != d.fFileNames.size()) {
359 std::cerr << "Please provide either one tree name or as many as the file names\n";
360 std::terminate();
361 }
362
363#ifdef R__USE_IMT
365#else
// Without R__USE_IMT any request for threads is rejected; only nThreads == 0 reaches the
// single-thread measurement.
366 if (nThreads > 0) {
367 std::cerr << nThreads
368 << " threads were requested, but ROOT was built without implicit multi-threading (IMT) support.\n";
369 std::terminate();
370 }
371 return EvalThroughputST(d);
372#endif
373}
#define d(i)
Definition RSha256.hxx:102
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
std::vector< std::vector< std::string > > GetPerFileBranchNames(const Data &d)
Definition ReadSpeed.cxx:92
ByteData SumBytes(const std::vector< ByteData > &bytesData)
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char text
A thread-safe list of N indexes (0 to size - 1).
const_iterator begin() const
const_iterator end() const
This class provides a simple interface to execute the same task multiple times in parallel threads,...
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:131
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3764
Stopwatch class.
Definition TStopwatch.h:28
A TTree represents a columnar dataset.
Definition TTree.h:89
std::vector< std::string > GetTopLevelBranchNames(TTree &t)
Get all the top-level branches names, including the ones of the friend trees.
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Definition TROOT.cxx:607
Result EvalThroughputST(const Data &d)
std::vector< std::string > GetMatchingBranchNames(const std::string &fileName, const std::string &treeName, const std::vector< ReadSpeedRegex > &regexes)
Definition ReadSpeed.cxx:38
std::vector< std::vector< EntryRange > > GetClusters(const Data &d)
Result EvalThroughputMT(const Data &d, unsigned nThreads)
Result EvalThroughput(const Data &d, unsigned nThreads)
std::vector< std::vector< EntryRange > > MergeClusters(std::vector< std::vector< EntryRange > > &&clusters, unsigned int maxTasksPerFile)
ByteData ReadTree(TFile *file, const std::string &treeName, const std::vector< std::string > &branchNames, EntryRange range={-1, -1})
A RAII object to pop and push slot numbers from a RSlotStack object.
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2339