60using ModelTokensPair = std::pair<std::unique_ptr<RNTupleModel>, std::vector<REntry::RFieldToken>>;
67 DataProduct(std::size_t i,
void *
a) :
index(i), address(
a) {}
72 std::unique_ptr<TFile> fFile;
76 FileService(std::string_view url, std::string_view options =
"")
78 fFile.reset(
TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
82 TFile &GetFile() {
return *fFile; }
83 std::mutex &GetMutex() {
return fMutex; }
89 virtual ~Outputter() =
default;
91 virtual void InitSlot(
unsigned slot) = 0;
92 virtual void Fill(
unsigned slot,
const std::vector<DataProduct> &products) = 0;
96class ParallelOutputter final :
public Outputter {
97 FileService &fFileService;
98 std::unique_ptr<RNTupleParallelWriter> fParallelWriter;
99 std::vector<REntry::RFieldToken> fTokens;
102 std::shared_ptr<RNTupleFillContext> fillContext;
103 std::unique_ptr<REntry> entry;
105 std::vector<SlotData> fSlots;
108 ParallelOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
109 const RNTupleWriteOptions &options)
110 : fFileService(fileService), fTokens(std::move(modelTokens.second))
112 auto &model = modelTokens.first;
114 std::lock_guard
g(fileService.GetMutex());
115 fParallelWriter = RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options);
118 void InitSlot(
unsigned slot)
final
120 if (slot >= fSlots.size()) {
121 fSlots.resize(slot + 1);
125 fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
126 fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateBareEntry();
129 void Fill(
unsigned slot,
const std::vector<DataProduct> &products)
final
131 assert(slot < fSlots.size());
132 auto &fillContext = *fSlots[slot].fillContext;
133 auto &entry = *fSlots[slot].entry;
136 for (
auto &&product : products) {
141 RNTupleFillStatus status;
142 fillContext.FillNoFlush(entry, status);
143 if (status.ShouldFlushCluster()) {
147 fillContext.FlushColumns();
151 std::lock_guard
g(fFileService.GetMutex());
152 fillContext.FlushCluster();
162class SerializingOutputter final :
public Outputter {
163 FileService &fFileService;
164 std::unique_ptr<RNTupleWriter> fWriter;
165 std::mutex fWriterMutex;
166 std::vector<REntry::RFieldToken> fTokens;
169 std::unique_ptr<REntry> entry;
171 std::vector<SlotData> fSlots;
174 SerializingOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
175 const RNTupleWriteOptions &options)
176 : fFileService(fileService), fTokens(std::move(modelTokens.second))
178 auto &model = modelTokens.first;
180 std::lock_guard
g(fileService.GetMutex());
181 fWriter = RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
184 void InitSlot(
unsigned slot)
final
186 if (slot >= fSlots.size()) {
187 fSlots.resize(slot + 1);
191 fSlots[slot].entry = fWriter->GetModel().CreateBareEntry();
194 void Fill(
unsigned slot,
const std::vector<DataProduct> &products)
final
196 assert(slot < fSlots.size());
197 auto &entry = *fSlots[slot].entry;
200 for (
auto &&product : products) {
207 std::lock_guard
g(fWriterMutex);
208 RNTupleFillStatus status;
209 fWriter->FillNoFlush(entry, status);
210 if (status.ShouldFlushCluster()) {
214 fWriter->FlushColumns();
218 std::lock_guard
g(fFileService.GetMutex());
219 fWriter->FlushCluster();
236struct ChargedTrack :
public Track {
243 std::vector<ChargedTrack> electrons;
244 std::vector<Track> photons;
245 std::vector<ChargedTrack> muons;
249ModelTokensPair CreateEventModel()
252 auto model = RNTupleModel::CreateBare();
254 std::vector<REntry::RFieldToken> tokens;
257 tokens.push_back(model->GetToken(
"eventId"));
259 model->MakeField<
decltype(Event::runId)>(
"runId");
260 tokens.push_back(model->GetToken(
"runId"));
262 model->MakeField<
decltype(Event::electrons)>(
"electrons");
263 tokens.push_back(model->GetToken(
"electrons"));
265 model->MakeField<
decltype(Event::photons)>(
"photons");
266 tokens.push_back(model->GetToken(
"photons"));
268 model->MakeField<
decltype(Event::muons)>(
"muons");
269 tokens.push_back(model->GetToken(
"muons"));
271 return {std::move(model), std::move(tokens)};
275std::vector<DataProduct> CreateEventDataProducts(
Event &event)
277 std::vector<DataProduct> products;
279 products.emplace_back(0, &event.
eventId);
280 products.emplace_back(1, &event.runId);
281 products.emplace_back(2, &event.electrons);
282 products.emplace_back(3, &event.photons);
283 products.emplace_back(4, &event.muons);
290 std::uint32_t nEvents;
294ModelTokensPair CreateRunModel()
297 auto model = RNTupleModel::CreateBare();
299 std::vector<REntry::RFieldToken> tokens;
301 model->MakeField<
decltype(Run::runId)>(
"runId");
302 tokens.push_back(model->GetToken(
"runId"));
304 model->MakeField<
decltype(Run::nEvents)>(
"nEvents");
305 tokens.push_back(model->GetToken(
"nEvents"));
307 return {std::move(model), std::move(tokens)};
311std::vector<DataProduct> CreateRunDataProducts(Run &run)
313 std::vector<DataProduct> products;
315 products.emplace_back(0, &run.runId);
316 products.emplace_back(1, &run.nEvents);
320constexpr unsigned kNRunsPerThread = 100;
321constexpr unsigned kMeanNEventsPerRun = 400;
322constexpr unsigned kStddevNEventsPerRun = 100;
323constexpr unsigned kMeanNTracks = 5;
325void ProcessRunsAndEvents(
unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
327 std::mt19937 gen(threadId);
328 std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
329 std::poisson_distribution<> nTracksDist(kMeanNTracks);
330 std::uniform_real_distribution<float> floatDist;
332 for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
333 double nEventsD = nEventsDist(gen);
334 std::uint32_t nEvents = 0;
336 nEvents =
static_cast<std::uint32_t
>(nEventsD);
342 auto eventProducts = CreateEventDataProducts(event);
343 for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
344 event.eventId = eventId;
347 event.electrons.resize(nTracksDist(gen));
348 for (
auto &electron : event.electrons) {
349 electron.eta = floatDist(gen);
350 electron.mass = 0.511 ;
351 electron.phi = floatDist(gen);
352 electron.pt = floatDist(gen);
353 electron.charge = (gen() % 2 ? 1 : -1);
355 event.photons.resize(nTracksDist(gen));
356 for (
auto &photon : event.photons) {
357 photon.eta = floatDist(gen);
359 photon.phi = floatDist(gen);
360 photon.pt = floatDist(gen);
362 event.muons.resize(nTracksDist(gen));
363 for (
auto &muon : event.muons) {
364 muon.eta = floatDist(gen);
365 muon.mass = 105.658 ;
366 muon.phi = floatDist(gen);
367 muon.pt = floatDist(gen);
368 muon.charge = (gen() % 2 ? 1 : -1);
371 eventOutputter.Fill(threadId, eventProducts);
377 run.nEvents = nEvents;
379 auto runProducts = CreateRunDataProducts(run);
380 runOutputter.Fill(threadId, runProducts);
384constexpr unsigned kNThreads = 4;
386void ntpl014_framework()
388 FileService fileService(
"ntpl014_framework.root",
"RECREATE");
390 RNTupleWriteOptions options;
392 options.SetUseBufferedWrite(
true);
394 options.SetApproxZippedClusterSize(2 * 1024 * 1024);
395 ParallelOutputter eventOutputter(CreateEventModel(), fileService,
"Events", options);
398 options.SetUseBufferedWrite(
true);
400 options.SetApproxZippedClusterSize(1024);
401 SerializingOutputter runOutputter(CreateRunModel(), fileService,
"Runs", options);
404 for (
unsigned i = 0; i < kNThreads; i++) {
405 eventOutputter.InitSlot(i);
406 runOutputter.InitSlot(i);
409 std::vector<std::thread> threads;
410 for (
unsigned i = 0; i < kNThreads; i++) {
411 threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
413 for (
unsigned i = 0; i < kNThreads; i++) {
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t index
The REntry is a collection of values in an ntuple corresponding to a complete row in the data set.
A context for filling entries (data) into clusters of an RNTuple.
A status object after filling an entry.
The RNTupleModel encapsulates the schema of an ntuple.
A writer to fill an RNTuple from multiple contexts.
Common user-tunable settings for storing ntuples.
An RNTuple that gets filled with entries (data) and writes them to storage.
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-like logical structure, possibly including subdirectory hierarchies.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
double product(double const *factors, std::size_t nFactors)