RPageSinkBuf.cxx
/// \file RPageSinkBuf.cxx
/// \ingroup NTuple
/// \author Jakob Blomer <jblomer@cern.ch>
/// \author Max Orok <maxwellorok@gmail.com>
/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
/// \date 2021-03-17

/*************************************************************************
 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleUtils.hxx>
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/RPageSinkBuf.hxx>

#include <algorithm>
#include <memory>

void ROOT::Internal::RPageSinkBuf::RColumnBuf::DropBufferedPages()
{
   fBufferedPages.clear();
   // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
   // clean-up is required
   fSealedPages.clear();
}

ROOT::Internal::RPageSinkBuf::RPageSinkBuf(std::unique_ptr<RPageSink> inner)
   : RPageSink(inner->GetNTupleName(), inner->GetWriteOptions()), fInnerSink(std::move(inner))
{
   fMetrics = RNTupleMetrics("RPageSinkBuf");
   fCounters = std::make_unique<RCounters>(
      RCounters{*fMetrics.MakeCounter<RNTuplePlainCounter *>("ParallelZip", "", "compressing pages in parallel"),
                *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
                *fMetrics.MakeCounter<RNTuplePlainCounter *>("timeWallCriticalSection", "ns",
                                                             "wall clock time spent in critical sections"),
                *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
                                                                                  "CPU time spent compressing"),
                *fMetrics.MakeCounter<RNTupleTickCounter<RNTuplePlainCounter> *>(
                   "timeCpuCriticalSection", "ns", "CPU time spent in critical section")});
   fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
}

ROOT::Internal::RPageSinkBuf::~RPageSinkBuf()
{
   // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
   // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
   // base class destructor is invoked.
   WaitForAllTasks();
}

ROOT::Internal::RPageStorage::ColumnHandle_t
ROOT::Internal::RPageSinkBuf::AddColumn(ROOT::DescriptorId_t /*fieldId*/, RColumn &column)
{
   return ColumnHandle_t{fNColumns++, &column};
}

void ROOT::Internal::RPageSinkBuf::ConnectFields(const std::vector<ROOT::RFieldBase *> &fields,
                                                 ROOT::NTupleSize_t firstEntry)
{
   auto connectField = [&](ROOT::RFieldBase &f) {
      // Field Zero would have id 0.
      ++fNFields;
      f.SetOnDiskId(fNFields);
      CallConnectPageSinkOnField(f, *this, firstEntry); // in turn issues calls to `AddColumn()`
   };
   for (auto *f : fields) {
      connectField(*f);
      for (auto &descendant : *f) {
         connectField(descendant);
      }
   }
   fBufferedColumns.resize(fNColumns);
}

const ROOT::RNTupleDescriptor &ROOT::Internal::RPageSinkBuf::GetDescriptor() const
{
   return fInnerSink->GetDescriptor();
}

void ROOT::Internal::RPageSinkBuf::InitImpl(ROOT::RNTupleModel &model)
{
   ConnectFields(GetFieldZeroOfModel(model).GetMutableSubfields(), 0U);

   fInnerModel = model.Clone();
   fInnerSink->Init(*fInnerModel);
}

void ROOT::Internal::RPageSinkBuf::UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset,
                                                ROOT::NTupleSize_t firstEntry)
{
   ConnectFields(changeset.fAddedFields, firstEntry);

   // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
   // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
   auto cloneAddField = [&](const ROOT::RFieldBase *field) {
      auto cloned = field->Clone(field->GetFieldName());
      auto p = &(*cloned);
      fInnerModel->AddField(std::move(cloned));
      return p;
   };
   auto cloneAddProjectedField = [&](ROOT::RFieldBase *field) {
      auto cloned = field->Clone(field->GetFieldName());
      auto p = &(*cloned);
      auto &projectedFields = GetProjectedFieldsOfModel(changeset.fModel);
      RProjectedFields::FieldMap_t fieldMap;
      fieldMap[p] = &fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
      auto targetIt = cloned->begin();
      for (auto &f : *field)
         fieldMap[&(*targetIt++)] =
            &fInnerModel->GetConstField(projectedFields.GetSourceField(&f)->GetQualifiedFieldName());
      GetProjectedFieldsOfModel(*fInnerModel).Add(std::move(cloned), fieldMap);
      return p;
   };
   RNTupleModelChangeset innerChangeset{*fInnerModel};
   fInnerModel->Unfreeze();
   std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
                  std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
   std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
                  std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
   fInnerModel->Freeze();
   fInnerSink->UpdateSchema(innerChangeset, firstEntry);
}

void ROOT::Internal::RPageSinkBuf::UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo)
{
   RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
   RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
   fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
}

ROOT::Internal::RPage ROOT::Internal::RPageSinkBuf::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
{
   return fInnerSink->ReservePage(columnHandle, nElements);
}

void ROOT::Internal::RPageSinkBuf::CommitPage(ColumnHandle_t columnHandle, const RPage &page)
{
   auto colId = columnHandle.fPhysicalId;
   const auto &element = *columnHandle.fColumn->GetElement();

   // Safety: References are guaranteed to be valid until the element is destroyed. In other words, all buffered page
   // elements are valid until DropBufferedPages().
   auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
   std::size_t maxSealedPageBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
   // Do not allocate the buffer yet, in case of IMT we only need it once the task is started.
   auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();

   auto allocateBuf = [&zipItem, maxSealedPageBytes]() {
      zipItem.fBuf = MakeUninitArray<unsigned char>(maxSealedPageBytes);
      R__ASSERT(zipItem.fBuf);
   };
   auto shrinkSealedPage = [&zipItem, maxSealedPageBytes, &sealedPage]() {
      // If the sealed page is smaller than the maximum size (with compression), allocate what is needed and copy the
      // sealed page content to save memory.
      auto sealedBufferSize = sealedPage.GetBufferSize();
      if (sealedBufferSize < maxSealedPageBytes) {
         auto buf = MakeUninitArray<unsigned char>(sealedBufferSize);
         memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
         zipItem.fBuf = std::move(buf);
         sealedPage.SetBuffer(zipItem.fBuf.get());
      }
   };

   if (!fTaskScheduler) {
      allocateBuf();
      // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
      RSealPageConfig config;
      config.fPage = &page;
      config.fElement = &element;
      config.fCompressionSettings = GetWriteOptions().GetCompression();
      config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
      config.fAllowAlias = false;
      config.fBuffer = zipItem.fBuf.get();
      {
         RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
         sealedPage = SealPage(config);
      }
      shrinkSealedPage();
      zipItem.fSealedPage = &sealedPage;
      return;
   }

   // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
   zipItem.fPage = fPageAllocator->NewPage(page.GetElementSize(), page.GetNElements());
   // make sure the page is aware of how many elements it will have
   zipItem.fPage.GrowUnchecked(page.GetNElements());
   memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());

   fCounters->fParallelZip.SetValue(1);
   // Thread safety: Each thread works on a distinct zipItem which owns its
   // compression buffer.
   fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
      allocateBuf();
      RSealPageConfig config;
      config.fPage = &zipItem.fPage;
      config.fElement = &element;
      config.fCompressionSettings = GetWriteOptions().GetCompression();
      config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
      // Make sure the page buffer is not aliased so that we can free the uncompressed page.
      config.fAllowAlias = false;
      config.fBuffer = zipItem.fBuf.get();
      // TODO: Somehow expose the time spent in zipping via the metrics. Wall time is tricky because the tasks run
      // in parallel...
      sealedPage = SealPage(config);
      shrinkSealedPage();
      zipItem.fSealedPage = &sealedPage;
      // Release the uncompressed page. This works because the page allocator must be thread-safe.
      zipItem.fPage = RPage();
   });
}

void ROOT::Internal::RPageSinkBuf::CommitSealedPage(ROOT::DescriptorId_t /*physicalColumnId*/,
                                                    const RSealedPage & /*sealedPage*/)
{
   throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
}

void ROOT::Internal::RPageSinkBuf::CommitSealedPageV(
   std::span<ROOT::Internal::RPageStorage::RSealedPageGroup> /*ranges*/)
{
   throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
}

// We implement both StageCluster() and CommitCluster() because we can call CommitCluster() on the inner sink more
// efficiently in a single critical section. For parallel writing, it also guarantees that we produce a fully
// sequential file.
void ROOT::Internal::RPageSinkBuf::FlushClusterImpl(std::function<void(void)> FlushClusterFn)
{
   WaitForAllTasks();

   std::vector<RSealedPageGroup> toCommit;
   toCommit.reserve(fBufferedColumns.size());
   for (auto &bufColumn : fBufferedColumns) {
      R__ASSERT(bufColumn.HasSealedPagesOnly());
      const auto &sealedPages = bufColumn.GetSealedPages();
      toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
   }

   {
      RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
      RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
      fInnerSink->CommitSealedPageV(toCommit);

      for (auto handle : fSuppressedColumns)
         fInnerSink->CommitSuppressedColumn(handle);
      fSuppressedColumns.clear();

      FlushClusterFn();
   }

   for (auto &bufColumn : fBufferedColumns)
      bufColumn.DropBufferedPages();
}

std::uint64_t ROOT::Internal::RPageSinkBuf::CommitCluster(ROOT::NTupleSize_t nNewEntries)
{
   std::uint64_t nbytes;
   FlushClusterImpl([&] { nbytes = fInnerSink->CommitCluster(nNewEntries); });
   return nbytes;
}

ROOT::Internal::RPageSink::RStagedCluster ROOT::Internal::RPageSinkBuf::StageCluster(ROOT::NTupleSize_t nNewEntries)
{
   RStagedCluster stagedCluster;
   FlushClusterImpl([&] { stagedCluster = fInnerSink->StageCluster(nNewEntries); });
   return stagedCluster;
}

void ROOT::Internal::RPageSinkBuf::CommitStagedClusters(std::span<RStagedCluster> clusters)
{
   RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
   RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
   fInnerSink->CommitStagedClusters(clusters);
}

void ROOT::Internal::RPageSinkBuf::CommitClusterGroup()
{
   RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
   RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
   fInnerSink->CommitClusterGroup();
}

void ROOT::Internal::RPageSinkBuf::CommitDataset()
{
   RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
   RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
   fInnerSink->CommitDataset();
}

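For context, RPageSinkBuf is normally not created by user code: RNTupleWriter wraps the actual storage sink in this buffered sink when buffered writing is enabled, and it attaches a task scheduler when implicit multi-threading is active, which is what drives the parallel page-compression path in CommitPage() above. The following sketch is illustrative only and not part of RPageSinkBuf.cxx; it assumes the user-facing RNTupleWriter / RNTupleWriteOptions API of recent ROOT releases and shows how buffered writing and parallel page compression are typically switched on.

// Illustrative sketch (not part of this file): write an RNTuple with buffered writing and IMT.
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleWriteOptions.hxx>
#include <ROOT/RNTupleWriter.hxx>
#include <TROOT.h>

void WriteBuffered()
{
   ROOT::EnableImplicitMT(); // provides the task scheduler the buffered sink uses to compress pages in parallel

   auto model = ROOT::RNTupleModel::Create();
   auto ptrPt = model->MakeField<float>("pt");

   ROOT::RNTupleWriteOptions options;
   options.SetUseBufferedWrite(true); // route pages through the buffered sink (this is already the default)

   auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "ntpl", "data.root", options);
   for (int i = 0; i < 1000; ++i) {
      *ptrPt = 0.5f * i;
      writer->Fill();
   }
   // Destroying the writer flushes the buffered cluster and commits the dataset through the inner sink.
}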