RPageStorage.cxx
1/// \file RPageStorage.cxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5
6/*************************************************************************
7 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
8 * All rights reserved. *
9 * *
10 * For the licensing terms see $ROOTSYS/LICENSE. *
11 * For the list of contributors see $ROOTSYS/README/CREDITS. *
12 *************************************************************************/
13
14#include <ROOT/RPageStorage.hxx>
15#include <ROOT/RCluster.hxx>
16#include <ROOT/RColumn.hxx>
17#include <ROOT/RFieldBase.hxx>
18#include <ROOT/RNTupleDescriptor.hxx>
19#include <ROOT/RNTupleMetrics.hxx>
20#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleSerialize.hxx>
22#include <ROOT/RNTupleUtils.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPageAllocator.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
26#include <ROOT/RPageStorageFile.hxx>
27#ifdef R__ENABLE_DAOS
28#include <ROOT/RPageStorageDaos.hxx>
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <atomic>
36#include <cassert>
37#include <cstring>
38#include <functional>
39#include <memory>
40#include <string_view>
41#include <unordered_map>
42#include <utility>
43
53
57
63
64ROOT::Internal::RPageStorage::RPageStorage(std::string_view name)
65 : fMetrics(""), fPageAllocator(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()), fNTupleName(name)
66{
67}
68
69ROOT::Internal::RPageStorage::~RPageStorage() {}
70
71void ROOT::Internal::RPageStorage::RSealedPage::ChecksumIfEnabled()
72{
73 if (!fHasChecksum)
74 return;
75
76 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
77 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
78 std::uint64_t xxhash3;
79 RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
80}
81
82ROOT::RResult<void> ROOT::Internal::RPageStorage::RSealedPage::VerifyChecksumIfEnabled() const
83{
84 if (!fHasChecksum)
85 return RResult<void>::Success();
86
87 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
88 if (!success)
89 return R__FAIL("page checksum verification failed, data corruption detected");
90 return RResult<void>::Success();
91}
92
93ROOT::RResult<std::uint64_t> ROOT::Internal::RPageStorage::RSealedPage::GetChecksum() const
94{
95 if (!fHasChecksum)
96 return R__FAIL("invalid attempt to extract non-existing page checksum");
97
98 assert(fBufferSize >= kNBytesPageChecksum);
99 std::uint64_t checksum;
100 RNTupleSerializer::DeserializeUInt64(
101 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
102 return checksum;
103}
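// NOTE (illustrative sketch, not part of the original source): the three RSealedPage
// methods above operate on the buffer layout <data><8-byte little-endian xxhash3>.
// Assuming a buffer `buf` with kNBytesPageChecksum bytes reserved behind `dataSize`
// bytes of data, a checksum round trip with the same serializer calls looks like:
//
//   std::uint64_t xxhash3;
//   RNTupleSerializer::SerializeXxHash3(buf, dataSize, xxhash3, buf + dataSize);
//   RNTupleSerializer::VerifyXxHash3(buf, dataSize, xxhash3).ThrowOnError();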
104
105//------------------------------------------------------------------------------
106
107void ROOT::Internal::RPageSource::RActivePhysicalColumns::Insert(ROOT::DescriptorId_t physicalColumnId,
108 RColumnElementBase::RIdentifier elementId)
109{
110 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
111 for (auto &columnInfo : itr->second) {
112 if (columnInfo.fElementId == elementId) {
113 columnInfo.fRefCounter++;
114 return;
115 }
116 }
117 itr->second.emplace_back(RColumnInfo{elementId, 1});
118}
119
120void ROOT::Internal::RPageSource::RActivePhysicalColumns::Erase(ROOT::DescriptorId_t physicalColumnId,
121 RColumnElementBase::RIdentifier elementId)
122{
123 auto itr = fColumnInfos.find(physicalColumnId);
124 R__ASSERT(itr != fColumnInfos.end());
125 for (std::size_t i = 0; i < itr->second.size(); ++i) {
126 if (itr->second[i].fElementId != elementId)
127 continue;
128
129 itr->second[i].fRefCounter--;
130 if (itr->second[i].fRefCounter == 0) {
131 itr->second.erase(itr->second.begin() + i);
132 if (itr->second.empty()) {
133 fColumnInfos.erase(itr);
134 }
135 }
136 break;
137 }
138}
139
140ROOT::Internal::RCluster::ColumnSet_t ROOT::Internal::RPageSource::RActivePhysicalColumns::ToColumnSet() const
141{
142 RCluster::ColumnSet_t result;
143 for (const auto &[physicalColumnId, _] : fColumnInfos)
144 result.insert(physicalColumnId);
145 return result;
146}
147
148bool ROOT::Internal::RPageSource::REntryRange::IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const
149{
150 if (fFirstEntry == ROOT::kInvalidNTupleIndex) {
151 /// Entry range unset, we assume that the entry range covers the complete source
152 return true;
153 }
154
155 if (clusterDesc.GetNEntries() == 0)
156 return true;
157 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
158 return false;
159 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
160 return false;
161 return true;
162}
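// NOTE (not part of the original source): worked example of the overlap test above. With
// fFirstEntry = 100 and fNEntries = 50 the range covers entries [100, 150): a cluster
// starting at entry 80 with 20 entries ends at 100 and is rejected (80 + 20 <= 100),
// while a cluster starting at entry 140 with 20 entries intersects (140 < 150).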
163
164ROOT::Internal::RPageSource::RPageSource(std::string_view name, const ROOT::RNTupleReadOptions &options)
165 : RPageStorage(name),
166 fOptions(options),
167 fClusterPool(*this, ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(fOptions)),
168 fPagePool(*this)
169{
170}
171
172ROOT::Internal::RPageSource::~RPageSource() {}
173
174std::unique_ptr<ROOT::Internal::RPageSource>
175ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
176 const ROOT::RNTupleReadOptions &options)
177{
178 if (ntupleName.empty()) {
179 throw RException(R__FAIL("empty RNTuple name"));
180 }
181 if (location.empty()) {
182 throw RException(R__FAIL("empty storage location"));
183 }
184 if (location.find("daos://") == 0)
185#ifdef R__ENABLE_DAOS
186 return std::make_unique<ROOT::Experimental::Internal::RPageSourceDaos>(ntupleName, location, options);
187#else
188 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
189#endif
190
191 return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
192}
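// NOTE (illustrative sketch, not part of the original source): typical use of the
// factory above; the RNTuple name and file name are illustrative:
//
//   auto source = ROOT::Internal::RPageSource::Create("ntpl", "data.root");
//   source->Attach(); // deserializes header and footer
//   auto nEntries = source->GetNEntries();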
193
194ROOT::Internal::RPageStorage::ColumnHandle_t
195ROOT::Internal::RPageSource::AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column)
196{
197 R__ASSERT(fieldId != ROOT::kInvalidDescriptorId);
198 auto physicalId =
199 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
200 R__ASSERT(physicalId != ROOT::kInvalidDescriptorId);
201 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
202 return ColumnHandle_t{physicalId, &column};
203}
204
205void ROOT::Internal::RPageSource::DropColumn(ColumnHandle_t columnHandle)
206{
207 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
208}
209
210void ROOT::Internal::RPageSource::SetEntryRange(const REntryRange &range)
211{
212 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
213 throw RException(R__FAIL("invalid entry range"));
214 }
215 fEntryRange = range;
216}
217
218void ROOT::Internal::RPageSource::LoadStructure()
219{
220 if (!fHasStructure)
221 LoadStructureImpl();
222 fHasStructure = true;
223}
224
225void ROOT::Internal::RPageSource::Attach(RNTupleSerializer::EDescriptorDeserializeMode mode)
226{
227 LoadStructure();
228 if (!fIsAttached)
229 GetExclDescriptorGuard().MoveIn(AttachImpl(mode));
230 fIsAttached = true;
231}
232
233std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSource::Clone() const
234{
235 auto clone = CloneImpl();
236 if (fIsAttached) {
237 clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
238 clone->fHasStructure = true;
239 clone->fIsAttached = true;
240 }
241 return clone;
242}
243
244ROOT::NTupleSize_t ROOT::Internal::RPageSource::GetNEntries()
245{
246 return GetSharedDescriptorGuard()->GetNEntries();
247}
248
249ROOT::NTupleSize_t ROOT::Internal::RPageSource::GetNElements(ColumnHandle_t columnHandle)
250{
251 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
252}
253
254void ROOT::Internal::RPageSource::UnzipCluster(ROOT::Internal::RCluster *cluster)
255{
256 if (fTaskScheduler)
257 UnzipClusterImpl(cluster);
258}
259
260void ROOT::Internal::RPageSource::UnzipClusterImpl(ROOT::Internal::RCluster *cluster)
261{
262 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
263
264 const auto clusterId = cluster->GetId();
265 auto descriptorGuard = GetSharedDescriptorGuard();
266 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
267
268 fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;
269
270 std::atomic<bool> foundChecksumFailure{false};
271
272 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
273 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
274 for (const auto columnId : columnsInCluster) {
275 // By the time we unzip a cluster, the set of active columns may have already changed with respect to the
276 // moment when we requested reading the cluster. That doesn't matter much; we simply decompress what is now
277 // in the list of active columns.
278 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
279 continue;
280 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
281
282 allElements.reserve(allElements.size() + columnInfos.size());
283 for (const auto &info : columnInfos) {
284 allElements.emplace_back(GenerateColumnElement(info.fElementId));
285
286 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
287 std::uint64_t pageNo = 0;
288 std::uint64_t firstInPage = 0;
289 for (const auto &pi : pageRange.GetPageInfos()) {
290 auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
291 RSealedPage sealedPage;
292 sealedPage.SetNElements(pi.GetNElements());
293 sealedPage.SetHasChecksum(pi.HasChecksum());
294 sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
295 sealedPage.SetBuffer(onDiskPage->GetAddress());
296 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
297
298 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
299 &foundChecksumFailure,
300 indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
301 const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
302 auto rv = UnsealPage(sealedPage, *element);
303 if (!rv) {
304 foundChecksumFailure = true;
305 return;
306 }
307 auto newPage = rv.Unwrap();
308 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
309
310 newPage.SetWindow(indexOffset + firstInPage,
311 RPage::RClusterInfo(clusterId, indexOffset));
312 fPagePool.PreloadPage(std::move(newPage), keyPagePool);
313 };
314
315 fTaskScheduler->AddTask(taskFunc);
316
317 firstInPage += pi.GetNElements();
318 pageNo++;
319 } // for all pages in column
320
321 fCounters->fNPageUnsealed.Add(pageNo);
322 } // for all in-memory types of the column
323 } // for all columns in cluster
324
325 fTaskScheduler->Wait();
326
327 if (foundChecksumFailure) {
328 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
329 }
330}
331
332void ROOT::Internal::RPageSource::PrepareLoadCluster(
333 const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap,
334 std::function<void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)>
335 perPageFunc)
336{
337 auto descriptorGuard = GetSharedDescriptorGuard();
338 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
339
340 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
341 if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
342 continue;
343
344 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
345 ROOT::NTupleSize_t pageNo = 0;
346 for (const auto &pageInfo : pageRange.GetPageInfos()) {
347 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
348 pageZeroMap.Register(ROnDiskPage::Key{physicalColumnId, pageNo},
349 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()),
350 pageInfo.GetLocator().GetNBytesOnStorage()));
351 } else {
352 perPageFunc(physicalColumnId, pageNo, pageInfo);
353 }
354 ++pageNo;
355 }
356 }
357}
358
359void ROOT::Internal::RPageSource::UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
360{
361 if (fLastUsedCluster == clusterId)
362 return;
363
364 const auto firstEntryIndex =
365 GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
366 auto itr = fPreloadedClusters.begin();
367 while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
368 if (fPinnedClusters.count(itr->second) > 0) {
369 ++itr;
370 } else {
371 fPagePool.Evict(itr->second);
372 itr = fPreloadedClusters.erase(itr);
373 }
374 }
375 std::size_t poolWindow = 0;
376 while ((itr != fPreloadedClusters.end()) &&
377 (poolWindow < 2 * ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(fOptions))) {
378 ++itr;
379 ++poolWindow;
380 }
381 while (itr != fPreloadedClusters.end()) {
382 if (fPinnedClusters.count(itr->second) > 0) {
383 ++itr;
384 } else {
385 fPagePool.Evict(itr->second);
386 itr = fPreloadedClusters.erase(itr);
387 }
388 }
389
390 fLastUsedCluster = clusterId;
391}
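// NOTE (not part of the original source): the three loops above implement a sliding
// window over fPreloadedClusters: clusters that start before the newly used cluster are
// evicted from the page pool (unless pinned), a bounded window of upcoming clusters is
// kept preloaded, and everything beyond that window is evicted as well.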
392
393ROOT::Internal::RPageRef
394ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
395{
396 const auto columnId = columnHandle.fPhysicalId;
397 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
398 auto cachedPageRef =
399 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
400 if (!cachedPageRef.Get().IsNull()) {
401 UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
402 return cachedPageRef;
403 }
404
405 std::uint64_t idxInCluster;
406 RClusterInfo clusterInfo;
407 {
408 auto descriptorGuard = GetSharedDescriptorGuard();
409 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
410
411 if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
412 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
413
414 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
415 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
416 if (columnRange.IsSuppressed())
417 return RPageRef();
418
419 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
420 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
421 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
422 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
423 }
424
425 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
426 throw RException(R__FAIL("tried to read a page with an unknown locator"));
427
428 UpdateLastUsedCluster(clusterInfo.fClusterId);
429 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
430}
431
432ROOT::Internal::RPageRef
433ROOT::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::RNTupleLocalIndex localIndex)
434{
435 const auto clusterId = localIndex.GetClusterId();
436 const auto idxInCluster = localIndex.GetIndexInCluster();
437 const auto columnId = columnHandle.fPhysicalId;
438 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
439 auto cachedPageRef =
440 fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
441 if (!cachedPageRef.Get().IsNull()) {
442 UpdateLastUsedCluster(clusterId);
443 return cachedPageRef;
444 }
445
446 if (clusterId == ROOT::kInvalidDescriptorId)
447 throw RException(R__FAIL("entry out of bounds"));
448
449 RClusterInfo clusterInfo;
450 {
451 auto descriptorGuard = GetSharedDescriptorGuard();
452 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
453 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
454 if (columnRange.IsSuppressed())
455 return RPageRef();
456
457 clusterInfo.fClusterId = clusterId;
458 clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
459 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
460 }
461
462 if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
463 throw RException(R__FAIL("tried to read a page with an unknown locator"));
464
465 UpdateLastUsedCluster(clusterInfo.fClusterId);
466 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
467}
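// NOTE (illustrative sketch, not part of the original source): both LoadPage() overloads
// first probe the page pool and fall back to LoadPageImpl() on a miss. Assuming `source`
// is attached and `handle` was returned by AddColumn():
//
//   auto byGlobal = source->LoadPage(handle, 0); // by global element index
//   auto byLocal = source->LoadPage(handle, ROOT::RNTupleLocalIndex(0, 0)); // by (cluster, index)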
468
469void ROOT::Internal::RPageSource::EnableDefaultMetrics(const std::string &prefix)
470{
471 fMetrics = RNTupleMetrics(prefix);
472 fMetrics.ObserveMetrics(fClusterPool.GetMetrics());
473 fMetrics.ObserveMetrics(fPagePool.GetMetrics());
474 fCounters = std::make_unique<RCounters>(RCounters{
475 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
476 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
477 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadPayload", "B", "volume read from storage (required)"),
478 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szReadOverhead", "B", "volume read from storage (overhead)"),
479 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
480 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nClusterLoaded", "",
481 "number of partial clusters preloaded from storage"),
482 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
483 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageUnsealed", "", "number of pages unzipped and decoded"),
484 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
485 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
486 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuRead", "ns", "CPU time spent reading"),
487 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
488 "CPU time spent decompressing"),
489 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
490 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
491 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
492 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
493 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
494 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
495 if (auto walltime = timeWallRead->GetValueAsInt()) {
496 double payload = szReadPayload->GetValueAsInt();
497 double overhead = szReadOverhead->GetValueAsInt();
498 // unit: bytes / nanosecond = GB/s; multiplying by 1000 yields MB/s
499 return {true, (1000. * (payload + overhead) / walltime)};
500 }
501 }
502 }
503 }
504 return {false, -1.};
505 }),
506 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
507 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
508 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
509 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
510 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
511 if (auto walltime = timeWallRead->GetValueAsInt()) {
512 double unzip = szUnzip->GetValueAsInt();
513 // unit: bytes / nanosecond = GB/s; multiplying by 1000 yields MB/s
514 return {true, 1000. * unzip / walltime};
515 }
516 }
517 }
518 return {false, -1.};
519 }),
520 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
521 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
522 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
523 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
524 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
525 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
526 double unzip = szUnzip->GetValueAsInt();
527 // unit: bytes / nanosecond = GB/s; multiplying by 1000 yields MB/s
528 return {true, 1000. * unzip / walltime};
529 }
530 }
531 }
532 return {false, -1.};
533 }),
534 *fMetrics.MakeCounter<RNTupleCalcPerf *>(
535 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
536 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
537 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
538 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
539 if (auto payload = szReadPayload->GetValueAsInt()) {
540 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
541 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
542 }
543 }
544 }
545 return {false, -1.};
546 }),
547 *fMetrics.MakeCounter<RNTupleCalcPerf *>("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
548 fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
549 if (const auto szReadPayload =
550 metrics.GetLocalCounter("szReadPayload")) {
551 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
552 if (auto unzip = szUnzip->GetValueAsInt()) {
553 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
554 }
555 }
556 }
557 return {false, -1.};
558 })});
559}
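// NOTE (illustrative sketch, not part of the original source): once the default metrics
// are enabled, the counters registered above can be inspected after reading; the Print()
// call below is an assumption about the RNTupleMetrics interface:
//
//   source->EnableDefaultMetrics("RPageSourceFile");
//   // ... perform reads ...
//   source->GetMetrics().Print(std::cout);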
560
561ROOT::RResult<ROOT::Internal::RPage>
562ROOT::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element)
563{
564 return UnsealPage(sealedPage, element, *fPageAllocator);
565}
566
567ROOT::RResult<ROOT::Internal::RPage>
568ROOT::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element,
569 RPageAllocator &pageAlloc)
570{
571 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
572 // large enough to hold `sealedPage.fNElements`
573 if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
574 auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
575 page.GrowUnchecked(sealedPage.GetNElements());
576 memset(page.GetBuffer(), 0, page.GetNBytes());
577 return page;
578 }
579
580 auto rv = sealedPage.VerifyChecksumIfEnabled();
581 if (!rv)
582 return R__FORWARD_ERROR(rv);
583
584 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
585 auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
586 if (sealedPage.GetDataSize() != bytesPacked) {
587 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked,
588 page.GetBuffer());
589 } else {
590 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
591 // may decide not to use UnsealPage but to implement custom mapping / decompression code.
592 // Note that usually pages are compressed.
593 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
594 }
595
596 if (!element.IsMappable()) {
597 auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
598 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
599 page = std::move(tmp);
600 }
601
602 page.GrowUnchecked(sealedPage.GetNElements());
603 return page;
604}
605
606void ROOT::Internal::RPageSource::RegisterStreamerInfos()
607{
608 if (fHasStreamerInfosRegistered)
609 return;
610
611 for (const auto &extraTypeInfo : fDescriptor.GetExtraTypeInfoIterable()) {
612 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
613 continue;
614 // We don't need the result, it's enough that during deserialization, BuildCheck() is called for every
615 // streamer info record.
616 RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap();
617 }
618
619 fHasStreamerInfosRegistered = true;
620}
621
622//------------------------------------------------------------------------------
623
624bool ROOT::Internal::RWritePageMemoryManager::RColumnInfo::operator>(const RColumnInfo &other) const
625{
626 // Make the sort order unique by adding the physical on-disk column id as a secondary key
627 if (fCurrentPageSize == other.fCurrentPageSize)
628 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
629 return fCurrentPageSize > other.fCurrentPageSize;
630}
631
632bool ROOT::Internal::RWritePageMemoryManager::TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
633{
634 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
635 return true;
636
637 auto itr = fColumnsSortedByPageSize.begin();
638 while (itr != fColumnsSortedByPageSize.end()) {
639 if (itr->fCurrentPageSize <= pageSizeLimit)
640 break;
641 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
642 ++itr;
643 continue;
644 }
645
646 // Flushing the current column will invalidate itr
647 auto itrFlush = itr++;
648
649 RColumnInfo next;
650 if (itr != fColumnsSortedByPageSize.end())
651 next = *itr;
652
653 itrFlush->fColumn->Flush();
654 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
655 return true;
656
657 if (next.fColumn == nullptr)
658 return false;
659 itr = fColumnsSortedByPageSize.find(next);
660 };
661
662 return false;
663}
664
665bool ROOT::Internal::RWritePageMemoryManager::TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
666{
667 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
668 auto itr = fColumnsSortedByPageSize.find(key);
669 if (itr == fColumnsSortedByPageSize.end()) {
670 if (!TryEvict(newWritePageSize, 0))
671 return false;
672 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
673 fCurrentAllocatedBytes += newWritePageSize;
674 return true;
675 }
676
677 RColumnInfo elem{*itr};
678 assert(newWritePageSize >= elem.fInitialPageSize);
679
680 if (newWritePageSize == elem.fCurrentPageSize)
681 return true;
682
683 fColumnsSortedByPageSize.erase(itr);
684
685 if (newWritePageSize < elem.fCurrentPageSize) {
686 // Page got smaller
687 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
688 elem.fCurrentPageSize = newWritePageSize;
689 fColumnsSortedByPageSize.insert(elem);
690 return true;
691 }
692
693 // Page got larger, we may need to make space available
694 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
695 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
696 // Don't change anything, let the calling column flush itself
697 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
698 fColumnsSortedByPageSize.insert(elem);
699 return false;
700 }
701 fCurrentAllocatedBytes += diffBytes;
702 elem.fCurrentPageSize = newWritePageSize;
703 fColumnsSortedByPageSize.insert(elem);
704 return true;
705}
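// NOTE (not part of the original source): worked example of the accounting above. With
// fMaxAllocatedBytes = 1 MiB and fCurrentAllocatedBytes = 960 KiB, growing a column's
// write page by 128 KiB calls TryEvict(128 KiB, <current page size>), which flushes
// columns holding larger write pages (skipping those already at their initial size)
// until 128 KiB fit into the budget; if that fails, TryUpdate() returns false and the
// calling column flushes itself.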
706
707//------------------------------------------------------------------------------
708
709ROOT::Internal::RPageSink::RPageSink(std::string_view name, const ROOT::RNTupleWriteOptions &options)
710 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
711{
713}
714
715ROOT::Internal::RPageSink::~RPageSink() {}
716
717ROOT::Internal::RPageStorage::RSealedPage ROOT::Internal::RPageSink::SealPage(const RSealPageConfig &config)
718{
719 assert(config.fPage);
720 assert(config.fElement);
721 assert(config.fBuffer);
722
723 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
724 bool isAdoptedBuffer = true;
725 auto nBytesPacked = config.fPage->GetNBytes();
726 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
727
728 if (!config.fElement->IsMappable()) {
729 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
730 pageBuf = new unsigned char[nBytesPacked];
731 isAdoptedBuffer = false;
732 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
733 }
734 auto nBytesZipped = nBytesPacked;
735
736 if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
737 config.fWriteChecksum) {
738 nBytesZipped =
739 RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSettings, config.fBuffer);
740 if (!isAdoptedBuffer)
741 delete[] pageBuf;
742 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
743 isAdoptedBuffer = true;
744 }
745
747
748 RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
749 sealedPage.ChecksumIfEnabled();
750
751 return sealedPage;
752}
753
754ROOT::Internal::RPageStorage::RSealedPage
755ROOT::Internal::RPageSink::SealPage(const ROOT::Internal::RPage &page, const RColumnElementBase &element)
756{
757 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
758 if (fSealPageBuffer.size() < nBytes)
759 fSealPageBuffer.resize(nBytes);
760
761 RSealPageConfig config;
762 config.fPage = &page;
763 config.fElement = &element;
764 config.fCompressionSettings = GetWriteOptions().GetCompression();
765 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
766 config.fAllowAlias = true;
767 config.fBuffer = fSealPageBuffer.data();
768
769 return SealPage(config);
770}
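// NOTE (illustrative sketch, not part of the original source): a seal/unseal round trip
// built from the helpers above, as it would run inside page sink/source implementations;
// `page`, `element`, and `pageAlloc` are assumed to be set up elsewhere:
//
//   auto sealed = SealPage(page, element); // pack + zip + optional checksum
//   auto unsealed = RPageSource::UnsealPage(sealed, element, pageAlloc).Unwrap();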
771
772void ROOT::Internal::RPageSink::CommitDataset()
773{
774 for (const auto &cb : fOnDatasetCommitCallbacks)
775 cb(*this);
776 CommitDatasetImpl();
777}
778
779ROOT::Internal::RPage ROOT::Internal::RPageSink::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
780{
781 R__ASSERT(nElements > 0);
782 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
783 const auto nBytes = elementSize * nElements;
784 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
785 return ROOT::Internal::RPage();
786 return fPageAllocator->NewPage(elementSize, nElements);
787}
788
789//------------------------------------------------------------------------------
790
791std::unique_ptr<ROOT::Internal::RPageSink>
792ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
793 const ROOT::RNTupleWriteOptions &options)
794{
795 if (ntupleName.empty()) {
796 throw RException(R__FAIL("empty RNTuple name"));
797 }
798 if (location.empty()) {
799 throw RException(R__FAIL("empty storage location"));
800 }
801 if (location.find("daos://") == 0) {
802#ifdef R__ENABLE_DAOS
803 return std::make_unique<ROOT::Experimental::Internal::RPageSinkDaos>(ntupleName, location, options);
804#else
805 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
806#endif
807 }
808
809 // Otherwise assume that the user wants us to create a file.
810 return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
811}
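// NOTE (illustrative sketch, not part of the original source): the write-side
// counterpart to RPageSource::Create(); names are illustrative and the Init() /
// CommitDataset() sequence is assumed to be driven by a higher-level writer:
//
//   auto sink = ROOT::Internal::RPagePersistentSink::Create("ntpl", "data.root");
//   sink->Init(model); // serializes the schema into the header
//   // ... commit pages, stage and commit clusters ...
//   sink->CommitDataset(); // runs callbacks, writes page lists and footer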
812
813ROOT::Internal::RPagePersistentSink::RPagePersistentSink(std::string_view name,
814 const ROOT::RNTupleWriteOptions &options)
815 : RPageSink(name, options)
816{
817}
818
820
821ROOT::Internal::RPageStorage::ColumnHandle_t
822ROOT::Internal::RPagePersistentSink::AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column)
823{
824 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
825 RColumnDescriptorBuilder columnBuilder;
826 columnBuilder.LogicalColumnId(columnId)
827 .PhysicalColumnId(columnId)
828 .FieldId(fieldId)
829 .BitsOnStorage(column.GetBitsOnStorage())
830 .ValueRange(column.GetValueRange())
831 .Type(column.GetType())
832 .Index(column.GetIndex())
833 .RepresentationIndex(column.GetRepresentationIndex())
834 .FirstElementIndex(column.GetFirstElementIndex());
835 // For late model extension, we assume that the primary column representation is the active one for the
836 // deferred range. All other representations are suppressed.
837 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
838 columnBuilder.SetSuppressedDeferred();
839 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
840 return ColumnHandle_t{columnId, &column};
841}
842
843void ROOT::Internal::RPagePersistentSink::UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset,
844 ROOT::NTupleSize_t firstEntry)
845{
846 if (fIsInitialized)
847 for (const auto &field : changeset.fAddedFields)
848 if (field->GetStructure() == ENTupleStructure::kStreamer)
849 throw ROOT::RException(R__FAIL("a Model cannot be extended with Streamer fields"));
850
851 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
852
853 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
854 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
855 // of the changeset follow immediately the already existing physical columns
856 auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
857 const auto &reps = f.GetColumnRepresentatives();
858 if (reps.empty())
859 return 0;
860 return reps.size() * reps[0].size();
861 };
862 std::uint32_t nNewPhysicalColumns = 0;
863 for (auto f : changeset.fAddedFields) {
864 nNewPhysicalColumns += getNColumns(*f);
865 for (const auto &descendant : *f)
866 nNewPhysicalColumns += getNColumns(descendant);
867 }
868 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
869 }
870
871 auto addField = [&](ROOT::RFieldBase &f) {
872 auto fieldId = descriptor.GetNFields();
873 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
874 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
875 f.SetOnDiskId(fieldId);
876 ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
877 };
878 auto addProjectedField = [&](ROOT::RFieldBase &f) {
879 auto fieldId = descriptor.GetNFields();
880 auto sourceFieldId =
881 ROOT::Internal::GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
882 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
883 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
884 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
885 f.SetOnDiskId(fieldId);
886 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
887 auto targetId = descriptor.GetNLogicalColumns();
888 RColumnDescriptorBuilder columnBuilder;
889 columnBuilder.LogicalColumnId(targetId)
890 .PhysicalColumnId(source.GetLogicalId())
891 .FieldId(fieldId)
892 .BitsOnStorage(source.GetBitsOnStorage())
893 .ValueRange(source.GetValueRange())
894 .Type(source.GetType())
895 .Index(source.GetIndex())
896 .RepresentationIndex(source.GetRepresentationIndex());
897 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
898 }
899 };
900
901 R__ASSERT(firstEntry >= fPrevClusterNEntries);
902 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
903 for (auto f : changeset.fAddedFields) {
904 addField(*f);
905 for (auto &descendant : *f)
906 addField(descendant);
907 }
908 for (auto f : changeset.fAddedProjectedFields) {
909 addProjectedField(*f);
910 for (auto &descendant : *f)
911 addProjectedField(descendant);
912 }
913
914 const auto nColumns = descriptor.GetNPhysicalColumns();
915 fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
916 fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
917 for (ROOT::DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
918 ROOT::RClusterDescriptor::RColumnRange columnRange;
919 columnRange.SetPhysicalColumnId(i);
920 // We set the first element index in the current cluster to the first element that is part of a materialized page
921 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
922 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
923 columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
924 columnRange.SetNElements(0);
925 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
926 fOpenColumnRanges.emplace_back(columnRange);
927 ROOT::RClusterDescriptor::RPageRange pageRange;
928 pageRange.SetPhysicalColumnId(i);
929 fOpenPageRanges.emplace_back(std::move(pageRange));
930 }
931
932 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
933 // header was already serialized, this has to be done manually as it is required for page list serialization.
934 if (fSerializationContext.GetHeaderSize() > 0)
935 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
936}
937
938void ROOT::Internal::RPagePersistentSink::UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo)
939{
940 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
941 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
942
943 fInfosOfStreamerFields.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
944}
945
946void ROOT::Internal::RPagePersistentSink::InitImpl(ROOT::RNTupleModel &model)
947{
948 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
949 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
950
951 auto &fieldZero = ROOT::Internal::GetFieldZeroOfModel(model);
952 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
953 fieldZero.SetOnDiskId(0);
954 auto &projectedFields = ROOT::Internal::GetProjectedFieldsOfModel(model);
955 projectedFields.GetFieldZero().SetOnDiskId(0);
956
957 ROOT::Internal::RNTupleModelChangeset initialChangeset{model};
958 initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
959 for (auto f : fieldZero.GetMutableSubfields())
960 initialChangeset.fAddedFields.emplace_back(f);
961 initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
962 for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
963 initialChangeset.fAddedProjectedFields.emplace_back(f);
964 UpdateSchema(initialChangeset, 0U);
965
966 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
967 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
968 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
969 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
970
971 fDescriptorBuilder.BeginHeaderExtension();
972}
973
974std::unique_ptr<ROOT::RNTupleModel>
975ROOT::Internal::RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescriptor &srcDescriptor, bool copyClusters)
976{
977 // Create new descriptor
978 fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
979 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
980
981 // Create column/page ranges
982 const auto nColumns = descriptor.GetNPhysicalColumns();
983 R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
984 fOpenColumnRanges.reserve(nColumns);
985 fOpenPageRanges.reserve(nColumns);
986 for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
987 const auto &column = descriptor.GetColumnDescriptor(i);
988 ROOT::RClusterDescriptor::RColumnRange columnRange;
989 columnRange.SetPhysicalColumnId(i);
990 columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
991 columnRange.SetNElements(0);
992 columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
993 fOpenColumnRanges.emplace_back(columnRange);
994 ROOT::RClusterDescriptor::RPageRange pageRange;
995 pageRange.SetPhysicalColumnId(i);
996 fOpenPageRanges.emplace_back(std::move(pageRange));
997 }
998
999 if (copyClusters) {
1000 // Clone and add all cluster descriptors
1001 auto clusterId = srcDescriptor.FindClusterId(0, 0);
1002 while (clusterId != ROOT::kInvalidDescriptorId) {
1003 auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
1004 auto nEntries = cluster.GetNEntries();
1005 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1006 R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
1007 if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
1008 break;
1009 const auto &columnRange = cluster.GetColumnRange(i);
1010 R__ASSERT(columnRange.GetPhysicalColumnId() == i);
1011 // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
1012 fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements());
1013 }
1014 fDescriptorBuilder.AddCluster(cluster.Clone());
1015 fPrevClusterNEntries += nEntries;
1016
1017 clusterId = srcDescriptor.FindNextClusterId(clusterId);
1018 }
1019 }
1020
1021 // Create model
1022 ROOT::RNTupleDescriptor::RCreateModelOptions modelOpts;
1023 modelOpts.SetReconstructProjections(true);
1024 // We want to emulate unknown types to allow merging RNTuples containing types that we lack dictionaries for.
1025 modelOpts.SetEmulateUnknownTypes(true);
1026 auto model = descriptor.CreateModel(modelOpts);
1027 if (!copyClusters) {
1028 auto &projectedFields = ROOT::Internal::GetProjectedFieldsOfModel(*model);
1029 projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
1030 }
1031
1032 // Serialize header and init from it
1033 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
1034 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
1035 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
1036 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
1037
1038 fDescriptorBuilder.BeginHeaderExtension();
1039
1040 // mark this sink as initialized
1041 fIsInitialized = true;
1042
1043 return model;
1044}
1045
1046void ROOT::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle)
1047{
1048 fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
1049}
1050
1051void ROOT::Internal::RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page)
1052{
1053 fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());
1054
1055 ROOT::RClusterDescriptor::RPageInfo pageInfo;
1056 pageInfo.SetNElements(page.GetNElements());
1057 pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
1058 pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
1059 fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
1060}
1061
1062void ROOT::Internal::RPagePersistentSink::CommitSealedPage(ROOT::DescriptorId_t physicalColumnId,
1063 const RPageStorage::RSealedPage &sealedPage)
1064{
1065 fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());
1066
1067 ROOT::RClusterDescriptor::RPageInfo pageInfo;
1068 pageInfo.SetNElements(sealedPage.GetNElements());
1069 pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
1070 pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
1071 fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
1072}
1073
1074std::vector<ROOT::RNTupleLocator>
1075ROOT::Internal::RPagePersistentSink::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
1076 const std::vector<bool> &mask)
1077{
1078 std::vector<ROOT::RNTupleLocator> locators;
1079 locators.reserve(mask.size());
1080 std::size_t i = 0;
1081 for (auto &range : ranges) {
1082 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1083 if (mask[i++])
1084 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
1085 }
1086 }
1087 locators.shrink_to_fit();
1088 return locators;
1089}
1090
1091void ROOT::Internal::RPagePersistentSink::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
1092{
1093 /// Used in the `originalPages` map
1094 struct RSealedPageLink {
1095 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1096 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1097 };
1098
1099 std::vector<bool> mask;
1100 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1101 std::vector<std::size_t> locatorIndexes;
1102 // Maps page checksums to the first sealed page with that checksum
1103 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1104 std::size_t iLocator = 0;
1105 for (auto &range : ranges) {
1106 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1107 mask.reserve(mask.size() + rangeSize);
1108 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1109
1110 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1111 if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
1112 mask.emplace_back(true);
1113 locatorIndexes.emplace_back(iLocator++);
1114 continue;
1115 }
1116 // Same page merging requires page checksums - this is checked in the write options
1117 R__ASSERT(sealedPageIt->GetHasChecksum());
1118
1119 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1120 auto itr = originalPages.find(chk);
1121 if (itr == originalPages.end()) {
1122 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1123 mask.emplace_back(true);
1124 locatorIndexes.emplace_back(iLocator++);
1125 continue;
1126 }
1127
1128 const auto *p = itr->second.fSealedPage;
1129 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1130 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1131 mask.emplace_back(true);
1132 locatorIndexes.emplace_back(iLocator++);
1133 continue;
1134 }
1135
1136 mask.emplace_back(false);
1137 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1138 }
1139
1140 mask.shrink_to_fit();
1141 locatorIndexes.shrink_to_fit();
1142 }
1143
1144 auto locators = CommitSealedPageVImpl(ranges, mask);
1145 unsigned i = 0;
1146
1147 for (auto &range : ranges) {
1148 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1149 fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());
1150
1151 ROOT::RClusterDescriptor::RPageInfo pageInfo;
1152 pageInfo.SetNElements(sealedPageIt->GetNElements());
1153 pageInfo.SetLocator(locators[locatorIndexes[i++]]);
1154 pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
1155 fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
1156 }
1157 }
1158}
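// NOTE (not part of the original source): same-page merging above in short. A sealed
// page whose xxhash3 checksum and payload bytes match an earlier page in the batch gets
// mask[i] = false and a locatorIndexes entry pointing at the original's locator, so its
// bytes are written to storage only once.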
1159
1160ROOT::Internal::RPageSink::RStagedCluster
1161ROOT::Internal::RPagePersistentSink::StageCluster(ROOT::NTupleSize_t nNewEntries)
1162{
1163 RStagedCluster stagedCluster;
1164 stagedCluster.fNBytesWritten = StageClusterImpl();
1165 stagedCluster.fNEntries = nNewEntries;
1166
1167 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1168 RStagedCluster::RColumnInfo columnInfo;
1169 columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
1170 if (fOpenColumnRanges[i].IsSuppressed()) {
1171 assert(fOpenPageRanges[i].GetPageInfos().empty());
1172 columnInfo.fPageRange.SetPhysicalColumnId(i);
1173 columnInfo.fIsSuppressed = true;
1174 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1175 fOpenColumnRanges[i].SetNElements(0);
1176 fOpenColumnRanges[i].SetIsSuppressed(false);
1177 } else {
1178 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1179 fOpenPageRanges[i].SetPhysicalColumnId(i);
1180
1181 columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
1182 fOpenColumnRanges[i].SetNElements(0);
1183 }
1184 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1185 }
1186
1187 return stagedCluster;
1188}
1189
1190void ROOT::Internal::RPagePersistentSink::CommitStagedClusters(std::span<RStagedCluster> clusters)
1191{
1192 for (const auto &cluster : clusters) {
1193 RClusterDescriptorBuilder clusterBuilder;
1194 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1195 .FirstEntryIndex(fPrevClusterNEntries)
1196 .NEntries(cluster.fNEntries);
1197 for (const auto &columnInfo : cluster.fColumnInfos) {
1198 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1199 if (columnInfo.fIsSuppressed) {
1200 assert(columnInfo.fPageRange.GetPageInfos().empty());
1201 clusterBuilder.MarkSuppressedColumnRange(colId);
1202 } else {
1203 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
1204 columnInfo.fCompressionSettings, columnInfo.fPageRange);
1205 fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
1206 }
1207 }
1208
1209 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1210 for (const auto &columnInfo : cluster.fColumnInfos) {
1211 if (!columnInfo.fIsSuppressed)
1212 continue;
1213 const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
1214 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1215 // cluster. This information has been determined for the committed cluster descriptor through
1216 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1217 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1218 fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
1219 columnRangeFromDesc.GetNElements());
1220 }
1221
1222 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1223 fPrevClusterNEntries += cluster.fNEntries;
1224 }
1225}
1226
1227void ROOT::Internal::RPagePersistentSink::CommitClusterGroup()
1228{
1229 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1230
1231 const auto nClusters = descriptor.GetNActiveClusters();
1232 std::vector<ROOT::DescriptorId_t> physClusterIDs;
1233 physClusterIDs.reserve(nClusters);
1234 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1235 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1236 }
1237
1238 auto szPageList =
1239 RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
1240 auto bufPageList = MakeUninitArray<unsigned char>(szPageList);
1241 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
1242
1243 const auto clusterGroupId = descriptor.GetNClusterGroups();
1244 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1245 RClusterGroupDescriptorBuilder cgBuilder;
1246 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
1247 if (fNextClusterInGroup == nClusters) {
1248 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1249 } else {
1250 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1251 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1252 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1253 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1254 firstClusterDesc.GetFirstEntryIndex())
1255 .NClusters(nClusters - fNextClusterInGroup);
1256 }
1257 std::vector<ROOT::DescriptorId_t> clusterIds;
1258 clusterIds.reserve(nClusters);
1259 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1260 clusterIds.emplace_back(i);
1261 }
1262 cgBuilder.AddSortedClusters(clusterIds);
1263 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1264 fSerializationContext.MapClusterGroupId(clusterGroupId);
1265
1266 fNextClusterInGroup = nClusters;
1267}
1268
1269void ROOT::Internal::RPagePersistentSink::CommitDatasetImpl()
1270{
1271 if (!fInfosOfStreamerFields.empty()) {
1272 // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
1273 // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
1274 for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
1275 if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
1276 // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
1277 // should be zero.
1278 R__ASSERT(etDesc.GetTypeName().empty());
1279 R__ASSERT(etDesc.GetTypeVersion() == 0);
1280 auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
1281 fInfosOfStreamerFields.merge(etInfo);
1282 }
1283 }
1284
1285 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
1286 extraInfoBuilder.ContentId(EExtraTypeInfoIds::kStreamerInfo)
1287 .Content(RNTupleSerializer::SerializeStreamerInfos(fInfosOfStreamerFields));
1288 fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1289 }
1290
1291 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1292
1293 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
1294 auto bufFooter = MakeUninitArray<unsigned char>(szFooter);
1295 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1296
1297 CommitDatasetImpl(bufFooter.get(), szFooter);
1298}
1299
1300void ROOT::Internal::RPagePersistentSink::EnableDefaultMetrics(const std::string &prefix)
1301{
1302 fMetrics = RNTupleMetrics(prefix);
1303 fCounters = std::make_unique<RCounters>(RCounters{
1304 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("nPageCommitted", "", "number of pages committed to storage"),
1305 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szWritePayload", "B", "volume written for committed pages"),
1306 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1307 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1308 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1309 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuWrite", "ns", "CPU time spent writing"),
1310 *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1311 "CPU time spent compressing")});
1312}