Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageFile.cxx
Go to the documentation of this file.
1/// \file RPageStorageFile.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \date 2019-11-25
4
5/*************************************************************************
6 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
13#include <ROOT/RCluster.hxx>
14#include <ROOT/RLogger.hxx>
16#include <ROOT/RNTupleModel.hxx>
18#include <ROOT/RNTupleZip.hxx>
19#include <ROOT/RPage.hxx>
21#include <ROOT/RPagePool.hxx>
23#include <ROOT/RRawFile.hxx>
25#include <ROOT/RNTupleTypes.hxx>
26#include <ROOT/RNTupleUtils.hxx>
27
28#include <RVersion.h>
29#include <TDirectory.h>
30#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <cstring>
37#include <iterator>
38#include <limits>
39#include <utility>
40
41#include <functional>
42#include <mutex>
43
55
62
69
76
83
84ROOT::Internal::RPageSinkFile::RPageSinkFile(std::unique_ptr<ROOT::Internal::RNTupleFileWriter> writer,
85 const ROOT::RNTupleWriteOptions &options)
86 : RPageSinkFile(writer->GetNTupleName(), options)
87{
88 fWriter = std::move(writer);
89}
90
92
94{
96 auto szZipHeader =
97 RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get());
98 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
99}
100
103{
105
106 auto fnAddStreamerInfo = [this](const ROOT::RFieldBase *field) {
107 const TClass *cl = nullptr;
108 if (auto classField = dynamic_cast<const RClassField *>(field)) {
109 cl = classField->GetClass();
110 } else if (auto streamerField = dynamic_cast<const RStreamerField *>(field)) {
111 cl = streamerField->GetClass();
112 } else if (auto soaField = dynamic_cast<const ROOT::Experimental::RSoAField *>(field)) {
113 cl = soaField->GetSoAClass();
114 }
115 if (!cl)
116 return;
117
118 auto streamerInfo = cl->GetStreamerInfo(field->GetTypeVersion());
119 if (!streamerInfo) {
120 throw RException(R__FAIL(std::string("cannot get streamerInfo for ") + cl->GetName() + " [" +
121 std::to_string(field->GetTypeVersion()) + "]"));
122 }
123 fInfosOfClassFields[streamerInfo->GetNumber()] = streamerInfo;
124 };
125
126 for (const auto field : changeset.fAddedFields) {
128 for (const auto &subField : *field) {
130 }
131 }
132}
133
136{
137 std::uint64_t offsetData;
138 {
139 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
140 offsetData = fWriter->WriteBlob(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), bytesPacked);
141 }
142
144 result.SetPosition(offsetData);
145 result.SetNBytesOnStorage(sealedPage.GetDataSize());
146 fCounters->fNPageCommitted.Inc();
147 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
148 fNBytesCurrentCluster += sealedPage.GetBufferSize();
149 return result;
150}
151
154{
155 auto element = columnHandle.fColumn->GetElement();
157 {
158 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
159 sealedPage = SealPage(page, *element);
160 }
161
162 fCounters->fSzZip.Add(page.GetNBytes());
163 return WriteSealedPage(sealedPage, element->GetPackedSize(page.GetNElements()));
164}
165
168{
169 const auto nBits = fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(physicalColumnId).GetBitsOnStorage();
170 const auto bytesPacked = (nBits * sealedPage.GetNElements() + 7) / 8;
171 return WriteSealedPage(sealedPage, bytesPacked);
172}
173
175{
176 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
177
178 std::uint64_t offset = fWriter->ReserveBlob(batch.fSize, batch.fBytesPacked);
179
180 locators.reserve(locators.size() + batch.fSealedPages.size());
181
182 for (const auto *pagePtr : batch.fSealedPages) {
183 fWriter->WriteIntoReservedBlob(pagePtr->GetBuffer(), pagePtr->GetBufferSize(), offset);
185 locator.SetPosition(offset);
186 locator.SetNBytesOnStorage(pagePtr->GetDataSize());
187 locators.push_back(locator);
188 offset += pagePtr->GetBufferSize();
189 }
190
191 fCounters->fNPageCommitted.Add(batch.fSealedPages.size());
192 fCounters->fSzWritePayload.Add(batch.fSize);
193 fNBytesCurrentCluster += batch.fSize;
194
195 batch.fSize = 0;
196 batch.fBytesPacked = 0;
197 batch.fSealedPages.clear();
198}
199
200std::vector<ROOT::RNTupleLocator>
201ROOT::Internal::RPageSinkFile::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
202 const std::vector<bool> &mask)
203{
204 const std::uint64_t maxKeySize = fOptions->GetMaxKeySize();
205
207 std::vector<RNTupleLocator> locators;
208
209 std::size_t iPage = 0;
210 for (auto rangeIt = ranges.begin(); rangeIt != ranges.end(); ++rangeIt) {
211 auto &range = *rangeIt;
212 if (range.fFirst == range.fLast) {
213 // Skip empty ranges, they might not have a physical column ID!
214 continue;
215 }
216
217 const auto bitsOnStorage =
218 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(range.fPhysicalColumnId).GetBitsOnStorage();
219
220 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt, ++iPage) {
221 if (!mask[iPage])
222 continue;
223
224 const auto bytesPacked = (bitsOnStorage * sealedPageIt->GetNElements() + 7) / 8;
225
226 if (batch.fSize > 0 && batch.fSize + sealedPageIt->GetBufferSize() > maxKeySize) {
227 /**
228 * Adding this page would exceed maxKeySize. Since we always want to write into a single key
229 * with vectorized writes, we commit the current set of pages before proceeding.
230 * NOTE: we do this *before* checking if sealedPageIt->GetBufferSize() > maxKeySize to guarantee that
231 * we always flush the current batch before doing an individual WriteBlob. This way we
232 * preserve the assumption that a CommitBatch always contain a sequential set of pages.
233 */
234 CommitBatchOfPages(batch, locators);
235 }
236
237 if (sealedPageIt->GetBufferSize() > maxKeySize) {
238 // This page alone is bigger than maxKeySize: save it by itself, since it will need to be
239 // split into multiple keys.
240
241 // Since this check implies the previous check on batchSize + newSize > maxSize, we should
242 // already have committed the current batch before writing this page.
243 assert(batch.fSize == 0);
244
245 std::uint64_t offset =
246 fWriter->WriteBlob(sealedPageIt->GetBuffer(), sealedPageIt->GetBufferSize(), bytesPacked);
248 locator.SetPosition(offset);
249 locator.SetNBytesOnStorage(sealedPageIt->GetDataSize());
250 locators.push_back(locator);
251
252 fCounters->fNPageCommitted.Inc();
253 fCounters->fSzWritePayload.Add(sealedPageIt->GetBufferSize());
254 fNBytesCurrentCluster += sealedPageIt->GetBufferSize();
255
256 } else {
257 batch.fSealedPages.emplace_back(&(*sealedPageIt));
258 batch.fSize += sealedPageIt->GetBufferSize();
259 batch.fBytesPacked += bytesPacked;
260 }
261 }
262 }
263
264 if (batch.fSize > 0) {
265 CommitBatchOfPages(batch, locators);
266 }
267
268 return locators;
269}
270
272{
273 auto result = fNBytesCurrentCluster;
274 fNBytesCurrentCluster = 0;
275 return result;
276}
277
280{
282 auto szPageListZip =
283 RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get());
284
286 result.SetNBytesOnStorage(szPageListZip);
287 result.SetPosition(fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, length));
288 return result;
289}
290
293{
294 // Add the streamer info records from streamer fields: because of runtime polymorphism we may need to add additional
295 // types not covered by the type names of the class fields
296 for (const auto &extraTypeInfo : fDescriptorBuilder.GetDescriptor().GetExtraTypeInfoIterable()) {
298 continue;
299 // Ideally, we would avoid deserializing the streamer info records of the streamer fields that we just serialized.
300 // However, this happens only once at the end of writing and only when streamer fields are used, so the
301 // preference here is for code simplicity.
302 fInfosOfClassFields.merge(RNTupleSerializer::DeserializeStreamerInfos(extraTypeInfo.GetContent()).Unwrap());
303 }
304 fWriter->UpdateStreamerInfos(fInfosOfClassFields);
305
307 auto szFooterZip =
308 RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get());
309 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
310 return fWriter->Commit(GetWriteOptions().GetCompression());
311}
312
313std::unique_ptr<ROOT::Internal::RPageSink>
315{
316 auto writer = fWriter->CloneAsHidden(name);
317 auto cloned = std::unique_ptr<RPageSinkFile>(new RPageSinkFile(std::move(writer), opts));
318 return cloned;
319}
320
321////////////////////////////////////////////////////////////////////////////////
322
325{
326 EnableDefaultMetrics("RPageSourceFile");
327 fFileCounters = std::make_unique<RFileCounters>(RFileCounters{
328 *fMetrics.MakeCounter<RNTupleAtomicCounter *>("szSkip", "B",
329 "cumulative seek distance (excluding header/footer reads)"),
331 "szFile", "B", "total file size", fMetrics,
332 [this](const RNTupleMetrics &) -> std::pair<bool, double> {
333 if (fFileSize > 0)
334 return {true, static_cast<double>(fFileSize)};
335 return {false, -1.};
336 }),
338 "randomness", "",
339 "ratio of seek distance to bytes read (excluding file structure reads)", fMetrics,
340 [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
341 if (const auto szSkip = metrics.GetLocalCounter("szSkip")) {
342 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
343 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
344 auto totalRead = szReadPayload->GetValueAsInt() + szReadOverhead->GetValueAsInt();
345 if (totalRead > 0) {
346 return {true, (1. * szSkip->GetValueAsInt()) / totalRead};
347 }
348 }
349 }
350 }
351 return {false, -1.};
352 }),
354 "sparseness", "",
355 "ratio of bytes read to total file size (excluding file structure reads)", fMetrics,
356 [this](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
357 if (fFileSize > 0) {
358 if (const auto szReadPayload = metrics.GetLocalCounter("szReadPayload")) {
359 if (const auto szReadOverhead = metrics.GetLocalCounter("szReadOverhead")) {
360 auto totalRead = szReadPayload->GetValueAsInt() + szReadOverhead->GetValueAsInt();
361 return {true, (1. * totalRead) / fFileSize};
362 }
363 }
364 }
365 return {false, -1.};
366 })});
367}
368
370 std::unique_ptr<ROOT::Internal::RRawFile> file,
371 const ROOT::RNTupleReadOptions &options)
372 : RPageSourceFile(ntupleName, options)
373{
374 fFile = std::move(file);
377}
378
379ROOT::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
380 const ROOT::RNTupleReadOptions &options)
381 : RPageSourceFile(ntupleName, ROOT::Internal::RRawFile::Create(path), options)
382{
383}
384
385std::unique_ptr<ROOT::Internal::RPageSourceFile>
387{
388 if (!anchor.fFile)
389 throw RException(R__FAIL("This RNTuple object was not streamed from a ROOT file (TFile or descendant)"));
390
391 std::unique_ptr<ROOT::Internal::RRawFile> rawFile;
392 // For local TFiles, TDavixFile, TCurlFile, and TNetXNGFile, we want to open a new RRawFile to take advantage of the
393 // faster reading. We check the exact class name to avoid classes inheriting in ROOT (for example TMemFile) or in
394 // experiment frameworks.
395 const std::string className = anchor.fFile->IsA()->GetName();
396 const auto url = anchor.fFile->GetEndpointUrl();
397 if (className == "TFile") {
399 } else if (className == "TDavixFile" || className == "TCurlFile" || className == "TNetXNGFile") {
401 } else {
403 }
404
405 auto pageSource = std::make_unique<RPageSourceFile>("", std::move(rawFile), options);
406 pageSource->fAnchor = anchor;
407 // NOTE: fNTupleName gets set only upon Attach().
408 return pageSource;
409}
410
412{
413 StopClusterPoolBackgroundThread();
414}
415
416std::unique_ptr<ROOT::Internal::RPageSource>
418 const ROOT::RNTupleReadOptions &options)
419{
420 assert(anchorLink.fLocator.GetType() == RNTupleLocator::kTypeFile);
421
422 const auto anchorPos = anchorLink.fLocator.GetPosition<std::uint64_t>();
423 auto anchor =
424 fReader.GetNTupleProperAtOffset(anchorPos, anchorLink.fLocator.GetNBytesOnStorage(), anchorLink.fLength).Unwrap();
425 auto pageSource = std::make_unique<RPageSourceFile>("", fFile->Clone(), options);
426 pageSource->fAnchor = anchor;
427 // NOTE: fNTupleName gets set only upon Attach().
428 return pageSource;
429}
430
432{
433 // If we constructed the page source with (ntuple name, path), we need to find the anchor first.
434 // Otherwise, the page source was created by OpenFromAnchor()
435 if (!fAnchor) {
436 fAnchor = fReader.GetNTuple(fNTupleName).Unwrap();
437 }
438 fReader.SetMaxKeySize(fAnchor->GetMaxKeySize());
439
440 fDescriptorBuilder.SetVersion(fAnchor->GetVersionEpoch(), fAnchor->GetVersionMajor(), fAnchor->GetVersionMinor(),
441 fAnchor->GetVersionPatch());
442 fDescriptorBuilder.SetOnDiskHeaderSize(fAnchor->GetNBytesHeader());
443 fDescriptorBuilder.AddToOnDiskFooterSize(fAnchor->GetNBytesFooter());
444
445 // Reserve enough space for the compressed and the uncompressed header/footer (see AttachImpl)
446 const auto bufSize = fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() +
447 std::max(fAnchor->GetLenHeader(), fAnchor->GetLenFooter());
448 fStructureBuffer.fBuffer = MakeUninitArray<unsigned char>(bufSize);
449 fStructureBuffer.fPtrHeader = fStructureBuffer.fBuffer.get();
450 fStructureBuffer.fPtrFooter = fStructureBuffer.fBuffer.get() + fAnchor->GetNBytesHeader();
451
452 auto readvLimits = fFile->GetReadVLimits();
453 // Never try to vectorize reads to a split key
454 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fAnchor->GetMaxKeySize());
455
456 if ((readvLimits.fMaxReqs < 2) ||
457 (std::max(fAnchor->GetNBytesHeader(), fAnchor->GetNBytesFooter()) > readvLimits.fMaxSingleSize) ||
458 (fAnchor->GetNBytesHeader() + fAnchor->GetNBytesFooter() > readvLimits.fMaxTotalSize)) {
459 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
460 fReader.ReadBuffer(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetSeekHeader());
461 fReader.ReadBuffer(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetSeekFooter());
462 fCounters->fNRead.Add(2);
463 } else {
464 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
465 R__ASSERT(fAnchor->GetNBytesHeader() < std::numeric_limits<std::size_t>::max());
466 R__ASSERT(fAnchor->GetNBytesFooter() < std::numeric_limits<std::size_t>::max());
467 ROOT::Internal::RRawFile::RIOVec readRequests[2] = {{fStructureBuffer.fPtrHeader, fAnchor->GetSeekHeader(),
468 static_cast<std::size_t>(fAnchor->GetNBytesHeader()), 0},
469 {fStructureBuffer.fPtrFooter, fAnchor->GetSeekFooter(),
470 static_cast<std::size_t>(fAnchor->GetNBytesFooter()), 0}};
471 fFile->ReadV(readRequests, 2);
472 fCounters->fNReadV.Inc();
473 }
474}
475
477{
478 auto unzipBuf = reinterpret_cast<unsigned char *>(fStructureBuffer.fPtrFooter) + fAnchor->GetNBytesFooter();
479
480 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrHeader, fAnchor->GetNBytesHeader(), fAnchor->GetLenHeader(),
481 unzipBuf);
482 RNTupleSerializer::DeserializeHeader(unzipBuf, fAnchor->GetLenHeader(), fDescriptorBuilder);
483
484 RNTupleDecompressor::Unzip(fStructureBuffer.fPtrFooter, fAnchor->GetNBytesFooter(), fAnchor->GetLenFooter(),
485 unzipBuf);
486 RNTupleSerializer::DeserializeFooter(unzipBuf, fAnchor->GetLenFooter(), fDescriptorBuilder);
487
488 // fNTupleName is empty if and only if we created this source via CreateFromAnchor. If that's the case, this is the
489 // earliest we can set the name.
490 if (fNTupleName.empty())
491 fNTupleName = fDescriptorBuilder.GetDescriptor().GetName();
492
493 // For the page reads, we rely on the I/O scheduler to define the read requests
494 fFile->SetBuffering(false);
495
496 // Set file size once after buffering is turned off
497 fFileSize = fFile->GetSize();
498
499 return fDescriptorBuilder.MoveDescriptor();
500}
501
503{
504 fReader.ReadBuffer(buffer, locator.GetNBytesOnStorage(), locator.GetPosition<std::uint64_t>());
505}
506
508{
509 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
510 const auto offset = locator.GetPosition<std::uint64_t>();
511 // Track seek distance (excluding file structure reads)
512 if (fLastOffset != 0) {
513 R__ASSERT(fFileCounters);
514 const auto distance = static_cast<std::uint64_t>(
515 std::abs(static_cast<std::int64_t>(offset) - static_cast<std::int64_t>(fLastOffset)));
516 fFileCounters->fSzSkip.Add(distance);
517 }
518 fReader.ReadBuffer(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(),
519 locator.GetPosition<std::uint64_t>());
520 fLastOffset = offset + sealedPage.GetBufferSize();
521}
522
523std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Internal::RPageSourceFile::CloneImpl() const
524{
525 auto clone = new RPageSourceFile(fNTupleName, fOptions);
526 clone->fFile = fFile->Clone();
527 clone->fReader = ROOT::Internal::RMiniFileReader(clone->fFile.get());
528 return std::unique_ptr<RPageSourceFile>(clone);
529}
530
531std::unique_ptr<ROOT::Internal::RCluster>
533 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
534{
535 struct ROnDiskPageLocator {
536 ROOT::DescriptorId_t fColumnId = 0;
537 ROOT::NTupleSize_t fPageNo = 0;
538 std::uint64_t fOffset = 0;
539 std::uint64_t fSize = 0;
540 std::size_t fBufPos = 0;
541 };
542
543 std::vector<ROnDiskPageLocator> onDiskPages;
544 auto activeSize = 0;
545 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
546 PrepareLoadCluster(
550 const auto &pageLocator = pageInfo.GetLocator();
552 throw RException(R__FAIL("tried to read a page with an unknown locator"));
553 const auto nBytes = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
555 onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
556 });
557
558 // Linearize the page requests by file offset
559 std::sort(onDiskPages.begin(), onDiskPages.end(),
560 [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) { return a.fOffset < b.fOffset; });
561
562 // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
563 // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
564 // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
565 // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
566 // of extra bytes.
567 // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
568 // memory consumption, device block size.
569 float maxOverhead = 0.25 * float(activeSize);
570 std::vector<std::size_t> gaps;
571 if (onDiskPages.size())
572 gaps.reserve(onDiskPages.size() - 1);
573 for (unsigned i = 1; i < onDiskPages.size(); ++i) {
574 std::int64_t gap =
575 static_cast<int64_t>(onDiskPages[i].fOffset) - (onDiskPages[i - 1].fSize + onDiskPages[i - 1].fOffset);
576 gaps.emplace_back(std::max(gap, std::int64_t(0)));
577 // If the pages overlap, substract the overlapped bytes from `activeSize`
578 activeSize += std::min(gap, std::int64_t(0));
579 }
580 std::sort(gaps.begin(), gaps.end());
581 std::size_t gapCut = 0;
582 std::size_t currentGap = 0;
583 float szExtra = 0.0;
584 for (auto g : gaps) {
585 if (g != currentGap) {
587 currentGap = g;
588 }
589 szExtra += g;
590 if (szExtra > maxOverhead)
591 break;
592 }
593
594 // In a first step, we coalesce the read requests and calculate the cluster buffer size.
595 // In a second step, we'll fix-up the memory destinations for the read calls given the
596 // address of the allocated buffer. We must not touch, however, the read requests from previous
597 // calls to PrepareSingleCluster()
598 const auto currentReadRequestIdx = readRequests.size();
599
601 // To simplify the first loop iteration, pretend an empty request starting at the first page's fOffset.
602 if (!onDiskPages.empty())
603 req.fOffset = onDiskPages[0].fOffset;
604 std::size_t szPayload = 0;
605 std::size_t szOverhead = 0;
606 const std::uint64_t maxKeySize = fReader.GetMaxKeySize();
607 for (auto &s : onDiskPages) {
608 R__ASSERT(s.fSize > 0);
609 const std::int64_t readUpTo = req.fOffset + req.fSize;
610 // Note: byte ranges of pages may overlap
611 const std::uint64_t overhead = std::max(static_cast<std::int64_t>(s.fOffset) - readUpTo, std::int64_t(0));
612 const std::uint64_t extent = std::max(static_cast<std::int64_t>(s.fOffset + s.fSize) - readUpTo, std::int64_t(0));
613 if (req.fSize + extent < maxKeySize && overhead <= gapCut) {
616 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + s.fOffset - req.fOffset;
617 req.fSize += extent;
618 continue;
619 }
620
621 // close the current request and open new one
622 if (req.fSize > 0)
623 readRequests.emplace_back(req);
624
625 req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
626 s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);
627
628 szPayload += s.fSize;
629 req.fOffset = s.fOffset;
630 req.fSize = s.fSize;
631 }
632 readRequests.emplace_back(req);
633 fCounters->fSzReadPayload.Add(szPayload);
634 fCounters->fSzReadOverhead.Add(szOverhead);
635
636 // Register the on disk pages in a page map
637 auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
638 auto pageMap = std::make_unique<ROOT::Internal::ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(buffer));
639 for (const auto &s : onDiskPages) {
640 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
641 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
642 }
643 fCounters->fNPageRead.Add(onDiskPages.size());
644 for (auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
645 readRequests[i].fBuffer = buffer + reinterpret_cast<intptr_t>(readRequests[i].fBuffer);
646 }
647
648 auto cluster = std::make_unique<RCluster>(clusterKey.fClusterId);
649 cluster->Adopt(std::move(pageMap));
650 cluster->Adopt(std::move(pageZeroMap));
651 for (auto colId : clusterKey.fPhysicalColumnSet)
652 cluster->SetColumnAvailable(colId);
653 return cluster;
654}
655
656std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
658{
659 fCounters->fNClusterLoaded.Add(clusterKeys.size());
660
661 std::vector<std::unique_ptr<ROOT::Internal::RCluster>> clusters;
662 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
663
664 clusters.reserve(clusterKeys.size());
665 for (const auto &key : clusterKeys) {
666 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
667 }
668
669 auto nReqs = readRequests.size();
670 auto readvLimits = fFile->GetReadVLimits();
671 // We never want to do vectorized reads of split blobs, so we limit our single size to maxKeySize.
672 readvLimits.fMaxSingleSize = std::min<size_t>(readvLimits.fMaxSingleSize, fReader.GetMaxKeySize());
673
674 int iReq = 0;
675 while (nReqs > 0) {
676 auto nBatch = std::min(nReqs, readvLimits.fMaxReqs);
677
678 if (readvLimits.HasSizeLimit()) {
679 std::uint64_t totalSize = 0;
680 for (std::size_t i = 0; i < nBatch; ++i) {
681 if (readRequests[iReq + i].fSize > readvLimits.fMaxSingleSize) {
682 nBatch = i;
683 break;
684 }
685
686 totalSize += readRequests[iReq + i].fSize;
687 if (totalSize > readvLimits.fMaxTotalSize) {
688 nBatch = i;
689 break;
690 }
691 }
692 }
693
694 // Track seek distance for each read request (excluding file structure reads)
695 R__ASSERT(fFileCounters);
696 for (std::size_t i = 0; i < nBatch; ++i) {
697 const auto offset = readRequests[iReq + i].fOffset;
698 if (fLastOffset != 0) {
699 const auto distance = static_cast<std::uint64_t>(std::abs(
700 static_cast<std::int64_t>(offset) - static_cast<std::int64_t>(fLastOffset)));
701 fFileCounters->fSzSkip.Add(distance);
702 }
703 fLastOffset = offset + readRequests[iReq + i].fSize;
704 }
705
706 if (nBatch <= 1) {
707 nBatch = 1;
708 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
709 fReader.ReadBuffer(readRequests[iReq].fBuffer, readRequests[iReq].fSize, readRequests[iReq].fOffset);
710 } else {
711 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
712 fFile->ReadV(&readRequests[iReq], nBatch);
713 }
714 fCounters->fNReadV.Inc();
715 fCounters->fNRead.Add(nBatch);
716
717 iReq += nBatch;
718 nReqs -= nBatch;
719 }
720
721 return clusters;
722}
723
725{
726 fReader.LoadStreamerInfo();
727}
fBuffer
dim_t fSize
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:322
#define b(i)
Definition RSha256.hxx:100
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
char name[80]
Definition TGX11.cxx:148
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
An interface to read from, or write to, a ROOT file, as well as performing other common operations.
Definition RFile.hxx:252
The SoA field provides I/O for an in-memory SoA layout linked to an on-disk collection of the underly...
Definition RFieldSoA.hxx:55
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:147
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
Definition RMiniFile.hxx:60
Helper class to compress data blocks in the ROOT compression frame format.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
Helper class to uncompress data blocks in the ROOT compression frame format.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size ls of the from buffer.
Write RNTuple data blocks in a TFile or a bare file container.
static std::unique_ptr< RNTupleFileWriter > Append(std::string_view ntupleName, TDirectory &fileOrDirectory, std::uint64_t maxKeySize, bool isHidden)
The directory parameter can also be a TFile object (TFile inherits from TDirectory).
static std::unique_ptr< RNTupleFileWriter > Recreate(std::string_view ntupleName, std::string_view path, EContainerFormat containerFormat, const ROOT::RNTupleWriteOptions &options)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
A helper class for serializing and deserialization of the RNTuple binary format.
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:98
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:40
Base class for a sink with a physical storage backend.
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) override
Incorporate incremental changes to the model into the ntuple descriptor.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Storage provider that writes ntuple pages into a file.
void CommitBatchOfPages(CommitBatch &batch, std::vector< RNTupleLocator > &locators)
Subroutine of CommitSealedPageVImpl, used to perform a vector write of the (multi-)range of pages con...
RPageSinkFile(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
std::unique_ptr< RPageSink > CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const override
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) override
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
We pass bytesPacked so that TFile::ls() reports a reasonable value for the compression ratio of the c...
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
RNTupleLink CommitDatasetImpl() final
std::unique_ptr< ROOT::Internal::RNTupleFileWriter > fWriter
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
Storage provider that reads ntuple pages from a file.
ROOT::RNTupleDescriptor AttachImpl() final
LoadStructureImpl() has been called before AttachImpl() is called
std::int64_t fFileSize
Total file size, set once in AttachImpl()
std::unique_ptr< ROOT::Internal::RCluster > PrepareSingleCluster(const ROOT::Internal::RCluster::RKey &clusterKey, std::vector< RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > OpenWithDifferentAnchor(const ROOT::Internal::RNTupleLink &anchorLink, const ROOT::RNTupleReadOptions &options={}) final
Creates a new PageSource using the same underlying file as this but referring to a different RNTuple,...
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options=ROOT::RNTupleReadOptions())
Used from the RNTuple class to build a datasource if the anchor is already available.
void LoadPageListImpl(const RNTupleLocator &locator, unsigned char *buffer) final
std::vector< std::unique_ptr< ROOT::Internal::RCluster > > LoadClusters(std::span< ROOT::Internal::RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RFileCounters > fFileCounters
void LoadSealedPageImpl(const RNTupleLocator &locator, RSealedPage &sealedPage) final
RPageSourceFile(std::string_view ntupleName, const ROOT::RNTupleReadOptions &options)
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void LoadStructureImpl() final
Fills fStructureBuffer with the compressed header and footer.
void LoadStreamerInfo() final
Forces the loading of ROOT StreamerInfo from the underlying file.
std::unique_ptr< RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
ROOT::Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
The RRawFileTFile wraps an open TFile, but does not take ownership.
The RRawFile provides read-only access to local and remote files.
Definition RRawFile.hxx:43
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
Definition RRawFile.cxx:64
The field for a class with dictionary.
Definition RField.hxx:135
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
Generic information about the physical location of data.
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
std::uint64_t GetMaxKeySize() const
Representation of an RNTuple data set in a ROOT file.
Definition RNTuple.hxx:67
const_iterator begin() const
const_iterator end() const
The field for a class using ROOT standard streaming.
Definition RField.hxx:234
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:84
TVirtualStreamerInfo * GetStreamerInfo(Int_t version=0, Bool_t isTransient=kFALSE) const
Returns a pointer to the TVirtualStreamerInfo object for the given version. If the object does not exist,...
Definition TClass.cxx:4657
Describe directory structure in memory.
Definition TDirectory.h:45
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The identifier that specifies the content of a (partial) cluster.
Definition RCluster.hxx:151
The incremental changes to an RNTupleModel.
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:50
File-specific I/O performance counters.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Used for vector reads from multiple offsets into multiple buffers.
Definition RRawFile.hxx:61
Information about a single page in the context of a cluster's page range.