59   std::string_view ntupleName, std::string_view location, 
const RNTupleReadOptions &options)
 
   61   if (ntupleName.empty()) {
 
   64   if (location.empty()) {
 
   67   if (location.find(
"daos://") == 0)
 
   69      return std::make_unique<RPageSourceDaos>(ntupleName, location, options);
 
   74   return std::make_unique<RPageSourceFile>(ntupleName, location, options);
 
   81   auto columnId = GetSharedDescriptorGuard()->FindColumnId(fieldId, column.
GetIndex());
 
   83   fActiveColumns.emplace(columnId);
 
   89   fActiveColumns.erase(columnHandle.
fId);
 
   94   return GetSharedDescriptorGuard()->GetNEntries();
 
  105   return columnHandle.
fId;
 
  111      UnzipClusterImpl(cluster);
 
  123   auto pageBuffer = std::make_unique<unsigned char[]>(bytesPacked);
 
  124   if (sealedPage.
fSize != bytesPacked) {
 
  125      fDecompressor->Unzip(sealedPage.
fBuffer, sealedPage.
fSize, bytesPacked, pageBuffer.get());
 
  130      memcpy(pageBuffer.get(), sealedPage.
fBuffer, bytesPacked);
 
  134      auto unpackedBuffer = 
new unsigned char[pageSize];
 
  136      pageBuffer = std::unique_ptr<unsigned char []>(unpackedBuffer);
 
  145   fCounters = std::unique_ptr<RCounters>(
new RCounters{
 
  148      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szReadPayload", 
"B", 
"volume read from storage (required)"),
 
  149      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szReadOverhead", 
"B", 
"volume read from storage (overhead)"),
 
  152                                                   "number of partial clusters preloaded from storage"),
 
  153      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPageLoaded", 
"", 
"number of pages loaded from storage"),
 
  154      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPagePopulated", 
"", 
"number of populated pages"),
 
  155      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallRead", 
"ns", 
"wall clock time spent reading"),
 
  156      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallUnzip", 
"ns", 
"wall clock time spent decompressing"),
 
  159                                                                        "CPU time spent decompressing"),
 
  160      *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwRead", 
