Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \ingroup NTuple
3/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
4/// \date 2020-11-03
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RLogger.hxx>
19#include <ROOT/RNTupleModel.hxx>
22#include <ROOT/RNTupleTypes.hxx>
23#include <ROOT/RNTupleUtils.hxx>
24#include <ROOT/RNTupleZip.hxx>
25#include <ROOT/RPage.hxx>
27#include <ROOT/RPagePool.hxx>
28#include <ROOT/RDaos.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <cstdio>
36#include <cstdlib>
37#include <cstring>
38#include <limits>
39#include <utility>
40#include <regex>
41#include <cassert>
42
43namespace {
52
/// \brief RNTuple page-DAOS mappings
///
/// Selects, at compile time, how `GetPageDaosKey()` places a page into the DAOS object store:
/// - `kOidPerCluster`: one object per (ntuple, cluster); the column ID is used as distribution key and the page
///   counter as attribute key. This is the mapping selected by `kDefaultDaosMapping`.
/// - `kOidPerPage`: one object per (ntuple, page counter).
enum EDaosMapping {
   kOidPerCluster,
   kOidPerPage
};
58
59struct RDaosKey {
60 daos_obj_id_t fOid;
61 DistributionKey_t fDkey;
62 AttributeKey_t fAkey;
63};
64
65/// \brief Pre-defined keys for object store. `kDistributionKeyDefault` is the distribution key for metadata and
66/// pagelist values; optionally it can be used for ntuple pages (if under the `kOidPerPage` mapping strategy).
67/// `kAttributeKeyDefault` is the attribute key for ntuple pages under `kOidPerPage`.
68/// `kAttributeKey{Anchor,Header,Footer}` are the respective attribute keys for anchor/header/footer metadata elements.
69static constexpr DistributionKey_t kDistributionKeyDefault = 0x5a3c69f0cafe4a11;
70static constexpr AttributeKey_t kAttributeKeyDefault = 0x4243544b53444229;
71static constexpr AttributeKey_t kAttributeKeyAnchor = 0x4243544b5344422a;
72static constexpr AttributeKey_t kAttributeKeyHeader = 0x4243544b5344422b;
73static constexpr AttributeKey_t kAttributeKeyFooter = 0x4243544b5344422c;
74
75/// \brief Pre-defined 64 LSb of the OIDs for ntuple metadata (holds anchor/header/footer) and clusters' pagelists.
76static constexpr decltype(daos_obj_id_t::lo) kOidLowMetadata = -1;
77static constexpr decltype(daos_obj_id_t::lo) kOidLowPageList = -2;
78
79static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
80
81static constexpr EDaosMapping kDefaultDaosMapping = kOidPerCluster;
82
/// \brief Map a page, identified by (ntuple index, cluster ID, column ID, page counter), onto the DAOS
/// (object ID, distribution key, attribute key) triple under which it is stored. The layout is selected at
/// compile time by `mapping`:
/// - `kOidPerCluster`: OID = {lo: clusterId, hi: ntplId}, dkey = columnId, akey = pageCount.
/// - `kOidPerPage`: OID = {lo: pageCount, hi: ntplId}; the dkey/akey arguments are not visible in this listing
///   (presumably `kDistributionKeyDefault`/`kAttributeKeyDefault`, as documented for those constants -- confirm).
83template <EDaosMapping mapping>
85 long unsigned columnId, long unsigned pageCount)
86{
87 if constexpr (mapping == kOidPerCluster) {
 // One object per (ntuple, cluster): all pages of a column share the column's distribution key.
88 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(clusterId),
89 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
90 static_cast<DistributionKey_t>(columnId), static_cast<AttributeKey_t>(pageCount)};
91 } else if constexpr (mapping == kOidPerPage) {
 // One object per (ntuple, page counter).
92 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(pageCount),
93 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
95 }
96}
97
/// \brief Components of a DAOS RNTuple URI of the form `daos://<pool>/<container>`.
struct RDaosURI {
   std::string fPoolLabel;      ///< Label of the DAOS pool
   std::string fContainerLabel; ///< Label of the container for this RNTuple
};
104
105/**
106 \brief Parse a DAOS RNTuple URI of the form 'daos://pool_id/container_id'.
107*/
108RDaosURI ParseDaosURI(std::string_view uri)
109{
110 std::regex re("daos://([^/]+)/(.+)");
111 std::cmatch m;
112 if (!std::regex_match(uri.data(), m, re))
113 throw ROOT::RException(R__FAIL("Invalid DAOS pool URI."));
114 return {m[1], m[2]};
115}
116
117/// \brief Helper structure concentrating the functionality required to locate an ntuple within a DAOS container.
118/// It includes a hashing function that converts the RNTuple's name into a 32-bit identifier; this value is used to
119/// index the subspace for the ntuple among all objects in the container. A zero-value hash value is reserved for
120/// storing any future metadata related to container-wide management; a zero-index ntuple is thus disallowed and
121/// remapped to "1". Once the index is computed, `InitNTupleDescriptorBuilder()` can be called to return a
122/// partially-filled builder with the ntuple's anchor, header and footer, lacking only pagelists. Upon that call,
123/// a copy of the anchor is stored in `fAnchor`.
/// (Strictly, `InitNTupleDescriptorBuilder()` fills a caller-provided builder and returns an error code;
/// `LocateNTuple()` bundles locator construction with that call.)
/// NOTE(review): `std::hash<std::string>` is implementation-defined and only guaranteed to be consistent within a
/// single execution of a program, yet the derived index addresses persistent DAOS objects. A writer and a reader
/// built against different standard libraries may compute different indices for the same name -- confirm.
124struct RDaosContainerNTupleLocator {
 /// Name of the ntuple, as passed to the constructor.
125 std::string fName{};
 /// Index of the ntuple within the container, i.e. `Hash(fName)`.
126 ntuple_index_t fIndex{};
 /// Anchor read by `InitNTupleDescriptorBuilder()`; empty until that function has been called.
127 std::optional<ROOT::Experimental::Internal::RDaosNTupleAnchor> fAnchor;
 /// Index value reserved for container-wide metadata; never returned by `Hash()`.
128 static const ntuple_index_t kReservedIndex = 0;
129
130 RDaosContainerNTupleLocator() = default;
131 explicit RDaosContainerNTupleLocator(const std::string &ntupleName) : fName(ntupleName), fIndex(Hash(ntupleName)) {}
132
 /// True if an anchor has been read and it records a non-zero compressed header size.
133 bool IsValid() { return fAnchor.has_value() && fAnchor->fNBytesHeader; }
134 [[nodiscard]] ntuple_index_t GetIndex() const { return fIndex; };
 /// Map `ntupleName` to a container index: the `std::hash` value is folded from 64 to 32 bits with the
 /// `boost::hash_combine()` mixing step, and the reserved value 0 is remapped to 1.
135 static ntuple_index_t Hash(const std::string &ntupleName)
136 {
137 // Convert string to numeric representation via `std::hash`.
138 uint64_t h = std::hash<std::string>{}(ntupleName);
139 // Fold the hash into 32-bit using `boost::hash_combine()` algorithm and magic number.
140 auto seed = static_cast<uint32_t>(h >> 32);
141 seed ^= static_cast<uint32_t>(h & 0xffffffff) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
142 auto hash = static_cast<ntuple_index_t>(seed);
143 return (hash == kReservedIndex) ? kReservedIndex + 1 : hash;
144 }
145
148 {
 // Read the anchor, then the compressed header and footer, from this ntuple's metadata object
 // (OID {kOidLowMetadata, GetIndex()}, dkey kDistributionKeyDefault). Decompress header and footer and feed
 // them into `builder`. Returns 0 on success, otherwise the first non-zero value returned by `ReadSingleAkey()`.
 // NOTE(review): `fAnchor` is emplaced before the first read, so after an early return it keeps whatever was
 // decoded so far. In particular, if the anchor is read but a header/footer read fails, `IsValid()` may still
 // report true while `builder` is incomplete.
149 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
150 auto &anchor = fAnchor.emplace();
151 int err;
152
154 daos_obj_id_t oidMetadata{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(this->GetIndex())};
155
157 if ((err = cont.ReadSingleAkey(buffer.get(), anchorSize, oidMetadata, kDistributionKeyDefault,
159 return err;
160 }
161
 // Presumably throws if the anchor is malformed or has an unsupported version (`Unwrap()` on an error result).
162 anchor.Deserialize(buffer.get(), anchorSize).Unwrap();
163
164 builder.SetVersion(anchor.fVersionEpoch, anchor.fVersionMajor, anchor.fVersionMinor, anchor.fVersionPatch);
165 builder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
166 buffer = MakeUninitArray<unsigned char>(anchor.fLenHeader);
168 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesHeader, oidMetadata, kDistributionKeyDefault,
170 return err;
171 RNTupleDecompressor::Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
172 RNTupleSerializer::DeserializeHeader(buffer.get(), anchor.fLenHeader, builder);
173
174 builder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
175 buffer = MakeUninitArray<unsigned char>(anchor.fLenFooter);
177 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesFooter, oidMetadata, kDistributionKeyDefault,
179 return err;
180 RNTupleDecompressor::Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
181 RNTupleSerializer::DeserializeFooter(buffer.get(), anchor.fLenFooter, builder);
182
183 return 0;
184 }
185
 /// Build the locator for `ntupleName` and try to populate a descriptor builder from `cont`.
 /// Read errors are not propagated: callers must check `IsValid()` on the returned locator.
 /// \throws ROOT::RException if an ntuple is found at the hashed index but `ntupleName` is empty or differs from
 /// the stored name (i.e. the index is taken by another ntuple).
186 static std::pair<RDaosContainerNTupleLocator, ROOT::Internal::RNTupleDescriptorBuilder>
188 {
189 auto result = std::make_pair(RDaosContainerNTupleLocator(ntupleName), ROOT::Internal::RNTupleDescriptorBuilder());
190
191 auto &loc = result.first;
192 auto &builder = result.second;
193
194 if (int err = loc.InitNTupleDescriptorBuilder(cont, builder); !err) {
195 if (ntupleName.empty() || ntupleName != builder.GetDescriptor().GetName()) {
196 // Hash already taken by a differently-named ntuple.
197 throw ROOT::RException(
198 R__FAIL("LocateNTuple: ntuple name '" + ntupleName + "' unavailable in this container."));
199 }
200 }
201 return result;
202 }
203};
204
205} // anonymous namespace
206
207////////////////////////////////////////////////////////////////////////////////
208
226
229{
 // Decode an anchor from `buffer` (`bufSize` bytes). The tooltip signature is
 // `RResult<std::uint32_t> Deserialize(const void *buffer, std::uint32_t bufSize)`.
 // Buffers shorter than the 32 leading bytes that this function always consumes are rejected.
230 if (bufSize < 32)
231 return R__FAIL("DAOS anchor too short");
232
233 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
 // Only the anchor version of a default-constructed RDaosNTupleAnchor is accepted.
235 if (fVersionAnchor != RDaosNTupleAnchor().fVersionAnchor) {
236 return R__FAIL("unsupported DAOS anchor version: " + std::to_string(fVersionAnchor));
237 }
238
 // On success, report the bytes consumed: 32 plus the value carried by `result`. That value is presumably the
 // size of a trailing variable-length field; its computation is not visible here -- confirm.
248 if (!result)
249 return R__FORWARD_ERROR(result);
250 return result.Unwrap() + 32;
251}
252
257
258////////////////////////////////////////////////////////////////////////////////
259
261 const ROOT::RNTupleWriteOptions &options)
262 : RPagePersistentSink(ntupleName, options), fURI(uri)
263{
 // Only the URI is stored here; the DAOS pool and container are opened later, in InitImpl().
 // The experimental-backend warning is emitted at most once per process.
264 static std::once_flag once;
265 std::call_once(once, []() {
266 R__LOG_WARNING(ROOT::Internal::NTupleLog()) << "The DAOS backend is experimental and still under development. "
267 << "Do not store real data with this version of RNTuple!";
268 });
269 EnableDefaultMetrics("RPageSinkDaos");
270}
271
273
275{
 // Choose the object class for user data: taken from the options if they are RNTupleWriteOptionsDaos, otherwise
 // the RNTupleWriteOptionsDaos default. The name is recorded in the anchor and must be a recognized object class.
276 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
277 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
278 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
279 if (oclass.IsUnknown())
280 throw ROOT::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
281
 // Connect to the pool and open the container (`create = true`; presumably created if it does not exist yet).
282 auto args = ParseDaosURI(fURI);
283 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
284
285 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel, /*create =*/true);
286 fDaosContainer->SetDefaultObjectClass(oclass);
287
 // Determine this ntuple's index in the container. LocateNTuple() throws if the index is taken by an ntuple
 // with a different name. NOTE(review): an existing ntuple with the *same* name is not reported -- confirm that
 // overwriting it is intended.
