Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ntpl017_shared_reader.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_ntuple
3/// \notebook
4/// Example of efficient multi-threaded reading when multiple threads share a single reader.
5/// In this example, two threads share the work as follows: the first thread processes all the even entries numbers
6/// and the second thread all the odd entry numbers. The second thread works twice as slow as the first one.
7///
8/// As a result, the threads need the same clusters and pages but at different points in time.
9/// With the naive way of using the reader, this read pattern will result in cache thrashing.
10///
11/// Using the "active entry token" API, on the other hand, the reader will be informed about which entries
12/// need to be kept in the caches and cache thrashing is prevented.
13///
14/// \macro_image
15/// \macro_code
16///
17/// \date October 2025
18/// \author The ROOT Team
19
20#include <ROOT/RNTupleModel.hxx>
23
24#include <TAxis.h>
25#include <TCanvas.h>
26#include <TGraph.h>
27#include <TLatex.h>
28#include <TLegend.h>
29#include <TRandom3.h>
30#include <TROOT.h>
31#include <TStyle.h>
32
33#include <array>
34#include <atomic>
35#include <chrono>
36#include <exception>
37#include <functional>
38#include <iostream>
39#include <mutex>
40#include <thread>
41#include <vector>
42
43using namespace std::chrono_literals;
44
45// Where to store the ntuple of this example
46constexpr char const *kNTupleFileName = "ntpl017_shared_reader.root";
47
48/// The sample class that is stored in the RNTuple
49struct Point2D {
50 float fX;
51 float fY;
52};
53
54/// Whether we read with setting active entry tokens (informed) or not (naive)
55enum class EReadMode {
56 kNaive,
58};
59
60/// Nicify output of EReadMode values
61std::ostream &operator<<(std::ostream &os, const EReadMode &e)
62{
63 switch (e) {
64 case EReadMode::kNaive: os << "naive"; break;
65 case EReadMode::kInformed: os << "informed"; break;
66 default: os << "???";
67 }
68 return os;
69}
70
71// To be reset between ProcessEntries calls to Read()
72static std::atomic<int> gNEntriesDone;
73static std::atomic<int> gThreadId;
74
75/// The read thread's main function
76void ProcessEntries(ROOT::RNTupleReader &reader, EReadMode readMode, const std::chrono::microseconds &usPerEvent,
77 std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages,
78 std::vector<int> &nClusters, std::vector<int> &nPages)
79{
80 static std::mutex gLock;
81
82 const int threadId = gThreadId++;
83
84 auto token = reader.CreateActiveEntryToken();
85 for (unsigned int i = threadId; i < reader.GetNEntries(); i += 2) {
86 {
87 std::lock_guard<std::mutex> guard(gLock);
88
89 // The only difference between naive and informed reading: in informed reading, we indicate which
90 // entry we are going to use before loading it.
91 switch (readMode) {
92 case EReadMode::kInformed: token.SetEntryNumber(i); break;
93 case EReadMode::kNaive: break;
94 default: std::terminate(); // never here
95 }
96
97 reader.LoadEntry(i);
98 }
99
100 std::this_thread::sleep_for(usPerEvent);
101
102 const int entriesDone = gNEntriesDone++;
104 reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nClusterLoaded")->GetValueAsInt();
106 reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.nPageUnsealed")->GetValueAsInt();
108 reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.RClusterPool.nCluster")->GetValueAsInt();
110 reader.GetMetrics().GetCounter("RNTupleReader.RPageSourceFile.RPagePool.nPage")->GetValueAsInt();
111 }
112}
113
114void Read(EReadMode readMode, std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages,
115 std::vector<int> &nClusters, std::vector<int> &nPages)
116{
118 reader->EnableMetrics();
119
120 gNEntriesDone = 0;
121 gThreadId = 0;
122
123 const auto N = reader->GetNEntries();
124 sumLoadedClusters.resize(N);
125 sumUnsealedPages.resize(N);
126 nClusters.resize(N);
127 nPages.resize(N);
128
129 std::array<std::thread, 2> threads;
130 threads[0] = std::thread(ProcessEntries, std::ref(*reader), readMode, 100us, std::ref(sumLoadedClusters),
131 std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages));
132 threads[1] = std::thread(ProcessEntries, std::ref(*reader), readMode, 200us, std::ref(sumLoadedClusters),
133 std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages));
134 for (auto &t : threads) {
135 t.join();
136 }
137
138 std::cout << "Reading in mode '" << readMode << "':" << std::endl;
139 std::cout << "===========================" << std::endl;
140 reader->GetMetrics().Print(std::cout);
141 std::cout << "===========================" << std::endl << std::endl;
142}
143
144void Write()
145{
146 auto model = ROOT::RNTupleModel::Create();
147 auto ptrPoint = model->MakeField<Point2D>("point");
148
149 auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "ntpl", kNTupleFileName);
150
151 for (int i = 0; i < 1000; ++i) {
152 if (i % 100 == 0)
153 writer->CommitCluster();
154
155 auto prng = std::make_unique<TRandom3>();
156 prng->SetSeed();
157
158 ptrPoint->fX = prng->Rndm(1);
159 ptrPoint->fY = prng->Rndm(1);
160 writer->Fill();
161 }
162}
163
164TGraph *GetGraph(const std::vector<int> &counts)
165{
166 auto graph = new TGraph();
167 for (unsigned int i = 0; i < counts.size(); ++i) {
168 graph->SetPoint(i, i, counts[i]);
169 }
170 graph->GetXaxis()->SetTitle("Number of processed entries");
171 return graph;
172}
173
175{
177
178 Write();
180
181 std::vector<int> sumLoadedClusters;
182 std::vector<int> sumUnsealedPages;
183 std::vector<int> nClusters;
184 std::vector<int> nPages;
186
187 gStyle->SetOptStat(0);
190 TCanvas *canvas = new TCanvas("", "Shared Reader Example", 200, 10, 1500, 1000);
191
192 canvas->Divide(2, 2);
193
194 Read(EReadMode::kNaive, sumLoadedClusters, sumUnsealedPages, nClusters, nPages);
195
196 canvas->cd(1);
197 auto graph1 = GetGraph(sumUnsealedPages);
198 graph1->SetLineColor(kRed);
199 graph1->Draw("AL");
200 auto graph2 = GetGraph(sumLoadedClusters);
201 graph2->SetLineColor(kBlue);
202 graph2->Draw("SAME L");
203
204 auto legend = new TLegend(0.125, 0.725, 0.625, 0.875);
205 legend->AddEntry(graph1, "Number of decompressed pages", "l");
206 legend->AddEntry(graph2, "Number of loaded clusters", "l");
207 legend->Draw();
208
209 latex.SetTextAlign(22);
210 latex.DrawLatexNDC(0.5, 0.95, "Naive Reading");
211
212 canvas->cd(3);
213
214 auto graph3 = GetGraph(nPages);
215 graph3->SetMarkerColor(kRed);
216 graph3->GetYaxis()->SetNdivisions(8);
217 graph3->GetYaxis()->SetRangeUser(-0.5, 14);
218 graph3->Draw("AP");
219
220 auto graph4 = GetGraph(nClusters);
221 graph4->SetMarkerColor(kBlue);
222 graph4->Draw("SAME P");
223
224 legend = new TLegend(0.35, 0.725, 0.85, 0.875);
225 legend->AddEntry(graph3, "Number of currently cached pages", "p");
226 legend->AddEntry(graph4, "Number of currently cached clusters", "p");
227 legend->Draw();
228
229 // ===============================
230
231 Read(EReadMode::kInformed, sumLoadedClusters, sumUnsealedPages, nClusters, nPages);
232
233 canvas->cd(2);
234 auto graph5 = GetGraph(sumUnsealedPages);
235 graph5->SetLineColor(kRed);
236 graph5->Draw("AL");
237
238 auto graph6 = GetGraph(sumLoadedClusters);
239 graph6->SetLineColor(kBlue);
240 graph6->Draw("SAME L");
241
242 latex.SetTextAlign(22);
243 latex.DrawLatexNDC(0.5, 0.95, "Informed Reading");
244
245 canvas->cd(4);
246
247 auto graph7 = GetGraph(nPages);
248 graph7->SetMarkerColor(kRed);
249 graph7->GetYaxis()->SetNdivisions(8);
250 graph7->GetYaxis()->SetRangeUser(-0.5, 14);
251 graph7->Draw("AP");
252
253 auto graph8 = GetGraph(nClusters);
254 graph8->SetMarkerColor(kBlue);
255 graph8->Draw("SAME P");
256}
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
@ kRed
Definition Rtypes.h:67
@ kBlue
Definition Rtypes.h:67
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define N
R__EXTERN TStyle * gStyle
Definition TStyle.h:442
static std::unique_ptr< RNTupleModel > Create()
Reads RNTuple data from storage.
static std::unique_ptr< RNTupleReader > Open(std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Open an RNTuple for reading.
static std::unique_ptr< RNTupleWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Creates an RNTupleWriter backed by storage, overwriting it if one with the same URI exists.
virtual void SetLineWidth(Width_t lwidth)
Set the line width.
Definition TAttLine.h:45
virtual void SetMarkerStyle(Style_t mstyle=1)
Set the marker style.
Definition TAttMarker.h:41
The Canvas class.
Definition TCanvas.h:23
TVirtualPad * cd(Int_t subpadnumber=0) override
Set current canvas & pad.
Definition TCanvas.cxx:717
A TGraph is an object made of two arrays X and Y with npoints each.
Definition TGraph.h:41
To draw Mathematical Formula.
Definition TLatex.h:20
This class displays a legend box (TPaveText) containing several legend entries.
Definition TLegend.h:23
void Divide(Int_t nx=1, Int_t ny=1, Float_t xmargin=0.01, Float_t ymargin=0.01, Int_t color=0) override
Automatic pad generation by division.
Definition TPad.cxx:1287
void SetOptStat(Int_t stat=1)
The type of information printed in the histogram statistics box can be selected via the parameter mod...
Definition TStyle.cxx:1641
void EnableImplicitMT(UInt_t numthreads=0)
Enable ROOT's implicit multi-threading for all objects and methods that provide an internal paralleli...
Definition TROOT.cxx:544