Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RPageStorageDaos.cxx
Go to the documentation of this file.
1/// \file RPageStorageDaos.cxx
2/// \ingroup NTuple ROOT7
3/// \author Javier Lopez-Gomez <j.lopez@cern.ch>
4/// \date 2020-11-03
5/// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6/// is welcome!
7
8/*************************************************************************
9 * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers. *
10 * All rights reserved. *
11 * *
12 * For the licensing terms see $ROOTSYS/LICENSE. *
13 * For the list of contributors see $ROOTSYS/README/CREDITS. *
14 *************************************************************************/
15
16#include <ROOT/RCluster.hxx>
17#include <ROOT/RClusterPool.hxx>
18#include <ROOT/RField.hxx>
19#include <ROOT/RLogger.hxx>
21#include <ROOT/RNTupleModel.hxx>
23#include <ROOT/RNTupleUtil.hxx>
24#include <ROOT/RNTupleZip.hxx>
25#include <ROOT/RPage.hxx>
27#include <ROOT/RPagePool.hxx>
28#include <ROOT/RDaos.hxx>
30
31#include <RVersion.h>
32#include <TError.h>
33
34#include <algorithm>
35#include <cstdio>
36#include <cstdlib>
37#include <iostream>
38#include <limits>
39#include <utility>
40#include <regex>
41
42namespace {
43struct RDaosURI {
44 /// \brief UUID of the DAOS pool
45 std::string fPoolUuid;
46 /// \brief Ranks of the service replicas, separated by `_`
47 std::string fSvcReplicas;
48 /// \brief UUID of the container for this RNTuple
49 std::string fContainerUuid;
50};
51
52/**
53 \brief Parse a DAOS RNTuple URI of the form 'daos://pool-uuid:svc_replicas/container_uuid'.
54*/
55RDaosURI ParseDaosURI(std::string_view uri)
56{
57 std::regex re("daos://([[:xdigit:]-]+):([[:digit:]_]+)/([[:xdigit:]-]+)");
58 std::cmatch m;
59 if (!std::regex_match(uri.data(), m, re))
60 throw ROOT::Experimental::RException(R__FAIL("Invalid DAOS pool URI."));
61 return { m[1], m[2], m[3] };
62}
63
64/// \brief Some random distribution/attribute key. TODO: apply recommended schema, i.e.
65/// an OID for each cluster + a dkey for each page.
66static constexpr std::uint64_t kDistributionKey = 0x5a3c69f0cafe4a11;
67static constexpr std::uint64_t kAttributeKey = 0x4243544b5344422d;
68
69static constexpr daos_obj_id_t kOidAnchor{std::uint64_t(-1), 0};
70static constexpr daos_obj_id_t kOidHeader{std::uint64_t(-2), 0};
71static constexpr daos_obj_id_t kOidFooter{std::uint64_t(-3), 0};
72// The page list offset needs to be a positive 64bit integer because we currently use
73// the object ID for the offset in the locator
74// TODO(jblomer): use object store locators
75// TODO(jblomer): add support for multiple page list envelopes from multiple cluster groups
76static constexpr daos_obj_id_t kOidPageList{std::numeric_limits<int64_t>::max(), 0};
77
78static constexpr daos_oclass_id_t kCidMetadata = OC_SX;
79} // namespace
80
81
82////////////////////////////////////////////////////////////////////////////////
83
84
/// Serialize this anchor. The layout is the five 32-bit fields (fVersion, fNBytesHeader, fLenHeader,
/// fNBytesFooter, fLenFooter) followed by the object class string fObjClass.
/// \param buffer Destination. If nullptr, nothing is written and only the size is computed.
/// \return Size of the serialized anchor in bytes: 20 (5 x uint32) plus the serialized string.
/// NOTE(review): this copy is missing the declarator line (original line 86) and line 88. The unqualified
/// `RNTupleSerializer` below presumably relies on a local alias like the one in Deserialize().
85std::uint32_t
87{
89 if (buffer != nullptr) {
90 auto bytes = reinterpret_cast<unsigned char *>(buffer);
// Each Serialize* call returns the number of bytes it wrote; that value advances the write cursor.
91 bytes += RNTupleSerializer::SerializeUInt32(fVersion, bytes);
92 bytes += RNTupleSerializer::SerializeUInt32(fNBytesHeader, bytes);
93 bytes += RNTupleSerializer::SerializeUInt32(fLenHeader, bytes);
94 bytes += RNTupleSerializer::SerializeUInt32(fNBytesFooter, bytes);
95 bytes += RNTupleSerializer::SerializeUInt32(fLenFooter, bytes);
96 bytes += RNTupleSerializer::SerializeString(fObjClass, bytes);
97 }
// Size-only pass for the string, plus the fixed 20-byte prefix of five uint32 fields.
98 return RNTupleSerializer::SerializeString(fObjClass, nullptr) + 20;
99}
100
/// Deserialize an anchor from `buffer` (of `bufSize` bytes) into this object. Fields are read in the
/// same order Serialize() writes them.
/// \return The number of bytes consumed. Returns an error if the buffer is shorter than the fixed
///         20-byte prefix or if the trailing object class string cannot be decoded.
/// NOTE(review): the return-type line is missing in this copy (numbering jumps from 99 to 102);
/// the class declaration lists it as RResult<std::uint32_t>.
102ROOT::Experimental::Detail::RDaosNTupleAnchor::Deserialize(const void *buffer, std::uint32_t bufSize)
103{
// The fixed part is five 32-bit integers (20 bytes); reject anything shorter before reading.
104 if (bufSize < 20)
105 return R__FAIL("DAOS anchor too short");
106
107 using RNTupleSerializer = ROOT::Experimental::Internal::RNTupleSerializer;
108 auto bytes = reinterpret_cast<const unsigned char *>(buffer);
109 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fVersion);
110 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fNBytesHeader);
111 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fLenHeader);
112 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fNBytesFooter);
113 bytes += RNTupleSerializer::DeserializeUInt32(bytes, fLenFooter);
// The string may only use the bytes remaining after the fixed prefix.
114 auto result = RNTupleSerializer::DeserializeString(bytes, bufSize - 20, fObjClass);
115 if (!result)
116 return R__FORWARD_ERROR(result);
117 return result.Unwrap() + 20;
118}
119
/// Return the number of bytes used for the anchor buffer. Both the code that writes the anchor
/// and the code that reads it size their buffers with this value.
/// The base of the computation is the serialized size of a default-constructed anchor.
/// NOTE(review): the return statement is incomplete in this copy (original line 124 and the closing `;`
/// are missing). Presumably the missing part adds headroom for the longest possible object-class name
/// (cf. RDaosObject::ObjClassId::kOCNameMaxLength); confirm against the upstream source.
120std::uint32_t
122{
123 return RDaosNTupleAnchor().Serialize(nullptr)
125}
126
127
128////////////////////////////////////////////////////////////////////////////////
129
130
131ROOT::Experimental::Detail::RPageSinkDaos::RPageSinkDaos(std::string_view ntupleName, std::string_view uri,
132 const RNTupleWriteOptions &options)
133 : RPageSink(ntupleName, options)
134 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
135 , fURI(uri)
136{
137 R__LOG_WARNING(NTupleLog()) << "The DAOS backend is experimental and still under development. " <<
138 "Do not store real data with this version of RNTuple!";
139 fCompressor = std::make_unique<RNTupleCompressor>();
140 EnableDefaultMetrics("RPageSinkDaos");
141}
142
143
145
146
/// Prepare DAOS storage for writing:
///  - pick the object class for user data,
///  - open the container named by fURI (with create = true),
///  - serialize, compress and write the ntuple header.
/// Throws RException if the object class name is unknown.
/// NOTE(review): the declarator line (original line 147) is missing in this copy.
148{
// Use the object class from DAOS-specific write options if given; otherwise use the
// default of a freshly constructed RNTupleWriteOptionsDaos.
149 auto opts = dynamic_cast<RNTupleWriteOptionsDaos *>(fOptions.get());
150 fNTupleAnchor.fObjClass = opts ? opts->GetObjectClass() : RNTupleWriteOptionsDaos().GetObjectClass();
151 auto oclass = RDaosObject::ObjClassId(fNTupleAnchor.fObjClass);
152 if (oclass.IsUnknown())
153 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + fNTupleAnchor.fObjClass));
154
155 auto args = ParseDaosURI(fURI);
156 auto pool = std::make_shared<RDaosPool>(args.fPoolUuid, args.fSvcReplicas);
157 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerUuid, /*create =*/ true);
158 fDaosContainer->SetDefaultObjectClass(oclass);
159
// Two passes: a nullptr buffer yields only the header size, the second call fills the buffer.
160 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
161 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(nullptr, descriptor);
162 auto buffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
163 fSerializationContext = Internal::RNTupleSerializer::SerializeHeaderV1(buffer.get(), descriptor);
164
// NOTE(review): zipBuffer is sized to the uncompressed header length, which assumes Zip() never
// emits more bytes than it receives; confirm. The lambda appears equivalent to
// RNTupleCompressor::MakeMemCopyWriter(zipBuffer.get()), which is used elsewhere in this file.
// memcpy comes from <cstring>, which this file does not include directly.
165 auto zipBuffer = std::make_unique<unsigned char[]>(fSerializationContext.GetHeaderSize());
166 auto szZipHeader =
167 fCompressor->Zip(buffer.get(), fSerializationContext.GetHeaderSize(), GetWriteOptions().GetCompression(),
168 [&zipBuffer](const void *b, size_t n, size_t o) { memcpy(zipBuffer.get() + o, b, n); });
169 WriteNTupleHeader(zipBuffer.get(), szZipHeader, fSerializationContext.GetHeaderSize());
170}
171
172
/// Seal (pack and compress) `page` of the column behind `columnHandle` and store it via
/// CommitSealedPageImpl(). Returns the resulting locator.
/// NOTE(review): the declarator lines (original 173-174) are missing in this copy.
175{
176 auto element = columnHandle.fColumn->GetElement();
177 RPageStorage::RSealedPage sealedPage;
// Scope limits the zip timer to the sealing step only.
178 {
179 RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
180 sealedPage = SealPage(page, *element, GetWriteOptions().GetCompression());
181 }
182
// fSzZip accumulates the unsealed page size, i.e. the input to compression.
183 fCounters->fSzZip.Add(page.GetNBytes());
184 return CommitSealedPageImpl(columnHandle.fId, sealedPage);
185}
186
187
/// Write a sealed page as a single akey into its own DAOS object. The object ID is taken from the
/// fOid counter (fetch_add), under the fixed distribution/attribute keys.
/// \return A locator whose fPosition is that object ID and whose fBytesOnStorage is the sealed size.
/// NOTE(review): the declarator lines (original 188-189) are missing in this copy. Presumably fOid must
/// never reach kOidPageList.lo, which is reserved for the page list; this is not checked here.
190 DescriptorId_t /*columnId*/, const RPageStorage::RSealedPage &sealedPage)
191{
192 auto offsetData = fOid.fetch_add(1);
193 {
194 RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
195 fDaosContainer->WriteSingleAkey(sealedPage.fBuffer, sealedPage.fSize,
196 {offsetData, 0}, kDistributionKey, kAttributeKey);
197 }
198
199 RNTupleLocator result;
200 result.fPosition = offsetData;
201 result.fBytesOnStorage = sealedPage.fSize;
202 fCounters->fNPageCommitted.Inc();
203 fCounters->fSzWritePayload.Add(sealedPage.fSize);
// Accumulated per cluster; read and reset by CommitClusterImpl().
204 fNBytesCurrentCluster += sealedPage.fSize;
205 return result;
206}
207
208
/// Close the current cluster. Returns the number of page bytes committed since the previous call
/// (excluding metadata) and resets that counter to zero.
/// NOTE(review): the declarator line (original 210) is missing in this copy.
209std::uint64_t
211{
212 return std::exchange(fNBytesCurrentCluster, 0);
213}
214
215
/// Finalize the ntuple. Writes, in order:
///  1. the compressed page list, to the fixed object kOidPageList;
///  2. the compressed footer, which references the page list via a cluster-group envelope;
///  3. the anchor.
/// NOTE(review): the declarator line (original 216) and the declaration of `pageListEnvelope`
/// (original 237) are missing in this copy.
217{
218 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
219
// Translate every cluster ID of the descriptor through the serialization context.
220 std::vector<DescriptorId_t> physClusterIDs;
221 for (const auto &c : descriptor.GetClusterIterable()) {
222 physClusterIDs.emplace_back(fSerializationContext.MapClusterId(c.GetId()));
223 }
224
// Two passes: the nullptr call returns the size, the second call fills the buffer.
225 auto szPageList =
226 Internal::RNTupleSerializer::SerializePageListV1(nullptr, descriptor, physClusterIDs, fSerializationContext);
227 auto bufPageList = std::make_unique<unsigned char[]>(szPageList);
228 Internal::RNTupleSerializer::SerializePageListV1(bufPageList.get(), descriptor, physClusterIDs,
229 fSerializationContext);
230
231 auto bufPageListZip = std::make_unique<unsigned char[]>(szPageList);
232 auto szPageListZip = fCompressor->Zip(bufPageList.get(), szPageList, GetWriteOptions().GetCompression(),
233 RNTupleCompressor::MakeMemCopyWriter(bufPageListZip.get()));
234 fDaosContainer->WriteSingleAkey(bufPageListZip.get(), szPageListZip, kOidPageList, kDistributionKey, kAttributeKey,
235 kCidMetadata);
236
// A single cluster group covers all clusters. Its locator position is the page-list OID.
238 pageListEnvelope.fUnzippedSize = szPageList;
239 pageListEnvelope.fLocator.fPosition = kOidPageList.lo;
240 pageListEnvelope.fLocator.fBytesOnStorage = szPageListZip;
241 fSerializationContext.AddClusterGroup(physClusterIDs.size(), pageListEnvelope);
242
243 auto szFooter = Internal::RNTupleSerializer::SerializeFooterV1(nullptr, descriptor, fSerializationContext);
244 auto bufFooter = std::make_unique<unsigned char[]>(szFooter);
245 Internal::RNTupleSerializer::SerializeFooterV1(bufFooter.get(), descriptor, fSerializationContext);
246
247 auto bufFooterZip = std::make_unique<unsigned char[]>(szFooter);
248 auto szFooterZip = fCompressor->Zip(bufFooter.get(), szFooter, GetWriteOptions().GetCompression(),
249 RNTupleCompressor::MakeMemCopyWriter(bufFooterZip.get()));
250 WriteNTupleFooter(bufFooterZip.get(), szFooterZip, szFooter);
// The anchor goes last; it records the header/footer sizes set by the calls above.
251 WriteNTupleAnchor();
252}
253
254
/// Store the compressed header (`nbytes` bytes at `data`) in the header object. Records its
/// compressed size and its uncompressed size `lenHeader` in the anchor.
/// NOTE(review): the anchor fields are declared std::uint32_t, so size_t values above 4 GiB
/// would be truncated silently here. The declarator line (original 255) is missing in this copy.
256 const void *data, size_t nbytes, size_t lenHeader)
257{
258 fDaosContainer->WriteSingleAkey(data, nbytes, kOidHeader, kDistributionKey,
259 kAttributeKey, kCidMetadata);
260 fNTupleAnchor.fLenHeader = lenHeader;
261 fNTupleAnchor.fNBytesHeader = nbytes;
262}
263
/// Store the compressed footer (`nbytes` bytes at `data`) in the footer object. Records its
/// compressed size and its uncompressed size `lenFooter` in the anchor.
/// NOTE(review): the anchor fields are declared std::uint32_t, so size_t values above 4 GiB
/// would be truncated silently here. The declarator line (original 264) is missing in this copy.
265 const void *data, size_t nbytes, size_t lenFooter)
266{
267 fDaosContainer->WriteSingleAkey(data, nbytes, kOidFooter, kDistributionKey,
268 kAttributeKey, kCidMetadata);
269 fNTupleAnchor.fLenFooter = lenFooter;
270 fNTupleAnchor.fNBytesFooter = nbytes;
271}
272
// Serialize fNTupleAnchor and store it in the fixed anchor object.
// NOTE(review): the declarator line(s) (original 273) are missing in this copy.
274 const auto ntplSize = RDaosNTupleAnchor::GetSize();
// make_unique<unsigned char[]> value-initializes the buffer, so any bytes beyond those written
// by Serialize() are zero. The full GetSize() bytes are stored regardless.
275 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
276 fNTupleAnchor.Serialize(buffer.get());
277 fDaosContainer->WriteSingleAkey(buffer.get(), ntplSize, kOidAnchor, kDistributionKey,
278 kAttributeKey, kCidMetadata);
279}
280
/// Allocate a new, empty heap page for up to `nElements` elements of the column's element type.
/// Throws RException if `nElements` is 0.
/// NOTE(review): the declarator lines (original 281-282) are missing in this copy.
283{
284 if (nElements == 0)
285 throw RException(R__FAIL("invalid call: request empty page"));
286 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
287 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
288}
289
/// Hand a page obtained from ReservePage() back to the heap page allocator.
/// NOTE(review): the declarator line (original 290) is missing in this copy.
291{
292 fPageAllocator->DeletePage(page);
293}
294
295
296////////////////////////////////////////////////////////////////////////////////
297
298
/// Wrap the existing buffer `mem`, which already holds `nElements` elements of size `elementSize`,
/// into an RPage for column `columnId`. No memory is allocated here.
/// NOTE(review): callers in this file pass buffers released from std::unique_ptr<unsigned char[]>,
/// matching the delete[] used when page buffers are freed. The declarator line (original 299) is missing.
300 ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
301{
302 RPage newPage(columnId, mem, elementSize, nElements);
// The buffer is already filled, so mark all nElements as present.
303 newPage.GrowUnchecked(nElements);
304 return newPage;
305}
306
/// Free the buffer of a page. Null pages are ignored.
/// The buffer is released with delete[], so it must have been allocated as `new unsigned char[]`.
/// NOTE(review): the declarator line (original 307) is missing in this copy.
308{
309 if (page.IsNull())
310 return;
311 delete[] reinterpret_cast<unsigned char *>(page.GetBuffer());
312}
313
314
315////////////////////////////////////////////////////////////////////////////////
316
317
318ROOT::Experimental::Detail::RPageSourceDaos::RPageSourceDaos(std::string_view ntupleName, std::string_view uri,
319 const RNTupleReadOptions &options)
320 : RPageSource(ntupleName, options)
321 , fPageAllocator(std::make_unique<RPageAllocatorDaos>())
322 , fPagePool(std::make_shared<RPagePool>())
323 , fURI(uri)
324 , fClusterPool(std::make_unique<RClusterPool>(*this))
325{
326 fDecompressor = std::make_unique<RNTupleDecompressor>();
327 EnableDefaultMetrics("RPageSourceDaos");
328
329 auto args = ParseDaosURI(uri);
330 auto pool = std::make_shared<RDaosPool>(args.fPoolUuid, args.fSvcReplicas);
331 fDaosContainer = std::make_unique<RDaosContainer>(pool, args.fContainerUuid);
332}
333
334
336
337
/// Read the ntuple metadata from the container and build the descriptor. Steps:
///  1. read the anchor and select the default object class it names;
///  2. read and decompress the header, then the footer;
///  3. read and decompress the page list of cluster group 0 and add its clusters.
/// Throws RException if the object class stored in the anchor is unknown.
/// NOTE(review): the declarator line (original 338) and the declaration of `ntpl` (original 341,
/// presumably an RDaosNTupleAnchor) are missing in this copy.
339{
340 RNTupleDescriptorBuilder descBuilder;
342 const auto ntplSize = RDaosNTupleAnchor::GetSize();
343 auto buffer = std::make_unique<unsigned char[]>(ntplSize);
344 fDaosContainer->ReadSingleAkey(buffer.get(), ntplSize, kOidAnchor, kDistributionKey,
345 kAttributeKey, kCidMetadata);
346 ntpl.Deserialize(buffer.get(), ntplSize).Unwrap();
347
// User data OIDs read later use the object class recorded in the anchor.
348 auto oclass = RDaosObject::ObjClassId(ntpl.fObjClass);
349 if (oclass.IsUnknown())
350 throw ROOT::Experimental::RException(R__FAIL("Unknown object class " + ntpl.fObjClass));
351 fDaosContainer->SetDefaultObjectClass(oclass);
352
// Header: read fNBytesHeader compressed bytes and inflate them to fLenHeader bytes.
353 descBuilder.SetOnDiskHeaderSize(ntpl.fNBytesHeader);
354 buffer = std::make_unique<unsigned char[]>(ntpl.fLenHeader);
355 auto zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesHeader);
356 fDaosContainer->ReadSingleAkey(zipBuffer.get(), ntpl.fNBytesHeader, kOidHeader, kDistributionKey,
357 kAttributeKey, kCidMetadata);
358 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesHeader, ntpl.fLenHeader, buffer.get());
// NOTE(review): the RResult<void> returned by the Deserialize*V1 calls in this function is discarded;
// consider checking it explicitly so malformed metadata is reported here.
359 Internal::RNTupleSerializer::DeserializeHeaderV1(buffer.get(), ntpl.fLenHeader, descBuilder);
360
// Footer: same pattern as the header.
361 buffer = std::make_unique<unsigned char[]>(ntpl.fLenFooter);
362 zipBuffer = std::make_unique<unsigned char[]>(ntpl.fNBytesFooter);
363 fDaosContainer->ReadSingleAkey(zipBuffer.get(), ntpl.fNBytesFooter, kOidFooter, kDistributionKey,
364 kAttributeKey, kCidMetadata);
365 fDecompressor->Unzip(zipBuffer.get(), ntpl.fNBytesFooter, ntpl.fLenFooter, buffer.get());
366 Internal::RNTupleSerializer::DeserializeFooterV1(buffer.get(), ntpl.fLenFooter, descBuilder);
367
// Only cluster group 0 is read. Its locator position is used directly as the page-list object ID,
// and the reported on-disk footer size includes the compressed page list.
368 auto cg = descBuilder.GetClusterGroup(0);
369 descBuilder.SetOnDiskFooterSize(ntpl.fNBytesFooter + cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage);
370 buffer = std::make_unique<unsigned char[]>(cg.fPageListEnvelopeLink.fUnzippedSize);
371 zipBuffer = std::make_unique<unsigned char[]>(cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage);
372 fDaosContainer->ReadSingleAkey(
373 zipBuffer.get(), cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage,
374 {static_cast<decltype(daos_obj_id_t::lo)>(cg.fPageListEnvelopeLink.fLocator.fPosition), 0}, kDistributionKey,
375 kAttributeKey, kCidMetadata);
376 fDecompressor->Unzip(zipBuffer.get(), cg.fPageListEnvelopeLink.fLocator.fBytesOnStorage,
377 cg.fPageListEnvelopeLink.fUnzippedSize, buffer.get());
378
// Clusters are added with IDs equal to their position in the deserialized page list.
379 std::vector<RClusterDescriptorBuilder> clusters;
380 Internal::RNTupleSerializer::DeserializePageListV1(buffer.get(), cg.fPageListEnvelopeLink.fUnzippedSize, clusters);
381 for (std::size_t i = 0; i < clusters.size(); ++i) {
382 descBuilder.AddCluster(i, std::move(clusters[i]));
383 }
384
385 return descBuilder.MoveDescriptor();
386}
387
388
/// Return the name of the container's default object class, i.e. the class used for user data OIDs.
/// NOTE(review): the declarator line (original 389) is missing in this copy.
390{
391 return fDaosContainer->GetDefaultObjectClass().ToString();
392}
393
394
/// Locate the page of column `columnId` that contains `clusterIndex`. Always sets sealedPage.fSize and
/// sealedPage.fNElements. If sealedPage.fBuffer is non-null, also reads the packed and compressed bytes
/// into it; that buffer must hold at least fSize bytes. With a null fBuffer this acts as a size query.
/// NOTE(review): the declarator line (original 395) is missing in this copy.
396 DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage)
397{
398 const auto clusterId = clusterIndex.GetClusterId();
399 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
400
401 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(clusterIndex.GetIndex());
402
403 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
404 sealedPage.fSize = bytesOnStorage;
405 sealedPage.fNElements = pageInfo.fNElements;
406 if (sealedPage.fBuffer) {
// The locator position is the page's object ID (see how pages are committed by the sink).
407 fDaosContainer->ReadSingleAkey(const_cast<void *>(sealedPage.fBuffer), bytesOnStorage,
408 {static_cast<decltype(daos_obj_id_t::lo)>(pageInfo.fLocator.fPosition), 0},
409 kDistributionKey, kAttributeKey);
410 }
411}
412
/// Return the unsealed page of the column behind `columnHandle` that contains element `idxInCluster`
/// of the given cluster. The sealed bytes come from one of two places:
///  - cluster cache off: a direct read into a temporary buffer;
///  - otherwise: the current cluster, fetched from the cluster pool if needed. In this path a page
///    already in the page pool is returned without unsealing.
/// The new page is registered with the page pool before it is returned.
/// NOTE(review): the declarator line (original 413) and the deleter body (original 465) are missing
/// in this copy. As shown, the deleter lambda is empty and would never free the page buffer.
414 ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
415{
416 const auto columnId = columnHandle.fId;
417 const auto clusterId = clusterDescriptor.GetId();
418
419 auto pageInfo = clusterDescriptor.GetPageRange(columnId).Find(idxInCluster);
420
421 const auto element = columnHandle.fColumn->GetElement();
422 const auto elementSize = element->GetSize();
423 const auto bytesOnStorage = pageInfo.fLocator.fBytesOnStorage;
424
425 const void *sealedPageBuffer = nullptr; // points either to directReadBuffer or to a read-only page in the cluster
426 std::unique_ptr<unsigned char []> directReadBuffer; // only used if cluster pool is turned off
427
428 if (fOptions.GetClusterCache() == RNTupleReadOptions::EClusterCache::kOff) {
429 directReadBuffer = std::make_unique<unsigned char[]>(bytesOnStorage);
430 fDaosContainer->ReadSingleAkey(directReadBuffer.get(), bytesOnStorage,
431 {static_cast<decltype(daos_obj_id_t::lo)>(pageInfo.fLocator.fPosition), 0},
432 kDistributionKey, kAttributeKey);
433 fCounters->fNPageLoaded.Inc();
434 fCounters->fNRead.Inc();
435 fCounters->fSzReadPayload.Add(bytesOnStorage);
436 sealedPageBuffer = directReadBuffer.get();
437 } else {
// Switch clusters only when the cached one is absent, has a different ID, or lacks this column.
438 if (!fCurrentCluster || (fCurrentCluster->GetId() != clusterId) || !fCurrentCluster->ContainsColumn(columnId))
439 fCurrentCluster = fClusterPool->GetCluster(clusterId, fActiveColumns);
440 R__ASSERT(fCurrentCluster->ContainsColumn(columnId));
441
// The page may already have been unsealed (e.g. preloaded into the pool); reuse it.
442 auto cachedPage = fPagePool->GetPage(columnId, RClusterIndex(clusterId, idxInCluster));
443 if (!cachedPage.IsNull())
444 return cachedPage;
445
446 ROnDiskPage::Key key(columnId, pageInfo.fPageNo);
447 auto onDiskPage = fCurrentCluster->GetOnDiskPage(key);
448 R__ASSERT(onDiskPage && (bytesOnStorage == onDiskPage->GetSize()));
449 sealedPageBuffer = onDiskPage->GetAddress();
450 }
451
452 std::unique_ptr<unsigned char []> pageBuffer;
453 {
454 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
455 pageBuffer = UnsealPage({sealedPageBuffer, bytesOnStorage, pageInfo.fNElements}, *element);
456 fCounters->fSzUnzip.Add(elementSize * pageInfo.fNElements);
457 }
458
// The page window is expressed in global element indices: cluster's first element + page offset.
459 const auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
460 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), elementSize, pageInfo.fNElements);
461 newPage.SetWindow(indexOffset + pageInfo.fFirstInPage, RPage::RClusterInfo(clusterId, indexOffset));
462 fPagePool->RegisterPage(newPage,
463 RPageDeleter([](const RPage &page, void * /*userData*/)
464 {
466 }, nullptr));
467 fCounters->fNPagePopulated.Inc();
468 return newPage;
469}
470
471
/// Return the page holding global element `globalIndex` of the column behind `columnHandle`.
/// The page pool is consulted first. Otherwise the owning cluster is located, and the global index
/// is converted to a cluster-local one for PopulatePageFromCluster().
/// NOTE(review): the declarator line (original 472) is missing in this copy.
473 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
474{
475 const auto columnId = columnHandle.fId;
476 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
477 if (!cachedPage.IsNull())
478 return cachedPage;
479
480 const auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
481 R__ASSERT(clusterId != kInvalidDescriptorId);
482 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
// First global element index of this column within the cluster.
483 const auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
484 R__ASSERT(selfOffset <= globalIndex);
485 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
486}
487
488
/// Return the page holding the element addressed by `clusterIndex` (cluster ID plus in-cluster index)
/// of the column behind `columnHandle`. The page pool is consulted first; otherwise the page is
/// produced by PopulatePageFromCluster().
/// NOTE(review): the declarator line (original 489) is missing in this copy.
490 ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
491{
492 const auto clusterId = clusterIndex.GetClusterId();
493 const auto idxInCluster = clusterIndex.GetIndex();
494 const auto columnId = columnHandle.fId;
495 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
496 if (!cachedPage.IsNull())
497 return cachedPage;
498
499 R__ASSERT(clusterId != kInvalidDescriptorId);
500 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
501 return PopulatePageFromCluster(columnHandle, clusterDescriptor, idxInCluster);
502}
503
/// Return a page handed out by this source to the page pool.
/// NOTE(review): the declarator line (original 504) is missing in this copy.
505{
506 fPagePool->ReturnPage(page);
507}
508
509std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceDaos::Clone() const
510{
511 auto clone = new RPageSourceDaos(fNTupleName, fURI, fOptions);
512 return std::unique_ptr<RPageSourceDaos>(clone);
513}
514
/// For each requested cluster key, read the sealed pages of the requested columns and wrap them
/// into an RCluster. Each cluster uses one contiguous buffer and one vectored read (ReadV).
/// All requested columns are marked available in the returned cluster.
/// NOTE(review): the declarator line (original 516) is missing in this copy.
515std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>>
517{
518 std::vector<std::unique_ptr<ROOT::Experimental::Detail::RCluster>> result;
519 for (const auto &clusterKey : clusterKeys) {
520 auto clusterId = clusterKey.fClusterId;
521 fCounters->fNClusterLoaded.Inc();
522
523 const auto &clusterDesc = GetDescriptor().GetClusterDescriptor(clusterId);
524
// Where a sealed page lives in DAOS (object ID, size) and where it goes in the local buffer (fBufPos).
525 struct RDaosSealedPageLocator {
526 RDaosSealedPageLocator() = default;
527 RDaosSealedPageLocator(DescriptorId_t c, NTupleSize_t p, std::uint64_t o, std::uint64_t s, std::size_t b)
528 : fColumnId(c), fPageNo(p), fObjectId(o), fSize(s), fBufPos(b) {}
529 DescriptorId_t fColumnId = 0;
530 NTupleSize_t fPageNo = 0;
531 std::uint64_t fObjectId = 0;
532 std::uint64_t fSize = 0;
533 std::size_t fBufPos = 0;
534 };
535
536 // Collect the necessary page meta-data and sum up the total size of the compressed and packed pages
537 std::vector<RDaosSealedPageLocator> onDiskPages;
538 std::size_t szPayload = 0;
539 for (auto columnId : clusterKey.fColumnSet) {
540 const auto &pageRange = clusterDesc.GetPageRange(columnId);
541 NTupleSize_t pageNo = 0;
542 for (const auto &pageInfo : pageRange.fPageInfos) {
543 const auto &pageLocator = pageInfo.fLocator;
544 onDiskPages.emplace_back(RDaosSealedPageLocator(
545 columnId, pageNo, pageLocator.fPosition, pageLocator.fBytesOnStorage, szPayload));
546 szPayload += pageLocator.fBytesOnStorage;
547 ++pageNo;
548 }
549 }
550
551 // Prepare the input vector for the RDaosContainer::ReadV() call
// NOTE(review): `buffer` is a raw owning pointer until it is wrapped in a unique_ptr below. If an
// allocation in the loop in between throws, it leaks; wrapping it immediately would avoid that.
552 std::vector<RDaosContainer::RWOperation> readRequests;
553 auto buffer = new unsigned char[szPayload];
554 for (auto &s : onDiskPages) {
555 std::vector<d_iov_t> iovs(1);
556 d_iov_set(&iovs[0], buffer + s.fBufPos, s.fSize);
557 readRequests.emplace_back(daos_obj_id_t{s.fObjectId, 0},
558 kDistributionKey, kAttributeKey, iovs);
559 }
560 fCounters->fSzReadPayload.Add(szPayload);
561
562 // Register the on-disk pages in a page map that takes ownership of the buffer
// Pages are registered before the read; they point into the buffer that ReadV() fills below.
563 auto pageMap = std::make_unique<ROnDiskPageMapHeap>(std::unique_ptr<unsigned char []>(buffer));
564 for (const auto &s : onDiskPages) {
565 ROnDiskPage::Key key(s.fColumnId, s.fPageNo);
566 pageMap->Register(key, ROnDiskPage(buffer + s.fBufPos, s.fSize));
567 }
568 fCounters->fNPageLoaded.Add(onDiskPages.size());
569
570 {
571 RNTupleAtomicTimer timer(fCounters->fTimeWallRead, fCounters->fTimeCpuRead);
572 fDaosContainer->ReadV(readRequests);
573 }
574 fCounters->fNReadV.Inc();
575 fCounters->fNRead.Add(readRequests.size());
576
577 auto cluster = std::make_unique<RCluster>(clusterId);
578 cluster->Adopt(std::move(pageMap));
579 for (auto colId : clusterKey.fColumnSet)
580 cluster->SetColumnAvailable(colId);
581
582 result.emplace_back(std::move(cluster));
583 }
584 return result;
585}
586
587
/// Unseal every on-disk page of `cluster`, one task per page, and preload the resulting pages
/// into the page pool. Blocks until all tasks have finished.
/// NOTE(review): the declarator line (original 588) and the deleter body (original 626) are missing
/// in this copy. As shown, the deleter lambda is empty and would never free the page buffer.
589{
590 RNTupleAtomicTimer timer(fCounters->fTimeWallUnzip, fCounters->fTimeCpuUnzip);
591 fTaskScheduler->Reset();
592
593 const auto clusterId = cluster->GetId();
594 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
595
// Owns one element object per column. Tasks capture raw pointers into it, which stay valid because
// the pointees are heap objects and the vector lives until fTaskScheduler->Wait() below returns.
596 std::vector<std::unique_ptr<RColumnElementBase>> allElements;
597
598 const auto &columnsInCluster = cluster->GetAvailColumns();
599 for (const auto columnId : columnsInCluster) {
600 const auto &columnDesc = fDescriptor.GetColumnDescriptor(columnId);
601
602 allElements.emplace_back(RColumnElementBase::Generate(columnDesc.GetModel().GetType()));
603
604 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
605 std::uint64_t pageNo = 0;
606 std::uint64_t firstInPage = 0;
607 for (const auto &pi : pageRange.fPageInfos) {
608 ROnDiskPage::Key key(columnId, pageNo);
609 auto onDiskPage = cluster->GetOnDiskPage(key);
610 R__ASSERT(onDiskPage && (onDiskPage->GetSize() == pi.fLocator.fBytesOnStorage));
611
// Everything the task needs is captured by value, except `this` and the element pointer.
612 auto taskFunc =
613 [this, columnId, clusterId, firstInPage, onDiskPage,
614 element = allElements.back().get(),
615 nElements = pi.fNElements,
616 indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex
617 ] () {
618 auto pageBuffer = UnsealPage({onDiskPage->GetAddress(), onDiskPage->GetSize(), nElements}, *element);
619 fCounters->fSzUnzip.Add(element->GetSize() * nElements);
620
621 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer.release(), element->GetSize(), nElements);
622 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
623 fPagePool->PreloadPage(newPage,
624 RPageDeleter([](const RPage &page, void * /*userData*/)
625 {
627 }, nullptr));
628 };
629
630 fTaskScheduler->AddTask(taskFunc);
631
632 firstInPage += pi.fNElements;
633 pageNo++;
634 } // for all pages in column
635 } // for all columns in cluster
636
637 fCounters->fNPagePopulated.Add(cluster->GetNOnDiskPages());
638
639 fTaskScheduler->Wait();
640}
size_t fSize
#define R__FORWARD_ERROR(res)
Short-hand to return an RResult<T> in an error state (i.e. after checking)
Definition RError.hxx:295
#define R__FAIL(msg)
Short-hand to return an RResult<T> in an error state; the RError is implicitly converted into RResult...
Definition RError.hxx:291
#define R__LOG_WARNING(...)
Definition RLogger.hxx:363
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define R__ASSERT(e)
Definition TError.h:118
Manages a set of clusters containing compressed and packed pages.
An in-memory subset of the packed and compressed pages of a cluster.
Definition RCluster.hxx:154
const ColumnSet_t & GetAvailColumns() const
Definition RCluster.hxx:198
const ROnDiskPage * GetOnDiskPage(const ROnDiskPage::Key &key) const
Definition RCluster.cxx:37
static std::unique_ptr< RColumnElementBase > Generate(EColumnType type)
RColumnElementBase * GetElement() const
Definition RColumn.hxx:308
static Writer_t MakeMemCopyWriter(unsigned char *dest)
Record wall time and CPU time between construction and destruction.
A page as being stored on disk, that is packed and compressed.
Definition RCluster.hxx:43
Manages pages read from a DAOS container.
static RPage NewPage(ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
Uses standard C++ memory allocation for the column data pages.
A closure that can free the memory associated with a mapped page.
A thread-safe cache of column pages.
Definition RPagePool.hxx:47
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options)
RNTupleLocator CommitSealedPageImpl(DescriptorId_t columnId, const RPageStorage::RSealedPage &sealedPage) final
void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter)
void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader)
std::uint64_t CommitClusterImpl(NTupleSize_t nEntries) final
Returns the number of bytes written to storage (excluding metadata)
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final
void CreateImpl(const RNTupleModel &model) final
RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final
Get a new, empty page for the given column that can be filled with up to nElements.
Abstract interface to write data into an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSink.
std::unique_ptr< RNTupleCompressor > fCompressor
Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
Storage provider that reads ntuple pages from a DAOS container.
std::unique_ptr< RDaosContainer > fDaosContainer
A container that stores object data (header/footer, pages, etc.)
void LoadSealedPage(DescriptorId_t columnId, const RClusterIndex &clusterIndex, RSealedPage &sealedPage) final
Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage.
RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType idxInCluster)
RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options)
void ReleasePage(RPage &page) final
Every page store needs to be able to free pages it handed out.
void UnzipClusterImpl(RCluster *cluster) final
std::vector< std::unique_ptr< RCluster > > LoadClusters(std::span< RCluster::RKey > clusterKeys) final
Populates all the pages of the given cluster ids and columns; it is possible that some columns do not...
RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final
Allocates and fills a page that contains the index-th element.
std::string GetObjectClass() const
Return the object class used for user data OIDs in this ntuple.
std::unique_ptr< RPageSource > Clone() const final
The cloned page source creates a new connection to the pool/container.
Abstract interface to read data from an ntuple.
void EnableDefaultMetrics(const std::string &prefix)
Enables the default set of metrics provided by RPageSource.
std::unique_ptr< RNTupleDecompressor > fDecompressor
Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
Stores information about the cluster in which this page resides.
Definition RPage.hxx:46
A page is a slice of a column that is mapped into memory.
Definition RPage.hxx:41
ClusterSize_t::ValueType GetNBytes() const
The space taken by column elements in the buffer.
Definition RPage.hxx:81
void * GrowUnchecked(ClusterSize_t::ValueType nElements)
Called during writing: returns a pointer after the last element and increases the element counter in ...
Definition RPage.hxx:109
A helper class for serializing and deserialization of the RNTuple binary format.
static std::uint32_t SerializePageListV1(void *buffer, const RNTupleDescriptor &desc, std::span< DescriptorId_t > physClusterIDs, const RContext &context)
static RResult< void > DeserializePageListV1(const void *buffer, std::uint32_t bufSize, std::vector< RClusterDescriptorBuilder > &clusters)
static RContext SerializeHeaderV1(void *buffer, const RNTupleDescriptor &desc)
static RResult< void > DeserializeFooterV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
static std::uint32_t SerializeFooterV1(void *buffer, const RNTupleDescriptor &desc, const RContext &context)
static RResult< void > DeserializeHeaderV1(const void *buffer, std::uint32_t bufSize, RNTupleDescriptorBuilder &descBuilder)
Meta-data for a set of ntuple clusters.
const RPageRange & GetPageRange(DescriptorId_t columnId) const
const RColumnRange & GetColumnRange(DescriptorId_t columnId) const
Addresses a column element or field item relative to a particular cluster, instead of a global NTuple...
DescriptorId_t GetClusterId() const
ClusterSize_t::ValueType GetIndex() const
Base class for all ROOT issued exceptions.
Definition RError.hxx:114
A helper class for piece-wise construction of an RNTupleDescriptor.
Internal::RNTupleSerializer::RClusterGroup GetClusterGroup(std::uint32_t id) const
void AddCluster(DescriptorId_t clusterId, RNTupleVersion version, NTupleSize_t firstEntryIndex, ClusterSize_t nEntries)
The on-storage meta-data of an ntuple.
The RNTupleModel encapsulates the schema of an ntuple.
Common user-tunable settings for reading ntuples.
DAOS-specific user-tunable settings for storing ntuples.
Common user-tunable settings for storing ntuples.
The class is used as a return type for operations that can fail; wraps a value of type T or an RError...
Definition RError.hxx:195
static void d_iov_set(d_iov_t *iov, void *buf, size_t size)
Definition daos.h:54
uint16_t daos_oclass_id_t
Definition daos.h:133
@ OC_SX
Definition daos.h:127
const Int_t n
Definition legend1.C:16
RLogChannel & NTupleLog()
Log channel for RNTuple diagnostics.
std::uint64_t NTupleSize_t
Integer type long enough to hold the maximum number of entries in a column.
std::uint64_t DescriptorId_t
Distinguishes elements of the same type within a descriptor, e.g. different fields.
std::int64_t ColumnId_t
Uniquely identifies a physical column within the scope of the current process, used to tag pages.
constexpr DescriptorId_t kInvalidDescriptorId
Entry point for an RNTuple in a DAOS container.
std::uint32_t fNBytesFooter
The size of the compressed ntuple footer.
std::uint32_t fNBytesHeader
The size of the compressed ntuple header.
std::string fObjClass
The object class for user data OIDs, e.g. SX
std::uint32_t fVersion
Allows for evolving the struct in future versions.
RResult< std::uint32_t > Deserialize(const void *buffer, std::uint32_t bufSize)
std::uint32_t fLenHeader
The size of the uncompressed ntuple header.
std::uint32_t Serialize(void *buffer) const
std::uint32_t fLenFooter
The size of the uncompressed ntuple footer.
Wrap around a daos_oclass_id_t.
Definition RDaos.hxx:71
static constexpr std::size_t kOCNameMaxLength
This limit is currently not defined in any header and any call to daos_oclass_id2name() within DAOS u...
Definition RDaos.hxx:85
On-disk pages within a page source are identified by the column and page number.
Definition RCluster.hxx:53
A sealed page contains the bytes of a page as written to storage (packed & compressed).
RPageInfoExtended Find(RClusterSize::ValueType idxInCluster) const
Find the page in the RPageRange that contains the given element. The element must exist.
Generic information about the physical location of data.
auto * m
Definition textangle.C:8