RPageStorage.cxx
/// \file RPageStorage.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2018-10-04
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RPageStorage.hxx>
#include <ROOT/RColumn.hxx>
#include <ROOT/RField.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RPagePool.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#include <ROOT/RPageStorageFile.hxx>
#include <memory>
#include <string_view>
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif

#include <Compression.h>
#include <TError.h>

#include <utility>

ROOT::Experimental::Internal::RPageStorage::RPageStorage(std::string_view name) : fMetrics(""), fNTupleName(name) {}

ROOT::Experimental::Internal::RPageStorage::~RPageStorage() {}

//------------------------------------------------------------------------------

void ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::Insert(DescriptorId_t physicalColumnID)
{
   for (unsigned i = 0; i < fIDs.size(); ++i) {
      if (fIDs[i] == physicalColumnID) {
         fRefCounters[i]++;
         return;
      }
   }
   fIDs.emplace_back(physicalColumnID);
   fRefCounters.emplace_back(1);
}

void ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::Erase(DescriptorId_t physicalColumnID)
{
   for (unsigned i = 0; i < fIDs.size(); ++i) {
      if (fIDs[i] == physicalColumnID) {
         if (--fRefCounters[i] == 0) {
            fIDs.erase(fIDs.begin() + i);
            fRefCounters.erase(fRefCounters.begin() + i);
         }
         return;
      }
   }
}

ROOT::Experimental::Internal::RCluster::ColumnSet_t
ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::ToColumnSet() const
{
   RCluster::ColumnSet_t result;
   for (const auto &id : fIDs)
      result.insert(id);
   return result;
}

bool ROOT::Experimental::Internal::RPageSource::REntryRange::IntersectsWith(const RClusterDescriptor &clusterDesc) const
{
   if (fFirstEntry == kInvalidNTupleIndex) {
      /// Entry range unset, we assume that the entry range covers the complete source
      return true;
   }

   if (clusterDesc.GetNEntries() == 0)
      return true;
   if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
      return false;
   if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
      return false;
   return true;
}
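
// Editorial note (not part of the original source): worked example for IntersectsWith() above.
// With an entry range {fFirstEntry = 1000, fNEntries = 500}, i.e. entries [1000, 1500), a cluster
// covering entries [1400, 1700) intersects the range, whereas a cluster covering [1500, 1800)
// does not, because its first entry index equals fFirstEntry + fNEntries.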

ROOT::Experimental::Internal::RPageSource::RPageSource(std::string_view name, const RNTupleReadOptions &options)
   : RPageStorage(name), fOptions(options)
{
}

ROOT::Experimental::Internal::RPageSource::~RPageSource() {}

std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
                                                  const RNTupleReadOptions &options)
{
   if (ntupleName.empty()) {
      throw RException(R__FAIL("empty RNTuple name"));
   }
   if (location.empty()) {
      throw RException(R__FAIL("empty storage location"));
   }
   if (location.find("daos://") == 0)
#ifdef R__ENABLE_DAOS
      return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
#else
      throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif

   return std::make_unique<RPageSourceFile>(ntupleName, location, options);
}
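
// Editorial usage sketch (not part of the original source); the ntuple name "ntpl" and the file
// name "data.root" are invented for illustration. Create() only dispatches on the location scheme;
// the returned source still has to be attached to the on-disk data (Attach(), declared in
// RPageStorage.hxx) before entries or pages can be queried.
namespace {
[[maybe_unused]] void ExamplePageSourceUsage()
{
   using ROOT::Experimental::Internal::RPageSource;
   auto source = RPageSource::Create("ntpl", "data.root"); // plain path -> RPageSourceFile
   source->Attach();                                       // load the on-disk descriptor
   // Promise to read only entries [0, 1000); clusters outside this window can then be skipped.
   source->SetEntryRange({0, 1000});
}
} // anonymous namespace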

ROOT::Experimental::Internal::RPageStorage::ColumnHandle_t
ROOT::Experimental::Internal::RPageSource::AddColumn(DescriptorId_t fieldId, const RColumn &column)
{
   auto physicalId = GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex());
   R__ASSERT(physicalId != kInvalidDescriptorId);
   fActivePhysicalColumns.Insert(physicalId);
   return ColumnHandle_t{physicalId, &column};
}

void ROOT::Experimental::Internal::RPageSource::DropColumn(ColumnHandle_t columnHandle)
{
   fActivePhysicalColumns.Erase(columnHandle.fPhysicalId);
}

void ROOT::Experimental::Internal::RPageSource::SetEntryRange(const REntryRange &range)
{
   if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
      throw RException(R__FAIL("invalid entry range"));
   }
   fEntryRange = range;
}

ROOT::Experimental::NTupleSize_t ROOT::Experimental::Internal::RPageSource::GetNEntries()
{
   return GetSharedDescriptorGuard()->GetNEntries();
}

ROOT::Experimental::NTupleSize_t ROOT::Experimental::Internal::RPageSource::GetNElements(ColumnHandle_t columnHandle)
{
   return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
}

ROOT::Experimental::ColumnId_t ROOT::Experimental::Internal::RPageSource::GetColumnId(ColumnHandle_t columnHandle)
{
   // TODO(jblomer) distinguish trees
   return columnHandle.fPhysicalId;
}

void ROOT::Experimental::Internal::RPageSource::UnzipCluster(RCluster *cluster)
{
   if (fTaskScheduler)
      UnzipClusterImpl(cluster);
}

void ROOT::Experimental::Internal::RPageSource::PrepareLoadCluster(
   const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
   std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
{
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);

   for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
      const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
      NTupleSize_t pageNo = 0;
      for (const auto &pageInfo : pageRange.fPageInfos) {
         if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
            pageZeroMap.Register(
               ROnDiskPage::Key{physicalColumnId, pageNo},
               ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.fBytesOnStorage));
         } else {
            perPageFunc(physicalColumnId, pageNo, pageInfo);
         }
         ++pageNo;
      }
   }
}
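
// Editorial note (not part of the original source): PrepareLoadCluster() above splits the pages of
// the requested columns into two groups. Pages whose locator is of type kTypePageZero belong to
// deferred columns without on-disk data and are registered directly against the shared zero page;
// every other page is handed to perPageFunc, which concrete page sources typically use to schedule
// the actual reads.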

void ROOT::Experimental::Internal::RPageSource::EnableDefaultMetrics(const std::string &prefix)
{
   fMetrics = Detail::RNTupleMetrics(prefix);
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
                                                            "volume read from storage (required)"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
                                                            "volume read from storage (overhead)"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
                                                            "number of partial clusters preloaded from storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageLoaded", "", "number of pages loaded from storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPagePopulated", "", "number of populated pages"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
                                                            "wall clock time spent decompressing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
                                                                                        "CPU time spent reading"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
                                                                                        "CPU time spent decompressing"),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                     if (auto walltime = timeWallRead->GetValueAsInt()) {
                        double payload = szReadPayload->GetValueAsInt();
                        double overhead = szReadOverhead->GetValueAsInt();
                        // unit: bytes / nanosecond = GB/s
                        return {true, (1000. * (payload + overhead) / walltime)};
                     }
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                  if (auto walltime = timeWallRead->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
                  if (auto walltime = timeWallUnzip->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (auto payload = szReadPayload->GetValueAsInt()) {
                     // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
                     return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
                  if (auto unzip = szUnzip->GetValueAsInt()) {
                     return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
                  }
               }
            }
            return {false, -1.};
         })});
}
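
// Editorial note (not part of the original source): unit check for the bandwidth counters above.
// szReadPayload/szReadOverhead/szUnzip are in bytes and timeWallRead/timeWallUnzip in nanoseconds,
// so bytes / nanoseconds = GB/s; the factor 1000 converts the result to the MB/s advertised by the
// counters. Example: reading 500 MB of payload plus 100 MB of overhead in 2 s gives
// 1000 * 6e8 B / 2e9 ns = 300 MB/s for bwRead.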

ROOT::Experimental::Internal::RPage
ROOT::Experimental::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element,
                                                      DescriptorId_t physicalColumnId)
{
   // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
   // large enough to hold `sealedPage.fNElements`
   if (sealedPage.fBuffer == RPage::GetPageZeroBuffer()) {
      auto page = RPage::MakePageZero(physicalColumnId, element.GetSize());
      page.GrowUnchecked(sealedPage.fNElements);
      return page;
   }

   const auto bytesPacked = element.GetPackedSize(sealedPage.fNElements);
   using Allocator_t = RPageAllocatorHeap;
   auto page = Allocator_t::NewPage(physicalColumnId, element.GetSize(), sealedPage.fNElements);
   if (sealedPage.fSize != bytesPacked) {
      fDecompressor->Unzip(sealedPage.fBuffer, sealedPage.fSize, bytesPacked, page.GetBuffer());
   } else {
      // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
      // may decide not to use UnsealPage and to provide custom mapping / decompression code instead.
      // Note that usually pages are compressed.
      memcpy(page.GetBuffer(), sealedPage.fBuffer, bytesPacked);
   }

   if (!element.IsMappable()) {
      auto tmp = Allocator_t::NewPage(physicalColumnId, element.GetSize(), sealedPage.fNElements);
      element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.fNElements);
      Allocator_t::DeletePage(page);
      page = tmp;
   }

   page.GrowUnchecked(sealedPage.fNElements);
   return page;
}

//------------------------------------------------------------------------------

ROOT::Experimental::Internal::RPageSink::RPageSink(std::string_view name, const RNTupleWriteOptions &options)
   : RPageStorage(name), fOptions(options.Clone())
{
}

ROOT::Experimental::Internal::RPageSink::~RPageSink() {}

ROOT::Experimental::Internal::RPageStorage::RSealedPage
ROOT::Experimental::Internal::RPageSink::SealPage(const RPage &page, const RColumnElementBase &element,
                                                  int compressionSetting, void *buf, bool allowAlias)
{
   unsigned char *pageBuf = reinterpret_cast<unsigned char *>(page.GetBuffer());
   bool isAdoptedBuffer = true;
   auto packedBytes = page.GetNBytes();

   if (!element.IsMappable()) {
      packedBytes = element.GetPackedSize(page.GetNElements());
      pageBuf = new unsigned char[packedBytes];
      isAdoptedBuffer = false;
      element.Pack(pageBuf, page.GetBuffer(), page.GetNElements());
   }
   auto zippedBytes = packedBytes;

   if ((compressionSetting != 0) || !element.IsMappable() || !allowAlias) {
      zippedBytes = RNTupleCompressor::Zip(pageBuf, packedBytes, compressionSetting, buf);
      if (!isAdoptedBuffer)
         delete[] pageBuf;
      pageBuf = reinterpret_cast<unsigned char *>(buf);
      isAdoptedBuffer = true;
   }

   R__ASSERT(isAdoptedBuffer);

   return RSealedPage{pageBuf, static_cast<std::uint32_t>(zippedBytes), page.GetNElements()};
}
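
// Editorial note (not part of the original source): in SealPage() above, a page is first packed
// (only if the in-memory and on-disk layouts differ) and then compressed into the caller-provided
// buffer `buf`. The returned RSealedPage aliases the page's own buffer, without any copy, only when
// the element is mappable, the compression setting is 0, and allowAlias is true.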

ROOT::Experimental::Internal::RPageStorage::RSealedPage
ROOT::Experimental::Internal::RPageSink::SealPage(const RPage &page, const RColumnElementBase &element,
                                                  int compressionSetting)
{
   R__ASSERT(fCompressor);
   return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
}

//------------------------------------------------------------------------------

std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
ROOT::Experimental::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
                                                          const RNTupleWriteOptions &options)
{
   if (ntupleName.empty()) {
      throw RException(R__FAIL("empty RNTuple name"));
   }
   if (location.empty()) {
      throw RException(R__FAIL("empty storage location"));
   }
   if (location.find("daos://") == 0) {
#ifdef R__ENABLE_DAOS
      return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
#else
      throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif
   }

   // Otherwise assume that the user wants us to create a file.
   return std::make_unique<RPageSinkFile>(ntupleName, location, options);
}
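
// Editorial note (not part of the original source): like RPageSource::Create() further up, this
// factory dispatches purely on the location string. A "daos://" URI selects RPageSinkDaos when the
// build has DAOS support; any other location is treated as a file path and served by RPageSinkFile.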

ROOT::Experimental::Internal::RPagePersistentSink::RPagePersistentSink(std::string_view name,
                                                                       const RNTupleWriteOptions &options)
   : RPageSink(name, options)
{
}

ROOT::Experimental::Internal::RPagePersistentSink::~RPagePersistentSink() {}

ROOT::Experimental::Internal::RPageStorage::ColumnHandle_t
ROOT::Experimental::Internal::RPagePersistentSink::AddColumn(DescriptorId_t fieldId, const RColumn &column)
{
   auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
   fDescriptorBuilder.AddColumn(columnId, columnId, fieldId, column.GetModel(), column.GetIndex(),
                                column.GetFirstElementIndex());
   return ColumnHandle_t{columnId, &column};
}

void ROOT::Experimental::Internal::RPagePersistentSink::UpdateSchema(const RNTupleModelChangeset &changeset,
                                                                     NTupleSize_t firstEntry)
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();
   auto addField = [&](RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      f.SetOnDiskId(fieldId);
      CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
   };
   auto addProjectedField = [&](RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      f.SetOnDiskId(fieldId);
      auto sourceFieldId = changeset.fModel.GetProjectedFields().GetSourceField(&f)->GetOnDiskId();
      for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
         auto targetId = descriptor.GetNLogicalColumns();
         fDescriptorBuilder.AddColumn(targetId, source.GetLogicalId(), fieldId, source.GetModel(), source.GetIndex());
      }
   };

   R__ASSERT(firstEntry >= fPrevClusterNEntries);
   const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
   for (auto f : changeset.fAddedFields) {
      addField(*f);
      for (auto &descendant : *f)
         addField(descendant);
   }
   for (auto f : changeset.fAddedProjectedFields) {
      addProjectedField(*f);
      for (auto &descendant : *f)
         addProjectedField(descendant);
   }

   const auto nColumns = descriptor.GetNPhysicalColumns();
   for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
      RClusterDescriptor::RColumnRange columnRange;
      columnRange.fPhysicalColumnId = i;
      // We set the first element index in the current cluster to the first element that is part of a materialized page
      // (i.e., that is part of a page list). For deferred columns, however, the column range is fixed up as needed by
      // `RClusterDescriptorBuilder::AddDeferredColumnRanges()` on read back.
      columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
      columnRange.fNElements = 0;
      columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
      fOpenColumnRanges.emplace_back(columnRange);
      RClusterDescriptor::RPageRange pageRange;
      pageRange.fPhysicalColumnId = i;
      fOpenPageRanges.emplace_back(std::move(pageRange));
   }

   // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
   // header was already serialized, this has to be done manually as it is required for page list serialization.
   if (fSerializationContext.GetHeaderSize() > 0)
      fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
}

void ROOT::Experimental::Internal::RPagePersistentSink::InitImpl(RNTupleModel &model)
{
   fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   auto &fieldZero = model.GetFieldZero();
   fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
   fieldZero.SetOnDiskId(0);
   model.GetProjectedFields().GetFieldZero()->SetOnDiskId(0);

   RNTupleModelChangeset initialChangeset{model};
   for (auto f : fieldZero.GetSubFields())
      initialChangeset.fAddedFields.emplace_back(f);
   for (auto f : model.GetProjectedFields().GetFieldZero()->GetSubFields())
      initialChangeset.fAddedProjectedFields.emplace_back(f);
   UpdateSchema(initialChangeset, 0U);

   fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
   auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
   fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
   InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());

   fDescriptorBuilder.BeginHeaderExtension();
}
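
// Editorial note (not part of the original source): InitImpl() above uses the two-pass
// serialization idiom of RNTupleSerializer: a first call with a nullptr buffer only computes the
// required size, then the buffer is allocated and the same call is repeated to actually write the
// header. CommitClusterGroup() (page list) and CommitDataset() (footer) below follow the same pattern.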

void ROOT::Experimental::Internal::RPagePersistentSink::InitFromDescriptor(const RNTupleDescriptor &descriptor)
{
   {
      auto model = descriptor.CreateModel();
      Init(*model.get());
   }

   auto clusterId = descriptor.FindClusterId(0, 0);

   while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
      auto &cluster = descriptor.GetClusterDescriptor(clusterId);
      auto nEntries = cluster.GetNEntries();

      RClusterDescriptorBuilder clusterBuilder;
      clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
         .FirstEntryIndex(fPrevClusterNEntries)
         .NEntries(nEntries);

      for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
         R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
         const auto &columnRange = cluster.GetColumnRange(i);
         R__ASSERT(columnRange.fPhysicalColumnId == i);
         const auto &pageRange = cluster.GetPageRange(i);
         R__ASSERT(pageRange.fPhysicalColumnId == i);
         clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
                                          pageRange);
         fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
      }
      fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
      fPrevClusterNEntries += nEntries;

      clusterId = descriptor.FindNextClusterId(clusterId);
   }
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle, const RPage &page)
{
   fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();

   RClusterDescriptor::RPageRange::RPageInfo pageInfo;
   pageInfo.fNElements = page.GetNElements();
   pageInfo.fLocator = CommitPageImpl(columnHandle, page);
   fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPage(DescriptorId_t physicalColumnId,
                                                                         const RPageStorage::RSealedPage &sealedPage)
{
   fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.fNElements;

   RClusterDescriptor::RPageRange::RPageInfo pageInfo;
   pageInfo.fNElements = sealedPage.fNElements;
   pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
   fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
}

std::vector<ROOT::Experimental::RNTupleLocator>
ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPageVImpl(
   std::span<RPageStorage::RSealedPageGroup> ranges)
{
   std::vector<ROOT::Experimental::RNTupleLocator> locators;
   for (auto &range : ranges) {
      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
         locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
   }
   return locators;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPageV(
   std::span<RPageStorage::RSealedPageGroup> ranges)
{
   auto locators = CommitSealedPageVImpl(ranges);
   unsigned i = 0;

   for (auto &range : ranges) {
      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->fNElements;

         RClusterDescriptor::RPageRange::RPageInfo pageInfo;
         pageInfo.fNElements = sealedPageIt->fNElements;
         pageInfo.fLocator = locators[i++];
         fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
      }
   }
}

std::uint64_t
ROOT::Experimental::Internal::RPagePersistentSink::CommitCluster(ROOT::Experimental::NTupleSize_t nNewEntries)
{
   auto nbytes = CommitClusterImpl();

   RClusterDescriptorBuilder clusterBuilder;
   clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
      .FirstEntryIndex(fPrevClusterNEntries)
      .NEntries(nNewEntries);
   for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
      RClusterDescriptor::RPageRange fullRange;
      fullRange.fPhysicalColumnId = i;
      std::swap(fullRange, fOpenPageRanges[i]);
      clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
                                       fOpenColumnRanges[i].fCompressionSettings, fullRange);
      fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
      fOpenColumnRanges[i].fNElements = 0;
   }
   fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
   fPrevClusterNEntries += nNewEntries;
   return nbytes;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitClusterGroup()
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   const auto nClusters = descriptor.GetNActiveClusters();
   std::vector<DescriptorId_t> physClusterIDs;
   for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
      physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
   }

   auto szPageList = RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext);
   auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
   RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);

   const auto clusterGroupId = descriptor.GetNClusterGroups();
   const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
   RClusterGroupDescriptorBuilder cgBuilder;
   cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
   if (fNextClusterInGroup == nClusters) {
      cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
   } else {
      const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
      const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
      cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
         .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
                    firstClusterDesc.GetFirstEntryIndex())
         .NClusters(nClusters - fNextClusterInGroup);
   }
   std::vector<DescriptorId_t> clusterIds;
   for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
      clusterIds.emplace_back(i);
   }
   cgBuilder.AddClusters(clusterIds);
   fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
   fSerializationContext.MapClusterGroupId(clusterGroupId);

   fNextClusterInGroup = nClusters;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitDataset()
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext);
   auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
   RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);

   CommitDatasetImpl(bufFooter.get(), szFooter);
}
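
// Editorial note (not part of the original source): write-side call sequence as implemented above.
// CommitPage()/CommitSealedPage() record page locators in the currently open page ranges,
// CommitCluster() turns the open ranges into a cluster descriptor, CommitClusterGroup() serializes
// the page-list envelope for the clusters accumulated since the previous group, and CommitDataset()
// finally serializes and writes the footer.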

void ROOT::Experimental::Internal::RPagePersistentSink::EnableDefaultMetrics(const std::string &prefix)
{
   fMetrics = Detail::RNTupleMetrics(prefix);
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
                                                            "number of pages committed to storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
                                                            "volume written for committed pages"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
                                                                                        "CPU time spent writing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
                                                                                        "CPU time spent compressing")});
}