Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
14#include <ROOT/RPageStorage.hxx>
16#include <ROOT/RColumn.hxx>
17#include <ROOT/RFieldBase.hxx>
20#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleUtils.hxx>
23#include <ROOT/RNTupleZip.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <atomic>
36#include <cassert>
37#include <cstring>
38#include <functional>
39#include <memory>
40#include <string_view>
41#include <unordered_map>
42#include <utility>
43
53
57
63
65 : fMetrics(""), fPageAllocator(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()), fNTupleName(name)
66{
67}
68
70
72{
73 if (!fHasChecksum)
74 return;
75
76 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
77 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
78 std::uint64_t xxhash3;
80}
81
83{
84 if (!fHasChecksum)
86
87 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
88 if (!success)
89 return R__FAIL("page checksum verification failed, data corruption detected");
91}
92
94{
95 if (!fHasChecksum)
96 return R__FAIL("invalid attempt to extract non-existing page checksum");
97
98 assert(fBufferSize >= kNBytesPageChecksum);
99 std::uint64_t checksum;
101 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
102 return checksum;
103}
104
105//------------------------------------------------------------------------------
106
// NOTE(review): the signature line was dropped by the extraction; from the body this is
// presumably RPageStorage::RActivePhysicalColumns::Insert(physicalColumnId, elementId)
// -- confirm against the upstream file. Registers one reference to a column element.
109{
// Ensure a (possibly empty) info vector exists for this physical column;
// emplace is a no-op if the key is already present.
110 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
// If this element id is already tracked for the column, just bump its reference count.
111 for (auto &columnInfo : itr->second) {
112 if (columnInfo.fElementId == elementId) {
113 columnInfo.fRefCounter++;
114 return;
115 }
116 }
// First reference to this element id for the column: start counting at 1.
117 itr->second.emplace_back(RColumnInfo{elementId, 1});
118}
119
// NOTE(review): signature line missing from this extract; counterpart of Insert() above,
// presumably RActivePhysicalColumns::Erase(physicalColumnId, elementId) -- confirm upstream.
// Drops one reference to a column element; removes bookkeeping entries that reach zero.
122{
123 auto itr = fColumnInfos.find(physicalColumnId);
// The column must have been Insert()-ed before it can be erased.
124 R__ASSERT(itr != fColumnInfos.end());
125 for (std::size_t i = 0; i < itr->second.size(); ++i) {
126 if (itr->second[i].fElementId != elementId)
127 continue;
128
129 itr->second[i].fRefCounter--;
// Last reference to this element id: remove its entry, and if the column has
// no tracked element ids left, drop the whole map entry as well.
130 if (itr->second[i].fRefCounter == 0) {
131 itr->second.erase(itr->second.begin() + i);
132 if (itr->second.empty()) {
133 fColumnInfos.erase(itr);
134 }
135 }
// Element ids are unique within the vector, so we can stop after the first match.
136 break;
137 }
138}
139
147
// NOTE(review): signature line missing from this extract; from the body this predicate
// reports whether a cluster's entry span overlaps the configured [fFirstEntry,
// fFirstEntry + fNEntries) range -- confirm name against the upstream file.
149{
150 if (fFirstEntry == ROOT::kInvalidNTupleIndex) {
151 /// Entry range unset, we assume that the entry range covers the complete source
152 return true;
153 }
154
// Empty clusters are trivially considered in range.
155 if (clusterDesc.GetNEntries() == 0)
156 return true;
// Cluster ends at or before the range begins: no overlap.
157 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
158 return false;
// Cluster starts at or after the range ends: no overlap.
159 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
160 return false;
161 return true;
162}
163
165 : RPageStorage(name), fOptions(options)
166{
167}
168
170
171std::unique_ptr<ROOT::Internal::RPageSource>
172ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
173 const ROOT::RNTupleReadOptions &options)
174{
175 if (ntupleName.empty()) {
176 throw RException(R__FAIL("empty RNTuple name"));
177 }
178 if (location.empty()) {
179 throw RException(R__FAIL("empty storage location"));
180 }
181 if (location.find("daos://") == 0)
182#ifdef R__ENABLE_DAOS
183 return std::make_unique<ROOT::Experimental::Internal::RPageSourceDaos>(ntupleName, location, options);
184#else
185 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
186#endif
187
188 return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
189}
190
193{
195 auto physicalId =
196 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
198 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
199 return ColumnHandle_t{physicalId, &column};
200}
201
203{
204 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
205}
206
// NOTE(review): signature line missing from this extract; presumably
// RPageSource::SetEntryRange(REntryRange range) -- confirm upstream.
// Restricts the source to a sub-range of entries after validating its bounds.
208{
// Reject ranges that extend past the number of entries currently known to the source.
209 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
210 throw RException(R__FAIL("invalid entry range"));
211 }
212 fEntryRange = range;
213}
214
// NOTE(review): signature line missing from this extract; presumably
// RPageSource::LoadStructure() -- confirm upstream.
// Idempotent: performs the (possibly expensive) LoadStructureImpl() only once.
216{
217 if (!fHasStructure)
218 LoadStructureImpl();
219 fHasStructure = true;
220}
221
// NOTE(review): signature line missing from this extract; presumably
// RPageSource::Attach(mode) -- confirm upstream.
// Ensures the structure is loaded, then attaches at most once, installing the
// descriptor produced by AttachImpl() under the exclusive descriptor guard.
223{
224 LoadStructure();
225 if (!fIsAttached)
226 GetExclDescriptorGuard().MoveIn(AttachImpl(mode));
227 fIsAttached = true;
228}
229
230std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSource::Clone() const
231{
232 auto clone = CloneImpl();
233 if (fIsAttached) {
234 clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
235 clone->fHasStructure = true;
236 clone->fIsAttached = true;
237 }
238 return clone;
239}
240
// NOTE(review): signature line missing from this extract; presumably
// RPageSource::GetNEntries() -- confirm upstream. Reads under the shared guard.
242{
243 return GetSharedDescriptorGuard()->GetNEntries();
244}
245
// NOTE(review): signature line missing from this extract; presumably
// RPageSource::GetNElements(columnHandle) -- confirm upstream.
// Number of elements of the handle's physical column, read under the shared guard.
247{
248 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
249}
250
// NOTE(review): signature line missing from this extract; presumably
// RPageSource::UnzipCluster(RCluster *cluster) -- confirm upstream.
// Parallel decompression only happens when a task scheduler is installed;
// without one this is a no-op and pages are unsealed on demand instead.
252{
253 if (fTaskScheduler)
254 UnzipClusterImpl(cluster);
255}
256
258{
259 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
260
261 const auto clusterId = cluster->GetId();
262 auto descriptorGuard = GetSharedDescriptorGuard();
263 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
264
265 fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;
266
267 std::atomic<bool> foundChecksumFailure{false};
268
269 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
270 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
271 for (const auto columnId : columnsInCluster) {
272 // By the time we unzip a cluster, the set of active columns may have already changed wrt. to the moment when
273 // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
274 // of active columns.
275 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
276 continue;
277 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
278
279 allElements.reserve(allElements.size() + columnInfos.size());
280 for (const auto &info : columnInfos) {
281 allElements.emplace_back(GenerateColumnElement(info.fElementId));
282
283 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
284 std::uint64_t pageNo = 0;
285 std::uint64_t firstInPage = 0;
286 for (const auto &pi : pageRange.GetPageInfos()) {
287 auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
289 sealedPage.SetNElements(pi.GetNElements());
290 sealedPage.SetHasChecksum(pi.HasChecksum());
291 sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
292 sealedPage.SetBuffer(onDiskPage->GetAddress());
293 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
294
295 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
297 indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
298 const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
299 auto rv = UnsealPage(sealedPage, *element);
300 if (!rv) {
302 return;
303 }
304 auto newPage = rv.Unwrap();
305 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
306
307 newPage.SetWindow(indexOffset + firstInPage,
309 fPagePool.PreloadPage(std::move(newPage), keyPagePool);
310 };
311
312 fTaskScheduler->AddTask(taskFunc);
313
314 firstInPage += pi.GetNElements();
315 pageNo++;
316 } // for all pages in column
317
318 fCounters->fNPageUnsealed.Add(pageNo);
319 } // for all in-memory types of the column
320 } // for all columns in cluster
321
322 fTaskScheduler->Wait();
323
325 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
326 }
327}
328
333{
334 auto descriptorGuard = GetSharedDescriptorGuard();
335 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
336
337 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
338 if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
339 continue;
340
341 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
343 for (const auto &pageInfo : pageRange.GetPageInfos()) {
344 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
347 pageInfo.GetLocator().GetNBytesOnStorage()));
348 } else {
350 }
351 ++pageNo;
352 }
353 }
354}
355
357{
358 if (fLastUsedCluster == clusterId)
359 return;
360
362 GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
363 auto itr = fPreloadedClusters.begin();
364 while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
365 fPagePool.Evict(itr->second);
366 itr = fPreloadedClusters.erase(itr);
367 }
368 std::size_t poolWindow = 0;
369 while ((itr != fPreloadedClusters.end()) &&
371 ++itr;
372 ++poolWindow;
373 }
374 while (itr != fPreloadedClusters.end()) {
375 fPagePool.Evict(itr->second);
376 itr = fPreloadedClusters.erase(itr);
377 }
378
379 fLastUsedCluster = clusterId;
380}
381
384{
385 const auto columnId = columnHandle.fPhysicalId;
386 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
387 auto cachedPageRef =
388 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
389 if (!cachedPageRef.Get().IsNull()) {
390 UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
391 return cachedPageRef;
392 }
393
394 std::uint64_t idxInCluster;
396 {
397 auto descriptorGuard = GetSharedDescriptorGuard();
398 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
399
400 if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
401 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
402
403 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
404 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
405 if (columnRange.IsSuppressed())
407
408 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
409 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
410 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
411 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
412 }
413
414 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
415 throw RException(R__FAIL("tried to read a page with an unknown locator"));
416
417 UpdateLastUsedCluster(clusterInfo.fClusterId);
418 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
419}
420
423{
424 const auto clusterId = localIndex.GetClusterId();
425 const auto idxInCluster = localIndex.GetIndexInCluster();
426 const auto columnId = columnHandle.fPhysicalId;
427 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
428 auto cachedPageRef =
429 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
430 if (!cachedPageRef.Get().IsNull()) {
431 UpdateLastUsedCluster(clusterId);
432 return cachedPageRef;
433 }
434
436 throw RException(R__FAIL("entry out of bounds"));
437
439 {
440 auto descriptorGuard = GetSharedDescriptorGuard();
441 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
442 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
443 if (columnRange.IsSuppressed())
445
446 clusterInfo.fClusterId = clusterId;
447 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
448 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
449 }
450
451 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
452 throw RException(R__FAIL("tried to read a page with an unknown locator"));
453
454 UpdateLastUsedCluster(clusterInfo.fClusterId);
455 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
456}
457
459{
460 fMetrics = RNTupleMetrics(prefix);
461 fCounters = std::make_unique<RCounters>(RCounters{
462 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
463 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
464 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadPayload", "B", "volume read from storage (required)"),
465 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadOverhead", "B", "volume read from storage (overhead)"),
466 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
467 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nClusterLoaded", "",
468 "number of partial clusters preloaded from storage"),
469 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
470 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageUnsealed", "", "number of pages unzipped and decoded"),
471 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
472 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
473 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuRead", "ns", "CPU time spent reading"),
474 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
475 "CPU time spent decompressing"),
476 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
477 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
478 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
479 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
480 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
481 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
482 if (auto walltime = timeWallRead->GetValueAsInt()) {
483 double payload = szReadPayload->GetValueAsInt();
484 double overhead = szReadOverhead->GetValueAsInt();
485 // unit: bytes / nanosecond = GB/s
486 return {true, (1000. * (payload + overhead) / walltime)};
487 }
488 }
489 }
490 }
491 return {false, -1.};
492 }),
493 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
494 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
495 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
496 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
497 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
498 if (auto walltime = timeWallRead->GetValueAsInt()) {
499 double unzip = szUnzip->GetValueAsInt();
500 // unit: bytes / nanosecond = GB/s
501 return {true, 1000. * unzip / walltime};
502 }
503 }
504 }
505 return {false, -1.};
506 }),
507 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
508 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
509 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
510 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
511 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
512 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
513 double unzip = szUnzip->GetValueAsInt();
514 // unit: bytes / nanosecond = GB/s
515 return {true, 1000. * unzip / walltime};
516 }
517 }
518 }
519 return {false, -1.};
520 }),
521 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
522 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
523 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
524 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
525 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
526 if (auto payload = szReadPayload->GetValueAsInt()) {
527 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
528 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
529 }
530 }
531 }
532 return {false, -1.};
533 }),
534 *fMetrics.MakeCounter<RNTupleCalcPerf *>("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
535 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
536 if (const auto szReadPayload =
537 metrics.GetLocalCounter("szReadPayload")) {
538 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
539 if (auto unzip = szUnzip->GetValueAsInt()) {
540 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
542 }
543 }
544 return {false, -1.};
545 })});
546}
547
550{
551 return UnsealPage(sealedPage, element, *fPageAllocator);
552}
553
557{
558 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
559 // large enough to hold `sealedPage.fNElements`
561 auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
562 page.GrowUnchecked(sealedPage.GetNElements());
563 memset(page.GetBuffer(), 0, page.GetNBytes());
564 return page;
565 }
566
567 auto rv = sealedPage.VerifyChecksumIfEnabled();
568 if (!rv)
569 return R__FORWARD_ERROR(rv);
570
571 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
572 auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
573 if (sealedPage.GetDataSize() != bytesPacked) {
575 page.GetBuffer());
576 } else {
577 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
578 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
579 // Note that usually pages are compressed.
580 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
581 }
582
583 if (!element.IsMappable()) {
584 auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
585 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
586 page = std::move(tmp);
587 }
588
589 page.GrowUnchecked(sealedPage.GetNElements());
590 return page;
591}
592
594{
595 if (fHasStreamerInfosRegistered)
596 return;
597
598 for (const auto &extraTypeInfo : fDescriptor.GetExtraTypeInfoIterable()) {
600 continue;
601 // We don't need the result, it's enough that during deserialization, BuildCheck() is called for every
602 // streamer info record.
604 }
605
606 fHasStreamerInfosRegistered = true;
607}
608
609//------------------------------------------------------------------------------
610
612{
613 // Make the sort order unique by adding the physical on-disk column id as a secondary key
614 if (fCurrentPageSize == other.fCurrentPageSize)
615 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
616 return fCurrentPageSize > other.fCurrentPageSize;
617}
618
620{
621 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
622 return true;
623
624 auto itr = fColumnsSortedByPageSize.begin();
625 while (itr != fColumnsSortedByPageSize.end()) {
626 if (itr->fCurrentPageSize <= pageSizeLimit)
627 break;
628 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
629 ++itr;
630 continue;
631 }
632
633 // Flushing the current column will invalidate itr
634 auto itrFlush = itr++;
635
636 RColumnInfo next;
637 if (itr != fColumnsSortedByPageSize.end())
638 next = *itr;
639
640 itrFlush->fColumn->Flush();
641 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
642 return true;
643
644 if (next.fColumn == nullptr)
645 return false;
646 itr = fColumnsSortedByPageSize.find(next);
647 };
648
649 return false;
650}
651
653{
654 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
655 auto itr = fColumnsSortedByPageSize.find(key);
656 if (itr == fColumnsSortedByPageSize.end()) {
657 if (!TryEvict(newWritePageSize, 0))
658 return false;
659 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
660 fCurrentAllocatedBytes += newWritePageSize;
661 return true;
662 }
663
665 assert(newWritePageSize >= elem.fInitialPageSize);
666
667 if (newWritePageSize == elem.fCurrentPageSize)
668 return true;
669
670 fColumnsSortedByPageSize.erase(itr);
671
672 if (newWritePageSize < elem.fCurrentPageSize) {
673 // Page got smaller
674 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
675 elem.fCurrentPageSize = newWritePageSize;
676 fColumnsSortedByPageSize.insert(elem);
677 return true;
678 }
679
680 // Page got larger, we may need to make space available
681 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
682 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
683 // Don't change anything, let the calling column flush itself
684 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
685 fColumnsSortedByPageSize.insert(elem);
686 return false;
687 }
688 fCurrentAllocatedBytes += diffBytes;
689 elem.fCurrentPageSize = newWritePageSize;
690 fColumnsSortedByPageSize.insert(elem);
691 return true;
692}
693
694//------------------------------------------------------------------------------
695
697 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
698{
700}
701
703
705{
706 assert(config.fPage);
707 assert(config.fElement);
708 assert(config.fBuffer);
709
710 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
711 bool isAdoptedBuffer = true;
712 auto nBytesPacked = config.fPage->GetNBytes();
713 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
714
715 if (!config.fElement->IsMappable()) {
716 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
717 pageBuf = new unsigned char[nBytesPacked];
718 isAdoptedBuffer = false;
719 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
720 }
722
723 if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
724 config.fWriteChecksum) {
727 if (!isAdoptedBuffer)
728 delete[] pageBuf;
729 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
730 isAdoptedBuffer = true;
731 }
732
734
736 sealedPage.ChecksumIfEnabled();
737
738 return sealedPage;
739}
740
743{
744 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
745 if (fSealPageBuffer.size() < nBytes)
746 fSealPageBuffer.resize(nBytes);
747
748 RSealPageConfig config;
749 config.fPage = &page;
750 config.fElement = &element;
751 config.fCompressionSettings = GetWriteOptions().GetCompression();
752 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
753 config.fAllowAlias = true;
754 config.fBuffer = fSealPageBuffer.data();
755
756 return SealPage(config);
757}
758
760{
761 for (const auto &cb : fOnDatasetCommitCallbacks)
762 cb(*this);
763 CommitDatasetImpl();
764}
765
767{
768 R__ASSERT(nElements > 0);
769 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
770 const auto nBytes = elementSize * nElements;
771 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
772 return ROOT::Internal::RPage();
773 return fPageAllocator->NewPage(elementSize, nElements);
774}
775
776//------------------------------------------------------------------------------
777
778std::unique_ptr<ROOT::Internal::RPageSink>
779ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
780 const ROOT::RNTupleWriteOptions &options)
781{
782 if (ntupleName.empty()) {
783 throw RException(R__FAIL("empty RNTuple name"));
784 }
785 if (location.empty()) {
786 throw RException(R__FAIL("empty storage location"));
787 }
788 if (location.find("daos://") == 0) {
789#ifdef R__ENABLE_DAOS
790 return std::make_unique<ROOT::Experimental::Internal::RPageSinkDaos>(ntupleName, location, options);
791#else
792 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
793#endif
794 }
795
796 // Otherwise assume that the user wants us to create a file.
797 return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
798}
799
801 const ROOT::RNTupleWriteOptions &options)
802 : RPageSink(name, options)
803{
804}
805
807
810{
811 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
813 columnBuilder.LogicalColumnId(columnId)
814 .PhysicalColumnId(columnId)
815 .FieldId(fieldId)
816 .BitsOnStorage(column.GetBitsOnStorage())
817 .ValueRange(column.GetValueRange())
818 .Type(column.GetType())
819 .Index(column.GetIndex())
820 .RepresentationIndex(column.GetRepresentationIndex())
821 .FirstElementIndex(column.GetFirstElementIndex());
822 // For late model extension, we assume that the primary column representation is the active one for the
823 // deferred range. All other representations are suppressed.
824 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
825 columnBuilder.SetSuppressedDeferred();
826 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
827 return ColumnHandle_t{columnId, &column};
828}
829
832{
833 if (fIsInitialized)
834 for (const auto &field : changeset.fAddedFields)
835 if (field->GetStructure() == ENTupleStructure::kStreamer)
836 throw ROOT::RException(R__FAIL("a Model cannot be extended with Streamer fields"));
837
838 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
839
840 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
841 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
842 // of the changeset follow immediately the already existing physical columns
843 auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
844 const auto &reps = f.GetColumnRepresentatives();
845 if (reps.empty())
846 return 0;
847 return reps.size() * reps[0].size();
848 };
849 std::uint32_t nNewPhysicalColumns = 0;
850 for (auto f : changeset.fAddedFields) {
852 for (const auto &descendant : *f)
854 }
855 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
856 }
857
858 auto addField = [&](ROOT::RFieldBase &f) {
859 auto fieldId = descriptor.GetNFields();
860 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
861 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
862 f.SetOnDiskId(fieldId);
863 ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
864 };
866 auto fieldId = descriptor.GetNFields();
867 auto sourceFieldId =
869 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
870 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
871 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
872 f.SetOnDiskId(fieldId);
873 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
874 auto targetId = descriptor.GetNLogicalColumns();
876 columnBuilder.LogicalColumnId(targetId)
877 .PhysicalColumnId(source.GetLogicalId())
878 .FieldId(fieldId)
879 .BitsOnStorage(source.GetBitsOnStorage())
880 .ValueRange(source.GetValueRange())
881 .Type(source.GetType())
882 .Index(source.GetIndex())
883 .RepresentationIndex(source.GetRepresentationIndex());
884 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
885 }
886 };
887
888 R__ASSERT(firstEntry >= fPrevClusterNEntries);
889 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
890 for (auto f : changeset.fAddedFields) {
891 addField(*f);
892 for (auto &descendant : *f)
894 }
895 for (auto f : changeset.fAddedProjectedFields) {
897 for (auto &descendant : *f)
899 }
900
901 const auto nColumns = descriptor.GetNPhysicalColumns();
902 fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
903 fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
906 columnRange.SetPhysicalColumnId(i);
907 // We set the first element index in the current cluster to the first element that is part of a materialized page
908 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
909 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
910 columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
911 columnRange.SetNElements(0);
912 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
913 fOpenColumnRanges.emplace_back(columnRange);
915 pageRange.SetPhysicalColumnId(i);
916 fOpenPageRanges.emplace_back(std::move(pageRange));
917 }
918
919 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
920 // header was already serialized, this has to be done manually as it is required for page list serialization.
921 if (fSerializationContext.GetHeaderSize() > 0)
922 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
923}
924
926{
928 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
929
930 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
931}
932
934{
935 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
936 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
937
939 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
940 fieldZero.SetOnDiskId(0);
942 projectedFields.GetFieldZero().SetOnDiskId(0);
943
945 initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
946 for (auto f : fieldZero.GetMutableSubfields())
947 initialChangeset.fAddedFields.emplace_back(f);
948 initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
949 for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
950 initialChangeset.fAddedProjectedFields.emplace_back(f);
951 UpdateSchema(initialChangeset, 0U);
952
953 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
954 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
955 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
956 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
957
958 fDescriptorBuilder.BeginHeaderExtension();
959}
960
// Initializes the sink from an existing on-disk descriptor (per the member index:
// RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescriptor &, bool copyClusters))
// and returns a model reconstructed from that descriptor.
// NOTE(review): this is a Doxygen extraction; original lines 962 (signature) and
// 975/981/989/1009/1013 (local declarations) are missing — confirm against upstream RPageStorage.cxx.
961std::unique_ptr<ROOT::RNTupleModel>
963{
964 // Create new descriptor
965 fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
966 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
967
968 // Create column/page ranges
969 const auto nColumns = descriptor.GetNPhysicalColumns();
 // Must only be called on a fresh sink: no column/page ranges may be open yet.
970 R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
971 fOpenColumnRanges.reserve(nColumns);
972 fOpenPageRanges.reserve(nColumns);
973 for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
974 const auto &column = descriptor.GetColumnDescriptor(i);
 // NOTE(review): original line 975 (presumably the declaration of `columnRange`) lost in extraction.
976 columnRange.SetPhysicalColumnId(i);
977 columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
978 columnRange.SetNElements(0);
 // New data written through this sink uses the sink's own compression settings,
 // regardless of how the source was compressed.
979 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
980 fOpenColumnRanges.emplace_back(columnRange);
 // NOTE(review): original line 981 (presumably the declaration of `pageRange`) lost in extraction.
982 pageRange.SetPhysicalColumnId(i);
983 fOpenPageRanges.emplace_back(std::move(pageRange));
984 }
985
986 if (copyClusters) {
987 // Clone and add all cluster descriptors
988 auto clusterId = srcDescriptor.FindClusterId(0, 0);
 // NOTE(review): original line 989 (presumably the loop header iterating over cluster ids) lost in extraction.
990 auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
991 auto nEntries = cluster.GetNEntries();
992 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
993 R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
994 if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
995 break;
996 const auto &columnRange = cluster.GetColumnRange(i);
997 R__ASSERT(columnRange.GetPhysicalColumnId() == i);
998 // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
 // Advance the open range so new elements append after the copied cluster's data.
999 fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements())
1000 }
1001 fDescriptorBuilder.AddCluster(cluster.Clone());
1002 fPrevClusterNEntries += nEntries;
1003
1004 clusterId = srcDescriptor.FindNextClusterId(clusterId);
1005 }
1006 }
1007
1008 // Create model
 // NOTE(review): original line 1009 (presumably the declaration of `modelOpts`) lost in extraction.
