60class RPageSynchronizingSink :
public RPageSink {
63 RPageSink *fInnerSink;
67 explicit RPageSynchronizingSink(RPageSink &
inner, std::mutex &mutex)
68 : RPageSink(
inner.GetNTupleName(),
inner.GetWriteOptions()), fInnerSink(&
inner), fMutex(&mutex)
74 RPageSynchronizingSink(
const RPageSynchronizingSink &) =
delete;
75 RPageSynchronizingSink &
operator=(
const RPageSynchronizingSink &) =
delete;
77 const RNTupleDescriptor &GetDescriptor()
const final {
return fInnerSink->GetDescriptor(); }
79 NTupleSize_t GetNEntries()
const final {
return fInnerSink->GetNEntries(); }
81 ColumnHandle_t AddColumn(DescriptorId_t, RColumn &)
final {
return {}; }
83 void UpdateSchema(
const RNTupleModelChangeset &, NTupleSize_t)
final
87 void UpdateExtraTypeInfo(
const RExtraTypeInfoDescriptor &)
final
92 void CommitSuppressedColumn(ColumnHandle_t handle)
final { fInnerSink->CommitSuppressedColumn(handle); }
93 void CommitPage(ColumnHandle_t,
const RPage &)
final
97 void CommitSealedPage(DescriptorId_t,
const RSealedPage &)
final
101 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges)
final
103 fInnerSink->CommitSealedPageV(ranges);
105 std::uint64_t CommitCluster(NTupleSize_t
nNewEntries)
final {
return fInnerSink->CommitCluster(
nNewEntries); }
106 RStagedCluster StageCluster(NTupleSize_t
nNewEntries)
final {
return fInnerSink->StageCluster(
nNewEntries); }
107 void CommitStagedClusters(std::span<RStagedCluster>
clusters)
final { fInnerSink->CommitStagedClusters(
clusters); }
108 void CommitClusterGroup()
final
112 void CommitDatasetImpl()
final
117 RSinkGuard GetSinkGuard()
final {
return RSinkGuard(fMutex); }
123 std::unique_ptr<RPageSink>
sink)
124 : fSink(std::
move(
sink)), fModel(std::
move(model)), fMetrics(
"RNTupleParallelWriter")
126 if (
fModel->GetRegisteredSubfieldNames().size() > 0) {
127 throw RException(
R__FAIL(
"cannot create an RNTupleParallelWriter from a model with registered subfields"));
145 if (fModel->IsExpired())
148 for (
const auto &context : fFillContexts) {
149 if (!context.expired()) {
155 fSink->CommitClusterGroup();
156 fSink->CommitDataset();
160std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
174std::unique_ptr<ROOT::Experimental::RNTupleParallelWriter>
182 R__FAIL(
"RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into a directory "
183 "that is not backed by a file"));
185 if (!file->IsBinary()) {
186 throw RException(
R__FAIL(
"RNTupleParallelWriter only supports writing to a ROOT file. Cannot write into " +
187 std::string(file->GetName())));
200 std::lock_guard
g(fMutex);
202 auto model = fModel->Clone();
204 std::make_unique<ROOT::Internal::RPageSinkBuf>(std::make_unique<RPageSynchronizingSink>(*fSink, fSinkMutex));
209 fFillContexts.push_back(context);
214 context->fZipTasks = std::make_unique<ROOT::Experimental::Internal::RNTupleImtTaskScheduler>();
215 context->fSink->SetTaskScheduler(context->fZipTasks.get());
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_ERROR(...)
Binding & operator=(OUT(*fun)(void))
void ObserveMetrics(RNTupleMetrics &observee)
A context for filling entries (data) into clusters of an RNTuple.
A writer to fill an RNTuple from multiple contexts.
static std::unique_ptr< RNTupleParallelWriter > Append(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, TDirectory &fileOrDirectory, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Append an ntuple to the existing file, which must not be accessed while data is filled into any creat...
static std::unique_ptr< RNTupleParallelWriter > Recreate(std::unique_ptr< ROOT::RNTupleModel > model, std::string_view ntupleName, std::string_view storage, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Recreate a new file and return a writer to write an ntuple.
std::shared_ptr< RNTupleFillContext > CreateFillContext()
Create a new RNTupleFillContext that can be used to fill entries and prepare clusters in parallel.
RNTupleParallelWriter(std::unique_ptr< ROOT::RNTupleModel > model, std::unique_ptr< ROOT::Internal::RPageSink > sink)
std::unique_ptr< ROOT::RNTupleModel > fModel
The original RNTupleModel connected to fSink; needs to be destructed before it.
void CommitDataset()
Automatically called by the destructor.
Detail::RNTupleMetrics fMetrics
std::unique_ptr< ROOT::Internal::RPageSink > fSink
The final RPageSink that represents the synchronization point.
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page source from the location.
Abstract interface to write data into an ntuple.
A page is a slice of a column that is mapped into memory.
Base class for all ROOT issued exceptions.
const RError & GetError() const
The on-storage metadata of an RNTuple.
The RNTupleModel encapulates the schema of an RNTuple.
Common user-tunable settings for storing RNTuples.
bool GetUseBufferedWrite() const
Describe directory structure in memory.
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
Bool_t IsImplicitMTEnabled()
Returns true if the implicit multi-threading in ROOT is enabled.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
The incremental changes to a RNTupleModel