Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RPageStorage.hxx>
18#include <ROOT/RColumn.hxx>
19#include <ROOT/RField.hxx>
22#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RPagePool.hxx>
24#include <ROOT/RPageSinkBuf.hxx>
26#include <ROOT/RStringView.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <utility>
35
36
38{
39}
40
42
43//------------------------------------------------------------------------------
44
46{
47 for (unsigned i = 0; i < fIDs.size(); ++i) {
48 if (fIDs[i] == physicalColumnID) {
49 fRefCounters[i]++;
50 return;
51 }
52 }
53 fIDs.emplace_back(physicalColumnID);
54 fRefCounters.emplace_back(1);
55}
56
58{
59 for (unsigned i = 0; i < fIDs.size(); ++i) {
60 if (fIDs[i] == physicalColumnID) {
61 if (--fRefCounters[i] == 0) {
62 fIDs.erase(fIDs.begin() + i);
63 fRefCounters.erase(fRefCounters.begin() + i);
64 }
65 return;
66 }
67 }
68}
69
72{
74 for (const auto &id : fIDs)
75 result.insert(id);
76 return result;
77}
78
80 : RPageStorage(name), fMetrics(""), fOptions(options)
81{
82}
83
85{
86}
87
88std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSource::Create(
89 std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options)
90{
91 if (ntupleName.empty()) {
92 throw RException(R__FAIL("empty RNTuple name"));
93 }
94 if (location.empty()) {
95 throw RException(R__FAIL("empty storage location"));
96 }
97 if (location.find("daos://") == 0)
98#ifdef R__ENABLE_DAOS
99 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
100#else
101 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
102#endif
103
104 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
105}
106
109{
111 auto physicalId = GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex());
112 R__ASSERT(physicalId != kInvalidDescriptorId);
113 fActivePhysicalColumns.Insert(physicalId);
114 return ColumnHandle_t{physicalId, &column};
115}
116
118{
119 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId);
120}
121
123{
124 return GetSharedDescriptorGuard()->GetNEntries();
125}
126
128{
129 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
130}
131
133{
134 // TODO(jblomer) distinguish trees
135 return columnHandle.fPhysicalId;
136}
137
139{
140 if (fTaskScheduler)
141 UnzipClusterImpl(cluster);
142}
143
145 const RColumnElementBase &element,
146 DescriptorId_t physicalColumnId)
147{
148 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
149 // large enough to hold `sealedPage.fNElements`
150 if (sealedPage.fBuffer == RPage::GetPageZeroBuffer()) {
151 auto page = RPage::MakePageZero(physicalColumnId, element.GetSize());
152 page.GrowUnchecked(sealedPage.fNElements);
153 return page;
154 }
155
156 const auto bytesPacked = element.GetPackedSize(sealedPage.fNElements);
157 using Allocator_t = RPageAllocatorHeap;
158 auto page = Allocator_t::NewPage(physicalColumnId, element.GetSize(), sealedPage.fNElements);
159 if (sealedPage.fSize != bytesPacked) {
160 fDecompressor->Unzip(sealedPage.fBuffer, sealedPage.fSize, bytesPacked, page.GetBuffer());
161 } else {
162 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
163 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
164 // Note that usually pages are compressed.
165 memcpy(page.GetBuffer(), sealedPage.fBuffer, bytesPacked);
166 }
167
168 if (!element.IsMappable()) {
169 auto tmp = Allocator_t::NewPage(physicalColumnId, element.GetSize(), sealedPage.fNElements);
170 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.fNElements);
171 Allocator_t::DeletePage(page);
172 page = tmp;
173 }
174
175 page.GrowUnchecked(sealedPage.fNElements);
176 return page;
177}
178
180 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
181 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
182{
183 auto descriptorGuard = GetSharedDescriptorGuard();
184 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
185
186 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
187 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
188 NTupleSize_t pageNo = 0;
189 for (const auto &pageInfo : pageRange.fPageInfos) {
190 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
191 pageZeroMap.Register(
192 ROnDiskPage::Key{physicalColumnId, pageNo},
193 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.fBytesOnStorage));
194 } else {
195 perPageFunc(physicalColumnId, pageNo, pageInfo);
196 }
197 ++pageNo;
198 }
199 }
200}
201
203{
204 fMetrics = RNTupleMetrics(prefix);
205 fCounters = std::unique_ptr<RCounters>(new RCounters{
206 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nReadV", "", "number of vector read requests"),
207 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nRead", "", "number of byte ranges read"),
208 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szReadPayload", "B", "volume read from storage (required)"),
209 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szReadOverhead", "B", "volume read from storage (overhead)"),
210 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szUnzip", "B", "volume after unzipping"),
211 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nClusterLoaded", "",
212 "number of partial clusters preloaded from storage"),
213 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPageLoaded", "", "number of pages loaded from storage"),
214 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPagePopulated", "", "number of populated pages"),
215 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallRead", "ns", "wall clock time spent reading"),
216 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
217 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*>("timeCpuRead", "ns", "CPU time spent reading"),
218 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*> ("timeCpuUnzip", "ns",
219 "CPU time spent decompressing"),
220 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwRead", "MB/s", "bandwidth compressed bytes read per second",
221 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
222 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
223 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
224 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
225 if (auto walltime = timeWallRead->GetValueAsInt()) {
226 double payload = szReadPayload->GetValueAsInt();
227 double overhead = szReadOverhead->GetValueAsInt();
228 // unit: bytes / nanosecond = GB/s
229 return {true, (1000. * (payload + overhead) / walltime)};
230 }
231 }
232 }
233 }
234 return {false, -1.};
235 }
236 ),
237 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second",
238 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
239 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
240 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
241 if (auto walltime = timeWallRead->GetValueAsInt()) {
242 double unzip = szUnzip->GetValueAsInt();
243 // unit: bytes / nanosecond = GB/s
244 return {true, 1000. * unzip / walltime};
245 }
246 }
247 }
248 return {false, -1.};
249 }
250 ),
251 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second",
252 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
253 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
254 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
255 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
256 double unzip = szUnzip->GetValueAsInt();
257 // unit: bytes / nanosecond = GB/s
258 return {true, 1000. * unzip / walltime};
259 }
260 }
261 }
262 return {false, -1.};
263 }
264 ),
265 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("rtReadEfficiency", "", "ratio of payload over all bytes read",
266 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
267 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
268 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
269 if (auto payload = szReadPayload->GetValueAsInt()) {
270 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
271 return {true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
272 }
273 }
274 }
275 return {false, -1.};
276 }
277 ),
278 *fMetrics.MakeCounter<RNTupleCalcPerf*> ("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
279 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
280 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
281 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
282 if (auto unzip = szUnzip->GetValueAsInt()) {
283 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
284 }
285 }
286 }
287 return {false, -1.};
288 }
289 )
290 });
291}
292
293
294//------------------------------------------------------------------------------
295
296
298 : RPageStorage(name), fMetrics(""), fOptions(options.Clone())
299{
300}
301
303{
304}
305
306std::unique_ptr<ROOT::Experimental::Detail::RPageSink> ROOT::Experimental::Detail::RPageSink::Create(
307 std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options)
308{
309 if (ntupleName.empty()) {
310 throw RException(R__FAIL("empty RNTuple name"));
311 }
312 if (location.empty()) {
313 throw RException(R__FAIL("empty storage location"));
314 }
315 std::unique_ptr<ROOT::Experimental::Detail::RPageSink> realSink;
316 if (location.find("daos://") == 0) {
317#ifdef R__ENABLE_DAOS
318 realSink = std::make_unique<RPageSinkDaos>(ntupleName, location, options);
319#else
320 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
321#endif
322 } else {
323 realSink = std::make_unique<RPageSinkFile>(ntupleName, location, options);
324 }
325
326 if (options.GetUseBufferedWrite())
327 return std::make_unique<RPageSinkBuf>(std::move(realSink));
328 return realSink;
329}
330
333{
334 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
335 fDescriptorBuilder.AddColumn(columnId, columnId, fieldId, column.GetModel(), column.GetIndex(),
336 column.GetFirstElementIndex());
337 return ColumnHandle_t{columnId, &column};
338}
339
341 NTupleSize_t firstEntry)
342{
343 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
344 auto addField = [&](RFieldBase &f) {
345 auto fieldId = descriptor.GetNFields();
346 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
347 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
348 f.SetOnDiskId(fieldId);
349 f.ConnectPageSink(*this, firstEntry); // issues in turn one or several calls to `AddColumn()`
350 };
351 auto addProjectedField = [&](RFieldBase &f) {
352 auto fieldId = descriptor.GetNFields();
353 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
354 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
355 f.SetOnDiskId(fieldId);
356 auto sourceFieldId = changeset.fModel.GetProjectedFields().GetSourceField(&f)->GetOnDiskId();
357 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
358 auto targetId = descriptor.GetNLogicalColumns();
359 fDescriptorBuilder.AddColumn(targetId, source.GetLogicalId(), fieldId, source.GetModel(), source.GetIndex());
360 }
361 };
362
363 R__ASSERT(firstEntry >= fPrevClusterNEntries);
364 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
365 for (auto f : changeset.fAddedFields) {
366 addField(*f);
367 for (auto &descendant : *f)
368 addField(descendant);
369 }
370 for (auto f : changeset.fAddedProjectedFields) {
371 addProjectedField(*f);
372 for (auto &descendant : *f)
373 addProjectedField(descendant);
374 }
375
376 const auto nColumns = descriptor.GetNPhysicalColumns();
377 for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
379 columnRange.fPhysicalColumnId = i;
380 // We set the first element index in the current cluster to the first element that is part of a materialized page
381 // (i.e., that is part of a page list). For deferred columns, however, the column range is fixed up as needed by
382 // `RClusterDescriptorBuilder::AddDeferredColumnRanges()` on read back.
383 columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
384 columnRange.fNElements = 0;
385 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
386 fOpenColumnRanges.emplace_back(columnRange);
388 pageRange.fPhysicalColumnId = i;
389 fOpenPageRanges.emplace_back(std::move(pageRange));
390 }
391
392 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
393 // header was already serialized, this has to be done manually as it is required for page list serialization.
394 if (fSerializationContext.GetHeaderSize() > 0)
395 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
396}
397
399{
400 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
401 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
402
403 auto &fieldZero = *model.GetFieldZero();
404 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
405 fieldZero.SetOnDiskId(0);
407
408 RNTupleModelChangeset initialChangeset{model};
409 for (auto f : fieldZero.GetSubFields())
410 initialChangeset.fAddedFields.emplace_back(f);
411 for (auto f : model.GetProjectedFields().GetFieldZero()->GetSubFields())
412 initialChangeset.fAddedProjectedFields.emplace_back(f);
413 UpdateSchema(initialChangeset, 0U);
414
415 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(nullptr, descriptor);
416 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
417 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(buffer.get(), descriptor);
418 CreateImpl(model, buffer.get(), fSerializationContext.GetHeaderSize());
419
420 fDescriptorBuilder.BeginHeaderExtension();
421}
422
424{
425 fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();
426
428 pageInfo.fNElements = page.GetNElements();
429 pageInfo.fLocator = CommitPageImpl(columnHandle, page);
430 fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
431}
432
434 ROOT::Experimental::DescriptorId_t physicalColumnId,
436{
437 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.fNElements;
438
440 pageInfo.fNElements = sealedPage.fNElements;
441 pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
442 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
443}
444
445std::vector<ROOT::Experimental::RNTupleLocator>
446ROOT::Experimental::Detail::RPageSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
447{
448 std::vector<ROOT::Experimental::RNTupleLocator> locators;
449 for (auto &range : ranges) {
450 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
451 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
452 }
453 return locators;
454}
455
456void ROOT::Experimental::Detail::RPageSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
457{
458 auto locators = CommitSealedPageVImpl(ranges);
459 unsigned i = 0;
460
461 for (auto &range : ranges) {
462 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
463 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->fNElements;
464
466 pageInfo.fNElements = sealedPageIt->fNElements;
467 pageInfo.fLocator = locators[i++];
468 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
469 }
470 }
471}
472
474{
475 auto nbytes = CommitClusterImpl(nEntries);
476
477 R__ASSERT((nEntries - fPrevClusterNEntries) < ClusterSize_t(-1));
478 auto nEntriesInCluster = ClusterSize_t(nEntries - fPrevClusterNEntries);
479 RClusterDescriptorBuilder clusterBuilder(fDescriptorBuilder.GetDescriptor().GetNClusters(), fPrevClusterNEntries,
480 nEntriesInCluster);
481 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
483 fullRange.fPhysicalColumnId = i;
484 std::swap(fullRange, fOpenPageRanges[i]);
485 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
486 fOpenColumnRanges[i].fCompressionSettings, fullRange);
487 fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
488 fOpenColumnRanges[i].fNElements = 0;
489 }
490 fDescriptorBuilder.AddClusterWithDetails(clusterBuilder.MoveDescriptor().Unwrap());
491 fPrevClusterNEntries = nEntries;
492 return nbytes;
493}
494
496{
497 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
498
499 const auto nClusters = descriptor.GetNClusters();
500 std::vector<DescriptorId_t> physClusterIDs;
501 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
502 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
503 }
504
505 auto szPageList =
506 Internal::RNTupleSerializer::SerializePageListV1(nullptr, descriptor, physClusterIDs, fSerializationContext);
507 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
508 Internal::RNTupleSerializer::SerializePageListV1(bufPageList.get(), descriptor, physClusterIDs,
509 fSerializationContext);
510
511 const auto clusterGroupId = descriptor.GetNClusterGroups();
512 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
514 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
515 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
516 cgBuilder.AddCluster(i);
517 }
518 fDescriptorBuilder.AddClusterGroup(std::move(cgBuilder));
519 fSerializationContext.MapClusterGroupId(clusterGroupId);
520
521 fNextClusterInGroup = nClusters;
522}
523
525{
526 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
527
528 auto szFooter = Internal::RNTupleSerializer::SerializeFooterV1(nullptr, descriptor, fSerializationContext);
529 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
530 Internal::RNTupleSerializer::SerializeFooterV1(bufFooter.get(), descriptor, fSerializationContext);
531
532 CommitDatasetImpl(bufFooter.get(), szFooter);
533}
534
537 const RColumnElementBase &element, int compressionSetting, void *buf)
538{
539 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(page.GetBuffer());
540 bool isAdoptedBuffer = true;
541 auto packedBytes = page.GetNBytes();
542
543 if (!element.IsMappable()) {
544 packedBytes = element.GetPackedSize(page.GetNElements());
545 pageBuf = new unsigned char[packedBytes];
546 isAdoptedBuffer = false;
547 element.Pack(pageBuf, page.GetBuffer(), page.GetNElements());
548 }
549 auto zippedBytes = packedBytes;
550
551 if ((compressionSetting != 0) || !element.IsMappable()) {
552 zippedBytes = RNTupleCompressor::Zip(pageBuf, packedBytes, compressionSetting, buf);
553 if (!isAdoptedBuffer)
554 delete[] pageBuf;
555 pageBuf = reinterpret_cast<unsigned char *>(buf);
556 isAdoptedBuffer = true;
557 }
558
559 R__ASSERT(isAdoptedBuffer);
560
561 return RSealedPage{pageBuf, static_cast<std::uint32_t>(zippedBytes), page.GetNElements()};
562}
563
566 const RPage &page, const RColumnElementBase &element, int compressionSetting)
567{
568 R__ASSERT(fCompressor);
569 return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
570}
571
573{
574 fMetrics = RNTupleMetrics(prefix);
575 fCounters = std::unique_ptr<RCounters>(new RCounters{
576 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPageCommitted", "", "number of pages committed to storage"),
577 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szWritePayload", "B", "volume written for committed pages"),
578 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szZip", "B", "volume before zipping"),
579 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallWrite", "ns", "wall clock time spent writing"),
580 *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallZip", "ns", "wall clock time spent compressing"),
581 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*>("timeCpuWrite", "ns", "CPU time spent writing"),
582 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*> ("timeCpuZip", "ns",
583 "CPU time spent compressing")
584 });
585}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:303
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Definition TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
char name[80]
Definition TGX11.cxx:110
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:155
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:157
A column element encapsulates the translation between basic C++ types and their column representation...
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
std::size_t GetPackedSize(std::size_t nElements=1U) const
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
const RColumnModel & GetModel() const
Definition RColumn.hxx:323
std::uint32_t GetIndex() const
Definition RColumn.hxx:324
NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:326
NTupleSize_t GetNElements() const
Definition RColumn.hxx:321
A field translates read and write calls from/to underlying columns to/from tree values.
Definition RField.hxx:83
void SetOnDiskId(DescriptorId_t id)
Definition RField.cxx:622
DescriptorId_t GetOnDiskId() const
Definition RField.hxx:610
std::vector< RFieldBase * > GetSubFields() const
Definition RField.cxx:598
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:105
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
Definition RCluster.hxx:122
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:43
Uses standard C++ memory allocation for the column data pages.
void CommitDataset()
Finalize the current cluster and the entire data set.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitPage(ColumnHandle_t columnHandle, const RPage &page)
Write a page to the storage. The column must have been added before.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)
Incorporate incremental changes to the model into the ntuple descriptor.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the file name (location)
std::uint64_t CommitCluster(NTupleSize_t nEntries)
Finalize the current cluster and create a new one for the following data.
void CommitClusterGroup()
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)
Write a preprocessed page to storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId)
Helper for unstreaming a page.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
Common functionality of an ntuple storage for both reading and writing.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:42
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:84
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:135
std::uint32_t GetNElements() const
Definition RPage.hxx:86
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & PageListLength(std::uint32_t pageListLength)
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
static RFieldDescriptorBuilder FromField(const Detail::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
const Detail::RFieldBase * GetSourceField(const Detail::RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
RFieldZero * GetFieldZero() const
const RProjectedFields & GetProjectedFields() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
RClusterSize ClusterSize_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:159
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:53
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.
std::int64_t fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
ClusterSize_t fNElements
The number of column elements in the cluster.
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
Records the partition of data into pages for a particular column in a particular cluster.