64 std::unique_lock<std::mutex> lock(fLockWorkQueue);
66 fCvHasReadWork.notify_one();
76 std::unique_lock<std::mutex> lock(fLockWorkQueue);
77 fCvHasReadWork.wait(lock, [&]{
return !fReadQueue.empty(); });
84 for (
unsigned i = 0; i <
readItems.size(); ++i) {
99 for (std::size_t i = 0; i <
clusters.size(); ++i) {
110 for (
const auto &
cptr : fPool) {
119 auto N = fPool.size();
120 for (
unsigned i = 0; i <
N; ++i) {
139 std::int64_t fBunchId = -1;
141 ColumnSet_t fPhysicalColumnSet;
144 static constexpr std::int64_t kFlagRequired = 0x01;
145 static constexpr std::int64_t kFlagLast = 0x02;
148 std::map<DescriptorId_t, RInfo> fMap;
160 std::size_t GetSize()
const {
return fMap.size(); }
165 if (
itr == fMap.end())
168 std::copy_if(
itr->second.fPhysicalColumnSet.
begin(),
itr->second.fPhysicalColumnSet.
end(),
169 std::inserter(
d,
d.end()),
174 itr->second.fPhysicalColumnSet =
d;
178 decltype(fMap)::iterator begin() {
return fMap.begin(); }
188 std::set<ROOT::DescriptorId_t>
keep;
195 for (
unsigned int i = 0; i < fWindowPre; ++i) {
208 if (i == fClusterBunchSize)
214 if (!fPageSource.GetEntryRange().IntersectsWith(
descriptorGuard->GetClusterDescriptor(next)))
229 for (
auto &
cptr : fPool) {
246 std::lock_guard<std::mutex>
lockGuard(fLockWorkQueue);
248 for (
auto itr = fInFlightClusters.
begin();
itr != fInFlightClusters.
end(); ) {
250 if (
itr->fFuture.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
252 provide.Erase(
itr->fClusterKey.fClusterId,
itr->fClusterKey.fPhysicalColumnSet);
257 auto cptr =
itr->fFuture.get();
261 !
provide.Contains(
itr->fClusterKey.fClusterId) && (
keep.count(
itr->fClusterKey.fClusterId) == 0);
264 itr = fInFlightClusters.erase(
itr);
269 fPageSource.UnzipCluster(
cptr.get());
279 itr = fInFlightClusters.erase(
itr);
283 for (
auto &
cptr : fPool) {
291 if (
provide.GetSize() < fClusterBunchSize) {
294 if ((
kv.second.fFlags & (RProvides::kFlagRequired | RProvides::kFlagLast)) == 0)
312 readItem.fClusterKey.fPhysicalColumnSet =
kv.second.fPhysicalColumnSet;
320 fReadQueue.emplace_back(std::move(
readItem));
322 if (!fReadQueue.empty())
323 fCvHasReadWork.notify_one();
351 decltype(fInFlightClusters)::iterator
itr;
355 for (;
itr != fInFlightClusters.
end(); ++
itr) {
366 auto cptr =
itr->fFuture.get();
371 fPageSource.UnzipCluster(
cptr.get());
381 fInFlightClusters.erase(
itr);
388 decltype(fInFlightClusters)::iterator
itr;
392 if (
itr == fInFlightClusters.
end())
399 fInFlightClusters.erase(
itr);
#define R__unlikely(expr)
std::ios_base::fmtflags fFlags
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Managed a set of clusters containing compressed and packed pages.
void WaitForInFlightClusters()
Used by the unit tests to drain the queue of clusters to be preloaded.
unsigned int fClusterBunchSize
The number of clusters that are being read in a single vector read.
RCluster * GetCluster(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread l...
std::vector< std::unique_ptr< RCluster > > fPool
The cache of clusters around the currently active cluster.
RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize)
void ExecReadClusters()
The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool.
RCluster * WaitFor(ROOT::DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns)
Returns the given cluster from the pool, which needs to contain at least the columns physicalColumns.
RPageSource & fPageSource
Every cluster pool is responsible for exactly one page source that triggers loading of the clusters (...
size_t FindFreeSlot() const
Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())...
RCluster * FindInPool(ROOT::DescriptorId_t clusterId) const
Every cluster id has at most one corresponding RCluster pointer in the pool.
std::thread fThreadIo
The I/O thread calls RPageSource::LoadClusters() asynchronously.
An in-memory subset of the packed and compressed pages of a cluster.
std::unordered_set< ROOT::DescriptorId_t > ColumnSet_t
Abstract interface to read data from an ntuple.
const_iterator begin() const
const_iterator end() const
void Erase(const T &that, std::vector< T > &v)
Erase that element from vector v
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
constexpr NTupleSize_t kInvalidNTupleIndex
constexpr DescriptorId_t kInvalidDescriptorId
Clusters that are currently being processed by the pipeline.
RCluster::RKey fClusterKey
bool operator<(const RInFlightCluster &other) const
First order by cluster id, then by number of columns, then by the column ids in fColumns.
Request to load a subset of the columns of a particular cluster.
ColumnSet_t fPhysicalColumnSet
ROOT::DescriptorId_t fClusterId