Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ntpl013_staged.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_ntuple
3/// \notebook
4/// Example of staged cluster committing in multi-threaded writing using RNTupleParallelWriter.
5///
6/// \macro_code
7///
8/// \date September 2024
9/// \author The ROOT Team
10
11// NOTE: The RNTuple classes are experimental at this point.
12// Functionality and interface are still subject to changes.
13
15#include <ROOT/RNTupleModel.hxx>
17
18#include <TRandom3.h>
19
20#include <iostream>
21#include <memory>
22#include <mutex>
23#include <random>
24#include <thread>
25#include <vector>
26
27// Import classes from experimental namespace for the time being
32
// Output file that receives the ntuple written by this example
constexpr const char *kNTupleFileName = "ntpl013_staged.root";

// How many threads fill the ntuple concurrently
constexpr int kNWriterThreads = 4;

// Events generated by each thread; the total number of events is kNEventsPerThread * kNWriterThreads
constexpr int kNEventsPerThread = 25'000;

// Number of events per block
constexpr int kNEventsPerBlock = 10'000;
44
45// Thread function to generate and write events
46void FillData(int id, RNTupleParallelWriter *writer)
47{
48 // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data
49 // structures in real code!
50 static std::mutex g_Mutex;
51 static NTupleSize_t g_WrittenEntries = 0;
52
53 using generator = std::mt19937;
54 generator gen;
55
56 // Create a fill context and turn on staged cluster committing.
57 auto fillContext = writer->CreateFillContext();
58 fillContext->EnableStagedClusterCommitting();
59 auto entry = fillContext->CreateEntry();
60
61 auto eventId = entry->GetPtr<std::uint32_t>("eventId");
62 auto eventIdStart = id * kNEventsPerThread;
63 auto rndm = entry->GetPtr<float>("rndm");
64
65 for (int i = 0; i < kNEventsPerThread; i++) {
66 // Prepare the entry with an id and a random number.
67 *eventId = eventIdStart + i;
68 auto d = static_cast<double>(gen()) / generator::max();
69 *rndm = static_cast<float>(d);
70
71 // Fill might auto-flush a cluster, which will be staged.
72 fillContext->Fill(*entry);
73 }
74
75 // It is important to first FlushCluster() so that a cluster with the remaining entries is staged.
76 fillContext->FlushCluster();
77 {
78 std::lock_guard g(g_Mutex);
79 fillContext->CommitStagedClusters();
80 std::cout << "Thread #" << id << " wrote events #" << eventIdStart << " - #"
81 << (eventIdStart + kNEventsPerThread - 1) << " as entries #" << g_WrittenEntries << " - #"
82 << (g_WrittenEntries + kNEventsPerThread - 1) << std::endl;
83 g_WrittenEntries += kNEventsPerThread;
84 }
85}
86
87// Generate kNEvents with multiple threads in kNTupleFileName
88void Write()
89{
90 std::cout << " === Writing with staged cluster committing ===" << std::endl;
91
92 // Create the data model
93 auto model = RNTupleModel::CreateBare();
94 model->MakeField<std::uint32_t>("eventId");
95 model->MakeField<float>("rndm");
96
97 // Create RNTupleWriteOptions to make the writing commit multiple clusters.
98 // This is for demonstration purposes only to have multiple clusters per
99 // thread that are implicitly flushed, and should not be copied into real
100 // code!
101 RNTupleWriteOptions options;
102 options.SetApproxZippedClusterSize(32'000);
103
104 // We hand over the data model to a newly created ntuple of name "NTuple", stored in kNTupleFileName
105 auto writer = RNTupleParallelWriter::Recreate(std::move(model), "NTuple", kNTupleFileName, options);
106
107 std::vector<std::thread> threads;
108 for (int i = 0; i < kNWriterThreads; ++i)
109 threads.emplace_back(FillData, i, writer.get());
110 for (int i = 0; i < kNWriterThreads; ++i)
111 threads[i].join();
112
113 // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk
114 // and closes the attached ROOT file.
115}
116
117void FillDataInBlocks(int id, RNTupleParallelWriter *writer)
118{
119 // static variables that are shared between threads; this is done for simplicity in this tutorial, use proper data
120 // structures in real code!
121 static std::mutex g_Mutex;
122 static NTupleSize_t g_WrittenEntries = 0;
123
124 using generator = std::mt19937;
125 generator gen;
126
127 // Create a fill context and turn on staged cluster committing.
128 auto fillContext = writer->CreateFillContext();
129 fillContext->EnableStagedClusterCommitting();
130 auto entry = fillContext->CreateEntry();
131
132 auto eventId = entry->GetPtr<std::uint32_t>("eventId");
133 auto eventIdStart = id * kNEventsPerThread;
134 int startOfBlock = 0;
135 auto rndm = entry->GetPtr<float>("rndm");
136
137 for (int i = 0; i < kNEventsPerThread; i++) {
138 // Prepare the entry with an id and a random number.
139 *eventId = eventIdStart + i;
140 auto d = static_cast<double>(gen()) / generator::max();
141 *rndm = static_cast<float>(d);
142
143 // Fill might auto-flush a cluster, which will be staged.
144 fillContext->Fill(*entry);
145
146 if ((i + 1) % kNEventsPerBlock == 0) {
147 // Decide to flush this cluster and logically append all staged clusters to the ntuple.
148 fillContext->FlushCluster();
149 {
150 std::lock_guard g(g_Mutex);
151 fillContext->CommitStagedClusters();
152 auto firstEvent = eventIdStart + startOfBlock;
153 auto lastEvent = eventIdStart + i;
154 std::cout << "Thread #" << id << " wrote events #" << firstEvent << " - #" << lastEvent << " as entries #"
155 << g_WrittenEntries << " - #" << (g_WrittenEntries + kNEventsPerBlock - 1) << std::endl;
156 g_WrittenEntries += kNEventsPerBlock;
157 startOfBlock += kNEventsPerBlock;
158 }
159 }
160 }
161
162 // Flush the remaining data and commit staged clusters.
163 fillContext->FlushCluster();
164 {
165 std::lock_guard g(g_Mutex);
166 fillContext->CommitStagedClusters();
167 auto firstEvent = eventIdStart + startOfBlock;
168 auto lastEvent = eventIdStart + kNEventsPerThread - 1;
169 auto numEvents = kNEventsPerThread - startOfBlock;
170 std::cout << "Thread #" << id << " wrote events #" << firstEvent << " - #" << lastEvent << " as entries #"
171 << g_WrittenEntries << " - #" << (g_WrittenEntries + numEvents - 1) << std::endl;
172 g_WrittenEntries += numEvents;
173 }
174}
175
176// Generate kNEvents with multiple threads in kNTupleFileName, and sequence them in blocks of kNEventsPerBlock entries
177void WriteInBlocks()
178{
179 std::cout << "\n === ... with sequencing in blocks of " << kNEventsPerBlock << " events ===" << std::endl;
180
181 // Create the data model
182 auto model = RNTupleModel::CreateBare();
183 model->MakeField<std::uint32_t>("eventId");
184 model->MakeField<float>("rndm");
185
186 // Create RNTupleWriteOptions to make the writing commit multiple clusters.
187 // This is for demonstration purposes only to have multiple clusters per
188 // thread and also per block that are implicitly flushed, and can be mixed
189 // with explicit calls to FlushCluster(). This should not be copied into real
190 // code!
191 RNTupleWriteOptions options;
192 options.SetApproxZippedClusterSize(32'000);
193
194 // We hand over the data model to a newly created ntuple of name "NTuple", stored in kNTupleFileName
195 auto writer = RNTupleParallelWriter::Recreate(std::move(model), "NTuple", kNTupleFileName, options);
196
197 std::vector<std::thread> threads;
198 for (int i = 0; i < kNWriterThreads; ++i)
199 threads.emplace_back(FillDataInBlocks, i, writer.get());
200 for (int i = 0; i < kNWriterThreads; ++i)
201 threads[i].join();
202
203 // The writer unique pointer goes out of scope here. On destruction, the writer flushes unwritten data to disk
204 // and closes the attached ROOT file.
205}
206
207void ntpl013_staged()
208{
209 Write();
210 WriteInBlocks();
211}
#define d(i)
Definition RSha256.hxx:102
#define g(i)
Definition RSha256.hxx:105
The RNTupleModel encapsulates the schema of an ntuple.
A writer to fill an RNTuple from multiple contexts.
Common user-tunable settings for storing ntuples.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
void FillData(BinData &dv, const TH1 *hist, TF1 *func=nullptr)
fill the data vector from a TH1.