288 auto [locator, _] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
289 fNTupleIndex = locator.GetIndex();
290
 // Compress the serialized header (`zipBuffer` is allocated on a line not visible here) and store it.
292 auto szZipHeader =
293 RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get());
294 WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
295}
296
299{
 // Seal (pack and compress) the page under the zip timers, then commit the sealed page.
300 auto element = columnHandle.fColumn->GetElement();
302 {
303 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
304 sealedPage = SealPage(page, *element);
305 }
306
 // fSzZip accumulates the size of the page before sealing.
307 fCounters->fSzZip.Add(page.GetNBytes());
308 return CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage);
309}
310
314{
 // Draw a fresh page ID from `fPageId`; the current number of active clusters serves as the cluster ID.
315 auto pageId = fPageId.fetch_add(1);
316 ROOT::DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
317
318 {
319 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
 // `daosKey` is computed on a line not visible in this listing.
321 fDaosContainer->WriteSingleAkey(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
322 daosKey.fAkey);
323 }
324
 // The locator records only the data size. The full buffer (data plus checksum, if any) is what gets written
 // and counted; readers add the checksum size back when loading the page.
327 result.SetNBytesOnStorage(sealedPage.GetDataSize());
329 fCounters->fNPageCommitted.Inc();
330 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
331 fNBytesCurrentCluster += sealedPage.GetBufferSize();
332 return result;
333}
334
/// Vector commit: write all sealed pages in `ranges` with a single `WriteV()` call. Requests are grouped by
/// (object ID, distribution key) as given by `GetPageDaosKey<kDefaultDaosMapping>()`. One locator is returned per
/// page, in iteration order.
/// NOTE(review): in the visible code `mask` only contributes its size (reservation and the committed-page counter);
/// its per-page values are never consulted -- confirm that this sink never receives masked-out pages.
335std::vector<ROOT::RNTupleLocator>
336ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
337 const std::vector<bool> &mask)
338{
340 std::vector<RNTupleLocator> locators;
341 auto nPages = mask.size();
342 locators.reserve(nPages);
343
 // All pages of this batch use the current number of active clusters as their cluster ID.
344 ROOT::DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
345 int64_t payloadSz = 0;
346
347 /// Aggregate batch of requests by object ID and distribution key, determined by the ntuple-DAOS mapping
348 for (auto &range : ranges) {
349 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
351
352 const auto pageId = fPageId.fetch_add(1);
353
355 d_iov_set(&pageIov, const_cast<void *>(s.GetBuffer()), s.GetBufferSize());
356
357 RDaosKey daosKey =
358 GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, pageId);
361 it->second.Insert(daosKey.fAkey, pageIov);
362
365 locator.SetNBytesOnStorage(s.GetDataSize());
367 locators.push_back(locator);
368
370 }
371 }
372 fNBytesCurrentCluster += payloadSz;
373
374 {
375 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
 // NOTE(review): the message has no separator between "error" and the DAOS error string.
376 if (int err = fDaosContainer->WriteV(writeRequests))
377 throw ROOT::RException(R__FAIL("WriteV: error" + std::string(d_errstr(err))));
378 }
379
380 fCounters->fNPageCommitted.Add(nPages);
381 fCounters->fSzWritePayload.Add(payloadSz);
382
383 return locators;
384}
385
387{
 // Return the bytes written for the current cluster and reset the counter to 0 for the next cluster.
388 return std::exchange(fNBytesCurrentCluster, 0);
389}
390
393 std::uint32_t length)
394{
 // Compress the serialized page list and store it in this ntuple's page-list object
 // (OID {kOidLowPageList, fNTupleIndex}, dkey kDistributionKeyDefault). `offsetData`, drawn from
 // `fClusterGroupId`, presumably serves as the attribute key and the locator position; the lines that use it
 // are not visible here. AttachImpl() reads page lists back using the locator position as attribute key.
396 auto szPageListZip =
397 RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get());
398
399 auto offsetData = fClusterGroupId.fetch_add(1);
400 fDaosContainer->WriteSingleAkey(
402 daos_obj_id_t{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)}, kDistributionKeyDefault,
406 result.SetNBytesOnStorage(szPageListZip);
408 fCounters->fSzWritePayload.Add(static_cast<int64_t>(szPageListZip));
409 return result;
410}
411
414{
 // Compress and write the footer, then write the anchor. The order matters: WriteNTupleFooter() records the
 // footer sizes in `fNTupleAnchor`, which WriteNTupleAnchor() then serializes.
416 auto szFooterZip =
417 RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get());
418 WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
419 WriteNTupleAnchor();
420
421 // TODO: return the proper anchor locator+length
422 return {};
423}
424
426{
 // Store the compressed header in this ntuple's metadata object. Record its compressed (`nbytes`) and
 // uncompressed (`lenHeader`) sizes in the anchor, to be persisted by WriteNTupleAnchor().
 // NOTE(review): both sizes are `size_t` while the anchor fields are `std::uint32_t`; larger values would be
 // truncated silently.
427 fDaosContainer->WriteSingleAkey(
428 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
430 fNTupleAnchor.fLenHeader = lenHeader;
431 fNTupleAnchor.fNBytesHeader = nbytes;
432}
433
435{
 // Store the compressed footer in this ntuple's metadata object. Record its compressed (`nbytes`) and
 // uncompressed (`lenFooter`) sizes in the anchor, to be persisted by WriteNTupleAnchor().
 // NOTE(review): both sizes are `size_t` while the anchor fields are `std::uint32_t`; larger values would be
 // truncated silently.
436 fDaosContainer->WriteSingleAkey(
437 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
439 fNTupleAnchor.fLenFooter = lenFooter;
440 fNTupleAnchor.fNBytesFooter = nbytes;
441}
442
444{
 // Serialize `fNTupleAnchor` and store it in this ntuple's metadata object. The buffer allocation and
 // `ntplSize` are on lines not visible here.
447 fNTupleAnchor.Serialize(buffer.get());
448 fDaosContainer->WriteSingleAkey(
449 buffer.get(), ntplSize, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
451}
452
/// Not supported by the DAOS backend: always throws ROOT::RException.
453std::unique_ptr<ROOT::Internal::RPageSink>
455 const ROOT::RNTupleWriteOptions & /*opts*/) const
456{
457 throw ROOT::RException(R__FAIL("cloning a DAOS sink is not implemented yet"));
458}
459
460////////////////////////////////////////////////////////////////////////////////
461
463 const ROOT::RNTupleReadOptions &options)
464 : RPageSource(ntupleName, options), fURI(uri)
465{
 // Unlike the sink, the source connects to the pool and opens the container immediately. No `create` flag is
 // passed, so presumably the container must already exist.
466 EnableDefaultMetrics("RPageSourceDaos");
467
468 auto args = ParseDaosURI(uri);
469 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
470 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel);
471}
472
474{
 // Stop the cluster pool's background thread before the members are destroyed.
475 fClusterPool.StopBackgroundThread();
476}
477
480{
 // Locate the ntuple and obtain a builder pre-filled from its anchor/header/footer. Adopt the anchor's object
 // class as the container default, then read, decompress and deserialize the page list of every cluster group.
482 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
483
484 auto [locator, descBuilder] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
485 if (!locator.IsValid())
486 throw ROOT::RException(
487 R__FAIL("Attach: requested ntuple '" + fNTupleName + "' is not present in DAOS container."));
488
489 auto oclass = RDaosObject::ObjClassId(locator.fAnchor->fObjClass);
490 if (oclass.IsUnknown())
491 throw ROOT::RException(R__FAIL("Attach: unknown object class " + locator.fAnchor->fObjClass));
492
493 fDaosContainer->SetDefaultObjectClass(oclass);
494 fNTupleIndex = locator.GetIndex();
496
497 auto desc = descBuilder.MoveDescriptor();
498
 // `oidPageList` is defined on a line not visible here (presumably {kOidLowPageList, fNTupleIndex}). Each page
 // list is addressed by the attribute key stored as its locator position.
 // NOTE(review): the value returned by ReadSingleAkey() (an error code elsewhere in this file) is ignored, so a
 // failed read would hand an unfilled buffer to Unzip().
499 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
500 buffer = MakeUninitArray<unsigned char>(cgDesc.GetPageListLength());
501 zipBuffer = MakeUninitArray<unsigned char>(cgDesc.GetPageListLocator().GetNBytesOnStorage());
502 fDaosContainer->ReadSingleAkey(
503 zipBuffer.get(), cgDesc.GetPageListLocator().GetNBytesOnStorage(), oidPageList, kDistributionKeyDefault,
504 cgDesc.GetPageListLocator().GetPosition<RNTupleLocatorObject64>().GetLocation(), kCidMetadata);
505 RNTupleDecompressor::Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().GetNBytesOnStorage(),
506 cgDesc.GetPageListLength(), buffer.get());
507
508 RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc, mode);
509 }
510
511 return desc;
512}
513
515{
 // Name of the container's default object class, which AttachImpl() sets from the anchor's `fObjClass`.
516 return fDaosContainer->GetDefaultObjectClass().ToString();
517}
518
522{
 // Fill in the buffer size, element count and checksum flag of the requested page. When `sealedPage` carries a
 // buffer, also copy the page bytes into it; a null buffer therefore only queries the required size.
523 const auto clusterId = localIndex.GetClusterId();
524
526 {
 // Look up the page while holding the shared descriptor guard.
527 auto descriptorGuard = GetSharedDescriptorGuard();
528 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
529 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(localIndex.GetIndexInCluster());
530 }
531
 // On-storage size plus the checksum bytes, if the page has a checksum.
532 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
533 sealedPage.SetNElements(pageInfo.GetNElements());
534 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
535 if (!sealedPage.GetBuffer())
536 return;
537
 // Page-zero pages have no storage backing: copy from the page-zero buffer (such pages carry no checksum).
538 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
539 assert(!pageInfo.HasChecksum());
540 memcpy(const_cast<void *>(sealedPage.GetBuffer()), ROOT::Internal::RPage::GetPageZeroBuffer(),
541 sealedPage.GetBufferSize());
542 return;
543 }
544
545 RDaosKey daosKey =
547 pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
 // NOTE(review): the value returned by ReadSingleAkey() is not checked. A failed read surfaces, at best, as a
 // checksum error below, and only if checksum verification applies to this page.
