// Pair of an RNTuple model and the field tokens obtained from it. This is the return type of
// CreateEventModel() / CreateRunModel() and the first constructor argument of both outputters.
49using ModelTokensPair = std::pair<std::unique_ptr<ROOT::RNTupleModel>, std::vector<ROOT::RFieldToken>>;
// Constructor: stores `i` in the `index` member and `a` in the `address` member.
// The pointer is only stored here, not dereferenced.
56 DataProduct(std::size_t i,
const void *
a) : index(i), address(
a) {}
// The opened file, held through a std::unique_ptr.
61 std::unique_ptr<TFile> fFile;
// Constructor: opens `url` with TFile::Open, passing `options` (empty string by default), and stores the
// returned pointer in fFile.
// NOTE(review): no null check on the TFile::Open result is visible in this excerpt, while GetFile()
// dereferences fFile unconditionally - confirm how a failed open is handled.
65 FileService(std::string_view url, std::string_view options =
"")
67 fFile.reset(
TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
// Returns a reference to the file held in fFile.
71 TFile &GetFile() {
return *fFile; }
// Returns the fMutex member. The outputters in this listing lock it in their constructors and
// immediately before calling FlushCluster().
72 std::mutex &GetMutex() {
return fMutex; }
// Defaulted virtual destructor of the Outputter interface.
78 virtual ~Outputter() =
default;
// Prepares the state for `slot`. Both implementations in this listing create the slot's write entry here,
// and ntpl014_framework() calls this for every slot before starting the worker threads.
80 virtual void InitSlot(
unsigned slot) = 0;
// Fills one entry for `slot` from `products`; each DataProduct carries an index and a raw address.
81 virtual void Fill(
unsigned slot,
const std::vector<DataProduct> &products) = 0;
// Outputter implementation built on ROOT::RNTupleParallelWriter. It keeps one fill context and one
// raw-pointer write entry per slot.
85class ParallelOutputter final :
public Outputter {
// File service whose mutex is locked in the constructor and before FlushCluster() in Fill().
86 FileService &fFileService;
// The parallel writer; InitSlot() asks it for a fill context per slot.
87 std::unique_ptr<ROOT::RNTupleParallelWriter> fParallelWriter;
// Field tokens moved out of the ModelTokensPair passed to the constructor.
// NOTE(review): presumably indexed by DataProduct::index when binding addresses - the binding statement
// is not visible in this excerpt.
88 std::vector<ROOT::RFieldToken> fTokens;
// Per-slot state, accessed as fSlots[slot].fillContext / fSlots[slot].entry (the line declaring the
// enclosing SlotData type is not part of this excerpt).
91 std::shared_ptr<ROOT::RNTupleFillContext> fillContext;
92 std::unique_ptr<ROOT::Detail::RRawPtrWriteEntry> entry;
// One SlotData element per slot, grown on demand by InitSlot().
94 std::vector<SlotData> fSlots;
// Constructor: keeps a reference to `fileService` and takes over the tokens from `modelTokens`.
97 ParallelOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
98 const ROOT::RNTupleWriteOptions &options)
99 : fFileService(fileService), fTokens(std::move(modelTokens.second))
// Reference to the model half of the pair (its use is not visible in this excerpt).
101 auto &model = modelTokens.first;
// Takes the file-service mutex until the guard goes out of scope; the statements executed under the
// lock are not visible in this excerpt.
103 std::lock_guard
g(fileService.GetMutex());
// Makes sure fSlots has an element for `slot`, then creates that slot's fill context from the parallel
// writer and a raw-pointer write entry from the fill context's model.
// In this listing it is called for every slot before the worker threads are started.
108 void InitSlot(
unsigned slot)
final
// Grow the slot table so that index `slot` is valid.
110 if (slot >= fSlots.size()) {
111 fSlots.resize(slot + 1);
114 fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
115 fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateRawPtrWriteEntry();
// Fills one entry for `slot` through that slot's own fill context and entry.
// Precondition (asserted): `slot` is a valid index into fSlots.
118 void Fill(
unsigned slot,
const std::vector<DataProduct> &products)
final
120 assert(slot < fSlots.size());
121 auto &fillContext = *fSlots[slot].fillContext;
122 auto &entry = *fSlots[slot].entry;
// Loop over the data products; the loop body is not visible in this excerpt.
125 for (
auto &&product : products) {
// Fill via FillNoFlush(); `status` receives the fill status.
130 ROOT::RNTupleFillStatus status;
131 fillContext.FillNoFlush(entry, status);
// NOTE(review): presumably the flush below is conditional on the fill status; the condition line is
// not visible in this excerpt.
// FlushColumns() is called before the file-service mutex is taken.
136 fillContext.FlushColumns();
// FlushCluster() is called with the file-service mutex held.
140 std::lock_guard
g(fFileService.GetMutex());
141 fillContext.FlushCluster();
// Outputter implementation built on a single ROOT::RNTupleWriter shared by all slots. Each slot has its
// own raw-pointer write entry, and calls on the writer in Fill() are made under fWriterMutex.
151class SerializingOutputter final :
public Outputter {
// File service whose mutex is locked in the constructor and before FlushCluster() in Fill().
152 FileService &fFileService;
// The single writer used by every slot.
153 std::unique_ptr<ROOT::RNTupleWriter> fWriter;
// Locked in Fill() before the writer is used.
154 std::mutex fWriterMutex;
// Field tokens moved out of the ModelTokensPair passed to the constructor.
155 std::vector<ROOT::RFieldToken> fTokens;
// Per-slot entry, accessed as fSlots[slot].entry (the line declaring the enclosing SlotData type is not
// part of this excerpt).
158 std::unique_ptr<ROOT::Detail::RRawPtrWriteEntry> entry;
// One SlotData element per slot, grown on demand by InitSlot().
160 std::vector<SlotData> fSlots;
// Constructor: keeps a reference to `fileService` and takes over the tokens from `modelTokens`.
163 SerializingOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
164 const ROOT::RNTupleWriteOptions &options)
165 : fFileService(fileService), fTokens(std::move(modelTokens.second))
// Reference to the model half of the pair (its use is not visible in this excerpt).
167 auto &model = modelTokens.first;
// Takes the file-service mutex until the guard goes out of scope; the statements executed under the
// lock are not visible in this excerpt.
169 std::lock_guard
g(fileService.GetMutex());
// Makes sure fSlots has an element for `slot`, then creates that slot's raw-pointer write entry from the
// shared writer's model.
// In this listing it is called for every slot before the worker threads are started.
173 void InitSlot(
unsigned slot)
final
// Grow the slot table so that index `slot` is valid.
175 if (slot >= fSlots.size()) {
176 fSlots.resize(slot + 1);
179 fSlots[slot].entry = fWriter->GetModel().CreateRawPtrWriteEntry();
// Fills one entry for `slot` through the shared writer, using that slot's own entry.
// Precondition (asserted): `slot` is a valid index into fSlots.
182 void Fill(
unsigned slot,
const std::vector<DataProduct> &products)
final
184 assert(slot < fSlots.size());
185 auto &entry = *fSlots[slot].entry;
// Loop over the data products; the loop body is not visible in this excerpt.
188 for (
auto &&product : products) {
// The writer is shared between slots, so fWriterMutex is taken before it is used.
195 std::lock_guard
g(fWriterMutex);
196 ROOT::RNTupleFillStatus status;
197 fWriter->FillNoFlush(entry, status);
// NOTE(review): presumably the flush below is conditional on the fill status; the condition line is
// not visible in this excerpt.
202 fWriter->FlushColumns();
// FlushCluster() is called with the file-service mutex held.
// NOTE(review): whether fWriterMutex is still held at this point depends on scopes that are not
// visible here.
206 std::lock_guard
g(fFileService.GetMutex());
207 fWriter->FlushCluster();
// ChargedTrack publicly derives from Track; its own members are not visible in this excerpt
// (ProcessRunsAndEvents() assigns a `charge` on electrons and muons).
224struct ChargedTrack :
public Track {
// Per-event particle collections (referenced elsewhere as Event::electrons, Event::photons and
// Event::muons): electrons and muons are ChargedTrack, photons are plain Track.
231 std::vector<ChargedTrack> electrons;
232 std::vector<Track> photons;
233 std::vector<ChargedTrack> muons;
// Builds the model for the event data and collects one field token per field.
// Tokens are appended in the order eventId, runId, electrons, photons, muons, which matches the indices
// 0..4 used by CreateEventDataProducts(). Returns the model together with the tokens.
// (The creation of `model` and the MakeField call for "eventId" are not visible in this excerpt.)
237ModelTokensPair CreateEventModel()
242 std::vector<ROOT::RFieldToken> tokens;
245 tokens.push_back(model->GetToken(
"eventId"));
// Field types are taken from the Event members via decltype.
247 model->MakeField<
decltype(Event::runId)>(
"runId");
248 tokens.push_back(model->GetToken(
"runId"));
250 model->MakeField<
decltype(Event::electrons)>(
"electrons");
251 tokens.push_back(model->GetToken(
"electrons"));
253 model->MakeField<
decltype(Event::photons)>(
"photons");
254 tokens.push_back(model->GetToken(
"photons"));
256 model->MakeField<
decltype(Event::muons)>(
"muons");
257 tokens.push_back(model->GetToken(
"muons"));
// Ownership of the model and of the token vector is moved into the returned pair.
259 return {std::move(model), std::move(tokens)};
// Builds the list of data products for `event`: one DataProduct per member, pairing an index with the
// member's address. Indices 0..4 follow the token order of CreateEventModel().
// The products point into `event`, so `event` must stay alive while they are used.
263std::vector<DataProduct> CreateEventDataProducts(
Event &event)
265 std::vector<DataProduct> products;
267 products.emplace_back(0, &event.
eventId);
268 products.emplace_back(1, &event.runId);
269 products.emplace_back(2, &event.electrons);
270 products.emplace_back(3, &event.photons);
271 products.emplace_back(4, &event.muons);
// Member referenced elsewhere as Run::nEvents; ProcessRunsAndEvents() sets it to the number of events
// generated for the run.
278 std::uint32_t nEvents;
// Builds the model for the run data with the fields "runId" and "nEvents" and collects their tokens in
// that order, matching the indices 0 and 1 used by CreateRunDataProducts().
// (The creation of `model` is not visible in this excerpt.)
282ModelTokensPair CreateRunModel()
287 std::vector<ROOT::RFieldToken> tokens;
// Field types are taken from the Run members via decltype.
289 model->MakeField<
decltype(Run::runId)>(
"runId");
290 tokens.push_back(model->GetToken(
"runId"));
292 model->MakeField<
decltype(Run::nEvents)>(
"nEvents");
293 tokens.push_back(model->GetToken(
"nEvents"));
// Ownership of the model and of the token vector is moved into the returned pair.
295 return {std::move(model), std::move(tokens)};
// Builds the list of data products for `run`: index 0 is the address of run.runId, index 1 the address
// of run.nEvents, following the token order of CreateRunModel().
// The products point into `run`, so `run` must stay alive while they are used.
299std::vector<DataProduct> CreateRunDataProducts(Run &run)
301 std::vector<DataProduct> products;
303 products.emplace_back(0, &run.runId);
304 products.emplace_back(1, &run.nEvents);
// Number of runs generated by each call of ProcessRunsAndEvents(); also the stride between the run id
// ranges of consecutive thread ids.
308constexpr unsigned kNRunsPerThread = 100;
// Mean and standard deviation of the normal distribution from which the number of events per run is drawn.
309constexpr unsigned kMeanNEventsPerRun = 400;
310constexpr unsigned kStddevNEventsPerRun = 100;
// Mean of the Poisson distribution from which the size of each particle collection is drawn.
311constexpr unsigned kMeanNTracks = 5;
// Generates kNRunsPerThread runs of random events and writes them through the two outputters, using
// `threadId` as the slot number for both.
//   threadId       - seeds the random generator and selects the run id range
//                    [threadId * kNRunsPerThread, (threadId + 1) * kNRunsPerThread)
//   eventOutputter - receives one Fill() per generated event
//   runOutputter   - receives a Fill() with the run products after the run's events
// (The declarations of `event` and `run` are not visible in this excerpt.)
313void ProcessRunsAndEvents(
unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
// Seeding with the thread id gives each thread id its own reproducible sequence.
315 std::mt19937 gen(threadId);
316 std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
317 std::poisson_distribution<> nTracksDist(kMeanNTracks);
318 std::uniform_real_distribution<float> floatDist;
320 for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
// Draw the number of events for this run; nEvents starts at 0 and is then set from the sample.
// NOTE(review): presumably the cast is guarded against non-positive samples - the guard line is not
// visible in this excerpt.
321 double nEventsD = nEventsDist(gen);
322 std::uint32_t nEvents = 0;
324 nEvents =
static_cast<std::uint32_t
>(nEventsD);
// The products hold addresses of the members of `event`, so they are created once, before the event
// loop, and the same `event` object is refilled for every entry.
330 auto eventProducts = CreateEventDataProducts(event);
331 for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
332 event.eventId = eventId;
// Electrons: Poisson-distributed count, uniform random kinematics, fixed mass, random charge of +1 or -1.
335 event.electrons.resize(nTracksDist(gen));
336 for (
auto &electron : event.electrons) {
337 electron.eta = floatDist(gen);
338 electron.mass = 0.511 ;
339 electron.phi = floatDist(gen);
340 electron.pt = floatDist(gen);
341 electron.charge = (gen() % 2 ? 1 : -1);
// Photons: Poisson-distributed count, uniform random eta, phi and pt.
343 event.photons.resize(nTracksDist(gen));
344 for (
auto &photon : event.photons) {
345 photon.eta = floatDist(gen);
347 photon.phi = floatDist(gen);
348 photon.pt = floatDist(gen);
// Muons: same scheme as the electrons, with a different fixed mass.
350 event.muons.resize(nTracksDist(gen));
351 for (
auto &muon : event.muons) {
352 muon.eta = floatDist(gen);
353 muon.mass = 105.658 ;
354 muon.phi = floatDist(gen);
355 muon.pt = floatDist(gen);
356 muon.charge = (gen() % 2 ? 1 : -1);
// Write the event; the thread id doubles as the slot number.
359 eventOutputter.Fill(threadId, eventProducts);
// Record the number of events of the run and write the run entry.
365 run.nEvents = nEvents;
367 auto runProducts = CreateRunDataProducts(run);
368 runOutputter.Fill(threadId, runProducts);
// Number of worker threads started by ntpl014_framework(); also the number of slots initialized in both
// outputters.
372constexpr unsigned kNThreads = 4;
// Entry point: opens the output file, creates one outputter for events and one for runs on the same
// file service, initializes one slot per thread and runs ProcessRunsAndEvents() on kNThreads threads.
// (The setup of `options` is not visible in this excerpt.)
374void ntpl014_framework()
// The output file is opened with option "RECREATE".
376 FileService fileService(
"ntpl014_framework.root",
"RECREATE");
// Events go through the parallel writer, into the RNTuple named "Events".
383 ParallelOutputter eventOutputter(CreateEventModel(), fileService,
"Events", options);
// Runs go through the serializing writer, into the RNTuple named "Runs" in the same file.
389 SerializingOutputter runOutputter(CreateRunModel(), fileService,
"Runs", options);
// Initialize all slots before any worker thread exists; InitSlot() may resize the slot tables.
392 for (
unsigned i = 0; i < kNThreads; i++) {
393 eventOutputter.InitSlot(i);
394 runOutputter.InitSlot(i);
// Start the workers; the outputters are passed by reference via std::ref, and the loop index serves as
// the thread id / slot number.
397 std::vector<std::thread> threads;
398 for (
unsigned i = 0; i < kNThreads; i++) {
399 threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
// NOTE(review): presumably joins the worker threads - the loop body is not visible in this excerpt.
401 for (
unsigned i = 0; i < kNThreads; i++) {
bool ShouldFlushCluster() const
Return true if the caller should call FlushCluster.
static std::unique_ptr< RNTupleModel > CreateBare()
Creates a "bare model", i.e. an RNTupleModel with no default entry.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Append an RNTuple to the existing file.
Common user-tunable settings for storing RNTuples.
void SetUseBufferedWrite(bool val)
void SetApproxZippedClusterSize(std::size_t val)
static std::unique_ptr< RNTupleWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Creates an RNTupleWriter that writes into an existing TFile or TDirectory, without overwriting its content.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
double product(DoubleArray factors, std::size_t nFactors)
void Fill(float *output, float value, int size)