37 itr1 !=
fClusterKey.fPhysicalColumnSet.end(); ++itr1, ++itr2) {
89 std::deque<RReadItem> readItems;
97 while (!readItems.empty()) {
98 std::vector<RCluster::RKey> clusterKeys;
99 std::int64_t bunchId = -1;
100 for (
unsigned i = 0; i < readItems.size(); ++i) {
101 const auto &item = readItems[i];
108 if ((bunchId >= 0) && (item.fBunchId != bunchId))
110 bunchId = item.fBunchId;
111 clusterKeys.emplace_back(item.fClusterKey);
114 auto clusters =
fPageSource.LoadClusters(clusterKeys);
115 for (std::size_t i = 0; i < clusters.size(); ++i) {
116 readItems[i].fPromise.set_value(std::move(clusters[i]));
118 readItems.erase(readItems.begin(), readItems.begin() + clusters.size());
132 std::int64_t fBunchId = -1;
133 std::int64_t fFlags = 0;
134 ColumnSet_t fPhysicalColumnSet;
137 static constexpr std::int64_t kFlagRequired = 0x01;
138 static constexpr std::int64_t kFlagLast = 0x02;
141 std::map<DescriptorId_t, RInfo> fMap;
146 fMap.emplace(clusterId, info);
149 bool Contains(DescriptorId_t clusterId) {
150 return fMap.count(clusterId) > 0;
153 std::size_t GetSize()
const {
return fMap.size(); }
155 void Erase(DescriptorId_t clusterId,
const ColumnSet_t &physicalColumns)
157 auto itr = fMap.find(clusterId);
158 if (itr == fMap.end())
161 std::copy_if(itr->second.fPhysicalColumnSet.begin(), itr->second.fPhysicalColumnSet.end(),
162 std::inserter(
d,
d.end()),
163 [&physicalColumns](DescriptorId_t needle) { return physicalColumns.count(needle) == 0; });
167 itr->second.fPhysicalColumnSet =
d;
171 decltype(fMap)::iterator begin() {
return fMap.begin(); }
172 decltype(fMap)::iterator end() {
return fMap.end(); }
177ROOT::Internal::RCluster *
182 std::unordered_set<ROOT::DescriptorId_t> keep{
fPageSource.GetPinnedClusters()};
184 auto descriptorGuard =
fPageSource.GetSharedDescriptorGuard();
187 next = descriptorGuard->FindNextClusterId(next);
189 !
fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next))) {
199 auto descriptorGuard =
fPageSource.GetSharedDescriptorGuard();
202 RProvides::RInfo provideInfo;
203 provideInfo.fPhysicalColumnSet = physicalColumns;
205 provideInfo.fFlags = RProvides::kFlagRequired;
211 next = descriptorGuard->FindNextClusterId(cid);
213 if (!
fPageSource.GetEntryRange().IntersectsWith(descriptorGuard->GetClusterDescriptor(next)))
217 provideInfo.fFlags |= RProvides::kFlagLast;
219 provide.Insert(cid, provideInfo);
223 provideInfo.fFlags = 0;
228 for (
auto itr =
fPool.begin(); itr !=
fPool.end();) {
229 if (provide.Contains(itr->first)) {
233 if (keep.count(itr->first) > 0) {
237 itr =
fPool.erase(itr);
252 if (itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
254 provide.Erase(itr->fClusterKey.fClusterId, itr->fClusterKey.fPhysicalColumnSet);
259 auto cptr = itr->fFuture.get();
262 const bool isExpired =
263 !provide.Contains(itr->fClusterKey.fClusterId) && (keep.count(itr->fClusterKey.fClusterId) == 0);
274 auto existingCluster =
fPool.find(cptr->GetId());
275 if (existingCluster !=
fPool.end()) {
276 existingCluster->second->Adopt(std::move(*cptr));
278 const auto cid = cptr->GetId();
279 fPool.emplace(cid, std::move(cptr));
286 for (
const auto &[
_, cptr] :
fPool) {
287 provide.Erase(cptr->GetId(), cptr->GetAvailPhysicalColumns());
291 bool skipPrefetch =
false;
294 for (
const auto &kv : provide) {
295 if ((kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
297 skipPrefetch =
false;
307 for (
const auto &kv : provide) {
308 R__ASSERT(!kv.second.fPhysicalColumnSet.empty());
312 readItem.
fBunchId = kv.second.fBunchId;
328 return WaitFor(clusterId, physicalColumns);
336 auto result =
fPool.find(clusterId);
337 if (result !=
fPool.end()) {
338 bool hasMissingColumn =
false;
339 for (
auto cid : physicalColumns) {
340 if (result->second->ContainsColumn(cid))
343 hasMissingColumn =
true;
346 if (!hasMissingColumn)
347 return result->second.get();
353 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
356 if (itr->fClusterKey.fClusterId == clusterId)
366 auto cptr = itr->fFuture.get();
373 if (result !=
fPool.end()) {
374 result->second->Adopt(std::move(*cptr));
376 const auto cid = cptr->GetId();
377 fPool.emplace(cid, std::move(cptr));
381 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
391 std::lock_guard<std::mutex> lockGuardInFlightClusters(
fLockWorkQueue);
394 itr->fFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
#define R__unlikely(expr)
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
A thread-safe integral performance counter.
A thread-safe integral performance counter.
RCluster * WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
std::condition_variable fCvHasReadWork
Signals a non-empty I/O work queue.
ROOT::Experimental::Detail::RNTupleMetrics fMetrics
The cluster pool counters are observed by the page source.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
std::vector< RInFlightCluster > fInFlightClusters
The clusters that were handed off to the I/O thread.
std::unordered_map< ROOT::DescriptorId_t, std::unique_ptr< RCluster > > fPool
The cache of active clusters and their successors.
std::unique_ptr< RCounters > fCounters
std::deque< RReadItem > fReadQueue
The communication channel to the I/O thread.
void StopBackgroundThread()
Stop the I/O background thread. No-op if already stopped. Called by the destructor.
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
RClusterPool(ROOT::Internal::RPageSource &pageSource, unsigned int clusterBunchSize)
std::int64_t fBunchId
Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other.
void StartBackgroundThread()
Spawn the I/O background thread. No-op if already started.
std::mutex fLockWorkQueue
Protects the shared state between the main thread and the I/O thread, namely the work queue and the i...
ROOT::Internal::RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
Performance counters that get registered in fMetrics.
Clusters that are currently being processed by the pipeline.
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, than by the column ids in fColumns.
RCluster::RKey fClusterKey
std::future< std::unique_ptr< RCluster > > fFuture
Request to load a subset of the columns of a particular cluster.
RCluster::RKey fClusterKey
std::int64_t fBunchId
Items with different bunch ids are scheduled for different vector reads.
std::promise< std::unique_ptr< RCluster > > fPromise
ROOT::DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet