// Please note that this tutorial contains very simplified versions of classes that could be found in a framework, such as DataProduct, FileService, ParallelOutputter, and SerializingOutputter. They try to mimic the usage in a framework (for example, Outputters are agnostic of the data written, which is encapsulated in std::vector<DataProduct>), but they are not meant for production usage!
// Also note that this tutorial uses std::thread and std::mutex directly instead of a task-scheduling library such as Threading Building Blocks (TBB). For that reason, turning on ROOT's implicit multithreading (IMT) would not be very efficient with the simplified code in this tutorial, because a thread that blocks to acquire a std::mutex cannot "help" the thread currently in the critical section by executing its tasks. If that is wanted, the framework should use the synchronization methods provided by TBB directly (which goes beyond the scope of this tutorial).
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>
// A bare RNTuple model paired with the tokens of its fields, in field creation
// order; the token order must match the DataProduct indices used below.
using ModelTokensPair = std::pair<std::unique_ptr<ROOT::RNTupleModel>, std::vector<ROOT::RFieldToken>>;
/// A type-erased reference to one field's value: the raw address of the data
/// plus the position of the corresponding field token in the model's token list.
/// The address is borrowed, not owned; the pointee must outlive the product.
struct DataProduct {
   std::size_t index;   // position of the matching ROOT::RFieldToken
   const void *address; // raw pointer to the value to be written

   DataProduct(std::size_t i, const void *a) : index(i), address(a) {}
};
/// Owns the single output TFile shared by all outputters and provides the
/// mutex that serializes every access to it.
class FileService {
   std::unique_ptr<TFile> fFile; // owned output file
   std::mutex fMutex;            // guards all operations on fFile

public:
   /// Opens (or creates) the file at `url`; `options` is forwarded to TFile::Open.
   FileService(std::string_view url, std::string_view options = "")
      : fFile(TFile::Open(std::string(url).c_str(), std::string(options).c_str()))
   {
   }

   TFile &GetFile() { return *fFile; }
   std::mutex &GetMutex() { return fMutex; }
};
/// Abstract interface for writing a set of data products from a processing
/// slot. Outputters are agnostic of the concrete event data: each product
/// carries only a raw address and a field-token index.
class Outputter {
public:
   virtual ~Outputter() = default;

   /// Prepare per-slot state; must be called for a slot before Fill is used on it.
   virtual void InitSlot(unsigned slot) = 0;
   /// Write one entry's worth of data products for the given slot.
   virtual void Fill(unsigned slot, const std::vector<DataProduct> &products) = 0;
};
/// Writes entries concurrently via ROOT::RNTupleParallelWriter: each slot has
/// its own RNTupleFillContext, so threads only need the file mutex when a
/// cluster is actually committed to storage.
class ParallelOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<ROOT::RNTupleParallelWriter> fParallelWriter;
   std::vector<ROOT::RFieldToken> fTokens; // token order matches DataProduct::index

   // Per-slot state: a fill context and a raw-pointer entry bound through it.
   struct SlotData {
      std::shared_ptr<ROOT::RNTupleFillContext> fillContext;
      std::unique_ptr<ROOT::Detail::RRawPtrWriteEntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   ParallelOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
                     const ROOT::RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;
      // Appending the RNTuple to the shared TFile must be serialized with
      // every other user of the file.
      std::lock_guard g(fileService.GetMutex());
      fParallelWriter = ROOT::RNTupleParallelWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
      fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateRawPtrWriteEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &fillContext = *fSlots[slot].fillContext;
      auto &entry = *fSlots[slot].entry;
      // Bind each product's raw address to its field before filling.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }
      ROOT::RNTupleFillStatus status;
      fillContext.FillNoFlush(entry, status);
      if (status.ShouldFlushCluster()) {
         // Compress and flush column data outside the lock; only committing
         // the cluster to the file needs exclusive access.
         fillContext.FlushColumns();
         {
            std::lock_guard g(fFileService.GetMutex());
            fillContext.FlushCluster();
         }
      }
   }
};
/// Writes entries through a single ROOT::RNTupleWriter: all filling is
/// serialized by fWriterMutex, and committing a cluster additionally takes
/// the file mutex. Slots only keep their own raw-pointer entry.
class SerializingOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<ROOT::RNTupleWriter> fWriter;
   std::mutex fWriterMutex;                // serializes all use of fWriter
   std::vector<ROOT::RFieldToken> fTokens; // token order matches DataProduct::index

   // Per-slot state: only the entry; the writer is shared among slots.
   struct SlotData {
      std::unique_ptr<ROOT::Detail::RRawPtrWriteEntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   SerializingOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
                        const ROOT::RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;
      // Appending the RNTuple to the shared TFile must be serialized with
      // every other user of the file.
      std::lock_guard g(fileService.GetMutex());
      fWriter = ROOT::RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      fSlots[slot].entry = fWriter->GetModel().CreateRawPtrWriteEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &entry = *fSlots[slot].entry;
      // Bind each product's raw address to its field before filling.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }
      {
         // The single writer is shared among all slots, so filling must be
         // serialized even before the file itself is touched.
         std::lock_guard g(fWriterMutex);
         ROOT::RNTupleFillStatus status;
         fWriter->FillNoFlush(entry, status);
         if (status.ShouldFlushCluster()) {
            fWriter->FlushColumns();
            {
               std::lock_guard g(fFileService.GetMutex());
               fWriter->FlushCluster();
            }
         }
      }
   }
};
/// A simple track with kinematic properties.
/// NOTE(review): the struct headers, Track::pt, and Event::eventId were
/// reconstructed — pt and eventId are used later in this file (see
/// CreateEventDataProducts and ProcessRunsAndEvents).
struct Track {
   float eta;
   float mass;
   float phi;
   float pt;
};

