Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RPageSinkBuf.hxx
Go to the documentation of this file.
1/// \file ROOT/RPageSinkBuf.hxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \author Max Orok <maxwellorok@gmail.com>
4/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
5/// \date 2021-03-17
6
7/*************************************************************************
8 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#ifndef ROOT_RPageSinkBuf
16#define ROOT_RPageSinkBuf
17
19#include <ROOT/RPageStorage.hxx>
20
21#include <atomic>
22#include <cstddef>
23#include <deque>
24#include <functional>
25#include <iterator>
26#include <memory>
27#include <tuple>
28
29namespace ROOT {
30namespace Internal {
31
32// clang-format off
33/**
34\class ROOT::Internal::RPageSinkBuf
35\ingroup NTuple
36\brief Wrapper sink that coalesces cluster column page writes
37*/
38// clang-format on
39class RPageSinkBuf : public RPageSink {
40private:
41 /// A buffered column. The column is not responsible for RPage memory management (i.e. ReservePage),
42 /// which is handled by the enclosing RPageSinkBuf.
43 class RColumnBuf {
44 public:
45 struct RPageZipItem {
47 // Compression scratch buffer for fSealedPage.
48 std::unique_ptr<unsigned char[]> fBuf;
50 bool IsSealed() const { return fSealedPage != nullptr; }
51 };
52 public:
53 RColumnBuf() = default;
54 RColumnBuf(const RColumnBuf&) = delete;
55 RColumnBuf& operator=(const RColumnBuf&) = delete;
56 RColumnBuf(RColumnBuf&&) = default;
59
60 /// Returns a reference to the newly buffered page. The reference remains
61 /// valid until DropBufferedPages().
63 {
64 if (!fCol) {
65 fCol = columnHandle;
66 }
67 // Safety: Insertion at the end of a deque never invalidates references
68 // to existing elements.
69 return fBufferedPages.emplace_back();
70 }
71 const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
72 bool IsEmpty() const { return fBufferedPages.empty(); }
73 bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); }
75
76 void DropBufferedPages();
77
78 // The returned reference points to a default-constructed RSealedPage. It can be used
79 // to fill in data after sealing.
81 {
82 return fSealedPages.emplace_back();
83 }
84
85 private:
87 /// Using a deque guarantees that references to elements are never invalidated
88 /// by appends to the end of the deque by BufferPage.
89 std::deque<RPageZipItem> fBufferedPages;
90 /// Pages that have been already sealed by a concurrent task. A vector commit can be issued if all
91 /// buffered pages have been sealed.
92 /// Note that each RSealedPage refers to the same buffer as `fBufferedPages[i].fBuf` for some value of `i`, and
93 /// is thus owned by the corresponding RPageZipItem.
95 };
96
97private:
98 /// I/O performance counters that get registered in fMetrics
107 std::unique_ptr<RCounters> fCounters;
108 /// The inner sink, responsible for actually performing I/O.
109 std::unique_ptr<RPageSink> fInnerSink;
110 /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
111 /// For the unbuffered case, the RNTupleModel is instead managed by an RNTupleWriter.
112 std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
113 /// The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
114 std::atomic<std::size_t> fBufferedUncompressed = 0;
115 /// Vector of buffered column pages. Indexed by column id.
116 std::vector<RColumnBuf> fBufferedColumns;
117 /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit
118 std::vector<ColumnHandle_t> fSuppressedColumns;
121
122 void ConnectFields(const std::vector<ROOT::RFieldBase *> &fields, ROOT::NTupleSize_t firstEntry);
123 void FlushClusterImpl(std::function<void(void)> FlushClusterFn);
124
125public:
126 explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
127 RPageSinkBuf(const RPageSinkBuf&) = delete;
131 ~RPageSinkBuf() override;
132
134
135 const ROOT::RNTupleDescriptor &GetDescriptor() const final;
136
137 ROOT::NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
138
139 void InitImpl(ROOT::RNTupleModel &model) final;
140 void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final;
141 void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final;
142
143 void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
144 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
145 void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final;
146 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
147 std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final;
148 RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
149 void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
150 void CommitClusterGroup() final;
152 void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final;
153
154 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
155
156 std::unique_ptr<RPageSink> CloneAsHidden(std::string_view name, const RNTupleWriteOptions &opts) const final;
157}; // RPageSinkBuf
158
159} // namespace Internal
160} // namespace ROOT
161
162#endif
char name[80]
Definition TGX11.cxx:148
A thread-safe integral performance counter.
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:37
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that references to elements are never invalidated by appends to the end of the deque by BufferPage.
RColumnBuf & operator=(const RColumnBuf &)=delete
RColumnBuf & operator=(RColumnBuf &&)=default
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
RPageZipItem & BufferPage(RPageStorage::ColumnHandle_t columnHandle)
Returns a reference to the newly buffered page.
RColumnBuf(const RColumnBuf &)=delete
const RPageStorage::SealedPageSequence_t & GetSealedPages() const
RPageStorage::ColumnHandle_t fCol
const RPageStorage::ColumnHandle_t & GetHandle() const
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
std::unique_ptr< RPageSink > CloneAsHidden(std::string_view name, const RNTupleWriteOptions &opts) const final
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::vector< ColumnHandle_t > fSuppressedColumns
Columns committed as suppressed are stored and passed to the inner sink at cluster commit.
std::unique_ptr< RCounters > fCounters
ROOT::DescriptorId_t fNColumns
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
std::atomic< std::size_t > fBufferedUncompressed
The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
RPageSinkBuf & operator=(RPageSinkBuf &&)=delete
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
ROOT::NTupleSize_t GetNEntries() const final
ROOT::DescriptorId_t fNFields
void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final
Adds the given anchor information (name + locator) into the main RNTuple's descriptor as an attribute...
RNTupleLink CommitDatasetImpl() final
RPageSinkBuf(const RPageSinkBuf &)=delete
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
std::unique_ptr< ROOT::RNTupleModel > fInnerModel
The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
std::vector< RColumnBuf > fBufferedColumns
Vector of buffered column pages. Indexed by column id.
RPageSinkBuf(RPageSinkBuf &&)=delete
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
RPageSinkBuf & operator=(const RPageSinkBuf &)=delete
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
Abstract interface to write data into an ntuple.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
std::deque< RSealedPage > SealedPageSequence_t
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
Field-specific extra type information from the header / extension header.
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
STL class.
STL class.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel.
I/O performance counters that get registered in fMetrics.
ROOT::Experimental::Detail::RNTupleTickCounter< ROOT::Experimental::Detail::RNTupleAtomicCounter > & fTimeCpuZip
ROOT::Experimental::Detail::RNTupleTickCounter< ROOT::Experimental::Detail::RNTuplePlainCounter > & fTimeCpuCriticalSection
ROOT::Experimental::Detail::RNTupleAtomicCounter & fTimeWallZip
ROOT::Experimental::Detail::RNTuplePlainCounter & fParallelZip
ROOT::Experimental::Detail::RNTuplePlainCounter & fTimeWallCriticalSection
A sealed page contains the bytes of a page as written to storage (packed & compressed).