Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RClusterPool.hxx
Go to the documentation of this file.
1/// \file ROOT/RClusterPool.hxx
2/// \ingroup NTuple ROOT7
3/// \author Jakob Blomer <jblomer@cern.ch>
4/// \date 2020-03-11
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#ifndef ROOT7_RClusterPool
17#define ROOT7_RClusterPool
18
19#include <ROOT/RCluster.hxx>
20#include <ROOT/RNTupleUtil.hxx>
21
22#include <condition_variable>
23#include <deque>
24#include <memory>
25#include <mutex>
26#include <future>
27#include <thread>
28#include <set>
29#include <vector>
30
31namespace ROOT {
32namespace Experimental {
33
34namespace Internal {
35class RPageSource;
36
37// clang-format off
38/**
39\class ROOT::Experimental::Internal::RClusterPool
40\ingroup NTuple
41\brief Managed a set of clusters containing compressed and packed pages
42
43The cluster pool steers the preloading of (partial) clusters. There is a two-step pipeline: in a first step,
44compressed pages are read from clusters into a memory buffer. The second pipeline step decompresses the pages
45and pushes them into the page pool. The actual logic of reading and unzipping is implemented by the page source.
46The cluster pool only orchestrates the work queues for reading and unzipping. It uses one extra I/O thread for
47reading waits for data from storage and generates no CPU load.
48
49The unzipping step of the pipeline therefore behaves differently depending on whether or not implicit multi-threading
50is turned on. If it is turned off, i.e. in a single-threaded environment, the cluster pool will only read the
51compressed pages and the page source has to uncompresses pages at a later point when data from the page is requested.
52*/
53// clang-format on
55private:
56 /// Request to load a subset of the columns of a particular cluster.
57 /// Work items come in groups and are executed by the page source.
58 struct RReadItem {
59 /// Items with different bunch ids are scheduled for different vector reads
60 std::int64_t fBunchId = -1;
61 std::promise<std::unique_ptr<RCluster>> fPromise;
63 };
64
65 /// Clusters that are currently being processed by the pipeline. Every in-flight cluster has a corresponding
66 /// work item, first a read item and then an unzip item.
68 std::future<std::unique_ptr<RCluster>> fFuture;
70
71 bool operator ==(const RInFlightCluster &other) const {
72 return (fClusterKey.fClusterId == other.fClusterKey.fClusterId) &&
74 }
75 bool operator !=(const RInFlightCluster &other) const { return !(*this == other); }
76 /// First order by cluster id, then by number of columns, than by the column ids in fColumns
77 bool operator <(const RInFlightCluster &other) const;
78 };
79
80 /// Every cluster pool is responsible for exactly one page source that triggers loading of the clusters
81 /// (GetCluster()) and is used for implementing the I/O and cluster memory allocation (PageSource::LoadClusters()).
83 /// The number of clusters before the currently active cluster that should stay in the pool if present
84 /// Reserved for later use.
85 unsigned int fWindowPre = 0;
86 /// The number of clusters that are being read in a single vector read.
87 unsigned int fClusterBunchSize;
88 /// Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other
89 std::int64_t fBunchId = 0;
90 /// The cache of clusters around the currently active cluster
91 std::vector<std::unique_ptr<RCluster>> fPool;
92
93 /// Protects the shared state between the main thread and the I/O thread, namely the work queue and the in-flight
94 /// clusters vector
95 std::mutex fLockWorkQueue;
96 /// The clusters that were handed off to the I/O thread
97 std::vector<RInFlightCluster> fInFlightClusters;
98 /// Signals a non-empty I/O work queue
99 std::condition_variable fCvHasReadWork;
100 /// The communication channel to the I/O thread
101 std::deque<RReadItem> fReadQueue;
102
103 /// The I/O thread calls RPageSource::LoadClusters() asynchronously. The thread is mostly waiting for the
104 /// data to arrive (blocked by the kernel) and therefore can safely run in addition to the application
105 /// main threads.
106 std::thread fThreadIo;
107
108 /// Every cluster id has at most one corresponding RCluster pointer in the pool
109 RCluster *FindInPool(DescriptorId_t clusterId) const;
110 /// Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())
111 /// make sure that a free slot actually exists
112 size_t FindFreeSlot() const;
113 /// The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool
114 void ExecReadClusters();
115 /// Returns the given cluster from the pool, which needs to contain at least the columns `physicalColumns`.
116 /// Executed at the end of GetCluster when all missing data pieces have been sent to the load queue.
117 /// Ideally, the function returns without blocking if the cluster is already in the pool.
118 RCluster *WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns);
119
120public:
121 static constexpr unsigned int kDefaultClusterBunchSize = 1;
122 RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize);
123 explicit RClusterPool(RPageSource &pageSource) : RClusterPool(pageSource, kDefaultClusterBunchSize) {}
124 RClusterPool(const RClusterPool &other) = delete;
125 RClusterPool &operator =(const RClusterPool &other) = delete;
127
128 /// Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load
129 /// the cluster in the pool, blocks until done, and then returns it. Triggers along the way the background loading
130 /// of the following fWindowPost number of clusters. The returned cluster has at least all the pages of
131 /// `physicalColumns` and possibly pages of other columns, too. If implicit multi-threading is turned on, the
132 /// uncompressed pages of the returned cluster are already pushed into the page pool associated with the page source
133 /// upon return. The cluster remains valid until the next call to GetCluster().
134 RCluster *GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns);
135
136 /// Used by the unit tests to drain the queue of clusters to be preloaded
138}; // class RClusterPool
139
140} // namespace Internal
141} // namespace Experimental
142} // namespace ROOT
143
144#endif
Managed a set of clusters containing compressed and packed pages.
unsigned int fWindowPre
The number of clusters before the currently active cluster that should stay in the pool if present Re...
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::condition_variable fCvHasReadWork
Signals a non-empty I/O work queue.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RClusterPool(const RClusterPool &other)=delete
static constexpr unsigned int kDefaultClusterBunchSize
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
std::mutex fLockWorkQueue
Protects the shared state between the main thread and the I/O thread, namely the work queue and the i...
RClusterPool & operator=(const RClusterPool &other)=delete
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::deque< RReadItem > fReadQueue
The communication channel to the I/O thread.
std::vector< RInFlightCluster > fInFlightClusters
The clusters that were handed off to the I/O thread.
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::int64_t fBunchId
Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other.
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:152
std::unordered_set< DescriptorId_t > ColumnSet_t
Definition RCluster.hxx:154
Abstract interface to read data from an ntuple.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Clusters that are currently being processed by the pipeline.
std::future< std::unique_ptr< RCluster > > fFuture
bool operator!=(const RInFlightCluster &other) const
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, than by the column ids in fColumns.
bool operator==(const RInFlightCluster &other) const
Request to load a subset of the columns of a particular cluster.
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:156