Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorage.hxx
Go to the documentation of this file.
1/// \file ROOT/RPageStorage.hxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-07-19
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#ifndef ROOT7_RPageStorage
17#define ROOT7_RPageStorage
18
19#include <ROOT/RCluster.hxx>
25#include <ROOT/RNTupleUtil.hxx>
26#include <ROOT/RPage.hxx>
28#include <ROOT/RSpan.hxx>
29#include <string_view>
30
31#include <atomic>
32#include <cstddef>
33#include <deque>
34#include <functional>
35#include <memory>
36#include <mutex>
37#include <shared_mutex>
38#include <unordered_set>
39#include <vector>
40
41namespace ROOT {
42namespace Experimental {
43
44class RFieldBase;
45class RNTupleModel;
46
47namespace Internal {
48class RColumn;
49class RColumnElementBase;
50class RNTupleCompressor;
51class RNTupleDecompressor;
52struct RNTupleModelChangeset;
53class RPagePool;
54
/// Identifies the role of a concrete page storage: a sink writes an ntuple, a source reads one.
enum class EPageStorageType { kSink, kSource };
59
60// clang-format off
61/**
62\class ROOT::Experimental::Internal::RPageStorage
63\ingroup NTuple
64\brief Common functionality of an ntuple storage for both reading and writing
65
66The RPageStore provides access to a storage container that keeps the bits of pages and clusters comprising
67an ntuple. Concrete implementations can use a TFile, a raw file, an object store, and so on.
68*/
69// clang-format on
71public:
72 /// The interface of a task scheduler to schedule page (de)compression tasks
74 public:
75 virtual ~RTaskScheduler() = default;
76 /// Take a callable that represents a task
77 virtual void AddTask(const std::function<void(void)> &taskFunc) = 0;
78 /// Blocks until all scheduled tasks finished
79 virtual void Wait() = 0;
80 };
81
/// A sealed page holds the bytes of a page in the form in which they are written to storage (packed & compressed).
/// It is the input of UnsealPage() and is also used to transfer pages between different storage media.
/// RSealedPage is a non-owning view: it does _not_ own the buffer it points to, so that it does not interfere
/// with the memory management of concrete page sink and page source implementations.
struct RSealedPage {
public:
   RSealedPage() = default;
   RSealedPage(const void *buffer, std::uint32_t size, std::uint32_t nElements)
      : fBuffer(buffer), fSize(size), fNElements(nElements)
   {
   }
   // Copy and move operations are the implicitly generated member-wise ones (rule of zero): only the
   // buffer pointer is copied, never the bytes it points to.

   const void *GetBuffer() const { return fBuffer; }
   void SetBuffer(const void *buffer) { fBuffer = buffer; }

   std::uint32_t GetSize() const { return fSize; }
   void SetSize(std::uint32_t size) { fSize = size; }

   std::uint32_t GetNElements() const { return fNElements; }
   void SetNElements(std::uint32_t nElements) { fNElements = nElements; }

private:
   const void *fBuffer = nullptr; ///< Non-owning pointer to the sealed bytes
   std::uint32_t fSize = 0;       ///< Size of the buffer pointed to by fBuffer
   std::uint32_t fNElements = 0;  ///< Number of elements stored in the page
};
109
110 using SealedPageSequence_t = std::deque<RSealedPage>;
111 /// A range of sealed pages referring to the same column that can be used for vector commit
114 SealedPageSequence_t::const_iterator fFirst;
115 SealedPageSequence_t::const_iterator fLast;
116
117 RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
119 {
120 }
121 };
122
123protected:
125
126 std::string fNTupleName;
129 {
130 if (!fTaskScheduler)
131 return;
133 }
134
135public:
136 explicit RPageStorage(std::string_view name);
137 RPageStorage(const RPageStorage &other) = delete;
138 RPageStorage& operator =(const RPageStorage &other) = delete;
139 RPageStorage(RPageStorage &&other) = default;
141 virtual ~RPageStorage();
142
143 /// Whether the concrete implementation is a sink or a source
145
148 const RColumn *fColumn = nullptr;
149
150 /// Returns true for a valid column handle; fColumn and fPhysicalId should always either both
151 /// be valid or both be invalid.
152 explicit operator bool() const { return fPhysicalId != kInvalidDescriptorId && fColumn; }
153 };
154 /// The column handle identifies a column within the currently open page storage
156
157 /// Register a new column. When reading, the column must exist in the ntuple on disk corresponding to the meta-data.
158 /// When writing, every column can only be attached once.
159 virtual ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) = 0;
160 /// Unregisters a column. A page source decreases the reference counter for the corresponding active column.
161 /// For a page sink, dropping columns is currently a no-op.
162 virtual void DropColumn(ColumnHandle_t columnHandle) = 0;
163
164 /// Every page store needs to be able to free pages it handed out. But sinks and sources have different means
165 /// of allocating pages.
166 virtual void ReleasePage(RPage &page) = 0;
167
168 /// Returns the default metrics object. Subclasses might alternatively provide their own metrics object by
169 /// overriding this.
171
172 /// Returns the NTuple name.
173 const std::string &GetNTupleName() const { return fNTupleName; }
174
175 void SetTaskScheduler(RTaskScheduler *taskScheduler) { fTaskScheduler = taskScheduler; }
176}; // class RPageStorage
177
178// clang-format off
179/**
180\class ROOT::Experimental::Internal::RPageSink
181\ingroup NTuple
182\brief Abstract interface to write data into an ntuple
183
184The page sink takes the list of columns and afterwards a series of page commits and cluster commits.
185The user is responsible for committing clusters at a consistent point, i.e. when all pages corresponding to data
186up to the given entry number are committed.
187
188An object of this class may either be a wrapper (for example a RPageSinkBuf) or a "persistent" sink,
189inheriting from RPagePersistentSink.
190*/
191// clang-format on
192class RPageSink : public RPageStorage {
193public:
194 using Callback_t = std::function<void(RPageSink &)>;
195
196protected:
197 std::unique_ptr<RNTupleWriteOptions> fOptions;
198
199 /// Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
200 /// There could be concrete page sinks that don't need a compressor. Therefore, and in order to stay consistent
201 /// with the page source, we leave it up to the derived class whether or not the compressor gets constructed.
202 std::unique_ptr<RNTupleCompressor> fCompressor;
203
204 /// Helper for streaming a page. This is commonly used in derived, concrete page sinks. Note that if
205 /// compressionSetting is 0 (uncompressed) and the page is mappable, the returned sealed page will
206 /// point directly to the input page buffer. Otherwise, the sealed page references an internal buffer
207 /// of fCompressor. Thus, the buffer pointed to by the RSealedPage should never be freed.
208 /// Usage of this method requires construction of fCompressor.
209 RSealedPage SealPage(const RPage &page, const RColumnElementBase &element);
210
211 /// Seal a page using the provided buffer.
212 static RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting, void *buf,
213 bool allowAlias = true);
214
215private:
216 /// Flag if sink was initialized
217 bool fIsInitialized = false;
218 std::vector<Callback_t> fOnDatasetCommitCallbacks;
219 std::vector<unsigned char> fSealPageBuffer; ///< Used as destination buffer in the simple SealPage overload
220
221public:
222 RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
223
224 RPageSink(const RPageSink&) = delete;
225 RPageSink& operator=(const RPageSink&) = delete;
226 RPageSink(RPageSink&&) = default;
228 ~RPageSink() override;
229
231 /// Returns the sink's write options.
232 const RNTupleWriteOptions &GetWriteOptions() const { return *fOptions; }
233
234 void DropColumn(ColumnHandle_t /*columnHandle*/) final {}
235
236 bool IsInitialized() const { return fIsInitialized; }
237
238 /// Return the RNTupleDescriptor being constructed.
239 virtual const RNTupleDescriptor &GetDescriptor() const = 0;
240
241 /// Physically creates the storage container to hold the ntuple (e.g., a keys a TFile or an S3 bucket)
242 /// Init() associates column handles to the columns referenced by the model
243 void Init(RNTupleModel &model)
244 {
245 if (fIsInitialized) {
246 throw RException(R__FAIL("already initialized"));
247 }
248 fIsInitialized = true;
249 InitImpl(model);
250 }
251
252protected:
253 virtual void InitImpl(RNTupleModel &model) = 0;
254 virtual void CommitDatasetImpl() = 0;
255
256public:
257 /// Incorporate incremental changes to the model into the ntuple descriptor. This happens, e.g. if new fields were
258 /// added after the initial call to `RPageSink::Init(RNTupleModel &)`.
259 /// `firstEntry` specifies the global index for the first stored element in the added columns.
260 virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) = 0;
261 /// Adds an extra type information record to schema. The extra type information will be written to the
262 /// extension header. The information in the record will be merged with the existing information, e.g.
263 /// duplicate streamer info records will be removed. This method is called by the "on commit dataset" callback
264 /// registered by specific fields (e.g., unsplit field) and during merging.
265 virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) = 0;
266
267 /// Write a page to the storage. The column must have been added before.
268 virtual void CommitPage(ColumnHandle_t columnHandle, const RPage &page) = 0;
269 /// Write a preprocessed page to storage. The column must have been added before.
270 virtual void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
271 /// Write a vector of preprocessed pages to storage. The corresponding columns must have been added before.
272 virtual void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) = 0;
273 /// Finalize the current cluster and create a new one for the following data.
274 /// Returns the number of bytes written to storage (excluding meta-data).
275 virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries) = 0;
276 /// Write out the page locations (page list envelope) for all the committed clusters since the last call of
277 /// CommitClusterGroup (or the beginning of writing).
278 virtual void CommitClusterGroup() = 0;
279
280 /// The registered callback is executed at the beginning of CommitDataset();
282 /// Run the registered callbacks and finalize the current cluster and the entire data set.
283 void CommitDataset();
284
285 /// Get a new, empty page for the given column that can be filled with up to nElements. If nElements is zero,
286 /// the page sink picks an appropriate size.
287 virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) = 0;
288
289 /// An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
291 std::mutex *fLock;
292
293 public:
294 explicit RSinkGuard(std::mutex *lock) : fLock(lock)
295 {
296 if (fLock != nullptr) {
297 fLock->lock();
298 }
299 }
300 RSinkGuard(const RSinkGuard &) = delete;
301 RSinkGuard &operator=(const RSinkGuard &) = delete;
302 RSinkGuard(RSinkGuard &&) = delete;
305 {
306 if (fLock != nullptr) {
307 fLock->unlock();
308 }
309 }
310 };
311
313 {
314 // By default, there is no lock and the guard does nothing.
315 return RSinkGuard(nullptr);
316 }
317}; // class RPageSink
318
319// clang-format off
320/**
321\class ROOT::Experimental::Internal::RPagePersistentSink
322\ingroup NTuple
323\brief Base class for a sink with a physical storage backend
324*/
325// clang-format on
327private:
328 /// Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization
330
331 /// Remembers the starting cluster id for the next cluster group
332 std::uint64_t fNextClusterInGroup = 0;
333 /// Used to calculate the number of entries in the current cluster
335 /// Keeps track of the number of elements in the currently open cluster. Indexed by column id.
336 std::vector<RClusterDescriptor::RColumnRange> fOpenColumnRanges;
337 /// Keeps track of the written pages in the currently open cluster. Indexed by column id.
338 std::vector<RClusterDescriptor::RPageRange> fOpenPageRanges;
339
340 /// Union of the streamer info records that are sent from unsplit fields to the sink before committing the dataset.
342
343protected:
345
346 /// Default I/O performance counters that get registered in fMetrics
347 struct RCounters {
355 };
356 std::unique_ptr<RCounters> fCounters;
357
358 virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length) = 0;
359
360 virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) = 0;
361 virtual RNTupleLocator
362 CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
363 /// Vector commit of preprocessed pages. The `ranges` array specifies a range of sealed pages to be
364 /// committed for each column. The returned vector contains, in order, the RNTupleLocator for each
365 /// page on each range in `ranges`, i.e. the first N entries refer to the N pages in `ranges[0]`,
366 /// followed by M entries that refer to the M pages in `ranges[1]`, etc.
367 /// The default is to call `CommitSealedPageImpl` for each page; derived classes may provide an
368 /// optimized implementation though.
369 virtual std::vector<RNTupleLocator> CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges);
370 /// Returns the number of bytes written to storage (excluding metadata)
371 virtual std::uint64_t CommitClusterImpl() = 0;
372 /// Returns the locator of the page list envelope of the given buffer that contains the serialized page list.
373 /// Typically, the implementation takes care of compressing and writing the provided buffer.
374 virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) = 0;
375 virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) = 0;
376
377 /// Enables the default set of metrics provided by RPageSink. `prefix` will be used as the prefix for
378 /// the counters registered in the internal RNTupleMetrics object.
379 /// This set of counters can be extended by a subclass by calling `fMetrics.MakeCounter<...>()`.
380 ///
381 /// A subclass using the default set of metrics is always responsible for updating the counters
382 /// appropriately, e.g. `fCounters->fNPageCommited.Inc()`
383 void EnableDefaultMetrics(const std::string &prefix);
384
385public:
386 RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
387
392 ~RPagePersistentSink() override;
393
394 /// Guess the concrete derived page sink from the location
395 static std::unique_ptr<RPageSink> Create(std::string_view ntupleName, std::string_view location,
396 const RNTupleWriteOptions &options = RNTupleWriteOptions());
397
398 ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final;
399
401
402 /// Updates the descriptor and calls InitImpl() that handles the backend-specific details (file, DAOS, etc.)
403 void InitImpl(RNTupleModel &model) final;
404 void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final;
405 void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final;
406
407 /// Initialize sink based on an existing descriptor and fill into the descriptor builder.
408 void InitFromDescriptor(const RNTupleDescriptor &descriptor);
409
410 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
411 void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
412 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
413 std::uint64_t CommitCluster(NTupleSize_t nEntries) final;
414 void CommitClusterGroup() final;
415 void CommitDatasetImpl() final;
416}; // class RPagePersistentSink
417
418// clang-format off
419/**
420\class ROOT::Experimental::Internal::RPageSource
421\ingroup NTuple
422\brief Abstract interface to read data from an ntuple
423
424The page source is initialized with the columns of interest. Alias columns from projected fields are mapped to the
425corresponding physical columns. Pages from the columns of interest can then be mapped into memory.
426The page source also gives access to the ntuple's meta-data.
427*/
428// clang-format on
429class RPageSource : public RPageStorage {
430public:
431 /// Used in SetEntryRange / GetEntryRange
432 struct REntryRange {
434 NTupleSize_t fNEntries = 0;
435
436 /// Returns true if the given cluster has entries within the entry range
437 bool IntersectsWith(const RClusterDescriptor &clusterDesc) const;
438 };
439
440 /// An RAII wrapper used for the read-only access to RPageSource::fDescriptor. See GetSharedDescriptorGuard().
443 std::shared_mutex &fLock;
444
445 public:
446 RSharedDescriptorGuard(const RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
447 {
448 fLock.lock_shared();
449 }
454 ~RSharedDescriptorGuard() { fLock.unlock_shared(); }
455 const RNTupleDescriptor *operator->() const { return &fDescriptor; }
456 const RNTupleDescriptor &GetRef() const { return fDescriptor; }
457 };
458
459 /// An RAII wrapper used for the writable access to RPageSource::fDescriptor. See GetExclDescriptorGuard().
462 std::shared_mutex &fLock;
463
464 public:
465 RExclDescriptorGuard(RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
466 {
467 fLock.lock();
468 }
474 {
475 fDescriptor.IncGeneration();
476 fLock.unlock();
477 }
478 RNTupleDescriptor *operator->() const { return &fDescriptor; }
479 void MoveIn(RNTupleDescriptor &&desc) { fDescriptor = std::move(desc); }
480 };
481
482private:
484 mutable std::shared_mutex fDescriptorLock;
485 REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
486
487protected:
488 /// Default I/O performance counters that get registered in fMetrics
489 struct RCounters {
507 };
508
509 /// Keeps track of the requested physical column IDs. When using alias columns (projected fields), physical
510 /// columns may be requested multiple times.
512 private:
513 std::vector<DescriptorId_t> fIDs;
514 std::vector<std::size_t> fRefCounters;
515
516 public:
517 void Insert(DescriptorId_t physicalColumnID);
518 void Erase(DescriptorId_t physicalColumnID);
519 RCluster::ColumnSet_t ToColumnSet() const;
520 };
521
522 std::unique_ptr<RCounters> fCounters;
523
525 /// The active columns are implicitly defined by the model fields or views
527
528 /// Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
529 /// Not all page sources need a decompressor (e.g. virtual ones for chains and friends don't), thus we
530 /// leave it up to the derived class whether or not the decompressor gets constructed.
531 std::unique_ptr<RNTupleDecompressor> fDecompressor;
532 /// Populated pages might be shared; the page pool might, at some point, be used by multiple page sources
533 std::shared_ptr<RPagePool> fPagePool;
534
536 // Only called if a task scheduler is set. No-op by default.
537 virtual void UnzipClusterImpl(RCluster *cluster);
538
539 /// Prepare a page range read for the column set in `clusterKey`. Specifically, pages referencing the
540 /// `kTypePageZero` locator are filled in `pageZeroMap`; otherwise, `perPageFunc` is called for each page. This is
541 /// commonly used as part of `LoadClusters()` in derived classes.
542 void PrepareLoadCluster(
543 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
544 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc);
545
546 /// Enables the default set of metrics provided by RPageSource. `prefix` will be used as the prefix for
547 /// the counters registered in the internal RNTupleMetrics object.
548 /// A subclass using the default set of metrics is responsible for updating the counters
549 /// appropriately, e.g. `fCounters->fNRead.Inc()`
550 /// Alternatively, a subclass might provide its own RNTupleMetrics object by overriding the
551 /// GetMetrics() member function.
552 void EnableDefaultMetrics(const std::string &prefix);
553
554 /// Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further information.
555 RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }
556
557public:
558 RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions);
559 RPageSource(const RPageSource&) = delete;
563 ~RPageSource() override;
564 /// Guess the concrete derived page source from the file name (location)
565 static std::unique_ptr<RPageSource> Create(std::string_view ntupleName, std::string_view location,
566 const RNTupleReadOptions &options = RNTupleReadOptions());
567 /// Open the same storage multiple times, e.g. for reading in multiple threads
568 virtual std::unique_ptr<RPageSource> Clone() const = 0;
569
571 const RNTupleReadOptions &GetReadOptions() const { return fOptions; }
572
573 /// Takes the read lock for the descriptor. Multiple threads can take the lock concurrently.
574 /// The underlying std::shared_mutex, however, is neither read nor write recursive:
575 /// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
576 /// care in sections protected by GetSharedDescriptorGuard() and GetExclDescriptorGuard() especially to avoid that
577 /// the locks are acquired indirectly (e.g. by a call to GetNEntries()).
578 /// As a general guideline, no other method of the page source should be called (directly or indirectly) in a
579 /// guarded section.
581 {
582 return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
583 }
584
585 ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override;
586 void DropColumn(ColumnHandle_t columnHandle) override;
587
588 /// Open the physical storage container for the tree
589 void Attach() { GetExclDescriptorGuard().MoveIn(AttachImpl()); }
590 NTupleSize_t GetNEntries();
591 NTupleSize_t GetNElements(ColumnHandle_t columnHandle);
592 ColumnId_t GetColumnId(ColumnHandle_t columnHandle);
593
594 /// Promise to only read from the given entry range. If set, prevents the cluster pool from reading-ahead beyond
595 /// the given range. The range needs to be within [0, GetNEntries()).
596 void SetEntryRange(const REntryRange &range);
597 REntryRange GetEntryRange() const { return fEntryRange; }
598
599 /// Allocates and fills a page that contains the index-th element
600 virtual RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) = 0;
601 /// Another version of PopulatePage that allows to specify cluster-relative indexes
602 virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;
603
604 /// Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage. The sealed page
605 /// can be used subsequently in a call to RPageSink::CommitSealedPage.
606 /// The fSize and fNElements members of the sealedPage parameter are always set. If sealedPage.fBuffer is nullptr,
607 /// no data will be copied but the returned size information can be used by the caller to allocate a large enough
608 /// buffer and call LoadSealedPage again.
609 virtual void
610 LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) = 0;
611
612 /// Helper for unstreaming a page. This is commonly used in derived, concrete page sources. The implementation
613 /// currently always makes a memory copy, even if the sealed page is uncompressed and in the final memory layout.
614 /// The optimization of directly mapping pages is left to the concrete page source implementations.
615 /// Usage of this method requires construction of fDecompressor. Memory is allocated via
616 /// `RPageAllocatorHeap`; use `RPageAllocatorHeap::DeletePage()` to deallocate returned pages.
617 RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId);
618
619 /// Populates all the pages of the given cluster ids and columns; it is possible that some columns do not
620 /// contain any pages. The page source may load more columns than the minimal necessary set from `columns`.
621 /// To indicate which columns have been loaded, LoadClusters() must mark them with SetColumnAvailable().
622 /// That includes the ones from the `columns` that don't have pages; otherwise subsequent requests
623 /// for the cluster would assume an incomplete cluster and trigger loading again.
624 /// LoadClusters() is typically called from the I/O thread of a cluster pool, i.e. the method runs
625 /// concurrently to other methods of the page source.
626 virtual std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) = 0;
627
628 /// Parallel decompression and unpacking of the pages in the given cluster. The unzipped pages are supposed
629 /// to be preloaded in a page pool attached to the source. The method is triggered by the cluster pool's
630 /// unzip thread. It is an optional optimization, the method can safely do nothing. In particular, the
631 /// actual implementation will only run if a task scheduler is set. In practice, a task scheduler is set
632 /// if implicit multi-threading is turned on.
633 void UnzipCluster(RCluster *cluster);
634}; // class RPageSource
635
636} // namespace Internal
637
638} // namespace Experimental
639} // namespace ROOT
640
641#endif
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:290
#define d(i)
Definition RSha256.hxx:102
#define b(i)
Definition RSha256.hxx:100
#define e(i)
Definition RSha256.hxx:103
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
char name[80]
Definition TGX11.cxx:110
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
An either thread-safe or non thread safe counter for CPU ticks.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
A column element encapsulates the translation between basic C++ types and their column representation...
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:43
A helper class for piece-wise construction of an RNTupleDescriptor.
The serialization context is used for the piecewise serialization of a descriptor.
std::unordered_map< Int_t, TVirtualStreamerInfo * > StreamerInfoMap_t
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:103
Base class for a sink with a physical storage backend.
RPagePersistentSink(const RPagePersistentSink &)=delete
RPagePersistentSink(RPagePersistentSink &&)=default
std::uint64_t fNextClusterInGroup
Remembers the starting cluster id for the next cluster group.
RPagePersistentSink & operator=(RPagePersistentSink &&)=default
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
RNTupleSerializer::RContext fSerializationContext
Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization.
virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page)=0
void InitFromDescriptor(const RNTupleDescriptor &descriptor)
Initialize sink based on an existing descriptor and fill into the descriptor builder.
virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length)=0
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
NTupleSize_t fPrevClusterNEntries
Used to calculate the number of entries in the current cluster.
std::vector< RClusterDescriptor::RPageRange > fOpenPageRanges
Keeps track of the written pages in the currently open cluster. Indexed by column id.
std::uint64_t CommitCluster(NTupleSize_t nEntries) final
Finalize the current cluster and create a new one for the following data.
const RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final
Write a page to the storage. The column must have been added before.
ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final
Register a new column.
virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)=0
RPagePersistentSink & operator=(const RPagePersistentSink &)=delete
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const RNTupleWriteOptions &options=RNTupleWriteOptions())
Guess the concrete derived page source from the location.
Internal::RNTupleDescriptorBuilder fDescriptorBuilder
RNTupleSerializer::StreamerInfoMap_t fStreamerInfos
Union of the streamer info records that are sent from unsplit fields to the sink before committing th...
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
std::vector< RClusterDescriptor::RColumnRange > fOpenColumnRanges
Keeps track of the number of elements in the currently open cluster. Indexed by column id.
virtual std::uint64_t CommitClusterImpl()=0
Returns the number of bytes written to storage (excluding metadata)
void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges)
Vector commit of preprocessed pages.
void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
virtual RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)=0
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
RSinkGuard & operator=(const RSinkGuard &)=delete
RSinkGuard & operator=(RSinkGuard &&)=delete
Abstract interface to write data into an ntuple.
std::vector< unsigned char > fSealPageBuffer
Used as destination buffer in the simple SealPage overload.
std::vector< Callback_t > fOnDatasetCommitCallbacks
RPageSink & operator=(RPageSink &&)=default
bool fIsInitialized
Flag if sink was initialized.
virtual void UpdateExtraTypeInfo(const RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual const RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual void CommitPage(ColumnHandle_t columnHandle, const RPage &page)=0
Write a page to the storage. The column must have been added before.
const RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
RPageSink & operator=(const RPageSink &)=delete
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)=0
Get a new, empty page for the given column that can be filled with up to nElements.
virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries)=0
Finalize the current cluster and create a new one for the following data.
void RegisterOnCommitDatasetCallback(Callback_t callback)
The registered callback is executed at the beginning of CommitDataset().
RPageSink(const RPageSink &)=delete
std::function< void(RPageSink &)> Callback_t
virtual void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)=0
Write a preprocessed page to storage. The column must have been added before.
virtual void InitImpl(RNTupleModel &model)=0
void DropColumn(ColumnHandle_t) final
Unregisters a column.
EPageStorageType GetType() final
Whether the concrete implementation is a sink or a source.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
std::unique_ptr< RNTupleWriteOptions > fOptions
RSealedPage SealPage(const RPage &page, const RColumnElementBase &element)
Helper for streaming a page.
Keeps track of the requested physical column IDs.
An RAII wrapper used for the writable access to RPageSource::fDescriptor. See GetSharedDescriptorGuar...
RExclDescriptorGuard(RNTupleDescriptor &desc, std::shared_mutex &lock)
RExclDescriptorGuard(const RExclDescriptorGuard &)=delete
RExclDescriptorGuard & operator=(RExclDescriptorGuard &&)=delete
RExclDescriptorGuard & operator=(const RExclDescriptorGuard &)=delete
An RAII wrapper used for the read-only access to RPageSource::fDescriptor. See GetExclDescriptorGuard...
RSharedDescriptorGuard & operator=(RSharedDescriptorGuard &&)=delete
RSharedDescriptorGuard(const RSharedDescriptorGuard &)=delete
RSharedDescriptorGuard(const RNTupleDescriptor &desc, std::shared_mutex &lock)
RSharedDescriptorGuard & operator=(const RSharedDescriptorGuard &)=delete
Abstract interface to read data from an ntuple.
virtual void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage)=0
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
EPageStorageType GetType() final
Whether the concrete implementation is a sink or a source.
RPageSource(const RPageSource &)=delete
RPageSource & operator=(RPageSource &&)=delete
std::unique_ptr< RCounters > fCounters
std::shared_ptr< RPagePool > fPagePool
Populated pages might be shared; the page pool might, at some point, be used by multiple page sources...
virtual RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex)=0
Allocates and fills a page that contains the index-th element.
RExclDescriptorGuard GetExclDescriptorGuard()
Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further informatio...
RActivePhysicalColumns fActivePhysicalColumns
The active columns are implicitly defined by the model fields or views.
RPageSource & operator=(const RPageSource &)=delete
virtual RNTupleDescriptor AttachImpl()=0
virtual std::unique_ptr< RPageSource > Clone() const =0
Open the same storage multiple times, e.g. for reading in multiple threads.
const RNTupleReadOptions & GetReadOptions() const
virtual std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys)=0
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
REntryRange fEntryRange
Used by the cluster pool to prevent reading beyond the given range.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex)=0
Another version of PopulatePage that allows to specify cluster-relative indexes.
void Attach()
Open the physical storage container for the tree.
The interface of a task scheduler to schedule page (de)compression tasks.
virtual void Wait()=0
Blocks until all scheduled tasks finished.
virtual void AddTask(const std::function< void(void)> &taskFunc)=0
Take a callable that represents a task.
Common functionality of an ntuple storage for both reading and writing.
std::deque< RSealedPage > SealedPageSequence_t
virtual ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column)=0
Register a new column.
virtual void DropColumn(ColumnHandle_t columnHandle)=0
Unregisters a column.
virtual Detail::RNTupleMetrics & GetMetrics()
Returns the default metrics object.
const std::string & GetNTupleName() const
Returns the NTuple name.
virtual EPageStorageType GetType()=0
Whether the concrete implementation is a sink or a source.
RPageStorage & operator=(const RPageStorage &other)=delete
RPageStorage(const RPageStorage &other)=delete
RPageStorage(RPageStorage &&other)=default
virtual void ReleasePage(RPage &page)=0
Every page store needs to be able to free pages it handed out.
void SetTaskScheduler(RTaskScheduler *taskScheduler)
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
Meta-data for a set of ntuple clusters.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
Field-specific extra type information from the header / extension header.
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
Common user-tunable settings for storing ntuples.
const Int_t n
Definition legend1.C:16
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
The incremental changes to an RNTupleModel
Default I/O performance counters that get registered in fMetrics.
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuZip
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuWrite
Default I/O performance counters that get registered in fMetrics.
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuUnzip
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuRead
A range of sealed pages referring to the same column that can be used for vector commit.
RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RSealedPage(const void *b, std::uint32_t s, std::uint32_t n)
RSealedPage & operator=(const RSealedPage &other)=default
We do not need to store the element size / uncompressed page size because we know to which column the...
Generic information about the physical location of data.