548 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(), daosKey.fOid,
549 daosKey.fDkey, daosKey.fAkey);
550
551 sealedPage.VerifyChecksumIfEnabled().ThrowOnError();
552}
553
557{
 // Load the page described by `clusterInfo` for `columnHandle`, unseal it and register it in the page pool.
558 const auto columnId = columnHandle.fPhysicalId;
559 const auto clusterId = clusterInfo.fClusterId;
560 const auto &pageInfo = clusterInfo.fPageInfo;
561
562 const auto element = columnHandle.fColumn->GetElement();
563 const auto elementSize = element->GetSize();
564 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
565
 // Page-zero pages: materialize a zero-filled page without touching storage.
566 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
567 auto pageZero = fPageAllocator->NewPage(elementSize, pageInfo.GetNElements());
568 pageZero.GrowUnchecked(pageInfo.GetNElements());
569 memset(pageZero.GetBuffer(), 0, pageZero.GetNBytes());
570 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
572 return fPagePool.RegisterPage(std::move(pageZero),
574 }
575
577 sealedPage.SetNElements(pageInfo.GetNElements());
578 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
579 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
580 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
581
 // Without a cluster cache, read this single page directly from DAOS into `directReadBuffer`. The buffer is
 // declared at function scope so that it outlives UnsealPage() below.
 // NOTE(review): the value returned by ReadSingleAkey() is not checked.
582 if (fOptions.GetClusterCache() == ROOT::RNTupleReadOptions::EClusterCache::kOff) {
585 fNTupleIndex, clusterId, columnId, pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
586 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
587 daosKey.fAkey);
588 fCounters->fNPageRead.Inc();
589 fCounters->fNRead.Inc();
590 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
591 sealedPage.SetBuffer(directReadBuffer.get());
592 } else {
 // With the cluster cache: (re)load the current cluster if it is a different cluster or lacks this column.
 // If `cachedPageRef` (obtained on lines not visible here, presumably from the page pool) is non-null, return
 // it; otherwise take the sealed page from the cluster.
593 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
594 fCurrentCluster = fClusterPool.GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
595 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
596
599 if (!cachedPageRef.Get().IsNull())
600 return cachedPageRef;
601
603 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
604 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
605 sealedPage.SetBuffer(onDiskPage->GetAddress());
606 }
607
 // Unseal the page under the unzip timers.
