ntpl014_framework.C
/// \file
/// \ingroup tutorial_ntuple
/// \notebook
///
/// Example of framework usage for writing RNTuples (see also the condensed, illustrative sketch right after the
/// includes below):
/// 1. Creation of (bare) RNTupleModels and RFieldTokens.
/// 2. Creation of RNTupleWriter and RNTupleParallelWriter when appending to a single TFile.
/// 3. Creation of RNTupleFillContext and RRawPtrWriteEntry per thread, and usage of BindRawPtr.
/// 4. Usage of FillNoFlush(), RNTupleFillStatus::ShouldFlushCluster(), FlushColumns(), and FlushCluster().
///
/// Please note that this tutorial has very simplified versions of classes that could be found in a framework, such as
/// DataProduct, FileService, ParallelOutputter, and SerializingOutputter. They try to mimic the usage in a framework
/// (for example, Outputters are agnostic of the data written, which is encapsulated in std::vector<DataProduct>), but
/// they are not meant for production usage!
///
/// Also note that this tutorial uses std::thread and std::mutex directly instead of a task scheduling library such as
/// Threading Building Blocks (TBB). For that reason, turning on ROOT's implicit multithreading (IMT) would not be very
/// efficient with the simplified code in this tutorial, because a thread that blocks to acquire a std::mutex cannot
/// "help" the thread currently in the critical section by executing its tasks. If that is wanted, the framework should
/// use the synchronization methods provided by TBB directly (which goes beyond the scope of this tutorial).
///
/// \macro_code
///
/// \date September 2024
/// \author The ROOT Team

#include <ROOT/RFieldToken.hxx>
#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleFillStatus.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>
#include <ROOT/RNTupleWriter.hxx>
#include <ROOT/RRawPtrWriteEntry.hxx>

#include <TFile.h>

#include <cassert>
#include <cstddef> // for std::size_t
#include <cstdint> // for std::uint32_t and std::int8_t
#include <functional> // for std::ref
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <utility> // for std::pair
#include <vector>

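// The following function is NOT part of the tutorial's workflow and is never called. It is only an illustrative
// sketch that condenses the write path listed in the header comment (bare model + field token, Append to a TFile,
// RRawPtrWriteEntry + BindRawPtr, FillNoFlush, ShouldFlushCluster, FlushColumns, FlushCluster) into one place, using
// the sequential RNTupleWriter. The field name "x" and the RNTuple name "CondensedSketch" are illustrative choices.
void CondensedWritePathSketch(TFile &file)
{
   // Bare model with a single field, plus a token for fast access to that field.
   auto model = ROOT::RNTupleModel::CreateBare();
   model->MakeField<float>("x");
   ROOT::RFieldToken token = model->GetToken("x");

   // Append the RNTuple to an already open TFile and create a raw-pointer entry from the writer's model.
   auto writer = ROOT::RNTupleWriter::Append(std::move(model), "CondensedSketch", file);
   auto entry = writer->GetModel().CreateRawPtrWriteEntry();

   // Bind the address of the data product to the field and fill without triggering an implicit flush.
   float x = 1.0f;
   entry->BindRawPtr(token, &x);

   ROOT::RNTupleFillStatus status;
   writer->FillNoFlush(*entry, status);
   if (status.ShouldFlushCluster()) {
      // Compress the buffered column data, then commit the cluster to the TFile.
      writer->FlushColumns();
      writer->FlushCluster();
   }
}
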
using ModelTokensPair = std::pair<std::unique_ptr<ROOT::RNTupleModel>, std::vector<ROOT::RFieldToken>>;

// A DataProduct associates an arbitrary address to an index in the model.
struct DataProduct {
   std::size_t index;
   const void *address;

   DataProduct(std::size_t i, const void *a) : index(i), address(a) {}
};

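// (See CreateEventDataProducts() and CreateRunDataProducts() further below for how DataProducts reference the members
// of concrete Event and Run objects.)
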
// The FileService opens a TFile and provides synchronization.
class FileService {
   std::unique_ptr<TFile> fFile;
   std::mutex fMutex;

public:
   FileService(std::string_view url, std::string_view options = "")
   {
      fFile.reset(TFile::Open(std::string(url).c_str(), std::string(options).c_str()));
      // The file is automatically closed when destructing the std::unique_ptr.
   }

   TFile &GetFile() { return *fFile; }
   std::mutex &GetMutex() { return fMutex; }
};

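// Throughout this tutorial, the mutex returned by GetMutex() guards all access to the underlying TFile: appending the
// RNTuples in the Outputter constructors below and committing clusters via FlushCluster().
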
// An Outputter provides the interface to fill DataProducts into an RNTuple.
class Outputter {
public:
   virtual ~Outputter() = default;

   virtual void InitSlot(unsigned slot) = 0;
   virtual void Fill(unsigned slot, const std::vector<DataProduct> &products) = 0;
};

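// In this tutorial, a slot corresponds to one worker thread: the steering function initializes one slot per thread
// before processing starts, and each thread passes its thread id as the slot for every Fill() call.
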
// A ParallelOutputter uses an RNTupleParallelWriter to append an RNTuple to a TFile.
class ParallelOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<ROOT::RNTupleParallelWriter> fParallelWriter;
   std::vector<ROOT::RFieldToken> fTokens;

   struct SlotData {
      std::shared_ptr<ROOT::RNTupleFillContext> fillContext;
      std::unique_ptr<ROOT::Detail::RRawPtrWriteEntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   ParallelOutputter(FileService &fileService, std::string_view ntupleName, ModelTokensPair modelTokens,
                     const ROOT::RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;

      std::lock_guard g(fileService.GetMutex());
      fParallelWriter =
         ROOT::RNTupleParallelWriter::Append(std::move(model), ntupleName, fFileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      // Create an RNTupleFillContext and RRawPtrWriteEntry that are used for all fills from this slot.
      fSlots[slot].fillContext = fParallelWriter->CreateFillContext();
      fSlots[slot].entry = fSlots[slot].fillContext->GetModel().CreateRawPtrWriteEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &fillContext = *fSlots[slot].fillContext;
      auto &entry = *fSlots[slot].entry;

      // Use the field tokens to bind the products' raw pointers.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }

      // Fill the entry without triggering an implicit flush.
      ROOT::RNTupleFillStatus status;
      fillContext.FillNoFlush(entry, status);
      if (status.ShouldFlushCluster()) {
         // If we are asked to flush, first try to do as much work as possible outside of the critical section:
         // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
         // (A framework may of course also decide to flush more often.)
         fillContext.FlushColumns();

         {
            // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
            std::lock_guard g(fFileService.GetMutex());
            fillContext.FlushCluster();
         }
      }
   }
};

// A SerializingOutputter uses a sequential RNTupleWriter to append an RNTuple to a TFile and a std::mutex to
// synchronize multiple threads. Note that ROOT's implicit multithreading would not be very efficient with this
// implementation because a thread blocking to acquire a std::mutex cannot "help" the other thread that is currently
// in the critical section by executing its tasks. See also the note at the top of the file.
class SerializingOutputter final : public Outputter {
   FileService &fFileService;
   std::unique_ptr<ROOT::RNTupleWriter> fWriter;
   std::mutex fWriterMutex;
   std::vector<ROOT::RFieldToken> fTokens;

   struct SlotData {
      std::unique_ptr<ROOT::Detail::RRawPtrWriteEntry> entry;
   };
   std::vector<SlotData> fSlots;

public:
   SerializingOutputter(FileService &fileService, std::string_view ntupleName, ModelTokensPair modelTokens,
                        const ROOT::RNTupleWriteOptions &options)
      : fFileService(fileService), fTokens(std::move(modelTokens.second))
   {
      auto &model = modelTokens.first;

      std::lock_guard g(fileService.GetMutex());
      fWriter = ROOT::RNTupleWriter::Append(std::move(model), ntupleName, fileService.GetFile(), options);
   }

   void InitSlot(unsigned slot) final
   {
      if (slot >= fSlots.size()) {
         fSlots.resize(slot + 1);
      }
      // Create an RRawPtrWriteEntry that is used for all fills from this slot.
      fSlots[slot].entry = fWriter->GetModel().CreateRawPtrWriteEntry();
   }

   void Fill(unsigned slot, const std::vector<DataProduct> &products) final
   {
      assert(slot < fSlots.size());
      auto &entry = *fSlots[slot].entry;

      // Use the field tokens to bind the products' raw pointers.
      for (auto &&product : products) {
         entry.BindRawPtr(fTokens[product.index], product.address);
      }

      {
         // Fill the entry without triggering an implicit flush. This requires synchronization with other threads
         // using the same writer, but not (yet) with the underlying TFile.
         std::lock_guard g(fWriterMutex);
         ROOT::RNTupleFillStatus status;
         fWriter->FillNoFlush(entry, status);
         if (status.ShouldFlushCluster()) {
            // If we are asked to flush, first try to do as much work as possible outside of the critical section:
            // FlushColumns() will flush column data and trigger compression, but not actually write to storage.
            // (A framework may of course also decide to flush more often.)
            fWriter->FlushColumns();

            {
               // FlushCluster() will flush data to the underlying TFile, so it requires synchronization.
               std::lock_guard g(fFileService.GetMutex());
               fWriter->FlushCluster();
            }
         }
      }
   }
};

// === END OF TUTORIAL FRAMEWORK CODE ===

// Simple structs to store events
struct Track {
   float eta;
   float mass;
   float pt;
   float phi;
};

struct ChargedTrack : public Track {
   std::int8_t charge;
};

struct Event {
   std::uint32_t eventId;
   std::uint32_t runId;
   std::vector<ChargedTrack> electrons;
   std::vector<Track> photons;
   std::vector<ChargedTrack> muons;
};

// RNTupleModel for Events; in a real framework, this would likely be dynamic.
ModelTokensPair CreateEventModel()
{
   // We recommend creating a bare model if the default entry is not used.
   auto model = ROOT::RNTupleModel::CreateBare();
   // For more efficient access, also create field tokens.
   std::vector<ROOT::RFieldToken> tokens;

   model->MakeField<decltype(Event::eventId)>("eventId");
   tokens.push_back(model->GetToken("eventId"));

   model->MakeField<decltype(Event::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));

   model->MakeField<decltype(Event::electrons)>("electrons");
   tokens.push_back(model->GetToken("electrons"));

   model->MakeField<decltype(Event::photons)>("photons");
   tokens.push_back(model->GetToken("photons"));

   model->MakeField<decltype(Event::muons)>("muons");
   tokens.push_back(model->GetToken("muons"));

   return {std::move(model), std::move(tokens)};
}

// DataProducts with addresses that point into the Event object.
std::vector<DataProduct> CreateEventDataProducts(Event &event)
{
   std::vector<DataProduct> products;
   // The indices have to match the order of std::vector<ROOT::RFieldToken> above.
   products.emplace_back(0, &event.eventId);
   products.emplace_back(1, &event.runId);
   products.emplace_back(2, &event.electrons);
   products.emplace_back(3, &event.photons);
   products.emplace_back(4, &event.muons);
   return products;
}

// Simple struct to store runs
struct Run {
   std::uint32_t runId;
   std::uint32_t nEvents;
};

// RNTupleModel for Runs; in a real framework, this would likely be dynamic.
ModelTokensPair CreateRunModel()
{
   // We recommend creating a bare model if the default entry is not used.
   auto model = ROOT::RNTupleModel::CreateBare();
   // For more efficient access, also create field tokens.
   std::vector<ROOT::RFieldToken> tokens;

   model->MakeField<decltype(Run::runId)>("runId");
   tokens.push_back(model->GetToken("runId"));

   model->MakeField<decltype(Run::nEvents)>("nEvents");
   tokens.push_back(model->GetToken("nEvents"));

   return {std::move(model), std::move(tokens)};
}

// DataProducts with addresses that point into the Run object.
std::vector<DataProduct> CreateRunDataProducts(Run &run)
{
   std::vector<DataProduct> products;
   // The indices have to match the order of std::vector<ROOT::RFieldToken> above.
   products.emplace_back(0, &run.runId);
   products.emplace_back(1, &run.nEvents);
   return products;
}

constexpr unsigned kNRunsPerThread = 100;
constexpr unsigned kMeanNEventsPerRun = 400;
constexpr unsigned kStddevNEventsPerRun = 100;
constexpr unsigned kMeanNTracks = 5;

void ProcessRunsAndEvents(unsigned threadId, Outputter &eventOutputter, Outputter &runOutputter)
{
   std::mt19937 gen(threadId);
   std::normal_distribution<double> nEventsDist(kMeanNEventsPerRun, kStddevNEventsPerRun);
   std::poisson_distribution<> nTracksDist(kMeanNTracks);
   std::uniform_real_distribution<float> floatDist;

   for (std::uint32_t runId = threadId * kNRunsPerThread; runId < (threadId + 1) * kNRunsPerThread; runId++) {
      double nEventsD = nEventsDist(gen);
      std::uint32_t nEvents = 0;
      if (nEventsD > 0) {
         nEvents = static_cast<std::uint32_t>(nEventsD);
      }

      // Process events, reusing a single Event object.
      Event event;
      event.runId = runId;
      std::vector<DataProduct> eventProducts = CreateEventDataProducts(event);
      for (std::uint32_t eventId = 0; eventId < nEvents; eventId++) {
         event.eventId = eventId;

         // Produce some data; eta, phi, and pt are just filled with uniformly distributed data.
         event.electrons.resize(nTracksDist(gen));
         for (auto &electron : event.electrons) {
            electron.eta = floatDist(gen);
            electron.mass = 0.511 /* MeV */;
            electron.phi = floatDist(gen);
            electron.pt = floatDist(gen);
            electron.charge = (gen() % 2 ? 1 : -1);
         }
         event.photons.resize(nTracksDist(gen));
         for (auto &photon : event.photons) {
            photon.eta = floatDist(gen);
            photon.mass = 0;
            photon.phi = floatDist(gen);
            photon.pt = floatDist(gen);
         }
         event.muons.resize(nTracksDist(gen));
         for (auto &muon : event.muons) {
            muon.eta = floatDist(gen);
            muon.mass = 105.658 /* MeV */;
            muon.phi = floatDist(gen);
            muon.pt = floatDist(gen);
            muon.charge = (gen() % 2 ? 1 : -1);
         }

         eventOutputter.Fill(threadId, eventProducts);
      }

      // Fill the Run data.
      Run run;
      run.runId = runId;
      run.nEvents = nEvents;

      std::vector<DataProduct> runProducts = CreateRunDataProducts(run);
      runOutputter.Fill(threadId, runProducts);
   }
}

constexpr unsigned kNThreads = 4;

void ntpl014_framework()
{
   FileService fileService("ntpl014_framework.root", "RECREATE");

   ROOT::RNTupleWriteOptions options;
   // Parallel writing requires buffered writing; force it on (even if it is the default).
   options.SetUseBufferedWrite(true);
   // For demonstration purposes, reduce the cluster size to 2 MiB.
   options.SetApproxZippedClusterSize(2 * 1024 * 1024);
   ParallelOutputter eventOutputter(fileService, "Events", CreateEventModel(), options);

   // SerializingOutputter also relies on buffered writing; force it on (even if it is the default).
   options.SetUseBufferedWrite(true);
   // For demonstration purposes, reduce the cluster size for the very simple Run data to 1 KiB.
   options.SetApproxZippedClusterSize(1024);
   SerializingOutputter runOutputter(fileService, "Runs", CreateRunModel(), options);

   // Initialize slots in the two Outputters.
   for (unsigned i = 0; i < kNThreads; i++) {
      eventOutputter.InitSlot(i);
      runOutputter.InitSlot(i);
   }

   std::vector<std::thread> threads;
   for (unsigned i = 0; i < kNThreads; i++) {
      threads.emplace_back(ProcessRunsAndEvents, i, std::ref(eventOutputter), std::ref(runOutputter));
   }
   for (unsigned i = 0; i < kNThreads; i++) {
      threads[i].join();
   }
}
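
// A possible way to inspect the output afterwards (a minimal sketch, assuming ROOT's RNTupleReader API and the
// RNTuple names "Events" and "Runs" written above); it is intentionally left commented out and is not part of the
// tutorial:
//
//    #include <ROOT/RNTupleReader.hxx>
//
//    void ntpl014_framework_inspect()
//    {
//       auto events = ROOT::RNTupleReader::Open("Events", "ntpl014_framework.root");
//       auto runs = ROOT::RNTupleReader::Open("Runs", "ntpl014_framework.root");
//       events->PrintInfo();
//       runs->PrintInfo();
//    }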