59 std::string_view ntupleName, std::string_view location,
const RNTupleReadOptions &options)
61 if (ntupleName.empty()) {
64 if (location.empty()) {
67 if (location.find(
"daos://") == 0)
69 return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
74 return std::make_unique<RPageSourceFile>(ntupleName, location, options);
81 auto columnId = fDescriptor.FindColumnId(fieldId, column.
GetIndex());
83 fActiveColumns.emplace(columnId);
89 fActiveColumns.erase(columnHandle.
fId);
94 return fDescriptor.GetNEntries();
105 return columnHandle.
fId;
111 UnzipClusterImpl(cluster);
123 auto pageBuffer = std::make_unique<unsigned char[]>(bytesPacked);
124 if (sealedPage.
fSize != bytesPacked) {
125 fDecompressor->Unzip(sealedPage.
fBuffer, sealedPage.
fSize, bytesPacked, pageBuffer.get());
130 memcpy(pageBuffer.get(), sealedPage.
fBuffer, bytesPacked);
134 auto unpackedBuffer =
new unsigned char[pageSize];
136 pageBuffer = std::unique_ptr<unsigned char []>(unpackedBuffer);
145 fCounters = std::unique_ptr<RCounters>(
new RCounters{
148 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szReadPayload",
"B",
"volume read from storage (required)"),
149 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szReadOverhead",
"B",
"volume read from storage (overhead)"),
152 "number of partial clusters preloaded from storage"),
153 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPageLoaded",
"",
"number of pages loaded from storage"),
154 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPagePopulated",
"",
"number of populated pages"),
155 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallRead",
"ns",
"wall clock time spent reading"),
156 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallUnzip",
"ns",
"wall clock time spent decompressing"),
159 "CPU time spent decompressing"),
160 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwRead",
"MB/s",
"bandwidth compressed bytes read per second",
161 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
162 if (
const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
163 if (
const auto szReadOverhead = metrics.GetLocalCounter(
"szReadOverhead")) {
164 if (
const auto timeWallRead = metrics.GetLocalCounter(
"timeWallRead")) {
165 if (
auto walltime = timeWallRead->GetValueAsInt()) {
166 double payload = szReadPayload->GetValueAsInt();
167 double overhead = szReadOverhead->GetValueAsInt();
169 return {
true, (1000. * (payload + overhead) / walltime)};
177 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwReadUnzip",
"MB/s",
"bandwidth uncompressed bytes read per second",
178 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
180 if (
const auto timeWallRead = metrics.
GetLocalCounter(
"timeWallRead")) {
181 if (
auto walltime = timeWallRead->GetValueAsInt()) {
184 return {
true, 1000. * unzip / walltime};
191 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwUnzip",
"MB/s",
"decompression bandwidth of uncompressed bytes per second",
192 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
194 if (
const auto timeWallUnzip = metrics.
GetLocalCounter(
"timeWallUnzip")) {
195 if (
auto walltime = timeWallUnzip->GetValueAsInt()) {
198 return {
true, 1000. * unzip / walltime};
205 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"rtReadEfficiency",
"",
"ratio of payload over all bytes read",
206 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
207 if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
208 if (
const auto szReadOverhead = metrics.
GetLocalCounter(
"szReadOverhead")) {
209 if (
auto payload = szReadPayload->GetValueAsInt()) {
211 return {
true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
218 *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"rtCompression",
"",
"ratio of compressed bytes / uncompressed bytes",
219 fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
220 if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
222 if (
auto unzip = szUnzip->GetValueAsInt()) {
223 return {
true, (1. * szReadPayload->GetValueAsInt()) / unzip};
247 std::string_view ntupleName, std::string_view location,
const RNTupleWriteOptions &options)
249 if (ntupleName.empty()) {
252 if (location.empty()) {
255 std::unique_ptr<ROOT::Experimental::Detail::RPageSink> realSink;
256 if (location.find(
"daos://") == 0) {
258 realSink = std::make_unique<RPageSinkDaos>(ntupleName, location, options);
263 realSink = std::make_unique<RPageSinkFile>(ntupleName, location, options);
267 return std::make_unique<RPageSinkBuf>(std::move(realSink));
274 auto columnId = fLastColumnId++;
282 fDescriptorBuilder.SetNTuple(fNTupleName, model.
GetDescription(),
"undefined author",
286 fDescriptorBuilder.AddField(
288 .FieldId(fLastFieldId)
295 fDescriptorBuilder.AddField(
301 fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(), fLastFieldId);
302 f.SetOnDiskId(fLastFieldId);
303 f.ConnectPageSink(*
this);
306 auto nColumns = fLastColumnId;
313 fOpenColumnRanges.emplace_back(columnRange);
316 fOpenPageRanges.emplace_back(std::move(pageRange));
325 fOpenColumnRanges.at(columnHandle.
fId).fNElements += page.
GetNElements();
329 pageInfo.
fLocator = CommitPageImpl(columnHandle, page);
330 fOpenPageRanges.at(columnHandle.
fId).fPageInfos.emplace_back(pageInfo);
338 fOpenColumnRanges.at(columnId).fNElements += sealedPage.
fNElements;
342 pageInfo.
fLocator = CommitSealedPageImpl(columnId, sealedPage);
343 fOpenPageRanges.at(columnId).fPageInfos.emplace_back(pageInfo);
349 auto nbytes = CommitClusterImpl(nEntries);
352 fDescriptorBuilder.AddCluster(fLastClusterId,
RNTupleVersion(), fPrevClusterNEntries,
354 for (
auto &range : fOpenColumnRanges) {
355 fDescriptorBuilder.AddClusterColumnRange(fLastClusterId, range);
356 range.fFirstElementIndex += range.fNElements;
357 range.fNElements = 0;
359 for (
auto &range : fOpenPageRanges) {
361 std::swap(fullRange, range);
363 fDescriptorBuilder.AddClusterPageRange(fLastClusterId, std::move(fullRange));
365 fPrevClusterNEntries = nEntries;
374 unsigned char *pageBuf =
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
375 bool isAdoptedBuffer =
true;
380 pageBuf =
new unsigned char[packedBytes];
381 isAdoptedBuffer =
false;
384 auto zippedBytes = packedBytes;
386 if ((compressionSetting != 0) || !element.
IsMappable()) {
388 if (!isAdoptedBuffer)
390 pageBuf =
reinterpret_cast<unsigned char *
>(buf);
391 isAdoptedBuffer =
true;
404 return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
410 fCounters = std::unique_ptr<RCounters>(
new RCounters{
411 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPageCommitted",
"",
"number of pages committed to storage"),
412 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szWritePayload",
"B",
"volume written for committed pages"),
414 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallWrite",
"ns",
"wall clock time spent writing"),
415 *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallZip",
"ns",
"wall clock time spent compressing"),
418 "CPU time spent compressing")
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
An in-memory subset of the packed and compressed pages of a cluster.
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
std::size_t GetPackedSize(std::size_t nElements) const
std::size_t GetSize() const
const RColumnModel & GetModel() const
std::uint32_t GetIndex() const
RNTupleVersion GetVersion() const
NTupleSize_t GetNElements() const
void SetOnDiskId(DescriptorId_t id)
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
std::int64_t GetValueAsInt() const override
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
A collection of Counter objects with a name, a unit, and a description.
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
An either thread-safe or non thread safe counter for CPU ticks.
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
void CommitPage(ColumnHandle_t columnHandle, const RPage &page)
Write a page to the storage. The column must have been added before.
void CommitSealedPage(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage)
Write a preprocessed page to storage.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the file name (location)
std::uint64_t CommitCluster(NTupleSize_t nEntries)
Finalize the current cluster and create a new one for the following data.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< unsigned char[]> UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element)
Helper for unstreaming a page.
NTupleSize_t GetNEntries()
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
Common functionality of an ntuple storage for both reading and writing.
RPageStorage(std::string_view name)
A page is a slice of a column that is mapped into memory.
ClusterSize_t::ValueType GetNElements() const
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Base class for all ROOT issued exceptions.
static RFieldDescriptorBuilder FromField(const Detail::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
The RNTupleModel encapsulates the schema of an ntuple.
std::string GetDescription() const
RNTupleUuid GetUuid() const
RNTupleVersion GetVersion() const
RFieldZero * GetFieldZero() const
Common user-tunable settings for reading ntuples.
For forward and backward compatibility, attach version information to the constituents of the file f...
Common user-tunable settings for storing ntuples.
bool GetUseBufferedWrite() const
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
RClusterSize ClusterSize_t
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
Default I/O performance counters that get registered in fMetrics.
Default I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
The window of element indexes of a particular column in a particular cluster.
std::int64_t fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
NTupleSize_t fFirstElementIndex
A 64bit element index.
ClusterSize_t fNElements
A 32bit value for the number of column elements in the cluster.