/// \file RPageStorage.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2018-10-04
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes.
/// Feedback is welcome!

/*************************************************************************
 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RPageStorage.hxx>
#include <ROOT/RColumn.hxx>
#include <ROOT/RFieldBase.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif

#include <Compression.h>
#include <TError.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstring>
#include <functional>
#include <memory>
#include <string_view>
#include <unordered_map>
#include <utility>


ROOT::Experimental::Internal::RPageStorage::RPageStorage(std::string_view name)
   : fMetrics(""), fPageAllocator(std::make_unique<ROOT::Internal::RPageAllocatorHeap>()), fNTupleName(name)
{
}

ROOT::Experimental::Internal::RPageStorage::~RPageStorage() {}

void ROOT::Experimental::Internal::RPageStorage::RSealedPage::ChecksumIfEnabled()
{
   if (!fHasChecksum)
      return;

   auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
   auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
   std::uint64_t xxhash3;
   RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
}

ROOT::RResult<void> ROOT::Experimental::Internal::RPageStorage::RSealedPage::VerifyChecksumIfEnabled() const
{
   if (!fHasChecksum)
      return ROOT::RResult<void>::Success();

   auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
   if (!success)
      return R__FAIL("page checksum verification failed, data corruption detected");
   return ROOT::RResult<void>::Success();
}

ROOT::RResult<std::uint64_t> ROOT::Experimental::Internal::RPageStorage::RSealedPage::GetChecksum() const
{
   if (!fHasChecksum)
      return R__FAIL("invalid attempt to extract non-existing page checksum");

   assert(fBufferSize >= kNBytesPageChecksum);
   std::uint64_t checksum;
   RNTupleSerializer::DeserializeUInt64(
      reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
   return checksum;
}

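// Illustrative sketch, not part of the original file: the sealed-page layout assumed by the three checksum
// helpers above. The packed (and possibly compressed) payload is followed by an optional 8-byte little-endian
// xxhash3 trailer, so the data size is the buffer size minus the checksum.
#if 0
std::size_t ExpectedSealedBufferSize(std::size_t dataSize, bool hasChecksum)
{
   // kNBytesPageChecksum is the 8-byte trailer written by ChecksumIfEnabled()
   return dataSize + (hasChecksum ? RPageStorage::kNBytesPageChecksum : 0);
}
#endif
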
//------------------------------------------------------------------------------

void ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::Insert(
   ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
{
   auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
   for (auto &columnInfo : itr->second) {
      if (columnInfo.fElementId == elementId) {
         columnInfo.fRefCounter++;
         return;
      }
   }
   itr->second.emplace_back(RColumnInfo{elementId, 1});
}

void ROOT::Experimental::Internal::RPageSource::RActivePhysicalColumns::Erase(
   ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId)
{
   auto itr = fColumnInfos.find(physicalColumnId);
   R__ASSERT(itr != fColumnInfos.end());
   for (std::size_t i = 0; i < itr->second.size(); ++i) {
      if (itr->second[i].fElementId != elementId)
         continue;

      itr->second[i].fRefCounter--;
      if (itr->second[i].fRefCounter == 0) {
         itr->second.erase(itr->second.begin() + i);
         if (itr->second.empty()) {
            fColumnInfos.erase(itr);
         }
      }
      break;
   }
}

bool ROOT::Experimental::Internal::RPageSource::REntryRange::IntersectsWith(
   const ROOT::RClusterDescriptor &clusterDesc) const
{
   if (fFirstEntry == ROOT::kInvalidNTupleIndex) {
      /// Entry range unset, we assume that the entry range covers the complete source
      return true;
   }

   if (clusterDesc.GetNEntries() == 0)
      return true;
   if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
      return false;
   if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
      return false;
   return true;
}

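// Illustrative sketch, not part of the original file: IntersectsWith() above is the standard half-open interval
// overlap test. Two ranges [a, a+n) and [b, b+m) intersect iff a+n > b and b+m > a; the two early "return false"
// branches are exactly the negations of these conditions.
#if 0
bool Overlaps(std::uint64_t a, std::uint64_t n, std::uint64_t b, std::uint64_t m)
{
   return (a + n > b) && (b + m > a);
}
#endif
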
ROOT::Experimental::Internal::RPageSource::RPageSource(std::string_view name, const ROOT::RNTupleReadOptions &options)
   : RPageStorage(name), fOptions(options)
{
}

ROOT::Experimental::Internal::RPageSource::~RPageSource() {}

std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
                                                  const ROOT::RNTupleReadOptions &options)
{
   if (ntupleName.empty()) {
      throw RException(R__FAIL("empty RNTuple name"));
   }
   if (location.empty()) {
      throw RException(R__FAIL("empty storage location"));
   }
   if (location.find("daos://") == 0)
#ifdef R__ENABLE_DAOS
      return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
#else
      throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif

   return std::make_unique<RPageSourceFile>(ntupleName, location, options);
}

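// Illustrative sketch, not part of the original file: the typical lifecycle implied by Create() above, together
// with Attach() and GetNEntries() below. The ntuple and file names are placeholders.
#if 0
auto source = ROOT::Experimental::Internal::RPageSource::Create("ntpl", "data.root");
source->Attach();                      // deserialize header and footer
auto nEntries = source->GetNEntries(); // valid only after Attach()
#endif
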
ROOT::Experimental::Internal::RPageStorage::ColumnHandle_t
ROOT::Experimental::Internal::RPageSource::AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column)
{
   auto physicalId =
      GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
   fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
   return ColumnHandle_t{physicalId, &column};
}

void ROOT::Experimental::Internal::RPageSource::DropColumn(ColumnHandle_t columnHandle)
{
   fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
}

void ROOT::Experimental::Internal::RPageSource::SetEntryRange(const REntryRange &range)
{
   if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
      throw RException(R__FAIL("invalid entry range"));
   }
   fEntryRange = range;
}

void ROOT::Experimental::Internal::RPageSource::LoadStructure()
{
   if (!fHasStructure)
      LoadStructureImpl();
   fHasStructure = true;
}

void ROOT::Experimental::Internal::RPageSource::Attach(RNTupleSerializer::EDescriptorDeserializeMode mode)
{
   LoadStructure();
   if (!fIsAttached)
      GetExclDescriptorGuard().MoveIn(AttachImpl(mode));
   fIsAttached = true;
}

std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSource::Clone() const
{
   auto clone = CloneImpl();
   if (fIsAttached) {
      clone->GetExclDescriptorGuard().MoveIn(GetSharedDescriptorGuard()->Clone());
      clone->fHasStructure = true;
      clone->fIsAttached = true;
   }
   return clone;
}

ROOT::NTupleSize_t ROOT::Experimental::Internal::RPageSource::GetNEntries()
{
   return GetSharedDescriptorGuard()->GetNEntries();
}

ROOT::NTupleSize_t ROOT::Experimental::Internal::RPageSource::GetNElements(ColumnHandle_t columnHandle)
{
   return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
}

void ROOT::Experimental::Internal::RPageSource::UnzipCluster(RCluster *cluster)
{
   if (fTaskScheduler)
      UnzipClusterImpl(cluster);
}

void ROOT::Experimental::Internal::RPageSource::UnzipClusterImpl(RCluster *cluster)
{
   Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);

   const auto clusterId = cluster->GetId();
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);

   fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;

   std::atomic<bool> foundChecksumFailure{false};

   std::vector<std::unique_ptr<RColumnElementBase>> allElements;
   const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
   for (const auto columnId : columnsInCluster) {
      // By the time we unzip a cluster, the set of active columns may have already changed wrt. the moment when
      // we requested reading the cluster. That doesn't matter much; we simply decompress what is now in the list
      // of active columns.
      if (!fActivePhysicalColumns.HasColumnInfos(columnId))
         continue;
      const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);

      allElements.reserve(allElements.size() + columnInfos.size());
      for (const auto &info : columnInfos) {
         allElements.emplace_back(GenerateColumnElement(info.fElementId));

         const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
         std::uint64_t pageNo = 0;
         std::uint64_t firstInPage = 0;
         for (const auto &pi : pageRange.GetPageInfos()) {
            auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
            RSealedPage sealedPage;
            sealedPage.SetNElements(pi.GetNElements());
            sealedPage.SetHasChecksum(pi.HasChecksum());
            sealedPage.SetBufferSize(pi.GetLocator().GetNBytesOnStorage() + pi.HasChecksum() * kNBytesPageChecksum);
            R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
            sealedPage.SetBuffer(onDiskPage->GetAddress());

            auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
                             &foundChecksumFailure,
                             indexOffset = clusterDescriptor.GetColumnRange(columnId).GetFirstElementIndex()]() {
               const ROOT::Internal::RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
               auto rv = UnsealPage(sealedPage, *element);
               if (!rv) {
                  foundChecksumFailure = true;
                  return;
               }
               auto newPage = rv.Unwrap();
               fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());

               newPage.SetWindow(indexOffset + firstInPage,
                                 ROOT::Internal::RPage::RClusterInfo(clusterId, indexOffset));
               fPagePool.PreloadPage(std::move(newPage), keyPagePool);
            };

            fTaskScheduler->AddTask(taskFunc);

            firstInPage += pi.GetNElements();
            pageNo++;
         } // for all pages in column

         fCounters->fNPageUnsealed.Add(pageNo);
      } // for all in-memory types of the column
   } // for all columns in cluster

   fTaskScheduler->Wait();

   if (foundChecksumFailure) {
      throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
   }
}

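// Illustrative sketch, not part of the original file: the scheduler contract inferred from the usage in
// UnzipClusterImpl() above -- AddTask() enqueues a callable, Wait() blocks until all tasks finished. A trivial
// serial implementation would run every task inline:
#if 0
class RSerialTaskScheduler {
public:
   void AddTask(std::function<void()> task) { task(); } // execute immediately
   void Wait() {}                                       // nothing outstanding
};
#endif
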
void ROOT::Experimental::Internal::RPageSource::PrepareLoadCluster(
   const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
   std::function<void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)>
      perPageFunc)
{
   auto descriptorGuard = GetSharedDescriptorGuard();
   const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);

   for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
      if (clusterDesc.GetColumnRange(physicalColumnId).IsSuppressed())
         continue;

      const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
      ROOT::NTupleSize_t pageNo = 0;
      for (const auto &pageInfo : pageRange.GetPageInfos()) {
         if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
            pageZeroMap.Register(ROnDiskPage::Key{physicalColumnId, pageNo},
                                 ROnDiskPage(const_cast<void *>(ROOT::Internal::RPage::GetPageZeroBuffer()),
                                             pageInfo.GetLocator().GetNBytesOnStorage()));
         } else {
            perPageFunc(physicalColumnId, pageNo, pageInfo);
         }
         ++pageNo;
      }
   }
}

void ROOT::Experimental::Internal::RPageSource::UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId)
{
   if (fLastUsedCluster == clusterId)
      return;

   ROOT::NTupleSize_t firstEntryIndex =
      GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
   auto itr = fPreloadedClusters.begin();
   while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
      fPagePool.Evict(itr->second);
      itr = fPreloadedClusters.erase(itr);
   }
   std::size_t poolWindow = 0;
   while ((itr != fPreloadedClusters.end()) &&
          (poolWindow < 2 * ROOT::Internal::RNTupleReadOptionsManip::GetClusterBunchSize(fOptions))) {
      ++itr;
      ++poolWindow;
   }
   while (itr != fPreloadedClusters.end()) {
      fPagePool.Evict(itr->second);
      itr = fPreloadedClusters.erase(itr);
   }

   fLastUsedCluster = clusterId;
}

ROOT::Internal::RPageRef
ROOT::Experimental::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex)
{
   const auto columnId = columnHandle.fPhysicalId;
   const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
   auto cachedPageRef =
      fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
   if (!cachedPageRef.Get().IsNull()) {
      UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
      return cachedPageRef;
   }

   std::uint64_t idxInCluster;
   RClusterInfo clusterInfo;
   {
      auto descriptorGuard = GetSharedDescriptorGuard();
      clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);

      if (clusterInfo.fClusterId == ROOT::kInvalidDescriptorId)
         throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));

      const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
      const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
      if (columnRange.IsSuppressed())
         return ROOT::Internal::RPageRef();

      clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
      R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
      idxInCluster = globalIndex - clusterInfo.fColumnOffset;
      clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
   }

   if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
      throw RException(R__FAIL("tried to read a page with an unknown locator"));

   UpdateLastUsedCluster(clusterInfo.fClusterId);
   return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
}

ROOT::Internal::RPageRef
ROOT::Experimental::Internal::RPageSource::LoadPage(ColumnHandle_t columnHandle, ROOT::RNTupleLocalIndex localIndex)
{
   const auto clusterId = localIndex.GetClusterId();
   const auto idxInCluster = localIndex.GetIndexInCluster();
   const auto columnId = columnHandle.fPhysicalId;
   const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
   auto cachedPageRef =
      fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, columnElementId.fInMemoryType}, localIndex);
   if (!cachedPageRef.Get().IsNull()) {
      UpdateLastUsedCluster(clusterId);
      return cachedPageRef;
   }

   if (clusterId == ROOT::kInvalidDescriptorId)
      throw RException(R__FAIL("entry out of bounds"));

   RClusterInfo clusterInfo;
   {
      auto descriptorGuard = GetSharedDescriptorGuard();
      const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
      const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
      if (columnRange.IsSuppressed())
         return ROOT::Internal::RPageRef();

      clusterInfo.fClusterId = clusterId;
      clusterInfo.fColumnOffset = columnRange.GetFirstElementIndex();
      clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
   }

   if (clusterInfo.fPageInfo.GetLocator().GetType() == RNTupleLocator::kTypeUnknown)
      throw RException(R__FAIL("tried to read a page with an unknown locator"));

   UpdateLastUsedCluster(clusterInfo.fClusterId);
   return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
}

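// Illustrative sketch, not part of the original file: the relation between the two LoadPage() overloads above.
// A global index addresses an element across the whole column; the cluster-local index is the global index minus
// the cluster's first element index ("column offset"), as computed in the global-index overload.
#if 0
std::uint64_t ToIndexInCluster(std::uint64_t globalIndex, std::uint64_t columnOffset)
{
   assert(columnOffset <= globalIndex);
   return globalIndex - columnOffset;
}
#endif
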
void ROOT::Experimental::Internal::RPageSource::EnableDefaultMetrics(const std::string &prefix)
{
   fMetrics = Detail::RNTupleMetrics(prefix);
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
                                                            "volume read from storage (required)"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
                                                            "volume read from storage (overhead)"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
                                                            "number of partial clusters preloaded from storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageUnsealed", "",
                                                            "number of pages unzipped and decoded"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
                                                            "wall clock time spent decompressing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
                                                                                        "CPU time spent reading"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
                                                                                        "CPU time spent decompressing"),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                     if (auto walltime = timeWallRead->GetValueAsInt()) {
                        double payload = szReadPayload->GetValueAsInt();
                        double overhead = szReadOverhead->GetValueAsInt();
                        // unit: bytes / nanosecond = GB/s
                        return {true, (1000. * (payload + overhead) / walltime)};
                     }
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
                  if (auto walltime = timeWallRead->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
               if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
                  if (auto walltime = timeWallUnzip->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
                  if (auto payload = szReadPayload->GetValueAsInt()) {
                     // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
                     return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
                  }
               }
            }
            return {false, -1.};
         }),
      *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
         "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
         [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
               if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
                  if (auto unzip = szUnzip->GetValueAsInt()) {
                     return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
                  }
               }
            }
            return {false, -1.};
         })});
}

ROOT::RResult<ROOT::Internal::RPage>
ROOT::Experimental::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage,
                                                      const ROOT::Internal::RColumnElementBase &element)
{
   return UnsealPage(sealedPage, element, *fPageAllocator);
}

ROOT::RResult<ROOT::Internal::RPage>
ROOT::Experimental::Internal::RPageSource::UnsealPage(const RSealedPage &sealedPage,
                                                      const ROOT::Internal::RColumnElementBase &element,
                                                      ROOT::Internal::RPageAllocator &pageAlloc)
{
   // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
   // large enough to hold `sealedPage.fNElements`
   if (sealedPage.GetBuffer() == ROOT::Internal::RPage::GetPageZeroBuffer()) {
      auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
      page.GrowUnchecked(sealedPage.GetNElements());
      memset(page.GetBuffer(), 0, page.GetNBytes());
      return page;
   }

   auto rv = sealedPage.VerifyChecksumIfEnabled();
   if (!rv)
      return R__FORWARD_ERROR(rv);

   const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
   auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
   if (sealedPage.GetDataSize() != bytesPacked) {
      RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked, page.GetBuffer());
   } else {
      // We cannot simply map the sealed page as we don't know its lifetime. Specialized page sources
      // may decide not to use UnsealPage but to implement custom mapping / decompression code.
      // Note that usually pages are compressed.
      memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
   }

   if (!element.IsMappable()) {
      auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
      element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
      page = std::move(tmp);
   }

   page.GrowUnchecked(sealedPage.GetNElements());
   return page;
}

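// Illustrative sketch, not part of the original file: the unseal pipeline implemented above, which inverts
// RPageSink::SealPage() further below (pack, compress, append checksum).
#if 0
// 1. VerifyChecksumIfEnabled()  -- validate the optional 8-byte trailer
// 2. RNTupleDecompressor::Unzip -- only if data size != packed size
// 3. RColumnElementBase::Unpack -- only if the on-disk format is not mappable
auto page = RPageSource::UnsealPage(sealedPage, element, pageAllocator).Unwrap();
#endif
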
//------------------------------------------------------------------------------

bool ROOT::Experimental::Internal::RWritePageMemoryManager::RColumnInfo::operator>(const RColumnInfo &other) const
{
   // Make the sort order unique by adding the physical on-disk column id as a secondary key
   if (fCurrentPageSize == other.fCurrentPageSize)
      return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
   return fCurrentPageSize > other.fCurrentPageSize;
}

bool ROOT::Experimental::Internal::RWritePageMemoryManager::TryEvict(std::size_t targetAvailableSize,
                                                                     std::size_t pageSizeLimit)
{
   if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
      return true;

   auto itr = fColumnsSortedByPageSize.begin();
   while (itr != fColumnsSortedByPageSize.end()) {
      if (itr->fCurrentPageSize <= pageSizeLimit)
         break;
      if (itr->fCurrentPageSize == itr->fInitialPageSize) {
         ++itr;
         continue;
      }

      // Flushing the current column will invalidate itr
      auto itrFlush = itr++;

      RColumnInfo next;
      if (itr != fColumnsSortedByPageSize.end())
         next = *itr;

      itrFlush->fColumn->Flush();
      if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
         return true;

      if (next.fColumn == nullptr)
         return false;
      itr = fColumnsSortedByPageSize.find(next);
   }

   return false;
}

bool ROOT::Experimental::Internal::RWritePageMemoryManager::TryUpdate(ROOT::Internal::RColumn &column,
                                                                      std::size_t newWritePageSize)
{
   const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
   auto itr = fColumnsSortedByPageSize.find(key);
   if (itr == fColumnsSortedByPageSize.end()) {
      if (!TryEvict(newWritePageSize, 0))
         return false;
      fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
      fCurrentAllocatedBytes += newWritePageSize;
      return true;
   }

   RColumnInfo elem{*itr};
   assert(newWritePageSize >= elem.fInitialPageSize);

   if (newWritePageSize == elem.fCurrentPageSize)
      return true;

   fColumnsSortedByPageSize.erase(itr);

   if (newWritePageSize < elem.fCurrentPageSize) {
      // Page got smaller
      fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
      elem.fCurrentPageSize = newWritePageSize;
      fColumnsSortedByPageSize.insert(elem);
      return true;
   }

   // Page got larger, we may need to make space available
   const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
   if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
      // Don't change anything, let the calling column flush itself
      // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
      fColumnsSortedByPageSize.insert(elem);
      return false;
   }
   fCurrentAllocatedBytes += diffBytes;
   elem.fCurrentPageSize = newWritePageSize;
   fColumnsSortedByPageSize.insert(elem);
   return true;
}

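// Illustrative sketch, not part of the original file: how a column uses the memory manager above when it wants
// to grow its write page. TryUpdate() either books the new size against the budget, possibly flushing other
// columns via TryEvict(), or returns false, in which case the caller is expected to flush itself.
#if 0
if (!memoryManager.TryUpdate(column, newWritePageSize))
   column.Flush(); // stay within the page buffer budget by writing out the current page
#endif
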
//------------------------------------------------------------------------------

ROOT::Experimental::Internal::RPageSink::RPageSink(std::string_view name, const ROOT::RNTupleWriteOptions &options)
   : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
{
}

ROOT::Experimental::Internal::RPageSink::~RPageSink() {}

ROOT::Experimental::Internal::RPageStorage::RSealedPage
ROOT::Experimental::Internal::RPageSink::SealPage(const RSealPageConfig &config)
{
   assert(config.fPage);
   assert(config.fElement);
   assert(config.fBuffer);

   unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
   bool isAdoptedBuffer = true;
   auto nBytesPacked = config.fPage->GetNBytes();
   auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;

   if (!config.fElement->IsMappable()) {
      nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
      pageBuf = new unsigned char[nBytesPacked];
      isAdoptedBuffer = false;
      config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
   }
   auto nBytesZipped = nBytesPacked;

   if ((config.fCompressionSettings != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
       config.fWriteChecksum) {
      nBytesZipped = RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSettings, config.fBuffer);
      if (!isAdoptedBuffer)
         delete[] pageBuf;
      pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
      isAdoptedBuffer = true;
   }

   R__ASSERT(isAdoptedBuffer);

   RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
   sealedPage.ChecksumIfEnabled();

   return sealedPage;
}

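// Illustrative sketch, not part of the original file: the buffer-aliasing decision in SealPage() above. Only an
// uncompressed, mappable element without checksum and with fAllowAlias == true may return the input page buffer;
// every other combination copies (and possibly compresses) into config.fBuffer.
#if 0
const bool mustUseOutputBuffer = (config.fCompressionSettings != 0) || !config.fElement->IsMappable() ||
                                 !config.fAllowAlias || config.fWriteChecksum;
#endif
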
ROOT::Experimental::Internal::RPageStorage::RSealedPage
ROOT::Experimental::Internal::RPageSink::SealPage(const ROOT::Internal::RPage &page,
                                                  const ROOT::Internal::RColumnElementBase &element)
{
   const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
   if (fSealPageBuffer.size() < nBytes)
      fSealPageBuffer.resize(nBytes);

   RSealPageConfig config;
   config.fPage = &page;
   config.fElement = &element;
   config.fCompressionSettings = GetWriteOptions().GetCompression();
   config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
   config.fAllowAlias = true;
   config.fBuffer = fSealPageBuffer.data();

   return SealPage(config);
}

void ROOT::Experimental::Internal::RPageSink::CommitDataset()
{
   for (const auto &cb : fOnDatasetCommitCallbacks)
      cb(*this);
   CommitDatasetImpl();
}

ROOT::Internal::RPage
ROOT::Experimental::Internal::RPageSink::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
{
   R__ASSERT(nElements > 0);
   const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
   const auto nBytes = elementSize * nElements;
   if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
      return ROOT::Internal::RPage();
   return fPageAllocator->NewPage(elementSize, nElements);
}

//------------------------------------------------------------------------------

std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
ROOT::Experimental::Internal::RPageSink::Create(std::string_view ntupleName, std::string_view location,
                                                const ROOT::RNTupleWriteOptions &options)
{
   if (ntupleName.empty()) {
      throw RException(R__FAIL("empty RNTuple name"));
   }
   if (location.empty()) {
      throw RException(R__FAIL("empty storage location"));
   }
   if (location.find("daos://") == 0) {
#ifdef R__ENABLE_DAOS
      return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
#else
      throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif
   }

   // Otherwise assume that the user wants us to create a file.
   return std::make_unique<RPageSinkFile>(ntupleName, location, options);
}

ROOT::Experimental::Internal::RPagePersistentSink::RPagePersistentSink(std::string_view name,
                                                                       const ROOT::RNTupleWriteOptions &options)
   : RPageSink(name, options)
{
}

ROOT::Experimental::Internal::RPagePersistentSink::~RPagePersistentSink() {}

ROOT::Experimental::Internal::RPageStorage::ColumnHandle_t
ROOT::Experimental::Internal::RPagePersistentSink::AddColumn(ROOT::DescriptorId_t fieldId,
                                                             ROOT::Internal::RColumn &column)
{
   auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
   RColumnDescriptorBuilder columnBuilder;
   columnBuilder.LogicalColumnId(columnId)
      .PhysicalColumnId(columnId)
      .FieldId(fieldId)
      .BitsOnStorage(column.GetBitsOnStorage())
      .ValueRange(column.GetValueRange())
      .Type(column.GetType())
      .Index(column.GetIndex())
      .RepresentationIndex(column.GetRepresentationIndex())
      .FirstElementIndex(column.GetFirstElementIndex());
   // For late model extension, we assume that the primary column representation is the active one for the
   // deferred range. All other representations are suppressed.
   if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
      columnBuilder.SetSuppressedDeferred();
   fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
   return ColumnHandle_t{columnId, &column};
}

void ROOT::Experimental::Internal::RPagePersistentSink::UpdateSchema(
   const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry)
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
      // If we already have alias columns, add an offset to the alias columns so that the new physical columns
      // of the changeset follow immediately the already existing physical columns
      auto getNColumns = [](const ROOT::RFieldBase &f) -> std::size_t {
         const auto &reps = f.GetColumnRepresentatives();
         if (reps.empty())
            return 0;
         return reps.size() * reps[0].size();
      };
      std::uint32_t nNewPhysicalColumns = 0;
      for (auto f : changeset.fAddedFields) {
         nNewPhysicalColumns += getNColumns(*f);
         for (const auto &descendant : *f)
            nNewPhysicalColumns += getNColumns(descendant);
      }
      fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
   }

   auto addField = [&](ROOT::RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      f.SetOnDiskId(fieldId);
      ROOT::Internal::CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
   };
   auto addProjectedField = [&](ROOT::RFieldBase &f) {
      auto fieldId = descriptor.GetNFields();
      auto sourceFieldId =
         ROOT::Internal::GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
      fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
      fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
      fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
      f.SetOnDiskId(fieldId);
      for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
         auto targetId = descriptor.GetNLogicalColumns();
         RColumnDescriptorBuilder columnBuilder;
         columnBuilder.LogicalColumnId(targetId)
            .PhysicalColumnId(source.GetLogicalId())
            .FieldId(fieldId)
            .BitsOnStorage(source.GetBitsOnStorage())
            .ValueRange(source.GetValueRange())
            .Type(source.GetType())
            .Index(source.GetIndex())
            .RepresentationIndex(source.GetRepresentationIndex());
         fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
      }
   };

   R__ASSERT(firstEntry >= fPrevClusterNEntries);
   const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
   for (auto f : changeset.fAddedFields) {
      addField(*f);
      for (auto &descendant : *f)
         addField(descendant);
   }
   for (auto f : changeset.fAddedProjectedFields) {
      addProjectedField(*f);
      for (auto &descendant : *f)
         addProjectedField(descendant);
   }

   const auto nColumns = descriptor.GetNPhysicalColumns();
   fOpenColumnRanges.reserve(fOpenColumnRanges.size() + (nColumns - nColumnsBeforeUpdate));
   fOpenPageRanges.reserve(fOpenPageRanges.size() + (nColumns - nColumnsBeforeUpdate));
   for (ROOT::DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
      ROOT::RClusterDescriptor::RColumnRange columnRange;
      columnRange.SetPhysicalColumnId(i);
      // We set the first element index in the current cluster to the first element that is part of a materialized page
      // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
      // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
      columnRange.SetFirstElementIndex(descriptor.GetColumnDescriptor(i).GetFirstElementIndex());
      columnRange.SetNElements(0);
      columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
      fOpenColumnRanges.emplace_back(columnRange);
      ROOT::RClusterDescriptor::RPageRange pageRange;
      pageRange.SetPhysicalColumnId(i);
      fOpenPageRanges.emplace_back(std::move(pageRange));
   }

   // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
   // header was already serialized, this has to be done manually as it is required for page list serialization.
   if (fSerializationContext.GetHeaderSize() > 0)
      fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
}

void ROOT::Experimental::Internal::RPagePersistentSink::UpdateExtraTypeInfo(
   const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo)
{
   if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
      throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));

   fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
}

void ROOT::Experimental::Internal::RPagePersistentSink::InitImpl(ROOT::RNTupleModel &model)
{
   fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   auto &fieldZero = ROOT::Internal::GetFieldZeroOfModel(model);
   fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
   fieldZero.SetOnDiskId(0);
   auto &projectedFields = ROOT::Internal::GetProjectedFieldsOfModel(model);
   projectedFields.GetFieldZero().SetOnDiskId(0);

   ROOT::Internal::RNTupleModelChangeset initialChangeset{model};
   initialChangeset.fAddedFields.reserve(fieldZero.GetMutableSubfields().size());
   for (auto f : fieldZero.GetMutableSubfields())
      initialChangeset.fAddedFields.emplace_back(f);
   initialChangeset.fAddedProjectedFields.reserve(projectedFields.GetFieldZero().GetMutableSubfields().size());
   for (auto f : projectedFields.GetFieldZero().GetMutableSubfields())
      initialChangeset.fAddedProjectedFields.emplace_back(f);
   UpdateSchema(initialChangeset, 0U);

   fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
   auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
   fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
   InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());

   fDescriptorBuilder.BeginHeaderExtension();
}

std::unique_ptr<ROOT::RNTupleModel>
ROOT::Experimental::Internal::RPagePersistentSink::InitFromDescriptor(const ROOT::RNTupleDescriptor &srcDescriptor,
                                                                      bool copyClusters)
{
   // Create new descriptor
   fDescriptorBuilder.SetSchemaFromExisting(srcDescriptor);
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   // Create column/page ranges
   const auto nColumns = descriptor.GetNPhysicalColumns();
   R__ASSERT(fOpenColumnRanges.empty() && fOpenPageRanges.empty());
   fOpenColumnRanges.reserve(nColumns);
   fOpenPageRanges.reserve(nColumns);
   for (ROOT::DescriptorId_t i = 0; i < nColumns; ++i) {
      const auto &column = descriptor.GetColumnDescriptor(i);
      ROOT::RClusterDescriptor::RColumnRange columnRange;
      columnRange.SetPhysicalColumnId(i);
      columnRange.SetFirstElementIndex(column.GetFirstElementIndex());
      columnRange.SetNElements(0);
      columnRange.SetCompressionSettings(GetWriteOptions().GetCompression());
      fOpenColumnRanges.emplace_back(columnRange);
      ROOT::RClusterDescriptor::RPageRange pageRange;
      pageRange.SetPhysicalColumnId(i);
      fOpenPageRanges.emplace_back(std::move(pageRange));
   }

   if (copyClusters) {
      // Clone and add all cluster descriptors
      auto clusterId = srcDescriptor.FindClusterId(0, 0);
      while (clusterId != ROOT::kInvalidDescriptorId) {
         auto &cluster = srcDescriptor.GetClusterDescriptor(clusterId);
         auto nEntries = cluster.GetNEntries();
         for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
            R__ASSERT(fOpenColumnRanges[i].GetPhysicalColumnId() == i);
            if (!cluster.ContainsColumn(i)) // a cluster may not contain a column if that column is deferred
               break;
            const auto &columnRange = cluster.GetColumnRange(i);
            R__ASSERT(columnRange.GetPhysicalColumnId() == i);
            // TODO: properly handle suppressed columns (check MarkSuppressedColumnRange())
            fOpenColumnRanges[i].IncrementFirstElementIndex(columnRange.GetNElements());
         }
         fDescriptorBuilder.AddCluster(cluster.Clone());
         fPrevClusterNEntries += nEntries;

         clusterId = srcDescriptor.FindNextClusterId(clusterId);
      }
   }

   // Create model
   ROOT::RNTupleDescriptor::RCreateModelOptions modelOpts;
   modelOpts.SetReconstructProjections(true);
   auto model = descriptor.CreateModel(modelOpts);
   if (!copyClusters) {
      auto &projectedFields = ROOT::Internal::GetProjectedFieldsOfModel(*model);
      projectedFields.GetFieldZero().SetOnDiskId(model->GetConstFieldZero().GetOnDiskId());
   }

   // Serialize header and init from it
   fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor).Unwrap();
   auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
   fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor).Unwrap();
   InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());

   fDescriptorBuilder.BeginHeaderExtension();

   // Mark this sink as initialized
   fIsInitialized = true;

   return model;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t columnHandle)
{
   fOpenColumnRanges.at(columnHandle.fPhysicalId).SetIsSuppressed(true);
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitPage(ColumnHandle_t columnHandle,
                                                                   const ROOT::Internal::RPage &page)
{
   fOpenColumnRanges.at(columnHandle.fPhysicalId).IncrementNElements(page.GetNElements());

   ROOT::RClusterDescriptor::RPageInfo pageInfo;
   pageInfo.SetNElements(page.GetNElements());
   pageInfo.SetLocator(CommitPageImpl(columnHandle, page));
   pageInfo.SetHasChecksum(GetWriteOptions().GetEnablePageChecksums());
   fOpenPageRanges.at(columnHandle.fPhysicalId).GetPageInfos().emplace_back(pageInfo);
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPage(ROOT::DescriptorId_t physicalColumnId,
                                                                         const RPageStorage::RSealedPage &sealedPage)
{
   fOpenColumnRanges.at(physicalColumnId).IncrementNElements(sealedPage.GetNElements());

   ROOT::RClusterDescriptor::RPageInfo pageInfo;
   pageInfo.SetNElements(sealedPage.GetNElements());
   pageInfo.SetLocator(CommitSealedPageImpl(physicalColumnId, sealedPage));
   pageInfo.SetHasChecksum(sealedPage.GetHasChecksum());
   fOpenPageRanges.at(physicalColumnId).GetPageInfos().emplace_back(pageInfo);
}

std::vector<ROOT::RNTupleLocator>
ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPageVImpl(
   std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask)
{
   std::vector<ROOT::RNTupleLocator> locators;
   locators.reserve(mask.size());
   std::size_t i = 0;
   for (auto &range : ranges) {
      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         if (mask[i++])
            locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
      }
   }
   locators.shrink_to_fit();
   return locators;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitSealedPageV(
   std::span<RPageStorage::RSealedPageGroup> ranges)
{
   /// Used in the `originalPages` map
   struct RSealedPageLink {
      const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
      std::size_t fLocatorIdx = 0;              ///< The index in the locator vector returned by CommitSealedPageVImpl()
   };

   std::vector<bool> mask;
   // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
   std::vector<std::size_t> locatorIndexes;
   // Maps page checksums to the first sealed page with that checksum
   std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
   std::size_t iLocator = 0;
   for (auto &range : ranges) {
      const auto rangeSize = std::distance(range.fFirst, range.fLast);
      mask.reserve(mask.size() + rangeSize);
      locatorIndexes.reserve(locatorIndexes.size() + rangeSize);

      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         if (!fFeatures.fCanMergePages || !fOptions->GetEnableSamePageMerging()) {
            mask.emplace_back(true);
            locatorIndexes.emplace_back(iLocator++);
            continue;
         }
         // Same page merging requires page checksums - this is checked in the write options
         R__ASSERT(sealedPageIt->GetHasChecksum());

         const auto chk = sealedPageIt->GetChecksum().Unwrap();
         auto itr = originalPages.find(chk);
         if (itr == originalPages.end()) {
            originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
            mask.emplace_back(true);
            locatorIndexes.emplace_back(iLocator++);
            continue;
         }

         const auto *p = itr->second.fSealedPage;
         if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
             memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
            mask.emplace_back(true);
            locatorIndexes.emplace_back(iLocator++);
            continue;
         }

         mask.emplace_back(false);
         locatorIndexes.emplace_back(itr->second.fLocatorIdx);
      }

      mask.shrink_to_fit();
      locatorIndexes.shrink_to_fit();
   }

   auto locators = CommitSealedPageVImpl(ranges, mask);
   unsigned i = 0;

   for (auto &range : ranges) {
      for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
         fOpenColumnRanges.at(range.fPhysicalColumnId).IncrementNElements(sealedPageIt->GetNElements());

         ROOT::RClusterDescriptor::RPageInfo pageInfo;
         pageInfo.SetNElements(sealedPageIt->GetNElements());
         pageInfo.SetLocator(locators[locatorIndexes[i++]]);
         pageInfo.SetHasChecksum(sealedPageIt->GetHasChecksum());
         fOpenPageRanges.at(range.fPhysicalColumnId).GetPageInfos().emplace_back(pageInfo);
      }
   }
}

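// Illustrative sketch, not part of the original file: the same-page-merging idea in CommitSealedPageV() above.
// Pages with identical checksum and identical bytes are written once; later occurrences reuse the locator of the
// first occurrence through the locatorIndexes indirection.
#if 0
// mask[i] == true  -> page i is physically written by CommitSealedPageVImpl()
// mask[i] == false -> page i reuses locators[locatorIndexes[i]] of its identical twin
#endif
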
ROOT::Experimental::Internal::RPageSink::RStagedCluster
ROOT::Experimental::Internal::RPagePersistentSink::StageCluster(ROOT::NTupleSize_t nNewEntries)
{
   RStagedCluster stagedCluster;
   stagedCluster.fNBytesWritten = StageClusterImpl();
   stagedCluster.fNEntries = nNewEntries;

   for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
      RStagedCluster::RColumnInfo columnInfo;
      columnInfo.fCompressionSettings = fOpenColumnRanges[i].GetCompressionSettings().value();
      if (fOpenColumnRanges[i].IsSuppressed()) {
         assert(fOpenPageRanges[i].GetPageInfos().empty());
         columnInfo.fPageRange.SetPhysicalColumnId(i);
         columnInfo.fIsSuppressed = true;
         // We reset suppressed columns to the state they would have if they were active (not suppressed).
         fOpenColumnRanges[i].SetNElements(0);
         fOpenColumnRanges[i].SetIsSuppressed(false);
      } else {
         std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
         fOpenPageRanges[i].SetPhysicalColumnId(i);

         columnInfo.fNElements = fOpenColumnRanges[i].GetNElements();
         fOpenColumnRanges[i].SetNElements(0);
      }
      stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
   }

   return stagedCluster;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitStagedClusters(std::span<RStagedCluster> clusters)
{
   for (const auto &cluster : clusters) {
      RClusterDescriptorBuilder clusterBuilder;
      clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
         .FirstEntryIndex(fPrevClusterNEntries)
         .NEntries(cluster.fNEntries);
      for (const auto &columnInfo : cluster.fColumnInfos) {
         const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
         if (columnInfo.fIsSuppressed) {
            assert(columnInfo.fPageRange.GetPageInfos().empty());
            clusterBuilder.MarkSuppressedColumnRange(colId);
         } else {
            clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].GetFirstElementIndex(),
                                             columnInfo.fCompressionSettings, columnInfo.fPageRange);
            fOpenColumnRanges[colId].IncrementFirstElementIndex(columnInfo.fNElements);
         }
      }

      clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
      for (const auto &columnInfo : cluster.fColumnInfos) {
         if (!columnInfo.fIsSuppressed)
            continue;
         const auto colId = columnInfo.fPageRange.GetPhysicalColumnId();
         // For suppressed columns, we need to reset the first element index to the first element of the next
         // (upcoming) cluster. This information has been determined for the committed cluster descriptor through
         // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
         const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
         fOpenColumnRanges[colId].SetFirstElementIndex(columnRangeFromDesc.GetFirstElementIndex() +
                                                       columnRangeFromDesc.GetNElements());
      }

      fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
      fPrevClusterNEntries += cluster.fNEntries;
   }
}

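// Illustrative sketch, not part of the original file: the two-phase cluster commit implemented by StageCluster()
// and CommitStagedClusters() above. Staging flushes the payload and captures per-column page ranges; the later
// commit appends the metadata, which allows callers (e.g. a buffered sink) to reorder staged clusters.
#if 0
auto staged = sink.StageCluster(nEntries);        // write pages, collect page ranges
sink.CommitStagedClusters(std::span(&staged, 1)); // logically append to the descriptor
#endif
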
void ROOT::Experimental::Internal::RPagePersistentSink::CommitClusterGroup()
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   const auto nClusters = descriptor.GetNActiveClusters();
   std::vector<ROOT::DescriptorId_t> physClusterIDs;
   physClusterIDs.reserve(nClusters);
   for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
      physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
   }

   auto szPageList =
      RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext).Unwrap();
   auto bufPageList = MakeUninitArray<unsigned char>(szPageList);
   RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);

   const auto clusterGroupId = descriptor.GetNClusterGroups();
   const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
   RClusterGroupDescriptorBuilder cgBuilder;
   cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
   if (fNextClusterInGroup == nClusters) {
      cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
   } else {
      const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
      const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
      cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
         .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
                    firstClusterDesc.GetFirstEntryIndex())
         .NClusters(nClusters - fNextClusterInGroup);
   }
   std::vector<ROOT::DescriptorId_t> clusterIds;
   clusterIds.reserve(nClusters);
   for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
      clusterIds.emplace_back(i);
   }
   cgBuilder.AddSortedClusters(clusterIds);
   fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
   fSerializationContext.MapClusterGroupId(clusterGroupId);

   fNextClusterInGroup = nClusters;
}

void ROOT::Experimental::Internal::RPagePersistentSink::CommitDatasetImpl()
{
   if (!fStreamerInfos.empty()) {
      // De-duplicate extra type infos before writing. Usually we won't have them already in the descriptor, but
      // this may happen when we are writing back an already-existing RNTuple, e.g. when doing incremental merging.
      for (const auto &etDesc : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
         if (etDesc.GetContentId() == EExtraTypeInfoIds::kStreamerInfo) {
            // The specification mandates that the type name for a kStreamerInfo should be empty and the type version
            // should be zero.
            R__ASSERT(etDesc.GetTypeName().empty());
            R__ASSERT(etDesc.GetTypeVersion() == 0);
            auto etInfo = RNTupleSerializer::DeserializeStreamerInfos(etDesc.GetContent()).Unwrap();
            fStreamerInfos.merge(etInfo);
         }
      }

      RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
      extraInfoBuilder.ContentId(EExtraTypeInfoIds::kStreamerInfo)
         .Content(RNTupleSerializer::SerializeStreamerInfos(fStreamerInfos));
      fDescriptorBuilder.ReplaceExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
   }

   const auto &descriptor = fDescriptorBuilder.GetDescriptor();

   auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext).Unwrap();
   auto bufFooter = MakeUninitArray<unsigned char>(szFooter);
   RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);

   CommitDatasetImpl(bufFooter.get(), szFooter);
}

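// Illustrative sketch, not part of the original file: the size-then-write serialization pattern used for the
// header, the page lists, and the footer in this file. A first call with a nullptr buffer only computes the
// size; the second call fills the allocated buffer.
#if 0
auto size = RNTupleSerializer::SerializeFooter(nullptr, descriptor, context).Unwrap();
auto buf = MakeUninitArray<unsigned char>(size);
RNTupleSerializer::SerializeFooter(buf.get(), descriptor, context);
#endif
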
void ROOT::Experimental::Internal::RPagePersistentSink::EnableDefaultMetrics(const std::string &prefix)
{
   fMetrics = Detail::RNTupleMetrics(prefix);
   fCounters = std::make_unique<RCounters>(RCounters{
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
                                                            "number of pages committed to storage"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
                                                            "volume written for committed pages"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
      *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
                                                                                        "CPU time spent writing"),
      *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
                                                                                        "CPU time spent compressing")});
}