Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RNTupleFillContext.cxx
Go to the documentation of this file.
1/// \file RNTupleFillContext.cxx
2/// \author Jakob Blomer <jblomer@cern.ch>
3/// \date 2024-02-22
4
5/*************************************************************************
6 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
7 * All rights reserved. *
8 * *
9 * For the licensing terms see $ROOTSYS/LICENSE. *
10 * For the list of contributors see $ROOTSYS/README/CREDITS. *
11 *************************************************************************/
12
14
15#include <ROOT/RError.hxx>
16#include <ROOT/RFieldBase.hxx>
17#include <ROOT/RLogger.hxx>
19#include <ROOT/RNTupleModel.hxx>
20#include <ROOT/RNTupleUtils.hxx>
22#include <ROOT/RPageStorage.hxx>
23
24#include <algorithm>
25#include <utility>
26
27ROOT::RNTupleFillContext::RNTupleFillContext(std::unique_ptr<ROOT::RNTupleModel> model,
28 std::unique_ptr<ROOT::Internal::RPageSink> sink)
29 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleFillContext")
30{
31 fModel->Freeze();
32 fSink->Init(*fModel);
33 fMetrics.ObserveMetrics(fSink->GetMetrics());
34
35 const auto &writeOpts = fSink->GetWriteOptions();
36 fMaxUnzippedClusterSize = writeOpts.GetMaxUnzippedClusterSize();
37 // First estimate is a factor 2 compression if compression is used at all
38 const int scale = writeOpts.GetCompression() ? 2 : 1;
39 fUnzippedClusterSizeEst = scale * writeOpts.GetApproxZippedClusterSize();
40}
41
43{
44 try {
46 } catch (const RException &err) {
47 R__LOG_ERROR(ROOT::Internal::NTupleLog()) << "failure flushing cluster: " << err.GetError().GetReport();
48 }
49
50 if (!fStagedClusters.empty()) {
52 << std::to_string(fStagedClusters.size()) << " staged clusters still pending, their data is lost";
53 }
54}
55
62
64{
65 if (fNEntries == fLastFlushed) {
66 return;
67 }
68 for (auto &field : ROOT::Internal::GetFieldZeroOfModel(*fModel)) {
70 }
71 auto nEntriesInCluster = fNEntries - fLastFlushed;
73 auto stagedCluster = fSink->StageCluster(nEntriesInCluster);
74 fNBytesFlushed += stagedCluster.fNBytesWritten;
75 fStagedClusters.push_back(std::move(stagedCluster));
76 } else {
77 fNBytesFlushed += fSink->CommitCluster(nEntriesInCluster);
78 }
80
81 // Cap the compression factor at 1000 to prevent overflow of fUnzippedClusterSizeEst
82 const float compressionFactor =
83 std::min(1000.f, static_cast<float>(fNBytesFilled) / static_cast<float>(fNBytesFlushed));
85 compressionFactor * static_cast<float>(fSink->GetWriteOptions().GetApproxZippedClusterSize());
86
89}
90
92{
93 if (fStagedClusters.empty()) {
94 return;
95 }
96 if (fModel->IsExpired()) {
97 throw RException(R__FAIL("invalid attempt to commit staged clusters after dataset was committed"));
98 }
99
100 fSink->CommitStagedClusters(fStagedClusters);
101 fStagedClusters.clear();
102}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_ERROR(...)
Definition RLogger.hxx:356
Double_t err
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
void FlushCluster()
Flush so far filled entries to storage.
ROOT::NTupleSize_t fLastFlushed
std::size_t fUnzippedClusterSizeEst
Estimator of uncompressed cluster size, taking into account the estimated compression ratio.
std::uint64_t fNBytesFilled
The total number of bytes filled into all the so far committed clusters, i.e.
Experimental::Detail::RNTupleMetrics fMetrics
std::uint64_t fNBytesFlushed
The total number of bytes written to storage (i.e., after compression).
std::vector< ROOT::Internal::RPageSink::RStagedCluster > fStagedClusters
Vector of currently staged clusters.
void FlushColumns()
Flush column data, preparing for CommitCluster or to reduce memory usage.
RNTupleFillContext(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
std::size_t fUnzippedClusterSize
Keeps track of the number of bytes written into the current cluster.
void CommitStagedClusters()
Logically append staged clusters to the RNTuple.
bool fStagedClusterCommitting
Whether to enable staged cluster committing, where only an explicit call to CommitStagedClusters() wi...
std::unique_ptr< ROOT::RNTupleModel > fModel
Needs to be destructed before fSink.
std::unique_ptr< ROOT::Internal::RPageSink > fSink
std::size_t fMaxUnzippedClusterSize
Limit for committing cluster no matter the other tunables.
ROOT::RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
void CallCommitClusterOnField(RFieldBase &)
void CallFlushColumnsOnField(RFieldBase &)