Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageSinkBuf.cxx
Go to the documentation of this file.
1/// \file RPageSinkBuf.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \author Max Orok <maxwellorok@gmail.com>
5/// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
6/// \date 2021-03-17
7/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
8/// is welcome!
9
10/*************************************************************************
11 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
12 * All rights reserved. *
13 * *
14 * For the licensing terms see $ROOTSYS/LICENSE. *
15 * For the list of contributors see $ROOTSYS/README/CREDITS. *
16 *************************************************************************/
17
19#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleZip.hxx>
21#include <ROOT/RPageSinkBuf.hxx>
22
23#include <algorithm>
24
// Presumably RColumnBuf::DropBufferedPages(): the signature line is missing from this rendering. The fast path of
// the cluster commit calls `bufColumn.DropBufferedPages()` after committing every column via CommitSealedPageV().
// Hands every buffered page back to the page sink of the owning column (`fCol`), then forgets both the buffered
// pages and the sealed-page descriptors that point into their buffers.
26{
27 for (auto &bufPage : fBufferedPages) {
28 fCol.fColumn->GetPageSink()->ReleasePage(bufPage.fPage);
29 }
30 fBufferedPages.clear();
31 // Each RSealedPage points to the same region as `fBuf` for some element in `fBufferedPages`; thus, no further
32 // clean-up is required
33 fSealedPages.clear();
34}
35
// RPageSinkBuf(std::unique_ptr<RPageSink> inner): the signature line is missing from this rendering.
// Wraps `inner`. The base class is given the inner sink's ntuple name and write options, so this sink reports the
// same name and options as the sink it buffers for.
37 : RPageSink(inner->GetNTupleName(), inner->GetWriteOptions())
38 , fMetrics("RPageSinkBuf")
// Base classes are always initialized before members. `inner` is therefore still valid in the RPageSink(...)
// initializer above, even though it is moved from here.
39 , fInnerSink(std::move(inner))
40{
// Register the "ParallelZip" counter. CommitPageImpl() sets it to 1 when a task scheduler is available and pages
// are sealed by background tasks.
// NOTE(review): `new` + brace-init is presumably used because RCounters is an aggregate, which std::make_unique
// cannot brace-initialize before C++20. Confirm against the header.
41 fCounters = std::unique_ptr<RCounters>(new RCounters{
42 *fMetrics.MakeCounter<RNTuplePlainCounter*>("ParallelZip", "",
43 "compressing pages in parallel")
44 });
// Presumably makes the inner sink's metrics reachable through this sink's metrics. Confirm against RNTupleMetrics.
45 fMetrics.ObserveMetrics(fInnerSink->GetMetrics());
46}
47
// ~RPageSinkBuf(): the signature line is missing from this rendering.
// Background sealing tasks scheduled by CommitPageImpl() capture `this` and references into `fBufferedColumns`.
// They must therefore finish before any member is destroyed.
49{
50 // Wait for unterminated tasks, if any, as they may still hold a reference to `this`.
51 // This cannot be moved to the base class destructor, given non-static members have been destroyed by the time the
52 // base class destructor is invoked.
53 WaitForAllTasks();
54}
55
57 unsigned char * /* serializedHeader */,
58 std::uint32_t /* length */)
59{
// CreateImpl(model, ...): the leading signature line is missing from this rendering.
// The serialized header passed in is ignored (the parameters are unnamed). This sink instead keeps its own clone of
// the model in `fInnerModel`, which UpdateSchema() later keeps in sync, and creates the inner sink from that clone.
// Presumably the inner sink serializes its own header inside Create(). Confirm.
60 fInnerModel = model.Clone();
61 fInnerSink->Create(*fInnerModel);
62}
63
65 NTupleSize_t firstEntry)
66{
// UpdateSchema(changeset, firstEntry): the leading signature line is missing from this rendering.
// This method does three things:
// - lets the base class fold `changeset` into the descriptor;
// - grows the per-column buffers to the new number of physical columns;
// - on every call after the first, mirrors the added fields and added projected fields into `fInnerModel` and
//   forwards an equivalent changeset to the inner sink.
67 RPageSink::UpdateSchema(changeset, firstEntry);
// An empty buffer vector means this is the first call, which only sizes the buffers. Presumably the initial schema
// already reached the inner sink through CreateImpl(). Confirm.
68 bool isIncremental = !fBufferedColumns.empty();
// NOTE(review): sealing tasks scheduled by CommitPageImpl() index `fBufferedColumns` and hold references into its
// elements. Resizing it here without WaitForAllTasks() would race with any task still in flight. Confirm that
// callers only update the schema when no tasks are pending.
69 fBufferedColumns.resize(fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns());
70 if (!isIncremental)
71 return;
72
73 // The buffered page sink maintains a copy of the RNTupleModel for the inner sink; replicate the changes there
74 // TODO(jalopezg): we should be able, in general, to simplify the buffered sink.
// Clones a regular field into `fInnerModel` and returns a non-owning pointer to the clone.
75 auto cloneAddField = [&](const RFieldBase *field) {
76 auto cloned = field->Clone(field->GetName());
77 auto p = &(*cloned);
78 fInnerModel->AddField(std::move(cloned));
79 return p;
80 };
// Clones a projected field into `fInnerModel`'s projected fields and returns a non-owning pointer to the clone.
// The clone and its subfields are mapped to the same source fields as the originals in `changeset.fModel`.
81 auto cloneAddProjectedField = [&](RFieldBase *field) {
82 auto cloned = field->Clone(field->GetName());
83 auto p = &(*cloned);
84 auto &projectedFields = changeset.fModel.GetProjectedFields();
// NOTE(review): the declaration of the local `fieldMap` (original line 85) is not shown in this rendering. From its
// use, it maps each cloned (target) field to its backing source field.
86 fieldMap[p] = projectedFields.GetSourceField(field);
// Pairs subfields positionally. This assumes the clone's subfields iterate in the same order as `field`'s.
87 auto targetIt = cloned->begin();
88 for (auto &f : *field)
89 fieldMap[&(*targetIt++)] = projectedFields.GetSourceField(&f);
// GetProjectedFields() returns a const reference, hence the const_cast needed to register the clone.
90 const_cast<RNTupleModel::RProjectedFields &>(fInnerModel->GetProjectedFields()).Add(std::move(cloned), fieldMap);
91 return p;
92 };
// Unfreeze `fInnerModel` while the clones are added, then freeze it again before notifying the inner sink.
93 RNTupleModelChangeset innerChangeset{*fInnerModel};
94 fInnerModel->Unfreeze();
95 std::transform(changeset.fAddedFields.cbegin(), changeset.fAddedFields.cend(),
96 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
97 std::transform(changeset.fAddedProjectedFields.cbegin(), changeset.fAddedProjectedFields.cend(),
98 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
99 fInnerModel->Freeze();
100 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
101}
102
// CommitPageImpl(columnHandle, page): the signature lines are missing from this rendering.
// Writes nothing itself. It copies `page` into a freshly reserved page and buffers the copy in the column's
// RColumnBuf. If a task scheduler is set, it also schedules a background task that seals (packs and compresses)
// the copy into a buffer owned by the buffered item. Always returns a dummy locator.
105{
106 // TODO avoid frequent (de)allocations by holding on to allocated buffers in RColumnBuf
107 RPage bufPage = ReservePage(columnHandle, page.GetNElements());
108 // make sure the page is aware of how many elements it will have
109 bufPage.GrowUnchecked(page.GetNElements());
// NOTE(review): memcpy relies on a transitive include of <cstring>/<string.h>; this file includes only <algorithm>
// directly.
110 memcpy(bufPage.GetBuffer(), page.GetBuffer(), page.GetNBytes());
111 // Safety: References are guaranteed to be valid until the
112 // element is destroyed. In other words, all buffered page elements are
113 // valid until the return value of DrainBufferedPages() goes out of scope in
114 // CommitCluster().
115 auto &zipItem = fBufferedColumns.at(columnHandle.fPhysicalId).BufferPage(columnHandle, bufPage);
// Without a scheduler the page stays unsealed; the cluster commit later passes it to the inner sink's CommitPage().
116 if (!fTaskScheduler) {
117 return RNTupleLocator{};
118 }
119 fCounters->fParallelZip.SetValue(1);
120 // Thread safety: Each thread works on a distinct zipItem which owns its
121 // compression buffer.
122 zipItem.AllocateSealedPageBuf();
123 R__ASSERT(zipItem.fBuf);
124 auto &sealedPage = fBufferedColumns.at(columnHandle.fPhysicalId).RegisterSealedPage();
// The task captures references into the column buffers. WaitForAllTasks() runs before those buffers are drained in
// CommitClusterImpl() and before members are destroyed in the destructor, so the references are still valid when
// the task runs.
125 fTaskScheduler->AddTask([this, &zipItem, &sealedPage, colId = columnHandle.fPhysicalId] {
126 sealedPage = SealPage(zipItem.fPage, *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
127 GetWriteOptions().GetCompression(), zipItem.fBuf.get());
128 zipItem.fSealedPage = &sealedPage;
129 });
130
131 // we're feeding bad locators to fOpenPageRanges but it should not matter
132 // because they never get written out
133 return RNTupleLocator{};
134}
135
138 const RSealedPage &sealedPage)
139{
// CommitSealedPageImpl(physicalColumnId, sealedPage): the leading signature lines are missing from this rendering.
// Pages that arrive already sealed bypass the buffer entirely and go straight to the inner sink.
140 fInnerSink->CommitSealedPage(physicalColumnId, sealedPage);
141 // we're feeding bad locators to fOpenPageRanges but it should not matter
142 // because they never get written out
143 return RNTupleLocator{};
144}
145
// CommitClusterImpl(nEntries): the function-name line is missing from this rendering.
// Flushes every buffered page of the current cluster to the inner sink, then commits the cluster there. Returns
// whatever the inner sink's CommitCluster() returns.
146std::uint64_t
148{
// Every background sealing task must be finished before the sealed pages are read below.
149 WaitForAllTasks();
150
151 // If we have only sealed pages in all buffered columns, commit them in a single `CommitSealedPageV()` call
152 bool singleCommitCall = std::all_of(fBufferedColumns.begin(), fBufferedColumns.end(),
153 [](auto &bufColumn) { return bufColumn.HasSealedPagesOnly(); });
154 if (singleCommitCall) {
155 std::vector<RSealedPageGroup> toCommit;
156 toCommit.reserve(fBufferedColumns.size());
157 for (auto &bufColumn : fBufferedColumns) {
158 const auto &sealedPages = bufColumn.GetSealedPages();
159 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
160 }
161 fInnerSink->CommitSealedPageV(toCommit);
162
// The sealed pages point into the buffers of the buffered pages, so those buffers are dropped only after the
// commit above.
163 for (auto &bufColumn : fBufferedColumns)
164 bufColumn.DropBufferedPages();
165 return fInnerSink->CommitCluster(nEntries);
166 }
167
168 // Otherwise, try to do it per column
169 for (auto &bufColumn : fBufferedColumns) {
170 // In practice, either all (see above) or none of the buffered pages have been sealed, depending on whether
171 // a task scheduler is available. The rare condition of a few columns consisting only of sealed pages should
172 // not happen unless the API is misused.
173 if (!bufColumn.IsEmpty() && bufColumn.HasSealedPagesOnly())
174 throw RException(R__FAIL("only a few columns have all pages sealed"));
175
176 // Slow path: if the buffered column contains both sealed and unsealed pages, commit them one by one.
177 // TODO(jalopezg): coalesce contiguous sealed pages and commit via `CommitSealedPageV()`.
// The drained pages, and any sealed pages pointing into them, stay valid while `drained` is in scope.
178 auto drained = bufColumn.DrainBufferedPages();
179 for (auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained)) {
180 if (bufPage.IsSealed()) {
181 fInnerSink->CommitSealedPage(bufColumn.GetHandle().fPhysicalId, *bufPage.fSealedPage);
182 } else {
183 fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
184 }
// Every buffered copy was obtained through ReservePage() in CommitPageImpl(), so it is released once committed.
185 ReleasePage(bufPage.fPage);
186 }
187 }
188 return fInnerSink->CommitCluster(nEntries);
189}
190
193 std::uint32_t /* length */)
194{
// CommitClusterGroupImpl(...): the leading signature lines are missing from this rendering.
// The serialized page list passed in is ignored (the parameters are unnamed). The inner sink commits its own
// cluster group.
195 fInnerSink->CommitClusterGroup();
196 // We're not using that locator any further, so it is safe to return a dummy one
197 return RNTupleLocator{};
198}
199
// Ignores the serialized footer passed in (the parameters are unnamed) and lets the inner sink commit the dataset.
200void ROOT::Experimental::Detail::RPageSinkBuf::CommitDatasetImpl(unsigned char * /* serializedFooter */,
201 std::uint32_t /* length */)
202{
203 fInnerSink->CommitDataset();
204}
205
// ReservePage(columnHandle, nElements): the signature lines are missing from this rendering.
// Page memory comes from the inner sink. The buffered copies that CommitClusterImpl() later passes to
// fInnerSink->CommitPage() therefore originate from that same sink.
208{
209 return fInnerSink->ReservePage(columnHandle, nElements);
210}
211
// ReleasePage(page): the signature line is missing from this rendering.
// Returns the page to the inner sink, which handed it out in ReservePage().
213{
214 fInnerSink->ReleasePage(page);
215}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:303
#define f(i)
Definition RSha256.hxx:104
#define R__ASSERT(e)
Definition TError.h:118
winID h TVirtualViewer3D TVirtualGLPainter p
RPageSink * GetPageSink() const
Definition RColumn.hxx:328
A field translates read and write calls from/to underlying columns to/from tree values.
Definition RField.hxx:83
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that references to elements are never invalidated by appends to the end of the it...
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< RCounters > fCounters
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata).
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
Abstract interface to write data into an ntuple.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)
Incorporate incremental changes to the model into the ntuple descriptor.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
const std::string & GetNTupleName() const
Returns the NTuple name.
virtual void ReleasePage(RPage &page)=0
Every page store needs to be able to free pages it handed out.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:42
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:84
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition RPage.hxx:112
std::uint32_t GetNElements() const
Definition RPage.hxx:86
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Projected fields are fields whose columns are reused from existing fields.
std::unordered_map< const Detail::RFieldBase *, const Detail::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
const Detail::RFieldBase * GetSourceField(const Detail::RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
const RProjectedFields & GetProjectedFields() const
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
Definition TObject.cxx:223
void Add(RHist< DIMENSIONS, PRECISION, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION, STAT_FROM... > &from)
Add two histograms.
Definition RHist.hxx:339
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Generic information about the physical location of data.