53 DistributionKey_t fDkey;
75template <EDaosMapping mapping>
82 static_cast<DistributionKey_t
>(
columnId),
static_cast<AttributeKey_t
>(
pageCount)};
92 std::string fPoolLabel;
94 std::string fContainerLabel;
102 std::regex re(
"daos://([^/]+)/(.+)");
104 if (!std::regex_match(uri.data(),
m, re))
113 auto position =
static_cast<uint32_t
>(address.
fLocation & 0xFFFFFFFF);
115 return {position,
offset};
122 uint64_t address = (position & 0xFFFFFFFF) | (
offset << 32);
133struct RDaosContainerNTupleLocator {
136 std::optional<ROOT::Experimental::Internal::RDaosNTupleAnchor> fAnchor;
139 RDaosContainerNTupleLocator() =
default;
142 bool IsValid() {
return fAnchor.has_value() && fAnchor->fNBytesHeader; }
149 auto seed =
static_cast<uint32_t
>(
h >> 32);
150 seed ^=
static_cast<uint32_t
>(
h & 0xffffffff) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
152 return (
hash == kReservedIndex) ? kReservedIndex + 1 :
hash;
159 std::unique_ptr<unsigned char[]> buffer,
zipBuffer;
160 auto &
anchor = fAnchor.emplace();
166 buffer = std::make_unique<unsigned char[]>(
anchorSize);
175 R__FAIL(
"unsupported RNTuple epoch version: " + std::to_string(
anchor.fVersionEpoch)));
177 if (
anchor.fVersionEpoch == 0) {
178 static std::once_flag
once;
181 <<
"Pre-release format version: RC " <<
anchor.fVersionMajor;
186 buffer = std::make_unique<unsigned char[]>(
anchor.fLenHeader);
195 buffer = std::make_unique<unsigned char[]>(
anchor.fLenFooter);
206 static std::pair<RDaosContainerNTupleLocator, ROOT::Experimental::Internal::RNTupleDescriptorBuilder>
220 R__FAIL(
"LocateNTuple: ntuple name '" +
ntupleName +
"' unavailable in this container."));
234 if (buffer !=
nullptr) {
235 auto bytes =
reinterpret_cast<unsigned char *
>(buffer);
254 return R__FAIL(
"DAOS anchor too short");
257 auto bytes =
reinterpret_cast<const unsigned char *
>(buffer);
260 return R__FAIL(
"unsupported DAOS anchor version: " + std::to_string(fVersionAnchor));
274 return result.Unwrap() + 32;
290 static std::once_flag
once;
291 std::call_once(
once, []() {
293 <<
"Do not store real data with this version of RNTuple!";
295 fCompressor = std::make_unique<RNTupleCompressor>();
314 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
316 fDaosContainer = std::make_unique<RDaosContainer>(
pool, args.fContainerLabel,
true);
317 fDaosContainer->SetDefaultObjectClass(
oclass);
320 auto [
locator,
_] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName,
decompressor);
321 fNTupleIndex =
locator.GetIndex();
339 fCounters->fSzZip.Add(
page.GetNBytes());
360 fCounters->fNPageCommitted.Inc();
361 fCounters->fSzWritePayload.Add(
sealedPage.fSize);
366std::vector<ROOT::Experimental::RNTupleLocator>
370 std::vector<ROOT::Experimental::RNTupleLocator>
locators;
373 return c + std::distance(r.fFirst, r.fLast);
377 const uint32_t
maxCageSz = fCageSizeLimit;
378 const bool useCaging = fCageSizeLimit > 0;
387 for (
auto &
range : ranges) {
431 fCounters->fNPageCommitted.Add(
nPages);
432 fCounters->fSzWritePayload.Add(
payloadSz);
439 return std::exchange(fNBytesCurrentCluster, 0);
450 auto offsetData = fClusterGroupId.fetch_add(1);
451 fDaosContainer->WriteSingleAkey(
459 fCounters->fSzWritePayload.Add(
static_cast<int64_t
>(
szPageListZip));
475 fDaosContainer->WriteSingleAkey(
479 fNTupleAnchor.fNBytesHeader =
nbytes;
484 fDaosContainer->WriteSingleAkey(
488 fNTupleAnchor.fNBytesFooter =
nbytes;
494 auto buffer = std::make_unique<unsigned char[]>(
ntplSize);
495 fNTupleAnchor.Serialize(buffer.get());
496 fDaosContainer->WriteSingleAkey(
497 buffer.get(),
ntplSize,
daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
512 fPageAllocator->DeletePage(
page);
522 fClusterPool(std::make_unique<
RClusterPool>(*
this, options.GetClusterBunchSize()))
528 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
537 std::unique_ptr<unsigned char[]> buffer,
zipBuffer;
540 RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName, *fDecompressor);
543 R__FAIL(
"Attach: requested ntuple '" + fNTupleName +
"' is not present in DAOS container."));
549 fDaosContainer->SetDefaultObjectClass(
oclass);
550 fNTupleIndex =
locator.GetIndex();
555 for (
const auto &
cgDesc : desc.GetClusterGroupIterable()) {
556 buffer = std::make_unique<unsigned char[]>(
cgDesc.GetPageListLength());
557 zipBuffer = std::make_unique<unsigned char[]>(
cgDesc.GetPageListLocator().fBytesOnStorage);
558 fDaosContainer->ReadSingleAkey(
561 fDecompressor->Unzip(
zipBuffer.get(),
cgDesc.GetPageListLocator().fBytesOnStorage,
cgDesc.GetPageListLength(),
572 return fDaosContainer->GetDefaultObjectClass().ToString();
589 R__FAIL(
"accessing caged pages is only supported in conjunction with cluster cache"));
635 R__FAIL(
"accessing caged pages is only supported in conjunction with cluster cache"));
643 fCounters->fNPageLoaded.Inc();
644 fCounters->fNRead.Inc();
648 if (!fCurrentCluster || (fCurrentCluster->GetId() !=
clusterId) || !fCurrentCluster->ContainsColumn(
columnId))
649 fCurrentCluster = fClusterPool->GetCluster(
clusterId, fActivePhysicalColumns.ToColumnSet());
657 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
671 fPagePool->RegisterPage(
673 fCounters->fNPagePopulated.Inc();
730 fPagePool->ReturnPage(
page);
736 return std::unique_ptr<RPageSourceDaos>(
clone);
739std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
746 std::uint64_t fPosition = 0;
748 std::uint64_t
fSize = 0;
760 std::unordered_map<std::uint32_t, std::vector<RDaosSealedPageLocator>>
onDiskPages;
763 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
768 uint32_t position,
offset;
769 std::tie(position,
offset) =
771 auto [
itLoc,
_] =
onDiskPages.emplace(position, std::vector<RDaosSealedPageLocator>());
773 itLoc->second.push_back(
780 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<
unsigned char[]>(
clusterBuffer));
808 fCounters->fNPageLoaded.Add(
nPages);
819 fCounters->fNClusterLoaded.Add(
clusterKeys.size());
821 std::vector<std::unique_ptr<ROOT::Experimental::Internal::RCluster>>
clusters;
832 fCounters->fNReadV.Inc();
846 std::vector<std::unique_ptr<RColumnElementBase>>
allElements;
857 for (
const auto &pi :
pageRange.fPageInfos) {
869 fPagePool->PreloadPage(
880 fCounters->fNPagePopulated.Add(
cluster->GetNOnDiskPages());
882 fTaskScheduler->Wait();
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
TObject * clone(const char *newname) const override
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
UInt_t Hash(const TString &s)
Managed a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
A RDaosContainer provides read/write access to objects in a given container.
RDaosObject::DistributionKey_t DistributionKey_t
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
RDaosObject::AttributeKey_t AttributeKey_t
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Helper class to uncompress data blocks in the ROOT compression frame format.
A helper class for piece-wise construction of an RNTupleDescriptor.
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t DeserializeUInt16(const void *buffer, std::uint16_t &val)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::uint32_t SerializeString(const std::string &val, void *buffer)
static std::uint32_t DeserializeUInt32(const void *buffer, std::uint32_t &val)
static std::uint32_t SerializeUInt64(std::uint64_t val, void *buffer)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, DescriptorId_t clusterGroupId, RNTupleDescriptor &desc)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static RResult< std::uint32_t > DeserializeString(const void *buffer, std::uint64_t bufSize, std::string &val)
static std::uint32_t SerializeUInt16(std::uint16_t val, void *buffer)
static std::uint32_t SerializeUInt32(std::uint32_t val, void *buffer)
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
A thread-safe cache of column pages.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
~RPageSinkDaos() override
std::uint64_t CommitClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
void UnzipClusterImpl(RCluster *cluster) final
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
~RPageSourceDaos() override
void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by selaedPage.
RNTupleDescriptor AttachImpl() final
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new connection to the pool/container.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
Base class for all ROOT issued exceptions.
The on-storage meta-data of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
uint32_t GetMaxCageSize() const
const std::string & GetObjectClass() const
Common user-tunable settings for storing ntuples.
std::size_t GetApproxUnzippedPageSize() const
static constexpr std::uint16_t kVersionEpoch
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
uint16_t daos_oclass_id_t
std::uint32_t ntuple_index_t
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Describes a read/write operation on multiple attribute keys under the same object ID and distribution...
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint64_t fVersionAnchor
Allows for evolving the struct in future versions.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint16_t fVersionPatch
std::uint32_t Serialize(void *buffer) const
std::uint16_t fVersionEpoch
Version of the binary format supported by the writer.
std::uint16_t fVersionMinor
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
static std::uint32_t GetSize()
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
std::uint16_t fVersionMajor
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
Wrap around a daos_oclass_id_t.
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
On-disk pages within a page source are identified by the column and page number.
Summarizes cluster-level information that are necessary to populate a certain page.
A range of sealed pages referring to the same column that can be used for vector commit.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RNTupleLocator payload that is common for object stores using 64bit location information.
Generic information about the physical location of data.