ROOT Reference Guide
RClusterPool.hxx
```cpp
/// \file ROOT/RClusterPool.hxx
/// \author Jakob Blomer <jblomer@cern.ch>
/// \date 2020-03-11
/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
/// is welcome!

/*************************************************************************
 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
 * All rights reserved. *
 * *
 * For the licensing terms see $ROOTSYS/LICENSE. *
 * For the list of contributors see $ROOTSYS/README/CREDITS. *
 *************************************************************************/

#ifndef ROOT_RClusterPool
#define ROOT_RClusterPool

#include <ROOT/RCluster.hxx>
#include <ROOT/RNTupleMetrics.hxx>
#include <ROOT/RNTupleTypes.hxx>

#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <future>
#include <thread>
#include <unordered_map>
#include <vector>

namespace ROOT {
namespace Internal {
class RPageSource;
}

namespace Internal {

// clang-format off
/**
\class ROOT::Internal::RClusterPool
\ingroup NTuple
\brief Manages a set of clusters containing compressed and packed pages

The cluster pool steers the preloading of (partial) clusters. There is a two-step pipeline: in a first step,
compressed pages are read from clusters into a memory buffer. The second pipeline step decompresses the pages
and pushes them into the page pool. The actual logic of reading and unzipping is implemented by the page source.
The cluster pool only orchestrates the work queues for reading and unzipping. It uses one extra I/O thread for
reading; that thread mostly waits for data from storage and generates almost no CPU load.

The unzipping step of the pipeline behaves differently depending on whether or not implicit multi-threading
is turned on. If it is turned off, i.e. in a single-threaded environment, the cluster pool only reads the
compressed pages, and the page source has to uncompress them later, when data from a page is requested.
*/
// clang-format on
class RClusterPool {
private:
   /// Request to load a subset of the columns of a particular cluster.
   /// Work items come in groups and are executed by the page source.
   struct RReadItem {
      /// Items with different bunch ids are scheduled for different vector reads
      std::int64_t fBunchId = -1;
      std::promise<std::unique_ptr<RCluster>> fPromise;
      RCluster::RKey fClusterKey;
   };

   /// Clusters that are currently being processed by the pipeline. Every in-flight cluster has a corresponding
   /// read item.
   struct RInFlightCluster {
      std::future<std::unique_ptr<RCluster>> fFuture;
      RCluster::RKey fClusterKey;

      bool operator==(const RInFlightCluster &other) const
      {
         return (fClusterKey.fClusterId == other.fClusterKey.fClusterId) &&
                (fClusterKey.fPhysicalColumnSet == other.fClusterKey.fPhysicalColumnSet);
      }
      bool operator!=(const RInFlightCluster &other) const { return !(*this == other); }
      /// First order by cluster id, then by number of columns, then by the column ids in
      /// fClusterKey.fPhysicalColumnSet
      bool operator<(const RInFlightCluster &other) const;
   };

   /// Performance counters that get registered in fMetrics
   struct RCounters {
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNCluster;
   };
   std::unique_ptr<RCounters> fCounters;

   /// Every cluster pool is responsible for exactly one page source that triggers loading of the clusters
   /// (GetCluster()) and is used for implementing the I/O and cluster memory allocation (RPageSource::LoadClusters()).
   ROOT::Internal::RPageSource &fPageSource;
   /// The number of clusters that are being read in a single vector read.
   unsigned int fClusterBunchSize;
   /// Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other
   std::int64_t fBunchId = 0;
   /// The cache of active clusters and their successors
   std::unordered_map<ROOT::DescriptorId_t, std::unique_ptr<RCluster>> fPool;

   /// Protects the shared state between the main thread and the I/O thread, namely the work queue and the in-flight
   /// clusters vector
   std::mutex fLockWorkQueue;
   /// The clusters that were handed off to the I/O thread
   std::vector<RInFlightCluster> fInFlightClusters;
   /// Signals a non-empty I/O work queue
   std::condition_variable fCvHasReadWork;
   /// The communication channel to the I/O thread
   std::deque<RReadItem> fReadQueue;

   /// The I/O thread calls RPageSource::LoadClusters() asynchronously. The thread is mostly waiting for the
   /// data to arrive (blocked by the kernel) and therefore can safely run in addition to the application
   /// main threads.
   std::thread fThreadIo;

   /// The cluster pool counters are observed by the page source
   ROOT::Experimental::Detail::RNTupleMetrics fMetrics;

   /// The I/O thread routine; there is exactly one I/O thread in flight for every cluster pool
   void ExecReadClusters();
   /// Returns the given cluster from the pool, which needs to contain at least the columns `physicalColumns`.
   /// Executed at the end of GetCluster() when all missing data pieces have been sent to the load queue.
   /// Ideally, the function returns without blocking if the cluster is already in the pool.
   RCluster *WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns);

public:
   static constexpr unsigned int kDefaultClusterBunchSize = 1;
   RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize);
   explicit RClusterPool(ROOT::Internal::RPageSource &pageSource)
      : RClusterPool(pageSource, kDefaultClusterBunchSize)
   {
   }
   RClusterPool(const RClusterPool &other) = delete;
   RClusterPool &operator=(const RClusterPool &other) = delete;
   ~RClusterPool();

   /// Spawn the I/O background thread. No-op if already started.
   void StartBackgroundThread();

   /// Stop the I/O background thread. No-op if already stopped. Called by the destructor.
   void StopBackgroundThread();

   /// Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load
   /// the cluster into the pool, blocks until done, and then returns it. Along the way, triggers the background
   /// loading of the following fClusterBunchSize clusters. The returned cluster has at least all the pages of
   /// `physicalColumns` and possibly pages of other columns, too. If implicit multi-threading is turned on, the
   /// uncompressed pages of the returned cluster are already pushed into the page pool associated with the page source
   /// upon return. The cluster remains valid until the next call to GetCluster().
   RCluster *GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns);

   /// Used by the unit tests to drain the queue of clusters to be preloaded
   void WaitForInFlightClusters();

   ROOT::Experimental::Detail::RNTupleMetrics &GetMetrics() { return fMetrics; }
}; // class RClusterPool

} // namespace Internal
} // namespace ROOT

#endif
```
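The header declares the read queue (`fReadQueue`, `fLockWorkQueue`, `fCvHasReadWork`, the promise in `RReadItem`, the future in `RInFlightCluster`) but not the bodies of `ExecReadClusters()` or `GetCluster()`. These members suggest a producer/consumer pipeline, and the following is a minimal sketch of it under that assumption.

`MiniReadPipeline`, `FakeCluster`, `RequestBunch()` and `NVectorReads()` are hypothetical names, not ROOT API. The sketch omits decompression, in-flight bookkeeping and the cluster cache, and it is not ROOT's implementation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for RCluster: it only remembers which cluster it represents.
struct FakeCluster {
   std::uint64_t fClusterId;
};

// Hypothetical, simplified analogue of the RClusterPool read queue (not ROOT code).
class MiniReadPipeline {
   struct ReadItem {
      std::int64_t fBunchId = -1; // -1 marks the stop sentinel
      std::uint64_t fClusterId = 0;
      std::promise<std::unique_ptr<FakeCluster>> fPromise;
   };

   std::mutex fLock;                   // protects fQueue
   std::condition_variable fCvHasWork; // signals a non-empty fQueue
   std::deque<ReadItem> fQueue;        // channel to the I/O thread
   std::int64_t fNextBunchId = 0;      // ever-growing; only used by the requesting thread
   std::atomic<int> fNVectorReads{0};
   std::thread fThreadIo;              // declared last, so it starts after the members above exist

   // I/O thread routine: sleep until work arrives, then serve one "vector read" per bunch id.
   void ExecRead()
   {
      while (true) {
         std::deque<ReadItem> items;
         {
            std::unique_lock<std::mutex> lock(fLock);
            fCvHasWork.wait(lock, [this] { return !fQueue.empty(); });
            items.swap(fQueue); // take every pending item at once
         }
         for (std::size_t i = 0; i < items.size();) {
            if (items[i].fBunchId < 0)
               return; // stop sentinel, sent only by the destructor
            const std::int64_t bunchId = items[i].fBunchId;
            std::size_t end = i;
            while (end < items.size() && items[end].fBunchId == bunchId)
               ++end;
            ++fNVectorReads; // a real page source would issue one vector read here
            for (; i < end; ++i)
               items[i].fPromise.set_value(std::make_unique<FakeCluster>(FakeCluster{items[i].fClusterId}));
         }
      }
   }

public:
   MiniReadPipeline() : fThreadIo([this] { ExecRead(); }) {}

   ~MiniReadPipeline()
   {
      {
         std::lock_guard<std::mutex> lock(fLock);
         fQueue.emplace_back(); // default fBunchId == -1: stop sentinel
      }
      fCvHasWork.notify_one();
      fThreadIo.join();
   }

   // Enqueue one bunch atomically and return one future per requested cluster.
   std::vector<std::future<std::unique_ptr<FakeCluster>>> RequestBunch(const std::vector<std::uint64_t> &clusterIds)
   {
      std::vector<std::future<std::unique_ptr<FakeCluster>>> futures;
      const std::int64_t bunchId = fNextBunchId++;
      {
         std::lock_guard<std::mutex> lock(fLock);
         for (auto id : clusterIds) {
            ReadItem item;
            item.fBunchId = bunchId;
            item.fClusterId = id;
            futures.emplace_back(item.fPromise.get_future());
            fQueue.emplace_back(std::move(item));
         }
      }
      fCvHasWork.notify_one();
      return futures;
   }

   int NVectorReads() const { return fNVectorReads.load(); }
};
```

Pushing a whole bunch under a single lock keeps its items contiguous in the queue. The I/O thread can therefore serve each bunch id with one read, in the spirit of the comment on `RReadItem::fBunchId`. The caller blocks only when it calls `get()` on a future whose cluster has not arrived yet, similar to what `WaitFor()` is documented to do.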
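The ordering documented for `RInFlightCluster::operator<` compares three things in turn: the cluster id, then the number of columns, then the column ids. It can be sketched as a free function over a key that mirrors `RCluster::RKey`.

`ClusterKey`, `DescriptorId` and `KeyLess` are hypothetical names. `DescriptorId` mirrors `ROOT::DescriptorId_t` (`std::uint64_t`), and the column set mirrors `RCluster::ColumnSet_t` (`std::unordered_set`). Because the iteration order of an `std::unordered_set` is unspecified, this sketch sorts the ids before comparing them lexicographically. That is an assumption of the sketch and may differ from how ROOT implements the operator.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <unordered_set>
#include <vector>

using DescriptorId = std::uint64_t; // mirrors ROOT::DescriptorId_t

// Hypothetical analogue of RCluster::RKey.
struct ClusterKey {
   DescriptorId fClusterId;
   std::unordered_set<DescriptorId> fPhysicalColumnSet;
};

// Sorted copy of the column ids; unordered_set iteration order is unspecified.
inline std::vector<DescriptorId> SortedColumns(const ClusterKey &key)
{
   std::vector<DescriptorId> ids(key.fPhysicalColumnSet.begin(), key.fPhysicalColumnSet.end());
   std::sort(ids.begin(), ids.end());
   return ids;
}

// Strict weak ordering: cluster id, then number of columns, then the sorted column ids.
inline bool KeyLess(const ClusterKey &a, const ClusterKey &b)
{
   if (a.fClusterId != b.fClusterId)
      return a.fClusterId < b.fClusterId;
   if (a.fPhysicalColumnSet.size() != b.fPhysicalColumnSet.size())
      return a.fPhysicalColumnSet.size() < b.fPhysicalColumnSet.size();
   const auto ca = SortedColumns(a);
   const auto cb = SortedColumns(b);
   return std::lexicographical_compare(ca.begin(), ca.end(), cb.begin(), cb.end());
}
```

Two keys for which `KeyLess` is false in both directions have the same cluster id and equal column sets. This matches the equality that `RInFlightCluster::operator==` checks, so the ordering and the equality stay consistent.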