41 "wall clock time spent in critical sections"),
43 "timeCpuCriticalSection",
"ns",
"CPU time spent in critical section")});
67 f.SetOnDiskId(fNFields);
70 for (
auto *
f : fields) {
72 for (
auto &descendant : *
f) {
73 connectField(descendant);
76 fBufferedColumns.resize(fNColumns);
81 return fInnerSink->GetDescriptor();
88 fInnerModel = model.
Clone();
89 fInnerSink->Init(*fInnerModel);
99 auto cloneAddField = [&](
const RFieldBase *field) {
100 auto cloned = field->Clone(field->GetFieldName());
102 fInnerModel->AddField(std::move(cloned));
105 auto cloneAddProjectedField = [&](
RFieldBase *field) {
106 auto cloned = field->
Clone(field->GetFieldName());
110 fieldMap[
p] = &fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
111 auto targetIt = cloned->begin();
112 for (
auto &
f : *field)
113 fieldMap[&(*targetIt++)] =
114 &fInnerModel->GetConstField(projectedFields.GetSourceField(&
f)->GetQualifiedFieldName());
119 fInnerModel->Unfreeze();
121 std::back_inserter(innerChangeset.fAddedFields), cloneAddField);
123 std::back_inserter(innerChangeset.fAddedProjectedFields), cloneAddProjectedField);
124 fInnerModel->Freeze();
125 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
132 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
137 fSuppressedColumns.emplace_back(columnHandle);
147 auto &zipItem = fBufferedColumns.at(colId).BufferPage(columnHandle);
148 std::size_t maxSealedPageBytes = page.
GetNBytes() + GetWriteOptions().GetEnablePageChecksums() * kNBytesPageChecksum;
150 auto &sealedPage = fBufferedColumns.at(colId).RegisterSealedPage();
152 auto allocateBuf = [&zipItem, maxSealedPageBytes]() {
153 zipItem.fBuf = std::make_unique<unsigned char[]>(maxSealedPageBytes);
156 auto shrinkSealedPage = [&zipItem, maxSealedPageBytes, &sealedPage]() {
159 auto sealedBufferSize = sealedPage.GetBufferSize();
160 if (sealedBufferSize < maxSealedPageBytes) {
161 auto buf = std::make_unique<unsigned char[]>(sealedBufferSize);
162 memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
163 zipItem.fBuf = std::move(buf);
164 sealedPage.SetBuffer(zipItem.fBuf.get());
168 if (!fTaskScheduler) {
172 config.
fPage = &page;
175 config.
fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
177 config.
fBuffer = zipItem.fBuf.get();
178 sealedPage = SealPage(config);
180 zipItem.fSealedPage = &sealedPage;
190 fCounters->fParallelZip.SetValue(1);
193 fTaskScheduler->AddTask([
this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
196 config.
fPage = &zipItem.fPage;
199 config.
fWriteChecksum = GetWriteOptions().GetEnablePageChecksums();
202 config.
fBuffer = zipItem.fBuf.get();
203 sealedPage = SealPage(config);
205 zipItem.fSealedPage = &sealedPage;
207 zipItem.fPage =
RPage();
229 std::vector<RSealedPageGroup> toCommit;
230 toCommit.reserve(fBufferedColumns.size());
231 for (
auto &bufColumn : fBufferedColumns) {
232 R__ASSERT(bufColumn.HasSealedPagesOnly());
233 const auto &sealedPages = bufColumn.GetSealedPages();
234 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
240 fInnerSink->CommitSealedPageV(toCommit);
242 for (
auto handle : fSuppressedColumns)
243 fInnerSink->CommitSuppressedColumn(handle);
244 fSuppressedColumns.clear();
249 for (
auto &bufColumn : fBufferedColumns)
250 bufColumn.DropBufferedPages();
255 std::uint64_t nbytes;
256 FlushClusterImpl([&] { nbytes = fInnerSink->CommitCluster(nNewEntries); });
264 FlushClusterImpl([&] { stagedCluster = fInnerSink->StageCluster(nNewEntries); });
265 return stagedCluster;
272 fInnerSink->CommitStagedClusters(clusters);
279 fInnerSink->CommitClusterGroup();
286 fInnerSink->CommitDataset();
292 return fInnerSink->ReservePage(columnHandle, nElements);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
winID h TVirtualViewer3D TVirtualGLPainter p
A collection of Counter objects with a name, a unit, and a description.
void ObserveMetrics(RNTupleMetrics &observee)
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
A non-thread-safe integral performance counter.
A counter for CPU ticks that is either thread-safe or non-thread-safe.
Record wall time and CPU time between construction and destruction.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
RColumnElementBase * GetElement() const
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
std::deque< RPageZipItem > fBufferedPages
Using a deque guarantees that element iterators are never invalidated by appends to the end of the it...
void CommitDatasetImpl() final
std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, RColumn &column) final
Register a new column.
RStagedCluster StageCluster(NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
const RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
std::unique_ptr< RCounters > fCounters
void InitImpl(RNTupleModel &model) final
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void ConnectFields(const std::vector< RFieldBase * > &fields, NTupleSize_t firstEntry)
void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
Abstract interface to write data into an ntuple.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
const std::string & GetNTupleName() const
Returns the NTuple name.
Detail::RNTupleMetrics fMetrics
A page is a slice of a column that is mapped into memory.
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
std::uint32_t GetElementSize() const
std::uint32_t GetNElements() const
std::unordered_map< const RFieldBase *, const RFieldBase * > FieldMap_t
The map keys are the projected target fields, the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
std::unique_ptr< RNTupleModel > Clone() const
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, RPageSink &, NTupleSize_t firstEntry=0)
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel
std::vector< RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
const RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
int fCompressionSetting
Compression algorithm and level to apply.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
const RPage * fPage
Input page to be sealed.
Cluster that was staged, but not yet logically appended to the RNTuple.
DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).