Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
14#include <ROOT/RPageStorage.hxx>
16#include <ROOT/RColumn.hxx>
17#include <ROOT/RFieldBase.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleUtils.hxx>
24#include <ROOT/RNTupleZip.hxx>
26#include <ROOT/RPageSinkBuf.hxx>
28#ifdef R__ENABLE_DAOS
30#endif
31
32#include <Compression.h>
33#include <TError.h>
34
35#include <algorithm>
36#include <atomic>
37#include <cassert>
38#include <cstring>
39#include <functional>
40#include <memory>
41#include <string_view>
42#include <unordered_map>
43#include <utility>
44
54
56
60
66
68 : fMetrics(""), fPageAllocator(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()), fNTupleName(name)
69{
70}
71
73
75{
76 if (!fHasChecksum)
77 return;
78
79 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
80 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
81 std::uint64_t xxhash3;
83}
84
86{
87 if (!fHasChecksum)
89
90 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
91 if (!success)
92 return R__FAIL("page checksum verification failed, data corruption detected");
94}
95
97{
98 if (!fHasChecksum)
99 return R__FAIL("invalid attempt to extract non-existing page checksum");
100
101 assert(fBufferSize >= kNBytesPageChecksum);
102 std::uint64_t checksum;
104 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
105 return checksum;
106}
107
108//------------------------------------------------------------------------------
109
112{
113 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
114 for (auto &columnInfo : itr->second) {
115 if (columnInfo.fElementId == elementId) {
116 columnInfo.fRefCounter++;
117 return;
118 }
119 }
120 itr->second.emplace_back(RColumnInfo{elementId, 1});
121}
122
125{
126 auto itr = fColumnInfos.find(physicalColumnId);
127 R__ASSERT(itr != fColumnInfos.end());
128 for (std::size_t i = 0; i < itr->second.size(); ++i) {
129 if (itr->second[i].fElementId != elementId)
130 continue;
131
132 itr->second[i].fRefCounter--;
133 if (itr->second[i].fRefCounter == 0) {
134 itr->second.erase(itr->second.begin() + i);
135 if (itr->second.empty()) {
136 fColumnInfos.erase(itr);
137 }
138 }
139 break;
140 }
141}
142
150
152{
153 if (fFirstEntry == ROOT::kInvalidNTupleIndex) {
154 /// Entry range unset, we assume that the entry range covers the complete source
155 return true;
156 }
157
158 if (clusterDesc.GetNEntries() == 0)
159 return true;
160 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
161 return false;
162 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
163 return false;
164 return true;
165}
166
169 fOptions(options),
170 fClusterPool(*this, ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(fOptions)),
171 fPagePool(*this)
172{
173}
174
176
177std::unique_ptr<ROOT::Internal::RPageSource>
178ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
179 const ROOT::RNTupleReadOptions &options)
180{
181 if (ntupleName.empty()) {
182 throw RException(R__FAIL("empty RNTuple name"));
183 }
184 if (location.empty()) {
185 throw RException(R__FAIL("empty storage location"));
186 }
187 if (location.find("daos://") == 0)
188#ifdef R__ENABLE_DAOS
189 return std::make_unique<ROOT::Experimental::Internal::RPageSourceDaos>(ntupleName, location, options);
190#else
191 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
192#endif
193
194 return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
195}
196
199{
201 auto physicalId =
202 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
204 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
205 return ColumnHandle_t{physicalId, &column};
206}
207
209{
210 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
211}
212
214{
215 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
216 throw RException(R__FAIL("invalid entry range"));
217 }
218 fEntryRange = range;
219}
220
222{
223 if (!fHasStructure)
224 LoadStructureImpl();
225 fHasStructure = true;
226}
227
229{
230 LoadStructure();
231 if (!fIsAttached)
232 GetExclDescriptorGuard().MoveIn(AttachImpl(mode));
233 fIsAttached = true;
234}
235
236std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSource::Clone() const
237{
238 auto clone = CloneImpl();
239 if (fIsAttached) {
240 clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
241 clone->fHasStructure = true;
242 clone->fIsAttached = true;
243 }
244 return clone;
245}
246
248{
249 return GetSharedDescriptorGuard()->GetNEntries();
250}
251
253{
254 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
255}
256
258{
259 if (fTaskScheduler)
260 UnzipClusterImpl(cluster);
261}
262
264{
265 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
266
267 const auto clusterId = cluster->GetId();
268 auto descriptorGuard = GetSharedDescriptorGuard();
269 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
270
271 fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;
272
273 std::atomic<bool> foundChecksumFailure{false};
274
275 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
276 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
277 for (const auto columnId : columnsInCluster) {
278 // By the time we unzip a cluster, the set of active columns may have already changed wrt. to the moment when
279 // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
280 // of active columns.
281 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
282 continue;
283 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
284
285 allElements.reserve(allElements.size() + columnInfos.size());
286 for (const auto &info : columnInfos) {
287 allElements.emplace_back(GenerateColumnElement(info.fElementId));
288
289 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
290 std::uint64_t pageNo = 0;
291 std::uint64_t firstInPage = 0;
292 for (const auto &pi : pageRange.GetPageInfos()) {
293 auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
295 sealedPage.SetNElements(pi.GetNElements());
296 sealedPage.SetHasChecksum(pi.HasChecksum());
297 sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
298 sealedPage.SetBuffer(onDiskPage->GetAddress());
299 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
300
301 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
303 indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
304 const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
305 auto rv = UnsealPage(sealedPage, *element);
306 if (!rv) {
308 return;
309 }
310 auto newPage = rv.Unwrap();
311 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
312
313 newPage.SetWindow(indexOffset + firstInPage,
315 fPagePool.PreloadPage(std::move(newPage), keyPagePool);
316 };
317
318 fTaskScheduler->AddTask(taskFunc);
319
320 firstInPage += pi.GetNElements();
321 pageNo++;
322 } // for all pages in column
323
324 fCounters->fNPageUnsealed.Add(pageNo);
325 } // for all in-memory types of the column
326 } // for all columns in cluster
327
328 fTaskScheduler->Wait();
329
331 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
332 }
333}
334
339{
340 auto descriptorGuard = GetSharedDescriptorGuard();
341 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
342
343 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
344 if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
345 continue;
346
347 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
349 for (const auto &pageInfo : pageRange.GetPageInfos()) {
350 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
353 pageInfo.GetLocator().GetNBytesOnStorage()));
354 } else {
356 }
357 ++pageNo;
358 }
359 }
360}
361
363{
364 if (fLastUsedCluster == clusterId)
365 return;
366
368 GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
369 auto itr = fPreloadedClusters.begin();
370 while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
371 if (fPinnedClusters.count(itr->second) > 0) {
372 ++itr;
373 } else {
374 fPagePool.Evict(itr->second);
375 itr = fPreloadedClusters.erase(itr);
376 }
377 }
378 std::size_t poolWindow = 0;
379 while ((itr != fPreloadedClusters.end()) &&
381 ++itr;
382 ++poolWindow;
383 }
384 while (itr != fPreloadedClusters.end()) {
385 if (fPinnedClusters.count(itr->second) > 0) {
386 ++itr;
387 } else {
388 fPagePool.Evict(itr->second);
389 itr = fPreloadedClusters.erase(itr);
390 }
391 }
392
393 fLastUsedCluster = clusterId;
394}
395
398{
399 const auto columnId = columnHandle.fPhysicalId;
400 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
401 auto cachedPageRef =
402 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
403 if (!cachedPageRef.Get().IsNull()) {
404 UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
405 return cachedPageRef;
406 }
407
408 std::uint64_t idxInCluster;
410 {
411 auto descriptorGuard = GetSharedDescriptorGuard();
412 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
413
414 if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
415 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
416
417 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
418 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
419 if (columnRange.IsSuppressed())
421
422 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
423 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
424 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
425 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
426 }
427
428 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
429 throw RException(R__FAIL("tried to read a page with an unknown locator"));
430
431 UpdateLastUsedCluster(clusterInfo.fClusterId);
432 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
433}
434
437{
438 const auto clusterId = localIndex.GetClusterId();
439 const auto idxInCluster = localIndex.GetIndexInCluster();
440 const auto columnId = columnHandle.fPhysicalId;
441 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
442 auto cachedPageRef =
443 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
444 if (!cachedPageRef.Get().IsNull()) {
445 UpdateLastUsedCluster(clusterId);
446 return cachedPageRef;
447 }
448
450 throw RException(R__FAIL("entry out of bounds"));
451
453 {
454 auto descriptorGuard = GetSharedDescriptorGuard();
455 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
456 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
457 if (columnRange.IsSuppressed())
459
460 clusterInfo.fClusterId = clusterId;
461 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
462 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
463 }
464
465 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
466 throw RException(R__FAIL("tried to read a page with an unknown locator"));
467
468 UpdateLastUsedCluster(clusterInfo.fClusterId);
469 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
470}
471
473{
474 fMetrics = RNTupleMetrics(prefix);
475 fMetrics.ObserveMetrics(fClusterPool.GetMetrics());
476 fMetrics.ObserveMetrics(fPagePool.GetMetrics());
477 fCounters = std::make_unique<RCounters>(RCounters{
478 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
479 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
480 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadPayload", "B", "volume read from storage (required)"),
481 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadOverhead", "B", "volume read from storage (overhead)"),
482 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
483 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nClusterLoaded", "",
484 "number of partial clusters preloaded from storage"),
485 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
486 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageUnsealed", "", "number of pages unzipped and decoded"),
487 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
488 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
489 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuRead", "ns", "CPU time spent reading"),
490 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
491 "CPU time spent decompressing"),
492 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
493 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
494 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
495 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
496 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
497 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
498 if (auto walltime = timeWallRead->GetValueAsInt()) {
499 double payload = szReadPayload->GetValueAsInt();
500 double overhead = szReadOverhead->GetValueAsInt();
501 // unit: bytes / nanosecond = GB/s
502 return {true, (1000. * (payload + overhead) / walltime)};
503 }
504 }
505 }
506 }
507 return {false, -1.};
508 }),
509 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
510 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
511 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
512 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
513 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
514 if (auto walltime = timeWallRead->GetValueAsInt()) {
515 double unzip = szUnzip->GetValueAsInt();
516 // unit: bytes / nanosecond = GB/s
517 return {true, 1000. * unzip / walltime};
518 }
519 }
520 }
521 return {false, -1.};
522 }),
523 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
524 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
525 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
526 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
527 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
528 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
529 double unzip = szUnzip->GetValueAsInt();
530 // unit: bytes / nanosecond = GB/s
531 return {true, 1000. * unzip / walltime};
532 }
533 }
534 }
535 return {false, -1.};
536 }),
537 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
538 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
539 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
540 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
541 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
542 if (auto payload = szReadPayload->GetValueAsInt()) {
543 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
544 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
545 }
546 }
547 }
548 return {false, -1.};
549 }),
550 *fMetrics.MakeCounter<RNTupleCalcPerf *>("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
551 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
552 if (const auto szReadPayload =
553 metrics.GetLocalCounter("szReadPayload")) {
554 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
555 if (auto unzip = szUnzip->GetValueAsInt()) {
556 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
557 }
558 }
559 }
560 return {false, -1.};
561 })});
562}
563
566{
567 return UnsealPage(sealedPage, element, *fPageAllocator);
568}
569
573{
574 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
575 // large enough to hold `sealedPage.fNElements`
577 auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
578 page.GrowUnchecked(sealedPage.GetNElements());
579 memset(page.GetBuffer(), 0, page.GetNBytes());
580 return page;
581 }
582
583 auto rv = sealedPage.VerifyChecksumIfEnabled();
584 if (!rv)
585 return R__FORWARD_ERROR(rv);
586
587 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
588 auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
589 if (sealedPage.GetDataSize() != bytesPacked) {
591 page.GetBuffer());
592 } else {
593 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
594 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
595 // Note that usually pages are compressed.
596 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
597 }
598
599 if (!element.IsMappable()) {
600 auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
601 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
602 page = std::move(tmp);
603 }
604
605 page.GrowUnchecked(sealedPage.GetNElements());
606 return page;
607}
608
610{
611 if (fHasStreamerInfosRegistered)
612 return;
613
614 for (const auto &extraTypeInfo : fDescriptor.GetExtraTypeInfoIterable()) {
616 continue;
617 // We don't need the result, it's enough that during deserialization, BuildCheck() is called for every
618 // streamer info record.
620 }
621
622 fHasStreamerInfosRegistered = true;
623}
624
625//------------------------------------------------------------------------------
626
628{
629 // Make the sort order unique by adding the physical on-disk column id as a secondary key
630 if (fCurrentPageSize == other.fCurrentPageSize)
631 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
632 return fCurrentPageSize > other.fCurrentPageSize;
633}
634
636{
637 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
638 return true;
639
640 auto itr = fColumnsSortedByPageSize.begin();
641 while (itr != fColumnsSortedByPageSize.end()) {
642 if (itr->fCurrentPageSize <= pageSizeLimit)
643 break;
644 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
645 ++itr;
646 continue;
647 }
648
649 // Flushing the current column will invalidate itr
650 auto itrFlush = itr++;
651
652 RColumnInfo next;
653 if (itr != fColumnsSortedByPageSize.end())
654 next = *itr;
655
656 itrFlush->fColumn->Flush();
657 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
658 return true;
659
660 if (next.fColumn == nullptr)
661 return false;
662 itr = fColumnsSortedByPageSize.find(next);
663 };
664
665 return false;
666}
667
669{
670 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
671 auto itr = fColumnsSortedByPageSize.find(key);
672 if (itr == fColumnsSortedByPageSize.end()) {
673 if (!TryEvict(newWritePageSize, 0))
674 return false;
675 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
676 fCurrentAllocatedBytes += newWritePageSize;
677 return true;
678 }
679
681 assert(newWritePageSize >= elem.fInitialPageSize);
682
683 if (newWritePageSize == elem.fCurrentPageSize)
684 return true;
685
686 fColumnsSortedByPageSize.erase(itr);
687
688 if (newWritePageSize < elem.fCurrentPageSize) {
689 // Page got smaller
690 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
691 elem.fCurrentPageSize = newWritePageSize;
692 fColumnsSortedByPageSize.insert(elem);
693 return true;
694 }
695
696 // Page got larger, we may need to make space available
697 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
698 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
699 // Don't change anything, let the calling column flush itself
700 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
701 fColumnsSortedByPageSize.insert(elem);
702 return false;
703 }
704 fCurrentAllocatedBytes += diffBytes;
705 elem.fCurrentPageSize = newWritePageSize;
706 fColumnsSortedByPageSize.insert(elem);
707 return true;
708}
709
710//------------------------------------------------------------------------------
711
713 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
714{
716}
717
719
721{
722 assert(config.fPage);
723 assert(config.fElement);
724 assert(config.fBuffer);
725
726 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
727 bool isAdoptedBuffer = true;
728 auto nBytesPacked = config.fPage->GetNBytes();
729 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
730
731 if (!config.fElement->IsMappable()) {
732 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
733 pageBuf = new unsigned char[nBytesPacked];
734 isAdoptedBuffer = false;
735 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
736 }
738
739 if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
740 config.fWriteChecksum) {
743 if (!isAdoptedBuffer)
744 delete[] pageBuf;
745 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
746 isAdoptedBuffer = true;
747 }
748
750
752 sealedPage.ChecksumIfEnabled();
753
754 return sealedPage;
755}
756
759{
760 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
761 if (fSealPageBuffer.size() < nBytes)
762 fSealPageBuffer.resize(nBytes);
763
764 RSealPageConfig config;
765 config.fPage = &page;
766 config.fElement = &element;
767 config.fCompressionSettings = GetWriteOptions().GetCompression();
768 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
769 config.fAllowAlias = true;
770 config.fBuffer = fSealPageBuffer.data();
771
772 return SealPage(config);
773}
774
776{
777 for (const auto &cb : fOnDatasetCommitCallbacks)
778 cb(*this);
779 return CommitDatasetImpl();
780}
781
783{
784 R__ASSERT(nElements > 0);
785 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
786 const auto nBytes = elementSize * nElements;
787 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
788 return ROOT::Internal::RPage();
789 return fPageAllocator->NewPage(elementSize, nElements);
790}
791
792//------------------------------------------------------------------------------
793
794std::unique_ptr<ROOT::Internal::RPageSink>
795ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
796 const ROOT::RNTupleWriteOptions &options)
797{
798 if (ntupleName.empty()) {
799 throw RException(R__FAIL("empty RNTuple name"));
800 }
801 if (location.empty()) {
802 throw RException(R__FAIL("empty storage location"));
803 }
804 if (location.find("daos://") == 0) {
805#ifdef R__ENABLE_DAOS
806 return std::make_unique<ROOT::Experimental::Internal::RPageSinkDaos>(ntupleName, location, options);
807#else
808 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
809#endif
810 }
811
812 // Otherwise assume that the user wants us to create a file.
813 return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
814}
815
817 const ROOT::RNTupleWriteOptions &options)
818 : RPageSink(name, options)
819{
820}
821
823
826{
827 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
829 columnBuilder.LogicalColumnId(columnId)
830 .PhysicalColumnId(columnId)
831 .FieldId(fieldId)
832 .BitsOnStorage(column.GetBitsOnStorage())
833 .ValueRange(column.GetValueRange())
834 .Type(column.GetType())
835 .Index(column.GetIndex())
836 .RepresentationIndex(column.GetRepresentationIndex())
837 .FirstElementIndex(column.GetFirstElementIndex());
838 // For late model extension, we assume that the primary column representation is the active one for the
839 // deferred range. All other representations are suppressed.
840 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
841 columnBuilder.SetSuppressedDeferred();
842 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
843 return ColumnHandle_t{columnId, &column};
844}
845
848{
849 if (fIsInitialized) {
850 for (const auto &field : changeset.fAddedFields) {
851 if (field->GetStructure() == ENTupleStructure::kStreamer) {
852 throw ROOT::RException(R__FAIL("a Model cannot be extended with Streamer fields"));
853 }
854 }
855 }
856
857 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
858
859 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
860 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
861 // of the changeset follow immediately the already existing physical columns
862 auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
863 const auto &reps = f.GetColumnRepresentatives();
864 if (reps.empty())
865 return 0;
866 return reps.size() * reps[0].size();
867 };
868 std::uint32_t nNewPhysicalColumns = 0;
869 for (auto f : changeset.fAddedFields) {
871 for (const auto &descendant : *f)
873 }
874 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
875 }
876
877 auto addField = [&](ROOT::RFieldBase &f) {
878 auto fieldId = descriptor.GetNFields();
879 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
880 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
881 f.SetOnDiskId(fieldId);
882 ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
883 };
885 auto fieldId = descriptor.GetNFields();
886 auto sourceFieldId =
888 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
889 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
890 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
891 f.SetOnDiskId(fieldId);
892 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
893 auto targetId = descriptor.GetNLogicalColumns();
895 columnBuilder.LogicalColumnId(targetId)
896 .PhysicalColumnId(source.GetLogicalId())
897 .FieldId(fieldId)
898 .BitsOnStorage(source.GetBitsOnStorage())
899 .ValueRange(source.GetValueRange())
900 .Type(source.GetType())
901 .Index(source.GetIndex())
902 .RepresentationIndex(source.GetRepresentationIndex());
903 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
904 }
905 };
906
907 R__ASSERT(firstEntry >= fPrevClusterNEntries);
908 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
909 for (auto f : changeset.fAddedFields) {
910 addField(*f);
911 for (auto &descendant : *f)
913 }
914 for (auto f : changeset.fAddedProjectedFields) {
916 for (auto &descendant : *f)
918 }
919
920 const auto nColumns = descriptor.GetNPhysicalColumns();
921 fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
922 fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
925 columnRange.SetPhysicalColumnId(i);
926 // We set the first element index in the current cluster to the first element that is part of a materialized page
927 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
928 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
929 columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
930 columnRange.SetNElements(0);
931 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
932 fOpenColumnRanges.emplace_back(columnRange);
934 pageRange.SetPhysicalColumnId(i);
935 fOpenPageRanges.emplace_back(std::move(pageRange));
936 }
937
938 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
939 // header was already serialized, this has to be done manually as it is required for page list serialization.
940 if (fSerializationContext.GetHeaderSize() > 0)
941 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
942}
943
945{
947 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
948
949 fInfosOfStreamerFields.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
950}
951
953{
954 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
955 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
956
958 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
959 fieldZero.SetOnDiskId(0);
961 projectedFields.GetFieldZero().SetOnDiskId(0);
962
964 initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
965 for (auto f : fieldZero.GetMutableSubfields())
966 initialChangeset.fAddedFields.emplace_back(f);
967 initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
968 for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
969 initialChangeset.fAddedProjectedFields.emplace_back(f);
970 UpdateSchema(initialChangeset, 0U);
971
972 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
973 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
974 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
975 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
976
977 fDescriptorBuilder.BeginHeaderExtension();
978}
979
// RPagePersistentSink::InitFromDescriptor(const RNTupleDescriptor &srcDescriptor, bool copyClusters):
// seeds this sink's descriptor builder from an existing descriptor, recreates the per-column
// open column/page ranges, optionally clones all existing cluster descriptors, rebuilds a model
// from the descriptor, and serializes the header. Returns the reconstructed model.
// NOTE(review): this rendered listing drops several original lines (signature 981 and the local
// declarations at 994, 1000, 1008, 1028, 1034 — e.g. columnRange, pageRange, the while-loop over
// cluster ids, and modelOpts); confirm against the original source before editing.
980std::unique_ptr<ROOT::RNTupleModel>
 982{
 983 // Create new descriptor
 984 fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
 985 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
 986
 987 // Create column/page ranges
 988 const auto nColumns = descriptor.GetNPhysicalColumns();
 // Sanity check: this sink must not have been initialized yet.
 989 R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
 990 fOpenColumnRanges.reserve(nColumns);
 991 fOpenPageRanges.reserve(nColumns);
 // One open column range and one open page range per physical column; both start empty.
 992 for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
 993 const auto &column = descriptor.GetColumnDescriptor(i);
 // (declaration of columnRange omitted by the listing — original line 994)
 995 columnRange.SetPhysicalColumnId(i);
 996 columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
 997 columnRange.SetNElements(0);
 998 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
 999 fOpenColumnRanges.emplace_back(columnRange);
 // (declaration of pageRange omitted by the listing — original line 1000)
 1001 pageRange.SetPhysicalColumnId(i);
 1002 fOpenPageRanges.emplace_back(std::move(pageRange));
 1003 }
 1004
 1005 if (copyClusters) {
 1006 // Clone and add all cluster descriptors
 1007 auto clusterId = srcDescriptor.FindClusterId(0, 0);
 // (loop header over valid cluster ids omitted by the listing — original line 1008)
 1009 auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
 1010 auto nEntries = cluster.GetNEntries();
 // Advance each column's first element index past the elements already stored in this cluster.
 1011 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
 1012 R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
 1013 if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
 1014 break;
 1015 const auto &columnRange = cluster.GetColumnRange(i);
 1016 R__ASSERT(columnRange.GetPhysicalColumnId() == i);
 1017 // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
 1018 fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements());
 1019 }
 1020 fDescriptorBuilder.AddCluster(cluster.Clone());
 1021 fPrevClusterNEntries += nEntries;
 1022
 1023 clusterId = srcDescriptor.FindNextClusterId(clusterId);
 1024 }
 1025 }
 1026
 1027 // Create model
 // (declaration of modelOpts omitted by the listing — original line 1028)
 1029 modelOpts.SetReconstructProjections(true);
 1030 // We want to emulate unknown types to allow merging RNTuples containing types that we lack dictionaries for.
 1031 modelOpts.SetEmulateUnknownTypes(true);
 1032 auto model = descriptor.CreateModel(modelOpts);
 1033 if (!copyClusters) {
 // (declaration of projectedFields omitted by the listing — original line 1034)
 1035 projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
 1036 }
 1037
 1038 // Serialize header and init from it
 // First pass with a nullptr buffer computes the header size; second pass writes the bytes.
 1039 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
 1040 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
 1041 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
 1042 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
 1043
 // Any schema change from here on goes into the header extension.
 1044 fDescriptorBuilder.BeginHeaderExtension();
 1045
 1046 // mark this sink as initialized
 1047 fIsInitialized = true;
 1048
 1049 return model;
 1050}
