48 "wall clock time spent in critical sections"),
50 "CPU time spent compressing"),
52 "timeCpuCriticalSection",
"ns",
"CPU time spent in critical section")});
79 for (
auto *
f : fields) {
81 for (
auto &descendant : *
f) {
82 connectField(descendant);
109 auto cloned = field->Clone(field->GetFieldName());
115 auto cloned = field->Clone(field->GetFieldName());
119 fieldMap[p] = &
fInnerModel->GetConstField(projectedFields.GetSourceField(field)->GetQualifiedFieldName());
120 auto targetIt = cloned->begin();
121 for (
auto &
f : *field)
122 fieldMap[&(*targetIt++)] =
123 &
fInnerModel->GetConstField(projectedFields.GetSourceField(&
f)->GetQualifiedFieldName());
130 std::back_inserter(innerChangeset.
fAddedFields), cloneAddField);
134 fInnerSink->UpdateSchema(innerChangeset, firstEntry);
141 fInnerSink->UpdateExtraTypeInfo(extraTypeInfo);
161 auto allocateBuf = [&zipItem, maxSealedPageBytes]() {
165 auto shrinkSealedPage = [&zipItem, maxSealedPageBytes, &sealedPage]() {
168 auto sealedBufferSize = sealedPage.GetBufferSize();
169 if (sealedBufferSize < maxSealedPageBytes) {
171 memcpy(buf.get(), sealedPage.GetBuffer(), sealedBufferSize);
172 zipItem.fBuf = std::move(buf);
173 sealedPage.SetBuffer(zipItem.fBuf.get());
181 bool enoughWork = bufferedUncompressed >
GetWriteOptions().GetApproxZippedClusterSize();
187 config.
fPage = &page;
192 config.
fBuffer = zipItem.fBuf.get();
198 zipItem.fSealedPage = &sealedPage;
210 assert(zipItem.fPage.GetNBytes() == page.
GetNBytes());
216 fTaskScheduler->AddTask([
this, &zipItem, &sealedPage, &element, allocateBuf, shrinkSealedPage] {
223 config.
fPage = &zipItem.fPage;
229 config.
fBuffer = zipItem.fBuf.get();
234 zipItem.fSealedPage = &sealedPage;
236 zipItem.fPage =
RPage();
247 std::span<ROOT::Internal::RPageStorage::RSealedPageGroup> )
260 std::vector<RSealedPageGroup> toCommit;
263 R__ASSERT(bufColumn.HasSealedPagesOnly());
264 const auto &sealedPages = bufColumn.GetSealedPages();
265 toCommit.emplace_back(bufColumn.GetHandle().fPhysicalId, sealedPages.cbegin(), sealedPages.cend());
281 bufColumn.DropBufferedPages();
286 std::uint64_t nbytes;
295 return stagedCluster;
321 return fInnerSink->ReservePage(columnHandle, nElements);
324std::unique_ptr<ROOT::Internal::RPageSink>
332 fInnerSink->CommitAttributeSet(attrSetName, attrAnchorInfo);
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
A thread-safe integral performance counter.
A collection of Counter objects with a name, a unit, and a description.
A non thread-safe integral performance counter.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
ROOT::Internal::RColumnElementBase * GetElement() const
std::deque< RPageZipItem > fBufferedPages
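Using a deque guarantees that references to existing elements are never invalidated by appends to the end of the container (iterators may be invalidated, but pointers/references to buffered pages stay valid)...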
RPageStorage::SealedPageSequence_t fSealedPages
Pages that have been already sealed by a concurrent task.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
std::unique_ptr< RPageSink > CloneAsHidden(std::string_view name, const RNTupleWriteOptions &opts) const final
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
std::vector< ColumnHandle_t > fSuppressedColumns
Columns committed as suppressed are stored and passed to the inner sink at cluster commit.
std::unique_ptr< RCounters > fCounters
ROOT::DescriptorId_t fNColumns
std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final
Finalize the current cluster and create a new one for the following data.
void FlushClusterImpl(std::function< void(void)> FlushClusterFn)
std::atomic< std::size_t > fBufferedUncompressed
The sum of uncompressed bytes in buffered pages. Used to heuristically reduce the memory usage.
void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RPageSinkBuf(std::unique_ptr< RPageSink > inner)
ROOT::DescriptorId_t fNFields
void CommitAttributeSet(std::string_view attrSetName, const RNTupleLink &attrAnchorInfo) final
Adds the given anchor information (name + locator) into the main RNTuple's descriptor as an attribute...
RNTupleLink CommitDatasetImpl() final
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
std::unique_ptr< ROOT::RNTupleModel > fInnerModel
The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
std::vector< RColumnBuf > fBufferedColumns
Vector of buffered column pages. Indexed by column id.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void InitImpl(ROOT::RNTupleModel &model) final
std::unique_ptr< RPageSink > fInnerSink
The inner sink, responsible for actually performing I/O.
void ConnectFields(const std::vector< ROOT::RFieldBase * > &fields, ROOT::NTupleSize_t firstEntry)
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final
Register a new column.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
RTaskScheduler * fTaskScheduler
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAllocator
For the time being, we will use the heap allocator for all sources and sinks. This may change in the ...
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64-bit xxhash3.
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
const std::string & GetNTupleName() const
Returns the NTuple name.
A page is a slice of a column that is mapped into memory.
std::uint32_t GetNElements() const
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
std::uint32_t GetElementSize() const
std::unordered_map< const ROOT::RFieldBase *, const ROOT::RFieldBase * > FieldMap_t
The map keys are the projected target fields; the map values are the backing source fields. Note that ...
RResult< void > Add(std::unique_ptr< ROOT::RFieldBase > field, const FieldMap_t &fieldMap)
Adds a new projected field.
Base class for all ROOT issued exceptions.
A field translates read and write calls from/to underlying columns to/from tree values.
The on-storage metadata of an RNTuple.
The RNTupleModel encapsulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
RNTupleTimer< RNTupleAtomicCounter, RNTupleTickCounter< RNTupleAtomicCounter > > RNTupleAtomicTimer
RNTupleTimer< RNTuplePlainCounter, RNTupleTickCounter< RNTuplePlainCounter > > RNTuplePlainTimer
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
RProjectedFields & GetProjectedFieldsOfModel(RNTupleModel &model)
void CallConnectPageSinkOnField(RFieldBase &, ROOT::Internal::RPageSink &, ROOT::NTupleSize_t firstEntry=0)
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel.
std::vector< ROOT::RFieldBase * > fAddedProjectedFields
Points to the projected fields in fModel that were added as part of an updater transaction.
std::vector< ROOT::RFieldBase * > fAddedFields
Points to the fields in fModel that were added as part of an updater transaction.
I/O performance counters that get registered in fMetrics.
Parameters for the SealPage() method.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
const ROOT::Internal::RPage * fPage
Input page to be sealed.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
Cluster that was staged, but not yet logically appended to the RNTuple.
ROOT::Internal::RColumn * fColumn
ROOT::DescriptorId_t fPhysicalId
A sealed page contains the bytes of a page as written to storage (packed & compressed).