609 {
610 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
611 newPage = UnsealPage(sealedPage, *element).Unwrap();
612 fCounters->fSzUnzip.Add(elementSize * pageInfo.GetNElements());
613 }
614
615 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
617 fCounters->fNPageUnsealed.Inc();
618 return fPagePool.RegisterPage(std::move(newPage), ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType});
619}
620
621std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceDaos::CloneImpl() const
622{
623 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
624 return std::unique_ptr<RPageSourceDaos>(clone);
625}
626
/// Load all pages of the requested clusters/columns with one vectored DAOS read. For each cluster key:
/// - collect the on-disk page locations via PrepareLoadCluster();
/// - allocate a single buffer for all of the cluster's pages;
/// - register each page in a heap page map;
/// - add a read request grouped by (object ID, distribution key).
/// A single `ReadV()` then fills the buffers of all clusters.
627std::vector<std::unique_ptr<RCluster>>
629{
631 ROOT::DescriptorId_t fClusterId = 0;
632 ROOT::DescriptorId_t fColumnId = 0;
633 ROOT::NTupleSize_t fPageNo = 0;
634 std::uint64_t fPageId = 0;
635 std::uint64_t fDataSize = 0; // page payload
636 std::uint64_t fBufferSize = 0; // page payload + checksum (if available)
637 };
638
639 // Prepares read requests for a single cluster; `readRequests` is modified by this function. Requests are coalesced
640 // by OID and distribution key.
641 // TODO(jalopezg): this may be a private member function; that, however, requires additional changes given that
642 // `RDaosContainer::MultiObjectRWOperation_t` cannot be forward-declared
645 auto clusterId = clusterKey.fClusterId;
646 std::vector<RDaosSealedPageLocator> onDiskPages;
647
 // NOTE(review): `clusterBufSz` is an `unsigned`; a cluster whose selected pages together exceed UINT_MAX bytes
 // would overflow it.