1051
// RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle)
// (signature line 1052 omitted by this rendered listing):
// marks the column's open column range as suppressed for the current cluster.
1053{
1054 fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
1055}
1056
// RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle, const RPage &page)
// (signature line 1057 omitted by this rendered listing):
// writes a page via CommitPageImpl() and records its locator in the column's open page range.
1058{
 // Account for the page's elements in the column's open range for the current cluster.
1059 fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());
1060
 // (declaration of pageInfo omitted by the listing — original line 1061)
1062 pageInfo.SetNElements(page.GetNElements());
 // CommitPageImpl() performs the actual storage write and returns the page's locator.
1063 pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
1064 pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
1065 fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
1066}
1067
// RPagePersistentSink::CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage)
// (signature lines 1068-1069 omitted by this rendered listing):
// writes an already packed/compressed page and records its locator in the open page range.
1070{
1071 fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());
1072
 // (declaration of pageInfo omitted by the listing — original line 1073)
1074 pageInfo.SetNElements(sealedPage.GetNElements());
1075 pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
 // Sealed pages carry their own checksum flag; unlike CommitPage(), the write options are not consulted.
1076 pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
1077 fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
1078}
1079
1080std::vector<ROOT::RNTupleLocator>
1081ROOT::Internal::RPagePersistentSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
1082 const std::vector<bool> &mask)
1083{
1084 std::vector<ROOT::RNTupleLocator> locators;
1085 locators.reserve(mask.size());
1086 std::size_t i = 0;
1087 for (auto &range : ranges) {
1088 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1089 if (mask[i++])
1090 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
1091 }
1092 }
1093 locators.shrink_to_fit();
1094 return locators;
1095}
1096
// Vector commit of sealed pages with optional same-page merging: identical pages
// (same checksum, size, and bytes) are written only once and share a locator.
1097void ROOT::Internal::RPagePersistentSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
1098{
1099 /// Used in the `originalPages` map
1100 struct RSealedPageLink {
1101 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1102 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1103 };
1104
 // mask[i] == true means page i must actually be written by CommitSealedPageVImpl().
1105 std::vector<bool> mask;
1106 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1107 std::vector<std::size_t> locatorIndexes;
1108 // Maps page checksums to the first sealed page with that checksum
1109 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1110 std::size_t iLocator = 0;
1111 for (auto &range : ranges) {
1112 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1113 mask.reserve(mask.size() + rangeSize);
1114 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1115
1116 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
 // Fast path: merging disabled (by backend capability or write options) -> write every page.
1117 if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
1118 mask.emplace_back(true);
1119 locatorIndexes.emplace_back(iLocator++);
1120 continue;
1121 }
1122 // Same page merging requires page checksums - this is checked in the write options
1123 R__ASSERT(sealedPageIt->GetHasChecksum());
1124
1125 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1126 auto itr = originalPages.find(chk);
 // First page with this checksum: remember it and write it.
1127 if (itr == originalPages.end()) {
1128 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1129 mask.emplace_back(true);
1130 locatorIndexes.emplace_back(iLocator++);
1131 continue;
1132 }
1133
 // Checksum collision guard: only merge if size and bytes really match.
1134 const auto *p = itr->second.fSealedPage;
1135 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1136 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1137 mask.emplace_back(true);
1138 locatorIndexes.emplace_back(iLocator++);
1139 continue;
1140 }
1141
 // True duplicate: skip the write, reuse the original page's locator index.
