45 for (
unsigned i = 0; i < fIDs.size(); ++i) {
46 if (fIDs[i] == physicalColumnID) {
51 fIDs.emplace_back(physicalColumnID);
52 fRefCounters.emplace_back(1);
57 for (
unsigned i = 0; i < fIDs.size(); ++i) {
58 if (fIDs[i] == physicalColumnID) {
59 if (--fRefCounters[i] == 0) {
60 fIDs.erase(fIDs.begin() + i);
61 fRefCounters.erase(fRefCounters.begin() + i);
72 for (
const auto &
id : fIDs)
100std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
104 if (ntupleName.empty()) {
107 if (location.empty()) {
110 if (location.find(
"daos://") == 0)
112 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
117 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
124 auto physicalId = GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.
GetIndex());
126 fActivePhysicalColumns.Insert(physicalId);
132 fActivePhysicalColumns.Erase(columnHandle.
fPhysicalId);
145 return GetSharedDescriptorGuard()->GetNEntries();
150 return GetSharedDescriptorGuard()->GetNElements(columnHandle.
fPhysicalId);
162 UnzipClusterImpl(cluster);
169 auto descriptorGuard = GetSharedDescriptorGuard();
170 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.
fClusterId);
173 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
175 for (
const auto &pageInfo : pageRange.fPageInfos) {
181 perPageFunc(physicalColumnId, pageNo, pageInfo);
191 fCounters = std::make_unique<RCounters>(
RCounters{
195 "volume read from storage (required)"),
197 "volume read from storage (overhead)"),
200 "number of partial clusters preloaded from storage"),
205 "wall clock time spent decompressing"),
207 "CPU time spent reading"),
209 "CPU time spent decompressing"),
211 "bwRead",
"MB/s",
"bandwidth compressed bytes read per second", fMetrics,
213 if (
const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
214 if (
const auto szReadOverhead = metrics.GetLocalCounter(
"szReadOverhead")) {
215 if (
const auto timeWallRead = metrics.GetLocalCounter(
"timeWallRead")) {
216 if (
auto walltime = timeWallRead->GetValueAsInt()) {
217 double payload = szReadPayload->GetValueAsInt();
218 double overhead = szReadOverhead->GetValueAsInt();
220 return {
true, (1000. * (payload + overhead) / walltime)};
228 "bwReadUnzip",
"MB/s",
"bandwidth uncompressed bytes read per second", fMetrics,
231 if (
const auto timeWallRead = metrics.
GetLocalCounter(
"timeWallRead")) {
232 if (
auto walltime = timeWallRead->GetValueAsInt()) {
235 return {
true, 1000. * unzip / walltime};
242 "bwUnzip",
"MB/s",
"decompression bandwidth of uncompressed bytes per second", fMetrics,
245 if (
const auto timeWallUnzip = metrics.
GetLocalCounter(
"timeWallUnzip")) {
246 if (
auto walltime = timeWallUnzip->GetValueAsInt()) {
249 return {
true, 1000. * unzip / walltime};
256 "rtReadEfficiency",
"",
"ratio of payload over all bytes read", fMetrics,
258 if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
259 if (
const auto szReadOverhead = metrics.
GetLocalCounter(
"szReadOverhead")) {
260 if (
auto payload = szReadPayload->GetValueAsInt()) {
262 return {
true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
269 "rtCompression",
"",
"ratio of compressed bytes / uncompressed bytes", fMetrics,
271 if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
273 if (
auto unzip = szUnzip->GetValueAsInt()) {
274 return {
true, (1. * szReadPayload->GetValueAsInt()) / unzip};
296 auto page = Allocator_t::NewPage(physicalColumnId, element.
GetSize(), sealedPage.
fNElements);
297 if (sealedPage.
fSize != bytesPacked) {
298 fDecompressor->Unzip(sealedPage.
fBuffer, sealedPage.
fSize, bytesPacked, page.GetBuffer());
303 memcpy(page.GetBuffer(), sealedPage.
fBuffer, bytesPacked);
307 auto tmp = Allocator_t::NewPage(physicalColumnId, element.
GetSize(), sealedPage.
fNElements);
309 Allocator_t::DeletePage(page);
328 int compressionSetting,
void *buf,
bool allowAlias)
330 unsigned char *pageBuf =
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
331 bool isAdoptedBuffer =
true;
336 pageBuf =
new unsigned char[packedBytes];
337 isAdoptedBuffer =
false;
340 auto zippedBytes = packedBytes;
342 if ((compressionSetting != 0) || !element.
IsMappable() || !allowAlias) {
344 if (!isAdoptedBuffer)
346 pageBuf =
reinterpret_cast<unsigned char *
>(buf);
347 isAdoptedBuffer =
true;
357 int compressionSetting)
360 return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
365std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
369 if (ntupleName.empty()) {
372 if (location.empty()) {
375 if (location.find(
"daos://") == 0) {
377 return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
384 return std::make_unique<RPageSinkFile>(ntupleName, location, options);
398 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
399 fDescriptorBuilder.AddColumn(columnId, columnId, fieldId, column.
GetModel(), column.
GetIndex(),
407 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
409 auto fieldId = descriptor.GetNFields();
411 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(), fieldId);
412 f.SetOnDiskId(fieldId);
416 auto fieldId = descriptor.GetNFields();
418 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(), fieldId);
419 f.SetOnDiskId(fieldId);
421 for (
const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
422 auto targetId = descriptor.GetNLogicalColumns();
423 fDescriptorBuilder.AddColumn(targetId, source.GetLogicalId(), fieldId, source.GetModel(), source.GetIndex());
427 R__ASSERT(firstEntry >= fPrevClusterNEntries);
428 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
431 for (
auto &descendant : *
f)
432 addField(descendant);
435 addProjectedField(*
f);
436 for (
auto &descendant : *
f)
437 addProjectedField(descendant);
440 const auto nColumns = descriptor.GetNPhysicalColumns();
441 for (
DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
447 columnRange.
fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
450 fOpenColumnRanges.emplace_back(columnRange);
453 fOpenPageRanges.emplace_back(std::move(pageRange));
458 if (fSerializationContext.GetHeaderSize() > 0)
459 fSerializationContext.MapSchema(descriptor,
true);
464 fDescriptorBuilder.SetNTuple(fNTupleName, model.
GetDescription());
465 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
473 for (
auto f : fieldZero.GetSubFields())
474 initialChangeset.fAddedFields.emplace_back(
f);
476 initialChangeset.fAddedProjectedFields.emplace_back(
f);
477 UpdateSchema(initialChangeset, 0U);
480 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
482 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
484 fDescriptorBuilder.BeginHeaderExtension();
501 clusterBuilder.
ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
505 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
506 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
507 const auto &columnRange = cluster.GetColumnRange(i);
508 R__ASSERT(columnRange.fPhysicalColumnId == i);
509 const auto &pageRange = cluster.GetPageRange(i);
510 R__ASSERT(pageRange.fPhysicalColumnId == i);
511 clusterBuilder.
CommitColumnRange(i, fOpenColumnRanges[i].fFirstElementIndex, columnRange.fCompressionSettings,
513 fOpenColumnRanges[i].fFirstElementIndex += columnRange.fNElements;
515 fDescriptorBuilder.AddCluster(clusterBuilder.
MoveDescriptor().Unwrap());
516 fPrevClusterNEntries += nEntries;
528 pageInfo.
fLocator = CommitPageImpl(columnHandle, page);
529 fOpenPageRanges.at(columnHandle.
fPhysicalId).fPageInfos.emplace_back(pageInfo);
535 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.
fNElements;
539 pageInfo.
fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
540 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
543std::vector<ROOT::Experimental::RNTupleLocator>
545 std::span<RPageStorage::RSealedPageGroup> ranges)
547 std::vector<ROOT::Experimental::RNTupleLocator> locators;
548 for (
auto &range : ranges) {
549 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
550 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
556 std::span<RPageStorage::RSealedPageGroup> ranges)
558 auto locators = CommitSealedPageVImpl(ranges);
561 for (
auto &range : ranges) {
562 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
563 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->fNElements;
566 pageInfo.
fNElements = sealedPageIt->fNElements;
568 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
576 auto nbytes = CommitClusterImpl();
579 clusterBuilder.
ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
582 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
585 std::swap(fullRange, fOpenPageRanges[i]);
587 fOpenColumnRanges[i].fCompressionSettings, fullRange);
588 fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
589 fOpenColumnRanges[i].fNElements = 0;
591 fDescriptorBuilder.AddCluster(clusterBuilder.
MoveDescriptor().Unwrap());
592 fPrevClusterNEntries += nNewEntries;
598 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
600 const auto nClusters = descriptor.GetNActiveClusters();
601 std::vector<DescriptorId_t> physClusterIDs;
602 for (
auto i = fNextClusterInGroup; i < nClusters; ++i) {
603 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
607 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
610 const auto clusterGroupId = descriptor.GetNClusterGroups();
611 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
614 if (fNextClusterInGroup == nClusters) {
617 const auto &firstClusterDesc = descriptor.GetClusterDescriptor(fNextClusterInGroup);
618 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
619 cgBuilder.
MinEntry(firstClusterDesc.GetFirstEntryIndex())
620 .
EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
621 firstClusterDesc.GetFirstEntryIndex())
622 .
NClusters(nClusters - fNextClusterInGroup);
624 std::vector<DescriptorId_t> clusterIds;
625 for (
auto i = fNextClusterInGroup; i < nClusters; ++i) {
626 clusterIds.emplace_back(i);
629 fDescriptorBuilder.AddClusterGroup(cgBuilder.
MoveDescriptor().Unwrap());
630 fSerializationContext.MapClusterGroupId(clusterGroupId);
632 fNextClusterInGroup = nClusters;
637 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
640 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
643 CommitDatasetImpl(bufFooter.get(), szFooter);
649 fCounters = std::make_unique<RCounters>(
RCounters{
651 "number of pages committed to storage"),
653 "volume written for committed pages"),
658 "CPU time spent writing"),
660 "CPU time spent compressing")});
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
RClusterDescriptorBuilder & ClusterId(DescriptorId_t clusterId)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
void AddClusters(const std::vector< DescriptorId_t > &clusterIds)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
RResult< RClusterGroupDescriptor > MoveDescriptor()
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
std::size_t GetSize() const
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
std::size_t GetPackedSize(std::size_t nElements=1U) const
std::uint32_t GetIndex() const
const RColumnModel & GetModel() const
NTupleSize_t GetFirstElementIndex() const
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
~RPagePersistentSink() override
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
std::uint64_t CommitCluster(NTupleSize_t nEntries) final
Finalize the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
void CommitDataset() final
Finalize the current cluster and the entire data set.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
RCluster::ColumnSet_t ToColumnSet() const
void Erase(DescriptorId_t physicalColumnID)
void Insert(DescriptorId_t physicalColumnID)
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId)
Helper for unstreaming a page.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
NTupleSize_t GetNEntries()
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
A page is a slice of a column that is mapped into memory.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
std::uint32_t GetNElements() const
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Meta-data for a set of ntuple clusters.
NTupleSize_t GetFirstEntryIndex() const
ClusterSize_t GetNEntries() const
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
std::vector< RFieldBase * > GetSubFields()
void SetOnDiskId(DescriptorId_t id)
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::unique_ptr< RNTupleModel > CreateModel() const
Re-create the C++ model from the stored meta-data.
RFieldZero * GetFieldZero() const
const RFieldBase * GetSourceField(const RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::string GetDescription() const
const RProjectedFields & GetProjectedFields() const
RFieldZero & GetFieldZero()
Non-const access to the root field is used to commit clusters during writing and to set the on-disk f...
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.
std::int64_t fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
NTupleSize_t fFirstElementIndex
A 64bit element index.
ClusterSize_t fNElements
The number of column elements in the cluster.
DescriptorId_t fPhysicalColumnId