43 "compressing pages in parallel")
60 fInnerModel = model.
Clone();
61 fInnerSink->Create(*fInnerModel);
68 bool isIncremental = !fBufferedColumns.empty();
69 fBufferedColumns.resize(fDescriptorBuilder.GetDescriptor().GetNPhysicalColumns());
75 auto cloneAddField = [&](
const RFieldBase *field) {
76 auto cloned = field->Clone(field->GetName());
78 fInnerModel->AddField(std::move(cloned));
81 auto cloneAddProjectedField = [&](
RFieldBase *field) {
82 auto cloned = field->
Clone(field->GetName());
87 auto targetIt = cloned->
begin();
88 for (
auto &
f : *field)
89 fieldMap[&(*targetIt++)] = projectedFields.GetSourceField(&
f);
94 fInnerModel->Unfreeze();
96 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
98 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
99 fInnerModel->Freeze();
100 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
115 auto &zipItem = fBufferedColumns.at(columnHandle.
fPhysicalId).BufferPage(columnHandle, bufPage);
116 if (!fTaskScheduler) {
119 fCounters->fParallelZip.SetValue(1);
122 zipItem.AllocateSealedPageBuf();
124 auto &sealedPage = fBufferedColumns.at(columnHandle.
fPhysicalId).RegisterSealedPage();
125 fTaskScheduler->AddTask([
this, &zipItem, &sealedPage, colId = columnHandle.
fPhysicalId] {
126 sealedPage = SealPage(zipItem.fPage, *fBufferedColumns.at(colId).GetHandle().fColumn->GetElement(),
127 GetWriteOptions().GetCompression(), zipItem.fBuf.get());
128 zipItem.fSealedPage = &sealedPage;
140 fInnerSink->CommitSealedPage(physicalColumnId, sealedPage);
152 bool singleCommitCall = std::all_of(fBufferedColumns.begin(), fBufferedColumns.end(),
153 [](
auto &bufColumn) { return bufColumn.HasSealedPagesOnly(); });
154 if (singleCommitCall) {
155 std::vector<RSealedPageGroup> toCommit;
156 toCommit.reserve(fBufferedColumns.size());
157 for (
auto &bufColumn : fBufferedColumns) {
158 const auto &sealedPages = bufColumn.GetSealedPages();
159 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
161 fInnerSink->CommitSealedPageV(toCommit);
163 for (
auto &bufColumn : fBufferedColumns)
164 bufColumn.DropBufferedPages();
165 return fInnerSink->CommitCluster(nEntries);
169 for (
auto &bufColumn : fBufferedColumns) {
173 if (!bufColumn.IsEmpty() && bufColumn.HasSealedPagesOnly())
178 auto drained = bufColumn.DrainBufferedPages();
179 for (
auto &bufPage : std::get<std::deque<RColumnBuf::RPageZipItem>>(drained)) {
180 if (bufPage.IsSealed()) {
181 fInnerSink->CommitSealedPage(bufColumn.GetHandle().fPhysicalId, *bufPage.fSealedPage);
183 fInnerSink->CommitPage(bufColumn.GetHandle(), bufPage.fPage);
185 ReleasePage(bufPage.fPage);
188 return fInnerSink->CommitCluster(nEntries);
195 fInnerSink->CommitClusterGroup();
203 fInnerSink->CommitDataset();
209 return fInnerSink->ReservePage(columnHandle, nElements);
214 fInnerSink->ReleasePage(page);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult<T>.
winID h TVirtualViewer3D TVirtualGLPainter p
RPageSink * GetPageSink() const
A field translates read and write calls from/to underlying columns to/from tree values.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non thread-safe integral performance counter.
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the iterable.
RPageStorage::ColumnHandle_t fCol
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
std::unique_ptr< RCounters > fCounters
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page list.
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
Abstract interface to write data into an ntuple.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)
Incorporate incremental changes to the model into the ntuple descriptor.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
const std::string & GetNTupleName() const
Returns the NTuple name.
virtual void ReleasePage(RPage &page)=0
Every page store needs to be able to free pages it handed out.
A page is a slice of a column that is mapped into memory.
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
std::uint32_t GetNElements() const
Base class for all ROOT issued exceptions.
Projected fields are fields whose columns are reused from existing fields.
std::unordered_map< const Detail::RFieldBase *, const Detail::RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
const Detail::RFieldBase * GetSourceField(const Detail::RFieldBase *target) const
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
const RProjectedFields & GetProjectedFields() const
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
void Add(RHist< DIMENSIONS, PRECISION, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION, STAT_FROM... > &from)
Add two histograms.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Generic information about the physical location of data.