47enum EDaosMapping { kOidPerCluster, kOidPerPage };
51 DistributionKey_t fDkey;
59static constexpr DistributionKey_t kDistributionKeyDefault = 0x5a3c69f0cafe4a11;
60static constexpr AttributeKey_t kAttributeKeyDefault = 0x4243544b53444229;
61static constexpr AttributeKey_t kAttributeKeyAnchor = 0x4243544b5344422a;
62static constexpr AttributeKey_t kAttributeKeyHeader = 0x4243544b5344422b;
63static constexpr AttributeKey_t kAttributeKeyFooter = 0x4243544b5344422c;
71static constexpr EDaosMapping kDefaultDaosMapping = kOidPerCluster;
73template <EDaosMapping mapping>
75 long unsigned columnId,
long unsigned pageCount)
77 if constexpr (mapping == kOidPerCluster) {
80 static_cast<DistributionKey_t
>(columnId),
static_cast<AttributeKey_t
>(pageCount)};
81 }
else if constexpr (mapping == kOidPerPage) {
84 kDistributionKeyDefault, kAttributeKeyDefault};
90 std::string fPoolLabel;
92 std::string fContainerLabel;
98RDaosURI ParseDaosURI(std::string_view uri)
100 std::regex re(
"daos://([^/]+)/(.+)");
102 if (!std::regex_match(uri.data(),
m, re))
111 auto position =
static_cast<uint32_t
>(address.
fLocation & 0xFFFFFFFF);
113 return {position,
offset};
120 uint64_t address = (position & 0xFFFFFFFF) | (
offset << 32);
130 if (buffer !=
nullptr) {
131 auto bytes =
reinterpret_cast<unsigned char *
>(buffer);
139 return RNTupleSerializer::SerializeString(
fObjClass,
nullptr) + 20;
146 return R__FAIL(
"DAOS anchor too short");
149 auto bytes =
reinterpret_cast<const unsigned char *
>(buffer);
150 bytes += RNTupleSerializer::DeserializeUInt32(
bytes, fVersion);
151 bytes += RNTupleSerializer::DeserializeUInt32(
bytes, fNBytesHeader);
152 bytes += RNTupleSerializer::DeserializeUInt32(
bytes, fLenHeader);
153 bytes += RNTupleSerializer::DeserializeUInt32(
bytes, fNBytesFooter);
154 bytes += RNTupleSerializer::DeserializeUInt32(
bytes, fLenFooter);
155 auto result = RNTupleSerializer::DeserializeString(
bytes, bufSize - 20, fObjClass);
158 return result.Unwrap() + 20;
171 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
172 auto &anchor = fAnchor.emplace();
178 buffer = std::make_unique<unsigned char[]>(anchorSize);
179 if ((err = cont.
ReadSingleAkey(buffer.get(), anchorSize, oidMetadata, kDistributionKeyDefault, kAttributeKeyAnchor,
183 anchor.Deserialize(buffer.get(), anchorSize).Unwrap();
186 buffer = std::make_unique<unsigned char[]>(anchor.fLenHeader);
187 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesHeader);
188 if ((err = cont.
ReadSingleAkey(zipBuffer.get(), anchor.fNBytesHeader, oidMetadata, kDistributionKeyDefault,
189 kAttributeKeyHeader, kCidMetadata)))
191 decompressor.
Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
195 buffer = std::make_unique<unsigned char[]>(anchor.fLenFooter);
196 zipBuffer = std::make_unique<unsigned char[]>(anchor.fNBytesFooter);
197 if ((err = cont.
ReadSingleAkey(zipBuffer.get(), anchor.fNBytesFooter, oidMetadata, kDistributionKeyDefault,
198 kAttributeKeyFooter, kCidMetadata)))
200 decompressor.
Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
206std::pair<ROOT::Experimental::Detail::RDaosContainerNTupleLocator, ROOT::Experimental::RNTupleDescriptorBuilder>
208 const std::string &ntupleName,
214 auto &builder =
result.second;
216 if (
int err = loc.InitNTupleDescriptorBuilder(cont, decompressor, builder); !err) {
217 if (ntupleName.empty() || ntupleName != builder.GetDescriptor().GetName()) {
220 R__FAIL(
"LocateNTuple: ntuple name '" + ntupleName +
"' unavailable in this container."));
233 <<
"Do not store real data with this version of RNTuple!";
234 fCompressor = std::make_unique<RNTupleCompressor>();
241 unsigned char *serializedHeader, std::uint32_t
length)
246 if (oclass.IsUnknown())
251 fCageSizeLimit = std::max(cageSz, pageSz);
253 auto args = ParseDaosURI(fURI);
254 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
256 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel,
true);
257 fDaosContainer->SetDefaultObjectClass(oclass);
261 fNTupleIndex = locator.GetIndex();
263 auto zipBuffer = std::make_unique<unsigned char[]>(
length);
264 auto szZipHeader = fCompressor->Zip(serializedHeader,
length, GetWriteOptions().GetCompression(),
266 WriteNTupleHeader(zipBuffer.get(), szZipHeader,
length);
276 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
280 return CommitSealedPageImpl(columnHandle.
fPhysicalId, sealedPage);
287 auto offsetData = fPageId.fetch_add(1);
288 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNClusters();
292 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, offsetData);
293 fDaosContainer->WriteSingleAkey(sealedPage.
fBuffer, sealedPage.
fSize, daosKey.fOid, daosKey.fDkey, daosKey.fAkey);
297 result.fPosition = EncodeDaosPagePosition(offsetData);
300 fCounters->fNPageCommitted.Inc();
301 fCounters->fSzWritePayload.Add(sealedPage.
fSize);
302 fNBytesCurrentCluster += sealedPage.
fSize;
306std::vector<ROOT::Experimental::RNTupleLocator>
310 std::vector<ROOT::Experimental::RNTupleLocator> locators;
313 return c + std::distance(r.fFirst, r.fLast);
315 locators.reserve(nPages);
317 const uint32_t maxCageSz = fCageSizeLimit;
318 const bool useCaging = fCageSizeLimit > 0;
321 DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNClusters();
322 int64_t payloadSz = 0;
323 std::size_t positionOffset;
324 uint32_t positionIndex;
327 for (
auto &range : ranges) {
332 positionIndex = useCaging ? fPageId.fetch_add(1) : fPageId.load();
334 for (
auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
338 if (positionOffset + s.
fSize > maxCageSz) {
340 positionIndex = fPageId.fetch_add(1);
347 GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, positionIndex);
350 it->second.Insert(daosKey.fAkey, pageIov);
353 locator.
fPosition = EncodeDaosPagePosition(positionIndex, positionOffset);
357 locators.push_back(locator);
359 positionOffset += s.
fSize;
360 payloadSz += s.
fSize;
363 fNBytesCurrentCluster += payloadSz;
367 if (
int err = fDaosContainer->WriteV(writeRequests))
371 fCounters->fNPageCommitted.Add(nPages);
372 fCounters->fSzWritePayload.Add(payloadSz);
380 return std::exchange(fNBytesCurrentCluster, 0);
387 auto bufPageListZip = std::make_unique<unsigned char[]>(
length);
388 auto szPageListZip = fCompressor->Zip(serializedPageList,
length, GetWriteOptions().GetCompression(),
391 auto offsetData = fClusterGroupId.fetch_add(1);
392 fDaosContainer->WriteSingleAkey(
393 bufPageListZip.get(), szPageListZip,
394 daos_obj_id_t{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)}, kDistributionKeyDefault,
395 offsetData, kCidMetadata);
398 result.fBytesOnStorage = szPageListZip;
400 fCounters->fSzWritePayload.Add(
static_cast<int64_t
>(szPageListZip));
406 auto bufFooterZip = std::make_unique<unsigned char[]>(
length);
407 auto szFooterZip = fCompressor->Zip(serializedFooter,
length, GetWriteOptions().GetCompression(),
409 WriteNTupleFooter(bufFooterZip.get(), szFooterZip,
length);
415 fDaosContainer->WriteSingleAkey(
417 kDistributionKeyDefault, kAttributeKeyHeader, kCidMetadata);
418 fNTupleAnchor.fLenHeader = lenHeader;
419 fNTupleAnchor.fNBytesHeader = nbytes;
424 fDaosContainer->WriteSingleAkey(
426 kDistributionKeyDefault, kAttributeKeyFooter, kCidMetadata);
427 fNTupleAnchor.fLenFooter = lenFooter;
428 fNTupleAnchor.fNBytesFooter = nbytes;
434 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
435 fNTupleAnchor.Serialize(buffer.get());
436 fDaosContainer->WriteSingleAkey(
437 buffer.get(), ntplSize,
daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
438 kDistributionKeyDefault, kAttributeKeyAnchor, kCidMetadata);
447 return fPageAllocator->NewPage(columnHandle.
fPhysicalId, elementSize, nElements);
452 fPageAllocator->DeletePage(page);
460 fPagePool(std::make_shared<
RPagePool>()),
462 fClusterPool(std::make_unique<
RClusterPool>(*this, options.GetClusterBunchSize()))
467 auto args = ParseDaosURI(uri);
468 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
469 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel);
477 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
479 auto [locator, descBuilder] =
481 if (!locator.IsValid())
483 R__FAIL(
"Attach: requested ntuple '" + fNTupleName +
"' is not present in DAOS container."));
486 if (oclass.IsUnknown())
489 fDaosContainer->SetDefaultObjectClass(oclass);
490 fNTupleIndex = locator.GetIndex();
492 ntplDesc = descBuilder.MoveDescriptor();
496 buffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLength());
497 zipBuffer = std::make_unique<unsigned char[]>(cgDesc.GetPageListLocator().fBytesOnStorage);
498 fDaosContainer->ReadSingleAkey(
499 zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, oidPageList, kDistributionKeyDefault,
501 fDecompressor->Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().fBytesOnStorage, cgDesc.GetPageListLength(),
506 for (std::size_t i = 0; i < clusters.size(); ++i) {
516 return fDaosContainer->GetDefaultObjectClass().ToString();
527 auto descriptorGuard = GetSharedDescriptorGuard();
528 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
529 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(clusterIndex.
GetIndex());
534 R__FAIL(
"accessing caged pages is only supported in conjunction with cluster cache"));
538 sealedPage.
fSize = bytesOnStorage;
543 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(
545 fDaosContainer->ReadSingleAkey(
const_cast<void *
>(sealedPage.
fBuffer), bytesOnStorage, daosKey.fOid,
546 daosKey.fDkey, daosKey.fAkey);
558 const auto clusterId = clusterInfo.
fClusterId;
559 const auto &pageInfo = clusterInfo.
fPageInfo;
562 const auto elementSize = element->
GetSize();
563 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
565 const void *sealedPageBuffer =
nullptr;
566 std::unique_ptr<unsigned char[]> directReadBuffer;
570 pageZero.GrowUnchecked(pageInfo.fNElements);
571 pageZero.SetWindow(clusterInfo.
fColumnOffset + pageInfo.fFirstInPage,
573 fPagePool->RegisterPage(pageZero,
RPageDeleter([](
const RPage &,
void *) {},
nullptr));
580 R__FAIL(
"accessing caged pages is only supported in conjunction with cluster cache"));
583 directReadBuffer = std::unique_ptr<unsigned char[]>(
new unsigned char[bytesOnStorage]);
584 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(
586 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), bytesOnStorage, daosKey.fOid, daosKey.fDkey,
588 fCounters->fNPageLoaded.Inc();
589 fCounters->fNRead.Inc();
590 fCounters->fSzReadPayload.Add(bytesOnStorage);
591 sealedPageBuffer = directReadBuffer.get();
593 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
594 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
595 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
597 auto cachedPage = fPagePool->GetPage(columnId,
RClusterIndex(clusterId, idxInCluster));
598 if (!cachedPage.IsNull())
602 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
603 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
604 sealedPageBuffer = onDiskPage->GetAddress();
610 newPage = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element, columnId);
611 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
616 fPagePool->RegisterPage(
619 fCounters->fNPagePopulated.Inc();
627 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
628 if (!cachedPage.IsNull())
631 std::uint64_t idxInCluster;
634 auto descriptorGuard = GetSharedDescriptorGuard();
635 clusterInfo.
fClusterId = descriptorGuard->FindClusterId(columnId, globalIndex);
638 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterInfo.
fClusterId);
639 clusterInfo.
fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
642 clusterInfo.
fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
644 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
652 const auto idxInCluster = clusterIndex.
GetIndex();
654 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
655 if (!cachedPage.IsNull())
661 auto descriptorGuard = GetSharedDescriptorGuard();
662 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
664 clusterInfo.
fColumnOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
665 clusterInfo.
fPageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
668 return PopulatePageFromCluster(columnHandle, clusterInfo, idxInCluster);
673 fPagePool->ReturnPage(page);
679 return std::unique_ptr<RPageSourceDaos>(
clone);
682std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
685 struct RDaosSealedPageLocator {
689 std::uint64_t fPosition = 0;
690 std::uint64_t fCageOffset = 0;
691 std::uint64_t
fSize = 0;
698 auto fnPrepareSingleCluster = [&](
const RCluster::RKey &clusterKey,
703 std::unordered_map<std::uint32_t, std::vector<RDaosSealedPageLocator>> onDiskPages;
705 unsigned clusterBufSz = 0, nPages = 0;
706 auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
707 PrepareLoadCluster(clusterKey, *pageZeroMap,
710 const auto &pageLocator = pageInfo.
fLocator;
711 uint32_t position,
offset;
712 std::tie(position,
offset) =
714 auto [itLoc,
_] = onDiskPages.emplace(position, std::vector<RDaosSealedPageLocator>());
716 itLoc->second.push_back(
717 {clusterId, physicalColumnId, pageNo, position,
offset, pageLocator.fBytesOnStorage});
719 clusterBufSz += pageLocator.fBytesOnStorage;
722 auto clusterBuffer =
new unsigned char[clusterBufSz];
723 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<
unsigned char[]>(clusterBuffer));
725 auto cageBuffer = clusterBuffer;
727 for (
auto &[cageIndex, pageVec] : onDiskPages) {
728 auto columnId = pageVec[0].fColumnId;
729 std::size_t cageSz = 0;
731 for (
auto &s : pageVec) {
732 assert(columnId == s.fColumnId);
733 assert(cageIndex == s.fPosition);
736 pageMap->Register(key,
ROnDiskPage(cageBuffer + s.fCageOffset, s.fSize));
744 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, columnId, cageIndex);
747 itReq->second.Insert(daosKey.fAkey, iov);
749 cageBuffer += cageSz;
751 fCounters->fNPageLoaded.Add(nPages);
752 fCounters->fSzReadPayload.Add(clusterBufSz);
754 auto cluster = std::make_unique<RCluster>(clusterId);
755 cluster->Adopt(std::move(pageMap));
756 cluster->Adopt(std::move(pageZeroMap));
758 cluster->SetColumnAvailable(colId);
762 fCounters->fNClusterLoaded.Add(clusterKeys.size());
764 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> clusters;
766 for (
auto key : clusterKeys) {
767 clusters.emplace_back(fnPrepareSingleCluster(key, readRequests));
772 if (
int err = fDaosContainer->ReadV(readRequests))
775 fCounters->fNReadV.Inc();
776 fCounters->fNRead.Add(readRequests.size());
784 fTaskScheduler->Reset();
786 const auto clusterId = cluster->
GetId();
787 auto descriptorGuard = GetSharedDescriptorGuard();
788 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
790 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
793 for (
const auto columnId : columnsInCluster) {
794 const auto &columnDesc = descriptorGuard->GetColumnDescriptor(columnId);
798 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
799 std::uint64_t pageNo = 0;
800 std::uint64_t firstInPage = 0;
801 for (
const auto &pi : pageRange.fPageInfos) {
804 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
806 auto taskFunc = [
this, columnId, clusterId, firstInPage, onDiskPage, element = allElements.back().get(),
807 nElements = pi.fNElements,
808 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex]() {
809 auto newPage = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element, columnId);
810 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
813 fPagePool->PreloadPage(
819 fTaskScheduler->AddTask(taskFunc);
821 firstInPage += pi.fNElements;
828 fTaskScheduler->Wait();
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
#define R__LOG_WARNING(...)
TObject * clone(const char *newname) const override
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
Managed a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
size_t GetNOnDiskPages() const
DescriptorId_t GetId() const
const ColumnSet_t & GetAvailPhysicalColumns() const
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
std::size_t GetSize() const
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
If CppT == void, use the default C++ type for the given column type.
RColumnElementBase * GetElement() const
A RDaosContainer provides read/write access to objects in a given container.
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
int ReadSingleAkey(void *buffer, std::size_t length, daos_obj_id_t oid, DistributionKey_t dkey, AttributeKey_t akey, ObjClassId_t cid)
Read data from a single object attribute key to the given buffer.
RDaosObject::DistributionKey_t DistributionKey_t
RDaosObject::AttributeKey_t AttributeKey_t
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Helper class to uncompress data blocks in the ROOT compression frame format.
void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size ls of the from buffer.
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Uses standard C++ memory allocation for the column data pages.
static void DeletePage(const RPage &page)
Releases the memory pointed to by page and resets the page's information.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
RNTupleLocator CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
~RPageSinkDaos() override
void CreateImpl(const RNTupleModel &model, unsigned char *serializedHeader, std::uint32_t length) final
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges) final
Vector commit of preprocessed pages.
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
~RPageSourceDaos() override
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UnzipClusterImpl(RCluster *cluster) final
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
void LoadSealedPage(DescriptorId_t physicalColumnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by selaedPage.
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ClusterSize_t::ValueType idxInCluster)
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
RNTupleDescriptor AttachImpl() final
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new connection to the pool/container.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
A page is a slice of a column that is mapped into memory.
std::uint32_t GetNBytes() const
The space taken by column elements in the buffer.
static RPage MakePageZero(ColumnId_t columnId, ClusterSize_t::ValueType elementSize)
Make a 'zero' page for column columnId (that is comprised of 0x00 bytes only).
void SetWindow(const NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
A helper class for serializing and deserialization of the RNTuple binary format.
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::vector< RClusterDescriptorBuilder > GetClusterSummaries(const RNTupleDescriptor &ntplDesc, DescriptorId_t clusterGroupId)
Used to prepare the cluster descriptor builders when loading the page locations for a certain cluster...
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
A helper class for piece-wise construction of an RNTupleDescriptor.
void AddToOnDiskFooterSize(std::uint64_t size)
The real footer size also include the page list envelopes.
void SetOnDiskHeaderSize(std::uint64_t size)
The on-storage meta-data of an ntuple.
RClusterGroupDescriptorIterable GetClusterGroupIterable() const
RResult< void > AddClusterDetails(RClusterDescriptor &&clusterDesc)
Methods to load and drop cluster details.
The RNTupleModel encapulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
uint32_t GetMaxCageSize() const
const std::string & GetObjectClass() const
Common user-tunable settings for storing ntuples.
std::size_t GetApproxUnzippedPageSize() const
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
uint16_t daos_oclass_id_t
std::uint32_t ntuple_index_t
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distriniguishes elements of the same type within a descriptor, e.g. different fields.
constexpr DescriptorId_t kInvalidDescriptorId
The identifiers that specifies the content of a (partial) cluster.
DescriptorId_t fClusterId
ColumnSet_t fPhysicalColumnSet
Helper structure concentrating the functionality required to locate an ntuple within a DAOS container...
int InitNTupleDescriptorBuilder(RDaosContainer &cont, RNTupleDecompressor &decompressor, RNTupleDescriptorBuilder &builder)
static std::pair< RDaosContainerNTupleLocator, RNTupleDescriptorBuilder > LocateNTuple(RDaosContainer &cont, const std::string &ntupleName, RNTupleDecompressor &decompressor)
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Describes a read/write operation on multiple attribute keys under the same object ID and distribution...
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint32_t fVersion
Allows for evolving the struct in future versions.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
static std::uint32_t GetSize()
std::uint32_t Serialize(void *buffer) const
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
Wrap around a daos_oclass_id_t.
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
On-disk pages within a page source are identified by the column and page number.
Summarizes cluster-level information that are necessary to populate a certain page.
RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo
Location of the page on disk.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
DescriptorId_t fClusterId
DescriptorId_t fPhysicalId
A range of sealed pages referring to the same column that can be used for vector commit.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RNTupleLocator payload that is common for object stores using 64bit location information.
Generic information about the physical location of data.
std::uint8_t fReserved
Reserved for use by concrete storage backends.
ELocatorType fType
For non-disk locators, the value for the Type field.
std::uint32_t fBytesOnStorage
std::variant< std::uint64_t, std::string, RNTupleLocatorObject64 > fPosition
Simple on-disk locators consisting of a 64-bit offset use variant type uint64_t; extended locators ha...
const T & GetPosition() const