Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \date 2018-10-04
4
5/*************************************************************************
6 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include <ROOT/RPageStorage.hxx>
15#include <ROOT/RColumn.hxx>
16#include <ROOT/RFieldBase.hxx>
20#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtils.hxx>
23#include <ROOT/RNTupleZip.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <atomic>
36#include <cassert>
37#include <cstring>
38#include <functional>
39#include <memory>
40#include <string_view>
41#include <unordered_map>
42#include <utility>
43
53
55
59
65
70
72
{
   // Compute the XxHash-3 checksum of the sealed page payload and store it in
   // the checksum slot directly behind the payload. No-op when the page was
   // sealed without checksum support.
   if (!fHasChecksum)
      return;

   auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
   // fBuffer is logically const; the trailing checksum slot is the one part we
   // are allowed to write, hence the const_cast.
   auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
   std::uint64_t xxhash3;
   RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
}
83
{
   // Verify the trailing XxHash-3 checksum of the sealed page against its
   // payload; produces an error result on mismatch.
   // NOTE(review): the early-return (no checksum) and final success-return
   // lines are elided in this excerpt.
   if (!fHasChecksum)

   auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
   if (!success)
      return R__FAIL("page checksum verification failed, data corruption detected");
}
94
{
   // Read back the checksum stored in the last kNBytesPageChecksum bytes of
   // the sealed page buffer; fails if the page carries no checksum.
   if (!fHasChecksum)
      return R__FAIL("invalid attempt to extract non-existing page checksum");

   std::uint64_t checksum;
   // NOTE(review): the first line of the deserialization call is elided in
   // this excerpt; only its argument continuation remains.
      reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
   return checksum;
}
106
107//------------------------------------------------------------------------------
108
{
   // Mark (physicalColumnId, elementId) as active. If this in-memory element
   // type is already registered for the column, only bump its reference count.
   auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
   for (auto &columnInfo : itr->second) {
      if (columnInfo.fElementId == elementId) {
         columnInfo.fRefCounter++;
         return;
      }
   }
   // First use of this element type for the column: start with refcount 1.
   itr->second.emplace_back(RColumnInfo{elementId, 1});
}
121
{
   // Drop one reference to (physicalColumnId, elementId). The entry is removed
   // when its counter reaches zero; the column itself is removed when no
   // element-type entries remain.
   auto itr = fColumnInfos.find(physicalColumnId);
   R__ASSERT(itr != fColumnInfos.end());
   for (std::size_t i = 0; i < itr->second.size(); ++i) {
      if (itr->second[i].fElementId != elementId)
         continue;

      itr->second[i].fRefCounter--;
      if (itr->second[i].fRefCounter == 0) {
         itr->second.erase(itr->second.begin() + i);
         if (itr->second.empty()) {
            fColumnInfos.erase(itr);
         }
      }
      break;
   }
}
141
{
   // Collect the ids of all currently active physical columns.
   // NOTE(review): the declaration of `result` is elided in this excerpt.
   for (const auto &[physicalColumnId, _] : fColumnInfos)
      result.insert(physicalColumnId);
   return result;
}
149
151{
      /// Entry range unset, we assume that the entry range covers the complete source
      return true;
   }

   // An empty cluster never excludes itself from the range.
   if (clusterDesc.GetNEntries() == 0)
      return true;
   // Cluster ends at or before the range starts, or starts at or after the
   // range ends: no overlap.
   if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
      return false;
   if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
      return false;
   return true;
}
165
   fOptions(options),
   // The cluster pool prefetches clusters in bunches; the bunch size comes
   // from the read options.
   fClusterPool(*this, ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(fOptions)),
   fPagePool(*this)
{
}
173
175
176std::unique_ptr<ROOT::Internal::RPageSource>
177ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
178 const ROOT::RNTupleReadOptions &options)
179{
180 if (ntupleName.empty()) {
181 throw RException(R__FAIL("empty RNTuple name"));
182 }
183 if (location.empty()) {
184 throw RException(R__FAIL("empty storage location"));
185 }
186 if (location.find("daos://") == 0)
187#ifdef R__ENABLE_DAOS
188 return std::make_unique<ROOT::Experimental::Internal::RPageSourceDaos>(ntupleName, location, options);
189#else
190 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
191#endif
192
193 return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
194}
195
{
   // Resolve the physical column backing (fieldId, column index,
   // representation index) and mark it active for reading together with the
   // column's in-memory element type.
   // NOTE(review): one line is elided in this excerpt before and after the
   // lookup below.
   auto physicalId =
      GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
   fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
   return ColumnHandle_t{physicalId, &column};
}
206
{
   // Deactivate the column: drop one reference for its in-memory element type.
   fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
}
211
{
   // Restrict reading to the given entry range; the range must lie entirely
   // within the source.
   if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
      throw RException(R__FAIL("invalid entry range"));
   }
   fEntryRange = range;
}
219
226
234
235std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSource::Clone() const
236{
237 auto clone = CloneImpl();
238 if (fIsAttached) {
239 clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
240 clone->fHasStructure = true;
241 clone->fIsAttached = true;
242 }
243 return clone;
244}
245
250
255
261
{
   // Decompress and decode all pages of the given cluster for the currently
   // active columns and stash them in the page pool. The per-page work is
   // fanned out through fTaskScheduler and joined at the end.
   RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);

   const auto clusterId = cluster->GetId();
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);

   fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;

   // Set by any unseal task that detects a corrupt page; checked after Wait().
   std::atomic<bool> foundChecksumFailure{false};

   // Keeps the generated column elements alive until all tasks have finished.
   std::vector<std::unique_ptr<RColumnElementBase>> allElements;
   const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
   for (const auto columnId : columnsInCluster) {
      // By the time we unzip a cluster, the set of active columns may have already changed wrt. to the moment when
      // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
      // of active columns.
      if (!fActivePhysicalColumns.HasColumnInfos(columnId))
         continue;
      const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);

      allElements.reserve(allElements.size() + columnInfos.size());
      for (const auto &info : columnInfos) {
         allElements.emplace_back(GenerateColumnElement(info.fElementId));

         const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
         std::uint64_t pageNo = 0;
         std::uint64_t firstInPage = 0;
         for (const auto &pi : pageRange.GetPageInfos()) {
            auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
            RSealedPage sealedPage;
            sealedPage.SetNElements(pi.GetNElements());
            sealedPage.SetHasChecksum(pi.HasChecksum());
            sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
            // NOTE(review): onDiskPage is dereferenced here before the null
            // check in the R__ASSERT below; the assertion should come first.
            sealedPage.SetBuffer(onDiskPage->GetAddress());
            R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));

            // sealedPage is captured by value: the task may run after this
            // loop iteration has moved on.
            auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
                             &foundChecksumFailure,
                             indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
               const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
               auto rv = UnsealPage(sealedPage, *element);
               if (!rv) {
                  foundChecksumFailure = true;
                  return;
               }
               auto newPage = rv.Unwrap();
               fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());

               newPage.SetWindow(indexOffset + firstInPage,
                                 ROOT::Internal::RPage::RClusterInfo(clusterId, indexOffset));
               fPagePool.PreloadPage(std::move(newPage), keyPagePool);
            };

            fTaskScheduler->AddTask(taskFunc);

            firstInPage += pi.GetNElements();
            pageNo++;
         } // for all pages in column

         fCounters->fNPageUnsealed.Add(pageNo);
      } // for all in-memory types of the column
   } // for all columns in cluster

   fTaskScheduler->Wait();

   if (foundChecksumFailure) {
      throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
   }
}
333
                                                  const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
   perPageFunc)
{
   // Walk all requested columns of the cluster: register page-zero pages from
   // the in-memory zero buffer and hand every other page to perPageFunc (which
   // typically schedules the actual read).
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);

   for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
      // Suppressed column ranges have no pages to load.
      if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
         continue;

      const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
      ROOT::NTupleSize_t pageNo = 0;
      for (const auto &pageInfo : pageRange.GetPageInfos()) {
         if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
            // NOTE(review): the first line of the ROnDiskPage argument is
            // elided in this excerpt.
            pageZeroMap.Register(ROnDiskPage::Key{physicalColumnId, pageNo},
                                 pageInfo.GetLocator().GetNBytesOnStorage()));
         } else {
            perPageFunc(physicalColumnId, pageNo, pageInfo);
         }
         ++pageNo;
      }
   }
}
360
{
   // Maintain the window of preloaded clusters around the cluster currently
   // being read: evict cached pages of clusters that fall outside the window,
   // except for explicitly pinned clusters.
   if (fLastUsedCluster == clusterId)
      return;

   ROOT::NTupleSize_t firstEntryIndex =
      GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
   auto itr = fPreloadedClusters.begin();
   // Evict preloaded clusters that start before the newly used cluster.
   while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
      if (fPinnedClusters.count(itr->second) > 0) {
         ++itr;
      } else {
         fPagePool.Evict(itr->second);
         itr = fPreloadedClusters.erase(itr);
      }
   }
   std::size_t poolWindow = 0;
   // Skip over the clusters kept cached as the read-ahead window.
   // NOTE(review): the second operand of this loop condition is elided in this
   // excerpt.
   while ((itr != fPreloadedClusters.end()) &&
      ++itr;
      ++poolWindow;
   }
   // Evict everything beyond the window, again sparing pinned clusters.
   while (itr != fPreloadedClusters.end()) {
      if (fPinnedClusters.count(itr->second) > 0) {
         ++itr;
      } else {
         fPagePool.Evict(itr->second);
         itr = fPreloadedClusters.erase(itr);
      }
   }

   fLastUsedCluster = clusterId;
}
394
{
   // Serve the page containing `globalIndex` for the given column, preferably
   // from the page pool cache; otherwise locate cluster and page via the
   // descriptor and delegate the actual load to LoadPageImpl().
   const auto columnId = columnHandle.fPhysicalId;
   const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
   auto cachedPageRef =
      fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
   if (!cachedPageRef.Get().IsNull()) {
      UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
      return cachedPageRef;
   }

   std::uint64_t idxInCluster;
   RClusterInfo clusterInfo;
   {
      auto descriptorGuard = GetSharedDescriptorGuard();
      clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);

      if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
         throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));

      const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
      const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
      // NOTE(review): the suppressed-column handling line is elided in this
      // excerpt.
      if (columnRange.IsSuppressed())

      clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
      R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
      idxInCluster = globalIndex - clusterInfo.fColumnOffset;
      clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
   }

   // NOTE(review): the unknown-locator guard condition is elided in this
   // excerpt.
      throw RException(R__FAIL("tried to read a page with an unknown locator"));

   return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
}
433
{
   // Serve the page containing the cluster-local `localIndex` for the given
   // column, preferably from the page pool cache; otherwise consult the
   // descriptor and delegate to LoadPageImpl().
   const auto clusterId = localIndex.GetClusterId();
   const auto idxInCluster = localIndex.GetIndexInCluster();
   const auto columnId = columnHandle.fPhysicalId;
   const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
   auto cachedPageRef =
      fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
   if (!cachedPageRef.Get().IsNull()) {
      UpdateLastUsedCluster(clusterId);
      return cachedPageRef;
   }

   if (clusterId == kInvalidDescriptorId)
      throw RException(R__FAIL("entry out of bounds"));

   RClusterInfo clusterInfo;
   {
      auto descriptorGuard = GetSharedDescriptorGuard();
      const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
      const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
      // NOTE(review): the suppressed-column handling line is elided in this
      // excerpt.
      if (columnRange.IsSuppressed())

      clusterInfo.fClusterId = clusterId;
      clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
      clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
   }

   // NOTE(review): the unknown-locator guard condition is elided in this
   // excerpt.
      throw RException(R__FAIL("tried to read a page with an unknown locator"));

   return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
}
470
{
   // Create the default set of read-performance metrics under the given
   // prefix and hook up the cluster pool and page pool metrics.
   fMetrics = RNTupleMetrics(prefix);
   fMetrics.ObserveMetrics(fClusterPool.GetMetrics());
   fMetrics.ObserveMetrics(fPagePool.GetMetrics());
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadPayload", "B", "volume read from storage (required)"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadOverhead", "B", "volume read from storage (overhead)"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nClusterLoaded", "",
                                                    "number of partial clusters preloaded from storage"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageUnsealed", "", "number of pages unzipped and decoded"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
      *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuRead", "ns", "CPU time spent reading"),
      *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
                                                                        "CPU time spent decompressing"),
      // Derived metrics: bandwidths and ratios computed on demand from the
      // plain counters above; each returns {false, -1.} until the counters it
      // depends on are available and non-zero.
      *fMetrics.MakeCounter<RNTupleCalcPerf *>(
         "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
         [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                     if (auto walltime = timeWallRead->GetValueAsInt()) {
                        double payload = szReadPayload->GetValueAsInt();
                        double overhead = szReadOverhead->GetValueAsInt();
                        // unit: bytes / nanosecond = GB/s
                        return {true, (1000. * (payload + overhead) / walltime)};
                     }
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<RNTupleCalcPerf *>(
         "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
         [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                  if (auto walltime = timeWallRead->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<RNTupleCalcPerf *>(
         "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
         [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
                  if (auto walltime = timeWallUnzip->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<RNTupleCalcPerf *>(
         "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
         [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (auto payload = szReadPayload->GetValueAsInt()) {
                     // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
                     return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<RNTupleCalcPerf *>("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
                                               fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
                                                  if (const auto szReadPayload =
                                                         metrics.GetLocalCounter("szReadPayload")) {
                                                     if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
                                                        if (auto unzip = szUnzip->GetValueAsInt()) {
                                                           return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
                                                        }
                                                     }
                                                  }
                                                  return {false, -1.};
                                               })});
}
562
{
   // Convenience overload: unseal using this source's own page allocator.
   return UnsealPage(sealedPage, element, *fPageAllocator);
}
568
                                                       const RColumnElementBase &element,
{
   // Turn a sealed (possibly compressed/packed/checksummed) page back into an
   // in-memory page allocated from `pageAlloc`.
   // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
   // large enough to hold `sealedPage.fNElements`
   // NOTE(review): the page-zero guard condition is elided in this excerpt.
      auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
      page.GrowUnchecked(sealedPage.GetNElements());
      memset(page.GetBuffer(), 0, page.GetNBytes());
      return page;
   }

   // Verify the optional checksum before touching the payload.
   auto rv = sealedPage.VerifyChecksumIfEnabled();
   if (!rv)
      return R__FORWARD_ERROR(rv);

   const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
   auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
   if (sealedPage.GetDataSize() != bytesPacked) {
      // Sizes differ, so the payload is compressed: unzip it.
      ROOT::Internal::RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked,
                                                 page.GetBuffer());
   } else {
      // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
      // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
      // Note that usually pages are compressed.
      memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
   }

   if (!element.IsMappable()) {
      // On-disk and in-memory layouts differ: decode into a fresh page.
      auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
      element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
      page = std::move(tmp);
   }

   page.GrowUnchecked(sealedPage.GetNElements());
   return page;
}
607
{
   // NOTE(review): the guard condition for this early return is elided in this
   // excerpt.
      return;

   // Deserialize every streamer-info record attached to the descriptor.
   for (const auto &extraTypeInfo : fDescriptor.GetExtraTypeInfoIterable()) {
      if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
         continue;
      // We don't need the result, it's enough that during deserialization, BuildCheck() is called for every
      // streamer info record.
      RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap();
   }

   // NOTE(review): a trailing statement is elided in this excerpt.
}
623
624//------------------------------------------------------------------------------
625
{
   // Order column infos by decreasing current write-page size.
   // NOTE(review): the equal-size guard condition preceding the first return
   // is elided in this excerpt.
   // Make the sort order unique by adding the physical on-disk column id as a secondary key
      return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
   return fCurrentPageSize > other.fCurrentPageSize;
}
633
634bool ROOT::Internal::RWritePageMemoryManager::TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
635{
636 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
637 return true;
638
639 auto itr = fColumnsSortedByPageSize.begin();
640 while (itr != fColumnsSortedByPageSize.end()) {
641 if (itr->fCurrentPageSize <= pageSizeLimit)
642 break;
643 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
644 ++itr;
645 continue;
646 }
647
648 // Flushing the current column will invalidate itr
649 auto itrFlush = itr++;
650
651 RColumnInfo next;
652 if (itr != fColumnsSortedByPageSize.end())
653 next = *itr;
654
655 itrFlush->fColumn->Flush();
656 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
657 return true;
658
659 if (next.fColumn == nullptr)
660 return false;
661 itr = fColumnsSortedByPageSize.find(next);
662 };
663
664 return false;
665}
666
667bool ROOT::Internal::RWritePageMemoryManager::TryUpdate(RColumn &column, std::size_t newWritePageSize)
668{
669 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
670 auto itr = fColumnsSortedByPageSize.find(key);
671 if (itr == fColumnsSortedByPageSize.end()) {
672 if (!TryEvict(newWritePageSize, 0))
673 return false;
674 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
675 fCurrentAllocatedBytes += newWritePageSize;
676 return true;
677 }
678
679 RColumnInfo elem{*itr};
680 assert(newWritePageSize >= elem.fInitialPageSize);
681
682 if (newWritePageSize == elem.fCurrentPageSize)
683 return true;
684
685 fColumnsSortedByPageSize.erase(itr);
686
687 if (newWritePageSize < elem.fCurrentPageSize) {
688 // Page got smaller
689 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
690 elem.fCurrentPageSize = newWritePageSize;
691 fColumnsSortedByPageSize.insert(elem);
692 return true;
693 }
694
695 // Page got larger, we may need to make space available
696 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
697 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
698 // Don't change anything, let the calling column flush itself
699 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
700 fColumnsSortedByPageSize.insert(elem);
701 return false;
702 }
703 fCurrentAllocatedBytes += diffBytes;
704 elem.fCurrentPageSize = newWritePageSize;
705 fColumnsSortedByPageSize.insert(elem);
706 return true;
707}
708
709//------------------------------------------------------------------------------
710
712 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
713{
715}
716
718
{
   // Pack, optionally compress, and optionally checksum a page into the
   // caller-provided buffer, producing a self-contained sealed page.
   assert(config.fPage);
   assert(config.fElement);
   assert(config.fBuffer);

   unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
   // Tracks whether pageBuf points at memory owned by someone else (the page
   // or config.fBuffer) rather than a temporary we allocated here.
   bool isAdoptedBuffer = true;
   auto nBytesPacked = config.fPage->GetNBytes();
   auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;

   if (!config.fElement->IsMappable()) {
      // On-disk and in-memory layouts differ: encode into a temporary buffer.
      nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
      pageBuf = new unsigned char[nBytesPacked];
      isAdoptedBuffer = false;
      config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
   }
   auto nBytesZipped = nBytesPacked;

   // Copy/compress into config.fBuffer unless the caller allows aliasing the
   // page buffer and neither compression, packing, nor a checksum is needed.
   if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
       config.fWriteChecksum) {
      nBytesZipped =
         ROOT::Internal::RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSettings, config.fBuffer);
      if (!isAdoptedBuffer)
         delete[] pageBuf;
      pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
      isAdoptedBuffer = true;
   }

   R__ASSERT(isAdoptedBuffer);

   RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
   sealedPage.ChecksumIfEnabled();

   return sealedPage;
}
755
{
   // Seal `page` using this sink's write options, staging the result in the
   // sink-owned fSealPageBuffer (valid until the next SealPage call).
   const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
   if (fSealPageBuffer.size() < nBytes)
      fSealPageBuffer.resize(nBytes);

   RSealPageConfig config;
   config.fPage = &page;
   config.fElement = &element;
   config.fCompressionSettings = GetWriteOptions().GetCompression();
   config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
   config.fAllowAlias = true;
   config.fBuffer = fSealPageBuffer.data();

   return SealPage(config);
}
773
780
{
   // Reserve a write page for `nElements` of the column. Returns a null page
   // if the page budget cannot accommodate the request; the caller is then
   // expected to flush first.
   R__ASSERT(nElements > 0);
   const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
   const auto nBytes = elementSize * nElements;
   if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
      return ROOT::Internal::RPage();
   return fPageAllocator->NewPage(elementSize, nElements);
}
790
791//------------------------------------------------------------------------------
792
793std::unique_ptr<ROOT::Internal::RPageSink>
794ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
795 const ROOT::RNTupleWriteOptions &options)
796{
797 if (ntupleName.empty()) {
798 throw RException(R__FAIL("empty RNTuple name"));
799 }
800 if (location.empty()) {
801 throw RException(R__FAIL("empty storage location"));
802 }
803 if (location.find("daos://") == 0) {
804#ifdef R__ENABLE_DAOS
805 return std::make_unique<ROOT::Experimental::Internal::RPageSinkDaos>(ntupleName, location, options);
806#else
807 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
808#endif
809 }
810
811 // Otherwise assume that the user wants us to create a file.
812 return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
813}
814
816 const ROOT::RNTupleWriteOptions &options)
817 : RPageSink(name, options)
818{
819}
820
822
{
   // Register the column with the descriptor builder; logical and physical
   // ids coincide for non-projected columns.
   auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
   RColumnDescriptorBuilder columnBuilder;
   columnBuilder.LogicalColumnId(columnId)
      .PhysicalColumnId(columnId)
      .FieldId(fieldId)
      // NOTE(review): one builder call is elided from this excerpt here.
      .ValueRange(column.GetValueRange())
      .Type(column.GetType())
      .Index(column.GetIndex())
      // NOTE(review): the trailing builder calls are elided from this excerpt.
   // For late model extension, we assume that the primary column representation is the active one for the
   // deferred range. All other representations are suppressed.
   if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
      columnBuilder.SetSuppressedDeferred();
   fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
   return ColumnHandle_t{columnId, &column};
}
844
                                                  ROOT::NTupleSize_t firstEntry)
{
   // Incorporate the fields added by a (late) model extension into the
   // descriptor: register fields, projections, and columns, and open
   // column/page ranges for the new physical columns.
   if (fIsInitialized) {
      for (const auto &field : changeset.fAddedFields) {
         if (field->GetStructure() == ENTupleStructure::kStreamer) {
            throw ROOT::RException(R__FAIL("a Model cannot be extended with Streamer fields"));
         }
      }
   }

   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
      // If we already have alias columns, add an offset to the alias columns so that the new physical columns
      // of the changeset follow immediately the already existing physical columns
      auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
         const auto &reps = f.GetColumnRepresentatives();
         if (reps.empty())
            return 0;
         return reps.size() * reps[0].size();
      };
      std::uint32_t nNewPhysicalColumns = 0;
      for (auto f : changeset.fAddedFields) {
         nNewPhysicalColumns += getNColumns(*f);
         for (const auto &descendant : *f)
            nNewPhysicalColumns += getNColumns(descendant);
      }
      fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
   }

   auto addField = [&](ROOT::RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      f.SetOnDiskId(fieldId);
      ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
   };
   auto addProjectedField = [&](ROOT::RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      auto sourceFieldId =
         // NOTE(review): the right-hand side of this assignment is elided in
         // this excerpt.
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
      f.SetOnDiskId(fieldId);
      // Projected (alias) columns share the physical column of their source field.
      for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
         auto targetId = descriptor.GetNLogicalColumns();
         RColumnDescriptorBuilder columnBuilder;
         columnBuilder.LogicalColumnId(targetId)
            .PhysicalColumnId(source.GetLogicalId())
            .FieldId(fieldId)
            .BitsOnStorage(source.GetBitsOnStorage())
            .ValueRange(source.GetValueRange())
            .Type(source.GetType())
            .Index(source.GetIndex())
            .RepresentationIndex(source.GetRepresentationIndex());
         fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
      }
   };

   R__ASSERT(firstEntry >= fPrevClusterNEntries);
   const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
   for (auto f : changeset.fAddedFields) {
      addField(*f);
      for (auto &descendant : *f)
         addField(descendant);
   }
   for (auto f : changeset.fAddedProjectedFields) {
      addProjectedField(*f);
      for (auto &descendant : *f)
         addProjectedField(descendant);
   }

   const auto nColumns = descriptor.GetNPhysicalColumns();
   fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
   fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
   for (ROOT::DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
      // NOTE(review): the declaration of `columnRange` is elided in this excerpt.
      columnRange.SetPhysicalColumnId(i);
      // We set the first element index in the current cluster to the first element that is part of a materialized page
      // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
      // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
      columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
      columnRange.SetNElements(0);
      columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
      fOpenColumnRanges.emplace_back(columnRange);
      // NOTE(review): the declaration of `pageRange` is elided in this excerpt.
      pageRange.SetPhysicalColumnId(i);
      fOpenPageRanges.emplace_back(std::move(pageRange));
   }

   // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
   // header was already serialized, this has to be done manually as it is required for page list serialization.
   if (fSerializationContext.GetHeaderSize() > 0)
      fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
}
942
{
   // Only streamer-info records may be updated after the header was written.
   if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
      throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));

   // NOTE(review): the descriptor-builder update call is elided in this excerpt.
}
950
{
   // Build the on-disk descriptor from the model and serialize the RNTuple
   // header into the sink.
   fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   // The zero field is the root of both the regular and the projected field trees.
   auto &fieldZero = ROOT::Internal::GetFieldZeroOfModel(model);
   fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
   fieldZero.SetOnDiskId(0);
   auto &projectedFields = ROOT::Internal::GetProjectedFieldsOfModel(model);
   projectedFields.GetFieldZero().SetOnDiskId(0);

   // Register all top-level (projected) fields through the regular schema-update path.
   ROOT::Internal::RNTupleModelChangeset initialChangeset{model};
   initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
   for (auto f : fieldZero.GetMutableSubfields())
      initialChangeset.fAddedFields.emplace_back(f);
   initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
   for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
      initialChangeset.fAddedProjectedFields.emplace_back(f);
   UpdateSchema(initialChangeset, 0U);

   // First pass (nullptr buffer) computes the header size, second pass writes it.
   fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
   auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
   fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
   InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());

   fDescriptorBuilder.BeginHeaderExtension();
}
978
// InitFromDescriptor(srcDescriptor, copyClusters): initializes the sink from an
// existing descriptor (e.g. for incremental writing/merging) and returns a model
// reconstructed from it. If copyClusters is true, all cluster descriptors of the
// source are cloned into this sink and the open column ranges are advanced past
// the copied data. Finally serializes the header and marks the sink initialized.
// NOTE(review): this listing dropped the signature (orig. line 980) and the local
// declarations at orig. lines 993 (columnRange), 999 (pageRange) and 1027
// (modelOpts) — consult the real source file for the complete function.
std::unique_ptr<ROOT::RNTupleModel>
981 {
982 // Create new descriptor
983 fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
984 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
985
986 // Create column/page ranges
987 const auto nColumns = descriptor.GetNPhysicalColumns();
988 R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
989 fOpenColumnRanges.reserve(nColumns);
990 fOpenPageRanges.reserve(nColumns);
991 for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
992 const auto &column = descriptor.GetColumnDescriptor(i);
994 columnRange.SetPhysicalColumnId(i);
995 columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
996 columnRange.SetNElements(0);
997 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression())
998 fOpenColumnRanges.emplace_back(columnRange);
1000 pageRange.SetPhysicalColumnId(i);
1001 fOpenPageRanges.emplace_back(std::move(pageRange));
1002 }
1003
1004 if (copyClusters) {
1005 // Clone and add all cluster descriptors
1006 auto clusterId = srcDescriptor.FindClusterId(0, 0);
1007 while (clusterId != ROOT::kInvalidDescriptorId) {
1008 auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
1009 auto nEntries = cluster.GetNEntries();
1010 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1011 R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
1012 if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
1013 break;
1014 const auto &columnRange = cluster.GetColumnRange(i);
1015 R__ASSERT(columnRange.GetPhysicalColumnId() == i);
1016 // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
1017 fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements());
1018 }
1019 fDescriptorBuilder.AddCluster(cluster.Clone());
1020 fPrevClusterNEntries += nEntries;
1021
1022 clusterId = srcDescriptor.FindNextClusterId(clusterId);
1023 }
1024 }
1025
1026 // Create model
1028 modelOpts.SetReconstructProjections(true);
1029 // We want to emulate unknown types to allow merging RNTuples containing types that we lack dictionaries for.
1030 modelOpts.SetEmulateUnknownTypes(true);
1031 auto model = descriptor.CreateModel(modelOpts);
1032 if (!copyClusters) {
1033 auto &projectedFields = ROOT::Internal::GetProjectedFieldsOfModel(*model);
1034 projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
1035 }
1036
1037 // Serialize header and init from it
// Two-pass header serialization: size query (nullptr), then actual write.
1038 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
1039 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
1040 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
1041 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
1042
1043 fDescriptorBuilder.BeginHeaderExtension();
1044
1045 // mark this sink as initialized
1046 fIsInitialized = true;
1047
1048 return model;
1049 }
1050
1052{
1053 fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
1054}
1055
1057{
1058 fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());
1059
1061 pageInfo.SetNElements(page.GetNElements());
1062 pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
1063 pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
1064 fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
1065}
1066
// CommitSealedPage(): writes an already-sealed (packed/compressed) page through
// the backend and records its locator in the open page range. Unlike
// CommitPage(), the checksum flag comes from the sealed page itself.
// NOTE(review): this listing dropped the first signature line (orig. 1067) and
// the pageInfo declaration (orig. line 1072).
1068 const RPageStorage::RSealedPage &sealedPage)
1069 {
1070 fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());
1071
1073 pageInfo.SetNElements(sealedPage.GetNElements());
1074 pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
1075 pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
1076 fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
1077 }
1078
1079std::vector<ROOT::RNTupleLocator>
1080ROOT::Internal::RPagePersistentSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
1081 const std::vector<bool> &mask)
1082{
1083 std::vector<ROOT::RNTupleLocator> locators;
1084 locators.reserve(mask.size());
1085 std::size_t i = 0;
1086 for (auto &range : ranges) {
1087 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1088 if (mask[i++])
1089 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
1090 }
1091 }
1092 locators.shrink_to_fit();
1093 return locators;
1094}
1095
// CommitSealedPageV(): vector-commit of sealed pages with optional same-page
// merging. First pass builds a boolean mask (which pages must actually be
// written) plus, for every page, the index of its locator in the vector
// returned by CommitSealedPageVImpl(); duplicate pages (same checksum, size,
// and bytes) reuse the locator of their first occurrence. Second pass records
// the page metadata into the open column/page ranges.
// NOTE(review): this listing dropped the pageInfo declaration (orig. line 1156).
void ROOT::Internal::RPagePersistentSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
1097 {
1098 /// Used in the `originalPages` map
1099 struct RSealedPageLink {
1100 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1101 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1102 };
1103
1104 std::vector<bool> mask;
1105 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1106 std::vector<std::size_t> locatorIndexes;
1107 // Maps page checksums to the first sealed page with that checksum
1108 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1109 std::size_t iLocator = 0;
1110 for (auto &range : ranges) {
1111 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1112 mask.reserve(mask.size() + rangeSize);
1113 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1114
1115 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
// Merging disabled (by backend capability or write options): commit every page.
1116 if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
1117 mask.emplace_back(true);
1118 locatorIndexes.emplace_back(iLocator++);
1119 continue;
1120 }
1121 // Same page merging requires page checksums - this is checked in the write options
1122 R__ASSERT(sealedPageIt->GetHasChecksum());
1123
1124 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1125 auto itr = originalPages.find(chk);
1126 if (itr == originalPages.end()) {
// First page with this checksum: remember it and commit it.
1127 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1128 mask.emplace_back(true);
1129 locatorIndexes.emplace_back(iLocator++);
1130 continue;
1131 }
1132
// Checksum collision guard: only merge if size and bytes really match.
1133 const auto *p = itr->second.fSealedPage;
1134 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1135 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1136 mask.emplace_back(true);
1137 locatorIndexes.emplace_back(iLocator++);
1138 continue;
1139 }
1140
// True duplicate: skip the write and point at the original page's locator.
1141 mask.emplace_back(false);
1142 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1143 }
1144
1145 mask.shrink_to_fit();
1146 locatorIndexes.shrink_to_fit();
1147 }
1148
1149 auto locators = CommitSealedPageVImpl(ranges, mask);
1150 unsigned i = 0;
1151
// Second pass: record element counts and (possibly shared) locators per column.
1152 for (auto &range : ranges) {
1153 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1154 fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());
1155
1157 pageInfo.SetNElements(sealedPageIt->GetNElements());
1158 pageInfo.SetLocator(locators[locatorIndexes[i++]]);
1159 pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
1160 fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
1161 }
1162 }
1163 }
1164
1167{
1168 RStagedCluster stagedCluster;
1169 stagedCluster.fNBytesWritten = StageClusterImpl();
1170 stagedCluster.fNEntries = nNewEntries;
1171
1172 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1173 RStagedCluster::RColumnInfo columnInfo;
1174 columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
1175 if (fOpenColumnRanges[i].IsSuppressed()) {
1176 assert(fOpenPageRanges[i].GetPageInfos().empty());
1177 columnInfo.fPageRange.SetPhysicalColumnId(i);
1178 columnInfo.fIsSuppressed = true;
1179 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1180 fOpenColumnRanges[i].SetNElements(0);
1181 fOpenColumnRanges[i].SetIsSuppressed(false);
1182 } else {
1183 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1184 fOpenPageRanges[i].SetPhysicalColumnId(i);
1185
1186 columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
1187 fOpenColumnRanges[i].SetNElements(0);
1188 }
1189 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1190 }
1191
1192 return stagedCluster;
1193}
1194
1196{
1197 for (const auto &cluster : clusters) {
1198 RClusterDescriptorBuilder clusterBuilder;
1199 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1201 .NEntries(cluster.fNEntries);
1202 for (const auto &columnInfo : cluster.fColumnInfos) {
1203 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1204 if (columnInfo.fIsSuppressed) {
1205 assert(columnInfo.fPageRange.GetPageInfos().empty());
1206 clusterBuilder.MarkSuppressedColumnRange(colId);
1207 } else {
1208 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
1209 columnInfo.fCompressionSettings, columnInfo.fPageRange);
1210 fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
1211 }
1212 }
1213
1214 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1215 for (const auto &columnInfo : cluster.fColumnInfos) {
1216 if (!columnInfo.fIsSuppressed)
1217 continue;
1218 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1219 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1220 // cluster. This information has been determined for the committed cluster descriptor through
1221 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1222 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1223 fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
1224 columnRangeFromDesc.GetNElements());
1225 }
1226
1227 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1228 fPrevClusterNEntries += cluster.fNEntries;
1229 }
1230}
1231
1233{
1234 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1235
1236 const auto nClusters = descriptor.GetNActiveClusters();
1237 std::vector<ROOT::DescriptorId_t> physClusterIDs;
1238 physClusterIDs.reserve(nClusters);
1239 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1240 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1241 }
1242
1243 auto szPageList =
1244 RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
1245 auto bufPageList = MakeUninitArray<unsigned char>(szPageList);
1246 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
1247
1248 const auto clusterGroupId = descriptor.GetNClusterGroups();
1249 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1251 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1252 if (fNextClusterInGroup == nClusters) {
1253 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1254 } else {
1255 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1256 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1257 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1258 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1259 firstClusterDesc.GetFirstEntryIndex())
1260 .NClusters(nClusters - fNextClusterInGroup);
1261 }
1262 std::vector<ROOT::DescriptorId_t> clusterIds;
1263 clusterIds.reserve(nClusters);
1264 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1265 clusterIds.emplace_back(i);
1266 }
1267 cgBuilder.AddSortedClusters(clusterIds);
1268 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1269 fSerializationContext.MapClusterGroupId(clusterGroupId);
1270
1271 fNextClusterInGroup = nClusters;
1272}
1273
// CommitAttributeSet(attrSetName, attrAnchorInfo): records an attribute set
// (name + anchor length/locator + schema version) in the main RNTuple's
// descriptor.
// NOTE(review): this listing dropped the first signature line (orig. 1274),
// orig. line 1277, and the builder-chain line at orig. 1284 (presumably the
// MoveDescriptor() call preceding .Unwrap()) — consult the real source file.
1275 const RNTupleLink &attrAnchorInfo)
1276 {
1278
1279 RNTupleAttrSetDescriptorBuilder attrSetDescBuilder;
1280 auto attrSetDesc = attrSetDescBuilder.SchemaVersion(kSchemaVersionMajor, kSchemaVersionMinor)
1281 .AnchorLength(attrAnchorInfo.fLength)
1282 .AnchorLocator(attrAnchorInfo.fLocator)
1283 .Name(attrSetName)
1285 .Unwrap();
1286 fDescriptorBuilder.AddAttributeSet(std::move(attrSetDesc)).ThrowOnError();
1287 }
1288
1290{
1291 if (!fInfosOfStreamerFields.empty()) {
1292 // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
1293 // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
1294 for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
1295 if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
1296 // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
1297 // should be zero.
1298 R__ASSERT(etDesc.GetTypeName().empty());
1299 R__ASSERT(etDesc.GetTypeVersion() == 0);
1300 auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
1301 fInfosOfStreamerFields.merge(etInfo);
1302 }
1303 }
1304
1305 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
1308 fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1309 }
1310
1311 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1312
1313 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
1314 auto bufFooter = MakeUninitArray<unsigned char>(szFooter);
1315 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1316
1317 return CommitDatasetImpl(bufFooter.get(), szFooter);
1318}
1319
1321{
1322 fMetrics = RNTupleMetrics(prefix);
1323 fCounters = std::make_unique<RCounters>(RCounters{
1324 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageCommitted", "", "number of pages committed to storage"),
1325 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szWritePayload", "B", "volume written for committed pages"),
1326 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1327 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1328 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1329 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuWrite", "ns", "CPU time spent writing"),
1330 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1331 "CPU time spent compressing")});
1332}
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking).
Definition RError.hxx:303
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
char name[80]
Definition TGX11.cxx:148
#define _(A, B)
Definition cfortran.h:108
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
RNTupleAttrSetDescriptorBuilder & AnchorLocator(const RNTupleLocator &loc)
RNTupleAttrSetDescriptorBuilder & SchemaVersion(std::uint16_t major, std::uint16_t minor)
RResult< ROOT::Experimental::RNTupleAttrSetDescriptor > MoveDescriptor()
Attempt to make an AttributeSet descriptor.
RNTupleAttrSetDescriptorBuilder & Name(std::string_view name)
RNTupleAttrSetDescriptorBuilder & AnchorLength(std::uint32_t length)
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< void > MarkSuppressedColumnRange(ROOT::DescriptorId_t physicalId)
Books the given column ID as being suppressed in this cluster.
RResult< void > CommitColumnRange(ROOT::DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RResult< void > CommitSuppressedColumnRanges(const RNTupleDescriptor &desc)
Sets the first element index and number of elements for all the suppressed column ranges.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
const RClusterDescriptor::RColumnRange & GetColumnRange(ROOT::DescriptorId_t physicalId)
RClusterDescriptorBuilder & ClusterId(ROOT::DescriptorId_t clusterId)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
void AddSortedClusters(const std::vector< ROOT::DescriptorId_t > &clusterIds)
RResult< RClusterGroupDescriptor > MoveDescriptor()
RClusterGroupDescriptorBuilder & ClusterGroupId(ROOT::DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:147
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:149
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:191
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:30
ROOT::DescriptorId_t GetId() const
Definition RCluster.hxx:190
A helper class for piece-wise construction of an RColumnDescriptor.
RColumnDescriptorBuilder & SetSuppressedDeferred()
RColumnDescriptorBuilder & LogicalColumnId(ROOT::DescriptorId_t logicalColumnId)
RResult< RColumnDescriptor > MakeDescriptor() const
Attempt to make a column descriptor.
RColumnDescriptorBuilder & FieldId(ROOT::DescriptorId_t fieldId)
RColumnDescriptorBuilder & BitsOnStorage(std::uint16_t bitsOnStorage)
RColumnDescriptorBuilder & ValueRange(double min, double max)
RColumnDescriptorBuilder & Type(ROOT::ENTupleColumnType type)
RColumnDescriptorBuilder & PhysicalColumnId(ROOT::DescriptorId_t physicalColumnId)
RColumnDescriptorBuilder & FirstElementIndex(std::uint64_t firstElementIdx)
RColumnDescriptorBuilder & Index(std::uint32_t index)
RColumnDescriptorBuilder & RepresentationIndex(std::uint16_t representationIndex)
A column element encapsulates the translation between basic C++ types and their column representation...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Unpack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
virtual RIdentifier GetIdentifier() const =0
virtual void Pack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
std::size_t GetPackedSize(std::size_t nElements=1U) const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:37
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:345
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:351
ROOT::Internal::RColumnElementBase * GetElement() const
Definition RColumn.hxx:338
ROOT::ENTupleColumnType GetType() const
Definition RColumn.hxx:339
ROOT::NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:353
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:360
ROOT::DescriptorId_t GetOnDiskId() const
Definition RColumn.hxx:352
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:340
std::uint32_t GetIndex() const
Definition RColumn.hxx:350
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
RResult< RExtraTypeInfoDescriptor > MoveDescriptor()
RExtraTypeInfoDescriptorBuilder & ContentId(EExtraTypeInfoIds contentId)
RExtraTypeInfoDescriptorBuilder & Content(const std::string &content)
A helper class for piece-wise construction of an RFieldDescriptor.
static RFieldDescriptorBuilder FromField(const ROOT::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live RNTuple field.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static unsigned int GetClusterBunchSize(const RNTupleReadOptions &options)
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static RResult< std::uint32_t > SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< ROOT::DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< std::uint32_t > SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< RContext > SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:98
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
Definition RCluster.hxx:115
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:40
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) override
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
std::uint64_t fNextClusterInGroup
Remembers the starting cluster id for the next cluster group.
std::unique_ptr< RNTupleModel > InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters)
Initialize sink based on an existing descriptor and fill into the descriptor builder,...
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final
Adds the given anchor information (name + locator) into the main RNTuple's descriptor as an attribute...
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final
Register a new column.
ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
ROOT::Internal::RNTupleSerializer::RContext fSerializationContext
Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization.
ROOT::NTupleSize_t fPrevClusterNEntries
Used to calculate the number of entries in the current cluster.
virtual RNTupleLink CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)=0
virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length)=0
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page source from the location.
std::vector< ROOT::RClusterDescriptor::RPageRange > fOpenPageRanges
Keeps track of the written pages in the currently open cluster. Indexed by column id.
void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
Write a page to the storage. The column must have been added before.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
std::vector< ROOT::RClusterDescriptor::RColumnRange > fOpenColumnRanges
Keeps track of the number of elements in the currently open cluster. Indexed by column id.
std::unique_ptr< RCounters > fCounters
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)=0
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
ROOT::Internal::RNTupleSerializer::StreamerInfoMap_t fInfosOfStreamerFields
Union of the streamer info records that are sent from streamer fields to the sink before committing t...
virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page)=0
virtual std::uint64_t StageClusterImpl()=0
Returns the number of bytes written to storage (excluding metadata).
Reference to a page stored in the page pool.
virtual RNTupleLink CommitDatasetImpl()=0
RNTupleLink CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
std::vector< unsigned char > fSealPageBuffer
Used as destination buffer in the simple SealPage overload.
std::unique_ptr< ROOT::RNTupleWriteOptions > fOptions
std::vector< Callback_t > fOnDatasetCommitCallbacks
virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
bool fIsInitialized
Flag if sink was initialized.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
RWritePageMemoryManager fWritePageMemoryManager
Used in ReservePage to maintain the page buffer budget.
std::unordered_map< ROOT::DescriptorId_t, std::vector< RColumnInfo > > fColumnInfos
Maps physical column IDs to all the requested in-memory representations.
void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const
void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
ROOT::Internal::RClusterPool fClusterPool
The cluster pool asynchronously preloads the next few clusters.
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
ROOT::DescriptorId_t fLastUsedCluster
Remembers the last cluster id from which a page was requested.
RExclDescriptorGuard GetExclDescriptorGuard()
Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further informatio...
ROOT::RNTupleReadOptions fOptions
void RegisterStreamerInfos()
Builds the streamer info records from the descriptor's extra type info section.
void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode=ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading)
Open the physical storage container and deserialize header and footer.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) override
Register a new column.
void UnzipCluster(ROOT::Internal::RCluster *cluster)
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
std::map< ROOT::NTupleSize_t, ROOT::DescriptorId_t > fPreloadedClusters
Clusters from where pages got preloaded in UnzipClusterImpl(), ordered by first entry number of the c...
void PrepareLoadCluster(const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap, std::function< void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
bool fHasStreamerInfosRegistered
Set to true when RegisterStreamerInfos() is called.
ROOT::NTupleSize_t GetNEntries()
REntryRange fEntryRange
Used by the cluster pool to prevent reading beyond the given range.
void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
Does nothing if fLastUsedCluster == clusterId.
virtual ROOT::Internal::RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster)=0
virtual std::unique_ptr< RPageSource > CloneImpl() const =0
Returns a new, unattached page source for the same data set.
std::unique_ptr< RCounters > fCounters
std::unordered_set< ROOT::DescriptorId_t > fPinnedClusters
Pinned clusters and their $2 * (cluster bunch size) - 1$ successors will not be evicted from the clus...
ROOT::NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
virtual void LoadStructureImpl()=0
ROOT::RNTupleDescriptor fDescriptor
ROOT::Internal::RPagePool fPagePool
Pages that are unzipped with IMT are staged into the page pool.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
virtual ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode)=0
LoadStructureImpl() has been called before AttachImpl() is called
virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster)
RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions)
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple times, e.g.
RActivePhysicalColumns fActivePhysicalColumns
The active columns are implicitly defined by the model fields or views.
bool fIsAttached
Set to true once Attach() is called.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location).
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
bool fHasStructure
Set to true once LoadStructure() is called.
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAllocator
For the time being, we will use the heap allocator for all sources and sinks. This may change in the ...
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64-bit xxhash3.
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
RPageStorage(std::string_view name)
Stores information about the cluster in which this page resides.
Definition RPage.hxx:52
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
std::uint32_t GetNElements() const
Definition RPage.hxx:120
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:111
void * GrowUnchecked(std::uint32_t nElements)
Increases the number of elements in the page.
Definition RPage.hxx:149
void * GetBuffer() const
Definition RPage.hxx:142
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:22
const ROOT::RFieldBase * GetSourceField(const ROOT::RFieldBase *target) const
std::size_t fCurrentAllocatedBytes
Sum of all the write page sizes (their capacity) of the columns in fColumnsSortedByPageSize.
std::size_t fMaxAllocatedBytes
Maximum allowed value for fCurrentAllocatedBytes, set from RNTupleWriteOptions::fPageBufferBudget.
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
std::set< RColumnInfo, std::greater< RColumnInfo > > fColumnsSortedByPageSize
All columns that called ReservePage() (hence TryUpdate()) at least once, sorted by their current writ...
The window of element indexes of a particular column in a particular cluster.
void SetCompressionSettings(std::optional< std::uint32_t > comp)
void SetPhysicalColumnId(ROOT::DescriptorId_t id)
void SetFirstElementIndex(ROOT::NTupleSize_t idx)
Records the partition of data into pages for a particular column in a particular cluster.
void SetPhysicalColumnId(ROOT::DescriptorId_t id)
Metadata for RNTuple clusters.
ROOT::NTupleSize_t GetNEntries() const
ROOT::NTupleSize_t GetFirstEntryIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field specific extra type information from the header / extension header.
const std::string & GetContent() const
EExtraTypeInfoIds GetContentId() const
A field translates read and write calls from/to underlying columns to/from tree values.
ROOT::DescriptorId_t GetOnDiskId() const
The on-storage metadata of an RNTuple.
ROOT::DescriptorId_t FindNextClusterId(ROOT::DescriptorId_t clusterId) const
ROOT::DescriptorId_t FindClusterId(ROOT::NTupleSize_t entryIdx) const
const RClusterDescriptor & GetClusterDescriptor(ROOT::DescriptorId_t clusterId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
ROOT::NTupleSize_t GetIndexInCluster() const
ROOT::DescriptorId_t GetClusterId() const
ELocatorType GetType() const
For non-disk locators, the value for the Type field.
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:289
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
RNTupleTimer< RNTupleAtomicCounter, RNTupleTickCounter< RNTupleAtomicCounter > > RNTupleAtomicTimer
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:151
ROOT::DescriptorId_t fClusterId
Definition RCluster.hxx:152
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel.
std::vector< ROOT::RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< ROOT::RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:50
Default I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
std::vector< RColumnInfo > fColumnInfos
Summarizes cluster-level information that are necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
ROOT::RClusterDescriptor::RPageInfoExtended fPageInfo
Location of the page on disk.
Default I/O performance counters that get registered in fMetrics.
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
bool fHasChecksum
If set, the last 8 bytes of the buffer are the xxhash of the rest of the buffer.
void SetNElements(std::uint32_t nElements)
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
void SetBufferSize(std::size_t bufferSize)
std::size_t fBufferSize
Size of the page payload and the trailing checksum (if available).
bool operator>(const RColumnInfo &other) const
Information about a single page in the context of a cluster's page range.
void SetLocator(const RNTupleLocator &locator)
const RNTupleLocator & GetLocator() const