Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleFillContext.cxx
Go to the documentation of this file.
1/// \file RNTupleFillContext.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2024-02-22
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
17
18#include <ROOT/RError.hxx>
19#include <ROOT/RFieldBase.hxx>
20#include <ROOT/RLogger.hxx>
22#include <ROOT/RNTupleModel.hxx>
24#include <ROOT/RPageStorage.hxx>
25
26#include <algorithm>
27#include <utility>
28
30 std::unique_ptr<Internal::RPageSink> sink)
31 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleFillContext")
32{
// Constructor: takes ownership of the model and the page sink (both are moved into members),
// freezes the model, initializes the sink with it and derives the initial cluster size settings
// from the sink's write options.
// NOTE(review): the first line of the signature (listing line 29) is missing from this extract.
33 fModel->Freeze();
34 fSink->Init(*fModel);
// Hook the sink's metrics into this context's "RNTupleFillContext" metrics object.
35 fMetrics.ObserveMetrics(fSink->GetMetrics());
36
37 const auto &writeOpts = fSink->GetWriteOptions();
38 fMaxUnzippedClusterSize = writeOpts.GetMaxUnzippedClusterSize();
39 // First estimate is a factor 2 compression if compression is used at all
// A zero value from GetCompression() is treated as "no compression" and gives a scale of 1.
40 const int scale = writeOpts.GetCompression() ? 2 : 1;
// Initial estimate of the uncompressed cluster size: approximate zipped size times the assumed factor.
41 fUnzippedClusterSizeEst = scale * writeOpts.GetApproxZippedClusterSize();
42}
43
45{
// NOTE(review): the signature (listing line 44) is missing from this extract; presumably this is
// the destructor RNTupleFillContext::~RNTupleFillContext() -- confirm against the original file.
// Makes a final attempt to flush the entries filled so far. An RException raised by FlushCluster()
// is caught and reported on the NTuple log channel instead of being propagated.
46 try {
47 FlushCluster();
48 } catch (const RException &err) {
49 R__LOG_ERROR(NTupleLog()) << "failure flushing cluster: " << err.GetError().GetReport();
50 }
51
// Staged clusters that are still in fStagedClusters at this point are only reported, not committed.
52 if (!fStagedClusters.empty()) {
53 R__LOG_ERROR(NTupleLog()) << std::to_string(fStagedClusters.size())
54 << " staged clusters still pending, their data is lost";
55 }
56}
57
59{
// NOTE(review): the signature (listing line 58) and the loop body (listing line 61) are missing
// from this extract; presumably this is RNTupleFillContext::FlushColumns() and the loop calls
// Internal::CallFlushColumnsOnField(field) -- confirm against the original file.
// Iterates over the fields reachable through the model's field zero.
60 for (auto &field : Internal::GetFieldZeroOfModel(*fModel)) {
62 }
63}
64
66{
// NOTE(review): the signature (listing line 65) is missing from this extract; presumably this is
// RNTupleFillContext::FlushCluster() -- confirm against the original file.
// Writes the entries filled since the last flush as one cluster through the sink, updates the
// byte counters and refreshes the estimate of the uncompressed cluster size.
// Nothing to do if no entry was added since the last flush.
67 if (fNEntries == fLastFlushed) {
68 return;
69 }
// NOTE(review): the loop body (listing line 71) is missing from this extract; presumably it calls
// Internal::CallCommitClusterOnField(field) -- confirm against the original file.
70 for (auto &field : Internal::GetFieldZeroOfModel(*fModel)) {
72 }
73 auto nEntriesInCluster = fNEntries - fLastFlushed;
// With staged committing the cluster is only staged and remembered in fStagedClusters (to be
// handed to the sink's CommitStagedClusters() later); otherwise it is committed right away.
// In both cases the number of bytes reported by the sink is added to fNBytesFlushed.
74 if (fStagedClusterCommitting) {
75 auto stagedCluster = fSink->StageCluster(nEntriesInCluster);
76 fNBytesFlushed += stagedCluster.fNBytesWritten;
77 fStagedClusters.push_back(std::move(stagedCluster));
78 } else {
79 fNBytesFlushed += fSink->CommitCluster(nEntriesInCluster);
80 }
81 fNBytesFilled += fUnzippedClusterSize;
82
83 // Cap the compression factor at 1000 to prevent overflow of fUnzippedClusterSizeEst
// The factor is the ratio of all bytes filled to all bytes flushed so far.
// NOTE(review): if fNBytesFlushed is 0 the float division gives inf or NaN; with IEEE-754 floats
// std::min(1000.f, x) should still return 1000.f in both cases -- confirm this is intended.
84 const float compressionFactor =
85 std::min(1000.f, static_cast<float>(fNBytesFilled) / static_cast<float>(fNBytesFlushed));
// New estimate: approximate zipped cluster size scaled by the observed compression factor.
86 fUnzippedClusterSizeEst =
87 compressionFactor * static_cast<float>(fSink->GetWriteOptions().GetApproxZippedClusterSize());
88
// Start accounting for the next cluster.
89 fLastFlushed = fNEntries;
90 fUnzippedClusterSize = 0;
91}
92
94{
// NOTE(review): the signature (listing line 93) is missing from this extract; presumably this is
// RNTupleFillContext::CommitStagedClusters() -- confirm against the original file.
// Hands all clusters collected in fStagedClusters to the sink and then empties the list.
// Throws RException if staged clusters exist but the model is already expired.
// Nothing to do if no clusters are staged.
95 if (fStagedClusters.empty()) {
96 return;
97 }
98 if (fModel->IsExpired()) {
99 throw RException(R__FAIL("invalid attempt to commit staged clusters after dataset was committed"));
100 }
101
102 fSink->CommitStagedClusters(fStagedClusters);
103 fStagedClusters.clear();
104}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_ERROR(...)
Definition RLogger.hxx:362
void ObserveMetrics(RNTupleMetrics &observee)
void FlushCluster()
Flush so far filled entries to storage.
std::size_t fUnzippedClusterSizeEst
Estimator of uncompressed cluster size, taking into account the estimated compression ratio.
std::unique_ptr< RNTupleModel > fModel
Needs to be destructed before fSink.
std::size_t fMaxUnzippedClusterSize
Limit for committing cluster no matter the other tunables.
void CommitStagedClusters()
Logically append staged clusters to the RNTuple.
RNTupleFillContext(std::unique_ptr< RNTupleModel > model, std::unique_ptr< Internal::RPageSink > sink)
void FlushColumns()
Flush column data, preparing for CommitCluster or to reduce memory usage.
std::unique_ptr< Internal::RPageSink > fSink
std::string GetReport() const
Format a diagnostics report, e.g. for an exception message.
Definition RError.cxx:22
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
const RError & GetError() const
Definition RError.hxx:84
void CallFlushColumnsOnField(RFieldBase &)
void CallCommitClusterOnField(RFieldBase &)
RFieldZero & GetFieldZeroOfModel(RNTupleModel &model)
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.