"MB/s", 
"bandwidth compressed bytes read per second",
 
  161         fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
 
  162            if (
const auto szReadPayload = metrics.GetLocalCounter(
"szReadPayload")) {
 
  163               if (
const auto szReadOverhead = metrics.GetLocalCounter(
"szReadOverhead")) {
 
  164                  if (
const auto timeWallRead = metrics.GetLocalCounter(
"timeWallRead")) {
 
  165                     if (
auto walltime = timeWallRead->GetValueAsInt()) {
 
  166                        double payload = szReadPayload->GetValueAsInt();
 
  167                        double overhead = szReadOverhead->GetValueAsInt();
 
  169                        return {
true, (1000. * (payload + overhead) / walltime)};
 
  177      *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwReadUnzip", 
"MB/s", 
"bandwidth uncompressed bytes read per second",
 
  178         fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
 
  180               if (
const auto timeWallRead = metrics.
GetLocalCounter(
"timeWallRead")) {
 
  181                  if (
auto walltime = timeWallRead->GetValueAsInt()) {
 
  184                     return {
true, 1000. * unzip / walltime};
 
  191      *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"bwUnzip", 
"MB/s", 
"decompression bandwidth of uncompressed bytes per second",
 
  192         fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
 
  194               if (
const auto timeWallUnzip = metrics.
GetLocalCounter(
"timeWallUnzip")) {
 
  195                  if (
auto walltime = timeWallUnzip->GetValueAsInt()) {
 
  198                     return {
true, 1000. * unzip / walltime};
 
  205      *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"rtReadEfficiency", 
"", 
"ratio of payload over all bytes read",
 
  206         fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
 
  207            if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
 
  208               if (
const auto szReadOverhead = metrics.
GetLocalCounter(
"szReadOverhead")) {
 
  209                  if (
auto payload = szReadPayload->GetValueAsInt()) {
 
  211                     return {
true, 1./(1. + (1. * szReadOverhead->GetValueAsInt()) / payload)};
 
  218      *fMetrics.MakeCounter<
RNTupleCalcPerf*> (
"rtCompression", 
"", 
"ratio of compressed bytes / uncompressed bytes",
 
  219         fMetrics, [](
const RNTupleMetrics &metrics) -> std::pair<bool, double> {
 
  220            if (
const auto szReadPayload = metrics.
GetLocalCounter(
"szReadPayload")) {
 
  222                  if (
auto unzip = szUnzip->GetValueAsInt()) {
 
  223                     return {
true, (1. * szReadPayload->GetValueAsInt()) / unzip};
 
  247   std::string_view ntupleName, std::string_view location, 
const RNTupleWriteOptions &options)
 
  249   if (ntupleName.empty()) {
 
  252   if (location.empty()) {
 
  255   std::unique_ptr<ROOT::Experimental::Detail::RPageSink> realSink;
 
  256   if (location.find(
"daos://") == 0) {
 
  258      realSink = std::make_unique<RPageSinkDaos>(ntupleName, location, options);
 
  263      realSink = std::make_unique<RPageSinkFile>(ntupleName, location, options);
 
  267      return std::make_unique<RPageSinkBuf>(std::move(realSink));
 
  274   auto columnId = fDescriptorBuilder.GetDescriptor().GetNColumns();
 
  275   fDescriptorBuilder.AddColumn(columnId, fieldId, column.
GetModel(), column.
GetIndex());
 
  282   fDescriptorBuilder.SetNTuple(fNTupleName, model.
GetDescription());
 
  283   const auto &descriptor = fDescriptorBuilder.GetDescriptor();
 
  288   for (
auto &
f : fieldZero) {
 
  289      auto fieldId = descriptor.GetNFields();
 
  291      fDescriptorBuilder.AddFieldLink(
f.GetParent()->GetOnDiskId(), fieldId);
 
  292      f.SetOnDiskId(fieldId);
 
  293      f.ConnectPageSink(*
this); 
 
  296   auto nColumns = descriptor.GetNColumns();
 
  303      fOpenColumnRanges.emplace_back(columnRange);
 
  306      fOpenPageRanges.emplace_back(std::move(pageRange));
 
  310   auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
 
  313   CreateImpl(model, buffer.get(), fSerializationContext.GetHeaderSize());
 
  319   fOpenColumnRanges.at(columnHandle.
fId).fNElements += page.
GetNElements();
 
  323   pageInfo.
fLocator = CommitPageImpl(columnHandle, page);
 
  324   fOpenPageRanges.at(columnHandle.
fId).fPageInfos.emplace_back(pageInfo);
 
  332   fOpenColumnRanges.at(columnId).fNElements += sealedPage.
fNElements;
 
  336   pageInfo.
fLocator = CommitSealedPageImpl(columnId, sealedPage);
 
  337   fOpenPageRanges.at(columnId).fPageInfos.emplace_back(pageInfo);
 
  340std::vector<ROOT::Experimental::RNTupleLocator>
 
  343   std::vector<ROOT::Experimental::RNTupleLocator> locators;
 
  344   for (
auto &range : ranges) {
 
  345      for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt)
 
  346         locators.push_back(CommitSealedPageImpl(range.fColumnId, *sealedPageIt));
 
  353   auto locators = CommitSealedPageVImpl(ranges);
 
  356   for (
auto &range : ranges) {
 
  357      for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
 
  358         fOpenColumnRanges.at(range.fColumnId).fNElements += sealedPageIt->fNElements;
 
  361         pageInfo.
fNElements = sealedPageIt->fNElements;
 
  363         fOpenPageRanges.at(range.fColumnId).fPageInfos.emplace_back(pageInfo);
 
  370   auto nbytes = CommitClusterImpl(nEntries);
 
  373   auto nEntriesInCluster = 
ClusterSize_t(nEntries - fPrevClusterNEntries);
 
  376   for (
unsigned int i = 0; i < fOpenColumnRanges.size(); ++i) {
 
  379      std::swap(fullRange, fOpenPageRanges[i]);
 
  381                                       fOpenColumnRanges[i].fCompressionSettings, fullRange);
 
  382      fOpenColumnRanges[i].fFirstElementIndex += fOpenColumnRanges[i].fNElements;
 
  383      fOpenColumnRanges[i].fNElements = 0;
 
  385   fDescriptorBuilder.AddClusterWithDetails(clusterBuilder.
MoveDescriptor().Unwrap());
 
  386   fPrevClusterNEntries = nEntries;
 
  392   const auto &descriptor = fDescriptorBuilder.GetDescriptor();
 
  394   const auto nClusters = descriptor.GetNClusters();
 
  395   std::vector<DescriptorId_t> physClusterIDs;
 
  396   for (
auto i = fNextClusterInGroup; i < nClusters; ++i) {
 
  397      physClusterIDs.emplace_back(fSerializationContext.MapClusterId(i));
 
  402   auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
 
  404                                                    fSerializationContext);
 
  406   const auto clusterGroupId = descriptor.GetNClusterGroups();
 
  407   const auto locator = CommitClusterGroupImpl(bufPageList.get(), szPageList);
 
  410   for (
auto i = fNextClusterInGroup; i < nClusters; ++i) {
 
  413   fDescriptorBuilder.AddClusterGroup(std::move(cgBuilder));
 
  414   fSerializationContext.MapClusterGroupId(clusterGroupId);
 
  416   fNextClusterInGroup = nClusters;
 
  421   const auto &descriptor = fDescriptorBuilder.GetDescriptor();
 
  424   auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
 
  427   CommitDatasetImpl(bufFooter.get(), szFooter);
 
  434   unsigned char *pageBuf = 
reinterpret_cast<unsigned char *
>(page.
GetBuffer());
 
  435   bool isAdoptedBuffer = 
true;
 
  440      pageBuf = 
new unsigned char[packedBytes];
 
  441      isAdoptedBuffer = 
false;
 
  444   auto zippedBytes = packedBytes;
 
  446   if ((compressionSetting != 0) || !element.
IsMappable()) {
 
  448      if (!isAdoptedBuffer)
 
  450      pageBuf = 
reinterpret_cast<unsigned char *
>(buf);
 
  451      isAdoptedBuffer = 
true;
 
  464   return SealPage(page, element, compressionSetting, fCompressor->GetZipBuffer());
 
  470   fCounters = std::unique_ptr<RCounters>(
new RCounters{
 
  471      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"nPageCommitted", 
"", 
"number of pages committed to storage"),
 
  472      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"szWritePayload", 
"B", 
"volume written for committed pages"),
 
  474      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallWrite", 
"ns", 
"wall clock time spent writing"),
 
  475      *fMetrics.MakeCounter<
RNTupleAtomicCounter*>(
"timeWallZip", 
"ns", 
"wall clock time spent compressing"),
 
  478                                                                        "CPU time spent compressing")
 
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
 
An in-memory subset of the packed and compressed pages of a cluster.
 
virtual void Pack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, packing creates an on-disk page from an in-...
 
virtual bool IsMappable() const
Derived, typed classes tell whether the on-storage layout is bitwise identical to the memory layout.
 
virtual void Unpack(void *destination, void *source, std::size_t count) const
If the on-storage layout and the in-memory layout differ, unpacking creates a memory page from an on-...
 
std::size_t GetPackedSize(std::size_t nElements) const
 
std::size_t GetSize() const
 
const RColumnModel & GetModel() const
 
std::uint32_t GetIndex() const
 
NTupleSize_t GetNElements() const
 
void SetOnDiskId(DescriptorId_t id)
 
A thread-safe integral performance counter.
 
A metric element that computes its floating point value from other counters.
 
std::int64_t GetValueAsInt() const override
 
size_t Zip(const void *from, size_t nbytes, int compression, Writer_t fnWriter)
Returns the size of the compressed data.
 
A collection of Counter objects with a name, a unit, and a description.
 
const RNTuplePerfCounter * GetLocalCounter(std::string_view name) const
Searches counters registered in this object only. Returns nullptr if name is not found.
 
An either thread-safe or non thread safe counter for CPU ticks.
 
void CommitDataset()
Finalize the current cluster and the entire data set.
 
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting)
Helper for streaming a page.
 
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
 
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
 
RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options)
 
void CommitPage(ColumnHandle_t columnHandle, const RPage &page)
Write a page to the storage. The column must have been added before.
 
void CommitSealedPage(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage)
Write a preprocessed page to storage. The column must have been added before.
 
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page sink from the file name (location)
 
std::uint64_t CommitCluster(NTupleSize_t nEntries)
Finalize the current cluster and create a new one for the following data.
 
void CommitClusterGroup()
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
 
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
 
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
 
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
 
std::unique_ptr< unsigned char[]> UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element)
Helper for unstreaming a page.
 
NTupleSize_t GetNEntries()
 
void DropColumn(ColumnHandle_t columnHandle) override
Unregisters a column.
 
NTupleSize_t GetNElements(ColumnHandle_t columnHandle)
 
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override
Register a new column.
 
static std::unique_ptr< RPageSource > Create(std::string_view ntupleName, std::string_view location, const RNTupleReadOptions &options=RNTupleReadOptions())
Guess the concrete derived page source from the file name (location)
 
RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions)
 
void UnzipCluster(RCluster *cluster)
Parallel decompression and unpacking of the pages in the given cluster.
 
ColumnId_t GetColumnId(ColumnHandle_t columnHandle)
 
Common functionality of an ntuple storage for both reading and writing.
 
RPageStorage(std::string_view name)
 
A page is a slice of a column that is mapped into memory.
 
ClusterSize_t::ValueType GetNElements() const
 
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
 
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
 
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
 
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
 
A helper class for piece-wise construction of an RClusterDescriptor.
 
RResult< void > CommitColumnRange(DescriptorId_t columnId, std::uint64_t firstElementIndex, std::uint32_t compressionSettings, const RClusterDescriptor::RPageRange &pageRange)
 
RResult< RClusterDescriptor > MoveDescriptor()
Move out the full cluster descriptor including page locations.
 
A helper class for piece-wise construction of an RClusterGroupDescriptor.
 
RClusterGroupDescriptorBuilder & ClusterGroupId(DescriptorId_t clusterGroupId)
 
void AddCluster(DescriptorId_t clusterId)
 
RClusterGroupDescriptorBuilder & PageListLength(std::uint32_t pageListLength)
 
RClusterGroupDescriptorBuilder & PageListLocator(const RNTupleLocator &pageListLocator)
 
Base class for all ROOT issued exceptions.
 
static RFieldDescriptorBuilder FromField(const Detail::RFieldBase &field)
Make a new RFieldDescriptorBuilder based off a live NTuple field.
 
RResult< RFieldDescriptor > MakeDescriptor() const
Attempt to make a field descriptor.
 
RFieldDescriptorBuilder & FieldId(DescriptorId_t fieldId)
 
The RNTupleModel encapsulates the schema of an ntuple.
 
std::string GetDescription() const
 
RFieldZero * GetFieldZero() const
 
Common user-tunable settings for reading ntuples.
 
Common user-tunable settings for storing ntuples.
 
bool GetUseBufferedWrite() const
 
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
 
RClusterSize ClusterSize_t
 
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
 
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
 
constexpr DescriptorId_t kInvalidDescriptorId
 
Default I/O performance counters that get registered in fMetrics.
 
Default I/O performance counters that get registered in fMetrics.
 
A sealed page contains the bytes of a page as written to storage (packed & compressed).
 
The window of element indexes of a particular column in a particular cluster.
 
std::int64_t fCompressionSettings
The usual format for ROOT compression settings (see Compression.h).
 
NTupleSize_t fFirstElementIndex
A 64bit element index.
 
ClusterSize_t fNElements
A 32bit value for the number of column elements in the cluster.