39 const std::vector<ReadSpeedRegex> ®exes)
41 const auto f = std::unique_ptr<TFile>(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
42 if (
f ==
nullptr ||
f->IsZombie())
43 throw std::runtime_error(
"Could not open file '" + fileName +
'\'');
44 std::unique_ptr<TTree> t(
f->Get<
TTree>(treeName.c_str()));
46 throw std::runtime_error(
"Could not retrieve tree '" + treeName +
"' from file '" + fileName +
'\'');
49 std::set<ReadSpeedRegex> usedRegexes;
50 std::vector<std::string> branchNames;
52 auto filterBranchName = [regexes, &usedRegexes](
const std::string &bName) {
53 if (regexes.size() == 1 && regexes[0].text ==
".*") {
54 usedRegexes.insert(regexes[0]);
58 const auto matchBranch = [&usedRegexes, bName](
const ReadSpeedRegex ®ex) {
59 bool match = std::regex_match(bName, regex.regex);
62 usedRegexes.insert(regex);
67 const auto iterator = std::find_if(regexes.begin(), regexes.end(), matchBranch);
68 return iterator != regexes.end();
70 std::copy_if(unfilteredBranchNames.begin(), unfilteredBranchNames.end(), std::back_inserter(branchNames),
73 if (branchNames.empty()) {
74 std::cerr <<
"Provided branch regexes didn't match any branches in tree '" + treeName +
"' from file '" +
78 if (usedRegexes.size() != regexes.size()) {
79 std::string errString =
"The following regexes didn't match any branches in tree '" + treeName +
"' from file '" +
80 fileName +
"', this is probably unintended:\n";
81 for (
const auto ®ex : regexes) {
82 if (usedRegexes.find(regex) == usedRegexes.end())
83 errString +=
'\t' + regex.text +
'\n';
85 std::cerr << errString;
95 std::vector<std::vector<std::string>> fileBranchNames;
97 std::vector<ReadSpeedRegex> regexes;
99 std::transform(
d.fBranchNames.begin(),
d.fBranchNames.end(), std::back_inserter(regexes), [](std::string
text) {
100 return ReadSpeedRegex{text, std::regex(text)};
103 for (
const auto &fName :
d.fFileNames) {
104 std::vector<std::string> branchNames;
108 branchNames =
d.fBranchNames;
110 fileBranchNames.push_back(branchNames);
112 if (
d.fTreeNames.size() > 1)
116 return fileBranchNames;
120 const auto uncompressedBytes =
121 std::accumulate(bytesData.begin(), bytesData.end(), 0ull,
123 const auto compressedBytes =
124 std::accumulate(bytesData.begin(), bytesData.end(), 0ull,
127 return {uncompressedBytes, compressedBytes};
134 std::unique_ptr<TTree> t(
f->Get<
TTree>(treeName.c_str()));
136 throw std::runtime_error(
"Could not retrieve tree '" + treeName +
"' from file '" +
f->GetName() +
'\'');
138 t->SetBranchStatus(
"*", 0);
140 std::vector<TBranch *> branches;
141 for (
const auto &bName : branchNames) {
142 auto *
b = t->GetBranch(bName.c_str());
144 throw std::runtime_error(
"Could not retrieve branch '" + bName +
"' from tree '" + t->GetName() +
145 "' in file '" + t->GetCurrentFile()->GetName() +
'\'');
148 branches.push_back(
b);
151 const auto nEntries = t->GetEntries();
154 else if (range.
fEnd > nEntries)
155 throw std::runtime_error(
"Range end (" + std::to_string(range.
fEnd) +
") is beyond the end of tree '" +
156 t->GetName() +
"' in file '" + t->GetCurrentFile()->GetName() +
"' with " +
157 std::to_string(nEntries) +
" entries.");
160 const ULong64_t fileStartBytes =
f->GetBytesRead();
162 for (
auto *
b : branches)
163 bytesRead +=
b->GetEntry(
e);
165 const ULong64_t fileBytesRead =
f->GetBytesRead() - fileStartBytes;
166 return {bytesRead, fileBytesRead};
179 for (
const auto &fileName :
d.fFileNames) {
180 auto f = std::unique_ptr<TFile>(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
181 if (
f ==
nullptr ||
f->IsZombie())
182 throw std::runtime_error(
"Could not open file '" + fileName +
'\'');
186 const auto byteData =
ReadTree(
f.get(),
d.fTreeNames[treeIdx], fileBranchNames[fileIdx]);
187 uncompressedBytesRead += byteData.fUncompressedBytesRead;
188 compressedBytesRead += byteData.fCompressedBytesRead;
190 if (
d.fTreeNames.size() > 1)
197 return {sw.
RealTime(), sw.
CpuTime(), 0., 0., uncompressedBytesRead, compressedBytesRead, 0};
204 const auto nFiles =
d.fFileNames.size();
205 std::vector<std::vector<EntryRange>> ranges(nFiles);
206 for (
auto fileIdx = 0u; fileIdx < nFiles; ++fileIdx) {
207 const auto &fileName =
d.fFileNames[fileIdx];
208 std::unique_ptr<TFile>
f(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
209 if (
f ==
nullptr ||
f->IsZombie())
210 throw std::runtime_error(
"There was a problem opening file '" + fileName +
'\'');
211 const auto &treeName =
d.fTreeNames.size() > 1 ?
d.fTreeNames[fileIdx] :
d.fTreeNames[0];
212 auto *t =
f->Get<
TTree>(treeName.c_str());
214 throw std::runtime_error(
"There was a problem retrieving TTree '" + treeName +
"' from file '" + fileName +
217 const auto nEntries = t->GetEntries();
218 auto it = t->GetClusterIterator(0);
220 std::vector<EntryRange> rangesInFile;
221 while ((
start = it.Next()) < nEntries)
223 ranges[fileIdx] = std::move(rangesInFile);
231std::vector<std::vector<EntryRange>>
234 std::vector<std::vector<EntryRange>> mergedClusters(clusters.size());
236 auto clustersIt = clusters.begin();
237 auto mergedClustersIt = mergedClusters.begin();
238 for (; clustersIt != clusters.end(); clustersIt++, mergedClustersIt++) {
239 const auto nClustersInThisFile = clustersIt->size();
240 const auto nFolds = nClustersInThisFile / maxTasksPerFile;
244 *mergedClustersIt = *clustersIt;
249 auto nReminderClusters = nClustersInThisFile % maxTasksPerFile;
250 const auto &clustersInThisFile = *clustersIt;
251 for (
auto i = 0ULL; i < nClustersInThisFile; ++i) {
252 const auto start = clustersInThisFile[i].fStart;
257 if (nReminderClusters > 0) {
261 const auto end = clustersInThisFile[i].fEnd;
264 assert(nReminderClusters == 0 &&
"This should never happen, cluster-merging logic is broken.");
267 return mergedClusters;
275 if (actualThreads != nThreads)
276 std::cerr <<
"Running with " << actualThreads <<
" threads even though " << nThreads <<
" were requested.\n";
280 const unsigned int maxTasksPerFile =
286 const size_t nranges =
287 std::accumulate(rangesPerFile.begin(), rangesPerFile.end(), 0u, [](
size_t s,
auto &
r) { return s + r.size(); });
288 std::cout <<
"Total number of tasks: " << nranges <<
'\n';
293 std::vector<int> lastFileIdxs(actualThreads, -1);
294 std::vector<std::unique_ptr<TFile>> lastTFiles(actualThreads);
296 auto processFile = [&](
int fileIdx) {
297 const auto &fileName =
d.fFileNames[fileIdx];
298 const auto &treeName =
d.fTreeNames.size() > 1 ?
d.fTreeNames[fileIdx] :
d.fTreeNames[0];
299 const auto &branchNames = fileBranchNames[fileIdx];
303 auto slotIndex = slotRAII.
fSlot;
304 auto &file = lastTFiles[slotIndex];
305 auto &lastIndex = lastFileIdxs[slotIndex];
307 if (lastIndex != fileIdx) {
308 file.reset(
TFile::Open(fileName.c_str(),
"READ_WITHOUT_GLOBALREGISTRATION"));
312 if (file ==
nullptr || file->IsZombie())
313 throw std::runtime_error(
"Could not open file '" + fileName +
'\'');
315 auto result =
ReadTree(file.get(), treeName, branchNames, range);
320 const auto byteData = pool.
MapReduce(readRange, rangesPerFile[fileIdx],
SumBytes);
334 totalByteData.fUncompressedBytesRead,
335 totalByteData.fCompressedBytesRead,
346 if (
d.fTreeNames.empty()) {
347 std::cerr <<
"Please provide at least one tree name\n";
350 if (
d.fFileNames.empty()) {
351 std::cerr <<
"Please provide at least one file name\n";
354 if (
d.fBranchNames.empty()) {
355 std::cerr <<
"Please provide at least one branch name\n";
358 if (
d.fTreeNames.size() != 1 &&
d.fTreeNames.size() !=
d.fFileNames.size()) {
359 std::cerr <<
"Please provide either one tree name or as many as the file names\n";
367 std::cerr << nThreads
368 <<
" threads were requested, but ROOT was built without implicit multi-threading (IMT) support.\n";
std::vector< std::vector< std::string > > GetPerFileBranchNames(const Data &d)
ByteData SumBytes(const std::vector< ByteData > &bytesData)
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
A thread-safe list of N indexes (0 to size - 1).
This class provides a simple interface to execute the same task multiple times in parallel threads,...
auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t< F >
Execute a function nTimes in parallel (Map) and accumulate the results into a single value (Reduce).
static unsigned int GetTasksPerWorkerHint()
Retrieve the current value for the desired number of tasks per worker.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Stop()
Stop the stopwatch.
A TTree represents a columnar dataset.
std::vector< std::string > GetTopLevelBranchNames(TTree &t)
Get all the top-level branches names, including the ones of the friend trees.
TSeq< unsigned long > TSeqUL
UInt_t GetThreadPoolSize()
Returns the size of ROOT's thread pool.
Result EvalThroughputST(const Data &d)
std::vector< std::string > GetMatchingBranchNames(const std::string &fileName, const std::string &treeName, const std::vector< ReadSpeedRegex > &regexes)
std::vector< std::vector< EntryRange > > GetClusters(const Data &d)
Result EvalThroughputMT(const Data &d, unsigned nThreads)
Result EvalThroughput(const Data &d, unsigned nThreads)
std::vector< std::vector< EntryRange > > MergeClusters(std::vector< std::vector< EntryRange > > &&clusters, unsigned int maxTasksPerFile)
ByteData ReadTree(TFile *file, const std::string &treeName, const std::vector< std::string > &branchNames, EntryRange range={-1, -1})
A RAII object to pop and push slot numbers from a RSlotStack object.
static uint64_t sum(uint64_t i)