1010 modelOpts.SetReconstructProjections(true);
1011 auto model = descriptor.CreateModel(modelOpts);
1012 if (!copyClusters) {
 // NOTE(review): original line 1013 (presumably obtaining `projectedFields` from the model) lost in extraction.
1014 projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
1015 }
1016
1017 // Serialize header and init from it
 // Two-pass serialization: first call with nullptr computes the size, second call fills the buffer.
1018 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
1019 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
1020 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
1021 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
1022
1023 fDescriptorBuilder.BeginHeaderExtension();
1024
1025 // mark this sink as initialized
1026 fIsInitialized = true;
1027
1028 return model;
1029}
1030
// Marks the given column's open column range as suppressed for the current cluster.
// The flag is consumed (and reset) when the cluster is staged in StageCluster().
// NOTE(review): original line 1031 with the signature was lost in extraction; per the
// member index it is CommitSuppressedColumn(ColumnHandle_t columnHandle).
1032{
1033 fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
1034}
1035
// Writes a page for the given column and records it in the open column/page ranges.
// NOTE(review): original line 1036 (signature: CommitPage(ColumnHandle_t, const RPage &)
// per the member index) and line 1040 (presumably the `pageInfo` declaration) were lost in extraction.
1037{
1038 fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());
1039
1041 pageInfo.SetNElements(page.GetNElements());
 // CommitPageImpl() performs the actual write and returns the page's on-disk locator.
