45 for (
unsigned i = 0;
i <
fIDs.size(); ++
i) {
46 if (
fIDs[
i] == physicalColumnID) {
51 fIDs.emplace_back(physicalColumnID);
57 for (
unsigned i = 0;
i <
fIDs.size(); ++
i) {
58 if (
fIDs[
i] == physicalColumnID) {
72 for (
const auto &
id :
fIDs)
100std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
104 if (ntupleName.empty()) {
107 if (location.empty()) {
110 if (location.find(
"daos://") == 0)
112 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
117 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
170 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.
fClusterId);
173 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
175 for (
const auto &pageInfo : pageRange.fPageInfos) {
181 perPageFunc(physicalColumnId, pageNo, pageInfo);
195 "volume read from storage (required)"),
197 "volume read from storage (overhead)"),
200 "number of partial clusters preloaded from storage"),
205 "wall clock time spent decompressing"),
207 "CPU time spent reading"),
209 "CPU time spent decompressing"),
211 "bwRead",
"MB/s",
"bandwidth compressed bytes read per second",
fMetrics,
213 if (
const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
214 if (
const auto szReadOverhead = metrics.GetLocalCounter(
"szReadOverhead")) {
215 if (
const auto timeWallRead = metrics.GetLocalCounter(
"timeWallRead")) {
216 if (
auto walltime = timeWallRead->GetValueAsInt()) {
217 double payload = szReadPayload->GetValueAsInt();
218 double overhead = szReadOverhead->GetValueAsInt();
220 return {
true, (1000. * (payload + overhead) / walltime)};
228 "bwReadUnzip",
"MB/s",
"bandwidth uncompressed bytes read per second",
fMetrics,
230 if (const auto szUnzip = metrics.GetLocalCounter(
"szUnzip")) {
231 if (const auto timeWallRead = metrics.GetLocalCounter(
"timeWallRead")) {
232 if (auto walltime = timeWallRead->GetValueAsInt()) {
233 double unzip = szUnzip->GetValueAsInt();
235 return {true, 1000. * unzip / walltime};
242 "bwUnzip",
"MB/s",
"decompression bandwidth of uncompressed bytes per second",
fMetrics,
244 if (const auto szUnzip = metrics.GetLocalCounter(
"szUnzip")) {
245 if (const auto timeWallUnzip = metrics.GetLocalCounter(
"timeWallUnzip")) {
246 if (auto walltime = timeWallUnzip->GetValueAsInt()) {
247 double unzip = szUnzip->GetValueAsInt();
249 return {true, 1000. * unzip / walltime};
256 "rtReadEfficiency",
"",
"ratio of payload over all bytes read", fMetrics,
258 if (const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
259 if (const auto szReadOverhead = metrics.GetLocalCounter(
"szReadOverhead")) {
260 if (auto payload = szReadPayload->GetValueAsInt()) {
262 return {true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
269 "rtCompression",
"",
"ratio of compressed bytes / uncompressed bytes", fMetrics,
271 if (const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
272 if (const auto szUnzip = metrics.GetLocalCounter(
"szUnzip")) {
273 if (auto unzip = szUnzip->GetValueAsInt()) {
274 return {true, (1. * szReadPayload->GetValueAsInt()) / unzip};
282ROOT::Experimental::Internal::RPage
296 auto page = Allocator_t::NewPage(physicalColumnId, element.
GetSize(), sealedPage.
fNElements);
297 if (sealedPage.
fSize != bytesPacked) {
303 memcpy(page.GetBuffer(), sealedPage.
fBuffer, bytesPacked);
307 auto tmp = Allocator_t::NewPage(physicalColumnId, element.
GetSize(), sealedPage.
fNElements);
309 Allocator_t::DeletePage(page);
328 int compressionSetting,
void *buf,
bool allowAlias)
330 unsigned char *pageBuf =
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
331 bool isAdoptedBuffer =
true;
336 pageBuf =
new unsigned char[packedBytes];
337 isAdoptedBuffer =
false;
340 auto zippedBytes = packedBytes;
342 if ((compressionSetting != 0) || !element.
IsMappable() || !allowAlias) {
344 if (!isAdoptedBuffer)
346 pageBuf =
reinterpret_cast<unsigned char *
>(buf);
347 isAdoptedBuffer =
true;
357 int compressionSetting)
365std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
369 if (ntupleName.empty()) {
372 if (location.empty()) {
375 if (location.find(
"daos://") == 0) {
377 return std::make_unique<RPageSinkDaos>(ntupleName, location, options);
384 return std::make_unique<RPageSinkFile>(ntupleName, location, options);
409 auto fieldId = descriptor.GetNFields();
412 f.SetOnDiskId(fieldId);
416 auto fieldId = descriptor.GetNFields();
419 f.SetOnDiskId(fieldId);
421 for (
const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
422 auto targetId = descriptor.GetNLogicalColumns();
423 fDescriptorBuilder.AddColumn(targetId, source.GetLogicalId(), fieldId, source.GetModel(), source.GetIndex());
428 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
431 for (
auto &descendant : *
f)
432 addField(descendant);
435 addProjectedField(*
f);
436 for (
auto &descendant : *
f)
437 addProjectedField(descendant);
440 const auto nColumns = descriptor.GetNPhysicalColumns();
469 fieldZero.SetOnDiskId(0);
473 for (
auto f : fieldZero.GetSubFields())
476 initialChangeset.fAddedProjectedFields.emplace_back(
f);
507 const auto &columnRange = cluster.GetColumnRange(
i);
508 R__ASSERT(columnRange.fPhysicalColumnId ==
i);
509 const auto &pageRange = cluster.GetPageRange(
i);
540 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
543std::vector<ROOT::Experimental::RNTupleLocator>
545 std::span<RPageStorage::RSealedPageGroup> ranges)
547 std::vector<ROOT::Experimental::RNTupleLocator> locators;
548 for (
auto &range : ranges) {
549 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
556 std::span<RPageStorage::RSealedPageGroup> ranges)
561 for (
auto &range : ranges) {
562 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
563 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->fNElements;
566 pageInfo.
fNElements = sealedPageIt->fNElements;
568 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
600 const auto nClusters = descriptor.GetNActiveClusters();
601 std::vector<DescriptorId_t> physClusterIDs;
607 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
610 const auto clusterGroupId = descriptor.GetNClusterGroups();
618 const auto &lastClusterDesc = descriptor.GetClusterDescriptor(nClusters - 1);
619 cgBuilder.
MinEntry(firstClusterDesc.GetFirstEntryIndex())
620 .
EntrySpan(lastClusterDesc.GetFirstEntryIndex() + lastClusterDesc.GetNEntries() -
621 firstClusterDesc.GetFirstEntryIndex())
624 std::vector<DescriptorId_t> clusterIds;
626 clusterIds.emplace_back(
i);
640 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
651 "number of pages committed to storage"),
653 "volume written for committed pages"),
658 "CPU time spent writing"),
660 "CPU time spent compressing")});
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
An either thread-safe or non thread safe counter for CPU ticks.
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
RClusterDescriptorBuilder & ClusterId(DescriptorId_t clusterId)
RClusterDescriptorBuilder & NEntries(std::uint64_t nEntries)
RClusterDescriptorBuilder & FirstEntryIndex(std::uint64_t firstEntryIndex)
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
void AddClusters(const std::vector< DescriptorId_t > &clusterIds)
RClusterGroupDescriptorBuilder & MinEntry(std::uint64_t minEntry)
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
RClusterGroupDescriptorBuilder & EntrySpan(std::uint64_t entrySpan)
RClusterGroupDescriptorBuilder & NClusters(std::uint32_t nClusters)
RClusterGroupDescriptorBuilder & PageListLength(std::uint64_t pageListLength)
RResult< RClusterGroupDescriptor > MoveDescriptor()
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
std::size_t GetSize() const
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
std::size_t GetPackedSize(std::size_t nElements=1U) const
std::uint32_t GetIndex() const
const RColumnModel & GetModel() const
NTupleSize_t GetFirstElementIndex() const
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
std::uint64_t fNextClusterInGroup
Remembers the starting cluster id for the next cluster group.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
~RPagePersistentSink() override
RNTupleSerializer::RContext fSerializationContext
Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization.
virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page)=0
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length)=0
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
NTupleSize_t fPrevClusterNEntries
Used to calculate the number of entries in the current cluster.
std::vector< RClusterDescriptor::RPageRange > fOpenPageRanges
Keeps track of the written pages in the currently open cluster. Indexed by column id.
std::uint64_t CommitCluster(NTupleSize_t nEntries) final
Finalize the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)=0
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
void CommitDataset() final
Finalize the current cluster and the entire data set.
std::unique_ptr< RCounters > fCounters
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
Internal::RNTupleDescriptorBuilder fDescriptorBuilder
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
std::vector< RClusterDescriptor::RColumnRange > fOpenColumnRanges
Keeps track of the number of elements in the currently open cluster. Indexed by column id.
virtual std::uint64_t CommitClusterImpl()=0
Returns the number of bytes written to storage (excluding metadata)
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
virtual RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)=0
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket) In...
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
std::unique_ptr< RNTupleWriteOptions > fOptions
RCluster::ColumnSet_t ToColumnSet() const
void Erase(DescriptorId_t physicalColumnID)
void Insert(DescriptorId_t physicalColumnID)
std::vector< DescriptorId_t > fIDs
std::vector< std::size_t > fRefCounters
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
RNTupleReadOptions fOptions
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
std::unique_ptr< RCounters > fCounters
RActivePhysicalColumns fActivePhysicalColumns
The active columns are implicitly defined by the model fields or views.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
REntryRange fEntryRange
Used by the cluster pool to prevent reading beyond the given range.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId)
Helper for unstreaming a page.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
virtual void UnzipClusterImpl(RCluster *)
NTupleSize_t GetNEntries()
RTaskScheduler * fTaskScheduler
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
RPageStorage(std::string_view name)
Detail::RNTupleMetrics fMetrics
A page is a slice of a column that is mapped into memory.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
std::uint32_t GetNElements() const
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Meta-data for a set of ntuple clusters.
NTupleSize_t GetFirstEntryIndex() const
ClusterSize_t GetNEntries() const
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
std::vector< RFieldBase * > GetSubFields()
void SetOnDiskId(DescriptorId_t id)
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
DescriptorId_t FindNextClusterId(DescriptorId_t clusterId) const
DescriptorId_t FindClusterId(DescriptorId_t physicalColumnId, NTupleSize_t index) const
const RClusterDescriptor & GetClusterDescriptor(DescriptorId_t clusterId) const
std::unique_ptr< RNTupleModel > CreateModel() const
Re-create the C++ model from the stored meta-data.
RFieldZero * GetFieldZero() const
const RFieldBase * GetSourceField(const RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::string GetDescription() const
const RProjectedFields & GetProjectedFields() const
RFieldZero & GetFieldZero()
Non-const access to the root field is used to commit clusters during writing and to set the on-disk f...
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.
NTupleSize_t fFirstElementIndex
A 64bit element index.
int fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
ClusterSize_t fNElements
The number of column elements in the cluster.
DescriptorId_t fPhysicalColumnId