Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RNTupleParallelWriter.cxx
Go to the documentation of this file.
1/// \file RNTupleParallelWriter.cxx
2/// \ingroup NTuple ROOT7
3/// \author Jonas Hahnfeld <jonas.hahnfeld@cern.ch>
4/// \date 2024-02-01
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
17
#include <ROOT/RNTupleModel.hxx>
#include <ROOT/RPageSinkBuf.hxx>
#include <ROOT/RPageStorage.hxx>

#include <algorithm>
23
24namespace {
25
35
/// An internal RPageSink that enables multiple RNTupleFillContext instances to write into a single common RPageSink.
///
/// The setup with two contexts looks as follows:
///
///      +------ owned by RNTupleFillContext ------+
///      |                                         |
/// RPageSinkBuf --- forwards to ---> RPageSynchronizingSink ---+
///                  (and owns)                                  |
///                                    (via raw fInnerSink ptr) +-- RPageSink (usually a persistent sink)
///                                                              |
/// RPageSinkBuf --- forwards to ---> RPageSynchronizingSink ---+
///      |           (and owns)                    |
///      |                                         |
///      +------ owned by RNTupleFillContext ------+
///
/// The mutex used by the synchronizing sinks is owned by the RNTupleParallelWriter. The writer also owns the original
/// model and the "final" sink (usually a persistent sink). It keeps weak_ptrs to the contexts, to verify that they are
/// destroyed before the writer is destructed.
54class RPageSynchronizingSink : public RPageSink {
55private:
56 /// The wrapped inner sink, not owned by this class.
57 RPageSink *fInnerSink;
58 std::mutex *fMutex;
59
60public:
61 explicit RPageSynchronizingSink(RPageSink &inner, std::mutex &mutex)
62 : RPageSink(inner.GetNTupleName(), inner.GetWriteOptions()), fInnerSink(&inner), fMutex(&mutex)
63 {
64 // Do not observe the sink's metrics: It will contain some counters for all threads, which is misleading for the
65 // users.
66 // fMetrics.ObserveMetrics(fSink->GetMetrics());
67 }
68 RPageSynchronizingSink(const RPageSynchronizingSink &) = delete;
69 RPageSynchronizingSink &operator=(const RPageSynchronizingSink &) = delete;
70
71 const RNTupleDescriptor &GetDescriptor() const final { return fInnerSink->GetDescriptor(); }
72
73 ColumnHandle_t AddColumn(DescriptorId_t, const RColumn &) final { return {}; }
74 void InitImpl(RNTupleModel &) final {}
75 void UpdateSchema(const RNTupleModelChangeset &, NTupleSize_t) final
76 {
77 throw RException(R__FAIL("UpdateSchema not supported via RPageSynchronizingSink"));
78 }
79
80 void CommitPage(ColumnHandle_t, const RPage &) final
81 {
82 throw RException(R__FAIL("should never commit single pages via RPageSynchronizingSink"));
83 }
84 void CommitSealedPage(DescriptorId_t, const RSealedPage &) final
85 {
86 throw RException(R__FAIL("should never commit sealed pages via RPageSynchronizingSink"));
87 }
88 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final
89 {
90 fInnerSink->CommitSealedPageV(ranges);
91 }
92 std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final { return fInnerSink->CommitCluster(nNewEntries); }
93 void CommitClusterGroup() final
94 {
95 throw RException(R__FAIL("should never commit cluster group via RPageSynchronizingSink"));
96 }
97 void CommitDataset() final { throw RException(R__FAIL("should never commit dataset via RPageSynchronizingSink")); }
98
99 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
100 {
101 return fInnerSink->ReservePage(columnHandle, nElements);
102 }
103 void ReleasePage(RPage &page) final { fInnerSink->ReleasePage(page); }
104
105 RSinkGuard GetSinkGuard() final { return RSinkGuard(fMutex); }
106};
107
108} // namespace
109
111 std::unique_ptr<Internal::RPageSink> sink)
112 : fSink(std::move(sink)), fModel(std::move(model)), fMetrics("RNTupleParallelWriter")
113{
114 fModel->Freeze();
115 fSink->Init(*fModel.get());
116 fMetrics.ObserveMetrics(fSink->GetMetrics());
117}
118
120{
121 for (const auto &context : fFillContexts) {
122 if (!context.expired()) {
123 R__LOG_ERROR(NTupleLog()) << "RNTupleFillContext has not been destructed";
124 return;
125 }
126 }
127
128 // Now commit all clusters as a cluster group and then the dataset.
129 try {
130 fSink->CommitClusterGroup();
131 fSink->CommitDataset();
132 } catch (const RException &err) {
133 R__LOG_ERROR(NTupleLog()) << "failure committing ntuple: " << err.GetError().GetReport();
134 }
135}
136
137std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
138ROOT::Experimental::RNTupleParallelWriter::Recreate(std::unique_ptr<RNTupleModel> model, std::string_view ntupleName,
139 std::string_view storage, const RNTupleWriteOptions &options)
140{
141 if (!options.GetUseBufferedWrite()) {
142 throw RException(R__FAIL("parallel writing requires buffering"));
143 }
144
145 auto sink = Internal::RPagePersistentSink::Create(ntupleName, storage, options);
146 // Cannot use std::make_unique because the constructor of RNTupleParallelWriter is private.
147 return std::unique_ptr<RNTupleParallelWriter>(new RNTupleParallelWriter(std::move(model), std::move(sink)));
148}
149
150std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
151ROOT::Experimental::RNTupleParallelWriter::Append(std::unique_ptr<RNTupleModel> model, std::string_view ntupleName,
152 TFile &file, const RNTupleWriteOptions &options)
153{
154 if (!options.GetUseBufferedWrite()) {
155 throw RException(R__FAIL("parallel writing requires buffering"));
156 }
157
158 auto sink = std::make_unique<Internal::RPageSinkFile>(ntupleName, file, options);
159 // Cannot use std::make_unique because the constructor of RNTupleParallelWriter is private.
160 return std::unique_ptr<RNTupleParallelWriter>(new RNTupleParallelWriter(std::move(model), std::move(sink)));
161}
162
163std::shared_ptr<ROOT::Experimental::RNTupleFillContext> ROOT::Experimental::RNTupleParallelWriter::CreateFillContext()
164{
165 std::lock_guard g(fMutex);
166
167 auto model = fModel->Clone();
168
169 // TODO: Think about honoring RNTupleWriteOptions::SetUseBufferedWrite(false); this requires synchronization on every
170 // call to CommitPage() *and* preparing multiple cluster descriptors in parallel!
171 auto sink = std::make_unique<Internal::RPageSinkBuf>(std::make_unique<RPageSynchronizingSink>(*fSink, fSinkMutex));
172
173 // Cannot use std::make_shared because the constructor of RNTupleFillContext is private. Also it would mean that the
174 // (direct) memory of all contexts stays around until the vector of weak_ptr's is cleared.
175 std::shared_ptr<RNTupleFillContext> context(new RNTupleFillContext(std::move(model), std::move(sink)));
176 fFillContexts.push_back(context);
177 return context;
178}
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define R__LOG_ERROR(...)
Definition RLogger.hxx:362
#define g(i)
Definition RSha256.hxx:105
Binding & operator=(OUT(*fun)(void))
void ObserveMetrics(RNTupleMetrics &observee)
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page source from the location.
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
std::string GetReport() const
Format a diagnostics report, e.g. for an exception message.
Definition RError.cxx:25
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
const RError & GetError() const
Definition RError.hxx:82
The on-storage meta-data of an ntuple.
A context for filling entries (data) into clusters of an RNTuple.
The RNTupleModel encapsulates the schema of an ntuple.
A writer to fill an RNTuple from multiple contexts.
RNTupleParallelWriter(std::unique_ptr< RNTupleModel > model, std::unique_ptr< Internal::RPageSink > sink)
std::unique_ptr< RNTupleModel > fModel
The original RNTupleModel connected to fSink; needs to be destructed before it.
std::unique_ptr< Internal::RPageSink > fSink
The final RPageSink that represents the synchronization point.
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Recreate a new file and return a writer to write an ntuple.
std::shared_ptr< RNTupleFillContext > CreateFillContext()
Create a new RNTupleFillContext that can be used to fill entries and prepare clusters in parallel.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< RNTupleModel > model, std::string_view ntupleName, TFile &file, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Append an ntuple to the existing file, which must not be accessed while data is filled into any creat...
Common user-tunable settings for storing ntuples.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
The incremental changes to a RNTupleModel