1042 pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
 // For in-memory pages, the checksum flag comes from the sink's write options.
1043 pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
1044 fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
1045}
1046
// Writes an already packed & compressed (sealed) page and records it in the open ranges.
// NOTE(review): original lines 1047-1048 (signature: CommitSealedPage(ROOT::DescriptorId_t,
// const RSealedPage &) per the member index) and 1052 (presumably the `pageInfo`
// declaration) were lost in extraction.
1049{
1050 fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());
1051
1053 pageInfo.SetNElements(sealedPage.GetNElements());
1054 pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
 // Unlike CommitPage(), the checksum flag is taken from the sealed page itself,
 // since the page was sealed upstream.
1055 pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
1056 fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
1057}
1058
1059std::vector<ROOT::RNTupleLocator>
1060ROOT::Internal::RPagePersistentSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
1061 const std::vector<bool> &mask)
1062{
1063 std::vector<ROOT::RNTupleLocator> locators;
1064 locators.reserve(mask.size());
1065 std::size_t i = 0;
1066 for (auto &range : ranges) {
1067 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1068 if (mask[i++])
1069 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
1070 }
1071 }
1072 locators.shrink_to_fit();
1073 return locators;
1074}
1075
// Vector-commits sealed pages with optional same-page merging: pages whose content is
// byte-identical to an earlier page (matched by checksum, then verified by size + memcmp)
// are not written again; they reuse the first occurrence's locator.
1076void ROOT::Internal::RPagePersistentSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
1077{
1078 /// Used in the `originalPages` map
1079 struct RSealedPageLink {
1080 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1081 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1082 };
1083
 // mask[i] == true means the i-th page (in traversal order) must be physically written.
1084 std::vector<bool> mask;
1085 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1086 std::vector<std::size_t> locatorIndexes;
1087 // Maps page checksums to the first sealed page with that checksum
1088 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1089 std::size_t iLocator = 0;
1090 for (auto &range : ranges) {
1091 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1092 mask.reserve(mask.size() + rangeSize);
1093 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1094
1095 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
 // Fast path: merging disabled (by backend capability or write options) — write every page.
1096 if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
1097 mask.emplace_back(true);
1098 locatorIndexes.emplace_back(iLocator++);
1099 continue;
1100 }
1101 // Same page merging requires page checksums - this is checked in the write options
1102 R__ASSERT(sealedPageIt->GetHasChecksum());
1103
1104 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1105 auto itr = originalPages.find(chk);
 // First page with this checksum: remember it and write it.
1106 if (itr == originalPages.end()) {
1107 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1108 mask.emplace_back(true);
1109 locatorIndexes.emplace_back(iLocator++);
1110 continue;
1111 }
1112
 // Checksum collision guard: only merge if the payload really is byte-identical.
1113 const auto *p = itr->second.fSealedPage;
1114 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1115 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1116 mask.emplace_back(true);
1117 locatorIndexes.emplace_back(iLocator++);
1118 continue;
1119 }
1120
 // Duplicate page: skip the write, point at the original's locator.
