40#include <unordered_map>
69 return R__FAIL(
"page checksum verification failed, data corruption detected");
76 return R__FAIL(
"invalid attempt to extract non-existing page checksum");
78 assert(fBufferSize >= kNBytesPageChecksum);
81 reinterpret_cast<const unsigned char *
>(
fBuffer) + fBufferSize - kNBytesPageChecksum,
checksum);
105 for (std::size_t i = 0; i <
itr->second.size(); ++i) {
109 itr->second[i].fRefCounter--;
110 if (
itr->second[i].fRefCounter == 0) {
112 if (
itr->second.empty()) {
113 fColumnInfos.erase(
itr);
140 if (
clusterDesc.GetFirstEntryIndex() >= (fFirstEntry + fNEntries))
152std::unique_ptr<ROOT::Experimental::Internal::RPageSource>
159 if (location.empty()) {
162 if (location.find(
"daos://") == 0)
164 return std::make_unique<RPageSourceDaos>(
ntupleName, location, options);
169 return std::make_unique<RPageSourceFile>(
ntupleName, location, options);
190 if ((
range.fFirstEntry +
range.fNEntries) > GetNEntries()) {
200 fHasStructure =
true;
207 GetExclDescriptorGuard().MoveIn(AttachImpl());
213 auto clone = CloneImpl();
215 clone->GetExclDescriptorGuard().MoveIn(std::move(*GetSharedDescriptorGuard()->Clone()));
216 clone->fHasStructure =
true;
217 clone->fIsAttached =
true;
224 return GetSharedDescriptorGuard()->GetNEntries();
229 return GetSharedDescriptorGuard()->GetNElements(
columnHandle.fPhysicalId);
248 std::vector<std::unique_ptr<RColumnElementBase>>
allElements;
254 if (!fActivePhysicalColumns.HasColumnInfos(
columnId))
264 for (
const auto &pi :
pageRange.fPageInfos) {
270 sealedPage.SetBufferSize(pi.fLocator.fBytesOnStorage + pi.fHasChecksum * kNBytesPageChecksum);
286 fPagePool.PreloadPage(std::move(
newPage),
element->GetIdentifier().fInMemoryType);
295 fCounters->fNPageUnsealed.Add(
pageNo);
299 fTaskScheduler->Wait();
302 throw RException(
R__FAIL(
"page checksum verification failed, data corruption detected"));
403 fCounters = std::make_unique<RCounters>(
RCounters{
407 "volume read from storage (required)"),
409 "volume read from storage (overhead)"),
412 "number of partial clusters preloaded from storage"),
415 "number of pages unzipped and decoded"),
418 "wall clock time spent decompressing"),
420 "CPU time spent reading"),
422 "CPU time spent decompressing"),
424 "bwRead",
"MB/s",
"bandwidth compressed bytes read per second", fMetrics,
441 "bwReadUnzip",
"MB/s",
"bandwidth uncompressed bytes read per second", fMetrics,
455 "bwUnzip",
"MB/s",
"decompression bandwidth of uncompressed bytes per second", fMetrics,
469 "rtReadEfficiency",
"",
"ratio of payload over all bytes read", fMetrics,
475 return {
true, 1. / (1. + (1. *
szReadOverhead->GetValueAsInt()) / payload)};
482 "rtCompression",
"",
"ratio of compressed bytes / uncompressed bytes", fMetrics,
532 page = std::move(tmp);
544 if (fCurrentPageSize ==
other.fCurrentPageSize)
545 return fColumn->GetOnDiskId() >
other.fColumn->GetOnDiskId();
546 return fCurrentPageSize >
other.fCurrentPageSize;
555 auto itr = fColumnsSortedByPageSize.
begin();
556 while (
itr != fColumnsSortedByPageSize.
end()) {
559 if (
itr->fCurrentPageSize ==
itr->fInitialPageSize) {
568 if (
itr != fColumnsSortedByPageSize.
end())
577 itr = fColumnsSortedByPageSize.find(next);
586 auto itr = fColumnsSortedByPageSize.find(key);
587 if (
itr == fColumnsSortedByPageSize.
end()) {
601 fColumnsSortedByPageSize.erase(
itr);
607 fColumnsSortedByPageSize.insert(
elem);
616 fColumnsSortedByPageSize.insert(
elem);
621 fColumnsSortedByPageSize.insert(
elem);
628 :
RPageStorage(
name), fOptions(options.Clone()), fWritePageMemoryManager(options.GetPageBufferBudget())
675 const auto nBytes =
page.GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
676 if (fSealPageBuffer.size() <
nBytes)
677 fSealPageBuffer.resize(
nBytes);
683 config.
fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
685 config.
fBuffer = fSealPageBuffer.data();
687 return SealPage(config);
692 for (
const auto &cb : fOnDatasetCommitCallbacks)
710std::unique_ptr<ROOT::Experimental::Internal::RPageSink>
717 if (location.empty()) {
720 if (location.find(
"daos://") == 0) {
722 return std::make_unique<RPageSinkDaos>(
ntupleName, location, options);
729 return std::make_unique<RPageSinkFile>(
ntupleName, location, options);
743 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
758 fDescriptorBuilder.AddColumn(
columnBuilder.MakeDescriptor().Unwrap());
765 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
771 const auto &
reps =
f.GetColumnRepresentatives();
774 return reps.size() *
reps[0].size();
788 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(),
fieldId);
796 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(),
fieldId);
803 .PhysicalColumnId(
source.GetLogicalId())
805 .BitsOnStorage(
source.GetBitsOnStorage())
806 .ValueRange(
source.GetValueRange())
809 .RepresentationIndex(
source.GetRepresentationIndex());
810 fDescriptorBuilder.AddColumn(
columnBuilder.MakeDescriptor().Unwrap());
821 for (
auto f :
changeset.fAddedProjectedFields) {
836 columnRange.fCompressionSettings = GetWriteOptions().GetCompression();
840 fOpenPageRanges.emplace_back(std::move(
pageRange));
845 if (fSerializationContext.GetHeaderSize() > 0)
846 fSerializationContext.MapSchema(
descriptor,
true);
853 throw RException(
R__FAIL(
"ROOT bug: unexpected type extra info in UpdateExtraTypeInfo()"));
860 fDescriptorBuilder.SetNTuple(fNTupleName, model.
GetDescription());
861 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
877 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
879 InitImpl(buffer.get(), fSerializationContext.GetHeaderSize());
881 fDescriptorBuilder.BeginHeaderExtension();
898 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
899 .FirstEntryIndex(fPrevClusterNEntries)
902 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
903 R__ASSERT(fOpenColumnRanges[i].fPhysicalColumnId == i);
910 fOpenColumnRanges[i].fFirstElementIndex +=
columnRange.fNElements;
912 fDescriptorBuilder.AddCluster(
clusterBuilder.MoveDescriptor().Unwrap());
921 fOpenColumnRanges.at(
columnHandle.fPhysicalId).fIsSuppressed =
true;
926 fOpenColumnRanges.at(
columnHandle.fPhysicalId).fNElements +=
page.GetNElements();
931 pageInfo.fHasChecksum = GetWriteOptions().GetEnablePageChecksums();
947std::vector<ROOT::Experimental::RNTupleLocator>
949 std::span<RPageStorage::RSealedPageGroup> ranges,
const std::vector<bool> &
mask)
951 std::vector<ROOT::Experimental::RNTupleLocator>
locators;
954 for (
auto &
range : ranges) {
965 std::span<RPageStorage::RSealedPageGroup> ranges)
973 std::vector<bool>
mask;
977 std::unordered_map<std::uint64_t, RSealedPageLink>
originalPages;
979 for (
auto &
range : ranges) {
985 if (!fFeatures.fCanMergePages || !
sealedPageIt->GetHasChecksum()) {
986 mask.emplace_back(
true);
995 mask.emplace_back(
true);
1000 const auto *
p =
itr->second.fSealedPage;
1003 mask.emplace_back(
true);
1008 mask.emplace_back(
false);
1012 mask.shrink_to_fit();
1019 for (
auto &
range : ranges) {
1021 fOpenColumnRanges.at(
range.fPhysicalColumnId).fNElements +=
sealedPageIt->GetNElements();
1027 fOpenPageRanges.at(
range.fPhysicalColumnId).fPageInfos.emplace_back(
pageInfo);
1039 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
1041 if (fOpenColumnRanges[i].fIsSuppressed) {
1042 assert(fOpenPageRanges[i].fPageInfos.empty());
1046 fOpenColumnRanges[i].fNElements = 0;
1047 fOpenColumnRanges[i].fIsSuppressed =
false;
1049 std::swap(
columnInfo.fPageRange, fOpenPageRanges[i]);
1050 fOpenPageRanges[i].fPhysicalColumnId = i;
1052 columnInfo.fNElements = fOpenColumnRanges[i].fNElements;
1053 fOpenColumnRanges[i].fNElements = 0;
1065 clusterBuilder.ClusterId(fDescriptorBuilder.GetDescriptor().GetNActiveClusters())
1066 .FirstEntryIndex(fPrevClusterNEntries)
1075 fOpenColumnRanges[
colId].fCompressionSettings,
columnInfo.fPageRange);
1080 clusterBuilder.CommitSuppressedColumnRanges(fDescriptorBuilder.GetDescriptor()).ThrowOnError();
1089 fOpenColumnRanges[
colId].fFirstElementIndex =
1093 fDescriptorBuilder.AddCluster(
clusterBuilder.MoveDescriptor().Unwrap());
1094 fPrevClusterNEntries +=
cluster.fNEntries;
1100 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
1104 for (
auto i = fNextClusterInGroup; i <
nClusters; ++i) {
1105 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
1117 cgBuilder.MinEntry(0).EntrySpan(0).NClusters(0);
1124 .NClusters(
nClusters - fNextClusterInGroup);
1127 for (
auto i = fNextClusterInGroup; i <
nClusters; ++i) {
1131 fDescriptorBuilder.AddClusterGroup(
cgBuilder.MoveDescriptor().Unwrap());
1139 if (!fStreamerInfos.empty()) {
1143 fDescriptorBuilder.AddExtraTypeInfo(
extraInfoBuilder.MoveDescriptor().Unwrap());
1146 const auto &
descriptor = fDescriptorBuilder.GetDescriptor();
1158 fCounters = std::make_unique<RCounters>(
RCounters{
1160 "number of pages committed to storage"),
1162 "volume written for committed pages"),
1167 "CPU time spent writing"),
1169 "CPU time spent compressing")});
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
An either thread-safe or non thread safe counter for CPU ticks.
A helper class for piece-wise construction of an RClusterDescriptor.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
A helper class for piece-wise construction of an RColumnDescriptor.
A column element encapsulates the translation between basic C++ types and their column representation...
virtual RIdentifier GetIdentifier() const =0
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Pack(void *destination, const void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
std::size_t GetPackedSize(std::size_t nElements=1U) const
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
RColumnElementBase * GetElement() const
std::optional< std::pair< double, double > > GetValueRange() const
std::uint16_t GetRepresentationIndex() const
EColumnType GetType() const
std::uint32_t GetIndex() const
std::size_t GetWritePageCapacity() const
NTupleSize_t GetFirstElementIndex() const
std::uint16_t GetBitsOnStorage() const
static RFieldDescriptorBuilder FromField(const RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the from buffer.
static std::uint32_t SerializeXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3, void *buffer)
Writes a XxHash-3 64bit checksum of the byte range given by data and length.
static RContext SerializeHeader(void *buffer, const RNTupleDescriptor &desc)
static std::string SerializeStreamerInfos(const StreamerInfoMap_t &infos)
static std::uint32_t SerializePageList(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< StreamerInfoMap_t > DeserializeStreamerInfos(const std::string &extraTypeInfoContent)
static RResult< void > VerifyXxHash3(const unsigned char *data, std::uint64_t length, std::uint64_t &xxhash3)
Expects an xxhash3 checksum in the 8 bytes following data + length and verifies it.
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeFooter(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A memory region that contains packed and compressed pages.
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
Abstract interface to allocate and release pages.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) final
Register a new column.
RStagedCluster StageCluster(NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
~RPagePersistentSink() override
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the location.
RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitDatasetImpl() final
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
RCluster::ColumnSet_t ToColumnSet() const
void Insert(DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
void Erase(DescriptorId_t physicalColumnId, RColumnElementBase::RIdentifier elementId)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) override
Register a new column.
void LoadStructure()
Loads header and footer without decompressing or deserializing them.
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
std::unique_ptr< RPageSource > Clone() const
Open the same storage multiple times, e.g.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
virtual RPageRef LoadPage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
Allocates and fills a page that contains the index-th element.
void SetEntryRange(const REntryRange &range)
Promise to only read from the given entry range.
virtual void UnzipClusterImpl(RCluster *cluster)
void Attach()
Open the physical storage container and deserialize header and footer.
static RResult< RPage > UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId, RPageAllocator &pageAlloc)
Helper for unstreaming a page.
NTupleSize_t GetNEntries()
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
std::uint32_t GetNElements() const
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
const RFieldBase * GetSourceField(const RFieldBase *target) const
bool TryUpdate(RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
Meta-data for a set of ntuple clusters.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
DescriptorId_t GetOnDiskId() const
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
const std::string & GetDescription() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
const_iterator begin() const
const_iterator end() const
std::unique_ptr< RColumnElementBase > GenerateColumnElement(std::type_index inMemoryType, EColumnType onDiskType)
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
RResult< void > EnsureValidNameForRNTuple(std::string_view name, std::string_view where)
Check whether a given string is a valid name according to the RNTuple specification.
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to a RNTupleModel
On-disk pages within a page source are identified by the column and page number.
Default I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
const RPage * fPage
Input page to be sealed.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that is necessary to load a certain page.
Default I/O performance counters that get registered in fMetrics
Used in SetEntryRange / GetEntryRange.
bool IntersectsWith(const RClusterDescriptor &clusterDesc) const
Returns true if the given cluster has entries within the entry range.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
bool operator>(const RColumnInfo &other) const
The window of element indexes of a particular column in a particular cluster.