38 const std::vector<ReadSpeedRegex> &regexes)
40 const auto f = std::unique_ptr<TFile>(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
41 if (
f ==
nullptr ||
f->IsZombie())
42 throw std::runtime_error(
"Could not open file '" + fileName +
'\'');
43 std::unique_ptr<TTree> t(
f->Get<
TTree>(treeName.c_str()));
45 throw std::runtime_error(
"Could not retrieve tree '" + treeName +
"' from file '" + fileName +
'\'');
48 std::set<ReadSpeedRegex> usedRegexes;
49 std::vector<std::string> branchNames;
51 auto filterBranchName = [regexes, &usedRegexes](
const std::string &bName) {
52 if (regexes.size() == 1 && regexes[0].text ==
".*") {
53 usedRegexes.insert(regexes[0]);
57 const auto matchBranch = [&usedRegexes, bName](
const ReadSpeedRegex &regex) {
58 bool match = std::regex_match(bName, regex.regex);
61 usedRegexes.insert(regex);
66 const auto iterator = std::find_if(regexes.begin(), regexes.end(), matchBranch);
67 return iterator != regexes.end();
69 std::copy_if(unfilteredBranchNames.begin(), unfilteredBranchNames.end(), std::back_inserter(branchNames),
72 if (branchNames.empty()) {
73 std::cerr <<
"Provided branch regexes didn't match any branches in tree '" + treeName +
"' from file '" +
77 if (usedRegexes.size() != regexes.size()) {
78 std::string errString =
"The following regexes didn't match any branches in tree '" + treeName +
"' from file '" +
79 fileName +
"', this is probably unintended:\n";
80 for (
const auto &regex : regexes) {
81 if (usedRegexes.find(regex) == usedRegexes.end())
82 errString +=
'\t' + regex.text +
'\n';
84 std::cerr << errString;
94 std::vector<std::vector<std::string>> fileBranchNames;
96 std::vector<ReadSpeedRegex> regexes;
98 std::transform(
d.fBranchNames.begin(),
d.fBranchNames.end(), std::back_inserter(regexes), [](std::string
text) {
99 return ReadSpeedRegex{text, std::regex(text)};
102 for (
const auto &fName :
d.fFileNames) {
103 std::vector<std::string> branchNames;
107 branchNames =
d.fBranchNames;
109 fileBranchNames.push_back(branchNames);
111 if (
d.fTreeNames.size() > 1)
115 return fileBranchNames;
119 const auto uncompressedBytes =
120 std::accumulate(bytesData.begin(), bytesData.end(), 0ull,
122 const auto compressedBytes =
123 std::accumulate(bytesData.begin(), bytesData.end(), 0ull,
126 return {uncompressedBytes, compressedBytes};
133 std::unique_ptr<TTree> t(
f->Get<
TTree>(treeName.c_str()));
135 throw std::runtime_error(
"Could not retrieve tree '" + treeName +
"' from file '" +
f->GetName() +
'\'');
137 t->SetBranchStatus(
"*", 0);
139 std::vector<TBranch *> branches;
140 for (
const auto &bName : branchNames) {
141 auto *
b = t->GetBranch(bName.c_str());
143 throw std::runtime_error(
"Could not retrieve branch '" + bName +
"' from tree '" + t->GetName() +
144 "' in file '" + t->GetCurrentFile()->GetName() +
'\'');
147 branches.push_back(
b);
150 const auto nEntries = t->GetEntries();
153 else if (range.
fEnd > nEntries)
154 throw std::runtime_error(
"Range end (" + std::to_string(range.
fEnd) +
") is beyond the end of tree '" +
155 t->GetName() +
"' in file '" + t->GetCurrentFile()->GetName() +
"' with " +
156 std::to_string(nEntries) +
" entries.");
159 const ULong64_t fileStartBytes =
f->GetBytesRead();
161 for (
auto *
b : branches)
162 bytesRead +=
b->GetEntry(
e);
164 const ULong64_t fileBytesRead =
f->GetBytesRead() - fileStartBytes;
165 return {bytesRead, fileBytesRead};
178 for (
const auto &fileName :
d.fFileNames) {
179 auto f = std::unique_ptr<TFile>(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
180 if (
f ==
nullptr ||
f->IsZombie())
181 throw std::runtime_error(
"Could not open file '" + fileName +
'\'');
185 const auto byteData =
ReadTree(
f.get(),
d.fTreeNames[treeIdx], fileBranchNames[fileIdx]);
186 uncompressedBytesRead += byteData.fUncompressedBytesRead;
187 compressedBytesRead += byteData.fCompressedBytesRead;
189 if (
d.fTreeNames.size() > 1)
196 return {sw.
RealTime(), sw.
CpuTime(), 0., 0., uncompressedBytesRead, compressedBytesRead, 0};
203 const auto nFiles =
d.fFileNames.size();
204 std::vector<std::vector<EntryRange>> ranges(nFiles);
205 for (
auto fileIdx = 0u; fileIdx < nFiles; ++fileIdx) {
206 const auto &fileName =
d.fFileNames[fileIdx];
207 std::unique_ptr<TFile>
f(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
208 if (
f ==
nullptr ||
f->IsZombie())
209 throw std::runtime_error(
"There was a problem opening file '" + fileName +
'\'');
210 const auto &treeName =
d.fTreeNames.size() > 1 ?
d.fTreeNames[fileIdx] :
d.fTreeNames[0];
211 auto *t =
f->Get<
TTree>(treeName.c_str());
213 throw std::runtime_error(
"There was a problem retrieving TTree '" + treeName +
"' from file '" + fileName +
216 const auto nEntries = t->GetEntries();
217 auto it = t->GetClusterIterator(0);
219 std::vector<EntryRange> rangesInFile;
220 while ((start = it.Next()) < nEntries)
221 rangesInFile.emplace_back(
EntryRange{start, it.GetNextEntry()});
222 ranges[fileIdx] = std::move(rangesInFile);
230std::vector<std::vector<EntryRange>>
233 std::vector<std::vector<EntryRange>> mergedClusters(clusters.size());
235 auto clustersIt = clusters.begin();
236 auto mergedClustersIt = mergedClusters.begin();
237 for (; clustersIt != clusters.end(); clustersIt++, mergedClustersIt++) {
238 const auto nClustersInThisFile = clustersIt->size();
239 const auto nFolds = nClustersInThisFile / maxTasksPerFile;
243 *mergedClustersIt = *clustersIt;
248 auto nReminderClusters = nClustersInThisFile % maxTasksPerFile;
249 const auto &clustersInThisFile = *clustersIt;
250 for (
auto i = 0ULL; i < nClustersInThisFile; ++i) {
251 const auto start = clustersInThisFile[i].fStart;
256 if (nReminderClusters > 0) {
260 const auto end = clustersInThisFile[i].fEnd;
261 mergedClustersIt->emplace_back(
EntryRange({start, end}));
263 assert(nReminderClusters == 0 &&
"This should never happen, cluster-merging logic is broken.");
266 return mergedClusters;
274 if (actualThreads != nThreads)
275 std::cerr <<
"Running with " << actualThreads <<
" threads even though " << nThreads <<
" were requested.\n";
279 const unsigned int maxTasksPerFile =
285 const size_t nranges =
286 std::accumulate(rangesPerFile.begin(), rangesPerFile.end(), 0u, [](
size_t s,
auto &
r) { return s + r.size(); });
287 std::cout <<
"Total number of tasks: " << nranges <<
'\n';
292 std::vector<int> lastFileIdxs(actualThreads, -1);
293 std::vector<std::unique_ptr<TFile>> lastTFiles(actualThreads);
295 auto processFile = [&](
int fileIdx) {
296 const auto &fileName =
d.fFileNames[fileIdx];
297 const auto &treeName =
d.fTreeNames.size() > 1 ?
d.fTreeNames[fileIdx] :
d.fTreeNames[0];
298 const auto &branchNames = fileBranchNames[fileIdx];
302 auto slotIndex = slotRAII.
fSlot;
303 auto &
file = lastTFiles[slotIndex];
304 auto &lastIndex = lastFileIdxs[slotIndex];
306 if (lastIndex != fileIdx) {
307 file.reset(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
311 if (
file ==
nullptr ||
file->IsZombie())
312 throw std::runtime_error(
"Could not open file '" + fileName +
'\'');
319 const auto byteData = pool.
MapReduce(readRange, rangesPerFile[fileIdx],
SumBytes);
333 totalByteData.fUncompressedBytesRead,
334 totalByteData.fCompressedBytesRead,
345 if (
d.fTreeNames.empty()) {
346 std::cerr <<
"Please provide at least one tree name\n";
349 if (
d.fFileNames.empty()) {
350 std::cerr <<
"Please provide at least one file name\n";
353 if (
d.fBranchNames.empty()) {
354 std::cerr <<
"Please provide at least one branch name\n";
357 if (
d.fTreeNames.size() != 1 &&
d.fTreeNames.size() !=
d.fFileNames.size()) {
358 std::cerr <<
"Please provide either one tree name or as many as the file names\n";
366 std::cerr << nThreads
367 <<
" threads were requested, but ROOT was built without implicit multi-threading (IMT) support.\n";
std::vector< std::vector< std::string > > GetPerFileBranchNames(const Data &d)
ByteData SumBytes(const std::vector< ByteData > &bytesData)
unsigned long long ULong64_t
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char text
A thread-safe stack of N indexes (0 to size - 1).
A pseudo container class which is a generator of indices.
This class provides a simple interface to execute the same task multiple times in parallel threads,...
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A ROOT file is composed of a header, followed by consecutive data records (TKey instances) with a wel...
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A TTree represents a columnar dataset.
std::vector< std::string > GetTopLevelBranchNames(TTree &t)
Get all the top-level branches names, including the ones of the friend trees.
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Result EvalThroughputST(const Data &d)
std::vector< std::string > GetMatchingBranchNames(const std::string &fileName, const std::string &treeName, const std::vector< ReadSpeedRegex > &regexes)
std::vector< std::vector< EntryRange > > GetClusters(const Data &d)
Result EvalThroughputMT(const Data &d, unsigned nThreads)
Result EvalThroughput(const Data &d, unsigned nThreads)
std::vector< std::vector< EntryRange > > MergeClusters(std::vector< std::vector< EntryRange > > &&clusters, unsigned int maxTasksPerFile)
ByteData ReadTree(TFile *file, const std::string &treeName, const std::vector< std::string > &branchNames, EntryRange range={-1, -1})
A RAII object to pop and push slot numbers from a RSlotStack object.
static uint64_t sum(uint64_t i)