Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.hxx
Go to the documentation of this file.
1/// \file ROOT/RPageSinkBuf.hxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
8/// is welcome!
9
10/*************************************************************************
11 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
12 * All rights reserved. *
13 * *
14 * For the licensing terms see $ROOTSYS/LICENSE. *
15 * For the list of contributors see $ROOTSYS/README/CREDITS. *
16 *************************************************************************/
17
18#ifndef ROOT7_RPageSinkBuf
19#define ROOT7_RPageSinkBuf
20
22#include <ROOT/RPageStorage.hxx>
23
24#include <deque>
25#include <iterator>
26#include <memory>
27#include <tuple>
28
29namespace ROOT {
30namespace Experimental {
31namespace Internal {
32
33// clang-format off
34/**
35\class ROOT::Experimental::Internal::RPageSinkBuf
36\ingroup NTuple
37\brief Wrapper sink that coalesces cluster column page writes
38*
39* TODO(jblomer): The interplay of derived class and RPageSink is not yet optimally designed for page storage wrapper
40* classes like this one. Header and footer serialization, e.g., are done twice. To be revised.
41*/
42// clang-format on
43class RPageSinkBuf : public RPageSink {
44private:
45 /// A buffered column. The column is not responsible for RPage memory management (i.e.
46 /// ReservePage/ReleasePage), which is handled by the enclosing RPageSinkBuf.
47 class RColumnBuf {
48 public:
49 struct RPageZipItem {
51 // Compression scratch buffer for fSealedPage.
52 std::unique_ptr<unsigned char[]> fBuf;
54 bool IsSealed() const { return fSealedPage != nullptr; }
55 void AllocateSealedPageBuf(std::size_t nBytes)
56 {
57 fBuf = std::unique_ptr<unsigned char[]>(new unsigned char[nBytes]);
58 }
59 };
60 public:
61 RColumnBuf() = default;
62 RColumnBuf(const RColumnBuf&) = delete;
63 RColumnBuf& operator=(const RColumnBuf&) = delete;
64 RColumnBuf(RColumnBuf&&) = default;
67
68 /// Returns a reference to the newly buffered page. The reference remains
69 /// valid until the return value of DrainBufferedPages() is destroyed.
71 {
72 if (!fCol) {
73 fCol = columnHandle;
74 }
75 // Safety: Insertion at the end of a deque never invalidates references
76 // to existing elements.
77 return fBufferedPages.emplace_back();
78 }
79 const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
80 bool IsEmpty() const { return fBufferedPages.empty(); }
81 bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); }
83
84 using BufferedPages_t = std::tuple<std::deque<RPageZipItem>, RPageStorage::SealedPageSequence_t>;
85 /// When the return value of DrainBufferedPages() is destroyed, all references
86 /// returned by BufferPage() are invalidated.
87 /// This function gives up on the ownership of the buffered pages. Thus, `ReleasePage()` must be called
88 /// accordingly.
90 {
91 BufferedPages_t drained;
92 std::swap(fBufferedPages, std::get<decltype(fBufferedPages)>(drained));
93 std::swap(fSealedPages, std::get<decltype(fSealedPages)>(drained));
94 return drained;
95 }
96 void DropBufferedPages();
97
98 // The returned reference points to a default-constructed RSealedPage. It can be used
99 // to fill in data after sealing.
101 {
102 return fSealedPages.emplace_back();
103 }
104
105 private:
107 /// Using a deque guarantees that references to existing elements are never invalidated
108 /// by appends to the end of the deque by BufferPage.
109 std::deque<RPageZipItem> fBufferedPages;
110 /// Pages that have been already sealed by a concurrent task. A vector commit can be issued if all
111 /// buffered pages have been sealed.
112 /// Note that each RSealedPage refers to the same buffer as `fBufferedPages[i].fBuf` for some value of `i`, and
113 /// is thus owned by the corresponding RPageZipItem.
115 };
116
117private:
118 /// I/O performance counters that get registered in fMetrics
119 struct RCounters {
123 };
124 std::unique_ptr<RCounters> fCounters;
125 /// The inner sink, responsible for actually performing I/O.
126 std::unique_ptr<RPageSink> fInnerSink;
127 /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
128 /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
129 std::unique_ptr<RNTupleModel> fInnerModel;
130 /// Vector of buffered column pages. Indexed by column id.
131 std::vector<RColumnBuf> fBufferedColumns;
134
135 void ConnectFields(const std::vector<RFieldBase *> &fields, NTupleSize_t firstEntry);
136
137public:
138 explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
139 RPageSinkBuf(const RPageSinkBuf&) = delete;
143 ~RPageSinkBuf() override;
144
145 ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final;
146
147 void Init(RNTupleModel &model) final;
148 void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final;
149
150 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
151 void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final;
152 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
153 std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final;
154 void CommitClusterGroup() final;
155 void CommitDataset() final;
156
157 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
158 void ReleasePage(RPage &page) final;
159}; // RPageSinkBuf
160
161} // namespace Internal
162} // namespace Experimental
163} // namespace ROOT
164
165#endif
A non-thread-safe integral performance counter.
A counter for CPU ticks that is either thread-safe or non-thread-safe.
std::tuple< std::deque< RPageZipItem >, RPageStorage::SealedPageSequence_t > BufferedPages_t
RPageZipItem & BufferPage(RPageStorage::ColumnHandle_t columnHandle)
Returns a reference to the newly buffered page.
RColumnBuf & operator=(const RColumnBuf &)=delete
const RPageStorage::ColumnHandle_t & GetHandle() const
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
BufferedPages_t DrainBufferedPages()
When the return value of DrainBufferedPages() is destroyed, all references returned by BufferPage() are ...
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that references to existing elements are never invalidated by appends to the end of the de...
RColumnBuf & operator=(RColumnBuf &&)=default
const RPageStorage::SealedPageSequence_t & GetSealedPages() const
Wrapper sink that coalesces cluster column page writes.
RPageSinkBuf & operator=(RPageSinkBuf &&)=default
void CommitDataset() final
Finalize the current cluster and the entire data set.
std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
RPageSinkBuf(const RPageSinkBuf &)=delete
std::vector< RColumnBuf > fBufferedColumns
Vector of buffered column pages. Indexed by column id.
void Init(RNTupleModel &model) final
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket) In...
std::unique_ptr< RNTupleModel > fInnerModel
The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
std::unique_ptr< RCounters > fCounters
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void ConnectFields(const std::vector< RFieldBase * > &fields, NTupleSize_t firstEntry)
void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
RPageSinkBuf & operator=(const RPageSinkBuf &)=delete
Abstract interface to write data into an ntuple.
std::deque< RSealedPage > SealedPageSequence_t
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
The RNTupleModel encapsulates the schema of an ntuple.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
This file contains a specialised ROOT message handler to test for diagnostics in unit tests.
The incremental changes to a RNTupleModel
I/O performance counters that get registered in fMetrics.
Detail::RNTupleTickCounter< Detail::RNTuplePlainCounter > & fTimeCpuCriticalSection
A sealed page contains the bytes of a page as written to storage (packed & compressed).