62 StopBackgroundThread();
67 if (fThreadIo.joinable())
75 if (!fThreadIo.joinable())
80 std::unique_lock<std::mutex> lock(fLockWorkQueue);
82 fCvHasReadWork.notify_one();
92 std::unique_lock<std::mutex> lock(fLockWorkQueue);
93 fCvHasReadWork.wait(lock, [&]{
return !fReadQueue.empty(); });
100 for (
unsigned i = 0; i <
readItems.size(); ++i) {
115 for (std::size_t i = 0; i <
clusters.size(); ++i) {
132 std::int64_t fBunchId = -1;
134 ColumnSet_t fPhysicalColumnSet;
137 static constexpr std::int64_t kFlagRequired = 0x01;
138 static constexpr std::int64_t kFlagLast = 0x02;
141 std::map<DescriptorId_t, RInfo> fMap;
149 bool Contains(DescriptorId_t
clusterId) {
153 std::size_t GetSize()
const {
return fMap.size(); }
158 if (
itr == fMap.end())
161 std::copy_if(
itr->second.fPhysicalColumnSet.
begin(),
itr->second.fPhysicalColumnSet.
end(),
162 std::inserter(
d,
d.end()),
167 itr->second.fPhysicalColumnSet =
d;
171 decltype(fMap)::iterator begin() {
return fMap.begin(); }
172 decltype(fMap)::iterator end() {
return fMap.end(); }
180 StartBackgroundThread();
182 std::unordered_set<ROOT::DescriptorId_t>
keep{fPageSource.GetPinnedClusters()};
183 for (
auto cid : fPageSource.GetPinnedClusters()) {
189 !fPageSource.GetEntryRange().IntersectsWith(
descriptorGuard->GetClusterDescriptor(next))) {
207 if (i == fClusterBunchSize)
213 if (!fPageSource.GetEntryRange().IntersectsWith(
descriptorGuard->GetClusterDescriptor(next)))
233 if (
keep.count(
itr->first) > 0) {
238 fCounters->fNCluster.Dec();
248 std::lock_guard<std::mutex>
lockGuard(fLockWorkQueue);
250 for (
auto itr = fInFlightClusters.
begin();
itr != fInFlightClusters.
end(); ) {
252 if (
itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
254 provide.Erase(
itr->fClusterKey.fClusterId,
itr->fClusterKey.fPhysicalColumnSet);
259 auto cptr =
itr->fFuture.get();
263 !
provide.Contains(
itr->fClusterKey.fClusterId) && (
keep.count(
itr->fClusterKey.fClusterId) == 0);
266 itr = fInFlightClusters.erase(
itr);
271 fPageSource.UnzipCluster(
cptr.get());
278 const auto cid =
cptr->GetId();
279 fPool.emplace(
cid, std::move(
cptr));
280 fCounters->fNCluster.Inc();
282 itr = fInFlightClusters.erase(
itr);
286 for (
const auto &[
_,
cptr] : fPool) {
292 if (
provide.GetSize() < fClusterBunchSize) {
295 if ((
kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
313 readItem.fClusterKey.fPhysicalColumnSet =
kv.second.fPhysicalColumnSet;
321 fReadQueue.emplace_back(std::move(
readItem));
323 if (!fReadQueue.empty())
324 fCvHasReadWork.notify_one();
337 if (
result != fPool.end()) {
347 return result->second.get();
351 decltype(fInFlightClusters)::iterator
itr;
355 for (;
itr != fInFlightClusters.
end(); ++
itr) {
366 auto cptr =
itr->fFuture.get();
371 fPageSource.UnzipCluster(
cptr.get());
373 if (
result != fPool.end()) {
376 const auto cid =
cptr->GetId();
377 fPool.emplace(
cid, std::move(
cptr));
378 fCounters->fNCluster.Inc();
382 fInFlightClusters.erase(
itr);
389 decltype(fInFlightClusters)::iterator
itr;
393 while (
itr != fInFlightClusters.
end() &&
394 itr->fFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
397 if (
itr == fInFlightClusters.
end())
#define R__unlikely(expr)
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
A thread-safe integral performance counter.
CounterPtrT MakeCounter(const std::string &name, Args &&... args)
RCluster * WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
The cluster pool counters are observed by the page source.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::unique_ptr< RCounters > fCounters
void StopBackgroundThread()
Stop the I/O background thread. No-op if already stopped. Called by the destructor.
void ExecReadClusters()
The I/O thread routine; there is exactly one I/O thread in-flight for every cluster pool.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize)
void StartBackgroundThread()
Spawn the I/O background thread. No-op if already started.
ROOT::Internal::RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
const_iterator begin() const
const_iterator end() const
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
Performance counters that get registered in fMetrics.
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
RCluster::RKey fClusterKey
Request to load a subset of the columns of a particular cluster.
ROOT::DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet