In this example, two threads share the work as follows: the first thread processes all the even entry numbers and the second thread all the odd entry numbers. The second thread works at half the speed of the first one.
As a result, the threads need the same clusters and pages but at different points in time. With the naive way of using the reader, this read pattern will result in cache thrashing.
Using the "active entry token" API, on the other hand, the reader will be informed about which entries need to be kept in the caches and cache thrashing is prevented.
#include <array>
#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std::chrono_literals;
constexpr char const *kNTupleFileName = "ntpl017_shared_reader.root";
// A simple 2D point, stored as the single RNTuple field "point" (see Write()).
struct Point2D {
float fX;
float fY;
};
// How the two threads use the shared reader: naively, or by announcing the
// entry they are about to read through an "active entry token" so the reader
// keeps the corresponding clusters/pages cached.
enum class EReadMode {
   kNaive,   // just LoadEntry(); the threads' caches thrash each other
   kInformed // additionally set the active entry token before reading
};

/// Stream a human-readable name of the read mode (used in the report header).
std::ostream &operator<<(std::ostream &os, const EReadMode &e)
{
   // BUG FIX: the original body had bare case labels with no `switch (e) {`
   // statement and unbalanced braces; restore the switch.
   switch (e) {
   case EReadMode::kNaive: os << "naive"; break;
   case EReadMode::kInformed: os << "informed"; break;
   default: os << "???";
   }
   return os;
}
// Count of entries processed so far across both worker threads; its value at
// processing time indexes the per-entry statistics vectors.
static std::atomic<int> gNEntriesDone;
// Hands out ids 0 and 1 to the two worker threads (even/odd entry assignment).
static std::atomic<int> gThreadId;
/// Worker routine run by both threads: reads every second entry of the shared
/// reader (thread 0 the even entries, thread 1 the odd ones) and records the
/// reader's cache statistics after each entry.
///
/// \param reader            Shared reader; access is serialized via a mutex.
/// \param readMode          kNaive or kInformed (active entry token usage).
/// \param usPerEvent        Simulated per-entry processing time.
/// \param sumLoadedClusters Out: cumulative clusters loaded, per entry done.
/// \param sumUnsealedPages  Out: cumulative pages unsealed, per entry done.
/// \param nClusters         Out: currently cached clusters, per entry done.
/// \param nPages            Out: currently cached pages, per entry done.
void ProcessEntries(
   ROOT::RNTupleReader &reader, EReadMode readMode,
   const std::chrono::microseconds &usPerEvent,
   std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages,
   std::vector<int> &nClusters, std::vector<int> &nPages)
{
   // Serializes use of the shared reader: RNTupleReader is not thread-safe,
   // so LoadEntry() calls from the two threads must not overlap.
   static std::mutex gLock;

   // BUG FIX: `token` was used below but never created.  In informed mode the
   // token tells the reader which entry this thread needs next, so the
   // corresponding cluster/pages are kept in the caches.
   auto token = reader.CreateActiveEntryToken();

   const int threadId = gThreadId++;
   // Thread 0 walks entries 0,2,4,...; thread 1 walks 1,3,5,...
   for (unsigned int i = threadId; i < reader.GetNEntries(); i += 2) {
      {
         std::lock_guard<std::mutex> guard(gLock);
         switch (readMode) {
         case EReadMode::kInformed: token.SetEntryNumber(i); break;
         case EReadMode::kNaive: break;
         default: std::terminate();
         }
         // BUG FIX: the actual read of the entry was missing.
         reader.LoadEntry(i);
      }
      // Simulate per-entry work; the second thread passes twice the duration.
      std::this_thread::sleep_for(usPerEvent);

      const int entriesDone = gNEntriesDone++;
      const auto &metrics = reader.GetMetrics();
      // BUG FIX: the four assignments below had lost their right-hand sides.
      // NOTE(review): the counter names are reconstructed from the RNTuple
      // metrics naming scheme -- confirm them against the output of
      // reader.GetMetrics().Print(std::cout).  Counters are assumed safe to
      // read outside the lock (atomic) -- TODO confirm.
      sumLoadedClusters.at(entriesDone) =
         metrics.GetCounter("RNTupleReader.RPageSourceFile.nClusterLoaded")->GetValueAsInt();
      sumUnsealedPages[entriesDone] =
         metrics.GetCounter("RNTupleReader.RPageSourceFile.nPageUnsealed")->GetValueAsInt();
      nClusters[entriesDone] =
         metrics.GetCounter("RNTupleReader.RPageSourceFile.nClusterCached")->GetValueAsInt();
      nPages[entriesDone] =
         metrics.GetCounter("RNTupleReader.RPageSourceFile.nPageCached")->GetValueAsInt();
   }
}
/// Opens the ntuple, reads it with two threads in the given mode, and fills
/// the per-entry cache statistics vectors.  Prints the reader metrics at the
/// end of the run.
void Read(EReadMode readMode, std::vector<int> &sumLoadedClusters, std::vector<int> &sumUnsealedPages,
          std::vector<int> &nClusters, std::vector<int> &nPages)
{
   // BUG FIX: the reader was used below but never opened.
   // NOTE(review): ntuple name "ntpl" assumed -- must match the name used in
   // Write(); confirm against the original tutorial.
   auto reader = ROOT::RNTupleReader::Open("ntpl", kNTupleFileName);
   // Required so that GetMetrics() counters are actually populated.
   reader->EnableMetrics();

   gNEntriesDone = 0;
   gThreadId = 0;

   // BUG FIX: `N` was undefined, and nClusters/nPages were never resized,
   // which made ProcessEntries() write out of bounds.
   const auto N = reader->GetNEntries();
   sumLoadedClusters.resize(N);
   sumUnsealedPages.resize(N);
   nClusters.resize(N);
   nPages.resize(N);

   // First thread: even entries at ~100us/entry; second thread: odd entries
   // at ~200us/entry (i.e. half the speed), per the tutorial description.
   std::array<std::thread, 2> threads;
   threads[0] = std::thread(ProcessEntries, std::ref(*reader), readMode, 100us, std::ref(sumLoadedClusters),
                            std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages));
   threads[1] = std::thread(ProcessEntries, std::ref(*reader), readMode, 200us, std::ref(sumLoadedClusters),
                            std::ref(sumUnsealedPages), std::ref(nClusters), std::ref(nPages));
   for (auto &t : threads) {
      t.join();
   }

   std::cout << "Reading in mode '" << readMode << "':" << std::endl;
   std::cout << "===========================" << std::endl;
   // BUG FIX: the metrics report between the two separators was missing.
   reader->GetMetrics().Print(std::cout);
   std::cout << "===========================" << std::endl << std::endl;
}
void Write()
{
auto ptrPoint = model->MakeField<Point2D>("point");
for (int i = 0; i < 1000; ++i) {
if (i % 100 == 0)
auto prng = std::make_unique<TRandom3>();
prng->SetSeed();
ptrPoint->fX = prng->Rndm(1);
ptrPoint->fY = prng->Rndm(1);
}
}
/// Builds a TGraph of the given per-entry counts: x = entry index processed,
/// y = recorded count.  The caller (or the pad after Draw()) owns the graph,
/// following the usual ROOT ownership convention.
TGraph *GetGraph(
   const std::vector<int> &counts)
{
   // BUG FIX: `graph` was used below but never created.
   auto graph = new TGraph(counts.size());
   for (unsigned int i = 0; i < counts.size(); ++i) {
      graph->SetPoint(i, i, counts[i]);
   }
   graph->GetXaxis()->SetTitle("Number of processed entries");
   return graph;
}
/// Entry point of the tutorial: writes the sample ntuple, then reads it twice
/// (naive and informed mode) and plots the cache statistics of both runs.
void ntpl017_shared_reader()
{
   Write();

   // Per-entry statistics, filled by Read()/ProcessEntries() and reused for
   // both runs (Read() resizes them each time).
   std::vector<int> sumLoadedClusters;
   std::vector<int> sumUnsealedPages;
   std::vector<int> nClusters;
   std::vector<int> nPages;

   TCanvas *canvas = new TCanvas("", "Shared Reader Example", 200, 10, 1500, 1000);
   // BUG FIX: the canvas was never subdivided, so every "A..." draw below
   // replaced the previous plot.  Four pads: naive run on top (cumulative
   // counts left, cached counts right), informed run on the bottom.
   canvas->Divide(2, 2);

   Read(EReadMode::kNaive, sumLoadedClusters, sumUnsealedPages, nClusters, nPages);

   canvas->cd(1); // naive: cumulative pages/clusters
   auto graph1 = GetGraph(sumUnsealedPages);
   graph1->SetLineColor(kRed);
   graph1->Draw("AL");
   auto graph2 = GetGraph(sumLoadedClusters);
   graph2->SetLineColor(kBlue);
   graph2->Draw("SAME L");
   auto legend = new TLegend(0.125, 0.725, 0.625, 0.875);
   legend->AddEntry(graph1, "Number of decompressed pages", "l");
   legend->AddEntry(graph2, "Number of loaded clusters", "l");
   legend->Draw();

   canvas->cd(2); // naive: currently cached pages/clusters
   auto graph3 = GetGraph(nPages);
   graph3->SetMarkerColor(kRed);
   graph3->GetYaxis()->SetNdivisions(8);
   graph3->GetYaxis()->SetRangeUser(-0.5, 14);
   graph3->Draw("AP");
   auto graph4 = GetGraph(nClusters);
   graph4->SetMarkerColor(kBlue);
   graph4->Draw("SAME P");
   legend = new TLegend(0.35, 0.725, 0.85, 0.875);
   legend->AddEntry(graph3, "Number of currently cached pages", "p");
   legend->AddEntry(graph4, "Number of currently cached clusters", "p");
   legend->Draw();

   Read(EReadMode::kInformed, sumLoadedClusters, sumUnsealedPages, nClusters, nPages);

   canvas->cd(3); // informed: cumulative pages/clusters
   auto graph5 = GetGraph(sumUnsealedPages);
   graph5->SetLineColor(kRed);
   graph5->Draw("AL");
   auto graph6 = GetGraph(sumLoadedClusters);
   graph6->SetLineColor(kBlue);
   graph6->Draw("SAME L");

   canvas->cd(4); // informed: currently cached pages/clusters
   auto graph7 = GetGraph(nPages);
   graph7->SetMarkerColor(kRed);
   graph7->GetYaxis()->SetNdivisions(8);
   graph7->GetYaxis()->SetRangeUser(-0.5, 14);
   graph7->Draw("AP");
   auto graph8 = GetGraph(nClusters);
   graph8->SetMarkerColor(kBlue);
   graph8->Draw("SAME P");
}
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
const RNTuplePerfCounter * GetCounter(std::string_view name) const
Searches this object and all the observed sub metrics.
void Print(std::ostream &output, const std::string &prefix="") const
virtual std::int64_t GetValueAsInt() const =0
static std::unique_ptr< RNTupleModel > Create()
Reads RNTuple data from storage.
static std::unique_ptr< RNTupleReader > Open(std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Open an RNTuple for reading.
const Experimental::Detail::RNTupleMetrics & GetMetrics() const
ROOT::NTupleSize_t GetNEntries() const
Returns the number of entries in this RNTuple.
RActiveEntryToken CreateActiveEntryToken()
Create a new active entry token, which will not be bound to any entry number initially.
void EnableMetrics()
Enable performance measurements (decompression time, bytes read from storage, etc....
void LoadEntry(ROOT::NTupleSize_t index)
Fills the default entry of the model.
static std::unique_ptr< RNTupleWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Creates an RNTupleWriter backed by storage, overwriting it if one with the same URI exists.
virtual void SetTextAlign(Short_t align=11)
Set the text alignment.
TVirtualPad * cd(Int_t subpadnumber=0) override
Set current canvas & pad.
TLatex * DrawLatexNDC(Double_t x, Double_t y, const char *text)
void Divide(Int_t nx=1, Int_t ny=1, Float_t xmargin=0.01, Float_t ymargin=0.01, Int_t color=0) override
Automatic pad generation by division.
void EnableImplicitMT(UInt_t numthreads=0)
Enable ROOT's implicit multi-threading for all objects and methods that provide an internal paralleli...