Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
28#include <ROOT/RRawFile.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33#include <TFile.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <iostream>
40#include <limits>
41#include <utility>
42
43#include <atomic>
44#include <condition_variable>
45#include <functional>
46#include <mutex>
47#include <thread>
48#include <queue>
49
51 const RNTupleWriteOptions &options)
52 : RPagePersistentSink(ntupleName, options), fPageAllocator(std::make_unique<RPageAllocatorHeap>())
53{
54 static std::once_flag once;
55 std::call_once(once, []() {
56 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. "
57 << "Do not store real data with this version of RNTuple!";
58 });
59 fCompressor = std::make_unique<RNTupleCompressor>();
60 EnableDefaultMetrics("RPageSinkFile");
61}
62
63ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
64 const RNTupleWriteOptions &options)
65 : RPageSinkFile(ntupleName, options)
66{
67 fWriter = std::unique_ptr<RNTupleFileWriter>(RNTupleFileWriter::Recreate(
69}
70
72 const RNTupleWriteOptions &options)
73 : RPageSinkFile(ntupleName, options)
74{
75 fWriter = std::unique_ptr<RNTupleFileWriter>(RNTupleFileWriter::Append(ntupleName, file));
76}
77
79
80void ROOT::Experimental::Internal::RPageSinkFile::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
81{
82 auto zipBuffer = std::make_unique<unsigned char[]>(length);
83 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
85 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
86}
87
90 std::size_t bytesPacked)
91{
92 std::uint64_t offsetData;
93 {
94 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
95 offsetData = fWriter->WriteBlob(sealedPage.fBuffer, sealedPage.fSize, bytesPacked);
96 }
97
99 result.fPosition = offsetData;
100 result.fBytesOnStorage = sealedPage.fSize;
101 fCounters->fNPageCommitted.Inc();
102 fCounters->fSzWritePayload.Add(sealedPage.fSize);
103 fNBytesCurrentCluster += sealedPage.fSize;
104 return result;
105}
106
109{
110 auto element = columnHandle.fColumn->GetElement();
111 RPageStorage::RSealedPage sealedPage;
112 {
113 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
114 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
115 }
116
117 fCounters->fSzZip.Add(page.GetNBytes());
118 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
119}
120
123 const RPageStorage::RSealedPage &sealedPage)
124{
125 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
126 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetModel().GetType());
127 const auto bytesPacked = (bitsOnStorage * sealedPage.fNElements + 7) / 8;
128
129 return WriteSealedPage(sealedPage, bytesPacked);
130}
131
132std::vector<ROOT::Experimental::RNTupleLocator>
133ROOT::Experimental::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges)
134{
135 size_t size = 0, bytesPacked = 0;
136 for (auto &range : ranges) {
137 if (range.fFirst == range.fLast) {
138 // Skip empty ranges, they might not have a physical column ID!
139 continue;
140 }
141
142 const auto bitsOnStorage = RColumnElementBase::GetBitsOnStorage(
143 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetModel().GetType());
144 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
145 size += sealedPageIt->fSize;
146 bytesPacked += (bitsOnStorage * sealedPageIt->fNElements + 7) / 8;
147 }
148 }
149 if (size >= std::numeric_limits<std::int32_t>::max() || bytesPacked >= std::numeric_limits<std::int32_t>::max()) {
150 // Cannot fit it into one key, fall back to one key per page.
151 // TODO: Remove once there is support for large keys.
153 }
154
155 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
156 // Reserve a blob that is large enough to hold all pages.
157 std::uint64_t offset = fWriter->ReserveBlob(size, bytesPacked);
158
159 // Now write the individual pages and record their locators.
160 std::vector<ROOT::Experimental::RNTupleLocator> locators;
161 for (auto &range : ranges) {
162 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
163 fWriter->WriteIntoReservedBlob(sealedPageIt->fBuffer, sealedPageIt->fSize, offset);
164 RNTupleLocator locator;
165 locator.fPosition = offset;
166 locator.fBytesOnStorage = sealedPageIt->fSize;
167 locators.push_back(locator);
168 offset += sealedPageIt->fSize;
169 }
170 }
171
172 fCounters->fNPageCommitted.Add(locators.size());
173 fCounters->fSzWritePayload.Add(size);
174 fNBytesCurrentCluster += size;
175
176 return locators;
177}
178
180{
181 auto result = fNBytesCurrentCluster;
182 fNBytesCurrentCluster = 0;
183 return result;
184}
185
188 std::uint32_t length)
189{
190 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
191 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
192 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
193
195 result.fBytesOnStorage = szPageListZip;
196 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
197 return result;
198}
199
201 std::uint32_t length)
202{
203 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
204 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
205 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
206 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
207 fWriter->Commit();
208}
209
212{
213 if (nElements == 0)
214 throw RException(R__FAIL("invalid call: request empty page"));
215 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
216 return fPageAllocator->NewPage(columnHandle.fPhysicalId, elementSize, nElements);
217}
218
220{
221 fPageAllocator->DeletePage(page);
222}
223
224
225////////////////////////////////////////////////////////////////////////////////
226
228 const RNTupleReadOptions &options)
229 : RPageSource(ntupleName, options),
230 fPagePool(std::make_shared<RPagePool>()),
231 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
232{
233 fDecompressor = std::make_unique<RNTupleDecompressor>();
234 EnableDefaultMetrics("RPageSourceFile");
235}
236
238 std::unique_ptr<ROOT::Internal::RRawFile> file,
239 const RNTupleReadOptions &options)
240 : RPageSourceFile(ntupleName, options)
241{
242 fFile = std::move(file);
245}
246
247ROOT::Experimental::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
248 const RNTupleReadOptions &options)
249 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
250{
251}
252
254{
255 // TOOD(jblomer): can the epoch check be factored out across anchors?
256 if (anchor.GetVersionEpoch() != RNTuple::kVersionEpoch) {
257 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(anchor.GetVersionEpoch())));
258 }
259 if (anchor.GetVersionEpoch() == 0) {
260 static std::once_flag once;
261 std::call_once(once, [&anchor]() {
262 R__LOG_WARNING(NTupleLog()) << "Pre-release format version: RC " << anchor.GetVersionMajor();
263 });
264 }
265
266 fDescriptorBuilder.SetOnDiskHeaderSize(anchor.GetNBytesHeader());
267 auto buffer = std::make_unique<unsigned char[]>(anchor.GetLenHeader());
268 auto zipBuffer = std::make_unique<unsigned char[]>(anchor.GetNBytesHeader());
269 fReader.ReadBuffer(zipBuffer.get(), anchor.GetNBytesHeader(), anchor.GetSeekHeader());
270 fDecompressor->Unzip(zipBuffer.get(), anchor.GetNBytesHeader(), anchor.GetLenHeader(), buffer.get());
271 RNTupleSerializer::DeserializeHeader(buffer.get(), anchor.GetLenHeader(), fDescriptorBuilder);
272
273 fDescriptorBuilder.AddToOnDiskFooterSize(anchor.GetNBytesFooter());
274 buffer = std::make_unique<unsigned char[]>(anchor.GetLenFooter());
275 zipBuffer = std::make_unique<unsigned char[]>(anchor.GetNBytesFooter());
276 fReader.ReadBuffer(zipBuffer.get(), anchor.GetNBytesFooter(), anchor.GetSeekFooter());
277 fDecompressor->Unzip(zipBuffer.get(), anchor.GetNBytesFooter(), anchor.GetLenFooter(), buffer.get());
278 RNTupleSerializer::DeserializeFooter(buffer.get(), anchor.GetLenFooter(), fDescriptorBuilder);
279}
280
281std::unique_ptr<ROOT::Experimental::Internal::RPageSourceFile>
283 const RNTupleReadOptions &options)
284{
285 if (!anchor.fFile)
286 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
287
288 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
289 // For local files and supported transport protocols, we want to open a new RRawFile to take advantage of the faster
290 // reading. To detect local files, we do not check for "file" because this could also happen for endpoint URLs of
291 // TMemFiles, but for the RTTI to be exactly a TFile.
292 auto url = anchor.fFile->GetEndpointUrl();
293 auto protocol = std::string(url->GetProtocol());
294 if (typeid(*anchor.fFile) == typeid(TFile)) {
295 rawFile = ROOT::Internal::RRawFile::Create(url->GetFile());
296 } else if (protocol == "http" || protocol == "https" || protocol == "root" || protocol == "roots") {
297 rawFile = ROOT::Internal::RRawFile::Create(url->GetUrl());
298 } else {
299 rawFile.reset(new ROOT::Internal::RRawFileTFile(anchor.fFile));
300 }
301
302 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
303 pageSource->InitDescriptor(anchor);
304 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
305 return pageSource;
306}
307
309
311{
312 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
313 // Otherwise, the page source was created by OpenFromAnchor() and the header and footer are already processed.
314 if (fDescriptorBuilder.GetDescriptor().GetOnDiskHeaderSize() == 0) {
315 auto anchor = fReader.GetNTuple(fNTupleName).Unwrap();
316 InitDescriptor(anchor);
317 }
318
319 auto desc = fDescriptorBuilder.MoveDescriptor();
320
321 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
322 auto buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
323 auto zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
324 fReader.ReadBuffer(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage,
325 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
326 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
327 buffer.get());
328
329 RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
330 }
331
332 return desc;
333}
334
336 RClusterIndex clusterIndex, RSealedPage &sealedPage)
337{
338 const auto clusterId = clusterIndex.GetClusterId();
339
341 {
342 auto descriptorGuard = GetSharedDescriptorGuard();
343 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
344 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
345 }
346
347 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
348 sealedPage.fSize = bytesOnStorage;
349 sealedPage.fNElements = pageInfo.fNElements;
350 if (!sealedPage.fBuffer)
351 return;
353 fReader.ReadBuffer(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage,
354 pageInfo.fLocator.GetPosition<std::uint64_t>());
355 } else {
356 memcpy(const_cast<void *>(sealedPage.fBuffer), RPage::GetPageZeroBuffer(), bytesOnStorage);
357 }
358}
359
362 const RClusterInfo &clusterInfo,
363 ClusterSize_t::ValueType idxInCluster)
364{
365 const auto columnId = columnHandle.fPhysicalId;
366 const auto clusterId = clusterInfo.fClusterId;
367 const auto pageInfo = clusterInfo.fPageInfo;
368
369 const auto element = columnHandle.fColumn->GetElement();
370 const auto elementSize = element->GetSize();
371 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
372
373 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
374 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
375
376 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
377 auto pageZero = RPage::MakePageZero(columnId, elementSize);
378 pageZero.GrowUnchecked(pageInfo.fNElements);
379 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
380 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
381 fPagePool->RegisterPage(pageZero, RPageDeleter([](const RPage &, void *) {}, nullptr));
382 return pageZero;
383 }
384
385 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
386 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[bytesOnStorage]);
387 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.GetPosition<std::uint64_t>());
388 fCounters->fNPageLoaded.Inc();
389 fCounters->fNRead.Inc();
390 fCounters->fSzReadPayload.Add(bytesOnStorage);
391 sealedPageBuffer = directReadBuffer.get();
392 } else {
393 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
394 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
395 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
396
397 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
398 if (!cachedPage.IsNull())
399 return cachedPage;
400
401 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
402 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
403 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
404 sealedPageBuffer = onDiskPage->GetAddress();
405 }
406
407 RPage newPage;
408 {
409 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
410 newPage = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element, columnId);
411 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
412 }
413
414 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
415 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
416 fPagePool->RegisterPage(
417 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
418 fCounters->fNPagePopulated.Inc();
419 return newPage;
420}
421
424{
425 const auto columnId = columnHandle.fPhysicalId;
426 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
427 if (!cachedPage.IsNull())
428 return cachedPage;
429
430 std::uint64_t idxInCluster;
431 RClusterInfo clusterInfo;
432 {
433 auto descriptorGuard = GetSharedDescriptorGuard();
434 clusterInfo.fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
435
436 if (clusterInfo.fClusterId == kInvalidDescriptorId)
437 throw RException(R__FAIL("entry with index " + std::to_string(globalIndex) + " out of bounds"));
438
439 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.fClusterId);
440 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
441 R__ASSERT(clusterInfo.fColumnOffset <= globalIndex);
442 idxInCluster = globalIndex - clusterInfo.fColumnOffset;
443 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
444 }
445
446 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
447}
448
451{
452 const auto clusterId = clusterIndex.GetClusterId();
453 const auto idxInCluster = clusterIndex.GetIndex();
454 const auto columnId = columnHandle.fPhysicalId;
455 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
456 if (!cachedPage.IsNull())
457 return cachedPage;
458
459 if (clusterId == kInvalidDescriptorId)
460 throw RException(R__FAIL("entry out of bounds"));
461
462 RClusterInfo clusterInfo;
463 {
464 auto descriptorGuard = GetSharedDescriptorGuard();
465 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
466 clusterInfo.fClusterId = clusterId;
467 clusterInfo.fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
468 clusterInfo.fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
469 }
470
471 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
472}
473
475{
476 fPagePool->ReturnPage(page);
477}
478
479std::unique_ptr<ROOT::Experimental::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceFile::Clone() const
480{
481 auto clone = new RPageSourceFile(fNTupleName, fOptions);
482 clone->fFile = fFile->Clone();
483 clone->fReader = RMiniFileReader(clone->fFile.get());
484 return std::unique_ptr<RPageSourceFile>(clone);
485}
486
487std::unique_ptr<ROOT::Experimental::Internal::RCluster>
489 const RCluster::RKey &clusterKey, std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
490{
491 struct ROnDiskPageLocator {
494 std::uint64_t fOffset = 0;
495 std::uint64_t fSize = 0;
496 std::size_t fBufPos = 0;
497 };
498
499 std::vector<ROnDiskPageLocator> onDiskPages;
500 auto activeSize = 0;
501 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
502 PrepareLoadCluster(clusterKey, *pageZeroMap,
503 [&](DescriptorId_t physicalColumnId, NTupleSize_t pageNo,
505 const auto &pageLocator = pageInfo.fLocator;
506 activeSize += pageLocator.fBytesOnStorage;
507 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(),
508 pageLocator.fBytesOnStorage, 0});
509 });
510
511 // Linearize the page requests by file offset
512 std::sort(onDiskPages.begin(), onDiskPages.end(),
513 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) {return a.fOffset < b.fOffset;});
514
515 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
516 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
517 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
518 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
519 // of extra bytes.
520 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
521 // memory consumption, device block size.
522 float maxOverhead = 0.25 * float(activeSize);
523 std::vector<std::size_t> gaps;
524 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
525 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].fSize + onDiskPages[i-1].fOffset));
526 }
527 std::sort(gaps.begin(), gaps.end());
528 std::size_t gapCut = 0;
529 std::size_t currentGap = 0;
530 float szExtra = 0.0;
531 for (auto g : gaps) {
532 if (g != currentGap) {
533 gapCut = currentGap;
534 currentGap = g;
535 }
536 szExtra += g;
537 if (szExtra > maxOverhead)
538 break;
539 }
540
541 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
542 // In a second step, we'll fix-up the memory destinations for the read calls given the
543 // address of the allocated buffer. We must not touch, however, the read requests from previous
544 // calls to PrepareSingleCluster()
545 const auto currentReadRequestIdx = readRequests.size();
546
548 std::size_t szPayload = 0;
549 std::size_t szOverhead = 0;
550 for (auto &s : onDiskPages) {
551 R__ASSERT(s.fSize > 0);
552 auto readUpTo = req.fOffset + req.fSize;
553 R__ASSERT(s.fOffset >= readUpTo);
554 auto overhead = s.fOffset - readUpTo;
555 szPayload += s.fSize;
556 if (overhead <= gapCut) {
557 szOverhead += overhead;
558 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
559 req.fSize += overhead + s.fSize;
560 continue;
561 }
562
563 // close the current request and open new one
564 if (req.fSize > 0)
565 readRequests.emplace_back(req);
566
567 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
568 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
569
570 req.fOffset = s.fOffset;
571 req.fSize = s.fSize;
572 }
573 readRequests.emplace_back(req);
574 fCounters->fSzReadPayload.Add(szPayload);
575 fCounters->fSzReadOverhead.Add(szOverhead);
576
577 // Register the on disk pages in a page map
578 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
579 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
580 for (const auto &s : onDiskPages) {
581 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
582 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
583 }
584 fCounters->fNPageLoaded.Add(onDiskPages.size());
585 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
586 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
587 }
588
589 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
590 cluster->Adopt(std::move(pageMap));
591 cluster->Adopt(std::move(pageZeroMap));
592 for (auto colId : clusterKey.fPhysicalColumnSet)
593 cluster->SetColumnAvailable(colId);
594 return cluster;
595}
596
597std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
599{
600 fCounters->fNClusterLoaded.Add(clusterKeys.size());
601
602 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
603 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
604
605 for (auto key: clusterKeys) {
606 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
607 }
608
609 auto nReqs = readRequests.size();
610 auto readvLimits = fFile->GetReadVLimits();
611
612 int iReq = 0;
613 while (nReqs > 0) {
614 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
615
616 if (readvLimits.HasSizeLimit()) {
617 std::uint64_t totalSize = 0;
618 for (std::size_t i = 0; i < nBatch; ++i) {
619 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
620 nBatch = i;
621 break;
622 }
623
624 totalSize += readRequests[iReq + i].fSize;
625 if (totalSize > readvLimits.fMaxTotalSize) {
626 nBatch = i;
627 break;
628 }
629 }
630 }
631
632 if (nBatch <= 1) {
633 nBatch = 1;
634 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
635 fFile->ReadAt(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
636 } else {
637 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
638 fFile->ReadV(&readRequests[iReq], nBatch);
639 }
640 fCounters->fNReadV.Inc();
641 fCounters->fNRead.Add(nBatch);
642
643 iReq += nBatch;
644 nReqs -= nBatch;
645 }
646
647 return clusters;
648}
649
651{
652 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
653
654 const auto clusterId = cluster->GetId();
655 auto descriptorGuard = GetSharedDescriptorGuard();
656 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
657
658 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
659
660 const auto &columnsInCluster = cluster->GetAvailPhysicalColumns();
661 for (const auto columnId : columnsInCluster) {
662 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
663
664 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
665
666 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
667 std::uint64_t pageNo = 0;
668 std::uint64_t firstInPage = 0;
669 for (const auto &pi : pageRange.fPageInfos) {
670 ROnDiskPage::Key key(columnId, pageNo);
671 auto onDiskPage = cluster->GetOnDiskPage(key);
672 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
673
674 auto taskFunc = [this, columnId, clusterId, firstInPage, onDiskPage, element = allElements.back().get(),
675 nElements = pi.fNElements,
676 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
677 auto newPage = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element, columnId);
678 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
679
680 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
681 fPagePool->PreloadPage(
682 newPage, RPageDeleter([](const RPage &page, void *) { RPageAllocatorHeap::DeletePage(page); }, nullptr));
683 };
684
685 fTaskScheduler->AddTask(taskFunc);
686
687 firstInPage += pi.fNElements;
688 pageNo++;
689 } // for all pages in column
690 } // for all columns in cluster
691
692 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
693
694 fTaskScheduler->Wait();
695}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
TObject * clone(const char *newname) const override
Definition RooChi2Var.h:9
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
#define R__ASSERT(e)
Definition TError.h:118
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Record wall time and CPU time between construction and destruction.
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
const ColumnSet_t & GetAvailPhysicalColumns() const
Definition RCluster.hxx:196
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:32
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
RColumnElementBase * GetElement() const
Definition RColumn.hxx:325
Reads RNTuple data blocks from a TFile container, provided by an RRawFile.
Definition RMiniFile.hxx:54
static Writer_t MakeMemCopyWriter(unsigned char *dest)
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, EContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
A thread-safe cache of column pages.
Definition RPagePool.hxx:47
Storage provider that writes ntuple pages into a file.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
std::uint64_t CommitClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
std::unique_ptr< RNTupleFileWriter > fWriter
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
void InitDescriptor(const RNTuple &anchor)
Deserializes the header and footer into a minimal descriptor held by fDescriptorBuilder.
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; includes a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:48
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (consisting of 0x00 bytes only).
Definition RPage.hxx:134
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:83
std::uint32_t GetNElements() const
Definition RPage.hxx:84
void SetWindow(const NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
Definition RPage.hxx:117
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:18
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:61
std::uint64_t GetLenFooter() const
Definition RNTuple.hxx:118
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:67
std::uint64_t GetNBytesHeader() const
Definition RNTuple.hxx:113
std::uint64_t GetLenHeader() const
Definition RNTuple.hxx:114
std::uint16_t GetVersionEpoch() const
Definition RNTuple.hxx:107
std::uint64_t GetNBytesFooter() const
Definition RNTuple.hxx:117
TFile * fFile
! The file from which the ntuple was streamed, registered in the custom streamer
Definition RNTuple.hxx:101
std::uint64_t GetSeekHeader() const
Definition RNTuple.hxx:112
std::uint16_t GetVersionMajor() const
Definition RNTuple.hxx:108
std::uint64_t GetSeekFooter() const
Definition RNTuple.hxx:116
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:73
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual const TUrl * GetEndpointUrl() const
Definition TFile.h:235
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition TObject.cxx:223
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:156
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Summarizes cluster-level information that is necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
Generic information about the physical location of data.
ELocatorType fType
For non-disk locators, the value for the Type field.
std::variant< std::uint64_t, std::string, RNTupleLocatorObject64 > fPosition
Simple on-disk locators consisting of a 64-bit offset use variant type uint64_t; extended locators ha...
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:71
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:77
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:73
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:75