47 for (
unsigned i = 0; i < fIDs.size(); ++i) {
48 if (fIDs[i] == physicalColumnID) {
53 fIDs.emplace_back(physicalColumnID);
54 fRefCounters.emplace_back(1);
59 for (
unsigned i = 0; i < fIDs.size(); ++i) {
60 if (fIDs[i] == physicalColumnID) {
61 if (--fRefCounters[i] == 0) {
62 fIDs.erase(fIDs.begin() + i);
63 fRefCounters.erase(fRefCounters.begin() + i);
74 for (
const auto &
id : fIDs)
89 std::string_view ntupleName, std::string_view location,
const RNTupleReadOptions &options)
91 if (ntupleName.empty()) {
94 if (location.empty()) {
97 if (location.find(
"daos://") == 0)
99 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
104 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
111 auto physicalId = GetSharedDescriptorGuard()->FindPhysicalColumnId(fieldId, column.
GetIndex());
113 fActivePhysicalColumns.Insert(physicalId);
119 fActivePhysicalColumns.Erase(columnHandle.
fPhysicalId);
124 return GetSharedDescriptorGuard()->GetNEntries();
141 UnzipClusterImpl(cluster);
158 auto page = Allocator_t::NewPage(physicalColumnId, element.
GetSize(), sealedPage.
fNElements);
159 if (sealedPage.
fSize != bytesPacked) {
160 fDecompressor->Unzip(sealedPage.
fBuffer, sealedPage.
fSize, bytesPacked, page.GetBuffer());
165 memcpy(page.GetBuffer(), sealedPage.
fBuffer, bytesPacked);
169 auto tmp = Allocator_t::NewPage(physicalColumnId, element.
GetSize(), sealedPage.
fNElements);
171 Allocator_t::DeletePage(page);
183 auto descriptorGuard = GetSharedDescriptorGuard();
184 const auto &clusterDesc = descriptorGuard->GetClusterDescriptor(clusterKey.
fClusterId);
187 const auto &pageRange = clusterDesc.GetPageRange(physicalColumnId);
189 for (
const auto &pageInfo : pageRange.fPageInfos) {
195 perPageFunc(physicalColumnId, pageNo, pageInfo);
205 fCounters = std::unique_ptr<RCounters>(
new RCounters{
208 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szReadPayload",
"B",
"volume read from storage (required)"),
209 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szReadOverhead",
"B",
"volume read from storage (overhead)"),
212 "number of partial clusters preloaded from storage"),
213 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPageLoaded",
"",
"number of pages loaded from storage"),
214 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPagePopulated",
"",
"number of populated pages"),
215 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallRead",
"ns",
"wall clock time spent reading"),
216 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallUnzip",
"ns",
"wall clock time spent decompressing"),
219 "CPU time spent decompressing"),
220 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwRead",
"MB/s",
"bandwidth compressed bytes read per second",
221 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
222 if (
const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
223 if (
const auto szReadOverhead = metrics.GetLocalCounter(
"szReadOverhead")) {
224 if (
const auto timeWallRead = metrics.GetLocalCounter(
"timeWallRead")) {
225 if (
auto walltime = timeWallRead->GetValueAsInt()) {
226 double payload = szReadPayload->GetValueAsInt();
227 double overhead = szReadOverhead->GetValueAsInt();
229 return {
true, (1000. * (payload + overhead) / walltime)};
237 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwReadUnzip",
"MB/s",
"bandwidth uncompressed bytes read per second",
238 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
240 if (
const auto timeWallRead = metrics.
GetLocalCounter(
"timeWallRead")) {
241 if (
auto walltime = timeWallRead->GetValueAsInt()) {
244 return {
true, 1000. * unzip / walltime};
251 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwUnzip",
"MB/s",
"decompression bandwidth of uncompressed bytes per second",
252 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
254 if (
const auto timeWallUnzip = metrics.
GetLocalCounter(
"timeWallUnzip")) {
255 if (
auto walltime = timeWallUnzip->GetValueAsInt()) {
258 return {
true, 1000. * unzip / walltime};
265 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"rtReadEfficiency",
"",
"ratio of payload over all bytes read",
266 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
267 if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
268 if (
const auto szReadOverhead = metrics.
GetLocalCounter(
"szReadOverhead")) {
269 if (
auto payload = szReadPayload->GetValueAsInt()) {
271 return {
true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
278 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"rtCompression",
"",
"ratio of compressed bytes / uncompressed bytes",
279 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
280 if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
282 if (
auto unzip = szUnzip->GetValueAsInt()) {
283 return {
true, (1. * szReadPayload->GetValueAsInt()) / unzip};
307 std::string_view ntupleName, std::string_view location,
const RNTupleWriteOptions &options)
309 if (ntupleName.empty()) {
312 if (location.empty()) {
315 std::unique_ptr<ROOT::Experimental::Detail::RPageSink> realSink;
316 if (location.find(
"daos://") == 0) {
318 realSink = std::make_unique<RPageSinkDaos>(ntupleName, location, options);
323 realSink = std::make_unique<RPageSinkFile>(ntupleName, location, options);
327 return std::make_unique<RPageSinkBuf>(std::move(realSink));
334 auto columnId = fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns();
335 fDescriptorBuilder.AddColumn(columnId, columnId, fieldId, column.
GetModel(), column.
GetIndex(),
343 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
345 auto fieldId = descriptor.GetNFields();
347 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(), fieldId);
348 f.SetOnDiskId(fieldId);
349 f.ConnectPageSink(*
this, firstEntry);
352 auto fieldId = descriptor.GetNFields();
354 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(), fieldId);
355 f.SetOnDiskId(fieldId);
357 for (
const auto &source : descriptor.GetColumnIterable(sourceFieldId)) {
358 auto targetId = descriptor.GetNLogicalColumns();
359 fDescriptorBuilder.AddColumn(targetId, source.GetLogicalId(), fieldId, source.GetModel(), source.GetIndex());
363 R__ASSERT(firstEntry >= fPrevClusterNEntries);
364 const auto nColumnsBeforeUpdate = descriptor.GetNPhysicalColumns();
367 for (
auto &descendant : *
f)
368 addField(descendant);
371 addProjectedField(*
f);
372 for (
auto &descendant : *
f)
373 addProjectedField(descendant);
376 const auto nColumns = descriptor.GetNPhysicalColumns();
377 for (
DescriptorId_t i = nColumnsBeforeUpdate; i < nColumns; ++i) {
383 columnRange.
fFirstElementIndex = descriptor.GetColumnDescriptor(i).GetFirstElementIndex();
386 fOpenColumnRanges.emplace_back(columnRange);
389 fOpenPageRanges.emplace_back(std::move(pageRange));
394 if (fSerializationContext.GetHeaderSize() > 0)
395 fSerializationContext.MapSchema(descriptor,
true);
400 fDescriptorBuilder.SetNTuple(fNTupleName, model.
GetDescription());
401 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
409 for (
auto f : fieldZero.GetSubFields())
410 initialChangeset.fAddedFields.emplace_back(
f);
412 initialChangeset.fAddedProjectedFields.emplace_back(
f);
413 UpdateSchema(initialChangeset, 0U);
416 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
418 CreateImpl(model, buffer.get(), fSerializationContext.GetHeaderSize());
420 fDescriptorBuilder.BeginHeaderExtension();
429 pageInfo.
fLocator = CommitPageImpl(columnHandle, page);
430 fOpenPageRanges.at(columnHandle.
fPhysicalId).fPageInfos.emplace_back(pageInfo);
437 fOpenColumnRanges.at(physicalColumnId).fNElements += sealedPage.
fNElements;
441 pageInfo.
fLocator = CommitSealedPageImpl(physicalColumnId, sealedPage);
442 fOpenPageRanges.at(physicalColumnId).fPageInfos.emplace_back(pageInfo);
445std::vector<ROOT::Experimental::RNTupleLocator>
448 std::vector<ROOT::Experimental::RNTupleLocator> locators;
449 for (
auto &range : ranges) {
450 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
451 locators.push_back(CommitSealedPageImpl(range.fPhysicalColumnId, *sealedPageIt));
458 auto locators = CommitSealedPageVImpl(ranges);
461 for (
auto &range : ranges) {
462 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
463 fOpenColumnRanges.at(range.fPhysicalColumnId).fNElements += sealedPageIt->fNElements;
466 pageInfo.
fNElements = sealedPageIt->fNElements;
468 fOpenPageRanges.at(range.fPhysicalColumnId).fPageInfos.emplace_back(pageInfo);
475 auto nbytes = CommitClusterImpl(nEntries);
478 auto nEntriesInCluster =
ClusterSize_t(nEntries - fPrevClusterNEntries);
481 for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
484 std::swap(fullRange, fOpenPageRanges[i]);
486 fOpenColumnRanges[i].fCompressionSettings, fullRange);
487 fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
488 fOpenColumnRanges[i].fNElements = 0;
490 fDescriptorBuilder.AddClusterWithDetails(clusterBuilder.
MoveDescriptor().Unwrap());
491 fPrevClusterNEntries = nEntries;
497 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
499 const auto nClusters = descriptor.GetNClusters();
500 std::vector<DescriptorId_t> physClusterIDs;
501 for (
auto i = fNextClusterInGroup; i < nClusters; ++i) {
502 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
507 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
509 fSerializationContext);
511 const auto clusterGroupId = descriptor.GetNClusterGroups();
512 const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
515 for (
auto i = fNextClusterInGroup; i < nClusters; ++i) {
518 fDescriptorBuilder.AddClusterGroup(std::move(cgBuilder));
519 fSerializationContext.MapClusterGroupId(clusterGroupId);
521 fNextClusterInGroup = nClusters;
526 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
529 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
532 CommitDatasetImpl(bufFooter.get(), szFooter);
539 unsigned char *pageBuf =
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
540 bool isAdoptedBuffer =
true;
545 pageBuf =
new unsigned char[packedBytes];
546 isAdoptedBuffer =
false;
549 auto zippedBytes = packedBytes;
551 if ((compressionSetting != 0) || !element.
IsMappable()) {
553 if (!isAdoptedBuffer)
555 pageBuf =
reinterpret_cast<unsigned char *
>(buf);
556 isAdoptedBuffer =
true;
569 return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
575 fCounters = std::unique_ptr<RCounters>(
new RCounters{
576 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPageCommitted",
"",
"number of pages committed to storage"),
577 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szWritePayload",
"B",
"volume written for committed pages"),
579 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallWrite",
"ns",
"wall clock time spent writing"),
580 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallZip",
"ns",
"wall clock time spent compressing"),
583 "CPU time spent compressing")
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
A column element encapsulates the translation between basic C++ types and their column representation...
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
std::size_t GetPackedSize(std::size_t nElements=1U) const
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
std::size_t GetSize() const
const RColumnModel & GetModel() const
std::uint32_t GetIndex() const
NTupleSize_t GetFirstElementIndex() const
NTupleSize_t GetNElements() const
A field translates read and write calls from/to underlying columns to/from tree values.
void SetOnDiskId(DescriptorId_t id)
DescriptorId_t GetOnDiskId() const
std::vector< RFieldBase * > GetSubFields() const
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
A memory region that contains packed and compressed pages.
void Register(const ROnDiskPage::Key &key, const ROnDiskPage &onDiskPage)
Inserts information about a page stored in fMemory.
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
void CommitDataset()
Finalize the current cluster and the entire data set.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitPage(ColumnHandle_t columnHandle, const RPage &page)
Write a page to the storage. The column must have been added before.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)
Incorporate incremental changes to the model into the ntuple descriptor.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the file name (location)
std::uint64_t CommitCluster(NTupleSize_t nEntries)
Finalize the current cluster and create a new one for the following data.
void CommitClusterGroup()
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)
Write a preprocessed page to storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
RCluster::ColumnSet_t ToColumnSet() const
void Erase(DescriptorId_t physicalColumnID)
void Insert(DescriptorId_t physicalColumnID)
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
NTupleSize_t GetNEntries()
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId)
Helper for unstreaming a page.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void PrepareLoadCluster(const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap, std::function< void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
A page is a slice of a column that is mapped into memory.
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
std::uint32_t GetNElements() const
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
A helper class for piece-wise construction of an RClusterDescriptor.
RResult< void > CommitColumnRange(DescriptorId_t physicalId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
A helper class for piece-wise construction of an RClusterGroupDescriptor.
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
void AddCluster(DescriptorId_t clusterId)
RClusterGroupDescriptorBuilder & PageListLength(std::uint32_t pageListLength)
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
Base class for all ROOT issued exceptions.
static RFieldDescriptorBuilder FromField(const Detail::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
RFieldZero * GetFieldZero() const
const Detail::RFieldBase * GetSourceField(const Detail::RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::string GetDescription() const
RFieldZero * GetFieldZero() const
const RProjectedFields & GetProjectedFields() const
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
bool GetUseBufferedWrite() const
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
RClusterSize ClusterSize_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
On-disk pages within a page source are identified by the column and page number.
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.
std::int64_t fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
NTupleSize_t fFirstElementIndex
A 64bit element index.
ClusterSize_t fNElements
The number of column elements in the cluster.
DescriptorId_t fPhysicalColumnId