40#include <condition_variable>
52 "Do not store real data with this version of RNTuple!";
53 fCompressor = std::make_unique<RNTupleCompressor>();
79 fWriter = std::unique_ptr<Internal::RNTupleFileWriter>(
89 unsigned char *serializedHeader, std::uint32_t
length)
91 auto zipBuffer = std::make_unique<unsigned char[]>(
length);
92 auto szZipHeader = fCompressor->Zip(serializedHeader,
length, GetWriteOptions().GetCompression(),
94 fWriter->WriteNTupleHeader(zipBuffer.get(), szZipHeader,
length);
102 std::uint64_t offsetData;
105 offsetData = fWriter->WriteBlob(sealedPage.
fBuffer, sealedPage.
fSize, bytesPacked);
109 result.fPosition = offsetData;
111 fCounters->fNPageCommitted.Inc();
112 fCounters->fSzWritePayload.Add(sealedPage.
fSize);
113 fNBytesCurrentCluster += sealedPage.
fSize;
125 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
129 return WriteSealedPage(sealedPage, element->GetPackedSize(page.
GetNElements()));
138 fDescriptorBuilder.GetDescriptor().GetColumnDescriptor(columnId).GetModel().GetType());
139 const auto bytesPacked = (bitsOnStorage * sealedPage.
fNElements + 7) / 8;
141 return WriteSealedPage(sealedPage, bytesPacked);
148 auto result = fNBytesCurrentCluster;
149 fNBytesCurrentCluster = 0;
157 auto bufPageListZip = std::make_unique<unsigned char[]>(
length);
158 auto szPageListZip = fCompressor->Zip(serializedPageList,
length, GetWriteOptions().GetCompression(),
162 result.fBytesOnStorage = szPageListZip;
163 result.fPosition = fWriter->WriteBlob(bufPageListZip.get(), szPageListZip,
length);
169 auto bufFooterZip = std::make_unique<unsigned char[]>(
length);
170 auto szFooterZip = fCompressor->Zip(serializedFooter,
length, GetWriteOptions().GetCompression(),
172 fWriter->WriteNTupleFooter(bufFooterZip.get(), szFooterZip,
length);
182 return fPageAllocator->NewPage(columnHandle.
fId, elementSize, nElements);
187 fPageAllocator->DeletePage(page);
195 ColumnId_t columnId,
void *mem, std::size_t elementSize, std::size_t nElements)
197 RPage newPage(columnId, mem, elementSize, nElements);
206 delete[]
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
217 , fPagePool(std::make_shared<
RPagePool>())
218 , fClusterPool(std::make_unique<
RClusterPool>(*this, options.GetClusterBunchSize()))
236 fDescriptorBuilder.SetOnDiskHeaderSize(anchor.
fNBytesHeader);
237 auto buffer = std::make_unique<unsigned char[]>(anchor.
fLenHeader);
238 auto zipBuffer = std::make_unique<unsigned char[]>(anchor.
fNBytesHeader);
243 fDescriptorBuilder.AddToOnDiskFooterSize(anchor.
fNBytesFooter);
244 buffer = std::make_unique<unsigned char[]>(anchor.
fLenFooter);
245 zipBuffer = std::make_unique<unsigned char[]>(anchor.
fNBytesFooter);
251std::unique_ptr<ROOT::Experimental::Detail::RPageSourceFile>
255 auto pageSource = std::make_unique<RPageSourceFile>(
"", path, options);
256 pageSource->InitDescriptor(anchor);
257 pageSource->fNTupleName = pageSource->fDescriptorBuilder.GetDescriptor().GetName();
268 if (fDescriptorBuilder.GetDescriptor().GetOnDiskHeaderSize() == 0) {
269 auto anchor = fReader.GetNTuple(fNTupleName).Unwrap();
270 InitDescriptor(anchor);
273 auto ntplDesc = fDescriptorBuilder.MoveDescriptor();
275 for (
const auto &cgDesc : ntplDesc.GetClusterGroupIterable()) {
276 auto buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
277 auto zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
278 fReader.ReadBuffer(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage,
279 cgDesc.GetPageListLocator().GetPosition<std::uint64_t>());
280 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
285 for (std::size_t i = 0; i < clusters.size(); ++i) {
286 ntplDesc.AddClusterDetails(clusters[i].MoveDescriptor().Unwrap());
301 auto descriptorGuard = GetSharedDescriptorGuard();
302 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
303 pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.
GetIndex());
307 sealedPage.
fSize = bytesOnStorage;
310 fReader.ReadBuffer(
const_cast<void *
>(sealedPage.
fBuffer), bytesOnStorage,
319 const auto columnId = columnHandle.
fId;
320 const auto clusterId = clusterInfo.
fClusterId;
321 const auto pageInfo = clusterInfo.
fPageInfo;
324 const auto elementSize = element->
GetSize();
325 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
327 const void *sealedPageBuffer =
nullptr;
328 std::unique_ptr<unsigned char []> directReadBuffer;
331 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
332 fReader.ReadBuffer(directReadBuffer.get(), bytesOnStorage, pageInfo.fLocator.GetPosition<std::uint64_t>());
333 fCounters->fNPageLoaded.Inc();
334 fCounters->fNRead.Inc();
335 fCounters->fSzReadPayload.Add(bytesOnStorage);
336 sealedPageBuffer = directReadBuffer.get();
338 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
339 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
340 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
342 auto cachedPage = fPagePool->GetPage(columnId,
RClusterIndex(clusterId, idxInCluster));
343 if (!cachedPage.IsNull())
347 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
348 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
349 sealedPageBuffer = onDiskPage->GetAddress();
352 std::unique_ptr<unsigned char []> pageBuffer;
355 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
356 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
359 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
360 newPage.SetWindow(clusterInfo.
fColumnOffset + pageInfo.fFirstInPage,
362 fPagePool->RegisterPage(newPage,
367 fCounters->fNPagePopulated.Inc();
375 const auto columnId = columnHandle.
fId;
376 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
377 if (!cachedPage.IsNull())
380 std::uint64_t idxInCluster;
383 auto descriptorGuard = GetSharedDescriptorGuard();
384 clusterInfo.
fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
387 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.
fClusterId);
388 clusterInfo.
fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
391 clusterInfo.
fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
394 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
402 const auto idxInCluster = clusterIndex.
GetIndex();
403 const auto columnId = columnHandle.
fId;
404 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
405 if (!cachedPage.IsNull())
411 auto descriptorGuard = GetSharedDescriptorGuard();
412 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
414 clusterInfo.
fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
415 clusterInfo.
fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
418 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
423 fPagePool->ReturnPage(page);
429 clone->fFile = fFile->Clone();
431 return std::unique_ptr<RPageSourceFile>(clone);
434std::unique_ptr<ROOT::Experimental::Detail::RCluster>
437 std::vector<ROOT::Internal::RRawFile::RIOVec> &readRequests)
439 struct ROnDiskPageLocator {
442 std::uint64_t fOffset = 0;
443 std::uint64_t
fSize = 0;
444 std::size_t fBufPos = 0;
447 std::vector<ROnDiskPageLocator> onDiskPages;
450 auto descriptorGuard = GetSharedDescriptorGuard();
451 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.
fClusterId);
455 const auto &pageRange = clusterDesc.GetPageRange(columnId);
457 for (
const auto &pageInfo : pageRange.fPageInfos) {
458 const auto &pageLocator = pageInfo.fLocator;
459 activeSize += pageLocator.fBytesOnStorage;
460 onDiskPages.push_back(
461 {columnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), pageLocator.fBytesOnStorage, 0});
468 std::sort(onDiskPages.begin(), onDiskPages.end(),
469 [](
const ROnDiskPageLocator &
a,
const ROnDiskPageLocator &
b) {return a.fOffset < b.fOffset;});
478 float maxOverhead = 0.25 * float(activeSize);
479 std::vector<std::size_t> gaps;
480 for (
unsigned i = 1; i < onDiskPages.size(); ++i) {
481 gaps.emplace_back(onDiskPages[i].fOffset - (onDiskPages[i-1].
fSize + onDiskPages[i-1].fOffset));
483 std::sort(gaps.begin(), gaps.end());
484 std::size_t gapCut = 0;
485 std::size_t currentGap = 0;
487 for (
auto g : gaps) {
488 if (
g != currentGap) {
493 if (szExtra > maxOverhead)
501 const auto currentReadRequestIdx = readRequests.size();
504 std::size_t szPayload = 0;
505 std::size_t szOverhead = 0;
506 for (
auto &s : onDiskPages) {
510 auto overhead = s.fOffset - readUpTo;
511 szPayload += s.fSize;
512 if (overhead <= gapCut) {
513 szOverhead += overhead;
514 s.fBufPos =
reinterpret_cast<intptr_t
>(req.
fBuffer) + req.
fSize + overhead;
515 req.
fSize += overhead + s.fSize;
521 readRequests.emplace_back(req);
524 s.fBufPos =
reinterpret_cast<intptr_t
>(req.
fBuffer);
529 readRequests.emplace_back(req);
530 fCounters->fSzReadPayload.Add(szPayload);
531 fCounters->fSzReadOverhead.Add(szOverhead);
534 auto buffer =
new unsigned char[
reinterpret_cast<intptr_t
>(req.
fBuffer) + req.
fSize];
535 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<
unsigned char []>(buffer));
536 for (
const auto &s : onDiskPages) {
538 pageMap->Register(key,
ROnDiskPage(buffer + s.fBufPos, s.fSize));
540 fCounters->fNPageLoaded.Add(onDiskPages.size());
541 for (
auto i = currentReadRequestIdx; i < readRequests.size(); ++i) {
542 readRequests[i].fBuffer = buffer +
reinterpret_cast<intptr_t
>(readRequests[i].fBuffer);
545 auto cluster = std::make_unique<RCluster>(clusterKey.
fClusterId);
546 cluster->Adopt(std::move(pageMap));
548 cluster->SetColumnAvailable(colId);
552std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
555 fCounters->fNClusterLoaded.Add(clusterKeys.size());
557 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> clusters;
558 std::vector<ROOT::Internal::RRawFile::RIOVec> readRequests;
560 for (
auto key: clusterKeys) {
561 clusters.emplace_back(PrepareSingleCluster(key, readRequests));
564 auto nReqs = readRequests.size();
567 fFile->ReadV(&readRequests[0], nReqs);
569 fCounters->fNReadV.Inc();
570 fCounters->fNRead.Add(nReqs);
579 fTaskScheduler->Reset();
581 const auto clusterId = cluster->
GetId();
582 auto descriptorGuard = GetSharedDescriptorGuard();
583 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
585 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
588 for (
const auto columnId : columnsInCluster) {
589 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
593 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
594 std::uint64_t pageNo = 0;
595 std::uint64_t firstInPage = 0;
596 for (
const auto &pi : pageRange.fPageInfos) {
599 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
602 [
this, columnId, clusterId, firstInPage, onDiskPage,
603 element = allElements.back().get(),
604 nElements = pi.fNElements,
605 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
607 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
608 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
610 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
612 fPagePool->PreloadPage(newPage,
619 fTaskScheduler->AddTask(taskFunc);
621 firstInPage += pi.fNElements;
628 fTaskScheduler->Wait();
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
const ColumnSet_t & GetAvailColumns() const
size_t GetNOnDiskPages() const
DescriptorId_t GetId() const
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
virtual std::size_t GetBitsOnStorage() const
std::size_t GetSize() const
RColumnElementBase * GetElement() const
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Manages pages read from the file.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
static void DeletePage(const RPage &page)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Storage provider that writes ntuple pages into a file.
~RPageSinkFile() override
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RPageSinkFile(std::string_view ntupleName, const RNTupleWriteOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::unique_ptr< Internal::RNTupleFileWriter > fWriter
RNTupleLocator WriteSealedPage(const RPageStorage::RSealedPage &sealedPage, std::size_t bytesPacked)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a file.
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
void InitDescriptor(const Internal::RFileNTupleAnchor &anchor)
Deserializes the header and footer into a minimal descriptor held by fDescriptorBuilder.
Internal::RMiniFileReader fReader
Takes the fFile to read ntuple blobs from it.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
static std::unique_ptr< RPageSourceFile > CreateFromAnchor(const Internal::RFileNTupleAnchor &anchor, std::string_view path, const RNTupleReadOptions &options)
Used from the RNTuple class to build a datasource if the anchor is already available.
RNTupleDescriptor AttachImpl() final
std::unique_ptr< RCluster > PrepareSingleCluster(const RCluster::RKey &clusterKey, std::vector< ROOT::Internal::RRawFile::RIOVec > &readRequests)
Helper function for LoadClusters: it prepares the memory buffer (page map) and the read requests for ...
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new raw file and reader and opens its own file descriptor to the dat...
void UnzipClusterImpl(RCluster *cluster) final
~RPageSourceFile() override
RPageSourceFile(std::string_view ntupleName, const RNTupleReadOptions &options)
std::unique_ptr< ROOT::Internal::RRawFile > fFile
An RRawFile is used to request the necessary byte ranges from a local or a remote file.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
ClusterSize_t::ValueType GetNElements() const
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Read RNTuple data blocks from a TFile container, provided by a RRawFile.
static RNTupleFileWriter * Append(std::string_view ntupleName, TFile &file)
Add a new RNTuple identified by ntupleName to the existing TFile.
static RNTupleFileWriter * Recreate(std::string_view ntupleName, std::string_view path, int defaultCompression, ENTupleContainerFormat containerFormat)
Create or truncate the local file given by path with the new empty RNTuple identified by ntupleName.
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::vector< RClusterDescriptorBuilder > GetClusterSummaries(const RNTupleDescriptor &ntplDesc, DescriptorId_t clusterGroupId)
Used to prepare the cluster descriptor builders when loading the page locations for a certain cluster...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
int GetCompression() const
ENTupleContainerFormat GetContainerFormat() const
static std::unique_ptr< RRawFile > Create(std::string_view url, ROptions options=ROptions())
Factory method that returns a suitable concrete implementation according to the transport in the url.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specify the content of a (partial) cluster.
DescriptorId_t fClusterId
On-disk pages within a page source are identified by the column and page number.
Summarizes cluster-level information that is necessary to populate a certain page.
DescriptorId_t fClusterId
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Entry point for an RNTuple in a ROOT file.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint64_t fSeekFooter
The file offset of the footer excluding the TKey part.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
std::uint64_t fSeekHeader
The file offset of the header excluding the TKey part.
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
Generic information about the physical location of data.
std::uint32_t fBytesOnStorage
const T & GetPosition() const
Used for vector reads from multiple offsets into multiple buffers.
std::size_t fSize
The number of desired bytes.
void * fBuffer
The destination for reading.
std::uint64_t fOffset
The file offset.