25   : 
RPageSink(inner->GetNTupleName(), inner->GetWriteOptions())
 
   26   , fMetrics(
"RPageSinkBuf")
 
   27   , fInnerSink(std::move(inner))
 
   31         "compressing pages in parallel")
 
   40   fBufferedColumns.resize(fDescriptorBuilder.GetDescriptor().GetNColumns());
 
   41   fInnerModel = model.
Clone();
 
   42   fInnerSink->Create(*fInnerModel);
 
   58      fBufferedColumns.at(columnHandle.
fId).BufferPage(columnHandle, bufPage);
 
   59   if (!fTaskScheduler) {
 
   62   fCounters->fParallelZip.SetValue(1);
 
   65   zipItem->AllocateSealedPageBuf();
 
   67   auto sealedPage = fBufferedColumns.at(columnHandle.
fId).RegisterSealedPage();
 
   68   fTaskScheduler->AddTask([
this, zipItem, sealedPage, colId = columnHandle.
fId] {
 
   69      *sealedPage = SealPage(zipItem->fPage, *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
 
   70                             GetWriteOptions().GetCompression(), zipItem->fBuf.get());
 
   71      zipItem->fSealedPage = &(*sealedPage);
 
   83   fInnerSink->CommitSealedPage(columnId, sealedPage);
 
   93      fTaskScheduler->Wait();
 
   94      fTaskScheduler->Reset();
 
   98   bool singleCommitCall = std::all_of(fBufferedColumns.begin(), fBufferedColumns.end(),
 
   99                                       [](
auto &bufColumn) { return bufColumn.HasSealedPagesOnly(); });
 
  100   if (singleCommitCall) {
 
  101      std::vector<RSealedPageGroup> toCommit;
 
  102      toCommit.reserve(fBufferedColumns.size());
 
  103      for (
auto &bufColumn : fBufferedColumns) {
 
  104         const auto &sealedPages = bufColumn.GetSealedPages();
 
  105         toCommit.emplace_back(bufColumn.GetHandle().fId, sealedPages.cbegin(), sealedPages.cend());
 
  107      fInnerSink->CommitSealedPageV(toCommit);
 
  109      for (
auto &bufColumn : fBufferedColumns) {
 
  110         auto drained = bufColumn.DrainBufferedPages();
 
  111         for (
auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained))
 
  112            ReleasePage(bufPage.fPage);
 
  114      return fInnerSink->CommitCluster(nEntries);
 
  118   for (
auto &bufColumn : fBufferedColumns) {
 
  122      if (bufColumn.HasSealedPagesOnly())
 
  127      auto drained = bufColumn.DrainBufferedPages();
 
  128      for (
auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained)) {
 
  129         if (bufPage.IsSealed()) {
 
  130            fInnerSink->CommitSealedPage(bufColumn.GetHandle().fId, *bufPage.fSealedPage);
 
  132            fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
 
  134         ReleasePage(bufPage.fPage);
 
  137   return fInnerSink->CommitCluster(nEntries);
 
  144   fInnerSink->CommitClusterGroup();
 
  152   fInnerSink->CommitDataset();
 
  158   return fInnerSink->ReservePage(columnHandle, nElements);
 
  163   fInnerSink->ReleasePage(page);
 
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
 
void ObserveMetrics(RNTupleMetrics &observee)
 
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
 
A non thread-safe integral performance counter.
 
std::deque< RPageZipItem >::iterator iterator
 
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RSealedPage &sealedPage) final
 
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
 
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
 
std::unique_ptr< RCounters > fCounters
 
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
 
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
 
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
 
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
 
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
 
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
 
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
 
Abstract interface to write data into an ntuple.
 
A page is a slice of a column that is mapped into memory.
 
ClusterSize_t::ValueType GetNElements() const
 
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
 
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
 
Base class for all ROOT issued exceptions.
 
The RNTupleModel encapsulates the schema of an ntuple.
 
std::unique_ptr< RNTupleModel > Clone() const
 
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
 
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
 
I/O performance counters that get registered in fMetrics.
 
A sealed page contains the bytes of a page as written to storage (packed & compressed).
 
Generic information about the physical location of data.