Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
28#include <ROOT/RRawFile.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33#include <TFile.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <iostream>
40#include <limits>
41#include <utility>
42
43#include <atomic>
44#include <condition_variable>
45#include <functional>
46#include <mutex>
47#include <thread>
48#include <queue>
49
// Base constructor of the file-backed page sink.
// NOTE(review): this extract is garbled — the line naming the constructor
// (presumably RPageSinkFile::RPageSinkFile(ntupleName, options)) is missing.
// Visible behavior: chain to RPagePersistentSink, install a heap-based page
// allocator, create the compressor, and enable the default sink metrics.
51 const RNTupleWriteOptions &options)
52 : RPagePersistentSink(ntupleName, options), fPageAllocator(std::make_unique<RPageAllocatorHeap>())
53{
// std::call_once guarantees the format-instability warning is printed at most
// once per process, no matter how many sinks are constructed.
54 static std::once_flag once;
55 std::call_once(once, []() {
56 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. "
57 << "Do not store real data with this version of RNTuple!";
58 });
59 fCompressor = std::make_unique<RNTupleCompressor>();
60 EnableDefaultMetrics("RPageSinkFile");
61}
62
// Delegating constructor that (re)creates a standalone file for writing.
// NOTE(review): garbled extract — the first signature line and the argument
// list of RNTupleFileWriter::Recreate(...) (original line 68) are missing;
// presumably they pass the path and compression/container settings. Confirm
// against the upstream source.
64 const RNTupleWriteOptions &options)
65 : RPageSinkFile(ntupleName, options)
66{
67 fWriter = std::unique_ptr<RNTupleFileWriter>(RNTupleFileWriter::Recreate(
69}
70
// Delegating constructor that appends a new RNTuple named `ntupleName` to an
// already open TFile via RNTupleFileWriter::Append.
// NOTE(review): the leading signature line is missing from this extract.
72 const RNTupleWriteOptions &options)
73 : RPageSinkFile(ntupleName, options)
74{
75 fWriter = std::unique_ptr<RNTupleFileWriter>(RNTupleFileWriter::Append(ntupleName, file));
76}
77
// InitImpl (signature line missing from this extract): compress the serialized
// header with the sink's configured compression and hand it to the file
// writer together with its uncompressed length. The zip target buffer and the
// trailing Zip(...) arguments (original line 84) are missing here.
79

81{
82 auto zipBuffer = std::make_unique<unsigned char[]>(length);
83 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
85 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
86}
87
// WriteSealedPage: write one sealed (packed + compressed) page as a blob and
// return its locator. NOTE(review): the first signature line and the
// declaration of `result` (an RNTupleLocator, original line 98) are missing
// from this extract.
90 std::size_t bytesPacked)
91{
92 std::uint64_t offsetData;
93 {
// Scope the timer so only the actual blob write is attributed to write time.
94 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
95 offsetData = fWriter->WriteBlob(sealedPage.fBuffer, sealedPage.fSize, bytesPacked);
96 }
97
99 result.fPosition = offsetData;
100 result.fBytesOnStorage = sealedPage.fSize;
// Bookkeeping: page count, payload bytes, and the running per-cluster total
// that CommitClusterImpl() later reports and resets.
101 fCounters->fNPageCommitted.Inc();
102 fCounters->fSzWritePayload.Add(sealedPage.fSize);
103 fNBytesCurrentCluster += sealedPage.fSize;
104 return result;
105}
106
// CommitPageImpl (signature missing from this extract): seal (pack + zip) the
// in-memory page under the zip timer, account the uncompressed size, then
// delegate the actual write to WriteSealedPage. The declaration of
// `sealedPage` (original line 111) is missing here.
109{
110 auto element = columnHandle.fColumn->GetElement();
112 {
113 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
114 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
115 }
116
117 fCounters->fSzZip.Add(page.GetNBytes());
118 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
119}
120
// CommitSealedPageImpl (signature missing from this extract): compute the
// packed size of an externally sealed page from the column's bits-on-storage
// and element count, then write it. The declaration of `bitsOnStorage`
// (original line 125, a call to RColumnElementBase::GetBitsOnStorage) is
// missing here.
124{
126 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetModel().GetType());
// Round up to whole bytes: (bits * n + 7) / 8.
127 const auto bytesPacked = (bitsOnStorage * sealedPage.fNElements + 7) / 8;
128
129 return WriteSealedPage(sealedPage, bytesPacked);
130}
131
// Vector commit: writes a whole span of sealed page groups into one reserved
// blob and returns one locator per page.
// NOTE(review): garbled extract — missing are the declaration of
// `bitsOnStorage` (original line 142), the fallback call to the base-class
// CommitSealedPageVImpl for the too-large case (line 152), and the
// declaration of `locator` (line 164).
132std::vector<ROOT::Experimental::RNTupleLocator>
133ROOT::Experimental::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
134{
// First pass: total compressed size and total packed (uncompressed) size.
135 size_t size = 0, bytesPacked = 0;
136 for (auto &range : ranges) {
137 if (range.fFirst == range.fLast) {
138 // Skip empty ranges, they might not have a physical column ID!
139 continue;
140 }
141
143 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetModel().GetType());
144 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
145 size += sealedPageIt->fSize;
146 bytesPacked += (bitsOnStorage * sealedPageIt->fNElements + 7) / 8;
147 }
148 }
149 if (size >= std::numeric_limits<std::int32_t>::max() || bytesPacked >= std::numeric_limits<std::int32_t>::max()) {
150 // Cannot fit it into one key, fall back to one key per page.
151 // TODO: Remove once there is support for large keys.
153 }
154
155 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
156 // Reserve a blob that is large enough to hold all pages.
157 std::uint64_t offset = fWriter->ReserveBlob(size, bytesPacked);
158
159 // Now write the individual pages and record their locators.
160 std::vector<ROOT::Experimental::RNTupleLocator> locators;
161 for (auto &range : ranges) {
162 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
163 fWriter->WriteIntoReservedBlob(sealedPageIt->fBuffer, sealedPageIt->fSize, offset);
165 locator.fPosition = offset;
166 locator.fBytesOnStorage = sealedPageIt->fSize;
167 locators.push_back(locator);
168 offset += sealedPageIt->fSize;
169 }
170 }
171
172 fCounters->fNPageCommitted.Add(locators.size());
173 fCounters->fSzWritePayload.Add(size);
174 fNBytesCurrentCluster += size;
175
176 return locators;
177}
178
// CommitClusterImpl (signature missing from this extract): report the number
// of payload bytes written since the last cluster commit and reset the
// running counter for the next cluster.
180{
181 auto result = fNBytesCurrentCluster;
182 fNBytesCurrentCluster = 0;
183 return result;
184}
185
// CommitClusterGroupImpl: compress the serialized page list and write it as a
// blob; the returned locator records both on-storage and uncompressed sizes.
// NOTE(review): garbled extract — the first signature line, the trailing
// Zip(...) arguments (original line 192), and the declaration of `result`
// (line 194, an RNTupleLocator) are missing.
188 std::uint32_t length)
189{
190 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
191 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
193
195 result.fBytesOnStorage = szPageListZip;
196 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
197 return result;
198}
199
// CommitDatasetImpl: compress and write the serialized footer, then commit
// the file writer (finalizes the output file).
// NOTE(review): the first signature line and the trailing Zip(...) arguments
// (original line 205) are missing from this extract.
201 std::uint32_t length)
202{
203 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
204 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
206 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
207 fWriter->Commit();
208}
209
// ReservePage (signature missing from this extract): hand out a new, empty
// page from the heap allocator, sized for `nElements` of the column's element
// type. Requesting an empty page is a usage error and throws.
212{
213 if (nElements == 0)
214 throw RException(R__FAIL("invalid call: request empty page"));
215 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
216 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
217}
218
// ReleasePage (signature missing from this extract): return a page previously
// handed out by ReservePage to the heap allocator.
220{
221 fPageAllocator->DeletePage(page);
222}
223
224
225////////////////////////////////////////////////////////////////////////////////
226
// Base constructor of the file-backed page source: shared page pool, a
// cluster pool sized by the read options' cluster bunch size, a decompressor,
// and the default source metrics.
// NOTE(review): the line naming this constructor is missing from this extract.
228 const RNTupleReadOptions &options)
229 : RPageSource(ntupleName, options),
230 fPagePool(std::make_shared<RPagePool>()),
231 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
232{
233 fDecompressor = std::make_unique<RNTupleDecompressor>();
234 EnableDefaultMetrics("RPageSourceFile");
235}
236
// Constructor taking ownership of an already created RRawFile.
// NOTE(review): garbled extract — the first signature line and original lines
// 243-244 are missing; presumably the latter initialize fReader from
// fFile.get() (cf. Clone() below) — confirm against the upstream source.
238 std::unique_ptr<ROOT::Internal::RRawFile> file,
239 const RNTupleReadOptions &options)
240 : RPageSourceFile(ntupleName, options)
241{
242 fFile = std::move(file);
245}
246
// Convenience constructor: create the raw file from a path/URL and delegate.
// NOTE(review): the first signature line is missing from this extract.
248 const RNTupleReadOptions &options)
249 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
250{
251}
252
// InitDescriptor (signature missing from this extract): validate the anchor's
// format version, then read, unzip and deserialize the header and footer into
// the descriptor builder.
254{
255 // TODO(jblomer): can the epoch check be factored out across anchors?
256 if (anchor.GetVersionEpoch() != RNTuple::kVersionEpoch) {
257 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(anchor.GetVersionEpoch())));
258 }
// Epoch 0 marks pre-release data; warn once per process.
259 if (anchor.GetVersionEpoch() == 0) {
260 static std::once_flag once;
261 std::call_once(once, [&anchor]() {
262 R__LOG_WARNING(NTupleLog()) << "Pre-release format version: RC " << anchor.GetVersionMajor();
263 });
264 }
265
// Header: read the compressed bytes at SeekHeader, unzip to LenHeader bytes,
// deserialize into the descriptor builder.
266 fDescriptorBuilder.SetOnDiskHeaderSize(anchor.GetNBytesHeader());
267 auto buffer = std::make_unique<unsigned char[]>(anchor.GetLenHeader());
268 auto zipBuffer = std::make_unique<unsigned char[]>(anchor.GetNBytesHeader());
269 fReader.ReadBuffer(zipBuffer.get(), anchor.GetNBytesHeader(), anchor.GetSeekHeader());
270 fDecompressor->Unzip(zipBuffer.get(), anchor.GetNBytesHeader(), anchor.GetLenHeader(), buffer.get());
271 RNTupleSerializer::DeserializeHeader(buffer.get(), anchor.GetLenHeader(), fDescriptorBuilder);
272
// Footer: same procedure, reusing the (reallocated) scratch buffers.
273 fDescriptorBuilder.AddToOnDiskFooterSize(anchor.GetNBytesFooter());
274 buffer = std::make_unique<unsigned char[]>(anchor.GetLenFooter());
275 zipBuffer = std::make_unique<unsigned char[]>(anchor.GetNBytesFooter());
276 fReader.ReadBuffer(zipBuffer.get(), anchor.GetNBytesFooter(), anchor.GetSeekFooter());
277 fDecompressor->Unzip(zipBuffer.get(), anchor.GetNBytesFooter(), anchor.GetLenFooter(), buffer.get());
278 RNTupleSerializer::DeserializeFooter(buffer.get(), anchor.GetLenFooter(), fDescriptorBuilder);
279}
280
// CreateFromAnchor: build a page source from an RNTuple anchor object that was
// streamed from a TFile. NOTE(review): garbled extract — the second signature
// line (original 282) and the three branch bodies assigning `rawFile`
// (original 295, 297, 299) are missing; the visible branches select the raw
// file flavor by the anchor file's concrete type and URL protocol.
281std::unique_ptr<ROOT::Experimental::Internal::RPageSourceFile>
283 const RNTupleReadOptions &options)
284{
285 if (!anchor.fFile)
286 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
287
288 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
289 // For local files and supported transport protocols, we want to open a new RRawFile to take advantage of the faster
290 // reading. To detect local files, we do not check for "file" because this could also happen for endpoint URLs of
291 // TMemFiles, but for the RTTI to be exactly a TFile.
292 auto url = anchor.fFile->GetEndpointUrl();
293 auto protocol = std::string(url->GetProtocol());
294 if (typeid(*anchor.fFile) == typeid(TFile)) {
296 } else if (protocol == "http" || protocol == "https" || protocol == "root" || protocol == "roots") {
298 } else {
300 }
301
// The ntuple name is left empty and filled in from the deserialized
// descriptor once the header has been processed.
302 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
303 pageSource->InitDescriptor(anchor);
304 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
305 return pageSource;
306}
307
// AttachImpl (signature at original line 310 missing from this extract):
// locate the anchor if needed, then move the descriptor out of the builder
// and attach the page lists of all cluster groups to it.
309
311{
312 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
313 // Otherwise, the page source was created by OpenFromAnchor() and the header and footer are already processed.
314 if (fDescriptorBuilder.GetDescriptor().GetOnDiskHeaderSize() == 0) {
315 auto anchor = fReader.GetNTuple(fNTupleName).Unwrap();
316 InitDescriptor(anchor);
317 }
318
319 auto desc = fDescriptorBuilder.MoveDescriptor();
320
// For every cluster group: read its compressed page list, unzip it, and
// deserialize it into the descriptor.
321 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
322 auto buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
323 auto zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
324 fReader.ReadBuffer(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage,
325 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
326 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
327 buffer.get());
328
329 RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
330 }
331
332 return desc;
333}
334
// LoadSealedPage (signature missing from this extract): look up the page's
// locator under the shared descriptor guard, then copy the raw on-storage
// bytes into the caller-provided buffer. NOTE(review): the declaration of
// `pageInfo` and the assignment of sealedPage.fSize (original line 348) are
// missing from this extract.
337{
338 const auto clusterId = clusterIndex.GetClusterId();
340
341 {
342 auto descriptorGuard = GetSharedDescriptorGuard();
343 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
344 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
345 }
346
347 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
349 sealedPage.fNElements = pageInfo.fNElements;
// A null buffer means the caller only wanted the size/element information.
350 if (!sealedPage.fBuffer)
351 return;
// Page-zero pages have no on-disk representation; copy from the shared
// zero buffer instead of reading from the file.
352 if (pageInfo.fLocator.fType != RNTupleLocator::kTypePageZero) {
353 fReader.ReadBuffer(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage,
354 pageInfo.fLocator.GetPosition<std::uint64_t>());
355 } else {
356 memcpy(const_cast<void *>(sealedPage.fBuffer), RPage::GetPageZeroBuffer(), bytesOnStorage);
357 }
358}
359
// PopulatePageFromCluster (signature missing from this extract): produce an
// uncompressed in-memory page for one on-disk page, either by a direct read
// (cluster cache off) or from the cluster pool, then unseal and register it
// in the page pool. NOTE(review): several declaration lines are missing,
// among them the page-zero construction (original 377), SetWindow's second
// argument (380, 415), and the declaration of `newPage` (407).
364{
365 const auto columnId = columnHandle.fPhysicalId;
366 const auto clusterId = clusterInfo.fClusterId;
367 const auto pageInfo = clusterInfo.fPageInfo;
368
369 const auto element = columnHandle.fColumn->GetElement();
370 const auto elementSize = element->GetSize();
371 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
372
373 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
374 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
375
// Deferred columns: serve a synthetic all-zero page, registered with a no-op
// deleter since it does not own heap memory.
376 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
378 pageZero.GrowUnchecked(pageInfo.fNElements);
379 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
381 fPagePool->RegisterPage(pageZero, RPageDeleter([](const RPage &, void *) {}, nullptr));
382 return pageZero;
383 }
384
385 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
// Cluster cache disabled: read exactly this page's bytes from the file.
386 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[bytesOnStorage]);
387 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.GetPosition<std::uint64_t>());
388 fCounters->fNPageLoaded.Inc();
389 fCounters->fNRead.Inc();
390 fCounters->fSzReadPayload.Add(bytesOnStorage);
392 } else {
// Cluster cache enabled: make sure the right cluster (with this column) is
// resident, then take the sealed bytes from the cluster's on-disk page.
393 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
394 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
395 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
396
// GetCluster() may have unzipped pages into the page pool already; re-check.
397 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
398 if (!cachedPage.IsNull())
399 return cachedPage;
400
401 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
402 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
403 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
404 sealedPageBuffer = onDiskPage->GetAddress();
405 }
406
408 {
409 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
410 newPage = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element, columnId);
411 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
412 }
413
414 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
416 fPagePool->RegisterPage(
417 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
418 fCounters->fNPagePopulated.Inc();
419 return newPage;
420}
421
// PopulatePage by global index (signature missing from this extract): serve
// from the page pool if possible; otherwise resolve the cluster and
// in-cluster index under the descriptor guard and populate from the cluster.
// NOTE(review): the declaration of `clusterInfo` (original line 431) is
// missing here.
424{
425 const auto columnId = columnHandle.fPhysicalId;
426 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
427 if (!cachedPage.IsNull())
428 return cachedPage;
429
430 std::uint64_t idxInCluster;
432 {
// Keep the descriptor lock only for the metadata lookups, not for the I/O.
433 auto descriptorGuard = GetSharedDescriptorGuard();
434 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
435
436 if (clusterInfo.fClusterId == kInvalidDescriptorId)
437 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
438
439 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
440 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
441 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
442 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
443 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
444 }
445
446 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
447}
448
// PopulatePage by (cluster, index) pair (signature missing from this
// extract): pool lookup first, then metadata resolution under the descriptor
// guard. NOTE(review): the condition guarding the out-of-bounds throw
// (original line 459) and the declaration of `clusterInfo` (line 462) are
// missing here.
451{
452 const auto clusterId = clusterIndex.GetClusterId();
453 const auto idxInCluster = clusterIndex.GetIndex();
454 const auto columnId = columnHandle.fPhysicalId;
455 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
456 if (!cachedPage.IsNull())
457 return cachedPage;
458
460 throw RException(R__FAIL("entry out of bounds"));
461
463 {
464 auto descriptorGuard = GetSharedDescriptorGuard();
465 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
466 clusterInfo.fClusterId = clusterId;
467 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
468 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
469 }
470
471 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
472}
473
478
479std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceFile::Clone() const
480{
481 auto clone = new RPageSourceFile(fNTupleName, fOptions);
482 clone->fFile = fFile->Clone();
483 clone->fReader = RMiniFileReader(clone->fFile.get());
484 return std::unique_ptr<RPageSourceFile>(clone);
485}
486
// PrepareSingleCluster: build the page map and the (coalesced) read requests
// for one cluster, without performing any I/O yet; LoadClusters() issues the
// reads afterwards. NOTE(review): garbled extract — missing are the second
// signature line (original 488), the first two struct fields (492-493,
// presumably column id and page number), the PrepareLoadCluster callback
// signature (503-504), the gapCut update (533), the declaration of `req`
// (547, an RIOVec), and the szOverhead accumulation (557).
487std::unique_ptr<ROOT::Experimental::Internal::RCluster>
489 const RCluster::RKey &clusterKey, std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
490{
// Per-page bookkeeping: where the page lives on disk and where its bytes will
// land inside the cluster buffer (fBufPos, fixed up below).
491 struct ROnDiskPageLocator {
494 std::uint64_t fOffset = 0;
495 std::uint64_t fSize = 0;
496 std::size_t fBufPos = 0;
497 };
498
499 std::vector<ROnDiskPageLocator> onDiskPages;
500 auto activeSize = 0;
501 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
// Collect all on-disk pages of the requested columns; page-zero pages go to
// the separate pageZeroMap.
502 PrepareLoadCluster(clusterKey, *pageZeroMap,
505 const auto &pageLocator = pageInfo.fLocator;
506 activeSize += pageLocator.fBytesOnStorage;
507 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(),
508 pageLocator.fBytesOnStorage, 0});
509 });
510
511 // Linearize the page requests by file offset
512 std::sort(onDiskPages.begin(), onDiskPages.end(),
513 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) {return a.fOffset < b.fOffset;});
514
515 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
516 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
517 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
518 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
519 // of extra bytes.
520 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
521 // memory consumption, device block size.
522 float maxOverhead = 0.25 * float(activeSize);
523 std::vector<std::size_t> gaps;
524 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
525 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].fSize + onDiskPages[i-1].fOffset));
526 }
527 std::sort(gaps.begin(), gaps.end());
528 std::size_t gapCut = 0;
529 std::size_t currentGap = 0;
530 float szExtra = 0.0;
// Walk the gaps smallest-first; stop once tolerating the next gap size would
// exceed the overhead budget. (The line raising gapCut, original 533, is
// missing from this extract.)
531 for (auto g : gaps) {
532 if (g != currentGap) {
534 currentGap = g;
535 }
536 szExtra += g;
537 if (szExtra > maxOverhead)
538 break;
539 }
540
541 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
542 // In a second step, we'll fix-up the memory destinations for the read calls given the
543 // address of the allocated buffer. We must not touch, however, the read requests from previous
544 // calls to PrepareSingleCluster()
545 const auto currentReadRequestIdx = readRequests.size();
546
548 std::size_t szPayload = 0;
549 std::size_t szOverhead = 0;
// req.fBuffer holds a relative position inside the future cluster buffer
// during this loop; it is rebased to the real allocation further down.
550 for (auto &s : onDiskPages) {
551 R__ASSERT(s.fSize > 0);
552 auto readUpTo = req.fOffset + req.fSize;
553 R__ASSERT(s.fOffset >= readUpTo);
554 auto overhead = s.fOffset - readUpTo;
555 szPayload += s.fSize;
// Gap small enough: extend the current request over the gap and this page.
556 if (overhead <= gapCut) {
558 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
559 req.fSize += overhead + s.fSize;
560 continue;
561 }
562
563 // close the current request and open new one
564 if (req.fSize > 0)
565 readRequests.emplace_back(req);
566
567 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
568 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
569
570 req.fOffset = s.fOffset;
571 req.fSize = s.fSize;
572 }
573 readRequests.emplace_back(req);
574 fCounters->fSzReadPayload.Add(szPayload);
575 fCounters->fSzReadOverhead.Add(szOverhead);
576
577 // Register the on disk pages in a page map
// The total buffer size equals the end position of the last request
// (relative fBuffer + fSize); ownership goes to the heap page map.
578 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
579 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
580 for (const auto &s : onDiskPages) {
581 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
582 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
583 }
584 fCounters->fNPageLoaded.Add(onDiskPages.size());
// Rebase only the requests created by this call onto the real buffer address.
585 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
586 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
587 }
588
589 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
590 cluster->Adopt(std::move(pageMap));
591 cluster->Adopt(std::move(pageZeroMap));
592 for (auto colId : clusterKey.fPhysicalColumnSet)
593 cluster->SetColumnAvailable(colId);
594 return cluster;
595}
596
// LoadClusters (second signature line, original 598, missing from this
// extract): prepare the page maps and read requests for all requested
// clusters, then execute the reads in batches bounded by the raw file's
// vector-read limits (max request count, max single size, max total size).
597std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
599{
600 fCounters->fNClusterLoaded.Add(clusterKeys.size());
601
602 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
603 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
604
605 for (auto key: clusterKeys) {
606 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
607 }
608
609 auto nReqs = readRequests.size();
610 auto readvLimits = fFile->GetReadVLimits();
611
612 int iReq = 0;
613 while (nReqs > 0) {
614 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
615
// Shrink the batch so neither a single request nor the batch total exceeds
// the backend's size limits; nBatch may drop to 0 here (forced to 1 below).
616 if (readvLimits.HasSizeLimit()) {
617 std::uint64_t totalSize = 0;
618 for (std::size_t i = 0; i < nBatch; ++i) {
619 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
620 nBatch = i;
621 break;
622 }
623
624 totalSize += readRequests[iReq + i].fSize;
625 if (totalSize > readvLimits.fMaxTotalSize) {
626 nBatch = i;
627 break;
628 }
629 }
630 }
631
// Degenerate batch: issue a single plain ReadAt; otherwise use vector read.
632 if (nBatch <= 1) {
633 nBatch = 1;
634 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
635 fFile->ReadAt(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
636 } else {
637 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
638 fFile->ReadV(&readRequests[iReq], nBatch);
639 }
640 fCounters->fNReadV.Inc();
641 fCounters->fNRead.Add(nBatch);
642
643 iReq += nBatch;
644 nReqs -= nBatch;
645 }
646
647 return clusters;
648}
649
// UnzipClusterImpl (signature missing from this extract): schedule one task
// per on-disk page that unseals the page and preloads it into the page pool,
// then wait for all tasks. NOTE(review): the declaration of `key` (original
// line 670) and the SetWindow call inside the task (line 680) are missing
// from this extract.
651{
652 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
653
654 const auto clusterId = cluster->GetId();
655 auto descriptorGuard = GetSharedDescriptorGuard();
656 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
657
// Keep the column elements alive for the lifetime of the scheduled tasks;
// each task captures a raw pointer into this vector.
658 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
659
660 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
661 for (const auto columnId : columnsInCluster) {
662 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
663
664 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
665
666 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
667 std::uint64_t pageNo = 0;
668 std::uint64_t firstInPage = 0;
669 for (const auto &pi : pageRange.fPageInfos) {
671 auto onDiskPage = cluster->GetOnDiskPage(key);
672 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
673
// Capture everything the task needs by value (pointers/indices) so it can
// run after this loop iteration has moved on.
674 auto taskFunc = [this, columnId, clusterId, firstInPage, onDiskPage, element = allElements.back().get(),
675 nElements = pi.fNElements,
676 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
677 auto newPage = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element, columnId);
678 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
679
681 fPagePool->PreloadPage(
682 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
683 };
684
685 fTaskScheduler->AddTask(taskFunc);
686
687 firstInPage += pi.fNElements;
688 pageNo++;
689 } // for all pages in column
690 } // for all columns in cluster
691
692 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
693
// Block until every scheduled unseal/preload task has finished.
694 fTaskScheduler->Wait();
695}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
TObject * clone(const char *newname) const override
Definition RooChi2Var.h:9
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Definition TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Managed a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:54
static Writer_t MakeMemCopyWriter(unsigned char *dest)
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, EContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
A thread-safe cache of column pages.
Definition RPagePool.hxx:47
Storage provider that write ntuple pages into a file.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
std::uint64_t CommitClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
std::unique_ptr< RNTupleFileWriter > fWriter
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
void InitDescriptor(const RNTuple &anchor)
Deserialized header and footer into a minimal descriptor held by fDescriptorBuilder.
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:48
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:134
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:18
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:67
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:73
const_iterator begin() const
const_iterator end() const
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition TObject.cxx:223
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Summarizes cluster-level information that are necessary to populate a certain page.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
Generic information about the physical location of data.
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:71
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:77
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:73
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:75