Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
8/// is welcome!
9
10/*************************************************************************
11 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
12 * All rights reserved. *
13 * *
14 * For the licensing terms see $ROOTSYS/LICENSE. *
15 * For the list of contributors see $ROOTSYS/README/CREDITS. *
16 *************************************************************************/
17
18#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleZip.hxx>
21#include <ROOT/RPageSinkBuf.hxx>
22
23#include <algorithm>
24#include <memory>
25
27{
28 fBufferedPages.clear();
29 // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
30 // clean-up is required
31 fSealedPages.clear();
32}
33
35 : RPageSink(inner->GetNTupleName(), inner->GetWriteOptions()), fInnerSink(std::move(inner))
36{
37 fMetrics = Detail::RNTupleMetrics("RPageSinkBuf");
38 fCounters = std::make_unique<RCounters>(RCounters{
39 *fMetrics.MakeCounter<Detail::RNTuplePlainCounter *>("ParallelZip", "", "compressing pages in parallel"),
40 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
41 *fMetrics.MakeCounter<Detail::RNTuplePlainCounter *>("timeWallCriticalSection", "ns",
42 "wall clock time spent in critical sections"),
44 "CPU time spent compressing"),
46 "timeCpuCriticalSection", "ns", "CPU time spent in critical section")});
47 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
48}
49
51{
52 // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
53 // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
54 // base class destructor is invoked.
55 WaitForAllTasks();
56}
57
60{
61 return ColumnHandle_t{fNColumns++, &column};
62}
63
/// Assign on-disk IDs to the given fields and connect them to this buffered sink.
///
/// Every field in `fields`, followed by each descendant obtained by iterating over that field, gets the next value
/// of the running counter `fNFields` as its on-disk ID (the counter is incremented before use) and is then connected
/// to `*this` through `CallConnectPageSinkOnField()`. Once all fields are connected, `fBufferedColumns` is resized
/// to `fNColumns`.
///
/// \param fields Fields to connect; every pointer is dereferenced, so none may be null.
/// \param firstEntry Forwarded unchanged to `CallConnectPageSinkOnField()` for every connected field.
64void ROOT::Experimental::Internal::RPageSinkBuf::ConnectFields(const std::vector<RFieldBase *> &fields,
65 NTupleSize_t firstEntry)
66{
// Helper applied to each top-level field and to each of its descendants alike.
67 auto connectField = [&](RFieldBase &f) {
68 // Field Zero would have id 0.
69 ++fNFields;
70 f.SetOnDiskId(fNFields);
71 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
72 };
// The parent is connected first, then whatever iterating over it yields, so a parent always receives a smaller ID
// than its own descendants.
73 for (auto *f : fields) {
74 connectField(*f);
75 for (auto &descendant : *f) {
76 connectField(descendant);
77 }
78 }
// NOTE(review): presumably `fNColumns` was advanced by the `AddColumn()` calls triggered above, so this makes room
// for one column buffer per registered column -- confirm against `AddColumn()`.
79 fBufferedColumns.resize(fNColumns);
80}
81
83{
84 return fInnerSink->GetDescriptor();
85}
86
88{
89 ConnectFields(Internal::GetFieldZeroOfModel(model).GetSubFields(), 0U);
90
91 fInnerModel = model.Clone();
92 fInnerSink->Init(*fInnerModel);
93}
94
96 NTupleSize_t firstEntry)
97{
98 ConnectFields(changeset.fAddedFields, firstEntry);
99
100 // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
101 // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
102 auto cloneAddField = [&](const RFieldBase *field) {
103 auto cloned = field->Clone(field->GetFieldName());
104 auto p = &(*cloned);
105 fInnerModel->AddField(std::move(cloned));
106 return p;
107 };
108 auto cloneAddProjectedField = [&](RFieldBase *field) {
109 auto cloned = field->Clone(field->GetFieldName());
110 auto p = &(*cloned);
111 auto &projectedFields = Internal::GetProjectedFieldsOfModel(changeset.fModel);
113 fieldMap[p] = &fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
114 auto targetIt = cloned->begin();
115 for (auto &f : *field)
116 fieldMap[&(*targetIt++)] =
117 &fInnerModel->GetConstField(projectedFields.GetSourceField(&f)->GetQualifiedFieldName());
118 Internal::GetProjectedFieldsOfModel(*fInnerModel).Add(std::move(cloned), fieldMap);
119 return p;
120 };
121 RNTupleModelChangeset innerChangeset{*fInnerModel};
122 fInnerModel->Unfreeze();
123 std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
124 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
125 std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
126 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
127 fInnerModel->Freeze();
128 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
129}
130
132{
133 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
134 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
135 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
136}
137
139{
140 fSuppressedColumns.emplace_back(columnHandle);
141}
142
144{
145 auto colId = columnHandle.fPhysicalId;
146 const auto &element = *columnHandle.fColumn->GetElement();
147
148 // Safety: References are guaranteed to be valid until the element is destroyed. In other words, all buffered page
149 // elements are valid until DropBufferedPages().
150 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
151 std::size_t maxSealedPageBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
152 // Do not allocate the buffer yet, in case of IMT we only need it once the task is started.
153 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
154
155 auto allocateBuf = [&zipItem, maxSealedPageBytes]() {
156 zipItem.fBuf = MakeUninitArray<unsigned char>(maxSealedPageBytes);
157 R__ASSERT(zipItem.fBuf);
158 };
159 auto shrinkSealedPage = [&zipItem, maxSealedPageBytes, &sealedPage]() {
160 // If the sealed page is smaller than the maximum size (with compression), allocate what is needed and copy the
161 // sealed page content to save memory.
162 auto sealedBufferSize = sealedPage.GetBufferSize();
163 if (sealedBufferSize < maxSealedPageBytes) {
164 auto buf = MakeUninitArray<unsigned char>(sealedBufferSize);
165 memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
166 zipItem.fBuf = std::move(buf);
167 sealedPage.SetBuffer(zipItem.fBuf.get());
168 }
169 };
170
171 if (!fTaskScheduler) {
172 allocateBuf();
173 // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
174 RSealPageConfig config;
175 config.fPage = &page;
176 config.fElement = &element;
177 config.fCompressionSetting = GetWriteOptions().GetCompression();
178 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
179 config.fAllowAlias = false;
180 config.fBuffer = zipItem.fBuf.get();
181 {
182 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
183 sealedPage = SealPage(config);
184 }
185 shrinkSealedPage();
186 zipItem.fSealedPage = &sealedPage;
187 return;
188 }
189
190 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
191 zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
192 // make sure the page is aware of how many elements it will have
193 zipItem.fPage.GrowUnchecked(page.GetNElements());
194 memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
195
196 fCounters->fParallelZip.SetValue(1);
197 // Thread safety: Each thread works on a distinct zipItem which owns its
198 // compression buffer.
199 fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
200 allocateBuf();
201 RSealPageConfig config;
202 config.fPage = &zipItem.fPage;
203 config.fElement = &element;
204 config.fCompressionSetting = GetWriteOptions().GetCompression();
205 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
206 // Make sure the page buffer is not aliased so that we can free the uncompressed page.
207 config.fAllowAlias = false;
208 config.fBuffer = zipItem.fBuf.get();
209 // TODO: Somehow expose the time spent in zipping via the metrics. Wall time is tricky because the tasks run
210 // in parallel...
211 sealedPage = SealPage(config);
212 shrinkSealedPage();
213 zipItem.fSealedPage = &sealedPage;
214 // Release the uncompressed page. This works because the "page allocator must be thread-safe."
215 zipItem.fPage = RPage();
216 });
217}
218
220 const RSealedPage & /*sealedPage*/)
221{
222 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
223}
224
/// Not supported by the buffered sink: the argument is ignored and an `RException` is thrown unconditionally.
225void ROOT::Experimental::Internal::RPageSinkBuf::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> /*ranges*/)
226{
227 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
228}
229
230// We implement both StageCluster() and CommitCluster() because we can call CommitCluster() on the inner sink more
231// efficiently in a single critical section. For parallel writing, it also guarantees that we produce a fully sequential
232// file.
/// Flush everything buffered for the current cluster to the inner sink.
///
/// Steps, in order: wait for all outstanding tasks; collect the sealed pages of every buffered column; then, while
/// holding the inner sink's guard, pass the sealed pages and the suppressed columns to the inner sink and invoke
/// `FlushClusterFn`; finally drop the buffered pages of every column.
///
/// \param FlushClusterFn Callback run inside the guarded section, after the sealed pages and suppressed columns
/// have been handed to the inner sink.
233void ROOT::Experimental::Internal::RPageSinkBuf::FlushClusterImpl(std::function<void(void)> FlushClusterFn)
234{
// Outstanding tasks must be finished before the pages are collected; the assertion in the loop below checks that
// every column holds sealed pages only.
235 WaitForAllTasks();
236
// One group per buffered column: the column's physical ID plus the [begin, end) range of its sealed pages.
237 std::vector<RSealedPageGroup> toCommit;
238 toCommit.reserve(fBufferedColumns.size());
239 for (auto &bufColumn : fBufferedColumns) {
240 R__ASSERT(bufColumn.HasSealedPagesOnly());
241 const auto &sealedPages = bufColumn.GetSealedPages();
242 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
243 }
244
// Guarded section: `g` is built from the inner sink's guard and `timer` records into the critical-section counters;
// both live until the closing brace of this scope.
245 {
246 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
247 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
248 fInnerSink->CommitSealedPageV(toCommit);
249
// Forward every column recorded as suppressed, then empty the list.
250 for (auto handle : fSuppressedColumns)
251 fInnerSink->CommitSuppressedColumn(handle);
252 fSuppressedColumns.clear();
253
254 FlushClusterFn();
255 }
256
// The ranges in `toCommit` refer to pages held by the buffered columns, so the pages are dropped only after the
// guarded section above has finished.
257 for (auto &bufColumn : fBufferedColumns)
258 bufColumn.DropBufferedPages();
259}
260
262{
263 std::uint64_t nbytes;
264 FlushClusterImpl([&] { nbytes = fInnerSink->CommitCluster(nNewEntries); });
265 return nbytes;
266}
267
270{
272 FlushClusterImpl([&] { stagedCluster = fInnerSink->StageCluster(nNewEntries); });
273 return stagedCluster;
274}
275
277{
278 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
279 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
280 fInnerSink->CommitStagedClusters(clusters);
281}
282
284{
285 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
286 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
287 fInnerSink->CommitClusterGroup();
288}
289
291{
292 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
293 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
294 fInnerSink->CommitDataset();
295}
296
299{
300 return fInnerSink->ReservePage(columnHandle, nElements);
301}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define f(i)
Definition RSha256.hxx:104
#define g(i)
Definition RSha256.hxx:105
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
A thread-safe integral performance counter.
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
An either thread-safe or non thread safe counter for CPU ticks.
Record wall time and CPU time between construction and destruction.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:40
RColumnElementBase * GetElement() const
Definition RColumn.hxx:331
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) final
Register a new column.
RStagedCluster StageCluster(NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
const RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
std::unique_ptr< RCounters > fCounters
void InitImpl(RNTupleModel &model) final
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void ConnectFields(const std::vector< RFieldBase * > &fields, NTupleSize_t firstEntry)
void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
const std::string & GetNTupleName() const
Returns the NTuple name.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:47
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:115
std::uint32_t GetElementSize() const
Definition RPage.hxx:123
std::uint32_t GetNElements() const
Definition RPage.hxx:124
std::unordered_map< const RFieldBase *, const RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields Note that ...
RResult< void > Add(std::unique_ptr< RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition TObject.cxx:241
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
Cluster that was staged, but not yet logically appended to the RNTuple.
A sealed page contains the bytes of a page as written to storage (packed & compressed).