/// A track carrying an electric charge, e.g. an electron or muon.
struct ChargedTrack : public Track {
   std::int8_t charge;
};

/// One event as written to the "Events" ntuple; the member order matches the
/// DataProduct indices produced by CreateEventDataProducts().
struct Event {
   std::uint32_t eventId;
   std::uint32_t runId;
   std::vector<ChargedTrack> electrons;
   std::vector<Track> photons;
   std::vector<ChargedTrack> muons;
};
/// Builds the bare model for the "Events" ntuple and collects one token per
/// field. The token order (eventId, runId, electrons, photons, muons) must
/// match the DataProduct indices produced by CreateEventDataProducts().
ModelTokensPair CreateEventModel()
{
   // Bare model: no default entry; values are bound per slot via tokens
   // and raw pointers.
   auto model = ROOT::RNTupleModel::CreateBare();
   std::vector<ROOT::RFieldToken> tokens;
   model->MakeField<decltype(Event::eventId)>("eventId");
   tokens.push_back(model->GetToken("eventId"));
   model->MakeField<decltype(Event::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));
   model->MakeField<decltype(Event::electrons)>("electrons");
   tokens.push_back(model->GetToken("electrons"));
   model->MakeField<decltype(Event::photons)>("photons");
   tokens.push_back(model->GetToken("photons"));
   model->MakeField<decltype(Event::muons)>("muons");
   tokens.push_back(model->GetToken("muons"));
   return {std::move(model), std::move(tokens)};
}
/// Collects raw pointers to all fields of `event`, indexed to match the
/// token order of CreateEventModel(). The products borrow the addresses,
/// so `event` must outlive them.
std::vector<DataProduct> CreateEventDataProducts(Event &event)
{
   std::vector<DataProduct> products;
   products.reserve(5);
   products.emplace_back(0, &event.eventId);
   products.emplace_back(1, &event.runId);
   products.emplace_back(2, &event.electrons);
   products.emplace_back(3, &event.photons);
   products.emplace_back(4, &event.muons);
   return products;
}
/// Per-run summary written to the "Runs" ntuple; the member order matches
/// the DataProduct indices produced by CreateRunDataProducts().
struct Run {
   std::uint32_t runId;   // run identifier
   std::uint32_t nEvents; // number of events generated for this run
};
/// Builds the bare model for the "Runs" ntuple and collects one token per
/// field. The token order (runId, nEvents) must match the DataProduct
/// indices produced by CreateRunDataProducts().
ModelTokensPair CreateRunModel()
{
   // Bare model: no default entry; values are bound via tokens and raw pointers.
   auto model = ROOT::RNTupleModel::CreateBare();
   std::vector<ROOT::RFieldToken> tokens;
   model->MakeField<decltype(Run::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));
   model->MakeField<decltype(Run::nEvents)>("nEvents");
   tokens.push_back(model->GetToken("nEvents"));
   return {std::move(model), std::move(tokens)};
}
/// Collects raw pointers to all fields of `run`, indexed to match the token
/// order of CreateRunModel(). The products borrow the addresses, so `run`
/// must outlive them.
std::vector<DataProduct> CreateRunDataProducts(Run &run)
{
   std::vector<DataProduct> products;
   products.reserve(2);
   products.emplace_back(0, &run.runId);
   products.emplace_back(1, &run.nEvents);
   return products;
}
// Workload parameters for the random data generation in ProcessRunsAndEvents.
constexpr unsigned kNRunsPerThread = 100; // runs produced by each worker thread
constexpr unsigned kMeanNEventsPerRun = 400; // mean of the normal distribution for events per run
constexpr unsigned kStddevNEventsPerRun = 100; // stddev of that normal distribution
constexpr unsigned kMeanNTracks = 5; // mean of the Poisson distribution for tracks per event
void ProcessRunsAndEvents(unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
{
std::mt19937 gen(threadId);
std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
std::poisson_distribution<> nTracksDist(kMeanNTracks);
std::uniform_real_distribution<float> floatDist;
for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
double nEventsD = nEventsDist(gen);
std::uint32_t nEvents = 0;
if (nEventsD > 0) {
nEvents = static_cast<std::uint32_t>(nEventsD);
}
event.runId = runId;
auto eventProducts = CreateEventDataProducts(event);
for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
event.eventId = eventId;
event.electrons.resize(nTracksDist(gen));
for (auto &electron : event.electrons) {
electron.eta = floatDist(gen);
electron.mass = 0.511 ;
electron.phi = floatDist(gen);
electron.pt = floatDist(gen);
electron.charge = (gen() % 2 ? 1 : -1);
}
event.photons.resize(nTracksDist(gen));
for (auto &photon : event.photons) {
photon.eta = floatDist(gen);
photon.mass = 0;
photon.phi = floatDist(gen);
photon.pt = floatDist(gen);
}
event.muons.resize(nTracksDist(gen));
for (auto &muon : event.muons) {
muon.eta = floatDist(gen);
muon.mass = 105.658 ;
muon.phi = floatDist(gen);
muon.pt = floatDist(gen);
muon.charge = (gen() % 2 ? 1 : -1);
}
eventOutputter.Fill(threadId, eventProducts);
}
Run run;
run.runId = runId;
run.nEvents = nEvents;
auto runProducts = CreateRunDataProducts(run);
runOutputter.Fill(threadId, runProducts);
}
}
// Number of worker threads; also the number of slots each outputter prepares.
constexpr unsigned kNThreads = 4;
void ntpl014_framework()
{
FileService fileService("ntpl014_framework.root", "RECREATE");
ParallelOutputter eventOutputter(CreateEventModel(), fileService, "Events", options);
SerializingOutputter runOutputter(CreateRunModel(), fileService, "Runs", options);
for (unsigned i = 0; i < kNThreads; i++) {
eventOutputter.InitSlot(i);
runOutputter.InitSlot(i);
}
std::vector<std::thread> threads;
for (unsigned i = 0; i < kNThreads; i++) {
threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
}
for (unsigned i = 0; i < kNThreads; i++) {
threads[i].join();
}
}
// ---------------------------------------------------------------------------
// Reference notes (API documentation excerpts left over from extraction):
// - ROOT::RNTupleFillStatus::ShouldFlushCluster() const:
//   returns true if the caller should call FlushCluster.
// - ROOT::RNTupleModel::CreateBare():
//   creates a "bare model", i.e. an RNTupleModel with no default entry.
// - ROOT::RNTupleParallelWriter::Append(std::unique_ptr<ROOT::RNTupleModel> model,
//   std::string_view ntupleName, TDirectory &fileOrDirectory,
//   const ROOT::RNTupleWriteOptions &options = ROOT::RNTupleWriteOptions()):
//   appends an RNTuple to the existing file.
// - ROOT::RNTupleWriteOptions: common user-tunable settings for storing RNTuples,
//   e.g. SetUseBufferedWrite(bool) and SetApproxZippedClusterSize(std::size_t).
// - ROOT::RNTupleWriter::Append(std::unique_ptr<ROOT::RNTupleModel> model,
//   std::string_view ntupleName, TDirectory &fileOrDirectory,
//   const ROOT::RNTupleWriteOptions &options = ROOT::RNTupleWriteOptions()):
//   creates an RNTupleWriter that writes into an existing TFile or TDirectory,
//   without overwriting its contents.
// - TFile::Open(const char *name, Option_t *option = "", const char *ftitle = "",
//   Int_t compress = ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault,
//   Int_t netopt = 0): create / open a file.
// - Unrelated index entries from the extraction: double product(DoubleArray, std::size_t);
//   void Fill(float*, float, int).
// ---------------------------------------------------------------------------