Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ntpl013_staged.C
Go to the documentation of this file.
/// \file
/// \ingroup tutorial_ntuple
/// \notebook
/// Example of staged cluster committing in multi-threaded writing using RNTupleParallelWriter.
///
/// \macro_code
///
/// \date September 2024
/// \author The ROOT Team

#include <ROOT/RNTupleFillContext.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleParallelWriter.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>

#include <TRandom3.h>

#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <vector>

// Where to store the ntuple of this example
constexpr char const *kNTupleFileName = "ntpl013_staged.root";

// Number of parallel threads to fill the ntuple
constexpr int kNWriterThreads = 4;

// Number of events to generate is kNEventsPerThread * kNWriterThreads
constexpr int kNEventsPerThread = 25000;

// Number of events per block
constexpr int kNEventsPerBlock = 10000;

// Compile-time sanity checks on the values above. The block logic computes `(i + 1) % kNEventsPerBlock`, so the block
// size must be positive. Event ids are computed as `id * kNEventsPerThread + i` in `int` arithmetic (and stored in a
// std::uint32_t field), so the total number of events must fit into an `int`.
static_assert(kNWriterThreads > 0, "need at least one writer thread");
static_assert(kNEventsPerThread > 0, "each thread must write at least one event");
static_assert(kNEventsPerBlock > 0, "the block size must be positive");
static_assert(kNEventsPerThread <= std::numeric_limits<int>::max() / kNWriterThreads,
              "the total number of events must fit into an int");

