ROOT Reference Guide
RPageStorageFile.cxx
/// \file RPageStorageFile.cxx
/// \ingroup NTuple ROOT7
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2019-11-25
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#include <ROOT/RCluster.hxx>
#include <ROOT/RClusterPool.hxx>
#include <ROOT/RField.hxx>
#include <ROOT/RLogger.hxx>
#include <ROOT/RNTupleDescriptor.hxx>
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/RPage.hxx>
#include <ROOT/RPageAllocator.hxx>
#include <ROOT/RPagePool.hxx>
#include <ROOT/RPageStorageFile.hxx>
#include <ROOT/RRawFile.hxx>

#include <RVersion.h>
#include <TError.h>

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <cstring>   // memcpy
#include <iostream>
#include <utility>

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <queue>

ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
   const RNTupleWriteOptions &options)
   : RPageSink(ntupleName, options)
   , fMetrics("RPageSinkRoot")
   , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
{
   R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. " <<
      "Do not store real data with this version of RNTuple!";

   fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Recreate(
      ntupleName, path, options.GetCompression(), options.GetContainerFormat()));
}
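
// Illustrative sketch (not part of the upstream file): how a page sink is
// typically driven from the write side. The public entry points Create() and
// CommitDataset() on the RPageSink base class are assumptions here; the
// *Impl() methods they dispatch to are the ones defined in this file.
//
//   RNTupleWriteOptions options;
//   RPageSinkFile sink("ntpl", "data.root", options); // writes the file preamble
//   sink.Create(model);          // CreateImpl(): serialize + compress the header
//   // ... for each filled page:      CommitPageImpl()    -> pack, zip, WriteBlob()
//   // ... at each cluster boundary:  CommitClusterImpl() -> record the byte range
//   sink.CommitDataset();        // CommitDatasetImpl(): write footer, commit file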


ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, TFile &file,
   const RNTupleWriteOptions &options)
   : RPageSink(ntupleName, options)
   , fMetrics("RPageSinkRoot")
   , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
{
   R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. " <<
      "Do not store real data with this version of RNTuple!";

   fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(Internal::RNTupleFileWriter::Append(ntupleName, file));
}


ROOT::Experimental::Detail::RPageSinkFile::RPageSinkFile(std::string_view ntupleName, std::string_view path,
   const RNTupleWriteOptions &options, std::unique_ptr<TFile> &file)
   : RPageSink(ntupleName, options)
   , fMetrics("RPageSinkRoot")
   , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
{
   R__LOG_WARNING(NTupleLog()) << "The RNTuple file format will change. " <<
      "Do not store real data with this version of RNTuple!";
   fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
      Internal::RNTupleFileWriter::Append(ntupleName, *file));
}


ROOT::Experimental::Detail::RPageSinkFile::~RPageSinkFile()
{
}


void ROOT::Experimental::Detail::RPageSinkFile::CreateImpl(const RNTupleModel & /* model */)
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();
   auto szHeader = descriptor.GetHeaderSize();
   auto buffer = std::make_unique<unsigned char[]>(szHeader);
   descriptor.SerializeHeader(buffer.get());

   auto zipBuffer = std::make_unique<unsigned char[]>(szHeader);
   auto szZipHeader = fCompressor(buffer.get(), szHeader, fOptions.GetCompression(),
      [&zipBuffer](const void *b, size_t n, size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
   fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, szHeader);
}


ROOT::Experimental::RClusterDescriptor::RLocator
ROOT::Experimental::Detail::RPageSinkFile::CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page)
{
   unsigned char *buffer = reinterpret_cast<unsigned char *>(page.GetBuffer());
   bool isAdoptedBuffer = true;
   auto packedBytes = page.GetSize();
   auto element = columnHandle.fColumn->GetElement();
   const auto isMappable = element->IsMappable();

   if (!isMappable) {
      packedBytes = (page.GetNElements() * element->GetBitsOnStorage() + 7) / 8;
      buffer = new unsigned char[packedBytes];
      isAdoptedBuffer = false;
      element->Pack(buffer, page.GetBuffer(), page.GetNElements());
   }
   auto zippedBytes = packedBytes;

   if (fOptions.GetCompression() != 0) {
      zippedBytes = fCompressor(buffer, packedBytes, fOptions.GetCompression());
      if (!isAdoptedBuffer)
         delete[] buffer;
      buffer = const_cast<unsigned char *>(reinterpret_cast<const unsigned char *>(fCompressor.GetZipBuffer()));
      isAdoptedBuffer = true;
   }

   auto offsetData = fWriter->WriteBlob(buffer, zippedBytes, packedBytes);
   fClusterMinOffset = std::min(offsetData, fClusterMinOffset);
   fClusterMaxOffset = std::max(offsetData + zippedBytes, fClusterMaxOffset);

   if (!isAdoptedBuffer)
      delete[] buffer;

   RClusterDescriptor::RLocator result;
   result.fPosition = offsetData;
   result.fBytesOnStorage = zippedBytes;
   return result;
}
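
// Worked example (added for illustration): a non-mappable column is packed to
// ceil(nElements * bitsOnStorage / 8) bytes before compression. A page of
// 1000 boolean elements stored at 1 bit each packs into
// (1000 * 1 + 7) / 8 = 125 bytes; a mappable column skips packing and hands
// its buffer directly to the compressor.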


ROOT::Experimental::RClusterDescriptor::RLocator
ROOT::Experimental::Detail::RPageSinkFile::CommitClusterImpl(ROOT::Experimental::NTupleSize_t /* nEntries */)
{
   RClusterDescriptor::RLocator result;
   result.fPosition = fClusterMinOffset;
   result.fBytesOnStorage = fClusterMaxOffset - fClusterMinOffset;
   fClusterMinOffset = std::uint64_t(-1);
   fClusterMaxOffset = 0;
   return result;
}


void ROOT::Experimental::Detail::RPageSinkFile::CommitDatasetImpl()
{
   const auto &descriptor = fDescriptorBuilder.GetDescriptor();
   auto szFooter = descriptor.GetFooterSize();
   auto buffer = std::make_unique<unsigned char []>(szFooter);
   descriptor.SerializeFooter(buffer.get());

   auto zipBuffer = std::make_unique<unsigned char []>(szFooter);
   auto szZipFooter = fCompressor(buffer.get(), szFooter, fOptions.GetCompression(),
      [&zipBuffer](const void *b, size_t n, size_t o){ memcpy(zipBuffer.get() + o, b, n); } );
   fWriter->WriteNTupleFooter(zipBuffer.get(), szZipFooter, szFooter);
   fWriter->Commit();
}


ROOT::Experimental::Detail::RPage
ROOT::Experimental::Detail::RPageSinkFile::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
{
   if (nElements == 0)
      nElements = kDefaultElementsPerPage;
   auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
   return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
}

void ROOT::Experimental::Detail::RPageSinkFile::ReleasePage(RPage &page)
{
   fPageAllocator->DeletePage(page);
}


////////////////////////////////////////////////////////////////////////////////


ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageAllocatorFile::NewPage(
   ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
{
   RPage newPage(columnId, mem, elementSize * nElements, elementSize);
   newPage.TryGrow(nElements);
   return newPage;
}

void ROOT::Experimental::Detail::RPageAllocatorFile::DeletePage(const RPage& page)
{
   if (page.IsNull())
      return;
   delete[] reinterpret_cast<unsigned char *>(page.GetBuffer());
}


////////////////////////////////////////////////////////////////////////////////


ROOT::Experimental::Detail::RPageSourceFile::RPageSourceFile(std::string_view ntupleName,
   const RNTupleReadOptions &options)
   : RPageSource(ntupleName, options)
   , fMetrics("RPageSourceFile")
   , fPageAllocator(std::make_unique<RPageAllocatorFile>())
   , fPagePool(std::make_shared<RPagePool>())
   , fClusterPool(std::make_unique<RClusterPool>(*this))
{
   fCounters = std::unique_ptr<RCounters>(new RCounters{
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nReadV", "", "number of vector read requests"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nRead", "", "number of byte ranges read"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szReadPayload", "B", "volume read from file (required)"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szReadOverhead", "B", "volume read from file (overhead)"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("szUnzip", "B", "volume after unzipping"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nClusterLoaded", "",
                                                   "number of partial clusters preloaded from storage"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPageLoaded", "", "number of pages loaded from storage"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("nPagePopulated", "", "number of populated pages"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallRead", "ns", "wall clock time spent reading"),
      *fMetrics.MakeCounter<RNTupleAtomicCounter*>("timeWallUnzip", "ns", "wall clock time spent decompressing"),
      *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*>("timeCpuRead", "ns", "CPU time spent reading"),
      *fMetrics.MakeCounter<RNTupleTickCounter<RNTupleAtomicCounter>*>("timeCpuUnzip", "ns",
                                                                       "CPU time spent decompressing"),
      *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwRead", "MB/s", "bandwidth compressed bytes read per second",
         fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetCounter("RPageSourceFile.szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetCounter("RPageSourceFile.szReadOverhead")) {
                  if (const auto timeWallRead = metrics.GetCounter("RPageSourceFile.timeWallRead")) {
                     if (auto walltime = timeWallRead->GetValueAsInt()) {
                        double payload = szReadPayload->GetValueAsInt();
                        double overhead = szReadOverhead->GetValueAsInt();
                        // unit: bytes / nanosecond = GB/s
                        return {true, (1000. * (payload + overhead) / walltime)};
                     }
                  }
               }
            }
            return {false, -1.};
         }
      ),
      *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwReadUnzip", "MB/s", "bandwidth uncompressed bytes read per second",
         fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetCounter("RPageSourceFile.szUnzip")) {
               if (const auto timeWallRead = metrics.GetCounter("RPageSourceFile.timeWallRead")) {
                  if (auto walltime = timeWallRead->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }
      ),
      *fMetrics.MakeCounter<RNTupleCalcPerf*> ("bwUnzip", "MB/s", "decompression bandwidth of uncompressed bytes per second",
         fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szUnzip = metrics.GetCounter("RPageSourceFile.szUnzip")) {
               if (const auto timeWallUnzip = metrics.GetCounter("RPageSourceFile.timeWallUnzip")) {
                  if (auto walltime = timeWallUnzip->GetValueAsInt()) {
                     double unzip = szUnzip->GetValueAsInt();
                     // unit: bytes / nanosecond = GB/s
                     return {true, 1000. * unzip / walltime};
                  }
               }
            }
            return {false, -1.};
         }
      ),
      *fMetrics.MakeCounter<RNTupleCalcPerf*> ("rtReadEfficiency", "", "ratio of payload over all bytes read",
         fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetCounter("RPageSourceFile.szReadPayload")) {
               if (const auto szReadOverhead = metrics.GetCounter("RPageSourceFile.szReadOverhead")) {
                  if (auto payload = szReadPayload->GetValueAsInt()) {
                     // r/(r+o) = 1/((r+o)/r) = 1/(1 + o/r)
                     return {true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
                  }
               }
            }
            return {false, -1.};
         }
      ),
      *fMetrics.MakeCounter<RNTupleCalcPerf*> ("rtCompression", "", "ratio of compressed bytes / uncompressed bytes",
         fMetrics, [](const RNTupleMetrics &metrics) -> std::pair<bool, double> {
            if (const auto szReadPayload = metrics.GetCounter("RPageSourceFile.szReadPayload")) {
               if (const auto szUnzip = metrics.GetCounter("RPageSourceFile.szUnzip")) {
                  if (auto unzip = szUnzip->GetValueAsInt()) {
                     return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
                  }
               }
            }
            return {false, -1.};
         }
      )
   });
}
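
// Note on the bandwidth lambdas above: the time counters are nanoseconds and
// the size counters bytes, so bytes / nanosecond equals GB/s and the factor
// 1000 converts to the MB/s unit declared for the counters. Example: 5e8 B
// read in 2e9 ns gives 0.25 B/ns = 0.25 GB/s, reported as 250 MB/s.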


ROOT::Experimental::Detail::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, std::string_view path,
   const RNTupleReadOptions &options)
   : RPageSourceFile(ntupleName, options)
{
   fFile = ROOT::Internal::RRawFile::Create(path);
   R__ASSERT(fFile);
   fReader = Internal::RMiniFileReader(fFile.get());
}


ROOT::Experimental::Detail::RPageSourceFile::~RPageSourceFile() = default;


ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::Detail::RPageSourceFile::AttachImpl()
{
   RNTupleDescriptorBuilder descBuilder;
   auto ntpl = fReader.GetNTuple(fNTupleName).Unwrap();

   auto buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
   auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
   fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fSeekHeader);
   fDecompressor(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
   descBuilder.SetFromHeader(buffer.get());

   buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
   zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
   fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fSeekFooter);
   fDecompressor(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
   descBuilder.AddClustersFromFooter(buffer.get());

   return descBuilder.MoveDescriptor();
}


ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceFile::PopulatePageFromCluster(
   ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
{
   const auto columnId = columnHandle.fId;
   const auto clusterId = clusterDescriptor.GetId();
   const auto &pageRange = clusterDescriptor.GetPageRange(columnId);

   // TODO(jblomer): binary search
   RClusterDescriptor::RPageRange::RPageInfo pageInfo;
   decltype(idxInCluster) firstInPage = 0;
   NTupleSize_t pageNo = 0;
   for (const auto &pi : pageRange.fPageInfos) {
      if (firstInPage + pi.fNElements > idxInCluster) {
         pageInfo = pi;
         break;
      }
      firstInPage += pi.fNElements;
      ++pageNo;
   }
   R__ASSERT(firstInPage <= idxInCluster);
   R__ASSERT((firstInPage + pageInfo.fNElements) > idxInCluster);

   const auto element = columnHandle.fColumn->GetElement();
   const auto elementSize = element->GetSize();

   const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
   const auto bytesPacked = (element->GetBitsOnStorage() * pageInfo.fNElements + 7) / 8;
   const auto pageSize = elementSize * pageInfo.fNElements;

   auto pageBuffer = new unsigned char[bytesPacked];
   if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
      fReader.ReadBuffer(pageBuffer, bytesOnStorage, pageInfo.fLocator.fPosition);
      fCounters->fNPageLoaded.Inc();
   } else {
      //printf("GETTING CLUSTER %ld\n", clusterId);
      if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
         fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
      R__ASSERT(fCurrentCluster->ContainsColumn(columnId));

      auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
      if (!cachedPage.IsNull())
         return cachedPage;

      ROnDiskPage::Key key(columnId, pageNo);
      //printf("POPULATE cluster %ld column %ld page %ld\n", clusterId, columnId, pageNo);
      auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
      R__ASSERT(onDiskPage);
      R__ASSERT(bytesOnStorage == onDiskPage->GetSize());
      memcpy(pageBuffer, onDiskPage->GetAddress(), onDiskPage->GetSize());
   }

   if (bytesOnStorage != bytesPacked) {
      RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
      fDecompressor(pageBuffer, bytesOnStorage, bytesPacked);
      fCounters->fSzUnzip.Add(bytesPacked);
   }

   if (!element->IsMappable()) {
      auto unpackedBuffer = new unsigned char[pageSize];
      element->Unpack(unpackedBuffer, pageBuffer, pageInfo.fNElements);
      delete[] pageBuffer;
      pageBuffer = unpackedBuffer;
   }

   const auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
   auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, elementSize, pageInfo.fNElements);
   newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
   fPagePool->RegisterPage(newPage,
      RPageDeleter([](const RPage &page, void * /*userData*/)
      {
         RPageAllocatorFile::DeletePage(page);
      }, nullptr));
   fCounters->fNPagePopulated.Inc();
   return newPage;
}
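
// Worked example (added for illustration, numbers assumed): take a cluster
// whose pages for this column hold 100 elements each and whose column range
// starts at global element index 5000. For idxInCluster = 250 the linear scan
// above ends with pageNo = 2 and firstInPage = 200, so the new page is
// registered with its window starting at global index 5000 + 200 = 5200.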


ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceFile::PopulatePage(
   ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
{
   const auto columnId = columnHandle.fId;
   auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
   if (!cachedPage.IsNull())
      return cachedPage;

   const auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
   R__ASSERT(clusterId != kInvalidDescriptorId);
   const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
   const auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
   R__ASSERT(selfOffset <= globalIndex);
   return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
}


ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceFile::PopulatePage(
   ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
{
   const auto clusterId = clusterIndex.GetClusterId();
   const auto idxInCluster = clusterIndex.GetIndex();
   const auto columnId = columnHandle.fId;
   auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
   if (!cachedPage.IsNull())
      return cachedPage;

   R__ASSERT(clusterId != kInvalidDescriptorId);
   const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
   return PopulatePageFromCluster(columnHandle, clusterDescriptor, idxInCluster);
}

void ROOT::Experimental::Detail::RPageSourceFile::ReleasePage(RPage &page)
{
   fPagePool->ReturnPage(page);
}

std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceFile::Clone() const
{
   auto clone = new RPageSourceFile(fNTupleName, fOptions);
   clone->fFile = fFile->Clone();
   clone->fReader = Internal::RMiniFileReader(clone->fFile.get());
   return std::unique_ptr<RPageSourceFile>(clone);
}
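
// Design note: the clone opens its own raw file and wraps it in a fresh
// RMiniFileReader, so each page source owns a private file descriptor and
// clones can read concurrently without sharing seek state.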

std::unique_ptr<ROOT::Experimental::Detail::RCluster>
ROOT::Experimental::Detail::RPageSourceFile::LoadCluster(DescriptorId_t clusterId, const ColumnSet_t &columns)
{
   fCounters->fNClusterLoaded.Inc();

   const auto &clusterDesc = GetDescriptor().GetClusterDescriptor(clusterId);
   auto clusterLocator = clusterDesc.GetLocator();
   auto clusterSize = clusterLocator.fBytesOnStorage;
   R__ASSERT(clusterSize > 0);

   struct ROnDiskPageLocator {
      ROnDiskPageLocator() = default;
      ROnDiskPageLocator(DescriptorId_t c, NTupleSize_t p, std::uint64_t o, std::uint64_t s)
         : fColumnId(c), fPageNo(p), fOffset(o), fSize(s) {}
      DescriptorId_t fColumnId = 0;
      NTupleSize_t fPageNo = 0;
      std::uint64_t fOffset = 0;
      std::uint64_t fSize = 0;
      std::size_t fBufPos = 0;
   };

   // Collect the necessary page meta-data and sum up the total size of the compressed and packed pages
   std::vector<ROnDiskPageLocator> onDiskPages;
   auto activeSize = 0;
   for (auto columnId : columns) {
      const auto &pageRange = clusterDesc.GetPageRange(columnId);
      NTupleSize_t pageNo = 0;
      for (const auto &pageInfo : pageRange.fPageInfos) {
         const auto &pageLocator = pageInfo.fLocator;
         activeSize += pageLocator.fBytesOnStorage;
         onDiskPages.emplace_back(ROnDiskPageLocator(
            columnId, pageNo, pageLocator.fPosition, pageLocator.fBytesOnStorage));
         ++pageNo;
      }
   }

   // Linearize the page requests by file offset
   std::sort(onDiskPages.begin(), onDiskPages.end(),
      [](const ROnDiskPageLocator &a, const ROnDiskPageLocator &b) {return a.fOffset < b.fOffset;});

   // In order to coalesce close-by pages, we collect the sizes of the gaps between pages on disk. We then order
   // the gaps by size, sum them up and find a cutoff for the largest gap that we tolerate when coalescing pages.
   // The size of the cutoff is given by the fraction of extra bytes we are willing to read in order to reduce
   // the number of read requests. We thus schedule the lowest number of requests given a tolerable fraction
   // of extra bytes.
   // TODO(jblomer): Eventually we may want to select the parameter at runtime according to link latency and speed,
   // memory consumption, device block size.
   float maxOverhead = 0.25 * float(activeSize);
   std::vector<std::size_t> gaps;
   for (unsigned i = 1; i < onDiskPages.size(); ++i) {
      gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].fSize + onDiskPages[i-1].fOffset));
   }
   std::sort(gaps.begin(), gaps.end());
   std::size_t gapCut = 0;
   float szExtra = 0.0;
   for (auto g : gaps) {
      szExtra += g;
      if (szExtra > maxOverhead)
         break;
      gapCut = g;
   }
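
   // Worked example (added for illustration, numbers assumed): four pages with
   // gaps of 10, 100, and 1000 bytes between them and activeSize = 2000 give
   // maxOverhead = 500. Walking the sorted gaps: szExtra = 10, then 110 (both
   // <= 500), then 1110 (> 500, stop), so gapCut = 100. The 10- and 100-byte
   // gaps are read over; the 1000-byte gap starts a new read request below.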

   // Prepare the input vector for the RRawFile::ReadV() call
   struct RReadRequest {
      RReadRequest() = default;
      RReadRequest(std::size_t b, std::uint64_t o, std::uint64_t s) : fBufPos(b), fOffset(o), fSize(s) {}
      std::size_t fBufPos = 0;
      std::uint64_t fOffset = 0;
      std::uint64_t fSize = 0;
   };
   std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;

   ROOT::Internal::RRawFile::RIOVec req;
   std::size_t szPayload = 0;
   std::size_t szOverhead = 0;
   for (auto &s : onDiskPages) {
      R__ASSERT(s.fSize > 0);
      auto readUpTo = req.fOffset + req.fSize;
      R__ASSERT(s.fOffset >= readUpTo);
      auto overhead = s.fOffset - readUpTo;
      szPayload += s.fSize;
      if (overhead <= gapCut) {
         szOverhead += overhead;
         s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize + overhead;
         req.fSize += overhead + s.fSize;
         continue;
      }

      // close the current request and open new one
      if (req.fSize > 0)
         readRequests.emplace_back(req);

      req.fBuffer = reinterpret_cast<unsigned char *>(req.fBuffer) + req.fSize;
      s.fBufPos = reinterpret_cast<intptr_t>(req.fBuffer);

      req.fOffset = s.fOffset;
      req.fSize = s.fSize;
   }
   readRequests.emplace_back(req);
   fCounters->fSzReadPayload.Add(szPayload);
   fCounters->fSzReadOverhead.Add(szOverhead);

   // Register the on disk pages in a page map
   auto buffer = new unsigned char[reinterpret_cast<intptr_t>(req.fBuffer) + req.fSize];
   auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
   for (const auto &s : onDiskPages) {
      ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
      pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
   }
   fCounters->fNPageLoaded.Add(onDiskPages.size());
   for (auto &r : readRequests) {
      r.fBuffer = buffer + reinterpret_cast<intptr_t>(r.fBuffer);
   }

   auto nReqs = readRequests.size();
   {
      RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
      fFile->ReadV(&readRequests[0], nReqs);
   }
   fCounters->fNReadV.Inc();
   fCounters->fNRead.Add(nReqs);

   auto cluster = std::make_unique<RCluster>(clusterId);
   cluster->Adopt(std::move(pageMap));
   for (auto colId : columns)
      cluster->SetColumnAvailable(colId);
   return cluster;
}


void ROOT::Experimental::Detail::RPageSourceFile::UnzipClusterImpl(RCluster *cluster)
{
   RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
   fTaskScheduler->Reset();

   const auto clusterId = cluster->GetId();
   const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);

   std::vector<std::unique_ptr<RColumnElementBase>> allElements;

   const auto &columnsInCluster = cluster->GetAvailColumns();
   for (const auto columnId : columnsInCluster) {
      const auto &columnDesc = fDescriptor.GetColumnDescriptor(columnId);

      allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));

      const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
      std::uint64_t pageNo = 0;
      std::uint64_t firstInPage = 0;
      for (const auto &pi : pageRange.fPageInfos) {
         ROnDiskPage::Key key(columnId, pageNo);
         auto onDiskPage = cluster->GetOnDiskPage(key);
         R__ASSERT(onDiskPage);
         R__ASSERT(onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage);

         auto taskFunc =
            [this, columnId, clusterId, firstInPage, onDiskPage,
             element = allElements.back().get(),
             nElements = pi.fNElements,
             indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
            ] () {
               const auto bytesPacked = (element->GetBitsOnStorage() * nElements + 7) / 8;
               const auto pageSize = element->GetSize() * nElements;

               auto pageBufferPacked = new unsigned char[bytesPacked];
               if (onDiskPage->GetSize() != bytesPacked) {
                  fDecompressor(onDiskPage->GetAddress(), onDiskPage->GetSize(), bytesPacked, pageBufferPacked);
                  fCounters->fSzUnzip.Add(bytesPacked);
               } else {
                  // We cannot simply map the onDiskPage because the cluster pool and the page pool have different
                  // life times
                  memcpy(pageBufferPacked, onDiskPage->GetAddress(), bytesPacked);
               }

               auto pageBuffer = pageBufferPacked;
               if (!element->IsMappable()) {
                  pageBuffer = new unsigned char[pageSize];
                  element->Unpack(pageBuffer, pageBufferPacked, nElements);
                  delete[] pageBufferPacked;
               }

               auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, element->GetSize(), nElements);
               newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
               fPagePool->PreloadPage(newPage,
                  RPageDeleter([](const RPage &page, void * /*userData*/)
                  {
                     RPageAllocatorFile::DeletePage(page);
                  }, nullptr));
            };

         fTaskScheduler->AddTask(taskFunc);

         firstInPage += pi.fNElements;
         pageNo++;
      } // for all pages in column
   } // for all columns in cluster

   fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());

   fTaskScheduler->Wait();
}