45 for (
unsigned i = 0; i < fIDs.size(); ++i) {
52 fRefCounters.emplace_back(1);
57 for (
unsigned i = 0; i < fIDs.size(); ++i) {
59 if (--fRefCounters[i] == 0) {
60 fIDs.erase(fIDs.begin() + i);
61 fRefCounters.erase(fRefCounters.begin() + i);
72 for (
const auto &
id : fIDs)
88 if (
clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
100std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
107 if (location.empty()) {
110 if (location.find(
"daos://") == 0)
112 return std::make_unique<RPageSourceDaos>(
ntupleName, location, options);
117 return std::make_unique<RPageSourceFile>(
ntupleName, location, options);
137 if ((
range.fFirstEntry +
range.fNEntries) > GetNEntries()) {
145 return GetSharedDescriptorGuard()->GetNEntries();
150 return GetSharedDescriptorGuard()->GetNElements(
columnHandle.fPhysicalId);
191 fCounters = std::make_unique<RCounters>(
RCounters{
195 "volume read from storage (required)"),
197 "volume read from storage (overhead)"),
200 "number of partial clusters preloaded from storage"),
205 "wall clock time spent decompressing"),
207 "CPU time spent reading"),
209 "CPU time spent decompressing"),
211 "bwRead",
"MB/s",
"bandwidth compressed bytes read per second", fMetrics,
228 "bwReadUnzip",
"MB/s",
"bandwidth uncompressed bytes read per second", fMetrics,
242 "bwUnzip",
"MB/s",
"decompression bandwidth of uncompressed bytes per second", fMetrics,
256 "rtReadEfficiency",
"",
"ratio of payload over all bytes read", fMetrics,
262 return {
true, 1./(1. + (1. *
szReadOverhead->GetValueAsInt()) / payload)};
269 "rtCompression",
"",
"ratio of compressed bytes / uncompressed bytes", fMetrics,
309 Allocator_t::DeletePage(
page);
330 unsigned char *
pageBuf =
reinterpret_cast<unsigned char *
>(
page.GetBuffer());
346 pageBuf =
reinterpret_cast<unsigned char *
>(buf);
365std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
372 if (location.empty()) {
375 if (location.find(
"daos://") == 0) {
377 return std::make_unique<RPageSinkDaos>(
ntupleName, location, options);
384 return std::make_unique<RPageSinkFile>(
ntupleName, location, options);
398 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
407 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
411 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(),
fieldId);
418 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(),
fieldId);
434 for (
auto f :
changeset.fAddedProjectedFields) {
449 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
453 fOpenPageRanges.emplace_back(std::move(
pageRange));
458 if (fSerializationContext.GetHeaderSize() > 0)
459 fSerializationContext.MapSchema(
descriptor,
true);
464 fDescriptorBuilder.SetNTuple(fNTupleName, model.
GetDescription());
465 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
480 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
482 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
484 fDescriptorBuilder.BeginHeaderExtension();
501 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
502 .FirstEntryIndex(fPrevClusterNEntries)
505 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
506 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
513 fOpenColumnRanges[i].fFirstElementIndex +=
columnRange.fNElements;
515 fDescriptorBuilder.AddCluster(
clusterBuilder.MoveDescriptor().Unwrap());
524 fOpenColumnRanges.at(
columnHandle.fPhysicalId).fNElements +=
page.GetNElements();
543std::vector<ROOT::Experimental::RNTupleLocator>
545 std::span<RPageStorage::RSealedPageGroup> ranges)
547 std::vector<ROOT::Experimental::RNTupleLocator>
locators;
548 for (
auto &
range : ranges) {
556 std::span<RPageStorage::RSealedPageGroup> ranges)
558 auto locators = CommitSealedPageVImpl(ranges);
561 for (
auto &
range : ranges) {
563 fOpenColumnRanges.at(
range.fPhysicalColumnId).fNElements +=
sealedPageIt->fNElements;
568 fOpenPageRanges.at(
range.fPhysicalColumnId).fPageInfos.emplace_back(
pageInfo);
576 auto nbytes = CommitClusterImpl();
579 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
580 .FirstEntryIndex(fPrevClusterNEntries)
582 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
585 std::swap(
fullRange, fOpenPageRanges[i]);
586 clusterBuilder.CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex,
587 fOpenColumnRanges[i].fCompressionSettings,
fullRange);
588 fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
589 fOpenColumnRanges[i].fNElements = 0;
591 fDescriptorBuilder.AddCluster(
clusterBuilder.MoveDescriptor().Unwrap());
598 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
602 for (
auto i = fNextClusterInGroup; i <
nClusters; ++i) {
603 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
615 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
622 .NClusters(
nClusters - fNextClusterInGroup);
625 for (
auto i = fNextClusterInGroup; i <
nClusters; ++i) {
629 fDescriptorBuilder.AddClusterGroup(
cgBuilder.MoveDescriptor().Unwrap());
637 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
649 fCounters = std::make_unique<RCounters>(
RCounters{
651 "number of pages committed to storage"),
653 "volume written for committed pages"),
658 "CPU time spent writing"),
660 "CPU time spent compressing")});
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
An either thread-safe or non thread safe counter for CPU ticks.
A helper class for piece-wise construction of an RClusterDescriptor.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
std::uint32_t GetIndex() const
const RColumnModel & GetModel() const
NTupleSize_t GetFirstElementIndex() const
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
~RPagePersistentSink() override
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
std::uint64_t CommitCluster(NTupleSize_t nEntries) final
Finalize the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
void CommitDataset() final
Finalize the current cluster and the entire data set.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
RCluster::ColumnSet_t ToColumnSet() const
void Erase(DescriptorId_t physicalColumnID)
void Insert(DescriptorId_t physicalColumnID)
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId)
Helper for unstreaming a page.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
NTupleSize_t GetNEntries()
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
A page is a slice of a column that is mapped into memory.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Meta-data for a set of ntuple clusters.
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
std::string GetDescription() const
const RProjectedFields & GetProjectedFields() const
RFieldZero & GetFieldZero()
Non-const access to the root field is used to commit clusters during writing and to set the on-disk f...
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specify the content of a (partial) cluster.
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.