30 std::unique_ptr<Internal::RPageSink> sink)
31 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics(
"RNTupleFillContext")
37 const auto &writeOpts =
fSink->GetWriteOptions();
40 const int scale = writeOpts.GetCompression() ? 2 : 1;
52 if (!fStagedClusters.empty()) {
54 <<
" staged clusters still pending, their data is lost";
67 if (fNEntries == fLastFlushed) {
73 auto nEntriesInCluster = fNEntries - fLastFlushed;
74 if (fStagedClusterCommitting) {
75 auto stagedCluster = fSink->StageCluster(nEntriesInCluster);
76 fNBytesFlushed += stagedCluster.fNBytesWritten;
77 fStagedClusters.push_back(std::move(stagedCluster));
79 fNBytesFlushed += fSink->CommitCluster(nEntriesInCluster);
81 fNBytesFilled += fUnzippedClusterSize;
84 const float compressionFactor =
85 std::min(1000.f,
static_cast<float>(fNBytesFilled) /
static_cast<float>(fNBytesFlushed));
86 fUnzippedClusterSizeEst =
87 compressionFactor *
static_cast<float>(fSink->GetWriteOptions().GetApproxZippedClusterSize());
89 fLastFlushed = fNEntries;
90 fUnzippedClusterSize = 0;
95 if (fStagedClusters.empty()) {
98 if (fModel->IsExpired()) {
99 throw RException(
R__FAIL(
"invalid attempt to commit staged clusters after dataset was committed"));
102 fSink->CommitStagedClusters(fStagedClusters);
103 fStagedClusters.clear();
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_ERROR(...)
void ObserveMetrics(RNTupleMetrics &observee)
void FlushCluster()
Flush the entries filled so far to storage.
Detail::RNTupleMetrics fMetrics
std::size_t fUnzippedClusterSizeEst
Estimator of uncompressed cluster size, taking into account the estimated compression ratio.
std::unique_ptr< RNTupleModel > fModel
Needs to be destructed before fSink.
std::size_t fMaxUnzippedClusterSize
Limit for committing cluster no matter the other tunables.
void CommitStagedClusters()
Logically append staged clusters to the RNTuple.
RNTupleFillContext(std::unique_ptr< RNTupleModel > model, std::unique_ptr< Internal::RPageSink > sink)
void FlushColumns()
Flush column data, preparing for CommitCluster or to reduce memory usage.
std::unique_ptr< Internal::RPageSink > fSink
std::string GetReport() const
Format a diagnostics report, e.g. for an exception message.
Base class for all ROOT issued exceptions.
const RError & GetError() const
void CallFlushColumnsOnField(RFieldBase &)
void CallCommitClusterOnField(RFieldBase &)
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.