1142 mask.emplace_back(false);
1143 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1144 }
1145
1146 mask.shrink_to_fit();
1147 locatorIndexes.shrink_to_fit();
1148 }
1149
1150 auto locators = CommitSealedPageVImpl(ranges, mask);
1151 unsigned i = 0;
1152
 // Second pass: record per-page metadata, mapping each page to its (possibly shared) locator.
1153 for (auto &range : ranges) {
1154 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1155 fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());
1156
 // (declaration of pageInfo omitted by the listing — original line 1157)
1158 pageInfo.SetNElements(sealedPageIt->GetNElements());
1159 pageInfo.SetLocator(locators[locatorIndexes[i++]]);
1160 pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
1161 fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
1162 }
1163 }
1164}
1165
// RPagePersistentSink::StageCluster(NTupleSize_t nNewEntries)
// (signature line 1167 omitted by this rendered listing):
// flushes the current cluster to storage via StageClusterImpl(), snapshots the per-column
// page ranges into an RStagedCluster, and resets the open ranges for the next cluster.
1168{
 // (declaration of stagedCluster omitted by the listing — original line 1169)
1170 stagedCluster.fNBytesWritten = StageClusterImpl();
1171 stagedCluster.fNEntries = nNewEntries;
1172
1173 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1174 RStagedCluster::RColumnInfo columnInfo;
1175 columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
1176 if (fOpenColumnRanges[i].IsSuppressed()) {
 // A suppressed column must not have accumulated any pages in this cluster.
1177 assert(fOpenPageRanges[i].GetPageInfos().empty());
1178 columnInfo.fPageRange.SetPhysicalColumnId(i);
1179 columnInfo.fIsSuppressed = true;
1180 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1181 fOpenColumnRanges[i].SetNElements(0);
1182 fOpenColumnRanges[i].SetIsSuppressed(false);
1183 } else {
 // Move the accumulated page range into the staged cluster and leave a fresh, empty one behind.
1184 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1185 fOpenPageRanges[i].SetPhysicalColumnId(i);
1186
1187 columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
1188 fOpenColumnRanges[i].SetNElements(0);
1189 }
1190 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1191 }
1192
1193 return stagedCluster;
1194}
1195
// RPagePersistentSink::CommitStagedClusters(std::span<RStagedCluster> clusters)
// (signature line 1196 omitted by this rendered listing):
// logically appends previously staged clusters to the descriptor, committing the
// per-column ranges and bookkeeping the first-element indexes for the next cluster.
1197{
1198 for (const auto &cluster : clusters) {
 // (declaration of clusterBuilder omitted by the listing — original line 1199)
1200 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1201 .FirstEntryIndex(fPrevClusterNEntries)
1202 .NEntries(cluster.fNEntries);
1203 for (const auto &columnInfo : cluster.fColumnInfos) {
1204 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1205 if (columnInfo.fIsSuppressed) {
1206 assert(columnInfo.fPageRange.GetPageInfos().empty());
1207 clusterBuilder.MarkSuppressedColumnRange(colId);
1208 } else {
1209 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
1210 columnInfo.fCompressionSettings, columnInfo.fPageRange);
 // Advance the running first-element index by the elements just committed.
1211 fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
1212 }
1213 }
1214
1215 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1216 for (const auto &columnInfo : cluster.fColumnInfos) {
1217 if (!columnInfo.fIsSuppressed)
1218 continue;
1219 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1220 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1221 // cluster. This information has been determined for the committed cluster descriptor through
1222 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1223 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1224 fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
1225 columnRangeFromDesc.GetNElements());
1226 }
1227
1228 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1229 fPrevClusterNEntries += cluster.fNEntries;
1230 }
1231}
1232
// RPagePersistentSink::CommitClusterGroup()
// (signature line 1233 omitted by this rendered listing):
// serializes the page list for all clusters committed since the last call and
// records them as a new cluster group in the descriptor.
1234{
1235 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1236
1237 const auto nClusters = descriptor.GetNActiveClusters();
 // Map the on-disk (physical) ids of the clusters belonging to this group.
1238 std::vector<ROOT::DescriptorId_t> physClusterIDs;
1239 physClusterIDs.reserve(nClusters);
1240 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1241 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1242 }
1243
 // First pass with nullptr computes the page-list envelope size.
