Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RLogger.hxx>
20#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleZip.hxx>
23#include <ROOT/RPage.hxx>
25#include <ROOT/RPagePool.hxx>
27#include <ROOT/RRawFile.hxx>
29#include <ROOT/RNTupleUtil.hxx>
30
31#include <RVersion.h>
32#include <TDirectory.h>
33#include <TError.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <iterator>
40#include <limits>
41#include <utility>
42
43#include <functional>
44#include <mutex>
45
47 const RNTupleWriteOptions &options)
48 : RPagePersistentSink(ntupleName, options)
49{
50 fCompressor = std::make_unique<RNTupleCompressor>();
51 EnableDefaultMetrics("RPageSinkFile");
53}
54
55ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
56 const RNTupleWriteOptions &options)
57 : RPageSinkFile(ntupleName, options)
58{
60}
61
62ROOT::Experimental::Internal::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, TDirectory &fileOrDirectory,
63 const RNTupleWriteOptions &options)
64 : RPageSinkFile(ntupleName, options)
65{
66 fWriter = RNTupleFileWriter::Append(ntupleName, fileOrDirectory, options.GetMaxKeySize());
67}
68
70
71void ROOT::Experimental::Internal::RPageSinkFile::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
72{
73 auto zipBuffer = MakeUninitArray<unsigned char>(length);
74 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
76 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
77}
78
81 std::size_t bytesPacked)
82{
83 std::uint64_t offsetData;
84 {
85 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
86 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), bytesPacked);
87 }
88
90 result.fPosition = offsetData;
91 result.fBytesOnStorage = sealedPage.GetDataSize();
92 fCounters->fNPageCommitted.Inc();
93 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
94 fNBytesCurrentCluster += sealedPage.GetBufferSize();
95 return result;
96}
97
100{
101 auto element = columnHandle.fColumn->GetElement();
102 RPageStorage::RSealedPage sealedPage;
103 {
104 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
105 sealedPage = SealPage(page, *element);
106 }
107
108 fCounters->fSzZip.Add(page.GetNBytes());
109 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
110}
111
114 const RPageStorage::RSealedPage &sealedPage)
115{
116 const auto nBits = fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetBitsOnStorage();
117 const auto bytesPacked = (nBits * sealedPage.GetNElements() + 7) / 8;
118 return WriteSealedPage(sealedPage, bytesPacked);
119}
120
122 std::vector<RNTupleLocator> &locators)
123{
124 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
125
126 std::uint64_t offset = fWriter->ReserveBlob(batch.fSize, batch.fBytesPacked);
127
128 locators.reserve(locators.size() + batch.fSealedPages.size());
129
130 for (const auto *pagePtr : batch.fSealedPages) {
131 fWriter->WriteIntoReservedBlob(pagePtr->GetBuffer(), pagePtr->GetBufferSize(), offset);
132 RNTupleLocator locator;
133 locator.fPosition = offset;
134 locator.fBytesOnStorage = pagePtr->GetDataSize();
135 locators.push_back(locator);
136 offset += pagePtr->GetBufferSize();
137 }
138
139 fCounters->fNPageCommitted.Add(batch.fSealedPages.size());
140 fCounters->fSzWritePayload.Add(batch.fSize);
141 fNBytesCurrentCluster += batch.fSize;
142
143 batch.fSize = 0;
144 batch.fBytesPacked = 0;
145 batch.fSealedPages.clear();
146}
147
148std::vector<ROOT::Experimental::RNTupleLocator>
149ROOT::Experimental::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
150 const std::vector<bool> &mask)
151{
152 const std::uint64_t maxKeySize = fOptions->GetMaxKeySize();
153
154 CommitBatch batch{};
155 std::vector<ROOT::Experimental::RNTupleLocator> locators;
156
157 std::size_t iPage = 0;
158 for (auto rangeIt = ranges.begin(); rangeIt != ranges.end(); ++rangeIt) {
159 auto &range = *rangeIt;
160 if (range.fFirst == range.fLast) {
161 // Skip empty ranges, they might not have a physical column ID!
162 continue;
163 }
164
165 const auto bitsOnStorage =
166 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetBitsOnStorage();
167
168 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt, ++iPage) {
169 if (!mask[iPage])
170 continue;
171
172 const auto bytesPacked = (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
173
174 if (batch.fSize > 0 && batch.fSize + sealedPageIt->GetBufferSize() > maxKeySize) {
175 /**
176 * Adding this page would exceed maxKeySize. Since we always want to write into a single key
177 * with vectorized writes, we commit the current set of pages before proceeding.
178 * NOTE: we do this *before* checking if sealedPageIt->GetBufferSize() > maxKeySize to guarantee that
179 * we always flush the current batch before doing an individual WriteBlob. This way we
180 * preserve the assumption that a CommitBatch always contain a sequential set of pages.
181 */
182 CommitBatchOfPages(batch, locators);
183 }
184
185 if (sealedPageIt->GetBufferSize() > maxKeySize) {
186 // This page alone is bigger than maxKeySize: save it by itself, since it will need to be
187 // split into multiple keys.
188
189 // Since this check implies the previous check on batchSize + newSize > maxSize, we should
190 // already have committed the current batch before writing this page.
191 assert(batch.fSize == 0);
192
193 std::uint64_t offset =
194 fWriter->WriteBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetBufferSize(), bytesPacked);
195 RNTupleLocator locator;
196 locator.fPosition = offset;
197 locator.fBytesOnStorage = sealedPageIt->GetDataSize();
198 locators.push_back(locator);
199
200 fCounters->fNPageCommitted.Inc();
201 fCounters->fSzWritePayload.Add(sealedPageIt->GetBufferSize());
202 fNBytesCurrentCluster += sealedPageIt->GetBufferSize();
203
204 } else {
205 batch.fSealedPages.emplace_back(&(*sealedPageIt));
206 batch.fSize += sealedPageIt->GetBufferSize();
207 batch.fBytesPacked += bytesPacked;
208 }
209 }
210 }
211
212 if (batch.fSize > 0) {
213 CommitBatchOfPages(batch, locators);
214 }
215
216 return locators;
217}
218
220{
221 auto result = fNBytesCurrentCluster;
222 fNBytesCurrentCluster = 0;
223 return result;
224}
225
228 std::uint32_t length)
229{
230 auto bufPageListZip = MakeUninitArray<unsigned char>(length);
231 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
232 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
233
235 result.fBytesOnStorage = szPageListZip;
236 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
237 return result;
238}
239
241 std::uint32_t length)
242{
243 fWriter->UpdateStreamerInfos(fDescriptorBuilder.BuildStreamerInfos());
244 auto bufFooterZip = MakeUninitArray<unsigned char>(length);
245 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
246 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
247 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
248 fWriter->Commit();
249}
250
251////////////////////////////////////////////////////////////////////////////////
252
254 const RNTupleReadOptions &options)
255 : RPageSource(ntupleName, options),
256 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
257{
258 EnableDefaultMetrics("RPageSourceFile");
259}
260
262 std::unique_ptr<ROOT::Internal::RRawFile> file,
263 const RNTupleReadOptions &options)
264 : RPageSourceFile(ntupleName, options)
265{
266 fFile = std::move(file);
269}
270
271ROOT::Experimental::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
272 const RNTupleReadOptions &options)
273 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
274{
275}
276
277std::unique_ptr<ROOT::Experimental::Internal::RPageSourceFile>
279 const RNTupleReadOptions &options)
280{
281 if (!anchor.fFile)
282 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
283
284 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
285 // For local TFiles, TDavixFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the faster
286 // reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
287 // experiment frameworks.
288 std::string className = anchor.fFile->IsA()->GetName();
289 auto url = anchor.fFile->GetEndpointUrl();
290 auto protocol = std::string(url->GetProtocol());
291 if (className == "TFile") {
292 rawFile = ROOT::Internal::RRawFile::Create(url->GetFile());
293 } else if (className == "TDavixFile" || className == "TNetXNGFile") {
294 rawFile = ROOT::Internal::RRawFile::Create(url->GetUrl());
295 } else {
296 rawFile.reset(new ROOT::Internal::RRawFileTFile(anchor.fFile));
297 }
298
299 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
300 pageSource->fAnchor = anchor;
301 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
302 return pageSource;
303}
304
306
308{
309 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
310 // Otherwise, the page source was created by OpenFromAnchor()
311 if (!fAnchor) {
312 fAnchor = fReader.GetNTuple(fNTupleName).Unwrap();
313 }
314 fReader.SetMaxKeySize(fAnchor->GetMaxKeySize());
315
316 // TODO(jblomer): can the epoch check be factored out across anchors?
317 if (fAnchor->GetVersionEpoch() != RNTuple::kVersionEpoch) {
318 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(fAnchor->GetVersionEpoch())));
319 }
320
321 fDescriptorBuilder.SetOnDiskHeaderSize(fAnchor->GetNBytesHeader());
322 fDescriptorBuilder.AddToOnDiskFooterSize(fAnchor->GetNBytesFooter());
323
324 // Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
325 const auto bufSize = fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() +
326 std::max(fAnchor->GetLenHeader(), fAnchor->GetLenFooter());
327 fStructureBuffer.fBuffer = MakeUninitArray<unsigned char>(bufSize);
328 fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
329 fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor->GetNBytesHeader();
330
331 auto readvLimits = fFile->GetReadVLimits();
332 // Never try to vectorize reads to a split key
333 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fAnchor->GetMaxKeySize());
334
335 if ((readvLimits.fMaxReqs < 2) ||
336 (std::max(fAnchor->GetNBytesHeader(), fAnchor->GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
337 (fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
338 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
339 fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
340 fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
341 fCounters->fNRead.Add(2);
342 } else {
343 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
344 ROOT::Internal::RRawFile::RIOVec readRequests[2] = {
345 {fStructureBuffer.fPtrHeader, fAnchor->GetSeekHeader(), fAnchor->GetNBytesHeader(), 0},
346 {fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(), fAnchor->GetNBytesFooter(), 0}};
347 fFile->ReadV(readRequests, 2);
348 fCounters->fNReadV.Inc();
349 }
350}
351
353{
354 auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();
355
356 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetLenHeader(),
357 unzipBuf);
358 RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor->GetLenHeader(), fDescriptorBuilder);
359
360 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetLenFooter(),
361 unzipBuf);
362 RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);
363
364 auto desc = fDescriptorBuilder.MoveDescriptor();
365
366 std::vector<unsigned char> buffer;
367 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
368 buffer.resize(
369 std::max<size_t>(buffer.size(), cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().fBytesOnStorage));
370 auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
371 fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage,
372 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
373 RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
374 buffer.data());
375
376 RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
377 }
378
379 // For the page reads, we rely on the I/O scheduler to define the read requests
380 fFile->SetBuffering(false);
381
382 return desc;
383}
384
386 RClusterIndex clusterIndex, RSealedPage &sealedPage)
387{
388 const auto clusterId = clusterIndex.GetClusterId();
389
391 {
392 auto descriptorGuard = GetSharedDescriptorGuard();
393 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
394 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
395 }
396
397 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
398 sealedPage.SetNElements(pageInfo.fNElements);
399 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
400 if (!sealedPage.GetBuffer())
401 return;
403 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
404 pageInfo.fLocator.GetPosition<std::uint64_t>());
405 } else {
406 assert(!pageInfo.fHasChecksum);
407 memcpy(const_cast<void *>(sealedPage.GetBuffer()), RPage::GetPageZeroBuffer(), sealedPage.GetBufferSize());
408 }
409
411}
412
415 const RClusterInfo &clusterInfo,
416 ClusterSize_t::ValueType idxInCluster)
417{
418 const auto columnId = columnHandle.fPhysicalId;
419 const auto clusterId = clusterInfo.fClusterId;
420 const auto pageInfo = clusterInfo.fPageInfo;
421
422 const auto element = columnHandle.fColumn->GetElement();
423 const auto elementSize = element->GetSize();
424 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
425
426 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
427 auto pageZero = fPageAllocator->NewPage(elementSize, pageInfo.fNElements);
428 pageZero.GrowUnchecked(pageInfo.fNElements);
429 memset(pageZero.GetBuffer(), 0, pageZero.GetNBytes());
430 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
431 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
432 return fPagePool.RegisterPage(std::move(pageZero), RPagePool::RKey{columnId, elementInMemoryType});
433 }
434
435 RSealedPage sealedPage;
436 sealedPage.SetNElements(pageInfo.fNElements);
437 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
438 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
439 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
440
441 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
442 directReadBuffer = MakeUninitArray<unsigned char>(sealedPage.GetBufferSize());
443 {
444 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
445 fReader.ReadBuffer(directReadBuffer.get(), sealedPage.GetBufferSize(),
446 pageInfo.fLocator.GetPosition<std::uint64_t>());
447 }
448 fCounters->fNPageRead.Inc();
449 fCounters->fNRead.Inc();
450 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
451 sealedPage.SetBuffer(directReadBuffer.get());
452 } else {
453 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
454 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
455 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
456
457 auto cachedPageRef =
458 fPagePool.GetPage(RPagePool::RKey{columnId, elementInMemoryType}, RClusterIndex(clusterId, idxInCluster));
459 if (!cachedPageRef.Get().IsNull())
460 return cachedPageRef;
461
462 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
463 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
464 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
465 sealedPage.SetBuffer(onDiskPage->GetAddress());
466 }
467
468 RPage newPage;
469 {
470 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
471 newPage = UnsealPage(sealedPage, *element).Unwrap();
472 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
473 }
474
475 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
476 RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
477 fCounters->fNPageUnsealed.Inc();
478 return fPagePool.RegisterPage(std::move(newPage), RPagePool::RKey{columnId, elementInMemoryType});
479}
480
481std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
483{
484 auto clone = new RPageSourceFile(fNTupleName, fOptions);
485 clone->fFile = fFile->Clone();
486 clone->fReader = RMiniFileReader(clone->fFile.get());
487 return std::unique_ptr<RPageSourceFile>(clone);
488}
489
490std::unique_ptr<ROOT::Experimental::Internal::RCluster>
492 const RCluster::RKey &clusterKey, std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
493{
494 struct ROnDiskPageLocator {
497 std::uint64_t fOffset = 0;
498 std::uint64_t fSize = 0;
499 std::size_t fBufPos = 0;
500 };
501
502 std::vector<ROnDiskPageLocator> onDiskPages;
503 auto activeSize = 0;
504 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
505 PrepareLoadCluster(clusterKey, *pageZeroMap,
506 [&](DescriptorId_t physicalColumnId, NTupleSize_t pageNo,
508 const auto &pageLocator = pageInfo.fLocator;
509 if (pageLocator.fType == RNTupleLocator::kTypeUnknown)
510 throw RException(R__FAIL("tried to read a page with an unknown locator"));
511 const auto nBytes = pageLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum;
512 activeSize += nBytes;
513 onDiskPages.push_back(
514 {physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
515 });
516
517 // Linearize the page requests by file offset
518 std::sort(onDiskPages.begin(), onDiskPages.end(),
519 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
520
521 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
522 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
523 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
524 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
525 // of extra bytes.
526 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
527 // memory consumption, device block size.
528 float maxOverhead = 0.25 * float(activeSize);
529 std::vector<std::size_t> gaps;
530 if (onDiskPages.size())
531 gaps.reserve(onDiskPages.size() - 1);
532 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
533 std::int64_t gap =
534 static_cast<int64_t>(onDiskPages[i].fOffset) - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset);
535 gaps.emplace_back(std::max(gap, std::int64_t(0)));
536 // If the pages overlap, subtract the overlapped bytes from `activeSize`
537 activeSize += std::min(gap, std::int64_t(0));
538 }
539 std::sort(gaps.begin(), gaps.end());
540 std::size_t gapCut = 0;
541 std::size_t currentGap = 0;
542 float szExtra = 0.0;
543 for (auto g : gaps) {
544 if (g != currentGap) {
545 gapCut = currentGap;
546 currentGap = g;
547 }
548 szExtra += g;
549 if (szExtra > maxOverhead)
550 break;
551 }
552
553 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
554 // In a second step, we'll fix-up the memory destinations for the read calls given the
555 // address of the allocated buffer. We must not touch, however, the read requests from previous
556 // calls to PrepareSingleCluster()
557 const auto currentReadRequestIdx = readRequests.size();
558
560 // To simplify the first loop iteration, pretend an empty request starting at the first page's fOffset.
561 if (!onDiskPages.empty())
562 req.fOffset = onDiskPages[0].fOffset;
563 std::size_t szPayload = 0;
564 std::size_t szOverhead = 0;
565 const std::uint64_t maxKeySize = fReader.GetMaxKeySize();
566 for (auto &s : onDiskPages) {
567 R__ASSERT(s.fSize > 0);
568 const std::int64_t readUpTo = req.fOffset + req.fSize;
569 // Note: byte ranges of pages may overlap
570 const std::uint64_t overhead = std::max(static_cast<std::int64_t>(s.fOffset) - readUpTo, std::int64_t(0));
571 const std::uint64_t extent = std::max(static_cast<std::int64_t>(s.fOffset + s.fSize) - readUpTo, std::int64_t(0));
572 if (req.fSize + extent < maxKeySize && overhead <= gapCut) {
573 szPayload += (extent - overhead);
574 szOverhead += overhead;
575 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + s.fOffset - req.fOffset;
576 req.fSize += extent;
577 continue;
578 }
579
580 // close the current request and open new one
581 if (req.fSize > 0)
582 readRequests.emplace_back(req);
583
584 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
585 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
586
587 szPayload += s.fSize;
588 req.fOffset = s.fOffset;
589 req.fSize = s.fSize;
590 }
591 readRequests.emplace_back(req);
592 fCounters->fSzReadPayload.Add(szPayload);
593 fCounters->fSzReadOverhead.Add(szOverhead);
594
595 // Register the on disk pages in a page map
596 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
597 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
598 for (const auto &s : onDiskPages) {
599 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
600 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
601 }
602 fCounters->fNPageRead.Add(onDiskPages.size());
603 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
604 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
605 }
606
607 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
608 cluster->Adopt(std::move(pageMap));
609 cluster->Adopt(std::move(pageZeroMap));
610 for (auto colId : clusterKey.fPhysicalColumnSet)
611 cluster->SetColumnAvailable(colId);
612 return cluster;
613}
614
615std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
617{
618 fCounters->fNClusterLoaded.Add(clusterKeys.size());
619
620 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
621 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
622
623 clusters.reserve(clusterKeys.size());
624 for (auto key : clusterKeys) {
625 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
626 }
627
628 auto nReqs = readRequests.size();
629 auto readvLimits = fFile->GetReadVLimits();
630 // We never want to do vectorized reads of split blobs, so we limit our single size to maxKeySize.
631 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fReader.GetMaxKeySize());
632
633 int iReq = 0;
634 while (nReqs > 0) {
635 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
636
637 if (readvLimits.HasSizeLimit()) {
638 std::uint64_t totalSize = 0;
639 for (std::size_t i = 0; i < nBatch; ++i) {
640 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
641 nBatch = i;
642 break;
643 }
644
645 totalSize += readRequests[iReq + i].fSize;
646 if (totalSize > readvLimits.fMaxTotalSize) {
647 nBatch = i;
648 break;
649 }
650 }
651 }
652
653 if (nBatch <= 1) {
654 nBatch = 1;
655 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
656 fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
657 } else {
658 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
659 fFile->ReadV(&readRequests[iReq], nBatch);
660 }
661 fCounters->fNReadV.Inc();
662 fCounters->fNRead.Add(nBatch);
663
664 iReq += nBatch;
665 nReqs -= nBatch;
666 }
667
668 return clusters;
669}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Record wall time and CPU time between construction and destruction.
Manages a set of clusters containing compressed and packed pages.
RColumnElementBase * GetElement() const
Definition RColumn.hxx:337
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:58
static Writer_t MakeMemCopyWriter(unsigned char *dest)
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::unique_ptr< RNTupleFileWriter > Recreate(std::string_view ntupleName, std::string_view path, EContainerFormat containerFormat, const RNTupleWriteOptions &options)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static std::unique_ptr< RNTupleFileWriter > Append(std::string_view ntupleName, TDirectory &fileOrDirectory, std::uint64_t maxKeySize)
The directory parameter can also be a TFile object (TFile inherits from TDirectory).
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Storage provider that writes ntuple pages into a file.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
We pass bytesPacked so that TFile::ls() reports a reasonable value for the compression ratio of the c...
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void CommitBatchOfPages(CommitBatch &batch, std::vector< RNTupleLocator > &locators)
Subroutine of CommitSealedPageVImpl, used to perform a vector write of the (multi-)range of pages con...
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
std::unique_ptr< RNTupleFileWriter > fWriter
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) override
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
RNTupleDescriptor AttachImpl() final
LoadStructureImpl() has been called before AttachImpl() is called
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:56
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:47
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:115
std::uint32_t GetNElements() const
Definition RPage.hxx:124
void SetWindow(const NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
Definition RPage.hxx:162
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:64
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:79
TFile * fFile
! The file from which the ntuple was streamed, registered in the custom streamer
Definition RNTuple.hxx:111
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:289
Describe directory structure in memory.
Definition TDirectory.h:45
virtual const TUrl * GetEndpointUrl() const
Definition TFile.h:235
TClass * IsA() const override
Definition TFile.h:344
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:156
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
size_t fBytesPacked
Total uncompressed size of the elements in the page batch.
std::vector< const RSealedPage * > fSealedPages
The list of pages to commit.
Summarizes cluster-level information that is necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
std::uint32_t fNElements
The sum of the elements of all the pages must match the corresponding fNElements field in fColumnRang...
bool fHasChecksum
If true, the 8 bytes following the serialized page are an xxhash of the on-disk page data.
RNTupleLocator fLocator
The meaning of fLocator depends on the storage backend.
Generic information about the physical location of data.
ELocatorType fType
For non-disk locators, the value for the Type field.
std::variant< std::uint64_t, RNTupleLocatorObject64 > fPosition
Simple on-disk locators consisting of a 64-bit offset use variant type uint64_t; extended locators ha...
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:61
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:67
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:63
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:65