Logo ROOT  
Reference Guide
 
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
Loading...
Searching...
No Matches
RPageStorage.hxx
Go to the documentation of this file.
1/// \file ROOT/RPageStorage.hxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2018-07-19
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#ifndef ROOT7_RPageStorage
17#define ROOT7_RPageStorage
18
19#include <ROOT/RError.hxx>
20#include <ROOT/RCluster.hxx>
27#include <ROOT/RNTupleUtil.hxx>
28#include <ROOT/RPage.hxx>
29#include <ROOT/RPagePool.hxx>
30#include <ROOT/RSpan.hxx>
31#include <string_view>
32
33#include <atomic>
34#include <cassert>
35#include <cstddef>
36#include <deque>
37#include <functional>
38#include <map>
39#include <memory>
40#include <mutex>
41#include <set>
42#include <shared_mutex>
43#include <unordered_map>
44#include <unordered_set>
45#include <vector>
46
47namespace ROOT {
48
49namespace Internal {
50class RPageAllocator;
51class RColumn;
52struct RNTupleModelChangeset;
53}
54
55class RNTupleModel;
56
57namespace Experimental {
58
59namespace Internal {
60
/// Role of a page storage object: it either writes pages (sink) or reads them (source).
enum class EPageStorageType { kSink, kSource };
65
66// clang-format off
67/**
68\class ROOT::Experimental::Internal::RPageStorage
69\ingroup NTuple
70\brief Common functionality of an ntuple storage for both reading and writing
71
72The RPageStore provides access to a storage container that keeps the bits of pages and clusters comprising
73an ntuple. Concrete implementations can use a TFile, a raw file, an object store, and so on.
74*/
75// clang-format on
77public:
78 /// The page checksum is a 64bit xxhash3
79 static constexpr std::size_t kNBytesPageChecksum = sizeof(std::uint64_t);
80
81 /// The interface of a task scheduler to schedule page (de)compression tasks
83 public:
84 virtual ~RTaskScheduler() = default;
85 /// Take a callable that represents a task
86 virtual void AddTask(const std::function<void(void)> &taskFunc) = 0;
87 /// Blocks until all scheduled tasks finished
88 virtual void Wait() = 0;
89 };
90
91 /// A sealed page contains the bytes of a page as written to storage (packed & compressed). It is used
92 /// as an input to UnsealPages() as well as to transfer pages between different storage media.
93 /// RSealedPage does _not_ own the buffer it is pointing to in order to not interfere with the memory management
94 /// of concrete page sink and page source implementations.
95 struct RSealedPage {
96 private:
97 const void *fBuffer = nullptr;
98 std::size_t fBufferSize = 0; ///< Size of the page payload and the trailing checksum (if available)
99 std::uint32_t fNElements = 0;
100 bool fHasChecksum = false; ///< If set, the last 8 bytes of the buffer are the xxhash of the rest of the buffer
101
102 public:
103 RSealedPage() = default;
104 RSealedPage(const void *buffer, std::size_t bufferSize, std::uint32_t nElements, bool hasChecksum = false)
106 {
107 }
108 RSealedPage(const RSealedPage &other) = default;
112
113 const void *GetBuffer() const { return fBuffer; }
114 void SetBuffer(const void *buffer) { fBuffer = buffer; }
115
121 std::size_t GetBufferSize() const { return fBufferSize; }
122 void SetBufferSize(std::size_t bufferSize) { fBufferSize = bufferSize; }
123
124 std::uint32_t GetNElements() const { return fNElements; }
125 void SetNElements(std::uint32_t nElements) { fNElements = nElements; }
126
127 bool GetHasChecksum() const { return fHasChecksum; }
129
130 void ChecksumIfEnabled();
132 /// Returns a failure if the sealed page has no checksum
134 };
135
136 using SealedPageSequence_t = std::deque<RSealedPage>;
137 /// A range of sealed pages referring to the same column that can be used for vector commit
140 SealedPageSequence_t::const_iterator fFirst;
141 SealedPageSequence_t::const_iterator fLast;
142
143 RSealedPageGroup() = default;
144 RSealedPageGroup(ROOT::DescriptorId_t d, SealedPageSequence_t::const_iterator b,
145 SealedPageSequence_t::const_iterator e)
147 {
148 }
149 };
150
151protected:
153
154 /// For the time being, we will use the heap allocator for all sources and sinks. This may change in the future.
155 std::unique_ptr<ROOT::Internal::RPageAllocator> fPageAllocator;
156
157 std::string fNTupleName;
160 {
161 if (!fTaskScheduler)
162 return;
164 }
165
166public:
167 explicit RPageStorage(std::string_view name);
172 virtual ~RPageStorage();
173
174 /// Whether the concrete implementation is a sink or a source
176
180
181 /// Returns true for a valid column handle; fColumn and fPhysicalId should always either both
182 /// be valid or both be invalid.
183 explicit operator bool() const { return fPhysicalId != ROOT::kInvalidDescriptorId && fColumn; }
184 };
185 /// The column handle identifies a column with the current open page storage
187
188 /// Register a new column. When reading, the column must exist in the ntuple on disk corresponding to the metadata.
189 /// When writing, every column can only be attached once.
191 /// Unregisters a column. A page source decreases the reference counter for the corresponding active column.
192 /// For a page sink, dropping columns is currently a no-op.
195
196 /// Returns the default metrics object. Subclasses might alternatively provide their own metrics object by
197 /// overriding this.
199
200 /// Returns the NTuple name.
201 const std::string &GetNTupleName() const { return fNTupleName; }
202
204}; // class RPageStorage
205
206// clang-format off
207/**
208\class ROOT::Experimental::Internal::RWritePageMemoryManager
209\ingroup NTuple
210\brief Helper to maintain a memory budget for the write pages of a set of columns
211
212The memory manager keeps track of the sum of bytes used by the write pages of a set of columns.
213It will flush (and shrink) large pages of other columns on the attempt to expand a page.
214*/
215// clang-format on
217private:
218 struct RColumnInfo {
220 std::size_t fCurrentPageSize = 0;
221 std::size_t fInitialPageSize = 0;
222
223 bool operator>(const RColumnInfo &other) const;
224 };
225
226 /// Sum of all the write page sizes (their capacity) of the columns in `fColumnsSortedByPageSize`
227 std::size_t fCurrentAllocatedBytes = 0;
228 /// Maximum allowed value for `fCurrentAllocatedBytes`, set from RNTupleWriteOptions::fPageBufferBudget
229 std::size_t fMaxAllocatedBytes = 0;
230 /// All columns that called `ReservePage()` (hence `TryUpdate()`) at least once,
231 /// sorted by their current write page size from large to small
232 std::set<RColumnInfo, std::greater<RColumnInfo>> fColumnsSortedByPageSize;
233
234 /// Flush columns in order of allocated write page size until the sum of all write page allocations
235 /// leaves space for at least targetAvailableSize bytes. Only use columns with a write page size larger
236 /// than pageSizeLimit.
237 bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit);
238
239public:
241
242 /// Try to register the new write page size for the given column. Flush large columns to make space, if necessary.
243 /// If not enough space is available after all (sum of write pages would be larger than fMaxAllocatedBytes),
244 /// return false.
245 bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize);
246};
247
248// clang-format off
249/**
250\class ROOT::Experimental::Internal::RPageSink
251\ingroup NTuple
252\brief Abstract interface to write data into an ntuple
253
254The page sink takes the list of columns and afterwards a series of page commits and cluster commits.
255The user is responsible to commit clusters at a consistent point, i.e. when all pages corresponding to data
256up to the given entry number are committed.
257
258An object of this class may either be a wrapper (for example a RPageSinkBuf) or a "persistent" sink,
259inheriting from RPagePersistentSink.
260*/
261// clang-format on
262class RPageSink : public RPageStorage {
263public:
264 using Callback_t = std::function<void(RPageSink &)>;
265
266 /// Cluster that was staged, but not yet logically appended to the RNTuple
280
281protected:
282 std::unique_ptr<ROOT::RNTupleWriteOptions> fOptions;
283
284 /// Flag if sink was initialized
285 bool fIsInitialized = false;
286
287 /// Helper for streaming a page. This is commonly used in derived, concrete page sinks. Note that if
288 /// compressionSetting is 0 (uncompressed) and the page is mappable and not checksummed, the returned sealed page
289 /// will point directly to the input page buffer. Otherwise, the sealed page references fSealPageBuffer. Thus,
290 /// the buffer pointed to by the RSealedPage should never be freed.
292
293private:
294 std::vector<Callback_t> fOnDatasetCommitCallbacks;
295 std::vector<unsigned char> fSealPageBuffer; ///< Used as destination buffer in the simple SealPage overload
296
297 /// Used in ReservePage to maintain the page buffer budget
299
300public:
301 RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options);
302
303 RPageSink(const RPageSink &) = delete;
304 RPageSink &operator=(const RPageSink &) = delete;
305 RPageSink(RPageSink &&) = default;
307 ~RPageSink() override;
308
310 /// Returns the sink's write options.
312
313 void DropColumn(ColumnHandle_t /*columnHandle*/) final {}
314
315 bool IsInitialized() const { return fIsInitialized; }
316
317 /// Return the RNTupleDescriptor being constructed.
318 virtual const ROOT::RNTupleDescriptor &GetDescriptor() const = 0;
319
320 virtual ROOT::NTupleSize_t GetNEntries() const = 0;
321
322 /// Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket)
323 /// Init() associates column handles to the columns referenced by the model
324 void Init(RNTupleModel &model)
325 {
326 if (fIsInitialized) {
327 throw RException(R__FAIL("already initialized"));
328 }
329 fIsInitialized = true;
330 InitImpl(model);
331 }
332
333protected:
334 virtual void InitImpl(RNTupleModel &model) = 0;
335 virtual void CommitDatasetImpl() = 0;
336
337public:
338 /// Parameters for the SealPage() method
340 const ROOT::Internal::RPage *fPage = nullptr; ///< Input page to be sealed
342 nullptr; ///< Corresponds to the page's elements, for size calculation etc.
343 std::uint32_t fCompressionSettings = 0; ///< Compression algorithm and level to apply
344 /// Adds an 8-byte little-endian xxhash3 checksum to the page payload. The buffer has to be large enough
345 /// to store the additional 8 bytes.
346 bool fWriteChecksum = true;
347 /// If false, the output buffer must not point to the input page buffer, which would otherwise be an option
348 /// if the page is mappable and should not be compressed
349 bool fAllowAlias = false;
350 /// Location for sealed output. The memory buffer has to be large enough.
351 void *fBuffer = nullptr;
352 };
353
354 /// Seal a page using the provided info.
355 static RSealedPage SealPage(const RSealPageConfig &config);
356
357 /// Incorporate incremental changes to the model into the ntuple descriptor. This happens, e.g. if new fields were
358 /// added after the initial call to `RPageSink::Init(RNTupleModel &)`.
359 /// `firstEntry` specifies the global index for the first stored element in the added columns.
361 /// Adds an extra type information record to schema. The extra type information will be written to the
362 /// extension header. The information in the record will be merged with the existing information, e.g.
363 /// duplicate streamer info records will be removed. This method is called by the "on commit dataset" callback
364 /// registered by specific fields (e.g., streamer field) and during merging.
366
367 /// Commits a suppressed column for the current cluster. Can be called anytime before CommitCluster().
368 /// For any given column and cluster, there must be no calls to both CommitSuppressedColumn() and page commits.
370 /// Write a page to the storage. The column must have been added before.
372 /// Write a preprocessed page to storage. The column must have been added before.
373 virtual void
375 /// Write a vector of preprocessed pages to storage. The corresponding columns must have been added before.
376 virtual void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) = 0;
377 /// Stage the current cluster and create a new one for the following data.
378 /// Returns the object that must be passed to CommitStagedClusters to logically append the staged cluster to the
379 /// ntuple descriptor.
381 /// Commit staged clusters, logically appending them to the ntuple descriptor.
382 virtual void CommitStagedClusters(std::span<RStagedCluster> clusters) = 0;
383 /// Finalize the current cluster and create a new one for the following data.
384 /// Returns the number of bytes written to storage (excluding metadata).
391 /// Write out the page locations (page list envelope) for all the committed clusters since the last call of
392 /// CommitClusterGroup (or the beginning of writing).
393 virtual void CommitClusterGroup() = 0;
394
395 /// The registered callback is executed at the beginning of CommitDataset();
397 /// Run the registered callbacks and finalize the current cluster and the entire data set.
398 void CommitDataset();
399
400 /// Get a new, empty page for the given column that can be filled with up to nElements;
401 /// nElements must be larger than zero.
403
404 /// An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
406 std::mutex *fLock;
407
408 public:
409 explicit RSinkGuard(std::mutex *lock) : fLock(lock)
410 {
411 if (fLock != nullptr) {
412 fLock->lock();
413 }
414 }
415 RSinkGuard(const RSinkGuard &) = delete;
416 RSinkGuard &operator=(const RSinkGuard &) = delete;
417 RSinkGuard(RSinkGuard &&) = delete;
420 {
421 if (fLock != nullptr) {
422 fLock->unlock();
423 }
424 }
425 };
426
428 {
429 // By default, there is no lock and the guard does nothing.
430 return RSinkGuard(nullptr);
431 }
432}; // class RPageSink
433
434// clang-format off
435/**
436\class ROOT::Experimental::Internal::RPagePersistentSink
437\ingroup NTuple
438\brief Base class for a sink with a physical storage backend
439*/
440// clang-format on
442private:
443 /// Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization
445
446 /// Remembers the starting cluster id for the next cluster group
447 std::uint64_t fNextClusterInGroup = 0;
448 /// Used to calculate the number of entries in the current cluster
450 /// Keeps track of the number of elements in the currently open cluster. Indexed by column id.
451 std::vector<ROOT::RClusterDescriptor::RColumnRange> fOpenColumnRanges;
452 /// Keeps track of the written pages in the currently open cluster. Indexed by column id.
453 std::vector<ROOT::RClusterDescriptor::RPageRange> fOpenPageRanges;
454
455 /// Union of the streamer info records that are sent from streamer fields to the sink before committing the dataset.
457
458protected:
/// Optional features that a concrete persistent sink may support; each defaults to off.
struct RFeatures {
   /// Page-merging feature flag; defaults to false
   bool fCanMergePages{false};
};
466
467 /// Default I/O performance counters that get registered in fMetrics
477 std::unique_ptr<RCounters> fCounters;
478
479 virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length) = 0;
480
482 virtual RNTupleLocator
484 /// Vector commit of preprocessed pages. The `ranges` array specifies a range of sealed pages to be
485 /// committed for each column. The returned vector contains, in order, the RNTupleLocator for each
486 /// page on each range in `ranges`, i.e. the first N entries refer to the N pages in `ranges[0]`,
487 /// followed by M entries that refer to the M pages in `ranges[1]`, etc.
488 /// The mask allows to skip writing out certain pages. The vector has the size of all the pages.
489 /// For every `false` value in the mask, the corresponding locator is skipped (missing) in the output vector.
490 /// The default is to call `CommitSealedPageImpl` for each page; derived classes may provide an
491 /// optimized implementation though.
492 virtual std::vector<RNTupleLocator>
493 CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask);
494 /// Returns the number of bytes written to storage (excluding metadata)
495 virtual std::uint64_t StageClusterImpl() = 0;
496 /// Returns the locator of the page list envelope of the given buffer that contains the serialized page list.
497 /// Typically, the implementation takes care of compressing and writing the provided buffer.
498 virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) = 0;
499 virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) = 0;
500
501 /// Enables the default set of metrics provided by RPageSink. `prefix` will be used as the prefix for
502 /// the counters registered in the internal RNTupleMetrics object.
503 /// This set of counters can be extended by a subclass by calling `fMetrics.MakeCounter<...>()`.
504 ///
505 /// A subclass using the default set of metrics is always responsible for updating the counters
506 /// appropriately, e.g. `fCounters->fNPageCommited.Inc()`
507 void EnableDefaultMetrics(const std::string &prefix);
508
509public:
510 RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options);
511
516 ~RPagePersistentSink() override;
517
518 /// Guess the concrete derived page sink from the location
519 static std::unique_ptr<RPageSink> Create(std::string_view ntupleName, std::string_view location,
521
523
525
527
528 /// Updates the descriptor and calls InitImpl() that handles the backend-specific details (file, DAOS, etc.)
529 void InitImpl(RNTupleModel &model) final;
532
533 /// Initialize sink based on an existing descriptor and fill into the descriptor builder, optionally copying over
534 /// the descriptor's clusters to this sink's descriptor.
535 /// \return The model created from the new sink's descriptor. This model should be kept alive
536 /// for at least as long as the sink.
537 [[nodiscard]] std::unique_ptr<RNTupleModel>
539
543 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
544 RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
545 void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
548}; // class RPagePersistentSink
549
550// clang-format off
551/**
552\class ROOT::Experimental::Internal::RPageSource
553\ingroup NTuple
554\brief Abstract interface to read data from an ntuple
555
556The page source is initialized with the columns of interest. Alias columns from projected fields are mapped to the
557corresponding physical columns. Pages from the columns of interest can then be mapped into memory.
558The page source also gives access to the ntuple's metadata.
559*/
560// clang-format on
562public:
563 /// Used in SetEntryRange / GetEntryRange
564 struct REntryRange {
566 ROOT::NTupleSize_t fNEntries = 0;
567
568 /// Returns true if the given cluster has entries within the entry range
569 bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const;
570 };
571
572 /// An RAII wrapper used for the read-only access to `RPageSource::fDescriptor`. See `GetSharedDescriptorGuard()`.
575 std::shared_mutex &fLock;
576
577 public:
578 RSharedDescriptorGuard(const ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock)
579 : fDescriptor(desc), fLock(lock)
580 {
581 fLock.lock_shared();
582 }
587 ~RSharedDescriptorGuard() { fLock.unlock_shared(); }
588 const ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
589 const ROOT::RNTupleDescriptor &GetRef() const { return fDescriptor; }
590 };
591
592 /// An RAII wrapper used for the writable access to `RPageSource::fDescriptor`. See `GetExclDescriptorGuard()`.
595 std::shared_mutex &fLock;
596
597 public:
598 RExclDescriptorGuard(ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
599 {
600 fLock.lock();
601 }
607 {
608 fDescriptor.IncGeneration();
609 fLock.unlock();
610 }
611 ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
612 void MoveIn(ROOT::RNTupleDescriptor desc) { fDescriptor = std::move(desc); }
613 };
614
615private:
617 mutable std::shared_mutex fDescriptorLock;
618 REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
619 bool fHasStructure = false; ///< Set to true once `LoadStructure()` is called
620 bool fIsAttached = false; ///< Set to true once `Attach()` is called
621
622 /// Remembers the last cluster id from which a page was requested
624 /// Clusters from where pages got preloaded in UnzipClusterImpl(), ordered by first entry number
625 /// of the clusters. If the last used cluster changes in LoadPage(), all unused pages from
626 /// previous clusters are evicted from the page pool.
627 std::map<ROOT::NTupleSize_t, ROOT::DescriptorId_t> fPreloadedClusters;
628
629 /// Does nothing if fLastUsedCluster == clusterId. Otherwise, updates fLastUsedCluster
630 /// and evicts unused pages from the page pool of all previous clusters.
631 /// Must not be called when the descriptor guard is taken.
632 void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId);
633
634protected:
635 /// Default I/O performance counters that get registered in `fMetrics`
655
656 /// Keeps track of the requested physical column IDs and their in-memory target type via a column element identifier.
657 /// When using alias columns (projected fields), physical columns may be requested multiple times.
659 public:
664
665 private:
666 /// Maps physical column IDs to all the requested in-memory representations.
667 /// A pair of physical column ID and in-memory representation can be requested multiple times, which is
668 /// indicated by the reference counter.
669 /// We can only have a handful of possible in-memory representations for a given column,
670 /// so it is fine to search them linearly.
671 std::unordered_map<ROOT::DescriptorId_t, std::vector<RColumnInfo>> fColumnInfos;
672
673 public:
676 RCluster::ColumnSet_t ToColumnSet() const;
678 {
679 return fColumnInfos.count(physicalColumnId) > 0;
680 }
681 const std::vector<RColumnInfo> &GetColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
682 {
683 return fColumnInfos.at(physicalColumnId);
684 }
685 };
686
687 /// Summarizes cluster-level information that are necessary to load a certain page.
688 /// Used by LoadPageImpl().
690 ROOT::DescriptorId_t fClusterId = 0;
691 /// Location of the page on disk
693 /// The first element number of the page's column in the given cluster
694 std::uint64_t fColumnOffset = 0;
695 };
696
697 std::unique_ptr<RCounters> fCounters;
698
700 /// The active columns are implicitly defined by the model fields or views
702
703 /// Pages that are unzipped with IMT are staged into the page pool
705
706 virtual void LoadStructureImpl() = 0;
707 /// `LoadStructureImpl()` has been called before `AttachImpl()` is called
709 /// Returns a new, unattached page source for the same data set
710 virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;
711 // Only called if a task scheduler is set. No-op by default.
712 virtual void UnzipClusterImpl(RCluster *cluster);
713 // Returns a page from storage if not found in the page pool. Should be able to handle zero page locators.
716
717 /// Prepare a page range read for the column set in `clusterKey`. Specifically, pages referencing the
718 /// `kTypePageZero` locator are filled in `pageZeroMap`; otherwise, `perPageFunc` is called for each page. This is
719 /// commonly used as part of `LoadClusters()` in derived classes.
720 void PrepareLoadCluster(
724
725 /// Enables the default set of metrics provided by RPageSource. `prefix` will be used as the prefix for
726 /// the counters registered in the internal RNTupleMetrics object.
727 /// A subclass using the default set of metrics is responsible for updating the counters
728 /// appropriately, e.g. `fCounters->fNRead.Inc()`
729 /// Alternatively, a subclass might provide its own RNTupleMetrics object by overriding the
730 /// `GetMetrics()` member function.
731 void EnableDefaultMetrics(const std::string &prefix);
732
733 /// Note that the underlying lock is not recursive. See `GetSharedDescriptorGuard()` for further information.
734 RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }
735
736public:
738 RPageSource(const RPageSource &) = delete;
742 ~RPageSource() override;
743 /// Guess the concrete derived page source from the file name (location)
744 static std::unique_ptr<RPageSource> Create(std::string_view ntupleName, std::string_view location,
746 /// Open the same storage multiple times, e.g. for reading in multiple threads.
747 /// If the source is already attached, the clone will be attached, too. The clone will use, however,
748 /// its own connection to the underlying storage (e.g., file descriptor, XRootD handle, etc.)
749 std::unique_ptr<RPageSource> Clone() const;
750
751 /// Helper for unstreaming a page. This is commonly used in derived, concrete page sources. The implementation
752 /// currently always makes a memory copy, even if the sealed page is uncompressed and in the final memory layout.
753 /// The optimization of directly mapping pages is left to the concrete page source implementations.
757
760
761 /// Takes the read lock for the descriptor. Multiple threads can take the lock concurrently.
762 /// The underlying `std::shared_mutex`, however, is neither read nor write recursive:
763 /// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
764 /// care in sections protected by `GetSharedDescriptorGuard()` and `GetExclDescriptorGuard()` especially to avoid
765 /// that the locks are acquired indirectly (e.g. by a call to `GetNEntries()`). As a general guideline, no other
766 /// method of the page source should be called (directly or indirectly) in a guarded section.
768 {
769 return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
770 }
771
774
775 /// Loads header and footer without decompressing or deserializing them. This can be used to asynchronously open
776 /// a file in the background. The method is idempotent and it is called as a first step in `Attach()`.
777 /// Page sources may or may not split loading the metadata from processing it.
778 /// Therefore, `LoadStructure()` may do nothing and defer loading the metadata to `Attach()`.
779 void LoadStructure();
780 /// Open the physical storage container and deserialize header and footer
781 void Attach(
785
786 /// Promise to only read from the given entry range. If set, prevents the cluster pool from reading-ahead beyond
787 /// the given range. The range needs to be within `[0, GetNEntries())`.
788 void SetEntryRange(const REntryRange &range);
789 REntryRange GetEntryRange() const { return fEntryRange; }
790
791 /// Allocates and fills a page that contains the index-th element. The default implementation searches
792 /// the page and calls LoadPageImpl(). Returns a default-constructed RPage for suppressed columns.
794 /// Another version of `LoadPage` that allows to specify cluster-relative indexes.
795 /// Returns a default-constructed RPage for suppressed columns.
797
798 /// Read the packed and compressed bytes of a page into the memory buffer provided by `sealedPage`. The sealed page
799 /// can be used subsequently in a call to `RPageSink::CommitSealedPage`.
800 /// The buffer size and the number of elements of `sealedPage` are always set. If the buffer of `sealedPage` is
801 /// `nullptr`, no data will be copied but the returned size information can be used by the caller to allocate a large
802 /// enough buffer and call `LoadSealedPage` again.
803 virtual void
805
806 /// Populates all the pages of the given cluster ids and columns; it is possible that some columns do not
807 /// contain any pages. The page source may load more columns than the minimal necessary set from `columns`.
808 /// To indicate which columns have been loaded, `LoadClusters()` must mark them with `SetColumnAvailable()`.
809 /// That includes the ones from the `columns` that don't have pages; otherwise subsequent requests
810 /// for the cluster would assume an incomplete cluster and trigger loading again.
811 /// `LoadClusters()` is typically called from the I/O thread of a cluster pool, i.e. the method runs
812 /// concurrently to other methods of the page source.
813 virtual std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) = 0;
814
815 /// Parallel decompression and unpacking of the pages in the given cluster. The unzipped pages are supposed
816 /// to be preloaded in a page pool attached to the source. The method is triggered by the cluster pool's
817 /// unzip thread. It is an optional optimization, the method can safely do nothing. In particular, the
818 /// actual implementation will only run if a task scheduler is set. In practice, a task scheduler is set
819 /// if implicit multi-threading is turned on.
820 void UnzipCluster(RCluster *cluster);
821
822 // TODO(gparolini): for symmetry with SealPage(), we should either make this private or SealPage() public.
825}; // class RPageSource
826
827} // namespace Internal
828
829} // namespace Experimental
830} // namespace ROOT
831
832#endif
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define d(i)
Definition RSha256.hxx:102
#define b(i)
Definition RSha256.hxx:100
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char mode
char name[80]
Definition TGX11.cxx:110
A thread-safe integral performance counter.
A metric element that computes its floating point value from other counters.
A collection of Counter objects with a name, a unit, and a description.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
The serialization context is used for the piecewise serialization of a descriptor.
@ kForReading
Deserializes the descriptor and performs fixup on the suppressed column ranges and on clusters,...
std::map< Int_t, TVirtualStreamerInfo * > StreamerInfoMap_t
A memory region that contains packed and compressed pages.
Definition RCluster.hxx:103
Base class for a sink with a physical storage backend.
RPagePersistentSink(const RPagePersistentSink &)=delete
ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder
std::vector< ROOT::RClusterDescriptor::RColumnRange > fOpenColumnRanges
Keeps track of the number of elements in the currently open cluster. Indexed by column id.
RPagePersistentSink(RPagePersistentSink &&)=default
std::uint64_t fNextClusterInGroup
Remembers the starting cluster id for the next cluster group.
virtual std::uint64_t StageClusterImpl()=0
Returns the number of bytes written to storage (excluding metadata)
RPagePersistentSink & operator=(RPagePersistentSink &&)=default
virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length)=0
void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final
Incorporate incremental changes to the model into the ntuple descriptor.
ROOT::NTupleSize_t GetNEntries() const final
RNTupleSerializer::RContext fSerializationContext
Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization.
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Write a preprocessed page to storage. The column must have been added before.
RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final
Stage the current cluster and create a new one for the following data.
virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length)=0
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
const ROOT::RNTupleDescriptor & GetDescriptor() const final
Return the RNTupleDescriptor being constructed.
virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)=0
RPagePersistentSink & operator=(const RPagePersistentSink &)=delete
virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page)=0
virtual RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)=0
virtual std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask)
Vector commit of preprocessed pages.
RNTupleSerializer::StreamerInfoMap_t fStreamerInfos
Union of the streamer info records that are sent from streamer fields to the sink before committing t...
void CommitClusterGroup() final
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final
Adds an extra type information record to schema.
void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges) final
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
std::unique_ptr< RNTupleModel > InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters)
Initialize sink based on an existing descriptor and fill into the descriptor builder,...
std::vector< ROOT::RClusterDescriptor::RPageRange > fOpenPageRanges
Keeps track of the written pages in the currently open cluster. Indexed by column id.
void CommitSuppressedColumn(ColumnHandle_t columnHandle) final
Commits a suppressed column for the current cluster.
void CommitStagedClusters(std::span< RStagedCluster > clusters) final
Commit staged clusters, logically appending them to the ntuple descriptor.
ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final
Register a new column.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
ROOT::NTupleSize_t fPrevClusterNEntries
Used to calculate the number of entries in the current cluster.
void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
Write a page to the storage. The column must have been added before.
static std::unique_ptr< RPageSink > Create(std::string_view ntupleName, std::string_view location, const ROOT::RNTupleWriteOptions &options=ROOT::RNTupleWriteOptions())
Guess the concrete derived page source from the location.
An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
RSinkGuard & operator=(const RSinkGuard &)=delete
RSinkGuard & operator=(RSinkGuard &&)=delete
Abstract interface to write data into an ntuple.
std::vector< unsigned char > fSealPageBuffer
Used as destination buffer in the simple SealPage overload.
virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry)=0
Incorporate incremental changes to the model into the ntuple descriptor.
std::vector< Callback_t > fOnDatasetCommitCallbacks
virtual void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo)=0
Adds an extra type information record to schema.
RPageSink & operator=(RPageSink &&)=default
bool fIsInitialized
Flag if sink was initialized.
void CommitDataset()
Run the registered callbacks and finalize the current cluster and the entire data set.
virtual void CommitSuppressedColumn(ColumnHandle_t columnHandle)=0
Commits a suppressed column for the current cluster.
virtual void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page)=0
Write a page to the storage. The column must have been added before.
void Init(RNTupleModel &model)
Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket). In...
virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
Finalize the current cluster and create a new one for the following data.
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
RWritePageMemoryManager fWritePageMemoryManager
Used in ReservePage to maintain the page buffer budget.
virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
Get a new, empty page for the given column that can be filled with up to nElements; nElements must be...
RPageSink & operator=(const RPageSink &)=delete
virtual void CommitClusterGroup()=0
Write out the page locations (page list envelope) for all the committed clusters since the last call ...
virtual void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage)=0
Write a preprocessed page to storage. The column must have been added before.
void RegisterOnCommitDatasetCallback(Callback_t callback)
The registered callback is executed at the beginning of CommitDataset().
RPageSink(const RPageSink &)=delete
std::function< void(RPageSink &)> Callback_t
virtual void CommitStagedClusters(std::span< RStagedCluster > clusters)=0
Commit staged clusters, logically appending them to the ntuple descriptor.
virtual void InitImpl(RNTupleModel &model)=0
void DropColumn(ColumnHandle_t) final
Unregisters a column.
EPageStorageType GetType() final
Whether the concrete implementation is a sink or a source.
virtual ROOT::NTupleSize_t GetNEntries() const =0
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
virtual RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries)=0
Stage the current cluster and create a new one for the following data.
RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
virtual const ROOT::RNTupleDescriptor & GetDescriptor() const =0
Return the RNTupleDescriptor being constructed.
std::unique_ptr< ROOT::RNTupleWriteOptions > fOptions
virtual void CommitSealedPageV(std::span< RPageStorage::RSealedPageGroup > ranges)=0
Write a vector of preprocessed pages to storage. The corresponding columns must have been added befor...
Keeps track of the requested physical column IDs and their in-memory target type via a column element...
const std::vector< RColumnInfo > & GetColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
std::unordered_map< ROOT::DescriptorId_t, std::vector< RColumnInfo > > fColumnInfos
Maps physical column IDs to all the requested in-memory representations.
bool HasColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
An RAII wrapper used for the writable access to RPageSource::fDescriptor. See GetSharedDescriptorGuar...
RExclDescriptorGuard(const RExclDescriptorGuard &)=delete
RExclDescriptorGuard & operator=(RExclDescriptorGuard &&)=delete
RExclDescriptorGuard & operator=(const RExclDescriptorGuard &)=delete
RExclDescriptorGuard(ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock)
An RAII wrapper used for the read-only access to RPageSource::fDescriptor. See GetExclDescriptorGuard...
RSharedDescriptorGuard & operator=(RSharedDescriptorGuard &&)=delete
RSharedDescriptorGuard(const RSharedDescriptorGuard &)=delete
RSharedDescriptorGuard(const ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock)
RSharedDescriptorGuard & operator=(const RSharedDescriptorGuard &)=delete
Abstract interface to read data from an ntuple.
std::map< ROOT::NTupleSize_t, ROOT::DescriptorId_t > fPreloadedClusters
Clusters from where pages got preloaded in UnzipClusterImpl(), ordered by first entry number of the c...
EPageStorageType GetType() final
Whether the concrete implementation is a sink or a source.
RPageSource(const RPageSource &)=delete
RPageSource & operator=(RPageSource &&)=delete
std::unique_ptr< RCounters > fCounters
RExclDescriptorGuard GetExclDescriptorGuard()
Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further informatio...
RActivePhysicalColumns fActivePhysicalColumns
The active columns are implicitly defined by the model fields or views.
RPageSource & operator=(const RPageSource &)=delete
virtual std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys)=0
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
virtual ROOT::Internal::RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster)=0
REntryRange fEntryRange
Used by the cluster pool to prevent reading beyond the given range.
virtual std::unique_ptr< RPageSource > CloneImpl() const =0
Returns a new, unattached page source for the same data set.
virtual void LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage)=0
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
const ROOT::RNTupleReadOptions & GetReadOptions() const
ROOT::Internal::RPagePool fPagePool
Pages that are unzipped with IMT are staged into the page pool.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
virtual ROOT::RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode)=0
LoadStructureImpl() has been called before AttachImpl() is called
The interface of a task scheduler to schedule page (de)compression tasks.
virtual void Wait()=0
Blocks until all scheduled tasks finished.
virtual void AddTask(const std::function< void(void)> &taskFunc)=0
Take a callable that represents a task.
Common functionality of an ntuple storage for both reading and writing.
std::deque< RSealedPage > SealedPageSequence_t
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
virtual ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column)=0
Register a new column.
virtual void DropColumn(ColumnHandle_t columnHandle)=0
Unregisters a column.
virtual Detail::RNTupleMetrics & GetMetrics()
Returns the default metrics object.
const std::string & GetNTupleName() const
Returns the NTuple name.
virtual EPageStorageType GetType()=0
Whether the concrete implementation is a sink or a source.
RPageStorage & operator=(RPageStorage &&other)=default
RPageStorage & operator=(const RPageStorage &other)=delete
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAllocator
For the time being, we will use the heap allocator for all sources and sinks. This may change in the ...
RPageStorage(const RPageStorage &other)=delete
RPageStorage(RPageStorage &&other)=default
void SetTaskScheduler(RTaskScheduler *taskScheduler)
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
ROOT::DescriptorId_t GetColumnId(ColumnHandle_t columnHandle) const
Helper to maintain a memory budget for the write pages of a set of columns.
std::size_t fCurrentAllocatedBytes
Sum of all the write page sizes (their capacity) of the columns in fColumnsSortedByPageSize
bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit)
Flush columns in order of allocated write page size until the sum of all write page allocations leave...
std::size_t fMaxAllocatedBytes
Maximum allowed value for fCurrentAllocatedBytes, set from RNTupleWriteOptions::fPageBufferBudget.
std::set< RColumnInfo, std::greater< RColumnInfo > > fColumnsSortedByPageSize
All columns that called ReservePage() (hence TryUpdate()) at least once, sorted by their current writ...
bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize)
Try to register the new write page size for the given column.
A column element encapsulates the translation between basic C++ types and their column representation...
A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into ...
Definition RColumn.hxx:40
A helper class for piece-wise construction of an RNTupleDescriptor.
const RNTupleDescriptor & GetDescriptor() const
Abstract interface to allocate and release pages.
A thread-safe cache of pages loaded from the page source.
Definition RPagePool.hxx:46
Reference to a page stored in the page pool.
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:46
Records the partition of data into pages for a particular column in a particular cluster.
Metadata for RNTuple clusters.
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
Field-specific extra type information from the header / extension header.
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Generic information about the physical location of data.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156
Default I/O performance counters that get registered in fMetrics.
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuZip
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuWrite
Set of optional features supported by the persistent sink.
std::uint32_t fCompressionSettings
Compression algorithm and level to apply.
void * fBuffer
Location for sealed output. The memory buffer has to be large enough.
bool fAllowAlias
If false, the output buffer must not point to the input page buffer, which would otherwise be an opti...
const ROOT::Internal::RPage * fPage
Input page to be sealed.
const ROOT::Internal::RColumnElementBase * fElement
Corresponds to the page's elements, for size calculation etc.
bool fWriteChecksum
Adds an 8-byte little-endian xxhash3 checksum to the page payload.
Cluster that was staged, but not yet logically appended to the RNTuple.
Summarizes cluster-level information that are necessary to load a certain page.
ROOT::RClusterDescriptor::RPageInfoExtended fPageInfo
Location of the page on disk.
Default I/O performance counters that get registered in fMetrics
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuUnzip
Detail::RNTupleTickCounter< Detail::RNTupleAtomicCounter > & fTimeCpuRead
A range of sealed pages referring to the same column that can be used for vector commit.
RSealedPageGroup(ROOT::DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
A sealed page contains the bytes of a page as written to storage (packed & compressed).
bool fHasChecksum
If set, the last 8 bytes of the buffer are the xxhash of the rest of the buffer.
std::size_t fBufferSize
Size of the page payload and the trailing checksum (if available)
RResult< std::uint64_t > GetChecksum() const
Returns a failure if the sealed page has no checksum.
RSealedPage(const void *buffer, std::size_t bufferSize, std::uint32_t nElements, bool hasChecksum=false)
RSealedPage & operator=(const RSealedPage &other)=default
RSealedPage & operator=(RSealedPage &&other)=default
Every concrete RColumnElement type is identified by its on-disk type (column type) and the in-memory ...
The incremental changes to an RNTupleModel
Additional information about a page in an in-memory RPageRange.
Information about a single page in the context of a cluster's page range.