40#include <condition_variable>
52 "Do not store real data with this version of RNTuple!";
53 fCompressor = std::make_unique<RNTupleCompressor>();
79 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
91 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
93 auto buffer = std::make_unique<unsigned char []>(fSerializationContext.GetHeaderSize());
96 auto zipBuffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
98 fCompressor->Zip(buffer.get(), fSerializationContext.GetHeaderSize(), GetWriteOptions().GetCompression(),
99 [&zipBuffer](
const void *
b,
size_t n,
size_t o){ memcpy(zipBuffer.get() + o,
b,
n); } );
100 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader, fSerializationContext.GetHeaderSize());
108 std::uint64_t offsetData;
111 offsetData = fWriter->WriteBlob(sealedPage.
fBuffer, sealedPage.
fSize, bytesPacked);
113 fClusterMinOffset = std::min(offsetData, fClusterMinOffset);
114 fClusterMaxOffset = std::max(offsetData + sealedPage.
fSize, fClusterMaxOffset);
119 fCounters->fNPageCommitted.Inc();
120 fCounters->fSzWritePayload.Add(sealedPage.
fSize);
121 fNBytesCurrentCluster += sealedPage.
fSize;
133 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
137 return WriteSealedPage(sealedPage, element->GetPackedSize(page.
GetNElements()));
146 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(columnId).GetModel().GetType());
147 const auto bytesPacked = (bitsOnStorage * sealedPage.
fNElements + 7) / 8;
149 return WriteSealedPage(sealedPage, bytesPacked);
156 auto result = fNBytesCurrentCluster;
157 fNBytesCurrentCluster = 0;
164 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
166 std::vector<DescriptorId_t> physClusterIDs;
167 for (
const auto &
c : descriptor.GetClusterIterable()) {
168 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(
c.GetId()));
172 nullptr, descriptor, physClusterIDs, fSerializationContext);
173 auto bufPageList = std::make_unique<unsigned char []>(szPageList);
175 bufPageList.get(), descriptor, physClusterIDs, fSerializationContext);
177 auto bufPageListZip = std::make_unique<unsigned char []>(szPageList);
178 auto szPageListZip = fCompressor->Zip(bufPageList.get(), szPageList, GetWriteOptions().GetCompression(),
179 [&bufPageListZip](
const void *
b,
size_t n,
size_t o){ memcpy(bufPageListZip.get() + o,
b,
n); } );
180 auto offPageList = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip, szPageList);
186 fSerializationContext.AddClusterGroup(physClusterIDs.size(), pageListEnvelope);
189 auto bufFooter = std::make_unique<unsigned char []>(szFooter);
192 auto bufFooterZip = std::make_unique<unsigned char []>(szFooter);
193 auto szFooterZip = fCompressor->Zip(bufFooter.get(), szFooter, GetWriteOptions().GetCompression(),
194 [&bufFooterZip](
const void *
b,
size_t n,
size_t o){ memcpy(bufFooterZip.get() + o,
b,
n); } );
195 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip, szFooter);
206 return fPageAllocator->NewPage(columnHandle.
fId, elementSize, nElements);
211 fPageAllocator->DeletePage(page);
219 ColumnId_t columnId,
void *mem, std::size_t elementSize, std::size_t nElements)
221 RPage newPage(columnId, mem, elementSize, nElements);
230 delete[]
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
241 , fPagePool(std::make_shared<
RPagePool>())
242 , fClusterPool(std::make_unique<
RClusterPool>(*this, options.GetClusterBunchSize()))
265 auto ntpl = fReader.GetNTuple(fNTupleName).Unwrap();
268 auto buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
269 auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
270 fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fSeekHeader);
271 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
274 buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
275 zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
276 fReader.ReadBuffer(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fSeekFooter);
277 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
281 descBuilder.
SetOnDiskFooterSize(ntpl.fNBytesFooter + cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage);
282 buffer = std::make_unique<unsigned char[]>(cg.fPageListEnvelopeLink.fUnzippedSize);
283 zipBuffer = std::make_unique<unsigned char[]>(cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage);
284 fReader.ReadBuffer(zipBuffer.get(),
285 cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage,
286 cg.fPageListEnvelopeLink.fLocator.fPosition);
287 fDecompressor->Unzip(zipBuffer.get(),
288 cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage,
289 cg.fPageListEnvelopeLink.fUnzippedSize,
292 std::vector<RClusterDescriptorBuilder> clusters;
294 for (std::size_t i = 0; i < clusters.size(); ++i) {
295 descBuilder.
AddCluster(i, std::move(clusters[i]));
306 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
308 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.
GetIndex());
310 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
311 sealedPage.
fSize = bytesOnStorage;
314 fReader.ReadBuffer(
const_cast<void *
>(sealedPage.
fBuffer), bytesOnStorage, pageInfo.fLocator.fPosition);
320 const auto columnId = columnHandle.
fId;
321 const auto clusterId = clusterDescriptor.
GetId();
323 auto pageInfo = clusterDescriptor.
GetPageRange(columnId).
Find(idxInCluster);
326 const auto elementSize = element->
GetSize();
327 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
329 const void *sealedPageBuffer =
nullptr;
330 std::unique_ptr<unsigned char []> directReadBuffer;
333 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
334 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.fPosition);
335 fCounters->fNPageLoaded.Inc();
336 fCounters->fNRead.Inc();
337 fCounters->fSzReadPayload.Add(bytesOnStorage);
338 sealedPageBuffer = directReadBuffer.get();
340 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
341 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
342 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
344 auto cachedPage = fPagePool->GetPage(columnId,
RClusterIndex(clusterId, idxInCluster));
345 if (!cachedPage.IsNull())
349 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
350 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
351 sealedPageBuffer = onDiskPage->GetAddress();
354 std::unique_ptr<unsigned char []> pageBuffer;
357 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
358 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
362 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
363 newPage.SetWindow(indexOffset + pageInfo.fFirstInPage,
RPage::RClusterInfo(clusterId, indexOffset));
364 fPagePool->RegisterPage(newPage,
369 fCounters->fNPagePopulated.Inc();
377 const auto columnId = columnHandle.
fId;
378 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
379 if (!cachedPage.IsNull())
382 const auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
384 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
385 const auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
387 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
395 const auto idxInCluster = clusterIndex.
GetIndex();
396 const auto columnId = columnHandle.
fId;
397 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
398 if (!cachedPage.IsNull())
402 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
403 return PopulatePageFromCluster(columnHandle, clusterDescriptor, idxInCluster);
408 fPagePool->ReturnPage(page);
414 clone->fFile = fFile->Clone();
416 return std::unique_ptr<RPageSourceFile>(clone);
419std::unique_ptr<ROOT::Experimental::Detail::RCluster>
422 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
424 struct ROnDiskPageLocator {
427 std::uint64_t fOffset = 0;
428 std::uint64_t
fSize = 0;
429 std::size_t fBufPos = 0;
432 const auto &clusterDesc = GetDescriptor().GetClusterDescriptor(clusterKey.
fClusterId);
435 std::vector<ROnDiskPageLocator> onDiskPages;
438 const auto &pageRange = clusterDesc.GetPageRange(columnId);
440 for (
const auto &pageInfo : pageRange.fPageInfos) {
441 const auto &pageLocator = pageInfo.fLocator;
442 activeSize += pageLocator.fBytesOnStorage;
443 onDiskPages.push_back(
444 {columnId, pageNo, std::uint64_t(pageLocator.fPosition), pageLocator.fBytesOnStorage, 0});
450 std::sort(onDiskPages.begin(), onDiskPages.end(),
451 [](
const ROnDiskPageLocator &
a,
const ROnDiskPageLocator &
b) {return a.fOffset < b.fOffset;});
460 float maxOverhead = 0.25 * float(activeSize);
461 std::vector<std::size_t> gaps;
462 for (
unsigned i = 1; i < onDiskPages.size(); ++i) {
463 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].
fSize + onDiskPages[i-1].fOffset));
465 std::sort(gaps.begin(), gaps.end());
466 std::size_t gapCut = 0;
467 std::size_t currentGap = 0;
469 for (
auto g : gaps) {
470 if (
g != currentGap) {
475 if (szExtra > maxOverhead)
483 const auto currentReadRequestIdx = readRequests.size();
486 std::size_t szPayload = 0;
487 std::size_t szOverhead = 0;
488 for (
auto &s : onDiskPages) {
492 auto overhead = s.fOffset - readUpTo;
493 szPayload += s.fSize;
494 if (overhead <= gapCut) {
495 szOverhead += overhead;
496 s.fBufPos =
reinterpret_cast<intptr_t
>(req.
fBuffer) + req.
fSize + overhead;
497 req.
fSize += overhead + s.fSize;
503 readRequests.emplace_back(req);
506 s.fBufPos =
reinterpret_cast<intptr_t
>(req.
fBuffer);
511 readRequests.emplace_back(req);
512 fCounters->fSzReadPayload.Add(szPayload);
513 fCounters->fSzReadOverhead.Add(szOverhead);
516 auto buffer =
new unsigned char[
reinterpret_cast<intptr_t
>(req.
fBuffer) + req.
fSize];
517 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<
unsigned char []>(buffer));
518 for (
const auto &s : onDiskPages) {
520 pageMap->Register(key,
ROnDiskPage(buffer + s.fBufPos, s.fSize));
522 fCounters->fNPageLoaded.Add(onDiskPages.size());
523 for (
auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
524 readRequests[i].fBuffer = buffer +
reinterpret_cast<intptr_t
>(readRequests[i].fBuffer);
527 auto cluster = std::make_unique<RCluster>(clusterKey.
fClusterId);
528 cluster->Adopt(std::move(pageMap));
530 cluster->SetColumnAvailable(colId);
534std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
537 fCounters->fNClusterLoaded.Add(clusterKeys.size());
539 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> clusters;
540 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
542 for (
auto key: clusterKeys) {
543 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
546 auto nReqs = readRequests.size();
549 fFile->ReadV(&readRequests[0], nReqs);
551 fCounters->fNReadV.Inc();
552 fCounters->fNRead.Add(nReqs);
561 fTaskScheduler->Reset();
563 const auto clusterId = cluster->
GetId();
564 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
566 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
569 for (
const auto columnId : columnsInCluster) {
570 const auto &columnDesc = fDescriptor.GetColumnDescriptor(columnId);
574 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
575 std::uint64_t pageNo = 0;
576 std::uint64_t firstInPage = 0;
577 for (
const auto &pi : pageRange.fPageInfos) {
580 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
583 [
this, columnId, clusterId, firstInPage, onDiskPage,
584 element = allElements.back().get(),
585 nElements = pi.fNElements,
586 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
588 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
589 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
591 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
593 fPagePool->PreloadPage(newPage,
600 fTaskScheduler->AddTask(taskFunc);
602 firstInPage += pi.fNElements;
609 fTaskScheduler->Wait();
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
const ColumnSet_t & GetAvailColumns() const
size_t GetNOnDiskPages() const
DescriptorId_t GetId() const
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
virtual std::size_t GetBitsOnStorage() const
std::size_t GetSize() const
RColumnElementBase * GetElement() const
Record wall time and CPU time between construction and destruction.
A page as stored on disk, i.e. packed and compressed.
Manages pages read from the file.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
static void DeletePage(const RPage &page)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Storage provider that writes ntuple pages into a file.
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
void CreateImpl(const RNTupleModel &model) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void CommitDatasetImpl() final
std::unique_ptr< Internal::RNTupleFileWriter > fWriter
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
virtual ~RPageSourceFile()
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RNTupleDescriptor AttachImpl() final
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void UnzipClusterImpl(RCluster *cluster) final
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
ClusterSize_t::ValueType GetNElements() const
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, ENTupleContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
DescriptorId_t GetId() const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
A helper class for piece-wise construction of an RNTupleDescriptor.
RNTupleDescriptor MoveDescriptor()
Internal::RNTupleSerializer::RClusterGroup GetClusterGroup(std::uint32_t id) const
void SetOnDiskFooterSize(std::uint64_t size)
void AddCluster(DescriptorId_t clusterId, RNTupleVersion version, NTupleSize_t firstEntryIndex, ClusterSize_t nEntries)
void SetOnDiskHeaderSize(std::uint64_t size)
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
int GetCompression() const
ENTupleContainerFormat GetContainerFormat() const
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specify the content of a (partial) cluster.
DescriptorId_t fClusterId
On-disk pages within a page source are identified by the column and page number.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
std::uint32_t fUnzippedSize
NTupleSize_t fFirstElementIndex
A 64bit element index.
Generic information about the physical location of data.
std::uint32_t fBytesOnStorage
Used for vector reads from multiple offsets into multiple buffers.
std::size_t fSize
The number of desired bytes.
void * fBuffer
The destination for reading.
std::uint64_t fOffset
The file offset.