Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
8/// is welcome!
9
10/*************************************************************************
11 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
12 * All rights reserved. *
13 * *
14 * For the licensing terms see $ROOTSYS/LICENSE. *
15 * For the list of contributors see $ROOTSYS/README/CREDITS. *
16 *************************************************************************/
17
18#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleZip.hxx>
21#include <ROOT/RPageSinkBuf.hxx>
22
23#include <algorithm>
24#include <memory>
25
27{
28 for (auto &bufPage : fBufferedPages) {
29 if (!bufPage.fPage.IsNull()) {
30 fCol.fColumn->GetPageSink()->ReleasePage(bufPage.fPage);
31 }
32 }
33 fBufferedPages.clear();
34 // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
35 // clean-up is required
36 fSealedPages.clear();
37}
38
40 : RPageSink(inner->GetNTupleName(), inner->GetWriteOptions()), fInnerSink(std::move(inner))
41{
42 fMetrics = Detail::RNTupleMetrics("RPageSinkBuf");
43 fCounters = std::make_unique<RCounters>(RCounters{
44 *fMetrics.MakeCounter<Detail::RNTuplePlainCounter *>("ParallelZip", "", "compressing pages in parallel"),
45 *fMetrics.MakeCounter<Detail::RNTuplePlainCounter *>("timeWallCriticalSection", "ns",
46 "wall clock time spent in critical sections"),
48 "timeCpuCriticalSection", "ns", "CPU time spent in critical section")});
49 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
50}
51
53{
54 // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
55 // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
56 // base class destructor is invoked.
57 WaitForAllTasks();
58}
59
62{
63 return ColumnHandle_t{fNColumns++, &column};
64}
65
66void ROOT::Experimental::Internal::RPageSinkBuf::ConnectFields(const std::vector<RFieldBase *> &fields,
67 NTupleSize_t firstEntry)
68{
69 auto connectField = [&](RFieldBase &f) {
70 // Field Zero would have id 0.
71 ++fNFields;
72 f.SetOnDiskId(fNFields);
73 CallConnectPageSinkOnField(f, *this, firstEntry); // issues in turn calls to `AddColumn()`
74 };
75 for (auto *f : fields) {
76 connectField(*f);
77 for (auto &descendant : *f) {
78 connectField(descendant);
79 }
80 }
81 fBufferedColumns.resize(fNColumns);
82}
83
85{
86 return fInnerSink->GetDescriptor();
87}
88
90{
91 ConnectFields(model.GetFieldZero().GetSubFields(), 0U);
92
93 fInnerModel = model.Clone();
94 fInnerSink->Init(*fInnerModel);
95}
96
98 NTupleSize_t firstEntry)
99{
100 ConnectFields(changeset.fAddedFields, firstEntry);
101
102 // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
103 // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
104 auto cloneAddField = [&](const RFieldBase *field) {
105 auto cloned = field->Clone(field->GetFieldName());
106 auto p = &(*cloned);
107 fInnerModel->AddField(std::move(cloned));
108 return p;
109 };
110 auto cloneAddProjectedField = [&](RFieldBase *field) {
111 auto cloned = field->Clone(field->GetFieldName());
112 auto p = &(*cloned);
113 auto &projectedFields = changeset.fModel.GetProjectedFields();
115 fieldMap[p] = projectedFields.GetSourceField(field);
116 auto targetIt = cloned->begin();
117 for (auto &f : *field)
118 fieldMap[&(*targetIt++)] = projectedFields.GetSourceField(&f);
119 const_cast<RNTupleModel::RProjectedFields &>(fInnerModel->GetProjectedFields()).Add(std::move(cloned), fieldMap);
120 return p;
121 };
122 RNTupleModelChangeset innerChangeset{*fInnerModel};
123 fInnerModel->Unfreeze();
124 std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
125 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
126 std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
127 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
128 fInnerModel->Freeze();
129 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
130}
131
133{
134 auto colId = columnHandle.fPhysicalId;
135 const auto &element = *columnHandle.fColumn->GetElement();
136
137 // Safety: References are guaranteed to be valid until the
138 // element is destroyed. In other words, all buffered page elements are
139 // valid until the return value of DrainBufferedPages() goes out of scope in
140 // CommitCluster().
141 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
142 zipItem.AllocateSealedPageBuf(page.GetNBytes());
143 R__ASSERT(zipItem.fBuf);
144 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
145
146 if (!fTaskScheduler) {
147 // Seal the page right now, avoiding the allocation and copy, but making sure that the page buffer is not aliased.
148 sealedPage =
149 SealPage(page, element, GetWriteOptions().GetCompression(), zipItem.fBuf.get(), /*allowAlias=*/false);
150 zipItem.fSealedPage = &sealedPage;
151 return;
152 }
153
154 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
155 zipItem.fPage = ReservePage(columnHandle, page.GetNElements());
156 // make sure the page is aware of how many elements it will have
157 zipItem.fPage.GrowUnchecked(page.GetNElements());
158 memcpy(zipItem.fPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
159
160 fCounters->fParallelZip.SetValue(1);
161 // Thread safety: Each thread works on a distinct zipItem which owns its
162 // compression buffer.
163 fTaskScheduler->AddTask([this, &zipItem, &sealedPage, &element] {
164 sealedPage = SealPage(zipItem.fPage, element, GetWriteOptions().GetCompression(), zipItem.fBuf.get());
165 zipItem.fSealedPage = &sealedPage;
166 });
167}
168
170 const RSealedPage & /*sealedPage*/)
171{
172 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
173}
174
175void ROOT::Experimental::Internal::RPageSinkBuf::CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> /*ranges*/)
176{
177 throw RException(R__FAIL("should never commit sealed pages to RPageSinkBuf"));
178}
179
181{
182 WaitForAllTasks();
183
184 std::vector<RSealedPageGroup> toCommit;
185 toCommit.reserve(fBufferedColumns.size());
186 for (auto &bufColumn : fBufferedColumns) {
187 R__ASSERT(bufColumn.HasSealedPagesOnly());
188 const auto &sealedPages = bufColumn.GetSealedPages();
189 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
190 }
191
192 std::uint64_t nbytes;
193 {
194 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
195 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
196 fInnerSink->CommitSealedPageV(toCommit);
197
198 nbytes = fInnerSink->CommitCluster(nNewEntries);
199 }
200
201 for (auto &bufColumn : fBufferedColumns)
202 bufColumn.DropBufferedPages();
203 return nbytes;
204}
205
207{
208 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
209 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
210 fInnerSink->CommitClusterGroup();
211}
212
214{
215 RPageSink::RSinkGuard g(fInnerSink->GetSinkGuard());
216 Detail::RNTuplePlainTimer timer(fCounters->fTimeWallCriticalSection, fCounters->fTimeCpuCriticalSection);
217 fInnerSink->CommitDataset();
218}
219
222{
223 return fInnerSink->ReservePage(columnHandle, nElements);
224}
225
227{
228 fInnerSink->ReleasePage(page);
229}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define f(i)
Definition RSha256.hxx:104
#define g(i)
Definition RSha256.hxx:105
#define R__ASSERT(e)
Definition TError.h:118
winID h TVirtualViewer3D TVirtualGLPainter p
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
An either thread-safe or non thread safe counter for CPU ticks.
Record wall time and CPU time between construction and destruction.
RColumnElementBase * GetElement() const
Definition RColumn.hxx:325
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
void CommitDataset() final
Finalize the current cluster and the entrire data set.
std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
const RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
std::unique_ptr< RCounters > fCounters
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
void InitImpl(RNTupleModel &model) final
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void ConnectFields(const std::vector< RFieldBase * > &fields, NTupleSize_t firstEntry)
void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
const std::string & GetNTupleName() const
Returns the NTuple name.
virtual void ReleasePage(RPage &page)=0
Every page store needs to be able to free pages it handed out.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:83
std::uint32_t GetNElements() const
Definition RPage.hxx:84
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
A field translates read and write calls from/to underlying columns to/from tree values.
Definition RField.hxx:94
std::vector< RFieldBase * > GetSubFields()
Definition RField.cxx:873
RSchemaIterator begin()
Definition RField.hxx:700
The on-storage meta-data of an ntuple.
Projected fields are fields whose columns are reused from existing fields.
std::unordered_map< const RFieldBase *, const RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields Note that ...
const RFieldBase * GetSourceField(const RFieldBase *target) const
The RNTupleModel encapulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
const RProjectedFields & GetProjectedFields() const
RFieldZero & GetFieldZero()
Non-const access to the root field is used to commit clusters during writing and to set the on-disk f...
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition TObject.cxx:223
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
Definition RField.cxx:358
void Add(RHist< DIMENSIONS, PRECISION, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION, STAT_FROM... > &from)
Add two histograms.
Definition RHist.hxx:342
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).