648 unsigned clusterBufSz = 0, nPages = 0;
649 auto pageZeroMap = std::make_unique<ROOT::Internal::ROnDiskPageMap>();
650 PrepareLoadCluster(
654 const auto &pageLocator = pageInfo.GetLocator();
655 const auto pageId = pageLocator.GetPosition<RNTupleLocatorObject64>().GetLocation();
656 const auto pageBufferSize = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
658 pageLocator.GetNBytesOnStorage(), pageBufferSize});
659
660 ++nPages;
662 });
663
 // One buffer for all pages of the cluster. Ownership passes to the heap page map, which the cluster then
 // adopts. The buffer is only filled by the ReadV() call issued after all clusters have been prepared.
664 auto clusterBuffer = new unsigned char[clusterBufSz];
665 auto pageMap =
666 std::make_unique<ROOT::Internal::ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(clusterBuffer));
667
668 // Fill the cluster page map and the read requests for the RDaosContainer::ReadV() call
669 for (const auto &sealedLoc : onDiskPages) {
671 pageMap->Register(key, ROOT::Internal::ROnDiskPage(clusterBuffer, sealedLoc.fBufferSize));
672
673 // Prepare new read request batched up by object ID and distribution key
674 d_iov_t iov;
675 d_iov_set(&iov, clusterBuffer, sealedLoc.fBufferSize);
676
677 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, sealedLoc.fClusterId, sealedLoc.fColumnId,
678 sealedLoc.fPageId);
681 itReq->second.Insert(daosKey.fAkey, iov);
682
 // Pages are laid out back to back in the cluster buffer.