1244 auto szPageList =
1245 RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
 // (allocation of bufPageList and second SerializePageList pass omitted by the listing — original lines 1246-1247)
1248
1249 const auto clusterGroupId = descriptor.GetNClusterGroups();
1250 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
 // (declaration of cgBuilder omitted by the listing — original line 1251)
1252 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
 // Empty group: no new clusters since the last call.
1253 if (fNextClusterInGroup == nClusters) {
1254 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1255 } else {
1256 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1257 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1258 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1259 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1260 firstClusterDesc.GetFirstEntryIndex())
1261 .NClusters(nClusters - fNextClusterInGroup);
1262 }
1263 std::vector<ROOT::DescriptorId_t> clusterIds;
1264 clusterIds.reserve(nClusters);
1265 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1266 clusterIds.emplace_back(i);
1267 }
1268 cgBuilder.AddSortedClusters(clusterIds);
1269 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1270 fSerializationContext.MapClusterGroupId(clusterGroupId);
1271
 // Next group starts after the clusters just committed.
1272 fNextClusterInGroup = nClusters;
1273}
1274
// RPagePersistentSink::CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo)
// (signature lines 1275-1276 omitted by this rendered listing):
// records an attribute set's anchor (name, length, locator) in the main descriptor.
1277{
 // (declaration of attrSetDescBuilder omitted by the listing — original lines 1278/1280)
1279
1281 auto attrSetDesc = attrSetDescBuilder.SchemaVersion(kSchemaVersionMajor, kSchemaVersionMinor)
1282 .AnchorLength(attrAnchorInfo.fLength)
1283 .AnchorLocator(attrAnchorInfo.fLocator)
1284 .Name(attrSetName)
1285 .MoveDescriptor()
1286 .Unwrap();
1287 fDescriptorBuilder.AddAttributeSet(std::move(attrSetDesc)).ThrowOnError();
1288}
1289
// RPagePersistentSink::CommitDataset()
// (signature line 1290 omitted by this rendered listing):
// merges collected streamer infos into the descriptor's extra type info,
// serializes the footer, and finalizes the dataset via CommitDatasetImpl().
1291{
1292 if (!fInfosOfStreamerFields.empty()) {
1293 // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
1294 // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
1295 for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
1296 if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
1297 // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
1298 // should be zero.
1299 R__ASSERT(etDesc.GetTypeName().empty());
1300 R__ASSERT(etDesc.GetTypeVersion() == 0);
1301 auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
 // merge() keeps existing keys, so pre-existing records win over freshly collected ones.
1302 fInfosOfStreamerFields.merge(etInfo);
1303 }
1304 }
1305
 // (declaration of extraInfoBuilder and its ContentId(kStreamerInfo) call omitted by the listing —
 // original lines 1306-1307)