1121 mask.emplace_back(false);
1122 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1123 }
1124
1125 mask.shrink_to_fit();
1126 locatorIndexes.shrink_to_fit();
1127 }
1128
1129 auto locators = CommitSealedPageVImpl(ranges, mask);
1130 unsigned i = 0;
1131
 // Second pass: record every page (written or merged) in the open column/page ranges.
1132 for (auto &range : ranges) {
1133 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1134 fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());
1135
 // NOTE(review): original line 1136 (presumably the `pageInfo` declaration) lost in extraction.
1137 pageInfo.SetNElements(sealedPageIt->GetNElements());
1138 pageInfo.SetLocator(locators[locatorIndexes[i++]]);
1139 pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
1140 fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
1141 }
1142 }
1143}
1144
// Stages the current cluster (per the member index: StageCluster(ROOT::NTupleSize_t
// nNewEntries) -> RStagedCluster) and resets the open column/page ranges so the sink can
// start filling the next cluster.
// NOTE(review): original lines 1145-1146 (signature) and 1148 (presumably the
// `stagedCluster` declaration) were lost in extraction.
1147{
1149 stagedCluster.fNBytesWritten = StageClusterImpl();
1150 stagedCluster.fNEntries = nNewEntries;
1151
1152 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1153 RStagedCluster::RColumnInfo columnInfo;
1154 columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
1155 if (fOpenColumnRanges[i].IsSuppressed()) {
 // Suppressed columns must not have accumulated any pages in this cluster.
1156 assert(fOpenPageRanges[i].GetPageInfos().empty());
1157 columnInfo.fPageRange.SetPhysicalColumnId(i);
1158 columnInfo.fIsSuppressed = true;
1159 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1160 fOpenColumnRanges[i].SetNElements(0);
1161 fOpenColumnRanges[i].SetIsSuppressed(false);
1162 } else {
 // Move the accumulated page range into the staged cluster; the swap leaves a
 // fresh, empty page range behind, whose column id is restored on the next line.
1163 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1164 fOpenPageRanges[i].SetPhysicalColumnId(i);
1165
1166 columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
1167 fOpenColumnRanges[i].SetNElements(0);
1168 }
1169 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1170 }
1171
1172 return stagedCluster;
1173}
1174
// Logically appends staged clusters to the ntuple descriptor (per the member index:
// CommitStagedClusters(std::span<RStagedCluster>)).
// NOTE(review): original lines 1175 (signature) and 1178 (presumably the
// `clusterBuilder` declaration, an RClusterDescriptorBuilder) were lost in extraction.
1176{
1177 for (const auto &cluster : clusters) {
1179 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1180 .FirstEntryIndex(fPrevClusterNEntries)
1181 .NEntries(cluster.fNEntries);
1182 for (const auto &columnInfo : cluster.fColumnInfos) {
1183 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1184 if (columnInfo.fIsSuppressed) {
1185 assert(columnInfo.fPageRange.GetPageInfos().empty());
1186 clusterBuilder.MarkSuppressedColumnRange(colId);
1187 } else {
1188 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
1189 columnInfo.fCompressionSettings, columnInfo.fPageRange);
 // Advance the open range so the next cluster starts after this one's elements.
1190 fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
1191 }
1192 }
1193
1194 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1195 for (const auto &columnInfo : cluster.fColumnInfos) {
1196 if (!columnInfo.fIsSuppressed)
1197 continue;
1198 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1199 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1200 // cluster. This information has been determined for the committed cluster descriptor through
1201 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1202 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1203 fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
1204 columnRangeFromDesc.GetNElements());
1205 }
1206
1207 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1208 fPrevClusterNEntries += cluster.fNEntries;
1209 }
1210}
1211
// Serializes the page list for all clusters committed since the last call and records a
// new cluster group in the descriptor (per the member index: CommitClusterGroup()).
// NOTE(review): original lines 1212 (signature), 1225-1226 (presumably the page-list
// buffer allocation and the filling SerializePageList call) and 1230 (presumably the
// `cgBuilder` declaration, an RClusterGroupDescriptorBuilder) were lost in extraction.
1213{
1214 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1215
1216 const auto nClusters = descriptor.GetNActiveClusters();
1217 std::vector<ROOT::DescriptorId_t> physClusterIDs;
1218 physClusterIDs.reserve(nClusters);
 // Only clusters not yet part of a previous group belong to this one.
1219 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1220 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1221 }
1222
 // First pass with nullptr computes the serialized page-list size.
