64 std::unique_lock<std::mutex> lock(fLockWorkQueue);
66 fCvHasReadWork.notify_one();
76 std::unique_lock<std::mutex> lock(fLockWorkQueue);
77 fCvHasReadWork.wait(lock, [&]{
return !fReadQueue.empty(); });
84 for (
unsigned i = 0; i <
readItems.size(); ++i) {
99 for (std::size_t i = 0; i <
clusters.size(); ++i) {
104 std::unique_lock<std::mutex> lock(fLockWorkQueue);
105 discard = std::any_of(fInFlightClusters.begin(), fInFlightClusters.end(),
107 return inFlight.fClusterKey.fClusterId == thisClusterId && inFlight.fIsExpired;
123 for (
const auto &
cptr : fPool) {
132 auto N = fPool.size();
133 for (
unsigned i = 0; i <
N; ++i) {
152 std::int64_t fBunchId = -1;
154 ColumnSet_t fPhysicalColumnSet;
157 static constexpr std::int64_t kFlagRequired = 0x01;
158 static constexpr std::int64_t kFlagLast = 0x02;
161 std::map<DescriptorId_t, RInfo> fMap;
169 bool Contains(DescriptorId_t
clusterId) {
173 std::size_t GetSize()
const {
return fMap.size(); }
178 if (
itr == fMap.end())
181 std::copy_if(
itr->second.fPhysicalColumnSet.
begin(),
itr->second.fPhysicalColumnSet.
end(),
182 std::inserter(
d,
d.end()),
187 itr->second.fPhysicalColumnSet =
d;
191 decltype(fMap)::iterator begin() {
return fMap.begin(); }
192 decltype(fMap)::iterator
end() {
return fMap.end(); }
201 std::set<DescriptorId_t>
keep;
208 for (
unsigned int i = 0; i < fWindowPre; ++i) {
221 if (i == fClusterBunchSize)
227 if (!fPageSource.GetEntryRange().IntersectsWith(
descriptorGuard->GetClusterDescriptor(next)))
242 for (
auto &
cptr : fPool) {
259 std::lock_guard<std::mutex>
lockGuard(fLockWorkQueue);
261 for (
auto itr = fInFlightClusters.
begin();
itr != fInFlightClusters.
end(); ) {
264 !
provide.Contains(
itr->fClusterKey.fClusterId) && (
keep.count(
itr->fClusterKey.fClusterId) == 0);
266 if (
itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
268 provide.Erase(
itr->fClusterKey.fClusterId,
itr->fClusterKey.fPhysicalColumnSet);
273 auto cptr =
itr->fFuture.get();
275 if (!
cptr ||
itr->fIsExpired) {
277 itr = fInFlightClusters.erase(
itr);
289 itr = fInFlightClusters.erase(
itr);
293 for (
auto &
cptr : fPool) {
301 if (
provide.GetSize() < fClusterBunchSize) {
304 if ((
kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
322 readItem.fClusterKey.fPhysicalColumnSet =
kv.second.fPhysicalColumnSet;
330 fReadQueue.emplace_back(std::move(
readItem));
332 if (!fReadQueue.empty())
333 fCvHasReadWork.notify_one();
350 if (
result->ContainsColumn(cid))
361 decltype(fInFlightClusters)::iterator
itr;
365 for (;
itr != fInFlightClusters.
end(); ++
itr) {
376 auto cptr =
itr->fFuture.get();
379 fPageSource.UnzipCluster(
cptr.get());
387 fInFlightClusters.erase(
itr);
394 decltype(fInFlightClusters)::iterator
itr;
398 if (
itr == fInFlightClusters.
end())
405 fInFlightClusters.erase(
itr);
#define R__unlikely(expr)
std::ios_base::fmtflags fFlags
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Manages a set of clusters containing compressed and packed pages.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
RCluster * GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RCluster * FindInPool(DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine; there is exactly one I/O thread in flight for every cluster pool.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
const_iterator begin() const
const_iterator end() const
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr ClusterSize_t kInvalidClusterIndex(std::uint64_t(-1))
constexpr DescriptorId_t kInvalidDescriptorId
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
Clusters that are currently being processed by the pipeline.
RCluster::RKey fClusterKey
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet