Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ntpl017_shared_reader.C File Reference

Detailed Description

View in nbviewer Open in SWAN
Example of efficient multi-threaded reading when multiple threads share a single reader.

In this example, two threads share the work as follows: the first thread processes all the even entries numbers and the second thread all the odd entry numbers. The second thread works twice as slow as the first one.

As a result, the threads need the same clusters and pages but at different points in time. With the naive way of using the reader, this read pattern will result in cache thrashing.

Using the "active entry token" API, on the other hand, the reader will be informed about which entries need to be kept in the caches and cache thrashing is prevented.

#include <TAxis.h>
#include <TCanvas.h>
#include <TGraph.h>
#include <TLatex.h>
#include <TLegend.h>
#include <TRandom3.h>
#include <TROOT.h>
#include <TStyle.h>
#include <array>
#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std::chrono_literals;
// Where to store the ntuple of this example
constexpr char const *kNTupleFileName = "ntpl017_shared_reader.root";
/// The sample class that is stored in the RNTuple
struct Point2D {
float fX;
float fY;
};
/// Whether we read with setting active entry tokens (informed) or not (naive)
enum class EReadMode {
};
/// Nicify output of EReadMode values
std::ostream &operator<<(std::ostream &os, const EReadMode &e)
{
switch (e) {
case EReadMode::kNaive: os << "naive"; break;
case EReadMode::kInformed: os << "informed"; break;
default: os << "???";
}
return os;
}
// To be reset between ProcessEntries calls to Read()
static std::atomic<int> gNEntriesDone;
static std::atomic<int> gThreadId;
/// The read thread's main function
void ProcessEntries(ROOT::RNTupleReader &reader, EReadMode readMode, const std::chrono::microseconds &usPerEvent,
std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages,
std::vector<int> &nClusters, std::vector<int> &nPages)
{
static std::mutex gLock;
const int threadId = gThreadId++;
auto token = reader.CreateActiveEntryToken();
for (unsigned int i = threadId; i < reader.GetNEntries(); i += 2) {
{
std::lock_guard<std::mutex> guard(gLock);
// The only difference between naive and informed reading: in informed reading, we indicate which
// entry we are going to use before loading it.
switch (readMode) {
case EReadMode::kInformed: token.SetEntryNumber(i); break;
case EReadMode::kNaive: break;
default: std::terminate(); // never here
}
reader.LoadEntry(i);
}
std::this_thread::sleep_for(usPerEvent);
const int entriesDone = gNEntriesDone++;
reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nClusterLoaded")->GetValueAsInt();
reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nPageUnsealed")->GetValueAsInt();
reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.RClusterPool.nCluster")->GetValueAsInt();
reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.RPagePool.nPage")->GetValueAsInt();
}
}
void Read(EReadMode readMode, std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages,
std::vector<int> &nClusters, std::vector<int> &nPages)
{
reader->EnableMetrics();
gThreadId = 0;
const auto N = reader->GetNEntries();
nClusters.resize(N);
nPages.resize(N);
std::array<std::thread, 2> threads;
threads[0] = std::thread(ProcessEntries, std::ref(*reader), readMode, 100us, std::ref(sumLoadedClusters),
std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages));
threads[1] = std::thread(ProcessEntries, std::ref(*reader), readMode, 200us, std::ref(sumLoadedClusters),
std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages));
for (auto &t : threads) {
t.join();
}
std::cout << "Reading in mode '" << readMode << "':" << std::endl;
std::cout << "===========================" << std::endl;
reader->GetMetrics().Print(std::cout);
std::cout << "===========================" << std::endl << std::endl;
}
void Write()
{
auto ptrPoint = model->MakeField<Point2D>("point");
auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "ntpl", kNTupleFileName);
for (int i = 0; i < 1000; ++i) {
if (i % 100 == 0)
writer->CommitCluster();
auto prng = std::make_unique<TRandom3>();
prng->SetSeed();
ptrPoint->fX = prng->Rndm(1);
ptrPoint->fY = prng->Rndm(1);
writer->Fill();
}
}
TGraph *GetGraph(const std::vector<int> &counts)
{
auto graph = new TGraph();
for (unsigned int i = 0; i < counts.size(); ++i) {
graph->SetPoint(i, i, counts[i]);
}
graph->GetXaxis()->SetTitle("Number of processed entries");
return graph;
}
{
Write();
std::vector<int> sumLoadedClusters;
std::vector<int> sumUnsealedPages;
std::vector<int> nClusters;
std::vector<int> nPages;
TCanvas *canvas = new TCanvas("", "Shared Reader Example", 200, 10, 1500, 1000);
canvas->Divide(2, 2);
Read(EReadMode::kNaive, sumLoadedClusters, sumUnsealedPages, nClusters, nPages);
canvas->cd(1);
auto graph1 = GetGraph(sumUnsealedPages);
graph1->SetLineColor(kRed);
graph1->Draw("AL");
auto graph2 = GetGraph(sumLoadedClusters);
graph2->SetLineColor(kBlue);
graph2->Draw("SAME L");
auto legend = new TLegend(0.125, 0.725, 0.625, 0.875);
legend->AddEntry(graph1, "Number of decompressed pages", "l");
legend->AddEntry(graph2, "Number of loaded clusters", "l");
legend->Draw();
latex.SetTextAlign(22);
latex.DrawLatexNDC(0.5, 0.95, "Naive Reading");
canvas->cd(3);
auto graph3 = GetGraph(nPages);
graph3->SetMarkerColor(kRed);
graph3->GetYaxis()->SetNdivisions(8);
graph3->GetYaxis()->SetRangeUser(-0.5, 14);
graph3->Draw("AP");
auto graph4 = GetGraph(nClusters);
graph4->SetMarkerColor(kBlue);
graph4->Draw("SAME P");
legend = new TLegend(0.35, 0.725, 0.85, 0.875);
legend->AddEntry(graph3, "Number of currently cached pages", "p");
legend->AddEntry(graph4, "Number of currently cached clusters", "p");
legend->Draw();
// ===============================
Read(EReadMode::kInformed, sumLoadedClusters, sumUnsealedPages, nClusters, nPages);
canvas->cd(2);
auto graph5 = GetGraph(sumUnsealedPages);
graph5->SetLineColor(kRed);
graph5->Draw("AL");
auto graph6 = GetGraph(sumLoadedClusters);
graph6->SetLineColor(kBlue);
graph6->Draw("SAME L");
latex.SetTextAlign(22);
latex.DrawLatexNDC(0.5, 0.95, "Informed Reading");
canvas->cd(4);
auto graph7 = GetGraph(nPages);
graph7->SetMarkerColor(kRed);
graph7->GetYaxis()->SetNdivisions(8);
graph7->GetYaxis()->SetRangeUser(-0.5, 14);
graph7->Draw("AP");
auto graph8 = GetGraph(nClusters);
graph8->SetMarkerColor(kBlue);
graph8->Draw("SAME P");
}
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
@ kRed
Definition Rtypes.h:67
@ kBlue
Definition Rtypes.h:67
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
R__EXTERN TStyle * gStyle
Definition TStyle.h:442
static std::unique_ptr< RNTupleModel > Create()
Reads RNTuple data from storage.
static std::unique_ptr< RNTupleReader > Open(std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Open an RNTuple for reading.
static std::unique_ptr< RNTupleWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Creates an RNTupleWriter backed by storage, overwriting it if one with the same URI exists.
virtual void SetLineWidth(Width_t lwidth)
Set the line width.
Definition TAttLine.h:45
virtual void SetMarkerStyle(Style_t mstyle=1)
Set the marker style.
Definition TAttMarker.h:41
The Canvas class.
Definition TCanvas.h:23
TVirtualPad * cd(Int_t subpadnumber=0) override
Set current canvas & pad.
Definition TCanvas.cxx:717
A TGraph is an object made of two arrays X and Y with npoints each.
Definition TGraph.h:41
To draw Mathematical Formula.
Definition TLatex.h:20
This class displays a legend box (TPaveText) containing several legend entries.
Definition TLegend.h:23
void Divide(Int_t nx=1, Int_t ny=1, Float_t xmargin=0.01, Float_t ymargin=0.01, Int_t color=0) override
Automatic pad generation by division.
Definition TPad.cxx:1287
void SetOptStat(Int_t stat=1)
The type of information printed in the histogram statistics box can be selected via the parameter mod...
Definition TStyle.cxx:1641
void EnableImplicitMT(UInt_t numthreads=0)
Enable ROOT's implicit multi-threading for all objects and methods that provide an internal paralleli...
Definition TROOT.cxx:544
Date
October 2025
Author
The ROOT Team

Definition in file ntpl017_shared_reader.C.