Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.hxx
Go to the documentation of this file.
1/// \file ROOT/RPageSinkBuf.hxx
2/// \ingroup NTuple
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#ifndef ROOT_RPageSinkBuf
17#define ROOT_RPageSinkBuf
18
20#include <ROOT/RPageStorage.hxx>
21
22#include <atomic>
23#include <cstddef>
24#include <deque>
25#include <functional>
26#include <iterator>
27#include <memory>
28#include <tuple>
29
30namespace ROOT {
31namespace Internal {
32
33// clang-format off
34/**
35\class ROOT::Internal::RPageSinkBuf
36\ingroup NTuple
37\brief Wrapper sink that coalesces cluster column page writes
38*/
39// clang-format on
40class RPageSinkBuf : public RPageSink {
41private:
42 /// A buffered column. The column is not responsible for RPage memory management (i.e. ReservePage),
43 /// which is handled by the enclosing RPageSinkBuf.
44 class RColumnBuf {
45 public:
46 struct RPageZipItem {
48 // Compression scratch buffer for fSealedPage; it owns the bytes the sealed page refers to.
49 std::unique_ptr<unsigned char[]> fBuf;
51 bool IsSealed() const { return fSealedPage != nullptr; } // true once fSealedPage has been set, i.e. the page was sealed
52 };
53 public:
54 RColumnBuf() = default;
55 RColumnBuf(const RColumnBuf&) = delete;
56 RColumnBuf& operator=(const RColumnBuf&) = delete;
57 RColumnBuf(RColumnBuf&&) = default;
60
61 /// Returns a reference to the newly buffered page. The reference remains
62 /// valid until DropBufferedPages().
64 {
65 if (!fCol) {
67 }
68 // Safety: Insertion at the end of a deque never invalidates references
69 // to existing elements.
70 return fBufferedPages.emplace_back();
71 }
72 const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
73 bool IsEmpty() const { return fBufferedPages.empty(); }
74 bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); } // counts match: every buffered page has been sealed, so a vector commit can be issued
76
77 void DropBufferedPages();
78
79 // The returned reference points to a default-constructed RSealedPage. It can be used
80 // to fill in data after sealing. Appending to the end of the deque keeps previously returned references valid.
82 {
83 return fSealedPages.emplace_back();
84 }
85
86 private:
88 /// Using a deque guarantees that references to elements are never invalidated
89 /// by appends to the end of the deque in BufferPage (iterators, however, may be invalidated).
90 std::deque<RPageZipItem> fBufferedPages;
91 /// Pages that have been already sealed by a concurrent task. A vector commit can be issued if all
92 /// buffered pages have been sealed.
93 /// Note that each RSealedPage refers to the same buffer as `fBufferedPages[i].fBuf` for some value of `i`, and
94 /// is thus owned by that RPageZipItem.
96 };
97
98private:
99 /// I/O performance counters that get registered in fMetrics
108 std::unique_ptr<RCounters> fCounters;
109 /// The inner sink, responsible for actually performing I/O.
110 std::unique_ptr<RPageSink> fInnerSink;
111 /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
112 /// For the unbuffered case, the RNTupleModel is instead managed by an RNTupleWriter.
113 std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
114 /// The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
115 std::atomic<std::size_t> fBufferedUncompressed = 0;
116 /// Vector of buffered column pages. Indexed by column id.
117 std::vector<RColumnBuf> fBufferedColumns;
118 /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit.
119 std::vector<ColumnHandle_t> fSuppressedColumns;
122
123 void ConnectFields(const std::vector<ROOT::RFieldBase *> &fields, ROOT::NTupleSize_t firstEntry);
124 void FlushClusterImpl(std::function<void(void)> FlushClusterFn);
125
126public:
127 explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
128 RPageSinkBuf(const RPageSinkBuf&) = delete;
132 ~RPageSinkBuf() override;
133
135
137
138 ROOT::NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
139
140 void InitImpl(ROOT::RNTupleModel &model) final;
143
145 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
147 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
148 std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final;
149 RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
150 void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
153
155}; // RPageSinkBuf
156
157} // namespace Internal
158} // namespace ROOT
159
160#endif
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
A thread-safe integral performance counter.
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:38
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that references to elements are never invalidated by appends to the end of the deque by BufferPage.
RColumnBuf & operator=(const RColumnBuf &)=delete
RColumnBuf & operator=(RColumnBuf &&)=default
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
RPageZipItem & BufferPage(RPageStorage::ColumnHandle_t columnHandle)
Returns a reference to the newly buffered page.
RColumnBuf(const RColumnBuf &)=delete
const RPageStorage::SealedPageSequence_t & GetSealedPages() const
RPageStorage::ColumnHandle_t fCol
const RPageStorage::ColumnHandle_t & GetHandle() const
Wrapper sink that coalesces cluster column page writes.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::vector< ColumnHandle_t > fSuppressedColumns
Columns committed as suppressed are stored and passed to the inner sink at cluster commit.
std::unique_ptr< RCounters > fCounters
ROOT::DescriptorId_t fNColumns
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
std::atomic< std::size_t > fBufferedUncompressed
The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
RPageSinkBuf & operator=(RPageSinkBuf &&)=delete
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
ROOT::NTupleSize_t GetNEntries() const final
ROOT::DescriptorId_t fNFields
RPageSinkBuf(const RPageSinkBuf &)=delete
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
std::unique_ptr< ROOT::RNTupleModel > fInnerModel
The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
std::vector< RColumnBuf > fBufferedColumns
Vector of buffered column pages. Indexed by column id.
RPageSinkBuf(RPageSinkBuf &&)=delete
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
RPageSinkBuf & operator=(const RPageSinkBuf &)=delete
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
Abstract interface to write data into an ntuple.
std::deque< RSealedPage > SealedPageSequence_t
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
Field-specific extra type information from the header / extension header.
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to an RNTupleModel
I/O performance counters that get registered in fMetrics.
ROOT::Experimental::Detail::RNTupleTickCounter< ROOT::Experimental::Detail::RNTupleAtomicCounter > & fTimeCpuZip
ROOT::Experimental::Detail::RNTupleTickCounter< ROOT::Experimental::Detail::RNTuplePlainCounter > & fTimeCpuCriticalSection
ROOT::Experimental::Detail::RNTupleAtomicCounter & fTimeWallZip
ROOT::Experimental::Detail::RNTuplePlainCounter & fParallelZip
ROOT::Experimental::Detail::RNTuplePlainCounter & fTimeWallCriticalSection
A sealed page contains the bytes of a page as written to storage (packed & compressed).