Please note that this tutorial contains very simplified versions of classes that could be found in a framework, such as DataProduct, FileService, ParallelOutputter, and SerializingOutputter. They try to mimic how they would be used in a framework (for example, the Outputters are agnostic of the data being written, which is encapsulated in std::vector<DataProduct>), but they are not meant for production use!
Also note that this tutorial uses std::thread and std::mutex directly instead of a task-scheduling library such as Threading Building Blocks (TBB). For that reason, turning on ROOT's implicit multi-threading (IMT) would not be very efficient with the simplified code in this tutorial: a thread that blocks to acquire a std::mutex cannot "help" the thread currently in the critical section by executing its pending tasks. If that is wanted, the framework should use the synchronization methods provided by TBB directly (which goes beyond the scope of this tutorial).
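For reference only (this snippet is not part of the tutorial code below): IMT is enabled with a single call, assuming TROOT.h is included. With the plain std::mutex critical sections used in this tutorial it would bring little benefit, for the reason just described.

   #include <TROOT.h>
   // Turn on ROOT's internal, TBB-based task scheduling. A thread blocked on a plain
   // std::mutex cannot execute other pending tasks, so this helps little here.
   ROOT::EnableImplicitMT();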
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>

// RNTuple and ROOT I/O headers needed below (the exact set may vary slightly between ROOT versions).
#include <ROOT/REntry.hxx>
#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>
#include <ROOT/RNTupleWriter.hxx>
#include <TFile.h>

// Depending on the ROOT version, some of these classes may still live in ROOT::Experimental.
using ROOT::REntry;
using ROOT::RNTupleFillContext;
using ROOT::RNTupleFillStatus;
using ROOT::RNTupleModel;
using ROOT::RNTupleWriteOptions;
using ROOT::RNTupleWriter;
using ROOT::Experimental::RNTupleParallelWriter;

// A model together with its field tokens, in the same order as the DataProduct indices used below.
using ModelTokensPair = std::pair<std::unique_ptr<RNTupleModel>, std::vector<REntry::RFieldToken>>;
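// A DataProduct mimics a type-erased object in a framework event store: the index identifies the
// field it belongs to and the address points to the user-side object.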
struct DataProduct {
   std::size_t index;
   void *address;

   DataProduct(std::size_t i, void *a) : index(i), address(a) {}
};
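// FileService owns the single output TFile and the std::mutex that serializes all access to it.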
class FileService {
   std::unique_ptr<TFile> fFile;
   std::mutex fMutex;

public:
   FileService(std::string_view url, std::string_view options = "")
   {
      fFile.reset(TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
   }

   TFile &GetFile() { return *fFile; }
   std::mutex &GetMutex() { return fMutex; }
};
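// Abstract interface used by the processing threads: one slot per thread, initialized once and then
// filled with the data products of each entry.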
class Outputter {
public:
virtual ~Outputter() = default;
virtual void InitSlot(unsigned slot) = 0;
virtual void Fill(unsigned slot, const std::vector<DataProduct> &products) = 0;
};
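// ParallelOutputter writes entries concurrently: every slot gets its own RNTupleFillContext and bare
// REntry, so filling needs no synchronization; only flushing clusters to the TFile takes the
// FileService mutex.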
class ParallelOutputter final : public Outputter {
FileService &fFileService;
std::unique_ptr<RNTupleParallelWriter> fParallelWriter;
std::vector<REntry::RFieldToken> fTokens;
struct SlotData {
std::shared_ptr<RNTupleFillContext> fillContext;
std::unique_ptr<REntry> entry;
};
std::vector<SlotData> fSlots;
public:
ParallelOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
const RNTupleWriteOptions &options)
: fFileService(fileService), fTokens(std::move(modelTokens.second))
{
auto &model = modelTokens.first;
      std::lock_guard g(fileService.GetMutex());
fParallelWriter = RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options);
}
void InitSlot(unsigned slot) final
{
if (slot >= fSlots.size()) {
fSlots.resize(slot + 1);
}
fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateBareEntry();
}
void Fill(unsigned slot, const std::vector<DataProduct> &products) final
{
assert(slot < fSlots.size());
auto &fillContext = *fSlots[slot].fillContext;
auto &entry = *fSlots[slot].entry;
      for (auto &&product : products) {
         // Bind each data product's memory directly to the corresponding field of this slot's entry.
         entry.BindRawPtr(fTokens[product.index], product.address);
      }
RNTupleFillStatus status;
fillContext.FillNoFlush(entry, status);
      if (status.ShouldFlushCluster()) {
         // Compressing the buffered column data does not require the file mutex.
         fillContext.FlushColumns();
         {
            // Only take the file mutex for the actual write to the TFile.
            std::lock_guard g(fFileService.GetMutex());
            fillContext.FlushCluster();
         }
}
}
};
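// SerializingOutputter shares a single RNTupleWriter among all slots: filling is serialized with
// fWriterMutex, and writing clusters to the TFile additionally takes the FileService mutex.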
class SerializingOutputter final : public Outputter {
FileService &fFileService;
std::unique_ptr<RNTupleWriter> fWriter;
std::mutex fWriterMutex;
std::vector<REntry::RFieldToken> fTokens;
struct SlotData {
std::unique_ptr<REntry> entry;
};
std::vector<SlotData> fSlots;
public:
SerializingOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
const RNTupleWriteOptions &options)
: fFileService(fileService), fTokens(std::move(modelTokens.second))
{
auto &model = modelTokens.first;
      std::lock_guard g(fileService.GetMutex());
fWriter = RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
}
void InitSlot(unsigned slot) final
{
if (slot >= fSlots.size()) {
fSlots.resize(slot + 1);
}
fSlots[slot].entry = fWriter->GetModel().CreateBareEntry();
}
void Fill(unsigned slot, const std::vector<DataProduct> &products) final
{
assert(slot < fSlots.size());
auto &entry = *fSlots[slot].entry;
      for (auto &&product : products) {
         // Bind each data product's memory directly to the corresponding field of this slot's entry.
         entry.BindRawPtr(fTokens[product.index], product.address);
      }
      {
         // Serialize concurrent fills of the single shared writer.
         std::lock_guard g(fWriterMutex);
         RNTupleFillStatus status;
         fWriter->FillNoFlush(entry, status);
         if (status.ShouldFlushCluster()) {
            // Compressing the buffered column data does not require the file mutex.
            fWriter->FlushColumns();
            {
               // Only take the file mutex for the actual write to the TFile.
               std::lock_guard g(fFileService.GetMutex());
               fWriter->FlushCluster();
            }
         }
      }
}
};
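// Simple user-defined data classes for the Events ntuple.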
struct Track {
   float pt;
   float eta;
   float mass;
   float phi;
};

struct ChargedTrack : public Track {
   std::int8_t charge;
};

struct Event {
   std::uint32_t eventId;
   std::uint32_t runId;
   std::vector<ChargedTrack> electrons;
   std::vector<Track> photons;
   std::vector<ChargedTrack> muons;
};
ModelTokensPair CreateEventModel()
{
auto model = RNTupleModel::CreateBare();
   std::vector<REntry::RFieldToken> tokens;
   // The tokens are stored in the same order as the DataProduct indices created below.
   model->MakeField<decltype(Event::eventId)>("eventId");
   tokens.push_back(model->GetToken("eventId"));
model->MakeField<decltype(Event::runId)>("runId");
tokens.push_back(model->GetToken("runId"));
model->MakeField<decltype(Event::electrons)>("electrons");
tokens.push_back(model->GetToken("electrons"));
model->MakeField<decltype(Event::photons)>("photons");
tokens.push_back(model->GetToken("photons"));
model->MakeField<decltype(Event::muons)>("muons");
tokens.push_back(model->GetToken("muons"));
return {std::move(model), std::move(tokens)};
}
std::vector<DataProduct> CreateEventDataProducts(Event &event)
{
std::vector<DataProduct> products;
   products.emplace_back(0, &event.eventId);
products.emplace_back(1, &event.runId);
products.emplace_back(2, &event.electrons);
products.emplace_back(3, &event.photons);
products.emplace_back(4, &event.muons);
return products;
}
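// Per-run summary record written to the Runs ntuple.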
struct Run {
std::uint32_t runId;
std::uint32_t nEvents;
};
ModelTokensPair CreateRunModel()
{
auto model = RNTupleModel::CreateBare();
std::vector<REntry::RFieldToken> tokens;
model->MakeField<decltype(Run::runId)>("runId");
tokens.push_back(model->GetToken("runId"));
model->MakeField<decltype(Run::nEvents)>("nEvents");
tokens.push_back(model->GetToken("nEvents"));
return {std::move(model), std::move(tokens)};
}
std::vector<DataProduct> CreateRunDataProducts(Run &run)
{
std::vector<DataProduct> products;
products.emplace_back(0, &run.runId);
products.emplace_back(1, &run.nEvents);
return products;
}
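// Parameters of the pseudo-random data generated by each thread.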
constexpr unsigned kNRunsPerThread = 100;
constexpr unsigned kMeanNEventsPerRun = 400;
constexpr unsigned kStddevNEventsPerRun = 100;
constexpr unsigned kMeanNTracks = 5;
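// Worker function executed by each thread: generates kNRunsPerThread runs and, per run, a random
// number of events, and passes their data products to the outputters. The thread id doubles as the
// slot number.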
void ProcessRunsAndEvents(unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
{
std::mt19937 gen(threadId);
std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
std::poisson_distribution<> nTracksDist(kMeanNTracks);
std::uniform_real_distribution<float> floatDist;
for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
double nEventsD = nEventsDist(gen);
std::uint32_t nEvents = 0;
if (nEventsD > 0) {
nEvents = static_cast<std::uint32_t>(nEventsD);
}
      // Create the event and its data products once; they are reused for all events of this run.
      Event event;
      event.runId = runId;
auto eventProducts = CreateEventDataProducts(event);
for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
event.eventId = eventId;
event.electrons.resize(nTracksDist(gen));
for (auto &electron : event.electrons) {
electron.eta = floatDist(gen);
            electron.mass = 0.511; // MeV
electron.phi = floatDist(gen);
electron.pt = floatDist(gen);
electron.charge = (gen() % 2 ? 1 : -1);
}
event.photons.resize(nTracksDist(gen));
for (auto &photon : event.photons) {
photon.eta = floatDist(gen);
photon.mass = 0;
photon.phi = floatDist(gen);
photon.pt = floatDist(gen);
}
event.muons.resize(nTracksDist(gen));
for (auto &muon : event.muons) {
muon.eta = floatDist(gen);
            muon.mass = 105.658; // MeV
muon.phi = floatDist(gen);
muon.pt = floatDist(gen);
muon.charge = (gen() % 2 ? 1 : -1);
}
eventOutputter.Fill(threadId, eventProducts);
}
Run run;
run.runId = runId;
run.nEvents = nEvents;
auto runProducts = CreateRunDataProducts(run);
runOutputter.Fill(threadId, runProducts);
}
}
constexpr unsigned kNThreads = 4;
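// The tutorial entry point: create the output file and the two outputters, register one slot per
// thread, and process runs and events from kNThreads threads.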
void ntpl014_framework()
{
FileService fileService("ntpl014_framework.root", "RECREATE");
   RNTupleWriteOptions options;
   // Parallel writing relies on buffered writing; it is on by default, but make it explicit.
   options.SetUseBufferedWrite(true);
   // Aim for roughly 2 MiB of compressed data per cluster in the Events ntuple.
   options.SetApproxZippedClusterSize(2 * 1024 * 1024);
   ParallelOutputter eventOutputter(CreateEventModel(), fileService, "Events", options);

   // The Runs ntuple is tiny, so much smaller clusters are sufficient.
   options.SetUseBufferedWrite(true);
   options.SetApproxZippedClusterSize(1024);
   SerializingOutputter runOutputter(CreateRunModel(), fileService, "Runs", options);
for (unsigned i = 0; i < kNThreads; i++) {
eventOutputter.InitSlot(i);
runOutputter.InitSlot(i);
}
std::vector<std::thread> threads;
for (unsigned i = 0; i < kNThreads; i++) {
threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
}
for (unsigned i = 0; i < kNThreads; i++) {
threads[i].join();
}
}
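// As a usage sketch (not part of the original tutorial code): after running the macro, the two
// RNTuples could be inspected with RNTupleReader, for example:
//
//    auto events = ROOT::RNTupleReader::Open("Events", "ntpl014_framework.root");
//    auto runs = ROOT::RNTupleReader::Open("Runs", "ntpl014_framework.root");
//    events->PrintInfo();
//    std::cout << events->GetNEntries() << " events in " << runs->GetNEntries() << " runs\n";
//
// Depending on the ROOT version, RNTupleReader may still live in ROOT::Experimental, and
// <ROOT/RNTupleReader.hxx> plus <iostream> would be needed.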