1223 auto szPageList =
1224 RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
1227
1228 const auto clusterGroupId = descriptor.GetNClusterGroups();
1229 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1231 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1232 if (fNextClusterInGroup == nClusters) {
 // Empty group: no new clusters since the last call.
1233 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1234 } else {
1235 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1236 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1237 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1238 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1239 firstClusterDesc.GetFirstEntryIndex())
1240 .NClusters(nClusters - fNextClusterInGroup);
1241 }
1242 std::vector<ROOT::DescriptorId_t> clusterIds;
1243 clusterIds.reserve(nClusters);
1244 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1245 clusterIds.emplace_back(i);
1246 }
1247 cgBuilder.AddSortedClusters(clusterIds);
1248 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1249 fSerializationContext.MapClusterGroupId(clusterGroupId);
1250
 // Next group starts after the clusters consumed here.
1251 fNextClusterInGroup = nClusters;
1252}
1253
// Finalizes the dataset: folds collected streamer infos into the descriptor's extra type
// info (de-duplicating against records already present), then serializes and writes the
// footer via CommitDatasetImpl().
// NOTE(review): original lines 1254 (signature), 1270-1271 (presumably the
// `extraInfoBuilder` declaration/setup) and 1279 (presumably the `bufFooter` allocation)
// were lost in extraction.
1255{
1256 if (!fStreamerInfos.empty()) {
1257 // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
1258 // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
1259 for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
1260 if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
1261 // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
1262 // should be zero.
1263 R__ASSERT(etDesc.GetTypeName().empty());
1264 R__ASSERT(etDesc.GetTypeVersion() == 0);
1265 auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
 // merge() moves only records not already present in fStreamerInfos.
1266 fStreamerInfos.merge(etInfo);
1267 }
1268 }
1269
1272 .Content(RNTupleSerializer::SerializeStreamerInfos(fStreamerInfos));
1273 fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1274 }
1275
1276 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1277
 // Two-pass footer serialization: size first, then fill the buffer.
