Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.cxx
Go to the documentation of this file.
1/// \file RPageStorage.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-10-04
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RPageStorage.hxx>
18#include <ROOT/RColumn.hxx>
19#include <ROOT/RFieldBase.hxx>
22#include <ROOT/RNTupleModel.hxx>
25#include <ROOT/RPageSinkBuf.hxx>
27#ifdef R__ENABLE_DAOS
29#endif
30
31#include <Compression.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <atomic>
36#include <cassert>
37#include <cstring>
38#include <functional>
39#include <memory>
40#include <string_view>
41#include <unordered_map>
42#include <utility>
43
45 : fMetrics(""), fPageAllocator(std::make_unique<RPageAllocatorHeap>()), fNTupleName(name)
46{
47}
48
50
52{
53 if (!fHasChecksum)
54 return;
55
56 auto charBuf = reinterpret_cast<const unsigned char *>(fBuffer);
57 auto checksumBuf = const_cast<unsigned char *>(charBuf) + GetDataSize();
58 std::uint64_t xxhash3;
59 RNTupleSerializer::SerializeXxHash3(charBuf, GetDataSize(), xxhash3, checksumBuf);
60}
61
64{
65 if (!fHasChecksum)
67
68 auto success = RNTupleSerializer::VerifyXxHash3(reinterpret_cast<const unsigned char *>(fBuffer), GetDataSize());
69 if (!success)
70 return R__FAIL("page checksum verification failed, data corruption detected");
72}
73
75{
76 if (!fHasChecksum)
77 return R__FAIL("invalid attempt to extract non-existing page checksum");
78
79 assert(fBufferSize >= kNBytesPageChecksum);
80 std::uint64_t checksum;
82 reinterpret_cast<const unsigned char *>(fBuffer) + fBufferSize - kNBytesPageChecksum, checksum);
83 return checksum;
84}
85
86//------------------------------------------------------------------------------
87
89 DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
90{
91 auto [itr, _] = fColumnInfos.emplace(physicalColumnId, std::vector<RColumnInfo>());
92 for (auto &columnInfo : itr->second) {
93 if (columnInfo.fElementId == elementId) {
94 columnInfo.fRefCounter++;
95 return;
96 }
97 }
98 itr->second.emplace_back(RColumnInfo{elementId, 1});
99}
100
103{
104 auto itr = fColumnInfos.find(physicalColumnId);
105 R__ASSERT(itr != fColumnInfos.end());
106 for (std::size_t i = 0; i < itr->second.size(); ++i) {
107 if (itr->second[i].fElementId != elementId)
108 continue;
109
110 itr->second[i].fRefCounter--;
111 if (itr->second[i].fRefCounter == 0) {
112 itr->second.erase(itr->second.begin() + i);
113 if (itr->second.empty()) {
114 fColumnInfos.erase(itr);
115 }
116 }
117 break;
118 }
119}
120
123{
125 for (const auto &[physicalColumnId, _] : fColumnInfos)
126 result.insert(physicalColumnId);
127 return result;
128}
129
131{
132 if (fFirstEntry == kInvalidNTupleIndex) {
133 /// Entry range unset, we assume that the entry range covers the complete source
134 return true;
135 }
136
137 if (clusterDesc.GetNEntries() == 0)
138 return true;
139 if ((clusterDesc.GetFirstEntryIndex() + clusterDesc.GetNEntries()) <= fFirstEntry)
140 return false;
141 if (clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
142 return false;
143 return true;
144}
145
147 : RPageStorage(name), fOptions(options)
148{
149}
150
152
153std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
154ROOT::Experimental::Internal::RPageSource::Create(std::string_view ntupleName, std::string_view location,
155 const RNTupleReadOptions &options)
156{
157 if (ntupleName.empty()) {
158 throw RException(R__FAIL("empty RNTuple name"));
159 }
160 if (location.empty()) {
161 throw RException(R__FAIL("empty storage location"));
162 }
163 if (location.find("daos://") == 0)
164#ifdef R__ENABLE_DAOS
165 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
166#else
167 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
168#endif
169
170 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
171}
172
175{
177 auto physicalId =
178 GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.GetIndex(), column.GetRepresentationIndex());
179 R__ASSERT(physicalId != kInvalidDescriptorId);
180 fActivePhysicalColumns.Insert(physicalId, column.GetElement()->GetIdentifier());
181 return ColumnHandle_t{physicalId, &column};
182}
183
185{
186 fActivePhysicalColumns.Erase(columnHandle.fPhysicalId, columnHandle.fColumn->GetElement()->GetIdentifier());
187}
188
190{
191 if ((range.fFirstEntry + range.fNEntries) > GetNEntries()) {
192 throw RException(R__FAIL("invalid entry range"));
193 }
194 fEntryRange = range;
195}
196
198{
199 if (!fHasStructure)
200 LoadStructureImpl();
201 fHasStructure = true;
202}
203
205{
206 LoadStructure();
207 if (!fIsAttached)
208 GetExclDescriptorGuard().MoveIn(AttachImpl());
209 fIsAttached = true;
210}
211
212std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSource::Clone() const
213{
214 auto clone = CloneImpl();
215 if (fIsAttached) {
216 clone->GetExclDescriptorGuard().MoveIn(std::move(*GetSharedDescriptorGuard()->Clone()));
217 clone->fHasStructure = true;
218 clone->fIsAttached = true;
219 }
220 return clone;
221}
222
224{
225 return GetSharedDescriptorGuard()->GetNEntries();
226}
227
229{
230 return GetSharedDescriptorGuard()->GetNElements(columnHandle.fPhysicalId);
231}
232
234{
235 if (fTaskScheduler)
236 UnzipClusterImpl(cluster);
237}
238
240{
241 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
242
243 const auto clusterId = cluster->GetId();
244 auto descriptorGuard = GetSharedDescriptorGuard();
245 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
246
247 fPreloadedClusters[clusterDescriptor.GetFirstEntryIndex()] = clusterId;
248
249 std::atomic<bool> foundChecksumFailure{false};
250
251 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
252 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
253 for (const auto columnId : columnsInCluster) {
254 // By the time we unzip a cluster, the set of active columns may have already changed wrt. to the moment when
255 // we requested reading the cluster. That doesn't matter much, we simply decompress what is now in the list
256 // of active columns.
257 if (!fActivePhysicalColumns.HasColumnInfos(columnId))
258 continue;
259 const auto &columnInfos = fActivePhysicalColumns.GetColumnInfos(columnId);
260
261 for (const auto &info : columnInfos) {
262 allElements.emplace_back(GenerateColumnElement(info.fElementId));
263
264 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
265 std::uint64_t pageNo = 0;
266 std::uint64_t firstInPage = 0;
267 for (const auto &pi : pageRange.fPageInfos) {
268 auto onDiskPage = cluster->GetOnDiskPage(ROnDiskPage::Key{columnId, pageNo});
269 RSealedPage sealedPage;
270 sealedPage.SetNElements(pi.fNElements);
271 sealedPage.SetHasChecksum(pi.fHasChecksum);
272 sealedPage.SetBufferSize(pi.fLocator.fBytesOnStorage + pi.fHasChecksum * kNBytesPageChecksum);
273 sealedPage.SetBuffer(onDiskPage->GetAddress());
274 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == sealedPage.GetBufferSize()));
275
276 auto taskFunc = [this, columnId, clusterId, firstInPage, sealedPage, element = allElements.back().get(),
277 &foundChecksumFailure,
278 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
279 const RPagePool::RKey keyPagePool{columnId, element->GetIdentifier().fInMemoryType};
280 auto rv = UnsealPage(sealedPage, *element);
281 if (!rv) {
282 foundChecksumFailure = true;
283 return;
284 }
285 auto newPage = rv.Unwrap();
286 fCounters->fSzUnzip.Add(element->GetSize() * sealedPage.GetNElements());
287
288 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
289 fPagePool.PreloadPage(std::move(newPage), keyPagePool);
290 };
291
292 fTaskScheduler->AddTask(taskFunc);
293
294 firstInPage += pi.fNElements;
295 pageNo++;
296 } // for all pages in column
297
298 fCounters->fNPageUnsealed.Add(pageNo);
299 } // for all in-memory types of the column
300 } // for all columns in cluster
301
302 fTaskScheduler->Wait();
303
304 if (foundChecksumFailure) {
305 throw RException(R__FAIL("page checksum verification failed, data corruption detected"));
306 }
307}
308
310 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
311 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
312{
313 auto descriptorGuard = GetSharedDescriptorGuard();
314 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.fClusterId);
315
316 for (auto physicalColumnId : clusterKey.fPhysicalColumnSet) {
317 if (clusterDesc.GetColumnRange(physicalColumnId).fIsSuppressed)
318 continue;
319
320 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
321 NTupleSize_t pageNo = 0;
322 for (const auto &pageInfo : pageRange.fPageInfos) {
323 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
324 pageZeroMap.Register(
325 ROnDiskPage::Key{physicalColumnId, pageNo},
326 ROnDiskPage(const_cast<void *>(RPage::GetPageZeroBuffer()), pageInfo.fLocator.fBytesOnStorage));
327 } else {
328 perPageFunc(physicalColumnId, pageNo, pageInfo);
329 }
330 ++pageNo;
331 }
332 }
333}
334
336{
337 if (fLastUsedCluster == clusterId)
338 return;
339
340 NTupleSize_t firstEntryIndex = GetSharedDescriptorGuard()->GetClusterDescriptor(clusterId).GetFirstEntryIndex();
341 auto itr = fPreloadedClusters.begin();
342 while ((itr != fPreloadedClusters.end()) && (itr->first < firstEntryIndex)) {
343 fPagePool.Evict(itr->second);
344 itr = fPreloadedClusters.erase(itr);
345 }
346 std::size_t poolWindow = 0;
347 while ((itr != fPreloadedClusters.end()) && (poolWindow < 2 * fOptions.GetClusterBunchSize())) {
348 ++itr;
349 ++poolWindow;
350 }
351 while (itr != fPreloadedClusters.end()) {
352 fPagePool.Evict(itr->second);
353 itr = fPreloadedClusters.erase(itr);
354 }
355
356 fLastUsedCluster = clusterId;
357}
358
361{
362 const auto columnId = columnHandle.fPhysicalId;
363 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
364 auto cachedPageRef = fPagePool.GetPage(RPagePool::RKey{columnId, columnElementId.fInMemoryType}, globalIndex);
365 if (!cachedPageRef.Get().IsNull()) {
366 UpdateLastUsedCluster(cachedPageRef.Get().GetClusterInfo().GetId());
367 return cachedPageRef;
368 }
369
370 std::uint64_t idxInCluster;
371 RClusterInfo clusterInfo;
372 {
373 auto descriptorGuard = GetSharedDescriptorGuard();
374 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
375
376 if (clusterInfo.fClusterId == kInvalidDescriptorId)
377 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
378
379 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
380 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
381 if (columnRange.fIsSuppressed)
382 return RPageRef();
383
384 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
385 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
386 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
387 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
388 }
389
391 throw RException(R__FAIL("tried to read a page with an unknown locator"));
392
393 UpdateLastUsedCluster(clusterInfo.fClusterId);
394 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
395}
396
399{
400 const auto clusterId = clusterIndex.GetClusterId();
401 const auto idxInCluster = clusterIndex.GetIndex();
402 const auto columnId = columnHandle.fPhysicalId;
403 const auto columnElementId = columnHandle.fColumn->GetElement()->GetIdentifier();
404 auto cachedPageRef = fPagePool.GetPage(RPagePool::RKey{columnId, columnElementId.fInMemoryType}, clusterIndex);
405 if (!cachedPageRef.Get().IsNull()) {
406 UpdateLastUsedCluster(clusterId);
407 return cachedPageRef;
408 }
409
410 if (clusterId == kInvalidDescriptorId)
411 throw RException(R__FAIL("entry out of bounds"));
412
413 RClusterInfo clusterInfo;
414 {
415 auto descriptorGuard = GetSharedDescriptorGuard();
416 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
417 const auto &columnRange = clusterDescriptor.GetColumnRange(columnId);
418 if (columnRange.fIsSuppressed)
419 return RPageRef();
420
421 clusterInfo.fClusterId = clusterId;
422 clusterInfo.fColumnOffset = columnRange.fFirstElementIndex;
423 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
424 }
425
427 throw RException(R__FAIL("tried to read a page with an unknown locator"));
428
429 UpdateLastUsedCluster(clusterInfo.fClusterId);
430 return LoadPageImpl(columnHandle, clusterInfo, idxInCluster);
431}
432
434{
435 fMetrics = Detail::RNTupleMetrics(prefix);
436 fCounters = std::make_unique<RCounters>(RCounters{
437 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nReadV", "", "number of vector read requests"),
438 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nRead", "", "number of byte ranges read"),
439 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadPayload", "B",
440 "volume read from storage (required)"),
441 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szReadOverhead", "B",
442 "volume read from storage (overhead)"),
443 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szUnzip", "B", "volume after unzipping"),
444 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nClusterLoaded", "",
445 "number of partial clusters preloaded from storage"),
446 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageRead", "", "number of pages read from storage"),
447 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageUnsealed", "",
448 "number of pages unzipped and decoded"),
449 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallRead", "ns", "wall clock time spent reading"),
450 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallUnzip", "ns",
451 "wall clock time spent decompressing"),
452 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuRead", "ns",
453 "CPU time spent reading"),
454 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuUnzip", "ns",
455 "CPU time spent decompressing"),
456 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
457 "bwRead", "MB/s", "bandwidth compressed bytes read per second", fMetrics,
458 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
459 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
460 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
461 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
462 if (auto walltime = timeWallRead->GetValueAsInt()) {
463 double payload = szReadPayload->GetValueAsInt();
464 double overhead = szReadOverhead->GetValueAsInt();
465 // unit: bytes / nanosecond = GB/s
466 return {true, (1000. * (payload + overhead) / walltime)};
467 }
468 }
469 }
470 }
471 return {false, -1.};
472 }),
473 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
474 "bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second", fMetrics,
475 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
476 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
477 if (const auto timeWallRead = metrics.GetLocalCounter("timeWallRead")) {
478 if (auto walltime = timeWallRead->GetValueAsInt()) {
479 double unzip = szUnzip->GetValueAsInt();
480 // unit: bytes / nanosecond = GB/s
481 return {true, 1000. * unzip / walltime};
482 }
483 }
484 }
485 return {false, -1.};
486 }),
487 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
488 "bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second", fMetrics,
489 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
490 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
491 if (const auto timeWallUnzip = metrics.GetLocalCounter("timeWallUnzip")) {
492 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
493 double unzip = szUnzip->GetValueAsInt();
494 // unit: bytes / nanosecond = GB/s
495 return {true, 1000. * unzip / walltime};
496 }
497 }
498 }
499 return {false, -1.};
500 }),
501 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
502 "rtReadEfficiency", "", "ratio of payload over all bytes read", fMetrics,
503 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
504 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
505 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
506 if (auto payload = szReadPayload->GetValueAsInt()) {
507 // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
508 return {true, 1. / (1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
509 }
510 }
511 }
512 return {false, -1.};
513 }),
514 *fMetrics.MakeCounter<Detail::RNTupleCalcPerf *>(
515 "rtCompression", "", "ratio of compressed bytes / uncompressed bytes", fMetrics,
516 [](const Detail::RNTupleMetrics &metrics) -> std::pair<bool, double> {
517 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
518 if (const auto szUnzip = metrics.GetLocalCounter("szUnzip")) {
519 if (auto unzip = szUnzip->GetValueAsInt()) {
520 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
521 }
522 }
524 return {false, -1.};
525 })});
526}
527
530{
531 return UnsealPage(sealedPage, element, *fPageAllocator);
532}
533
536 RPageAllocator &pageAlloc)
538 // Unsealing a page zero is a no-op. `RPageRange::ExtendToFitColumnRange()` guarantees that the page zero buffer is
539 // large enough to hold `sealedPage.fNElements`
540 if (sealedPage.GetBuffer() == RPage::GetPageZeroBuffer()) {
541 auto page = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
542 page.GrowUnchecked(sealedPage.GetNElements());
543 memset(page.GetBuffer(), 0, page.GetNBytes());
544 return page;
545 }
546
547 auto rv = sealedPage.VerifyChecksumIfEnabled();
548 if (!rv)
549 return R__FORWARD_ERROR(rv);
550
551 const auto bytesPacked = element.GetPackedSize(sealedPage.GetNElements());
552 auto page = pageAlloc.NewPage(element.GetPackedSize(), sealedPage.GetNElements());
553 if (sealedPage.GetDataSize() != bytesPacked) {
554 RNTupleDecompressor::Unzip(sealedPage.GetBuffer(), sealedPage.GetDataSize(), bytesPacked, page.GetBuffer());
555 } else {
556 // We cannot simply map the sealed page as we don't know its life time. Specialized page sources
557 // may decide to implement to not use UnsealPage but to custom mapping / decompression code.
558 // Note that usually pages are compressed.
559 memcpy(page.GetBuffer(), sealedPage.GetBuffer(), bytesPacked);
560 }
561
562 if (!element.IsMappable()) {
563 auto tmp = pageAlloc.NewPage(element.GetSize(), sealedPage.GetNElements());
564 element.Unpack(tmp.GetBuffer(), page.GetBuffer(), sealedPage.GetNElements());
565 page = std::move(tmp);
566 }
567
568 page.GrowUnchecked(sealedPage.GetNElements());
569 return page;
570}
571
572//------------------------------------------------------------------------------
573
575{
576 // Make the sort order unique by adding the physical on-disk column id as a secondary key
577 if (fCurrentPageSize == other.fCurrentPageSize)
578 return fColumn->GetOnDiskId() > other.fColumn->GetOnDiskId();
579 return fCurrentPageSize > other.fCurrentPageSize;
580}
581
583 std::size_t pageSizeLimit)
584{
585 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
586 return true;
587
588 auto itr = fColumnsSortedByPageSize.begin();
589 while (itr != fColumnsSortedByPageSize.end()) {
590 if (itr->fCurrentPageSize <= pageSizeLimit)
591 break;
592 if (itr->fCurrentPageSize == itr->fInitialPageSize) {
593 ++itr;
594 continue;
595 }
596
597 // Flushing the current column will invalidate itr
598 auto itrFlush = itr++;
599
600 RColumnInfo next;
601 if (itr != fColumnsSortedByPageSize.end())
602 next = *itr;
603
604 itrFlush->fColumn->Flush();
605 if (fMaxAllocatedBytes - fCurrentAllocatedBytes >= targetAvailableSize)
606 return true;
607
608 if (next.fColumn == nullptr)
609 return false;
610 itr = fColumnsSortedByPageSize.find(next);
611 };
612
613 return false;
614}
615
617{
618 const RColumnInfo key{&column, column.GetWritePageCapacity(), 0};
619 auto itr = fColumnsSortedByPageSize.find(key);
620 if (itr == fColumnsSortedByPageSize.end()) {
621 if (!TryEvict(newWritePageSize, 0))
622 return false;
623 fColumnsSortedByPageSize.insert({&column, newWritePageSize, newWritePageSize});
624 fCurrentAllocatedBytes += newWritePageSize;
625 return true;
626 }
627
628 RColumnInfo elem{*itr};
629 assert(newWritePageSize >= elem.fInitialPageSize);
630
631 if (newWritePageSize == elem.fCurrentPageSize)
632 return true;
633
634 fColumnsSortedByPageSize.erase(itr);
635
636 if (newWritePageSize < elem.fCurrentPageSize) {
637 // Page got smaller
638 fCurrentAllocatedBytes -= elem.fCurrentPageSize - newWritePageSize;
639 elem.fCurrentPageSize = newWritePageSize;
640 fColumnsSortedByPageSize.insert(elem);
641 return true;
642 }
643
644 // Page got larger, we may need to make space available
645 const auto diffBytes = newWritePageSize - elem.fCurrentPageSize;
646 if (!TryEvict(diffBytes, elem.fCurrentPageSize)) {
647 // Don't change anything, let the calling column flush itself
648 // TODO(jblomer): we may consider skipping the column in TryEvict and thus avoiding erase+insert
649 fColumnsSortedByPageSize.insert(elem);
650 return false;
651 }
652 fCurrentAllocatedBytes += diffBytes;
653 elem.fCurrentPageSize = newWritePageSize;
654 fColumnsSortedByPageSize.insert(elem);
655 return true;
656}
657
658//------------------------------------------------------------------------------
659
661 : RPageStorage(name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
662{
664}
665
667
670{
671 assert(config.fPage);
672 assert(config.fElement);
673 assert(config.fBuffer);
674
675 unsigned char *pageBuf = reinterpret_cast<unsigned char *>(config.fPage->GetBuffer());
676 bool isAdoptedBuffer = true;
677 auto nBytesPacked = config.fPage->GetNBytes();
678 auto nBytesChecksum = config.fWriteChecksum * kNBytesPageChecksum;
679
680 if (!config.fElement->IsMappable()) {
681 nBytesPacked = config.fElement->GetPackedSize(config.fPage->GetNElements());
682 pageBuf = new unsigned char[nBytesPacked];
683 isAdoptedBuffer = false;
684 config.fElement->Pack(pageBuf, config.fPage->GetBuffer(), config.fPage->GetNElements());
685 }
686 auto nBytesZipped = nBytesPacked;
687
688 if ((config.fCompressionSetting != 0) || !config.fElement->IsMappable() || !config.fAllowAlias ||
689 config.fWriteChecksum) {
690 nBytesZipped = RNTupleCompressor::Zip(pageBuf, nBytesPacked, config.fCompressionSetting, config.fBuffer);
691 if (!isAdoptedBuffer)
692 delete[] pageBuf;
693 pageBuf = reinterpret_cast<unsigned char *>(config.fBuffer);
694 isAdoptedBuffer = true;
695 }
696
697 R__ASSERT(isAdoptedBuffer);
698
699 RSealedPage sealedPage{pageBuf, nBytesZipped + nBytesChecksum, config.fPage->GetNElements(), config.fWriteChecksum};
700 sealedPage.ChecksumIfEnabled();
701
702 return sealedPage;
703}
704
707{
708 const auto nBytes = page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
709 if (fSealPageBuffer.size() < nBytes)
710 fSealPageBuffer.resize(nBytes);
711
712 RSealPageConfig config;
713 config.fPage = &page;
714 config.fElement = &element;
715 config.fCompressionSetting = GetWriteOptions().GetCompression();
716 config.fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
717 config.fAllowAlias = true;
718 config.fBuffer = fSealPageBuffer.data();
719
720 return SealPage(config);
721}
722
724{
725 for (const auto &cb : fOnDatasetCommitCallbacks)
726 cb(*this);
727 CommitDatasetImpl();
728}
729
732{
733 R__ASSERT(nElements > 0);
734 const auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
735 const auto nBytes = elementSize * nElements;
736 if (!fWritePageMemoryManager.TryUpdate(*columnHandle.fColumn, nBytes))
737 return RPage();
738 return fPageAllocator->NewPage(elementSize, nElements);
739}
740
741//------------------------------------------------------------------------------
742
743std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
744ROOT::Experimental::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::string_view location,
745 const RNTupleWriteOptions &options)
746{
747 if (ntupleName.empty()) {
748 throw RException(R__FAIL("empty RNTuple name"));
749 }
750 if (location.empty()) {
751 throw RException(R__FAIL("empty storage location"));
752 }
753 if (location.find("daos://") == 0) {
754#ifdef R__ENABLE_DAOS
755 return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
756#else
757 throw RException(R__FAIL("This RNTuple build does not support DAOS."));
758#endif
759 }
760
761 // Otherwise assume that the user wants us to create a file.
762 return std::make_unique<RPageSinkFile>(ntupleName, location, options);
763}
764
766 const RNTupleWriteOptions &options)
767 : RPageSink(name, options)
768{
769}
770
772
775{
776 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
777 RColumnDescriptorBuilder columnBuilder;
778 columnBuilder.LogicalColumnId(columnId)
779 .PhysicalColumnId(columnId)
780 .FieldId(fieldId)
782 .ValueRange(column.GetValueRange())
783 .Type(column.GetType())
784 .Index(column.GetIndex())
787 // For late model extension, we assume that the primary column representation is the active one for the
788 // deferred range. All other representations are suppressed.
789 if (column.GetFirstElementIndex() > 0 && column.GetRepresentationIndex() > 0)
790 columnBuilder.SetSuppressedDeferred();
791 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
792 return ColumnHandle_t{columnId, &column};
793}
794
796 NTupleSize_t firstEntry)
797{
798 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
799
800 if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
801 // If we already have alias columns, add an offset to the alias columns so that the new physical columns
802 // of the changeset follow immediately the already existing physical columns
803 auto getNColumns = [](const RFieldBase &f) -> std::size_t {
804 const auto &reps = f.GetColumnRepresentatives();
805 if (reps.empty())
806 return 0;
807 return reps.size() * reps[0].size();
808 };
809 std::uint32_t nNewPhysicalColumns = 0;
810 for (auto f : changeset.fAddedFields) {
811 nNewPhysicalColumns += getNColumns(*f);
812 for (const auto &descendant : *f)
813 nNewPhysicalColumns += getNColumns(descendant);
814 }
815 fDescriptorBuilder.ShiftAliasColumns(nNewPhysicalColumns);
816 }
817
818 auto addField = [&](RFieldBase &f) {
819 auto fieldId = descriptor.GetNFields();
820 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
821 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
822 f.SetOnDiskId(fieldId);
823 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
824 };
825 auto addProjectedField = [&](RFieldBase &f) {
826 auto fieldId = descriptor.GetNFields();
827 auto sourceFieldId = GetProjectedFieldsOfModel(changeset.fModel).GetSourceField(&f)->GetOnDiskId();
828 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(f).FieldId(fieldId).MakeDescriptor().Unwrap());
829 fDescriptorBuilder.AddFieldLink(f.GetParent()->GetOnDiskId(), fieldId);
830 fDescriptorBuilder.AddFieldProjection(sourceFieldId, fieldId);
831 f.SetOnDiskId(fieldId);
832 for (const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
833 auto targetId = descriptor.GetNLogicalColumns();
834 RColumnDescriptorBuilder columnBuilder;
835 columnBuilder.LogicalColumnId(targetId)
836 .PhysicalColumnId(source.GetLogicalId())
837 .FieldId(fieldId)
838 .BitsOnStorage(source.GetBitsOnStorage())
839 .ValueRange(source.GetValueRange())
840 .Type(source.GetType())
841 .Index(source.GetIndex())
842 .RepresentationIndex(source.GetRepresentationIndex());
843 fDescriptorBuilder.AddColumn(columnBuilder.MakeDescriptor().Unwrap());
844 }
845 };
846
847 R__ASSERT(firstEntry >= fPrevClusterNEntries);
848 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
849 for (auto f : changeset.fAddedFields) {
850 addField(*f);
851 for (auto &descendant : *f)
852 addField(descendant);
853 }
854 for (auto f : changeset.fAddedProjectedFields) {
855 addProjectedField(*f);
856 for (auto &descendant : *f)
857 addProjectedField(descendant);
858 }
859
860 const auto nColumns = descriptor.GetNPhysicalColumns();
861 for (DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
863 columnRange.fPhysicalColumnId = i;
864 // We set the first element index in the current cluster to the first element that is part of a materialized page
865 // (i.e., that is part of a page list). For columns created during late model extension, however, the column range
866 // is fixed up as needed by `RClusterDescriptorBuilder::AddExtendedColumnRanges()` on read back.
867 columnRange.fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
868 columnRange.fNElements = 0;
869 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
870 fOpenColumnRanges.emplace_back(columnRange);
872 pageRange.fPhysicalColumnId = i;
873 fOpenPageRanges.emplace_back(std::move(pageRange));
874 }
875
876 // Mapping of memory to on-disk column IDs usually happens during serialization of the ntuple header. If the
877 // header was already serialized, this has to be done manually as it is required for page list serialization.
878 if (fSerializationContext.GetHeaderSize() > 0)
879 fSerializationContext.MapSchema(descriptor, /*forHeaderExtension=*/true);
880}
881
883 const RExtraTypeInfoDescriptor &extraTypeInfo)
884{
885 if (extraTypeInfo.GetContentId() != EExtraTypeInfoIds::kStreamerInfo)
886 throw RException(R__FAIL("ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
887
888 fStreamerInfos.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
889}
890
892{
893 fDescriptorBuilder.SetNTuple(fNTupleName, model.GetDescription());
894 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
895
896 auto &fieldZero = Internal::GetFieldZeroOfModel(model);
897 fDescriptorBuilder.AddField(RFieldDescriptorBuilder::FromField(fieldZero).FieldId(0).MakeDescriptor().Unwrap());
898 fieldZero.SetOnDiskId(0);
899 auto &projectedFields = GetProjectedFieldsOfModel(model);
900 projectedFields.GetFieldZero().SetOnDiskId(0);
901
902 RNTupleModelChangeset initialChangeset{model};
903 for (auto f : fieldZero.GetSubFields())
904 initialChangeset.fAddedFields.emplace_back(f);
905 for (auto f : projectedFields.GetFieldZero().GetSubFields())
906 initialChangeset.fAddedProjectedFields.emplace_back(f);
907 UpdateSchema(initialChangeset, 0U);
908
909 fSerializationContext = RNTupleSerializer::SerializeHeader(nullptr, descriptor);
910 auto buffer = MakeUninitArray<unsigned char>(fSerializationContext.GetHeaderSize());
911 fSerializationContext = RNTupleSerializer::SerializeHeader(buffer.get(), descriptor);
912 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
913
914 fDescriptorBuilder.BeginHeaderExtension();
915}
916
// NOTE(review): scraped listing — the signature line (orig. 917) was dropped;
// per the member index this is
// RPagePersistentSink::InitFromDescriptor(const RNTupleDescriptor &descriptor),
// which initializes the sink based on an existing descriptor.
918{
919 {
// The re-created model is only needed to serialize the schema/header in Init().
920 auto model = descriptor.CreateModel();
921 Init(*model.get());
922 }
923
// Replay every existing cluster into this sink's descriptor builder so newly
// written clusters continue the entry and per-column element numbering.
924 auto clusterId = descriptor.FindClusterId(0, 0);
925
926 while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
927 auto &cluster = descriptor.GetClusterDescriptor(clusterId);
928 auto nEntries = cluster.GetNEntries();
929
930 RClusterDescriptorBuilder clusterBuilder;
931 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
932 .FirstEntryIndex(fPrevClusterNEntries)
933 .NEntries(nEntries);
934
// The asserts encode the assumption of a dense 1:1 mapping between open column
// ranges and physical column IDs.
935 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
936 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
937 const auto &columnRange = cluster.GetColumnRange(i);
938 R__ASSERT(columnRange.fPhysicalColumnId == i);
939 const auto &pageRange = cluster.GetPageRange(i);
940 R__ASSERT(pageRange.fPhysicalColumnId == i);
941 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
942 pageRange);
// Advance the per-column first-element index past the replayed cluster.
943 fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
944 }
945 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
946 fPrevClusterNEntries += nEntries;
947
948 clusterId = descriptor.FindNextClusterId(clusterId);
949 }
950}
951
// NOTE(review): signature line (orig. 952) dropped by the scrape; per the member
// index this is RPagePersistentSink::CommitSuppressedColumn(ColumnHandle_t).
// Marks the column's currently open range as suppressed for this cluster.
953{
954 fOpenColumnRanges.at(columnHandle.fPhysicalId).fIsSuppressed = true;
955}
956
// NOTE(review): the scrape dropped the signature line (orig. 957, per the member
// index CommitPage(ColumnHandle_t columnHandle, const RPage &page)) and the
// declaration of pageInfo (orig. 961) — both carried hyperlinks.
// Writes one in-memory page via CommitPageImpl() and records its locator in the
// column's open page range.
958{
959 fOpenColumnRanges.at(columnHandle.fPhysicalId).fNElements += page.GetNElements();
960
962 pageInfo.fNElements = page.GetNElements();
963 pageInfo.fLocator = CommitPageImpl(columnHandle, page);
// For in-memory pages the checksum flag comes from the sink-wide write options.
964 pageInfo.fHasChecksum = GetWriteOptions().GetEnablePageChecksums();
965 fOpenPageRanges.at(columnHandle.fPhysicalId).fPageInfos.emplace_back(pageInfo);
966}
967
// NOTE(review): the scrape dropped the first signature line (orig. 968, per the
// member index CommitSealedPage(DescriptorId_t physicalColumnId, ...)) and the
// pageInfo declaration (orig. 973).
// Writes an already sealed (packed & compressed) page and records its locator.
969 const RPageStorage::RSealedPage &sealedPage)
970{
971 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.GetNElements();
972
974 pageInfo.fNElements = sealedPage.GetNElements();
975 pageInfo.fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
// Unlike CommitPage() above, a sealed page carries its own checksum flag.
976 pageInfo.fHasChecksum = sealedPage.GetHasChecksum();
977 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
978}
979
// Default implementation of the vector commit: writes the sealed pages one by
// one through CommitSealedPageImpl(), skipping entries whose mask bit is false
// (pages deduplicated by the caller).
// NOTE(review): the line carrying the function name (orig. 981) was dropped by
// the scrape; the member index identifies it as
// RPagePersistentSink::CommitSealedPageVImpl(std::span<RSealedPageGroup>, const std::vector<bool> &).
980std::vector<ROOT::Experimental::RNTupleLocator>
982 std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask)
983{
984 std::vector<ROOT::Experimental::RNTupleLocator> locators;
985 locators.reserve(mask.size());
986 std::size_t i = 0;
987 for (auto &range : ranges) {
988 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
// Only masked-in pages are physically written; skipped pages get no locator
// entry here — the caller maps them to an earlier locator via its own index table.
989 if (mask[i++])
990 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
991 }
992 }
993 locators.shrink_to_fit();
994 return locators;
995}
996
// NOTE(review): the scrape dropped the first signature line (orig. 996-997, per
// the member index CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup>))
// and the pageInfo declaration (orig. 1056).
// Vector commit of sealed pages: when the backend supports page merging, pages
// with identical checksum and identical payload are written only once and the
// duplicates share the first occurrence's locator.
998 std::span<RPageStorage::RSealedPageGroup> ranges)
999{
1000 /// Used in the `originalPages` map
1001 struct RSealedPageLink {
1002 const RSealedPage *fSealedPage = nullptr; ///< Points to the first occurrence of a page with a specific checksum
1003 std::size_t fLocatorIdx = 0; ///< The index in the locator vector returned by CommitSealedPageVImpl()
1004 };
1005
1006 std::vector<bool> mask;
1007 // For every sealed page, stores the corresponding index in the locator vector returned by CommitSealedPageVImpl()
1008 std::vector<std::size_t> locatorIndexes;
1009 // Maps page checksums to the first sealed page with that checksum
1010 std::unordered_map<std::uint64_t, RSealedPageLink> originalPages;
1011 std::size_t iLocator = 0;
1012 for (auto &range : ranges) {
1013 const auto rangeSize = std::distance(range.fFirst, range.fLast);
1014 mask.reserve(mask.size() + rangeSize);
1015 locatorIndexes.reserve(locatorIndexes.size() + rangeSize);
1016
1017 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
// Without backend merge support or without a checksum, the page is always written.
1018 if (!fFeatures.fCanMergePages || !sealedPageIt->GetHasChecksum()) {
1019 mask.emplace_back(true);
1020 locatorIndexes.emplace_back(iLocator++);
1021 continue;
1022 }
1023
// First page seen with this checksum: remember it as the canonical copy.
1024 const auto chk = sealedPageIt->GetChecksum().Unwrap();
1025 auto itr = originalPages.find(chk);
1026 if (itr == originalPages.end()) {
1027 originalPages.insert({chk, {&(*sealedPageIt), iLocator}});
1028 mask.emplace_back(true);
1029 locatorIndexes.emplace_back(iLocator++);
1030 continue;
1031 }
1032
// Guard against checksum collisions: only deduplicate on byte-identical payload.
1033 const auto *p = itr->second.fSealedPage;
1034 if (sealedPageIt->GetDataSize() != p->GetDataSize() ||
1035 memcmp(sealedPageIt->GetBuffer(), p->GetBuffer(), p->GetDataSize())) {
1036 mask.emplace_back(true);
1037 locatorIndexes.emplace_back(iLocator++);
1038 continue;
1039 }
1040
// Genuine duplicate: do not write; reuse the original page's locator index.
1041 mask.emplace_back(false);
1042 locatorIndexes.emplace_back(itr->second.fLocatorIdx);
1043 }
1044
1045 mask.shrink_to_fit();
1046 locatorIndexes.shrink_to_fit();
1047 }
1048
1049 auto locators = CommitSealedPageVImpl(ranges, mask);
1050 unsigned i = 0;
1051
// Second pass: record page infos for all pages (written or deduplicated) in the
// open page ranges, resolving each page's locator through locatorIndexes.
1052 for (auto &range : ranges) {
1053 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
1054 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->GetNElements();
1055
1057 pageInfo.fNElements = sealedPageIt->GetNElements();
1058 pageInfo.fLocator = locators[locatorIndexes[i++]];
1059 pageInfo.fHasChecksum = sealedPageIt->GetHasChecksum();
1060 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
1061 }
1062 }
1063}
1064
// NOTE(review): the scrape dropped the signature lines (orig. 1065-1066, per the
// member index RStagedCluster RPagePersistentSink::StageCluster(NTupleSize_t nNewEntries)).
// Closes the current cluster into an RStagedCluster: moves the accumulated page
// ranges out and resets the per-column state for the next cluster.
1067{
1068 RStagedCluster stagedCluster;
1069 stagedCluster.fNBytesWritten = StageClusterImpl();
1070 stagedCluster.fNEntries = nNewEntries;
1071
1072 for (unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1073 RStagedCluster::RColumnInfo columnInfo;
1074 if (fOpenColumnRanges[i].fIsSuppressed) {
// A suppressed column must not have accumulated any pages in this cluster.
1075 assert(fOpenPageRanges[i].fPageInfos.empty());
1076 columnInfo.fPageRange.fPhysicalColumnId = i;
1077 columnInfo.fIsSuppressed = true;
1078 // We reset suppressed columns to the state they would have if they were active (not suppressed).
1079 fOpenColumnRanges[i].fNElements = 0;
1080 fOpenColumnRanges[i].fIsSuppressed = false;
1081 } else {
// Move the page infos into the staged cluster and leave behind a fresh, empty
// page range for the same physical column.
1082 std::swap(columnInfo.fPageRange, fOpenPageRanges[i]);
1083 fOpenPageRanges[i].fPhysicalColumnId = i;
1084
1085 columnInfo.fNElements = fOpenColumnRanges[i].fNElements;
1086 fOpenColumnRanges[i].fNElements = 0;
1087 }
1088 stagedCluster.fColumnInfos.push_back(std::move(columnInfo));
1089 }
1090
1091 return stagedCluster;
1092}
1093
// NOTE(review): the scrape dropped the signature line (orig. 1094, per the member
// index CommitStagedClusters(std::span<RStagedCluster>)).
// Logically appends staged clusters to the ntuple descriptor: commits each
// column range and fixes up the first-element index of suppressed columns.
1095{
1096 for (const auto &cluster : clusters) {
1097 RClusterDescriptorBuilder clusterBuilder;
1098 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1099 .FirstEntryIndex(fPrevClusterNEntries)
1100 .NEntries(cluster.fNEntries);
1101 for (const auto &columnInfo : cluster.fColumnInfos) {
1102 DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
1103 if (columnInfo.fIsSuppressed) {
1104 assert(columnInfo.fPageRange.fPageInfos.empty());
1105 clusterBuilder.MarkSuppressedColumnRange(colId);
1106 } else {
1107 clusterBuilder.CommitColumnRange(colId, fOpenColumnRanges[colId].fFirstElementIndex,
1108 fOpenColumnRanges[colId].fCompressionSettings, columnInfo.fPageRange);
1109 fOpenColumnRanges[colId].fFirstElementIndex += columnInfo.fNElements;
1110 }
1111 }
1112
// Fill in element windows for the suppressed ranges from the committed
// descriptor state; ThrowOnError surfaces inconsistencies immediately.
1113 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1114 for (const auto &columnInfo : cluster.fColumnInfos) {
1115 if (!columnInfo.fIsSuppressed)
1116 continue;
1117 DescriptorId_t colId = columnInfo.fPageRange.fPhysicalColumnId;
1118 // For suppressed columns, we need to reset the first element index to the first element of the next (upcoming)
1119 // cluster. This information has been determined for the committed cluster descriptor through
1120 // CommitSuppressedColumnRanges(), so we can use the information from the descriptor.
1121 const auto &columnRangeFromDesc = clusterBuilder.GetColumnRange(colId);
1122 fOpenColumnRanges[colId].fFirstElementIndex =
1123 columnRangeFromDesc.fFirstElementIndex + columnRangeFromDesc.fNElements;
1124 }
1125
1126 fDescriptorBuilder.AddCluster(clusterBuilder.MoveDescriptor().Unwrap());
1127 fPrevClusterNEntries += cluster.fNEntries;
1128 }
1129}
1130
// NOTE(review): the scrape dropped the signature line (orig. 1131, per the member
// index RPagePersistentSink::CommitClusterGroup()) and the cgBuilder declaration
// (orig. 1147, presumably RClusterGroupDescriptorBuilder cgBuilder; — confirm
// against the original source).
// Serializes the page list for all clusters committed since the last call and
// records them as a new cluster group in the descriptor.
1132{
1133 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1134
1135 const auto nClusters = descriptor.GetNActiveClusters();
1136 std::vector<DescriptorId_t> physClusterIDs;
1137 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1138 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1139 }
1140
// Two-pass serialization: the nullptr call only computes the page list size.
1141 auto szPageList = RNTupleSerializer::SerializePageList(nullptr, descriptor, physClusterIDs, fSerializationContext);
1142 auto bufPageList = MakeUninitArray<unsigned char>(szPageList);
1143 RNTupleSerializer::SerializePageList(bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
1144
1145 const auto clusterGroupId = descriptor.GetNClusterGroups();
1146 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
1148 cgBuilder.ClusterGroupId(clusterGroupId).PageListLocator(locator).PageListLength(szPageList);
// If no new cluster was committed since the last call, an empty group is recorded.
1149 if (fNextClusterInGroup == nClusters) {
1150 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1151 } else {
1152 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
1153 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
1154 cgBuilder.MinEntry(firstClusterDesc.GetFirstEntryIndex())
1155 .EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
1156 firstClusterDesc.GetFirstEntryIndex())
1157 .NClusters(nClusters - fNextClusterInGroup);
1158 }
1159 std::vector<DescriptorId_t> clusterIds;
1160 for (auto i = fNextClusterInGroup; i < nClusters; ++i) {
1161 clusterIds.emplace_back(i);
1162 }
1163 cgBuilder.AddClusters(clusterIds);
1164 fDescriptorBuilder.AddClusterGroup(cgBuilder.MoveDescriptor().Unwrap());
1165 fSerializationContext.MapClusterGroupId(clusterGroupId);
1166
// The next group starts after the clusters just committed.
1167 fNextClusterInGroup = nClusters;
1168}
1169
// NOTE(review): the scrape dropped the signature line (orig. 1170) and the
// statement configuring extraInfoBuilder (orig. 1174-1175, presumably
// extraInfoBuilder.ContentId(kStreamerInfo).Content(SerializeStreamerInfos(fStreamerInfos))
// — confirm against the original source).
// Finalizes the dataset: attaches the collected streamer infos as an extra type
// info record, serializes the footer, and writes it out.
1171{
1172 if (!fStreamerInfos.empty()) {
1173 RExtraTypeInfoDescriptorBuilder extraInfoBuilder;
1176 fDescriptorBuilder.AddExtraTypeInfo(extraInfoBuilder.MoveDescriptor().Unwrap());
1177 }
1178
1179 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
1180
// Two-pass serialization: the nullptr call only computes the footer size.
1181 auto szFooter = RNTupleSerializer::SerializeFooter(nullptr, descriptor, fSerializationContext);
1182 auto bufFooter = MakeUninitArray<unsigned char>(szFooter);
1183 RNTupleSerializer::SerializeFooter(bufFooter.get(), descriptor, fSerializationContext);
1184
1185 CommitDatasetImpl(bufFooter.get(), szFooter);
1186}
1187
// NOTE(review): the scrape dropped the signature line (orig. 1188); per the
// member index this is the sink-side EnableDefaultMetrics(const std::string &prefix)
// — confirm against the original source.
// Registers the default set of write-side performance counters (page counts,
// payload volume, zip volume, wall/CPU times) under the given metrics prefix.
1189{
1190 fMetrics = Detail::RNTupleMetrics(prefix);
1191 fCounters = std::make_unique<RCounters>(RCounters{
1192 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("nPageCommitted", "",
1193 "number of pages committed to storage"),
1194 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szWritePayload", "B",
1195 "volume written for committed pages"),
1196 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("szZip", "B", "volume before zipping"),
1197 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallWrite", "ns", "wall clock time spent writing"),
1198 *fMetrics.MakeCounter<Detail::RNTupleAtomicCounter *>("timeWallZip", "ns", "wall clock time spent compressing"),
1199 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuWrite", "ns",
1200 "CPU time spent writing"),
1201 *fMetrics.MakeCounter<Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> *>("timeCpuZip", "ns",
1202 "CPU time spent compressing")});
1203}
fBuffer
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:294
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
char name[80]
Definition TGX11.cxx:110
#define _(A, B)
Definition cfortran.h:108
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
Record wall time and CPU time between construction and destruction.
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
RClusterDescriptorBuilder & ClusterId(DescriptorId_t clusterId)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
const RClusterDescriptor::RColumnRange & GetColumnRange(DescriptorId_t physicalId)
RResult< void > MarkSuppressedColumnRange(DescriptorId_t physicalId)
Books the given column ID as being suppressed in this cluster.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< void > CommitSuppressedColumnRanges(const RNTupleDescriptor &desc)
Sets the first element index and number of elements for all the suppressed column ranges.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
void AddClusters(const std::vector< DescriptorId_t > &clusterIds)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:196
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A helper class for piece-wise construction of an RColumnDescriptor.
RColumnDescriptorBuilder & PhysicalColumnId(DescriptorId_t physicalColumnId)
RColumnDescriptorBuilder & Type(EColumnType type)
RColumnDescriptorBuilder & BitsOnStorage(std::uint16_t bitsOnStorage)
RColumnDescriptorBuilder & RepresentationIndex(std::uint16_t representationIndex)
RColumnDescriptorBuilder & FieldId(DescriptorId_t fieldId)
RColumnDescriptorBuilder & Index(std::uint32_t index)
RColumnDescriptorBuilder & FirstElementIndex(std::uint64_t firstElementIdx)
RResult< RColumnDescriptor > MakeDescriptor() const
Attempt to make a column descriptor.
RColumnDescriptorBuilder & LogicalColumnId(DescriptorId_t logicalColumnId)
RColumnDescriptorBuilder & ValueRange(double min, double max)
A column element encapsulates the translation between basic C++ types and their column representation...
virtual RIdentifier GetIdentifier() const =0
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Pack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual void Unpack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
std::size_t GetPackedSize(std::size_t nElements=1U) const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:40
RColumnElementBase * GetElement() const
Definition RColumn.hxx:331
DescriptorId_t GetOnDiskId() const
Definition RColumn.hxx:345
std::optional< std::pair< double, double > > GetValueRange() const
Definition RColumn.hxx:338
std::uint16_t GetRepresentationIndex() const
Definition RColumn.hxx:344
std::size_t GetWritePageCapacity() const
Definition RColumn.hxx:353
NTupleSize_t GetFirstElementIndex() const
Definition RColumn.hxx:346
std::uint16_t GetBitsOnStorage() const
Definition RColumn.hxx:333
A helper class for piece-wise construction of an RExtraTypeInfoDescriptor.
RExtraTypeInfoDescriptorBuilder & Content(const std::string &content)
RExtraTypeInfoDescriptorBuilder & ContentId(EExtraTypeInfoIds contentId)
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:103
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
Definition RCluster.hxx:120
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
virtual RPage NewPage(std::size_t elementSize, std::size_t nElements)=0
Reserves memory large enough to hold nElements of the given size.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) final
Register a new column.
RStagedCluster StageCluster(NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page source from the location.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
Reference to a page stored in the page pool.
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
void Insert(DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
void Erase(DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) override
Register a new column.
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple time, e.g.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
virtual RPageRef LoadPage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
virtual void UnzipClusterImpl(RCluster *cluster)
void Attach()
Open the physical storage container and deserialize header and footer.
void UpdateLastUsedCluster(DescriptorId_t clusterId)
Does nothing if fLastUsedCluster == clusterId.
Common functionality of an ntuple storage for both reading and writing.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:56
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:47
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Increases the number of elements in the page.
Definition RPage.hxx:154
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:115
std::uint32_t GetNElements() const
Definition RPage.hxx:124
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
const RFieldBase * GetSourceField(const RFieldBase *target) const
bool TryUpdate(RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
Records the partition of data into pages for a particular column in a particular cluster.
Meta-data for a set of ntuple clusters.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field-specific extra type information from the header / extension header.
A field translates read and write calls from/to underlying columns to/from tree values.
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
std::unique_ptr< RNTupleModel > CreateModel(const RCreateModelOptions &options=RCreateModelOptions()) const
Re-create the C++ model from the stored meta-data.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
The RNTupleModel encapsulates the schema of an ntuple.
const std::string & GetDescription() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:281
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:194
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Default I/O performance counters that get registered in fMetrics.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds a 8 byte little-endian xxhash3 checksum to the page payload.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
Default I/O performance counters that get registered in fMetrics
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
The window of element indexes of a particular column in a particular cluster.
NTupleSize_t fFirstElementIndex
The global index of the first column element in the cluster.
int fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
ClusterSize_t fNElements
The number of column elements in the cluster.
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
bool fHasChecksum
If true, the 8 bytes following the serialized page are an xxhash of the on-disk page data.
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
ELocatorType fType
For non-disk locators, the value for the Type field.