683 clusterBuffer += sealedLoc.fBufferSize;
684 }
685 fCounters->fNPageRead.Add(nPages);
686 fCounters->fSzReadPayload.Add(clusterBufSz);
687
688 auto cluster = std::make_unique<RCluster>(clusterId);
689 cluster->Adopt(std::move(pageMap));
690 cluster->Adopt(std::move(pageZeroMap));
691 for (auto colId : clusterKey.fPhysicalColumnSet)
692 cluster->SetColumnAvailable(colId);
693 return cluster;
694 };
695
696 fCounters->fNClusterLoaded.Add(clusterKeys.size());
697
698 std::vector<std::unique_ptr<ROOT::Internal::RCluster>> clusters;
700 for (auto key : clusterKeys) {
701 clusters.emplace_back(fnPrepareSingleCluster(key, readRequests));
702 }
703
704 {
705 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
 // NOTE(review): the message has no separator between "error" and the DAOS error string.
706 if (int err = fDaosContainer->ReadV(readRequests))
707 throw ROOT::RException(R__FAIL("ReadV: error" + std::string(d_errstr(err))));
708 }
709 fCounters->fNReadV.Inc();
710 fCounters->fNRead.Add(readRequests.size());
711
712 return clusters;
713}
714
716{
 // DAOS-backed sources have no StreamerInfo to load; this only logs a warning.
717 R__LOG_WARNING(ROOT::Internal::NTupleLog()) << "DAOS-backed sources have no associated StreamerInfo to load.";
718}
719
720std::unique_ptr<ROOT::Internal::RPageSource>
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:304
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:300
#define R__LOG_WARNING(...)
Definition RLogger.hxx:358
#define h(i)
Definition RSha256.hxx:106
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void data
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t mask
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char mode
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
UInt_t Hash(const TString &s)
Definition TString.h:503
#define _(A, B)
Definition cfortran.h:108
A RDaosContainer provides read/write access to objects in a given container.
Definition RDaos.hxx:157
RDaosObject::DistributionKey_t DistributionKey_t
Definition RDaos.hxx:160
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
Definition RDaos.hxx:231
RDaosObject::AttributeKey_t AttributeKey_t
Definition RDaos.hxx:161
std::unique_ptr< ROOT::Internal::RPageSink > CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const final
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options)
RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
Storage provider that reads ntuple pages from a DAOS container.
void LoadStreamerInfo() final
Forces the loading of ROOT StreamerInfo from the underlying file.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new connection to the pool/container.
ROOT::Internal::RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster) final
std::vector< std::unique_ptr< ROOT::Internal::RCluster > > LoadClusters(std::span< ROOT::Internal::RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
std::unique_ptr< RPageSource > OpenWithDifferentAnchor(const ROOT::Internal::RNTupleLink &anchorLink, const ROOT::RNTupleReadOptions &options={}) final
Creates a new PageSource using the same underlying file as this but referring to a different RNTuple,...
void LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleReadOptions &options)
ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode) final
LoadStructureImpl() has been called before AttachImpl() is called
DAOS-specific user-tunable settings for storing ntuples.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:148
Helper class to compress data blocks in the ROOT compression frame format.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
Helper class to uncompress data blocks in the ROOT compression frame format.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the `from` buffer.
A helper class for piece-wise construction of an RNTupleDescriptor.
void SetVersion(std::uint16_t versionEpoch, std::uint16_t versionMajor, std::uint16_t versionMinor, std::uint16_t versionPatch)
const RNTupleDescriptor & GetDescriptor() const
void AddToOnDiskFooterSize(std::uint64_t size)
The real footer size also includes the page list envelopes.
A helper class for serializing and deserialization of the RNTuple binary format.
static RResult< std::uint32_t > DeserializeString(const void *buffer, std::uint64_t bufSize, std::string &val)
static std::uint32_t SerializeUInt32(std::uint32_t val, void *buffer)
static std::uint32_t DeserializeUInt32(const void *buffer, std::uint32_t &val)
static std::uint32_t SerializeUInt16(std::uint16_t val, void *buffer)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, ROOT::DescriptorId_t clusterGroupId, RNTupleDescriptor &desc, EDescriptorDeserializeMode mode)
static std::uint32_t SerializeString(const std::string &val, void *buffer)
static std::uint32_t DeserializeUInt16(const void *buffer, std::uint16_t &val)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeUInt64(std::uint64_t val, void *buffer)
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:41
Base class for a sink with a physical storage backend.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:53
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:44
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:23
Base class for all ROOT issued exceptions.
Definition RError.hxx:79
The on-storage metadata of an RNTuple.
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
RNTupleLocator payload that is common for object stores using 64bit location information.
std::uint64_t GetLocation() const
Generic information about the physical location of data.
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:198
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition daos.h:50
@ OC_SX
Definition daos.h:129
uint16_t daos_oclass_id_t
Definition daos.h:135
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Definition RDaos.hxx:166
Describes a read/write operation on multiple attribute keys under the same object ID and distribution...
Definition RDaos.hxx:190
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint64_t fVersionAnchor
Allows for evolving the struct in future versions.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint16_t fVersionEpoch
Version of the binary format supported by the writer.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition RDaos.hxx:108
The identifiers that specifies the content of a (partial) cluster.
Definition RCluster.hxx:152
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:51
Summarizes cluster-level information that are necessary to load a certain page.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
Information about a single page in the context of a cluster's page range.
iovec for memory buffer
Definition daos.h:37
uint64_t hi
Definition daos.h:147
uint64_t lo
Definition daos.h:146
TMarker m
Definition textangle.C:8