1278 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
1280 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1281
1282 CommitDatasetImpl(bufFooter.get(), szFooter);
1283}
1284
// Registers the default sink-side performance counters under the given metrics prefix
// (per the member index: EnableDefaultMetrics(const std::string &prefix)).
// NOTE(review): original line 1285 with the signature was lost in extraction.
1286{
1287 fMetrics = RNTupleMetrics(prefix);
 // The counters live in fMetrics; fCounters keeps typed references to them in
 // declaration order: pages committed, payload bytes, pre-compression bytes,
 // wall-clock write/zip times, CPU write/zip tick counters.
1288 fCounters = std::make_unique<RCounters>(RCounters{
1289 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageCommitted", "", "number of pages committed to storage"),
1290 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szWritePayload", "B", "volume written for committed pages"),
1291 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1292 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1293 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1294 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuWrite", "ns", "CPU time spent writing"),
1295 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1296 "CPU time spent compressing")});
1297}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:304
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define f(i)
Definition RSha256.hxx:104
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char mode
char name[80]
Definition TGX11.cxx:110
#define _(A, B)
Definition cfortran.h:108
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
A helper class for piece-wise construction of an RClusterDescriptor.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:150
A helper class for piece-wise construction of an RColumnDescriptor.
A column element encapsulates the translation between basic C++ types and their column representation...
virtual RIdentifier GetIdentifier() const =0
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:346
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:352
ROOT::Internal::RColumnElementBase * GetElement() const
Definition RColumn.hxx:339
ROOT::ENTupleColumnType GetType() const
Definition RColumn.hxx:340
ROOT::NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:354
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:361
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:341
std::uint32_t GetIndex() const
Definition RColumn.hxx:351
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
A helper class for piece-wise construction of an RFieldDescriptor.
static RFieldDescriptorBuilder FromField(const ROOT::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live RNTuple field.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static unsigned int GetClusterBunchSize(const RNTupleReadOptions &options)
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static RResult< std::uint32_t > SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< ROOT::DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< std::uint32_t > SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< RContext > SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:99
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:41
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
std::unique_ptr< RNTupleModel > InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters)
Initialize sink based on an existing descriptor and fill into the descriptor builder,...
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final
Register a new column.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page source from the location.
void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
Write a page to the storage. The column must have been added before.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Abstract interface to write data into an ntuple.
virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const
void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void RegisterStreamerInfos()
Builds the streamer info records from the descriptor's extra type info section.
void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode=ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading)
Open the physical storage container and deserialize header and footer.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) override
Register a new column.
void UnzipCluster(ROOT::Internal::RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
void PrepareLoadCluster(const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap, std::function< void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
ROOT::NTupleSize_t GetNEntries()
void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
Does nothing if fLastUsedCluster == clusterId.
ROOT::NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster)
RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions)
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple time, e.g.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
Stores information about the cluster in which this page resides.
Definition RPage.hxx:53
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
const ROOT::RFieldBase * GetSourceField(const ROOT::RFieldBase *target) const
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
The window of element indexes of a particular column in a particular cluster.
Records the partition of data into pages for a particular column in a particular cluster.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Field specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
The RNTupleModel encapsulates the schema of an RNTuple.
const std::string & GetDescription() const
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
const_iterator begin() const
const_iterator end() const
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:290
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:198
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, ROOT::ENTupleColumnType onDiskType)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
Namespace for new ROOT classes and functions.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:152
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Default I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
Default I/O performance counters that get registered in fMetrics
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
bool operator>(const RColumnInfo &other) const
Information about a single page in the context of a cluster's page range.