RPageStorageFile.cxx
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RLogger.hxx>
20#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleZip.hxx>
23#include <ROOT/RPage.hxx>
25#include <ROOT/RPagePool.hxx>
27#include <ROOT/RRawFile.hxx>
29#include <ROOT/RNTupleUtil.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33#include <TFile.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <iterator>
40#include <limits>
41#include <utility>
42
43#include <functional>
44#include <mutex>
45
46ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName,
47 const RNTupleWriteOptions &options)
48 : RPagePersistentSink(ntupleName, options)
49{
50 static std::once_flag once;
51 std::call_once(once, []() {
52 R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. "
53 << "Do not store real data with this version of RNTuple!";
54 });
55 fCompressor = std::make_unique<RNTupleCompressor>();
56 EnableDefaultMetrics("RPageSinkFile");
58}
59
60ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
61 const RNTupleWriteOptions &options)
62 : RPageSinkFile(ntupleName, options)
63{
65}
66
67ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, TFile &file,
68 const RNTupleWriteOptions &options)
69 : RPageSinkFile(ntupleName, options)
70{
71 fWriter = RNTupleFileWriter::Append(ntupleName, file, options.GetMaxKeySize());
72}
73
75
76void ROOT::Experimental::Internal::RPageSinkFile::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
77{
78 auto zipBuffer = std::make_unique<unsigned char[]>(length);
79 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
81 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
82}
83
86 std::size_t bytesPacked)
87{
88 std::uint64_t offsetData;
89 {
90 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
91 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), bytesPacked);
92 }
93
94 RNTupleLocator result;
95 result.fPosition = offsetData;
96 result.fBytesOnStorage = sealedPage.GetDataSize();
97 fCounters->fNPageCommitted.Inc();
98 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
99 fNBytesCurrentCluster += sealedPage.GetBufferSize();
100 return result;
101}
102
105{
106 auto element = columnHandle.fColumn->GetElement();
107 RPageStorage::RSealedPage sealedPage;
108 {
109 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
110 sealedPage = SealPage(page, *element);
111 }
112
113 fCounters->fSzZip.Add(page.GetNBytes());
114 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
115}
116
119 const RPageStorage::RSealedPage &sealedPage)
120{
121 const auto nBits = fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetBitsOnStorage();
122 const auto bytesPacked = (nBits * sealedPage.GetNElements() + 7) / 8;
123 return WriteSealedPage(sealedPage, bytesPacked);
124}
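The packed size computed above rounds a bit count up to whole bytes. A minimal standalone sketch of that arithmetic, assuming only standard C++ (PackedBytes is an illustrative name, not part of the ROOT sources):

#include <cstddef>

constexpr std::size_t PackedBytes(std::size_t bitsOnStorage, std::size_t nElements)
{
   // Mirrors the (nBits * nElements + 7) / 8 rounding used in CommitSealedPageImpl.
   return (bitsOnStorage * nElements + 7) / 8;
}

static_assert(PackedBytes(1, 9) == 2);   // 9 single-bit elements need 2 bytes
static_assert(PackedBytes(32, 4) == 16); // 4 32-bit elements need 16 bytes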
125
127 std::vector<RNTupleLocator> &locators)
128{
129 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
130
131 std::uint64_t offset = fWriter->ReserveBlob(batch.fSize, batch.fBytesPacked);
132
133 locators.reserve(locators.size() + batch.fSealedPages.size());
134
135 for (const auto *pagePtr : batch.fSealedPages) {
136 fWriter->WriteIntoReservedBlob(pagePtr->GetBuffer(), pagePtr->GetBufferSize(), offset);
137 RNTupleLocator locator;
138 locator.fPosition = offset;
139 locator.fBytesOnStorage = pagePtr->GetDataSize();
140 locators.push_back(locator);
141 offset += pagePtr->GetBufferSize();
142 }
143
144 fCounters->fNPageCommitted.Add(batch.fSealedPages.size());
145 fCounters->fSzWritePayload.Add(batch.fSize);
146 fNBytesCurrentCluster += batch.fSize;
147
148 batch.fSize = 0;
149 batch.fBytesPacked = 0;
150 batch.fSealedPages.clear();
151}
152
153std::vector<ROOT::Experimental::RNTupleLocator>
154ROOT::Experimental::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
155 const std::vector<bool> &mask)
156{
157 const std::uint64_t maxKeySize = fOptions->GetMaxKeySize();
158
159 CommitBatch batch{};
160 std::vector<ROOT::Experimental::RNTupleLocator> locators;
161
162 std::size_t iPage = 0;
163 for (auto rangeIt = ranges.begin(); rangeIt != ranges.end(); ++rangeIt) {
164 auto &range = *rangeIt;
165 if (range.fFirst == range.fLast) {
166 // Skip empty ranges, they might not have a physical column ID!
167 continue;
168 }
169
170 const auto bitsOnStorage =
171 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetBitsOnStorage();
172
173 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt, ++iPage) {
174 if (!mask[iPage])
175 continue;
176
177 const auto bytesPacked = (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
178
179 if (batch.fSize > 0 && batch.fSize + sealedPageIt->GetBufferSize() > maxKeySize) {
180 /**
181 * Adding this page would exceed maxKeySize. Since we always want to write into a single key
182 * with vectorized writes, we commit the current set of pages before proceeding.
183 * NOTE: we do this *before* checking if sealedPageIt->GetBufferSize() > maxKeySize to guarantee that
184 * we always flush the current batch before doing an individual WriteBlob. This way we
185 * preserve the assumption that a CommitBatch always contains a sequential set of pages.
186 */
187 CommitBatchOfPages(batch, locators);
188 }
189
190 if (sealedPageIt->GetBufferSize() > maxKeySize) {
191 // This page alone is bigger than maxKeySize: save it by itself, since it will need to be
192 // split into multiple keys.
193
194 // Since this check implies the previous check on batchSize + newSize > maxSize, we should
195 // already have committed the current batch before writing this page.
196 assert(batch.fSize == 0);
197
198 std::uint64_t offset =
199 fWriter->WriteBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetBufferSize(), bytesPacked);
200 RNTupleLocator locator;
201 locator.fPosition = offset;
202 locator.fBytesOnStorage = sealedPageIt->GetDataSize();
203 locators.push_back(locator);
204
205 fCounters->fNPageCommitted.Inc();
206 fCounters->fSzWritePayload.Add(sealedPageIt->GetBufferSize());
207 fNBytesCurrentCluster += sealedPageIt->GetBufferSize();
208
209 } else {
210 batch.fSealedPages.emplace_back(&(*sealedPageIt));
211 batch.fSize += sealedPageIt->GetBufferSize();
212 batch.fBytesPacked += bytesPacked;
213 }
214 }
215 }
216
217 if (batch.fSize > 0) {
218 CommitBatchOfPages(batch, locators);
219 }
220
221 return locators;
222}
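The loop above interleaves batched vector writes with standalone WriteBlob calls for pages that alone exceed the key size. A standalone sketch of just that grouping decision, assuming only standard C++ (function and parameter names are illustrative, not ROOT API):

#include <cstdint>
#include <vector>

// Returns the size of each key that would be written for the given page sizes.
std::vector<std::uint64_t> BatchSizes(const std::vector<std::uint64_t> &pageSizes, std::uint64_t maxKeySize)
{
   std::vector<std::uint64_t> keySizes;
   std::uint64_t batch = 0;
   for (auto size : pageSizes) {
      if (batch > 0 && batch + size > maxKeySize) { // flush the open batch first
         keySizes.push_back(batch);
         batch = 0;
      }
      if (size > maxKeySize) { // oversized page goes into its own (split) key
         keySizes.push_back(size);
      } else {
         batch += size;
      }
   }
   if (batch > 0)
      keySizes.push_back(batch);
   return keySizes;
}

For example, page sizes {400, 700, 300} with maxKeySize = 1000 yield two keys of 400 and 1000 bytes, matching the flush-before-append behaviour documented in the comment above.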
223
225{
226 auto result = fNBytesCurrentCluster;
227 fNBytesCurrentCluster = 0;
228 return result;
229}
230
233 std::uint32_t length)
234{
235 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
236 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
237 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
238
239 RNTupleLocator result;
240 result.fBytesOnStorage = szPageListZip;
241 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
242 return result;
243}
244
246 std::uint32_t length)
247{
248 fWriter->UpdateStreamerInfos(fDescriptorBuilder.BuildStreamerInfos());
249 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
250 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
251 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
252 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
253 fWriter->Commit();
254}
255
256////////////////////////////////////////////////////////////////////////////////
257
259 const RNTupleReadOptions &options)
260 : RPageSource(ntupleName, options),
261 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
262{
263 EnableDefaultMetrics("RPageSourceFile");
264}
265
267 std::unique_ptr<ROOT::Internal::RRawFile> file,
268 const RNTupleReadOptions &options)
269 : RPageSourceFile(ntupleName, options)
270{
271 fFile = std::move(file);
274}
275
276ROOT::Experimental::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
277 const RNTupleReadOptions &options)
278 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
279{
280}
281
282std::unique_ptr<ROOT::Experimental::Internal::RPageSourceFile>
284 const RNTupleReadOptions &options)
285{
286 if (!anchor.fFile)
287 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
288
289 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
290 // For local TFiles, TDavixFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the faster
291 // reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
292 // experiment frameworks.
293 std::string className = anchor.fFile->IsA()->GetName();
294 auto url = anchor.fFile->GetEndpointUrl();
295 auto protocol = std::string(url->GetProtocol());
296 if (className == "TFile") {
297 rawFile = ROOT::Internal::RRawFile::Create(url->GetFile());
298 } else if (className == "TDavixFile" || className == "TNetXNGFile") {
299 rawFile = ROOT::Internal::RRawFile::Create(url->GetUrl());
300 } else {
301 rawFile.reset(new ROOT::Internal::RRawFileTFile(anchor.fFile));
302 }
303
304 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
305 pageSource->fAnchor = anchor;
306 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
307 return pageSource;
308}
309
311
313{
314 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
315 // Otherwise, the page source was created by CreateFromAnchor()
316 if (!fAnchor)
317 fAnchor = fReader.GetNTuple(fNTupleName).Unwrap();
318
319 // TODO(jblomer): can the epoch check be factored out across anchors?
320 if (fAnchor->GetVersionEpoch() != RNTuple::kVersionEpoch) {
321 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(fAnchor->GetVersionEpoch())));
322 }
323 if (fAnchor->GetVersionEpoch() == 0) {
324 static std::once_flag once;
325 std::call_once(once, [this]() {
326 R__LOG_WARNING(NTupleLog()) << "Pre-release format version: RC " << fAnchor->GetVersionMajor();
327 });
328 }
329
330 fDescriptorBuilder.SetOnDiskHeaderSize(fAnchor->GetNBytesHeader());
331 fDescriptorBuilder.AddToOnDiskFooterSize(fAnchor->GetNBytesFooter());
332
333 // Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
334 const auto bufSize = fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() +
335 std::max(fAnchor->GetLenHeader(), fAnchor->GetLenFooter());
336 fStructureBuffer.fBuffer = std::make_unique<unsigned char[]>(bufSize);
337 fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
338 fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor->GetNBytesHeader();
339
340 auto readvLimits = fFile->GetReadVLimits();
341 // Never try to vectorize reads to a split key
342 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fAnchor->GetMaxKeySize());
343
344 if ((readvLimits.fMaxReqs < 2) ||
345 (std::max(fAnchor->GetNBytesHeader(), fAnchor->GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
346 (fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
347 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
348 fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
349 fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
350 fCounters->fNRead.Add(2);
351 } else {
352 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
353 ROOT::Internal::RRawFile::RIOVec readRequests[2] = {
354 {fStructureBuffer.fPtrHeader, fAnchor->GetSeekHeader(), fAnchor->GetNBytesHeader(), 0},
355 {fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(), fAnchor->GetNBytesFooter(), 0}};
356 fFile->ReadV(readRequests, 2);
357 fCounters->fNReadV.Inc();
358 }
359}
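The single allocation above holds the compressed header, the compressed footer, and one scratch region large enough to unzip either of them (used later in AttachImpl). A small worked example of the size computation, with made-up numbers:

#include <algorithm>
#include <cstddef>

// Hypothetical sizes, for illustration only.
constexpr std::size_t nbytesHeader = 4'000;  // compressed header
constexpr std::size_t nbytesFooter = 1'000;  // compressed footer
constexpr std::size_t lenHeader = 10'000;    // uncompressed header
constexpr std::size_t lenFooter = 2'500;     // uncompressed footer

// [compressed header][compressed footer][scratch big enough for either unzipped blob]
constexpr std::size_t bufSize = nbytesHeader + nbytesFooter + std::max(lenHeader, lenFooter);
static_assert(bufSize == 15'000);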
360
362{
363 auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();
364
365 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetLenHeader(),
366 unzipBuf);
367 RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor->GetLenHeader(), fDescriptorBuilder);
368
369 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetLenFooter(),
370 unzipBuf);
371 RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);
372
373 auto desc = fDescriptorBuilder.MoveDescriptor();
374
375 std::vector<unsigned char> buffer;
376 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
377 buffer.resize(
378 std::max<size_t>(buffer.size(), cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().fBytesOnStorage));
379 auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
380 fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage,
381 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
382 RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
383 buffer.data());
384
385 RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
386 }
387
388 // For the page reads, we rely on the I/O scheduler to define the read requests
389 fFile->SetBuffering(false);
390
391 return desc;
392}
393
395 RClusterIndex clusterIndex, RSealedPage &sealedPage)
396{
397 const auto clusterId = clusterIndex.GetClusterId();
398
400 {
401 auto descriptorGuard = GetSharedDescriptorGuard();
402 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
403 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
404 }
405
406 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
407 sealedPage.SetNElements(pageInfo.fNElements);
408 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
409 if (!sealedPage.GetBuffer())
410 return;
411 if (pageInfo.fLocator.fType != RNTupleLocator::kTypePageZero) {
412 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
413 pageInfo.fLocator.GetPosition<std::uint64_t>());
414 } else {
415 assert(!pageInfo.fHasChecksum);
416 memcpy(const_cast<void *>(sealedPage.GetBuffer()), RPage::GetPageZeroBuffer(), sealedPage.GetBufferSize());
417 }
418
420}
421
424 const RClusterInfo &clusterInfo,
425 ClusterSize_t::ValueType idxInCluster)
426{
427 const auto columnId = columnHandle.fPhysicalId;
428 const auto clusterId = clusterInfo.fClusterId;
429 const auto pageInfo = clusterInfo.fPageInfo;
430
431 const auto element = columnHandle.fColumn->GetElement();
432 const auto elementSize = element->GetSize();
433
434 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
435 auto pageZero = RPage::MakePageZero(columnId, elementSize);
436 pageZero.GrowUnchecked(pageInfo.fNElements);
437 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
438 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
439 return fPagePool.RegisterPage(std::move(pageZero));
440 }
441
442 RSealedPage sealedPage;
443 sealedPage.SetNElements(pageInfo.fNElements);
444 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
445 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
446 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
447
448 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
449 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[sealedPage.GetBufferSize()]);
450 fReader.ReadBuffer(directReadBuffer.get(), sealedPage.GetBufferSize(),
451 pageInfo.fLocator.GetPosition<std::uint64_t>());
452 fCounters->fNPageRead.Inc();
453 fCounters->fNRead.Inc();
454 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
455 sealedPage.SetBuffer(directReadBuffer.get());
456 } else {
457 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
458 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
459 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
460
461 auto cachedPageRef = fPagePool.GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
462 if (!cachedPageRef.Get().IsNull())
463 return cachedPageRef;
464
465 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
466 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
467 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
468 sealedPage.SetBuffer(onDiskPage->GetAddress());
469 }
470
471 RPage newPage;
472 {
473 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
474 newPage = UnsealPage(sealedPage, *element, columnId).Unwrap();
475 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
476 }
477
478 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
479 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
480 fCounters->fNPageUnsealed.Inc();
481 return fPagePool.RegisterPage(std::move(newPage));
482}
483
484std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
486{
487 auto clone = new RPageSourceFile(fNTupleName, fOptions);
488 clone->fFile = fFile->Clone();
489 clone->fReader = RMiniFileReader(clone->fFile.get());
490 return std::unique_ptr<RPageSourceFile>(clone);
491}
492
493std::unique_ptr<ROOT::Experimental::Internal::RCluster>
495 const RCluster::RKey &clusterKey, std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
496{
497 struct ROnDiskPageLocator {
498 DescriptorId_t fColumnId = 0;
499 NTupleSize_t fPageNo = 0;
500 std::uint64_t fOffset = 0;
501 std::uint64_t fSize = 0;
502 std::size_t fBufPos = 0;
503 };
504
505 std::vector<ROnDiskPageLocator> onDiskPages;
506 auto activeSize = 0;
507 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
508 PrepareLoadCluster(
509 clusterKey, *pageZeroMap,
510 [&](DescriptorId_t physicalColumnId, NTupleSize_t pageNo,
512 const auto &pageLocator = pageInfo.fLocator;
513 const auto nBytes = pageLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum;
514 activeSize += nBytes;
515 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
516 });
517
518 // Linearize the page requests by file offset
519 std::sort(onDiskPages.begin(), onDiskPages.end(),
520 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
521
522 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
523 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
524 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
525 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
526 // of extra bytes.
527 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
528 // memory consumption, device block size.
529 float maxOverhead = 0.25 * float(activeSize);
530 std::vector<std::size_t> gaps;
531 if (onDiskPages.size())
532 gaps.reserve(onDiskPages.size() - 1);
533 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
534 std::int64_t gap =
535 static_cast<int64_t>(onDiskPages[i].fOffset) - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset);
536 gaps.emplace_back(std::max(gap, std::int64_t(0)));
537 // If the pages overlap, subtract the overlapped bytes from `activeSize`
538 activeSize += std::min(gap, std::int64_t(0));
539 }
540 std::sort(gaps.begin(), gaps.end());
541 std::size_t gapCut = 0;
542 std::size_t currentGap = 0;
543 float szExtra = 0.0;
544 for (auto g : gaps) {
545 if (g != currentGap) {
546 gapCut = currentGap;
547 currentGap = g;
548 }
549 szExtra += g;
550 if (szExtra > maxOverhead)
551 break;
552 }
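// Worked example (hypothetical numbers, not taken from a real dataset): with the sorted
// gaps {0, 0, 2, 2, 64, 4096} and maxOverhead = 100 bytes, the running sum szExtra only
// crosses 100 when the 4096-byte gap is added, at which point currentGap == 4096 and
// gapCut was last set to 64. Pages separated by at most 64 bytes are then coalesced into
// a single read request below, while the 4096-byte gap starts a new request.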
553
554 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
555 // In a second step, we'll fix up the memory destinations for the read calls given the
556 // address of the allocated buffer. We must not touch, however, the read requests from previous
557 // calls to PrepareSingleCluster()
558 const auto currentReadRequestIdx = readRequests.size();
559
560 ROOT::Internal::RRawFile::RIOVec req;
561 std::size_t szPayload = 0;
562 std::size_t szOverhead = 0;
563 const std::uint64_t maxKeySize = fReader.GetMaxKeySize();
564 for (auto &s : onDiskPages) {
565 R__ASSERT(s.fSize > 0);
566 const std::int64_t readUpTo = req.fOffset + req.fSize;
567 // Note: byte ranges of pages may overlap
568 const std::uint64_t overhead = std::max(static_cast<std::int64_t>(s.fOffset) - readUpTo, std::int64_t(0));
569 const std::uint64_t extent = std::max(static_cast<std::int64_t>(s.fOffset + s.fSize) - readUpTo, std::int64_t(0));
570 szPayload += extent;
571 if (req.fSize + extent < maxKeySize && overhead <= gapCut) {
572 szOverhead += overhead;
573 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + s.fOffset - req.fOffset;
574 req.fSize += extent;
575 continue;
576 }
577
578 // close the current request and open new one
579 if (req.fSize > 0)
580 readRequests.emplace_back(req);
581
582 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
583 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
584
585 req.fOffset = s.fOffset;
586 req.fSize = s.fSize;
587 }
588 readRequests.emplace_back(req);
589 fCounters->fSzReadPayload.Add(szPayload);
590 fCounters->fSzReadOverhead.Add(szOverhead);
591
592 // Register the on disk pages in a page map
593 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
594 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
595 for (const auto &s : onDiskPages) {
596 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
597 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
598 }
599 fCounters->fNPageRead.Add(onDiskPages.size());
600 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
601 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
602 }
603
604 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
605 cluster->Adopt(std::move(pageMap));
606 cluster->Adopt(std::move(pageZeroMap));
607 for (auto colId : clusterKey.fPhysicalColumnSet)
608 cluster->SetColumnAvailable(colId);
609 return cluster;
610}
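The request-building loop above both coalesces nearby pages and caps each request at the maximum key size. A standalone sketch of that coalescing step over sorted (offset, size) ranges, assuming only standard C++ (names are illustrative, not ROOT API):

#include <cstdint>
#include <utility>
#include <vector>

// Coalesce sorted byte ranges into read requests, merging neighbours whose gap is at
// most gapCut while keeping each request below maxReqSize. Returns (offset, size) pairs.
std::vector<std::pair<std::uint64_t, std::uint64_t>>
CoalesceRanges(const std::vector<std::pair<std::uint64_t, std::uint64_t>> &ranges,
               std::uint64_t gapCut, std::uint64_t maxReqSize)
{
   std::vector<std::pair<std::uint64_t, std::uint64_t>> requests;
   for (const auto &[offset, size] : ranges) {
      if (!requests.empty()) {
         auto &[reqOffset, reqSize] = requests.back();
         const std::uint64_t readUpTo = reqOffset + reqSize;
         const std::uint64_t gap = offset > readUpTo ? offset - readUpTo : 0;
         const std::uint64_t end = offset + size;
         const std::uint64_t extent = end > readUpTo ? end - readUpTo : 0;
         if (gap <= gapCut && reqSize + extent < maxReqSize) {
            reqSize += extent; // grow the open request, absorbing the gap and any overlap
            continue;
         }
      }
      requests.emplace_back(offset, size); // gap too large or request full: start a new one
   }
   return requests;
}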
611
612std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
614{
615 fCounters->fNClusterLoaded.Add(clusterKeys.size());
616
617 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
618 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
619
620 clusters.reserve(clusterKeys.size());
621 for (auto key : clusterKeys) {
622 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
623 }
624
625 auto nReqs = readRequests.size();
626 auto readvLimits = fFile->GetReadVLimits();
627 // We never want to do vectorized reads of split blobs, so we limit our single size to maxKeySize.
628 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fReader.GetMaxKeySize());
629
630 int iReq = 0;
631 while (nReqs > 0) {
632 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
633
634 if (readvLimits.HasSizeLimit()) {
635 std::uint64_t totalSize = 0;
636 for (std::size_t i = 0; i < nBatch; ++i) {
637 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
638 nBatch = i;
639 break;
640 }
641
642 totalSize += readRequests[iReq + i].fSize;
643 if (totalSize > readvLimits.fMaxTotalSize) {
644 nBatch = i;
645 break;
646 }
647 }
648 }
649
650 if (nBatch <= 1) {
651 nBatch = 1;
652 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
653 fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
654 } else {
655 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
656 fFile->ReadV(&readRequests[iReq], nBatch);
657 }
658 fCounters->fNReadV.Inc();
659 fCounters->fNRead.Add(nBatch);
660
661 iReq += nBatch;
662 nReqs -= nBatch;
663 }
664
665 return clusters;
666}
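The loop above splits the outstanding read requests into ReadV batches that respect the request-count and size limits, falling back to a single plain read whenever a batch cannot hold more than one request. A standalone sketch of that batching decision, assuming only standard C++ (names are illustrative, not ROOT API):

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns how many requests each read round would consume for the given sizes and limits.
std::vector<std::size_t> BatchCounts(const std::vector<std::uint64_t> &reqSizes, std::size_t maxReqs,
                                     std::uint64_t maxSingleSize, std::uint64_t maxTotalSize)
{
   std::vector<std::size_t> counts;
   std::size_t iReq = 0;
   while (iReq < reqSizes.size()) {
      std::size_t nBatch = std::min(reqSizes.size() - iReq, maxReqs);
      std::uint64_t totalSize = 0;
      for (std::size_t i = 0; i < nBatch; ++i) {
         if (reqSizes[iReq + i] > maxSingleSize || totalSize + reqSizes[iReq + i] > maxTotalSize) {
            nBatch = i; // stop the batch before the request that violates a limit
            break;
         }
         totalSize += reqSizes[iReq + i];
      }
      if (nBatch <= 1)
         nBatch = 1; // fall back to a single plain read; guarantees forward progress
      counts.push_back(nBatch);
      iReq += nBatch;
   }
   return counts;
}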