Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
3/// \date 2020-11-03
4/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
5/// is welcome!
6
7/*************************************************************************
8 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
9 * All rights reserved. *
10 * *
11 * For the licensing terms see $ROOTSYS/LICENSE. *
12 * For the list of contributors see $ROOTSYS/README/CREDITS. *
13 *************************************************************************/
14
15#include <ROOT/RCluster.hxx>
16#include <ROOT/RLogger.hxx>
18#include <ROOT/RNTupleModel.hxx>
21#include <ROOT/RNTupleTypes.hxx>
22#include <ROOT/RNTupleUtils.hxx>
23#include <ROOT/RNTupleZip.hxx>
24#include <ROOT/RPage.hxx>
26#include <ROOT/RPagePool.hxx>
27#include <ROOT/RDaos.hxx>
29
30#include <RVersion.h>
31#include <TError.h>
32
33#include <algorithm>
34#include <cstdio>
35#include <cstdlib>
36#include <cstring>
37#include <limits>
38#include <utility>
39#include <regex>
40#include <cassert>
41
42namespace {
51
52/// \brief RNTuple page-DAOS mappings
53enum EDaosMapping {
54 kOidPerCluster,
55 kOidPerPage
56};
57
58struct RDaosKey {
59 daos_obj_id_t fOid;
60 DistributionKey_t fDkey;
61 AttributeKey_t fAkey;
62};
63
64/// \brief Pre-defined keys for object store. `kDistributionKeyDefault` is the distribution key for metadata and
65/// pagelist values; optionally it can be used for ntuple pages (if under the `kOidPerPage` mapping strategy).
66/// `kAttributeKeyDefault` is the attribute key for ntuple pages under `kOidPerPage`.
67/// `kAttributeKey{Anchor,Header,Footer}` are the respective attribute keys for anchor/header/footer metadata elements.
68static constexpr DistributionKey_t kDistributionKeyDefault = 0x5a3c69f0cafe4a11;
69static constexpr AttributeKey_t kAttributeKeyDefault = 0x4243544b53444229;
70static constexpr AttributeKey_t kAttributeKeyAnchor = 0x4243544b5344422a;
71static constexpr AttributeKey_t kAttributeKeyHeader = 0x4243544b5344422b;
72static constexpr AttributeKey_t kAttributeKeyFooter = 0x4243544b5344422c;
73
74/// \brief Pre-defined 64 LSb of the OIDs for ntuple metadata (holds anchor/header/footer) and clusters' pagelists.
75static constexpr decltype(daos_obj_id_t::lo) kOidLowMetadata = -1;
76static constexpr decltype(daos_obj_id_t::lo) kOidLowPageList = -2;
77
78static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
79
80static constexpr EDaosMapping kDefaultDaosMapping = kOidPerCluster;
81
82template <EDaosMapping mapping>
83RDaosKey GetPageDaosKey(ROOT::Experimental::Internal::ntuple_index_t ntplId, long unsigned clusterId,
84 long unsigned columnId, long unsigned pageCount)
85{
86 if constexpr (mapping == kOidPerCluster) {
87 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(clusterId),
88 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
89 static_cast<DistributionKey_t>(columnId), static_cast<AttributeKey_t>(pageCount)};
90 } else if constexpr (mapping == kOidPerPage) {
91 return RDaosKey{daos_obj_id_t{static_cast<decltype(daos_obj_id_t::lo)>(pageCount),
92 static_cast<decltype(daos_obj_id_t::hi)>(ntplId)},
93 kDistributionKeyDefault, kAttributeKeyDefault};
94 }
95}
96
/// \brief The two components of a DAOS RNTuple URI `daos://<pool-label>/<container-label>`.
struct RDaosURI {
   std::string fPoolLabel;      ///< Label of the DAOS pool
   std::string fContainerLabel; ///< Label of the container for this RNTuple
};
103
104/**
105 \brief Parse a DAOS RNTuple URI of the form 'daos://pool_id/container_id'.
106*/
107RDaosURI ParseDaosURI(std::string_view uri)
108{
109 std::regex re("daos://([^/]+)/(.+)");
110 std::cmatch m;
111 if (!std::regex_match(uri.data(), m, re))
112 throw ROOT::RException(R__FAIL("Invalid DAOS pool URI."));
113 return {m[1], m[2]};
114}
115
116/// \brief Helper structure concentrating the functionality required to locate an ntuple within a DAOS container.
117/// It includes a hashing function that converts the RNTuple's name into a 32-bit identifier; this value is used to
118/// index the subspace for the ntuple among all objects in the container. A zero-value hash value is reserved for
119/// storing any future metadata related to container-wide management; a zero-index ntuple is thus disallowed and
120/// remapped to "1". Once the index is computed, `InitNTupleDescriptorBuilder()` can be called to return a
121/// partially-filled builder with the ntuple's anchor, header and footer, lacking only pagelists. Upon that call,
122/// a copy of the anchor is stored in `fAnchor`.
123struct RDaosContainerNTupleLocator {
/// Name of the ntuple, as passed to the constructor.
124 std::string fName{};
/// Index of the ntuple inside the container, computed from the name by `Hash()`.
125 ntuple_index_t fIndex{};
/// Anchor of the ntuple; (re-)emplaced at the start of every `InitNTupleDescriptorBuilder()` call, before any read.
126 std::optional<ROOT::Experimental::Internal::RDaosNTupleAnchor> fAnchor;
/// Index value that `Hash()` never returns (it is remapped to `kReservedIndex + 1`).
127 static const ntuple_index_t kReservedIndex = 0;
128
129 RDaosContainerNTupleLocator() = default;
130 explicit RDaosContainerNTupleLocator(const std::string &ntupleName) : fName(ntupleName), fIndex(Hash(ntupleName)) {}
131
/// True if an anchor has been emplaced and it records a non-zero compressed header size.
/// NOTE(review): `fAnchor` is emplaced before the anchor is read, so after a failed read this presumably relies on a
/// default-constructed anchor having `fNBytesHeader == 0` — confirm.
132 bool IsValid() { return fAnchor.has_value() && fAnchor->fNBytesHeader; }
133 [[nodiscard]] ntuple_index_t GetIndex() const { return fIndex; };
/// Map an ntuple name to a container index by folding its 64-bit `std::hash` value into 32 bits; never returns
/// `kReservedIndex`.
/// NOTE(review): `std::hash<std::string>` is only required to be consistent within one program execution, so the
/// same name may yield a different index under another standard library / platform / build, which would make
/// stored ntuples unlocatable there — confirm whether a fixed, portable string hash is required.
134 static ntuple_index_t Hash(const std::string &ntupleName)
135 {
136 // Convert string to numeric representation via `std::hash`.
137 uint64_t h = std::hash<std::string>{}(ntupleName);
138 // Fold the hash into 32-bit using `boost::hash_combine()` algorithm and magic number.
139 auto seed = static_cast<uint32_t>(h >> 32);
140 seed ^= static_cast<uint32_t>(h & 0xffffffff) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
141 auto hash = static_cast<ntuple_index_t>(seed);
142 return (hash == kReservedIndex) ? kReservedIndex + 1 : hash;
143 }
144
/// Read the anchor, header and footer of the ntuple at `GetIndex()` from its metadata object in `cont` and feed
/// them into `builder` (version, on-disk sizes, decompressed header and footer). Page lists are not read.
/// \return 0 on success, otherwise the non-zero error code of the first failing `cont.ReadSingleAkey()` call; after
/// such an early return `builder` may already be partially filled. A malformed anchor throws via `Unwrap()`.
145 int InitNTupleDescriptorBuilder(ROOT::Experimental::Internal::RDaosContainer &cont,
146 ROOT::Internal::RNTupleDescriptorBuilder &builder)
147 {
148 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
149 auto &anchor = fAnchor.emplace();
150 int err;
151
153 daos_obj_id_t oidMetadata{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(this->GetIndex())};
154
155 buffer = MakeUninitArray<unsigned char>(anchorSize);
156 if ((err = cont.ReadSingleAkey(buffer.get(), anchorSize, oidMetadata, kDistributionKeyDefault,
157 kAttributeKeyAnchor, kCidMetadata))) {
158 return err;
159 }
160
161 anchor.Deserialize(buffer.get(), anchorSize).Unwrap();
162
163 builder.SetVersion(anchor.fVersionEpoch, anchor.fVersionMajor, anchor.fVersionMinor, anchor.fVersionPatch);
164 builder.SetOnDiskHeaderSize(anchor.fNBytesHeader);
// Header: read fNBytesHeader compressed bytes, inflate to fLenHeader bytes, then deserialize into the builder.
165 buffer = MakeUninitArray<unsigned char>(anchor.fLenHeader);
166 zipBuffer = MakeUninitArray<unsigned char>(anchor.fNBytesHeader);
167 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesHeader, oidMetadata, kDistributionKeyDefault,
168 kAttributeKeyHeader, kCidMetadata)))
169 return err;
170 RNTupleDecompressor::Unzip(zipBuffer.get(), anchor.fNBytesHeader, anchor.fLenHeader, buffer.get());
171 RNTupleSerializer::DeserializeHeader(buffer.get(), anchor.fLenHeader, builder);
172
// Footer: same procedure with the footer sizes and akey.
173 builder.AddToOnDiskFooterSize(anchor.fNBytesFooter);
174 buffer = MakeUninitArray<unsigned char>(anchor.fLenFooter);
175 zipBuffer = MakeUninitArray<unsigned char>(anchor.fNBytesFooter);
176 if ((err = cont.ReadSingleAkey(zipBuffer.get(), anchor.fNBytesFooter, oidMetadata, kDistributionKeyDefault,
177 kAttributeKeyFooter, kCidMetadata)))
178 return err;
179 RNTupleDecompressor::Unzip(zipBuffer.get(), anchor.fNBytesFooter, anchor.fLenFooter, buffer.get());
180 RNTupleSerializer::DeserializeFooter(buffer.get(), anchor.fLenFooter, builder);
181
182 return 0;
183 }
184
/// Create a locator for `ntupleName` and try to initialize a descriptor builder from the metadata stored in `cont`.
/// If the metadata is read successfully but belongs to a differently-named ntuple (or `ntupleName` is empty), an
/// `RException` is thrown. Read errors are not reported: the pair is returned as-is, and callers can check
/// `IsValid()` on the returned locator.
185 static std::pair<RDaosContainerNTupleLocator, ROOT::Internal::RNTupleDescriptorBuilder>
186 LocateNTuple(ROOT::Experimental::Internal::RDaosContainer &cont, const std::string &ntupleName)
187 {
188 auto result = std::make_pair(RDaosContainerNTupleLocator(ntupleName), ROOT::Internal::RNTupleDescriptorBuilder());
189
190 auto &loc = result.first;
191 auto &builder = result.second;
192
193 if (int err = loc.InitNTupleDescriptorBuilder(cont, builder); !err) {
194 if (ntupleName.empty() || ntupleName != builder.GetDescriptor().GetName()) {
195 // Hash already taken by a differently-named ntuple.
196 throw ROOT::RException(
197 R__FAIL("LocateNTuple: ntuple name '" + ntupleName + "' unavailable in this container."));
198 }
199 }
200 return result;
201 }
202};
203
204} // anonymous namespace
205
206////////////////////////////////////////////////////////////////////////////////
207
209{
210 if (buffer != nullptr) {
211 auto bytes = reinterpret_cast<unsigned char *>(buffer);
222 }
223 return RNTupleSerializer::SerializeString(fObjClass, nullptr) + 32;
224}
225
227ROOT::Experimental::Internal::RDaosNTupleAnchor::Deserialize(const void *buffer, std::uint32_t bufSize)
228{
229 if (bufSize < 32)
230 return R__FAIL("DAOS anchor too short");
231
232 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
235 return R__FAIL("unsupported DAOS anchor version: " + std::to_string(fVersionAnchor));
236 }
237
246 auto result = RNTupleSerializer::DeserializeString(bytes, bufSize - 32, fObjClass);
247 if (!result)
248 return R__FORWARD_ERROR(result);
249 return result.Unwrap() + 32;
250}
251
256
257////////////////////////////////////////////////////////////////////////////////
258
259ROOT::Experimental::Internal::RPageSinkDaos::RPageSinkDaos(std::string_view ntupleName, std::string_view uri,
260 const ROOT::RNTupleWriteOptions &options)
261 : RPagePersistentSink(ntupleName, options), fURI(uri)
262{
263 static std::once_flag once;
264 std::call_once(once, []() {
265 R__LOG_WARNING(ROOT::Internal::NTupleLog()) << "The DAOS backend is experimental and still under development. "
266 << "Do not store real data with this version of RNTuple!";
267 });
268 EnableDefaultMetrics("RPageSinkDaos");
269}
270
272
273void ROOT::Experimental::Internal::RPageSinkDaos::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
274{
275 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
276 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
277 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
278 if (oclass.IsUnknown())
279 throw ROOT::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
280
281 auto args = ParseDaosURI(fURI);
282 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
283
284 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel, /*create =*/true);
285 fDaosContainer->SetDefaultObjectClass(oclass);
286
287 auto [locator, _] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
288 fNTupleIndex = locator.GetIndex();
289
290 auto zipBuffer = MakeUninitArray<unsigned char>(length);
291 auto szZipHeader =
292 RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get());
293 WriteNTupleHeader(zipBuffer.get(), szZipHeader, length);
294}
295
297 const ROOT::Internal::RPage &page)
298{
299 auto element = columnHandle.fColumn->GetElement();
300 RPageStorage::RSealedPage sealedPage;
301 {
302 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
303 sealedPage = SealPage(page, *element);
304 }
305
306 fCounters->fSzZip.Add(page.GetNBytes());
307 return CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage);
308}
309
312 const RPageStorage::RSealedPage &sealedPage)
313{
314 auto pageId = fPageId.fetch_add(1);
315 ROOT::DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
316
317 {
318 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
319 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId, pageId);
320 fDaosContainer->WriteSingleAkey(sealedPage.GetBuffer(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
321 daosKey.fAkey);
322 }
323
324 RNTupleLocator result;
326 result.SetNBytesOnStorage(sealedPage.GetDataSize());
328 fCounters->fNPageCommitted.Inc();
329 fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
330 fNBytesCurrentCluster += sealedPage.GetBufferSize();
331 return result;
332}
333
334std::vector<ROOT::RNTupleLocator>
335ROOT::Experimental::Internal::RPageSinkDaos::CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges,
336 const std::vector<bool> &mask)
337{
339 std::vector<RNTupleLocator> locators;
340 auto nPages = mask.size();
341 locators.reserve(nPages);
342
343 ROOT::DescriptorId_t clusterId = fDescriptorBuilder.GetDescriptor().GetNActiveClusters();
344 int64_t payloadSz = 0;
345
346 /// Aggregate batch of requests by object ID and distribution key, determined by the ntuple-DAOS mapping
347 for (auto &range : ranges) {
348 for (auto sealedPageIt = range.fFirst; sealedPageIt != range.fLast; ++sealedPageIt) {
349 const RPageStorage::RSealedPage &s = *sealedPageIt;
350
351 const auto pageId = fPageId.fetch_add(1);
352
353 d_iov_t pageIov;
354 d_iov_set(&pageIov, const_cast<void *>(s.GetBuffer()), s.GetBufferSize());
355
356 RDaosKey daosKey =
357 GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, range.fPhysicalColumnId, pageId);
358 auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
359 auto [it, ret] = writeRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
360 it->second.Insert(daosKey.fAkey, pageIov);
361
362 RNTupleLocator locator;
364 locator.SetNBytesOnStorage(s.GetDataSize());
366 locators.push_back(locator);
367
368 payloadSz += s.GetBufferSize();
369 }
370 }
371 fNBytesCurrentCluster += payloadSz;
372
373 {
374 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
375 if (int err = fDaosContainer->WriteV(writeRequests))
376 throw ROOT::RException(R__FAIL("WriteV: error" + std::string(d_errstr(err))));
377 }
378
379 fCounters->fNPageCommitted.Add(nPages);
380 fCounters->fSzWritePayload.Add(payloadSz);
381
382 return locators;
383}
384
386{
387 return std::exchange(fNBytesCurrentCluster, 0);
388}
389
392 std::uint32_t length)
393{
394 auto bufPageListZip = MakeUninitArray<unsigned char>(length);
395 auto szPageListZip =
396 RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get());
397
398 auto offsetData = fClusterGroupId.fetch_add(1);
399 fDaosContainer->WriteSingleAkey(
400 bufPageListZip.get(), szPageListZip,
401 daos_obj_id_t{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)}, kDistributionKeyDefault,
402 offsetData, kCidMetadata);
403 RNTupleLocator result;
405 result.SetNBytesOnStorage(szPageListZip);
406 result.SetPosition(RNTupleLocatorObject64{offsetData});
407 fCounters->fSzWritePayload.Add(static_cast<int64_t>(szPageListZip));
408 return result;
409}
410
412ROOT::Experimental::Internal::RPageSinkDaos::CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)
413{
414 auto bufFooterZip = MakeUninitArray<unsigned char>(length);
415 auto szFooterZip =
416 RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get());
417 WriteNTupleFooter(bufFooterZip.get(), szFooterZip, length);
419
420 // TODO: return the proper anchor locator+length
421 return {};
422}
423
424void ROOT::Experimental::Internal::RPageSinkDaos::WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
425{
426 fDaosContainer->WriteSingleAkey(
427 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
428 kDistributionKeyDefault, kAttributeKeyHeader, kCidMetadata);
429 fNTupleAnchor.fLenHeader = lenHeader;
430 fNTupleAnchor.fNBytesHeader = nbytes;
431}
432
433void ROOT::Experimental::Internal::RPageSinkDaos::WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
434{
435 fDaosContainer->WriteSingleAkey(
436 data, nbytes, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
437 kDistributionKeyDefault, kAttributeKeyFooter, kCidMetadata);
438 fNTupleAnchor.fLenFooter = lenFooter;
439 fNTupleAnchor.fNBytesFooter = nbytes;
440}
441
443{
444 const auto ntplSize = RDaosNTupleAnchor::GetSize();
445 auto buffer = MakeUninitArray<unsigned char>(ntplSize);
446 fNTupleAnchor.Serialize(buffer.get());
447 fDaosContainer->WriteSingleAkey(
448 buffer.get(), ntplSize, daos_obj_id_t{kOidLowMetadata, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)},
449 kDistributionKeyDefault, kAttributeKeyAnchor, kCidMetadata);
450}
451
452std::unique_ptr<ROOT::Internal::RPageSink>
454 const ROOT::RNTupleWriteOptions & /*opts*/) const
455{
456 throw ROOT::RException(R__FAIL("cloning a DAOS sink is not implemented yet"));
457}
458
459////////////////////////////////////////////////////////////////////////////////
460
461ROOT::Experimental::Internal::RPageSourceDaos::RPageSourceDaos(std::string_view ntupleName, std::string_view uri,
462 const ROOT::RNTupleReadOptions &options)
463 : RPageSource(ntupleName, options), fURI(uri)
464{
465 EnableDefaultMetrics("RPageSourceDaos");
466
467 auto args = ParseDaosURI(uri);
468 auto pool = std::make_shared<RDaosPool>(args.fPoolLabel);
469 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerLabel);
470}
471
476
479{
481 std::unique_ptr<unsigned char[]> buffer, zipBuffer;
482
483 auto [locator, descBuilder] = RDaosContainerNTupleLocator::LocateNTuple(*fDaosContainer, fNTupleName);
484 if (!locator.IsValid())
485 throw ROOT::RException(
486 R__FAIL("Attach: requested ntuple '" + fNTupleName + "' is not present in DAOS container."));
487
488 auto oclass = RDaosObject::ObjClassId(locator.fAnchor->fObjClass);
489 if (oclass.IsUnknown())
490 throw ROOT::RException(R__FAIL("Attach: unknown object class " + locator.fAnchor->fObjClass));
491
492 fDaosContainer->SetDefaultObjectClass(oclass);
493 fNTupleIndex = locator.GetIndex();
494 daos_obj_id_t oidPageList{kOidLowPageList, static_cast<decltype(daos_obj_id_t::hi)>(fNTupleIndex)};
495
496 auto desc = descBuilder.MoveDescriptor();
497
498 for (const auto &cgDesc : desc.GetClusterGroupIterable()) {
499 buffer = MakeUninitArray<unsigned char>(cgDesc.GetPageListLength());
500 zipBuffer = MakeUninitArray<unsigned char>(cgDesc.GetPageListLocator().GetNBytesOnStorage());
501 fDaosContainer->ReadSingleAkey(
502 zipBuffer.get(), cgDesc.GetPageListLocator().GetNBytesOnStorage(), oidPageList, kDistributionKeyDefault,
503 cgDesc.GetPageListLocator().GetPosition<RNTupleLocatorObject64>().GetLocation(), kCidMetadata);
504 RNTupleDecompressor::Unzip(zipBuffer.get(), cgDesc.GetPageListLocator().GetNBytesOnStorage(),
505 cgDesc.GetPageListLength(), buffer.get());
506
507 RNTupleSerializer::DeserializePageList(buffer.get(), cgDesc.GetPageListLength(), cgDesc.GetId(), desc, mode);
508 }
509
510 return desc;
511}
512
514{
515 return fDaosContainer->GetDefaultObjectClass().ToString();
516}
517
519 RNTupleLocalIndex localIndex,
520 RSealedPage &sealedPage)
521{
522 const auto clusterId = localIndex.GetClusterId();
523
525 {
526 auto descriptorGuard = GetSharedDescriptorGuard();
527 const auto &clusterDescriptor = descriptorGuard->GetClusterDescriptor(clusterId);
528 pageInfo = clusterDescriptor.GetPageRange(physicalColumnId).Find(localIndex.GetIndexInCluster());
529 }
530
531 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
532 sealedPage.SetNElements(pageInfo.GetNElements());
533 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
534 if (!sealedPage.GetBuffer())
535 return;
536
538 assert(!pageInfo.HasChecksum());
539 memcpy(const_cast<void *>(sealedPage.GetBuffer()), ROOT::Internal::RPage::GetPageZeroBuffer(),
540 sealedPage.GetBufferSize());
541 return;
542 }
543
544 RDaosKey daosKey =
545 GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, clusterId, physicalColumnId,
547 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.GetBuffer()), sealedPage.GetBufferSize(), daosKey.fOid,
548 daosKey.fDkey, daosKey.fAkey);
549
551}
552
554 const RClusterInfo &clusterInfo,
555 ROOT::NTupleSize_t idxInCluster)
556{
557 const auto columnId = columnHandle.fPhysicalId;
558 const auto clusterId = clusterInfo.fClusterId;
559 const auto &pageInfo = clusterInfo.fPageInfo;
560
561 const auto element = columnHandle.fColumn->GetElement();
562 const auto elementSize = element->GetSize();
563 const auto elementInMemoryType = element->GetIdentifier().fInMemoryType;
564
565 if (pageInfo.GetLocator().GetType() == RNTupleLocator::kTypePageZero) {
566 auto pageZero = fPageAllocator->NewPage(elementSize, pageInfo.GetNElements());
567 pageZero.GrowUnchecked(pageInfo.GetNElements());
568 memset(pageZero.GetBuffer(), 0, pageZero.GetNBytes());
569 pageZero.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
570 ROOT::Internal::RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
571 return fPagePool.RegisterPage(std::move(pageZero),
572 ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType});
573 }
574
575 RSealedPage sealedPage;
576 sealedPage.SetNElements(pageInfo.GetNElements());
577 sealedPage.SetHasChecksum(pageInfo.HasChecksum());
578 sealedPage.SetBufferSize(pageInfo.GetLocator().GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum);
579 std::unique_ptr<unsigned char[]> directReadBuffer; // only used if cluster pool is turned off
580
582 directReadBuffer = MakeUninitArray<unsigned char>(sealedPage.GetBufferSize());
583 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(
584 fNTupleIndex, clusterId, columnId, pageInfo.GetLocator().GetPosition<RNTupleLocatorObject64>().GetLocation());
585 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), sealedPage.GetBufferSize(), daosKey.fOid, daosKey.fDkey,
586 daosKey.fAkey);
587 fCounters->fNPageRead.Inc();
588 fCounters->fNRead.Inc();
589 fCounters->fSzReadPayload.Add(sealedPage.GetBufferSize());
590 sealedPage.SetBuffer(directReadBuffer.get());
591 } else {
592 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
593 fCurrentCluster = fClusterPool.GetCluster(clusterId, fActivePhysicalColumns.ToColumnSet());
594 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
595
596 auto cachedPageRef = fPagePool.GetPage(ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType},
597 RNTupleLocalIndex(clusterId, idxInCluster));
598 if (!cachedPageRef.Get().IsNull())
599 return cachedPageRef;
600
601 ROOT::Internal::ROnDiskPage::Key key(columnId, pageInfo.GetPageNumber());
602 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
603 R__ASSERT(onDiskPage && (sealedPage.GetBufferSize() == onDiskPage->GetSize()));
604 sealedPage.SetBuffer(onDiskPage->GetAddress());
605 }
606
607 ROOT::Internal::RPage newPage;
608 {
609 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
610 newPage = UnsealPage(sealedPage, *element).Unwrap();
611 fCounters->fSzUnzip.Add(elementSize * pageInfo.GetNElements());
612 }
613
614 newPage.SetWindow(clusterInfo.fColumnOffset + pageInfo.GetFirstElementIndex(),
615 ROOT::Internal::RPage::RClusterInfo(clusterId, clusterInfo.fColumnOffset));
616 fCounters->fNPageUnsealed.Inc();
617 return fPagePool.RegisterPage(std::move(newPage), ROOT::Internal::RPagePool::RKey{columnId, elementInMemoryType});
618}
619
620std::unique_ptr<ROOT::Internal::RPageSource> ROOT::Experimental::Internal::RPageSourceDaos::CloneImpl() const
621{
622 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
623 return std::unique_ptr<RPageSourceDaos>(clone);
624}
625
626std::vector<std::unique_ptr<RCluster>>
628{
629 struct RDaosSealedPageLocator {
630 ROOT::DescriptorId_t fClusterId = 0;
631 ROOT::DescriptorId_t fColumnId = 0;
632 ROOT::NTupleSize_t fPageNo = 0;
633 std::uint64_t fPageId = 0;
634 std::uint64_t fDataSize = 0; // page payload
635 std::uint64_t fBufferSize = 0; // page payload + checksum (if available)
636 };
637
638 // Prepares read requests for a single cluster; `readRequests` is modified by this function. Requests are coalesced
639 // by OID and distribution key.
640 // TODO(jalopezg): this may be a private member function; that, however, requires additional changes given that
641 // `RDaosContainer::MultiObjectRWOperation_t` cannot be forward-declared
642 auto fnPrepareSingleCluster = [&](const RCluster::RKey &clusterKey,
644 auto clusterId = clusterKey.fClusterId;
645 std::vector<RDaosSealedPageLocator> onDiskPages;
646
647 unsigned clusterBufSz = 0, nPages = 0;
648 auto pageZeroMap = std::make_unique<ROOT::Internal::ROnDiskPageMap>();
650 clusterKey, *pageZeroMap,
651 [&](ROOT::DescriptorId_t physicalColumnId, ROOT::NTupleSize_t pageNo,
652 const ROOT::RClusterDescriptor::RPageInfo &pageInfo) {
653 const auto &pageLocator = pageInfo.GetLocator();
654 const auto pageId = pageLocator.GetPosition<RNTupleLocatorObject64>().GetLocation();
655 const auto pageBufferSize = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
656 onDiskPages.emplace_back(RDaosSealedPageLocator{clusterId, physicalColumnId, pageNo, pageId,
657 pageLocator.GetNBytesOnStorage(), pageBufferSize});
658
659 ++nPages;
660 clusterBufSz += pageBufferSize;
661 });
662
663 auto clusterBuffer = new unsigned char[clusterBufSz];
664 auto pageMap =
665 std::make_unique<ROOT::Internal::ROnDiskPageMapHeap>(std::unique_ptr<unsigned char[]>(clusterBuffer));
666
667 // Fill the cluster page map and the read requests for the RDaosContainer::ReadV() call
668 for (const auto &sealedLoc : onDiskPages) {
669 ROOT::Internal::ROnDiskPage::Key key(sealedLoc.fColumnId, sealedLoc.fPageNo);
670 pageMap->Register(key, ROOT::Internal::ROnDiskPage(clusterBuffer, sealedLoc.fBufferSize));
671
672 // Prepare new read request batched up by object ID and distribution key
673 d_iov_t iov;
674 d_iov_set(&iov, clusterBuffer, sealedLoc.fBufferSize);
675
676 RDaosKey daosKey = GetPageDaosKey<kDefaultDaosMapping>(fNTupleIndex, sealedLoc.fClusterId, sealedLoc.fColumnId,
677 sealedLoc.fPageId);
678 auto odPair = RDaosContainer::ROidDkeyPair{daosKey.fOid, daosKey.fDkey};
679 auto [itReq, ret] = readRequests.emplace(odPair, RDaosContainer::RWOperation(odPair));
680 itReq->second.Insert(daosKey.fAkey, iov);
681
682 clusterBuffer += sealedLoc.fBufferSize;
683 }
684 fCounters->fNPageRead.Add(nPages);
685 fCounters->fSzReadPayload.Add(clusterBufSz);
686
687 auto cluster = std::make_unique<RCluster>(clusterId);
688 cluster->Adopt(std::move(pageMap));
689 cluster->Adopt(std::move(pageZeroMap));
690 for (auto colId : clusterKey.fPhysicalColumnSet)
691 cluster->SetColumnAvailable(colId);
692 return cluster;
693 };
694
695 fCounters->fNClusterLoaded.Add(clusterKeys.size());
696
697 std::vector<std::unique_ptr<ROOT::Internal::RCluster>> clusters;
699 for (auto key : clusterKeys) {
700 clusters.emplace_back(fnPrepareSingleCluster(key, readRequests));
701 }
702
703 {
704 Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
705 if (int err = fDaosContainer->ReadV(readRequests))
706 throw ROOT::RException(R__FAIL("ReadV: error" + std::string(d_errstr(err))));
707 }
708 fCounters->fNReadV.Inc();
709 fCounters->fNRead.Add(readRequests.size());
710
711 return clusters;
712}
713
715{
716 R__LOG_WARNING(ROOT::Internal::NTupleLog()) << "DAOS-backed sources have no associated StreamerInfo to load.";
717}
718
719std::unique_ptr<ROOT::Internal::RPageSource>
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking).
Definition RError.hxx:303
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:299
#define R__LOG_WARNING(...)
Definition RLogger.hxx:357
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
#define h(i)
Definition RSha256.hxx:106
char * ret
Definition Rotated.cxx:221
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Double_t err
UInt_t Hash(const TString &s)
Definition TString.h:503
#define _(A, B)
Definition cfortran.h:108
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the `from` buffer.
static RResult< void > DeserializeFooter(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
static RResult< void > DeserializeHeader(const void *buffer, std::uint64_t bufSize, ROOT::Internal::RNTupleDescriptorBuilder &descBuilder)
RDaosObject::DistributionKey_t DistributionKey_t
Definition RDaos.hxx:159
std::unordered_map< ROidDkeyPair, RWOperation, ROidDkeyPair::Hash > MultiObjectRWOperation_t
Definition RDaos.hxx:230
int ReadSingleAkey(void *buffer, std::size_t length, daos_obj_id_t oid, DistributionKey_t dkey, AttributeKey_t akey, ObjClassId_t cid)
Read data from a single object attribute key to the given buffer.
Definition RDaos.cxx:210
RDaosObject::AttributeKey_t AttributeKey_t
Definition RDaos.hxx:160
std::unique_ptr< ROOT::Internal::RPageSink > CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const final
Creates a new sink with the same underlying storage as this but writing to a different RNTuple named ...
std::unique_ptr< RDaosContainer > fDaosContainer
Underlying DAOS container.
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final
std::uint64_t fNBytesCurrentCluster
Tracks the number of bytes committed to the current cluster.
std::vector< RNTupleLocator > CommitSealedPageVImpl(std::span< RPageStorage::RSealedPageGroup > ranges, const std::vector< bool > &mask) final
Vector commit of preprocessed pages.
std::string fURI
A URI to a DAOS pool of the form 'daos://pool-label/container-label'.
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
std::uint64_t StageClusterImpl() final
Returns the number of bytes written to storage (excluding metadata).
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final
Returns the locator of the page list envelope of the given buffer that contains the serialized page l...
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final
std::atomic< std::uint64_t > fPageId
Page identifier for the next committed page; it is automatically incremented in CommitSealedPageImpl(...
std::atomic< std::uint64_t > fClusterGroupId
Cluster group counter for the next committed cluster pagelist; incremented in CommitClusterGroupImpl(...
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options)
RNTupleLocator CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final
void LoadStreamerInfo() final
Forces the loading of ROOT StreamerInfo from the underlying file.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::unique_ptr< RPageSource > CloneImpl() const final
The cloned page source creates a new connection to the pool/container.
ROOT::Internal::RPageRef LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster) final
std::vector< std::unique_ptr< ROOT::Internal::RCluster > > LoadClusters(std::span< ROOT::Internal::RCluster::RKey > clusterKeys) final
std::string fURI
A URI to a DAOS pool of the form 'daos://pool-label/container-label'.
std::unique_ptr< RPageSource > OpenWithDifferentAnchor(const ROOT::Internal::RNTupleLink &anchorLink, const ROOT::RNTupleReadOptions &options={}) final
Creates a new PageSource using the same underlying file as this but referring to a different RNTuple,...
void LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.).
ROOT::Internal::RCluster * fCurrentCluster
The last cluster from which a page got loaded. Points into fClusterPool->fPool.
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleReadOptions &options)
ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode) final
LoadStructureImpl() has been called before AttachImpl() is called
DAOS-specific user-tunable settings for storing ntuples.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:147
ROOT::Internal::RColumnElementBase * GetElement() const
Definition RColumn.hxx:338
Helper class to compress data blocks in the ROOT compression frame format.
static std::size_t Zip(const void *from, std::size_t nbytes, int compression, void *to)
Returns the size of the compressed data, written into the provided output buffer.
Helper class to uncompress data blocks in the ROOT compression frame format.
static void Unzip(const void *from, size_t nbytes, size_t dataLen, void *to)
The nbytes parameter provides the size of the `from` buffer.
void SetVersion(std::uint16_t versionEpoch, std::uint16_t versionMajor, std::uint16_t versionMinor, std::uint16_t versionPatch)
const RNTupleDescriptor & GetDescriptor() const
void AddToOnDiskFooterSize(std::uint64_t size)
The real footer size also includes the page list envelopes.
A helper class for serializing and deserialization of the RNTuple binary format.
static RResult< std::uint32_t > DeserializeString(const void *buffer, std::uint64_t bufSize, std::string &val)
static std::uint32_t SerializeUInt32(std::uint32_t val, void *buffer)
static std::uint32_t DeserializeUInt32(const void *buffer, std::uint32_t &val)
static std::uint32_t SerializeUInt16(std::uint16_t val, void *buffer)
static RResult< void > DeserializePageList(const void *buffer, std::uint64_t bufSize, ROOT::DescriptorId_t clusterGroupId, RNTupleDescriptor &desc, EDescriptorDeserializeMode mode)
static std::uint32_t SerializeString(const std::string &val, void *buffer)
static std::uint32_t DeserializeUInt16(const void *buffer, std::uint16_t &val)
static std::uint32_t DeserializeUInt64(const void *buffer, std::uint64_t &val)
static std::uint32_t SerializeUInt64(std::uint64_t val, void *buffer)
A page as it is stored on disk, i.e. packed and compressed.
Definition RCluster.hxx:40
ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder
RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options)
std::unique_ptr< RCounters > fCounters
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
Reference to a page stored in the page pool.
std::unique_ptr< ROOT::RNTupleWriteOptions > fOptions
const ROOT::RNTupleWriteOptions & GetWriteOptions() const
Returns the sink's write options.
RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element)
Helper for streaming a page.
ROOT::Internal::RClusterPool fClusterPool
The cluster pool asynchronously preloads the next few clusters.
ROOT::RNTupleReadOptions fOptions
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
Takes the read lock for the descriptor.
void PrepareLoadCluster(const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap, std::function< void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)> perPageFunc)
Prepare a page range read for the column set in clusterKey.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RCounters > fCounters
ROOT::Internal::RPagePool fPagePool
Pages that are unzipped with IMT are staged into the page pool.
RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions)
RActivePhysicalColumns fActivePhysicalColumns
The active columns are implicitly defined by the model fields or views.
static RResult< ROOT::Internal::RPage > UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element, ROOT::Internal::RPageAllocator &pageAlloc)
Helper for unstreaming a page.
std::unique_ptr< ROOT::Internal::RPageAllocator > fPageAllocator
For the time being, we will use the heap allocator for all sources and sinks. This may change in the ...
static constexpr std::size_t kNBytesPageChecksum
The page checksum is a 64bit xxhash3.
RColumnHandle ColumnHandle_t
The column handle identifies a column with the current open page storage.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:52
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:43
std::size_t GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:111
static const void * GetPageZeroBuffer()
Return a pointer to the page zero buffer used if there is no on-disk data for a particular deferred c...
Definition RPage.cxx:22
void SetWindow(const ROOT::NTupleSize_t rangeFirst, const RClusterInfo &clusterInfo)
Seek the page to a certain position of the column.
Definition RPage.hxx:157
Base class for all ROOT issued exceptions.
Definition RError.hxx:78
The on-storage metadata of an RNTuple.
const std::string & GetName() const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
ROOT::NTupleSize_t GetIndexInCluster() const
ROOT::DescriptorId_t GetClusterId() const
RNTupleLocator payload that is common for object stores using 64bit location information.
std::uint64_t GetLocation() const
Generic information about the physical location of data.
std::uint64_t GetNBytesOnStorage() const
ELocatorType GetType() const
For non-disk locators, the value for the Type field.
T GetPosition() const
Note that for GetPosition() / SetPosition(), the locator type must correspond (kTypeFile,...
void SetType(ELocatorType type)
void SetPosition(std::uint64_t position)
void SetNBytesOnStorage(std::uint64_t nBytesOnStorage)
Common user-tunable settings for reading RNTuples.
Common user-tunable settings for storing RNTuples.
void ThrowOnError()
Short-hand method to throw an exception in the case of errors.
Definition RError.hxx:289
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:197
const char * d_errstr(int rc)
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition daos.h:50
@ OC_SX
Definition daos.h:129
uint16_t daos_oclass_id_t
Definition daos.h:135
RNTupleTimer< RNTupleAtomicCounter, RNTupleTickCounter< RNTupleAtomicCounter > > RNTupleAtomicTimer
ROOT::RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::unique_ptr< T[]> MakeUninitArray(std::size_t size)
Make an array of default-initialized elements.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
A pair of <object ID, distribution key> that can be used to issue a fetch/update request for multiple...
Definition RDaos.hxx:165
Describes a read/write operation on multiple attribute keys under the same object ID and distribution...
Definition RDaos.hxx:189
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint64_t fVersionAnchor
Allows for evolving the struct in future versions.
std::string fObjClass
The object class for user data OIDs, e.g. SX.
std::uint16_t fVersionEpoch
Version of the binary format supported by the writer.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition RDaos.hxx:107
The identifiers that specify the content of a (partial) cluster.
Definition RCluster.hxx:151
ROOT::DescriptorId_t fClusterId
Definition RCluster.hxx:152
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:50
Summarizes cluster-level information that is necessary to load a certain page.
std::uint64_t fColumnOffset
The first element number of the page's column in the given cluster.
ROOT::RClusterDescriptor::RPageInfoExtended fPageInfo
Location of the page on disk.
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RResult< void > VerifyChecksumIfEnabled() const
void SetNElements(std::uint32_t nElements)
void SetBufferSize(std::size_t bufferSize)
Information about a single page in the context of a cluster's page range.
const RNTupleLocator & GetLocator() const
iovec for memory buffer
Definition daos.h:37
uint64_t hi
Definition daos.h:147
uint64_t lo
Definition daos.h:146
TMarker m
Definition textangle.C:8