36// Thread function to generate and write events
38{
39 // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data
40 // structures in real code!
41 static std::mutex g_Mutex;
43
44 using generator = std::mt19937;
46
47 // Create a fill context and turn on staged cluster committing.
48 auto fillContext = writer->CreateFillContext();
49 fillContext->EnableStagedClusterCommitting();
50 auto entry = fillContext->CreateEntry();
51
52 auto eventId = entry->GetPtr<std::uint32_t>("eventId");
54 auto rndm = entry->GetPtr<float>("rndm");
55
56 for (int i = 0; i < kNEventsPerThread; i++) {
57 // Prepare the entry with an id and a random number.
58 *eventId = eventIdStart + i;
59 auto d = static_cast<double>(gen()) / generator::max();
60 *rndm = static_cast<float>(d);
61
62 // Fill might auto-flush a cluster, which will be staged.
63 fillContext->Fill(*entry);
64 }
65
66 // It is important to first FlushCluster() so that a cluster with the remaining entries is staged.
67 fillContext->FlushCluster();
68 {
69 std::lock_guard g(g_Mutex);
70 fillContext->CommitStagedClusters();
71 std::cout << "Thread #" << id << " wrote events #" << eventIdStart << " - #"
72 << (eventIdStart + kNEventsPerThread - 1) << " as entries #" << g_WrittenEntries << " - #"
73 << (g_WrittenEntries + kNEventsPerThread - 1) << std::endl;
75 }
76}
77
78// Generate kNEvents with multiple threads in kNTupleFileName
79void Write()
80{
81 std::cout << " === Writing with staged cluster committing ===" << std::endl;
82
83 // Create the data model
84 auto model = ROOT::RNTupleModel::CreateBare();
85 model->MakeField<std::uint32_t>("eventId");
86 model->MakeField<float>("rndm");
87
88 // Create RNTupleWriteOptions to make the writing commit multiple clusters.
89 // This is for demonstration purposes only to have multiple clusters per
90 // thread that are implicitly flushed, and should not be copied into real
91 // code!
93 options.SetApproxZippedClusterSize(32'000);
94
95 // We hand over the data model to a newly created ntuple of name "NTuple", stored in kNTupleFileName
96 auto writer = ROOT::RNTupleParallelWriter::Recreate(std::move(model), "NTuple", kNTupleFileName, options);
97
98 std::vector<std::thread> threads;
99 for (int i = 0; i < kNWriterThreads; ++i)
100 threads.emplace_back(FillData, i, writer.get());
101 for (int i = 0; i < kNWriterThreads; ++i)
102 threads[i].join();
103
104 // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk
105 // and closes the attached ROOT file.
106}
107
109{
110 // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data
111 // structures in real code!
112 static std::mutex g_Mutex;
114
115 using generator = std::mt19937;
117
118 // Create a fill context and turn on staged cluster committing.
119 auto fillContext = writer->CreateFillContext();
120 fillContext->EnableStagedClusterCommitting();
121 auto entry = fillContext->CreateEntry();
122
123 auto eventId = entry->GetPtr<std::uint32_t>("eventId");
125 int startOfBlock = 0;
126 auto rndm = entry->GetPtr<float>("rndm");
127
128 for (int i = 0; i < kNEventsPerThread; i++) {
129 // Prepare the entry with an id and a random number.
130 *eventId = eventIdStart + i;
131 auto d = static_cast<double>(gen()) / generator::max();
132 *rndm = static_cast<float>(d);
133
134 // Fill might auto-flush a cluster, which will be staged.
135 fillContext->Fill(*entry);
136
137 if ((i + 1) % kNEventsPerBlock == 0) {
138 // Decide to flush this cluster and logically append all staged clusters to the ntuple.
139 fillContext->FlushCluster();
140 {
141 std::lock_guard g(g_Mutex);
142 fillContext->CommitStagedClusters();
143 auto firstEvent = eventIdStart + startOfBlock;
144 auto lastEvent = eventIdStart + i;
145 std::cout << "Thread #" << id << " wrote events #" << firstEvent << " - #" << lastEvent << " as entries #"
146 << g_WrittenEntries << " - #" << (g_WrittenEntries + kNEventsPerBlock - 1) << std::endl;
149 }
150 }
151 }
152
153 // Flush the remaining data and commit staged clusters.
154 fillContext->FlushCluster();
155 {
156 std::lock_guard g(g_Mutex);
157 fillContext->CommitStagedClusters();
158 auto firstEvent = eventIdStart + startOfBlock;
161 std::cout << "Thread #" << id << " wrote events #" << firstEvent << " - #" << lastEvent << " as entries #"
162 << g_WrittenEntries << " - #" << (g_WrittenEntries + numEvents - 1) << std::endl;
164 }
165}
166
167// Generate kNEvents with multiple threads in kNTupleFileName, and sequence them in blocks of kNEventsPerBlock entries
168void WriteInBlocks()
169{
170 std::cout << "\n === ... with sequencing in blocks of " << kNEventsPerBlock << " events ===" << std::endl;
171
172 // Create the data model
173 auto model = ROOT::RNTupleModel::CreateBare();
174 model->MakeField<std::uint32_t>("eventId");
175 model->MakeField<float>("rndm");
176
177 // Create RNTupleWriteOptions to make the writing commit multiple clusters.
178 // This is for demonstration purposes only to have multiple clusters per
179 // thread and also per block that are implicitly flushed, and can be mixed
180 // with explicit calls to FlushCluster(). This should not be copied into real
181 // code!
183 options.SetApproxZippedClusterSize(32'000);
184
185 // We hand over the data model to a newly created ntuple of name "NTuple", stored in kNTupleFileName
186 auto writer = ROOT::RNTupleParallelWriter::Recreate(std::move(model), "NTuple", kNTupleFileName, options);
187
188 std::vector<std::thread> threads;
189 for (int i = 0; i < kNWriterThreads; ++i)
190 threads.emplace_back(FillDataInBlocks, i, writer.get());
191 for (int i = 0; i < kNWriterThreads; ++i)
192 threads[i].join();
193
194 // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk
195 // and closes the attached ROOT file.
196}
197
198void ntpl013_staged()
199{
200 Write();
202}
#define d(i)
Definition RSha256.hxx:102
#define g(i)
Definition RSha256.hxx:105
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
static std::unique_ptr< RNTupleModel > CreateBare()
Creates a "bare model", i.e. an RNTupleModel with no default entry.
A writer to fill an RNTuple from multiple contexts.
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Recreate a new file and return a writer to write an RNTuple.
Common user-tunable settings for storing RNTuples.
void SetApproxZippedClusterSize(std::size_t val)
void FillData(BinData &dv, const TH1 *hist, TF1 *func=nullptr)
fill the data vector from a TH1.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.