1308 .Content(RNTupleSerializer::SerializeStreamerInfos(fInfosOfStreamerFields));
1309 fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1310 }
1311
1312 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1313
 // First pass with nullptr computes the footer size; the allocation of bufFooter
 // (original line 1315) is omitted by the listing.
1314 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
1316 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1317
1318 return CommitDatasetImpl(bufFooter.get(), szFooter);
1319}
1320
// RPagePersistentSink::EnableDefaultMetrics(const std::string &prefix)
// (signature line 1321 omitted by this rendered listing):
// replaces fMetrics with a fresh metrics collection under `prefix` and registers
// the default sink counters (page count, payload/zip volume, wall/CPU timings).
1322{
1323 fMetrics = RNTupleMetrics(prefix);
1324 fCounters = std::make_unique<RCounters>(RCounters{
1325 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageCommitted", "", "number of pages committed to storage"),
1326 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szWritePayload", "B", "volume written for committed pages"),
1327 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1328 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1329 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1330 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuWrite", "ns", "CPU time spent writing"),
1331 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1332 "CPU time spent compressing")});
1333}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:304
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char mode
char name[80]
Definition TGX11.cxx:148
#define _(A, B)
Definition cfortran.h:108
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
A helper class for piece-wise construction of an RClusterDescriptor.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
A helper class for piece-wise construction of an RColumnDescriptor.
A column element encapsulates the translation between basic C++ types and their column representation...
virtual RIdentifier GetIdentifier() const =0
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:346
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:352
ROOT::Internal::RColumnElementBase * GetElement() const
Definition RColumn.hxx:339
ROOT::ENTupleColumnType GetType() const
Definition RColumn.hxx:340
ROOT::NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:354
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:361
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:341
std::uint32_t GetIndex() const
Definition RColumn.hxx:351
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
A helper class for piece-wise construction of an RFieldDescriptor.
static RFieldDescriptorBuilder FromField(const ROOT::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live RNTuple field.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static unsigned int GetClusterBunchSize(const RNTupleReadOptions &options)
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static RResult< std::uint32_t > SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< ROOT::DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< std::uint32_t > SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< RContext > SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:99
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:41
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) override
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
std::unique_ptr< RNTupleModel > InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters)
Initialize sink based on an existing descriptor and fill into the descriptor builder,...
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final
Adds the given anchor information (name + locator) into the main RNTuple's descriptor as an attribute...
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final
Register a new column.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
Write a page to the storage. The column must have been added before.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Abstract interface to write data into an ntuple.
RNTupleLink CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const
void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void RegisterStreamerInfos()
Builds the streamer info records from the descriptor's extra type info section.
void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode=ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading)
Open the physical storage container and deserialize header and footer.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) override
Register a new column.
void UnzipCluster(ROOT::Internal::RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
void PrepareLoadCluster(const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap, std::function< void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
ROOT::NTupleSize_t GetNEntries()
void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
Does nothing if fLastUsedCluster == clusterId.
ROOT::NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster)
RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions)
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple time, e.g.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
Stores information about the cluster in which this page resides.
Definition RPage.hxx:53
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
const ROOT::RFieldBase * GetSourceField(const ROOT::RFieldBase *target) const
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
The window of element indexes of a particular column in a particular cluster.
Records the partition of data into pages for a particular column in a particular cluster.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
The RNTupleModel encapsulates the schema of an RNTuple.
const std::string & GetDescription() const
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
const_iterator begin() const
const_iterator end() const
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:290
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:198
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:152
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Default I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
Default I/O performance counters that get registered in fMetrics
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
bool operator>(const RColumnInfo &other) const
Information about a single page in the context of a cluster's page range.