ROOT Reference Guide
ntpl014_framework.C
/// \file
/// \ingroup tutorial_ntuple
/// \notebook
///
/// Example of framework usage for writing RNTuples:
/// 1. Creation of (bare) RNTupleModels and RFieldTokens.
/// 2. Creation of RNTupleWriter and RNTupleParallelWriter when appending to a single TFile.
/// 3. Creation of RNTupleFillContext and (bare) REntry per thread, and usage of BindRawPtr.
/// 4. Usage of FillNoFlush(), RNTupleFillStatus::ShouldFlushCluster(), FlushColumns(), and FlushCluster().
///
/// Please note that this tutorial has very simplified versions of classes that could be found in a framework, such as
/// DataProduct, FileService, ParallelOutputter, and SerializingOutputter. They try to mimic the usage in a framework
/// (for example, Outputters are agnostic of the data written, which is encapsulated in std::vector<DataProduct>), but
/// are not meant for production usage!
///
/// Also note that this tutorial uses std::thread and std::mutex directly instead of a task scheduling library such as
/// Threading Building Blocks (TBB). For that reason, turning on ROOT's implicit multithreading (IMT) would not be very
/// efficient with the simplified code in this tutorial because a thread blocking to acquire a std::mutex cannot "help"
/// the other thread that is currently in the critical section by executing its tasks. If that is wanted, the framework
/// should use synchronization methods provided by TBB directly (which goes beyond the scope of this tutorial).
///
/// \macro_code
///
/// \date September 2024
/// \author The ROOT Team
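// This tutorial is a regular ROOT macro; it can be run, for example, with `root -q ntpl014_framework.C` (or compiled
// on the fly via ACLiC by appending '+' to the file name), assuming a recent ROOT build with RNTuple support.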

// NOTE: The RNTuple classes are experimental at this point.
// Functionality and interface are still subject to changes.

#include <ROOT/REntry.hxx>
#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleFillStatus.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>
#include <ROOT/RNTupleWriter.hxx>

#include <TFile.h>
#include <cassert>
#include <cstddef>    // for std::size_t
#include <cstdint>    // for std::uint32_t
#include <functional> // for std::ref
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <utility> // for std::pair
#include <vector>

// Import classes from Experimental namespace for the time being
using ROOT::Experimental::REntry;
using ROOT::Experimental::RNTupleFillContext;
using ROOT::Experimental::RNTupleFillStatus;
using ROOT::Experimental::RNTupleModel;
using ROOT::Experimental::RNTupleParallelWriter;
using ROOT::Experimental::RNTupleWriteOptions;
using ROOT::Experimental::RNTupleWriter;

using ModelTokensPair = std::pair<std::unique_ptr<RNTupleModel>, std::vector<REntry::RFieldToken>>;

// A DataProduct associates an arbitrary address to an index in the model.
struct DataProduct {
   std::size_t index;
   void *address;

   DataProduct(std::size_t i, void *a) : index(i), address(a) {}
};

// The FileService opens a TFile and provides synchronization.
class FileService {
   std::unique_ptr<TFile> fFile;
   std::mutex fMutex;

public:
   FileService(std::string_view url, std::string_view options = "")
   {
      fFile.reset(TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
      // The file is automatically closed when destructing the std::unique_ptr.
   }

   TFile &GetFile() { return *fFile; }
   std::mutex &GetMutex() { return fMutex; }
};

// An Outputter provides the interface to fill DataProducts into an RNTuple.
class Outputter {
public:
   virtual ~Outputter() = default;

   virtual void InitSlot(unsigned slot) = 0;
   virtual void Fill(unsigned slot, const std::vector<DataProduct> &products) = 0;
};

// A ParallelOutputter uses an RNTupleParallelWriter to append an RNTuple to a TFile.
class ParallelOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<RNTupleParallelWriter> fParallelWriter;
   std::vector<REntry::RFieldToken> fTokens;

   struct SlotData {
      std::shared_ptr<RNTupleFillContext> fillContext;
      std::unique_ptr<REntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   ParallelOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
                     const RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;

      std::lock_guard g(fileService.GetMutex());
      fParallelWriter = RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      // Create an RNTupleFillContext and REntry that are used for all fills from this slot. We recommend creating a
      // bare entry if binding all fields.
      fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
      fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateBareEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &fillContext = *fSlots[slot].fillContext;
      auto &entry = *fSlots[slot].entry;

      // Use the field tokens to bind the products' raw pointers.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }

      // Fill the entry without triggering an implicit flush.
      RNTupleFillStatus status;
      fillContext.FillNoFlush(entry, status);
      if (status.ShouldFlushCluster()) {
         // If we are asked to flush, first try to do as much work as possible outside of the critical section:
         // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
         // (A framework may of course also decide to flush more often.)
         fillContext.FlushColumns();

         {
            // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
            std::lock_guard g(fFileService.GetMutex());
            fillContext.FlushCluster();
         }
      }
   }
};

// A SerializingOutputter uses a sequential RNTupleWriter to append an RNTuple to a TFile and a std::mutex to
// synchronize multiple threads. Note that ROOT's implicit multithreading would not be very efficient with this
// implementation because a thread blocking to acquire a std::mutex cannot "help" the other thread that is currently
// in the critical section by executing its tasks. See also the note at the top of the file.
class SerializingOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<RNTupleWriter> fWriter;
   std::mutex fWriterMutex;
   std::vector<REntry::RFieldToken> fTokens;

   struct SlotData {
      std::unique_ptr<REntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   SerializingOutputter(ModelTokensPair modelTokens, FileService &fileService, std::string_view ntupleName,
                        const RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;

      std::lock_guard g(fileService.GetMutex());
      fWriter = RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      // Create an REntry that is used for all fills from this slot. We recommend creating a bare entry if binding all
      // fields.
      fSlots[slot].entry = fWriter->GetModel().CreateBareEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &entry = *fSlots[slot].entry;

      // Use the field tokens to bind the products' raw pointers.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }

      {
         // Fill the entry without triggering an implicit flush. This requires synchronization with other threads using
         // the same writer, but not (yet) with the underlying TFile.
         std::lock_guard g(fWriterMutex);
         RNTupleFillStatus status;
         fWriter->FillNoFlush(entry, status);
         if (status.ShouldFlushCluster()) {
            // If we are asked to flush, first try to do as much work as possible outside of the critical section:
            // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
            // (A framework may of course also decide to flush more often.)
            fWriter->FlushColumns();

            {
               // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
               std::lock_guard g(fFileService.GetMutex());
               fWriter->FlushCluster();
            }
         }
      }
   }
};

// === END OF TUTORIAL FRAMEWORK CODE ===

// Simple structs to store events
struct Track {
   float eta;
   float mass;
   float pt;
   float phi;
};

struct ChargedTrack : public Track {
   std::int8_t charge;
};

struct Event {
   std::uint32_t eventId;
   std::uint32_t runId;
   std::vector<ChargedTrack> electrons;
   std::vector<Track> photons;
   std::vector<ChargedTrack> muons;
};

// RNTupleModel for Events; in a real framework, this would likely be dynamic.
ModelTokensPair CreateEventModel()
{
   // We recommend creating a bare model if the default entry is not used.
   auto model = RNTupleModel::CreateBare();
   // For more efficient access, also create field tokens.
   std::vector<REntry::RFieldToken> tokens;

   model->MakeField<decltype(Event::eventId)>("eventId");
   tokens.push_back(model->GetToken("eventId"));

   model->MakeField<decltype(Event::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));

   model->MakeField<decltype(Event::electrons)>("electrons");
   tokens.push_back(model->GetToken("electrons"));

   model->MakeField<decltype(Event::photons)>("photons");
   tokens.push_back(model->GetToken("photons"));

   model->MakeField<decltype(Event::muons)>("muons");
   tokens.push_back(model->GetToken("muons"));

   return {std::move(model), std::move(tokens)};
}
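// The comment above notes that a real framework would likely build such a model dynamically. A minimal sketch of that
// idea, assuming the field names and ROOT type names arrive as strings at run time and that ROOT/RField.hxx is
// included for RFieldBase (CreateDynamicModel is a hypothetical helper, not part of this tutorial):
//
//    ModelTokensPair CreateDynamicModel(const std::vector<std::pair<std::string, std::string>> &fields)
//    {
//       auto model = RNTupleModel::CreateBare();
//       std::vector<REntry::RFieldToken> tokens;
//       for (const auto &[name, type] : fields) {
//          model->AddField(ROOT::Experimental::RFieldBase::Create(name, type).Unwrap());
//          tokens.push_back(model->GetToken(name));
//       }
//       return {std::move(model), std::move(tokens)};
//    }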

// DataProducts with addresses that point into the Event object.
std::vector<DataProduct> CreateEventDataProducts(Event &event)
{
   std::vector<DataProduct> products;
   // The indices have to match the order of std::vector<REntry::RFieldToken> above.
   products.emplace_back(0, &event.eventId);
   products.emplace_back(1, &event.runId);
   products.emplace_back(2, &event.electrons);
   products.emplace_back(3, &event.photons);
   products.emplace_back(4, &event.muons);
   return products;
}

// Simple struct to store runs
struct Run {
   std::uint32_t runId;
   std::uint32_t nEvents;
};

// RNTupleModel for Runs; in a real framework, this would likely be dynamic.
ModelTokensPair CreateRunModel()
{
   // We recommend creating a bare model if the default entry is not used.
   auto model = RNTupleModel::CreateBare();
   // For more efficient access, also create field tokens.
   std::vector<REntry::RFieldToken> tokens;

   model->MakeField<decltype(Run::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));

   model->MakeField<decltype(Run::nEvents)>("nEvents");
   tokens.push_back(model->GetToken("nEvents"));

   return {std::move(model), std::move(tokens)};
}

// DataProducts with addresses that point into the Run object.
std::vector<DataProduct> CreateRunDataProducts(Run &run)
{
   std::vector<DataProduct> products;
   // The indices have to match the order of std::vector<REntry::RFieldToken> above.
   products.emplace_back(0, &run.runId);
   products.emplace_back(1, &run.nEvents);
   return products;
}

constexpr unsigned kNRunsPerThread = 100;
constexpr unsigned kMeanNEventsPerRun = 400;
constexpr unsigned kStddevNEventsPerRun = 100;
constexpr unsigned kMeanNTracks = 5;

void ProcessRunsAndEvents(unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
{
   std::mt19937 gen(threadId);
   std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
   std::poisson_distribution<> nTracksDist(kMeanNTracks);
   std::uniform_real_distribution<float> floatDist;

   for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
      double nEventsD = nEventsDist(gen);
      std::uint32_t nEvents = 0;
      if (nEventsD > 0) {
         nEvents = static_cast<std::uint32_t>(nEventsD);
      }

      // Process events, reusing a single Event object.
      Event event;
      event.runId = runId;
      auto eventProducts = CreateEventDataProducts(event);
      for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
         event.eventId = eventId;

         // Produce some data; eta, phi, and pt are just filled with uniformly distributed data.
         event.electrons.resize(nTracksDist(gen));
         for (auto &electron : event.electrons) {
            electron.eta = floatDist(gen);
            electron.mass = 0.511 /* MeV */;
            electron.phi = floatDist(gen);
            electron.pt = floatDist(gen);
            electron.charge = (gen() % 2 ? 1 : -1);
         }
         event.photons.resize(nTracksDist(gen));
         for (auto &photon : event.photons) {
            photon.eta = floatDist(gen);
            photon.mass = 0;
            photon.phi = floatDist(gen);
            photon.pt = floatDist(gen);
         }
         event.muons.resize(nTracksDist(gen));
         for (auto &muon : event.muons) {
            muon.eta = floatDist(gen);
            muon.mass = 105.658 /* MeV */;
            muon.phi = floatDist(gen);
            muon.pt = floatDist(gen);
            muon.charge = (gen() % 2 ? 1 : -1);
         }

         eventOutputter.Fill(threadId, eventProducts);
      }

      // Fill the Run data.
      Run run;
      run.runId = runId;
      run.nEvents = nEvents;

      auto runProducts = CreateRunDataProducts(run);
      runOutputter.Fill(threadId, runProducts);
   }
}

constexpr unsigned kNThreads = 4;

void ntpl014_framework()
{
   FileService fileService("ntpl014_framework.root", "RECREATE");

   RNTupleWriteOptions options;
   // Parallel writing requires buffered writing; force it on (even if it is the default).
   options.SetUseBufferedWrite(true);
   // For demonstration purposes, reduce the cluster size to 2 MiB.
   options.SetApproxZippedClusterSize(2 * 1024 * 1024);
   ParallelOutputter eventOutputter(CreateEventModel(), fileService, "Events", options);

   // SerializingOutputter also relies on buffered writing; force it on (even if it is the default).
   options.SetUseBufferedWrite(true);
   // For demonstration purposes, reduce the cluster size for the very simple Run data to 1 KiB.
   options.SetApproxZippedClusterSize(1024);
   SerializingOutputter runOutputter(CreateRunModel(), fileService, "Runs", options);

   // Initialize slots in the two Outputters.
   for (unsigned i = 0; i < kNThreads; i++) {
      eventOutputter.InitSlot(i);
      runOutputter.InitSlot(i);
   }

   std::vector<std::thread> threads;
   for (unsigned i = 0; i < kNThreads; i++) {
      threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
   }
   for (unsigned i = 0; i < kNThreads; i++) {
      threads[i].join();
   }
}
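// After running the macro, the "Events" and "Runs" RNTuples sit side by side in ntpl014_framework.root. A minimal
// read-back sketch for a quick cross-check (assuming RNTupleReader from ROOT/RNTupleReader.hxx, still in the
// Experimental namespace in this ROOT version; not part of the original tutorial):
//
//    auto events = ROOT::Experimental::RNTupleReader::Open("Events", "ntpl014_framework.root");
//    auto runs = ROOT::Experimental::RNTupleReader::Open("Runs", "ntpl014_framework.root");
//    events->PrintInfo();
//    std::cout << "events: " << events->GetNEntries() << ", runs: " << runs->GetNEntries() << std::endl;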