Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2019-11-25
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RLogger.hxx>
20#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleZip.hxx>
23#include <ROOT/RPage.hxx>
25#include <ROOT/RPagePool.hxx>
27#include <ROOT/RRawFile.hxx>
29#include <ROOT/RNTupleUtil.hxx>
30
31#include <RVersion.h>
32#include <TDirectory.h>
33#include <TError.h>
34
35#include <algorithm>
36#include <cstdio>
37#include <cstdlib>
38#include <cstring>
39#include <iterator>
40#include <limits>
41#include <utility>
42
43#include <functional>
44#include <mutex>
45
47 const RNTupleWriteOptions &options)
49{
// Constructor body. Creates the compressor that the header, page-list and footer commit routines
// use for zipping, and enables the default sink metrics under the "RPageSinkFile" prefix.
// NOTE(review): the head of the signature and the initializer list are not visible in this listing.
50 fCompressor = std::make_unique<RNTupleCompressor>();
51 EnableDefaultMetrics("RPageSinkFile");
53}
54
61
68
70
72{
// Compresses the serialized header (`length` bytes) with the configured compression setting and
// passes the zipped bytes, their size and the uncompressed length to the file writer.
// The scratch buffer is allocated with `length` bytes.
// NOTE(review): this presumes the zipped header never exceeds `length` bytes -- confirm against Zip().
73 auto zipBuffer = std::make_unique<unsigned char[]>(length);
74 auto szZipHeader = fCompressor->Zip(serializedHeader, length, GetWriteOptions().GetCompression(),
76 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
77}
78
81 std::size_t bytesPacked)
82{
// Writes the buffer of an already sealed page as a single blob and returns a locator for it.
// `bytesPacked` is forwarded to the writer together with the buffer and its size.
83 std::uint64_t offsetData;
84 {
// The timer only covers the actual blob write (scoped block).
85 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
86 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), bytesPacked);
87 }
88
// The locator records GetDataSize(), whereas the counters below use GetBufferSize().
// NOTE(review): presumably the buffer size additionally includes the page checksum -- confirm.
90 result.fPosition = offsetData;
91 result.fBytesOnStorage = sealedPage.GetDataSize();
92 fCounters->fNPageCommitted.Inc();
93 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
// Running byte total of the current cluster; read and reset where fNBytesCurrentCluster is returned.
94 fNBytesCurrentCluster += sealedPage.GetBufferSize();
95 return result;
96}
97
100{
// Seals the given in-memory page using the column's element and writes it via WriteSealedPage().
101 auto element = columnHandle.fColumn->GetElement();
103 {
// Only the sealing step is accounted to the zip timers.
104 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
105 sealedPage = SealPage(page, *element);
106 }
107
// fSzZip is increased by the page's GetNBytes(), i.e. the size of the page that was passed in.
108 fCounters->fSzZip.Add(page.GetNBytes());
109 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
110}
111
115{
// Commits a page that is already sealed. The packed size is derived from the column's on-storage
// bit width: nBits * nElements rounded up to whole bytes.
116 const auto nBits = fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetBitsOnStorage();
117 const auto bytesPacked = (nBits * sealedPage.GetNElements() + 7) / 8;
118 return WriteSealedPage(sealedPage, bytesPacked);
119}
120
122 std::vector<RNTupleLocator> &locators)
123{
// Writes all pages collected in `batch` into one reserved blob, appends one locator per page to
// `locators` (in batch order) and resets the batch so that it can be reused by the caller.
// The write timer covers the whole function.
124 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
125
// Reserve space for the entire batch; `offset` then advances page by page through that space.
126 std::uint64_t offset = fWriter->ReserveBlob(batch.fSize, batch.fBytesPacked);
127
128 locators.reserve(locators.size() + batch.fSealedPages.size());
129
130 for (const auto *pagePtr : batch.fSealedPages) {
131 fWriter->WriteIntoReservedBlob(pagePtr->GetBuffer(), pagePtr->GetBufferSize(), offset);
133 locator.fPosition = offset;
134 locator.fBytesOnStorage = pagePtr->GetDataSize();
135 locators.push_back(locator);
// Pages are laid out back to back, each occupying GetBufferSize() bytes.
136 offset += pagePtr->GetBufferSize();
137 }
138
139 fCounters->fNPageCommitted.Add(batch.fSealedPages.size());
140 fCounters->fSzWritePayload.Add(batch.fSize);
141 fNBytesCurrentCluster += batch.fSize;
142
// Leave the batch empty for the next round of accumulation.
143 batch.fSize = 0;
144 batch.fBytesPacked = 0;
145 batch.fSealedPages.clear();
146}
147
/// Vector commit of preprocessed (sealed) pages.
///
/// Iterates over all pages of all non-empty ranges and gathers those whose `mask` entry is set
/// into batches that do not exceed the maximum key size; each batch is written with
/// CommitBatchOfPages(). A page that is by itself larger than the maximum key size is written
/// individually with WriteBlob().
///
/// \param ranges Groups of sealed pages; each non-empty group refers to one physical column.
/// \param mask   Indexed by the running page counter over all non-empty ranges; pages whose entry
///               is false are skipped (but still counted).
/// \return One locator per committed page, in iteration order.
148std::vector<ROOT::Experimental::RNTupleLocator>
149ROOT::Experimental::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
150 const std::vector<bool> &mask)
151{
152 const std::uint64_t maxKeySize = fOptions->GetMaxKeySize();
153
155 std::vector<ROOT::Experimental::RNTupleLocator> locators;
156
// Running index into `mask`, advanced for every page visited (masked or not).
157 std::size_t iPage = 0;
158 for (auto rangeIt = ranges.begin(); rangeIt != ranges.end(); ++rangeIt) {
159 auto &range = *rangeIt;
160 if (range.fFirst == range.fLast) {
161 // Skip empty ranges, they might not have a physical column ID!
162 continue;
163 }
164
// The bit width is a per-column property, so it is looked up once per range.
165 const auto bitsOnStorage =
166 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetBitsOnStorage();
167
168 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt, ++iPage) {
169 if (!mask[iPage])
170 continue;
171
// Packed size in bytes: bit count rounded up to whole bytes.
172 const auto bytesPacked = (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
173
174 if (batch.fSize > 0 && batch.fSize + sealedPageIt->GetBufferSize() > maxKeySize) {
175 /**
176 * Adding this page would exceed maxKeySize. Since we always want to write into a single key
177 * with vectorized writes, we commit the current set of pages before proceeding.
178 * NOTE: we do this *before* checking if sealedPageIt->GetBufferSize() > maxKeySize to guarantee that
179 * we always flush the current batch before doing an individual WriteBlob. This way we
180 * preserve the assumption that a CommitBatch always contains a sequential set of pages.
181 */
182 CommitBatchOfPages(batch, locators);
183 }
184
185 if (sealedPageIt->GetBufferSize() > maxKeySize) {
186 // This page alone is bigger than maxKeySize: save it by itself, since it will need to be
187 // split into multiple keys.
188
189 // Since this check implies the previous check on batchSize + newSize > maxSize, we should
190 // already have committed the current batch before writing this page.
191 assert(batch.fSize == 0);
192
193 std::uint64_t offset =
194 fWriter->WriteBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetBufferSize(), bytesPacked);
196 locator.fPosition = offset;
197 locator.fBytesOnStorage = sealedPageIt->GetDataSize();
198 locators.push_back(locator);
199
// Same bookkeeping as done for a whole batch in CommitBatchOfPages(), here for one page.
200 fCounters->fNPageCommitted.Inc();
201 fCounters->fSzWritePayload.Add(sealedPageIt->GetBufferSize());
202 fNBytesCurrentCluster += sealedPageIt->GetBufferSize();
203
204 } else {
// The page fits: only remember a pointer to it; the actual write happens on batch commit.
205 batch.fSealedPages.emplace_back(&(*sealedPageIt));
206 batch.fSize += sealedPageIt->GetBufferSize();
207 batch.fBytesPacked += bytesPacked;
208 }
209 }
210 }
211
// Flush whatever is left over after the last range.
212 if (batch.fSize > 0) {
213 CommitBatchOfPages(batch, locators);
214 }
215
216 return locators;
217}
218
220{
// Returns the number of bytes accumulated in fNBytesCurrentCluster by the page write routines
// and resets the tally to zero for the next cluster.
221 auto result = fNBytesCurrentCluster;
222 fNBytesCurrentCluster = 0;
223 return result;
224}
225
228 std::uint32_t length)
229{
// Compresses the serialized page list (`length` bytes), writes it as a blob and returns a locator
// whose fBytesOnStorage is the compressed size and whose fPosition is the offset returned by WriteBlob().
// NOTE(review): the scratch buffer has `length` bytes; presumes the zipped size never exceeds it -- confirm.
230 auto bufPageListZip = std::make_unique<unsigned char[]>(length);
231 auto szPageListZip = fCompressor->Zip(serializedPageList, length, GetWriteOptions().GetCompression(),
233
235 result.fBytesOnStorage = szPageListZip;
236 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length);
237 return result;
238}
239
241 std::uint32_t length)
242{
// Finalizes the data set: updates the writer's streamer infos from the descriptor builder,
// compresses and writes the serialized footer (`length` bytes), then commits the writer.
243 fWriter->UpdateStreamerInfos(fDescriptorBuilder.BuildStreamerInfos());
244 auto bufFooterZip = std::make_unique<unsigned char[]>(length);
245 auto szFooterZip = fCompressor->Zip(serializedFooter, length, GetWriteOptions().GetCompression(),
247 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
// Commit() is the last call on the writer in this routine.
248 fWriter->Commit();
249}
250
251////////////////////////////////////////////////////////////////////////////////
252
254 const RNTupleReadOptions &options)
255 : RPageSource(ntupleName, options),
256 fClusterPool(std::make_unique<RClusterPool>(*this, options.GetClusterBunchSize()))
257{
// Base constructor used by the other constructors: sets up the cluster pool (bunch size taken
// from the read options) and enables the default source metrics under the "RPageSourceFile" prefix.
// No file is attached here.
258 EnableDefaultMetrics("RPageSourceFile");
259}
260
262 std::unique_ptr<ROOT::Internal::RRawFile> file,
263 const RNTupleReadOptions &options)
264 : RPageSourceFile(ntupleName, options)
265{
// Delegates to the (name, options) constructor and then takes ownership of the given raw file.
// NOTE(review): further statements of this body are not visible in this listing.
266 fFile = std::move(file);
269}
270
272 const RNTupleReadOptions &options)
// Delegates to the constructor taking a raw file, which is created from `path` via RRawFile::Create().
273 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
274{
275}
276
/// Builds a page source from an already available RNTuple anchor.
///
/// Throws an RException if the anchor has no associated file. The raw file used for reading is
/// selected according to the exact class name of the anchor's file; the page source is then
/// constructed with an empty ntuple name, receives a copy of the anchor, and its name is set from
/// the descriptor builder's descriptor.
277std::unique_ptr<ROOT::Experimental::Internal::RPageSourceFile>
279 const RNTupleReadOptions &options)
280{
281 if (!anchor.fFile)
282 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
283
284 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
285 // For local TFiles, TDavixFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the faster
286 // reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
287 // experiment frameworks.
288 std::string className = anchor.fFile->IsA()->GetName();
289 auto url = anchor.fFile->GetEndpointUrl();
290 auto protocol = std::string(url->GetProtocol());
// NOTE(review): the bodies of the three branches below are not visible in this listing.
291 if (className == "TFile") {
293 } else if (className == "TDavixFile" || className == "TNetXNGFile") {
295 } else {
297 }
298
299 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
300 pageSource->fAnchor = anchor;
301 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
302 return pageSource;
303}
304
306
308{
// Locates the anchor (if necessary), validates its epoch and reads the compressed header and
// footer into fStructureBuffer. Decompression and deserialization happen later (see AttachImpl).
309 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
310 // Otherwise, the page source was created by CreateFromAnchor()
311 if (!fAnchor) {
312 fAnchor = fReader.GetNTuple(fNTupleName).Unwrap();
313 }
314 fReader.SetMaxKeySize(fAnchor->GetMaxKeySize());
315
316 // TODO(jblomer): can the epoch check be factored out across anchors?
317 if (fAnchor->GetVersionEpoch() != RNTuple::kVersionEpoch) {
318 throw RException(R__FAIL("unsupported RNTuple epoch version: " + std::to_string(fAnchor->GetVersionEpoch())));
319 }
320
321 fDescriptorBuilder.SetOnDiskHeaderSize(fAnchor->GetNBytesHeader());
322 fDescriptorBuilder.AddToOnDiskFooterSize(fAnchor->GetNBytesFooter());
323
324 // Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
// Layout: [compressed header][compressed footer][scratch area of max(LenHeader, LenFooter) bytes].
325 const auto bufSize = fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() +
326 std::max(fAnchor->GetLenHeader(), fAnchor->GetLenFooter());
327 fStructureBuffer.fBuffer = std::make_unique<unsigned char[]>(bufSize);
328 fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
329 fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor->GetNBytesHeader();
330
331 auto readvLimits = fFile->GetReadVLimits();
332 // Never try to vectorize reads to a split key
333 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fAnchor->GetMaxKeySize());
334
// Fall back to two individual reads if the vector-read limits cannot accommodate both requests;
// otherwise fetch header and footer with a single ReadV call.
335 if ((readvLimits.fMaxReqs < 2) ||
336 (std::max(fAnchor->GetNBytesHeader(), fAnchor->GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
337 (fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
338 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
339 fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
340 fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
341 fCounters->fNRead.Add(2);
342 } else {
343 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
345 {fStructureBuffer.fPtrHeader, fAnchor->GetSeekHeader(), fAnchor->GetNBytesHeader(), 0},
346 {fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(), fAnchor->GetNBytesFooter(), 0}};
347 fFile->ReadV(readRequests, 2);
348 fCounters->fNReadV.Inc();
349 }
350}
351
353{
// Unzips and deserializes header and footer from fStructureBuffer, then reads, unzips and
// deserializes the page list of every cluster group, and returns the resulting descriptor.
// The scratch area for unzipping starts right behind the compressed footer and is shared by
// header and footer (the header is fully deserialized before the footer is unzipped).
354 auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();
355
356 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetLenHeader(),
357 unzipBuf);
358 RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor->GetLenHeader(), fDescriptorBuilder);
359
360 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetLenFooter(),
361 unzipBuf);
362 RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);
363
364 auto desc = fDescriptorBuilder.MoveDescriptor();
365
// One buffer is reused for all cluster groups and only ever grows. Per group, the compressed
// page list is read into the tail of the buffer and unzipped into its front.
366 std::vector<unsigned char> buffer;
367 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
368 buffer.resize(
369 std::max<size_t>(buffer.size(), cgDesc.GetPageListLength() + cgDesc.GetPageListLocator().fBytesOnStorage));
370 auto *zipBuffer = buffer.data() + cgDesc.GetPageListLength();
371 fReader.ReadBuffer(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage,
372 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
373 RNTupleDecompressor::Unzip(zipBuffer, cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
374 buffer.data());
375
376 RNTupleSerializer::DeserializePageList(buffer.data(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc);
377 }
378
379 // For the page reads, we rely on the I/O scheduler to define the read requests
380 fFile->SetBuffering(false);
381
382 return desc;
383}
384
387{
// Fills `sealedPage` with the on-storage bytes of the page that contains `clusterIndex` of the
// given physical column. Size, element count and checksum flag are always set; the bytes are
// only loaded if the sealed page already has a buffer.
388 const auto clusterId = clusterIndex.GetClusterId();
389
391 {
// The descriptor guard is only held while the page info is looked up.
392 auto descriptorGuard = GetSharedDescriptorGuard();
393 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
394 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.GetIndex());
395 }
396
// Buffer size = bytes on storage, plus the checksum bytes if the page carries a checksum.
397 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
398 sealedPage.SetNElements(pageInfo.fNElements);
399 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
// Without a destination buffer only the metadata above is provided.
// NOTE(review): presumably lets callers query the required size before allocating -- confirm.
400 if (!sealedPage.GetBuffer())
401 return;
402 if (pageInfo.fLocator.fType != RNTupleLocator::kTypePageZero) {
403 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
404 pageInfo.fLocator.GetPosition<std::uint64_t>());
405 } else {
// Page-zero locators are not read from the file; the bytes are copied from the page zero buffer.
406 assert(!pageInfo.fHasChecksum);
407 memcpy(const_cast<void *>(sealedPage.GetBuffer()), RPage::GetPageZeroBuffer(), sealedPage.GetBufferSize());
408 }
409
410 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
411}
412
417{
// Provides the in-memory page described by `clusterInfo.fPageInfo` for the given column and
// registers it in the page pool. Three cases: page-zero locator (no read), direct read from the
// file (cluster cache off), or lookup of the on-disk page in a cluster from the cluster pool.
418 const auto columnId = columnHandle.fPhysicalId;
419 const auto clusterId = clusterInfo.fClusterId;
420 const auto pageInfo = clusterInfo.fPageInfo;
421
422 const auto element = columnHandle.fColumn->GetElement();
423 const auto elementSize = element->GetSize();
424 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
425
426 if (pageInfo.fLocator.fType == RNTupleLocator::kTypePageZero) {
428 pageZero.GrowUnchecked(pageInfo.fNElements);
429 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
431 return fPagePool.RegisterPage(std::move(pageZero), elementInMemoryType);
432 }
433
435 sealedPage.SetNElements(pageInfo.fNElements);
436 sealedPage.SetHasChecksum(pageInfo.fHasChecksum);
437 sealedPage.SetBufferSize(pageInfo.fLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum);
438 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
439
440 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
// Direct path: read exactly this page from the file into a temporary buffer that lives until
// the end of this function.
441 directReadBuffer = std::unique_ptr<unsigned char[]>(new unsigned char[sealedPage.GetBufferSize()]);
442 {
443 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
444 fReader.ReadBuffer(directReadBuffer.get(), sealedPage.GetBufferSize(),
445 pageInfo.fLocator.GetPosition<std::uint64_t>());
446 }
447 fCounters->fNPageRead.Inc();
448 fCounters->fNRead.Inc();
449 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
450 sealedPage.SetBuffer(directReadBuffer.get());
451 } else {
// Cached path: fetch a new cluster if the current one is missing, belongs to another cluster id,
// or does not contain this column.
452 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
453 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
454 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
455
// NOTE(review): the statement defining `cachedPageRef` is not visible in this listing.
457 if (!cachedPageRef.Get().IsNull())
458 return cachedPageRef;
459
460 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
461 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
462 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
463 sealedPage.SetBuffer(onDiskPage->GetAddress());
464 }
465
467 {
// Unsealing is accounted to the unzip timers; fSzUnzip grows by elementSize * number of elements.
468 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
469 newPage = UnsealPage(sealedPage, *element, columnId).Unwrap();
470 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
471 }
472
473 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.fFirstInPage,
475 fCounters->fNPageUnsealed.Inc();
476 return fPagePool.RegisterPage(std::move(newPage), elementInMemoryType);
477}
478
/// Creates a new page source with the same ntuple name and read options. The clone gets its own
/// raw file (obtained from fFile->Clone()) and its own reader operating on that file.
479std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
481{
// The clone is held by a raw pointer until it is wrapped in the returned unique_ptr.
482 auto clone = new RPageSourceFile(fNTupleName, fOptions);
483 clone->fFile = fFile->Clone();
484 clone->fReader = RMiniFileReader(clone->fFile.get());
485 return std::unique_ptr<RPageSourceFile>(clone);
486}
487
/// Helper for LoadClusters(): collects the on-disk pages of the requested cluster/columns,
/// coalesces them into read requests that are appended to `readRequests`, allocates the cluster
/// buffer, and returns a cluster whose page map points into that buffer. The reads themselves are
/// not issued here.
///
/// \param clusterKey   Cluster id and physical column set to prepare.
/// \param readRequests In/out list of read requests; entries already present on entry are left untouched.
488std::unique_ptr<ROOT::Experimental::Internal::RCluster>
490 const RCluster::RKey &clusterKey, std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
491{
// Per-page bookkeeping: where the page lives in the file and where it will live in the cluster buffer.
492 struct ROnDiskPageLocator {
495 std::uint64_t fOffset = 0;
496 std::uint64_t fSize = 0;
497 std::size_t fBufPos = 0;
498 };
499
500 std::vector<ROnDiskPageLocator> onDiskPages;
// NOTE(review): `auto` deduces `int` here; the statement that accumulates page sizes into
// activeSize is not visible in this listing.
501 auto activeSize = 0;
502 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
503 PrepareLoadCluster(clusterKey, *pageZeroMap,
506 const auto &pageLocator = pageInfo.fLocator;
508 throw RException(R__FAIL("tried to read a page with an unknown locator"));
509 const auto nBytes = pageLocator.fBytesOnStorage + pageInfo.fHasChecksum * kNBytesPageChecksum;
511 onDiskPages.push_back(
512 {physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
513 });
514
515 // Linearize the page requests by file offset
516 std::sort(onDiskPages.begin(), onDiskPages.end(),
517 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
518
519 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
520 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
521 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
522 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
523 // of extra bytes.
524 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
525 // memory consumption, device block size.
// Note: maxOverhead is computed from activeSize before the overlap correction in the loop below.
526 float maxOverhead = 0.25 * float(activeSize);
527 std::vector<std::size_t> gaps;
528 if (onDiskPages.size())
529 gaps.reserve(onDiskPages.size() - 1);
530 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
// A negative gap means that page i overlaps with its predecessor.
531 std::int64_t gap =
532 static_cast<int64_t>(onDiskPages[i].fOffset) - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset);
533 gaps.emplace_back(std::max(gap, std::int64_t(0)));
534 // If the pages overlap, subtract the overlapped bytes from `activeSize`
535 activeSize += std::min(gap, std::int64_t(0));
536 }
537 std::sort(gaps.begin(), gaps.end());
538 std::size_t gapCut = 0;
539 std::size_t currentGap = 0;
540 float szExtra = 0.0;
// Walk the gaps in ascending order until the accumulated extra bytes exceed the budget.
// NOTE(review): the statement inside the `if` that updates gapCut is not visible in this listing.
541 for (auto g : gaps) {
542 if (g != currentGap) {
544 currentGap = g;
545 }
546 szExtra += g;
547 if (szExtra > maxOverhead)
548 break;
549 }
550
551 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
552 // In a second step, we'll fix-up the memory destinations for the read calls given the
553 // address of the allocated buffer. We must not touch, however, the read requests from previous
554 // calls to PrepareSingleCluster()
555 const auto currentReadRequestIdx = readRequests.size();
556
558 // To simplify the first loop iteration, pretend an empty request starting at the first page's fOffset.
559 if (!onDiskPages.empty())
560 req.fOffset = onDiskPages[0].fOffset;
561 std::size_t szPayload = 0;
562 std::size_t szOverhead = 0;
563 const std::uint64_t maxKeySize = fReader.GetMaxKeySize();
// During this loop, req.fBuffer does not hold a real address but the byte position of the request
// within the not-yet-allocated cluster buffer; it is turned into an address in the fix-up loop below.
564 for (auto &s : onDiskPages) {
565 R__ASSERT(s.fSize > 0);
566 const std::int64_t readUpTo = req.fOffset + req.fSize;
567 // Note: byte ranges of pages may overlap
568 const std::uint64_t overhead = std::max(static_cast<std::int64_t>(s.fOffset) - readUpTo, std::int64_t(0));
569 const std::uint64_t extent = std::max(static_cast<std::int64_t>(s.fOffset + s.fSize) - readUpTo, std::int64_t(0));
// Extend the current request if it stays below the max key size and the gap is tolerated.
570 if (req.fSize + extent < maxKeySize && overhead <= gapCut) {
573 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + s.fOffset - req.fOffset;
574 req.fSize += extent;
575 continue;
576 }
577
578 // close the current request and open new one
579 if (req.fSize > 0)
580 readRequests.emplace_back(req);
581
582 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
583 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
584
585 szPayload += s.fSize;
586 req.fOffset = s.fOffset;
587 req.fSize = s.fSize;
588 }
// NOTE(review): the last request is appended unconditionally, also when onDiskPages is empty.
589 readRequests.emplace_back(req);
590 fCounters->fSzReadPayload.Add(szPayload);
591 fCounters->fSzReadOverhead.Add(szOverhead);
592
593 // Register the on disk pages in a page map
// Total buffer size = position of the last request + its size. Ownership of the buffer passes to the page map.
594 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
595 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
596 for (const auto &s : onDiskPages) {
597 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
598 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
599 }
600 fCounters->fNPageRead.Add(onDiskPages.size());
// Fix-up: convert the relative positions of the requests added by this call into real addresses.
601 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
602 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
603 }
604
605 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
606 cluster->Adopt(std::move(pageMap));
607 cluster->Adopt(std::move(pageZeroMap));
608 for (auto colId : clusterKey.fPhysicalColumnSet)
609 cluster->SetColumnAvailable(colId);
610 return cluster;
611}
612
/// Prepares one cluster per key with PrepareSingleCluster(), which also fills a common list of
/// read requests, and then issues those requests in batches that respect the file's vector-read
/// limits. Returns the clusters in the order of `clusterKeys`.
613std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
615{
616 fCounters->fNClusterLoaded.Add(clusterKeys.size());
617
618 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>> clusters;
619 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
620
621 clusters.reserve(clusterKeys.size());
622 for (auto key : clusterKeys) {
623 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
624 }
625
626 auto nReqs = readRequests.size();
627 auto readvLimits = fFile->GetReadVLimits();
628 // We never want to do vectorized reads of split blobs, so we limit our single size to maxKeySize.
629 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fReader.GetMaxKeySize());
630
// iReq is the index of the first request that has not been issued yet.
// NOTE(review): iReq is a signed int while the batch sizes are unsigned.
631 int iReq = 0;
632 while (nReqs > 0) {
633 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
634
635 if (readvLimits.HasSizeLimit()) {
// Shrink the batch so that it ends before the first request that is too large on its own
// or that would push the batch over the total size limit.
636 std::uint64_t totalSize = 0;
637 for (std::size_t i = 0; i < nBatch; ++i) {
638 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
639 nBatch = i;
640 break;
641 }
642
643 totalSize += readRequests[iReq + i].fSize;
644 if (totalSize > readvLimits.fMaxTotalSize) {
645 nBatch = i;
646 break;
647 }
648 }
649 }
650
// A batch of zero or one request is served by a plain ReadBuffer() of the first pending request,
// which guarantees progress; larger batches go through ReadV().
651 if (nBatch <= 1) {
652 nBatch = 1;
653 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
654 fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
655 } else {
656 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
657 fFile->ReadV(&readRequests[iReq], nBatch);
658 }
// NOTE(review): fNReadV is incremented for both branches, i.e. also for the single ReadBuffer() path.
659 fCounters->fNReadV.Inc();
660 fCounters->fNRead.Add(nBatch);
661
662 iReq += nBatch;
663 nReqs -= nBatch;
664 }
665
666 return clusters;
667}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Manages a set of clusters containing compressed and packed pages.
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:58
static Writer_t MakeMemCopyWriter(unsigned char *dest)
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::unique_ptr< RNTupleFileWriter > Recreate(std::string_view ntupleName, std::string_view path, EContainerFormat containerFormat, const RNTupleWriteOptions &options)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static std::unique_ptr< RNTupleFileWriter > Append(std::string_view ntupleName, TDirectory &fileOrDirectory, std::uint64_t maxKeySize)
The directory parameter can also be a TFile object (TFile inherits from TDirectory).
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:42
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Definition RPagePool.hxx:93
Storage provider that writes ntuple pages into a file.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
We pass bytesPacked so that TFile::ls() reports a reasonable value for the compression ratio of the c...
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void CommitBatchOfPages(CommitBatch &batch, std::vector< RNTupleLocator > &locators)
Subroutine of CommitSealedPageVImpl, used to perform a vector write of the (multi-)range of pages con...
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
std::unique_ptr< RNTupleFileWriter > fWriter
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) override
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const RNTupleReadOptions &options=RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
RNTupleDescriptor AttachImpl() final
LoadStructureImpl() has been called before AttachImpl() is called
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:55
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:46
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
Definition RPage.hxx:174
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:25
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:64
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:69
static constexpr std::uint16_t kVersionEpoch
Definition RNTuple.hxx:79
const_iterator begin() const
const_iterator end() const
Describe directory structure in memory.
Definition TDirectory.h:45
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:52
Summarizes cluster-level information that is necessary to load a certain page.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
We do not need to store the element size / uncompressed page size because we know to which column the...
Generic information about the physical location of data.
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:61
std::size_t fSize
The number of desired bytes.
Definition RRawFile.hxx:67
void * fBuffer
The destination for reading.
Definition RRawFile.hxx:63
std::uint64_t fOffset
The file offset.
Definition RRawFile.hxx:65