Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RIoUring.hxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#ifndef ROOT_RIoUring
10#define ROOT_RIoUring
11
12#include <cstdint>
13#include <cstring>
14#include <stdexcept>
15#include <string>
16
17#include <liburing.h>
18#include <liburing/io_uring.h>
19
20#include "TError.h"
21
22namespace ROOT {
23namespace Internal {
24
25class RIoUring {
26private:
27 struct io_uring fRing;
28 std::uint32_t fDepth = 0;
29
30public:
31 // Create an io_uring instance. The ring selects an appropriate queue depth. which can be queried
32 // afterwards using GetQueueDepth(). The depth is typically 1024 or lower. Throws an exception if
33 // ring setup fails.
35 std::uint32_t queueDepth = 1024;
36 int ret;
37 while (true) {
38 ret = io_uring_queue_init(queueDepth, &fRing, 0 /* no flags */);
39 if (ret == 0) {
40 fDepth = queueDepth;
41 break; // ring setup succeeded
42 }
43 if (ret != -ENOMEM) {
44 throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
45 }
46 // try again with a smaller queue for ENOMEM
47 queueDepth /= 2;
48 if (queueDepth == 0) {
49 throw std::runtime_error("Failed to allocate memory for the smallest possible "
50 "io_uring instance. 'memlock' memory has been exhausted for this user");
51 }
52 }
53 }
54
55 // Create a io_uring instance that can hold at least `entriesHint` submission entries. The actual
56 // queue depth is rounded up to the next power of 2. Throws an exception if ring setup fails.
57 explicit RIoUring(std::uint32_t entriesHint) {
58 struct io_uring_params params = {}; /* zero initialize param struct, no flags */
59 int ret = io_uring_queue_init_params(entriesHint, &fRing, &params);
60 if (ret != 0) {
61 throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
62 }
63 fDepth = params.sq_entries;
64 }
65
66 RIoUring(const RIoUring&) = delete;
67 RIoUring& operator=(const RIoUring&) = delete;
68
70 // todo(max) try submitting any pending events before exiting
71 io_uring_queue_exit(&fRing);
72 }
73
74 std::uint32_t GetQueueDepth() {
75 return fDepth;
76 }
77
78 /// Access the raw io_uring instance.
79 struct io_uring *GetRawRing() {
80 return &fRing;
81 }
82
83 /// Basic read event composed of IO data and a target file descriptor.
84 struct RReadEvent {
85 /// The destination for reading
86 void *fBuffer = nullptr;
87 /// The file offset
88 std::uint64_t fOffset = 0;
89 /// The number of desired bytes
90 std::size_t fSize = 0;
91 /// The number of actually read bytes, set by the RIoUring instance
92 std::size_t fOutBytes = 0;
93 /// The file descriptor
94 int fFileDes = -1;
95 };
96
97 /// Submit a number of read events and wait for completion. Events are submitted in batches if
98 /// the number of events is larger than the submission queue depth.
99 void SubmitReadsAndWait(RReadEvent* readEvents, unsigned int nReads) {
100 unsigned int batch = 0;
101 unsigned int batchSize = fDepth;
102 unsigned int readPos = 0;
103
104 while (readPos < nReads) {
105 if (readPos + batchSize > nReads) {
106 batchSize = nReads - readPos;
107 }
108 // prep reads
109 struct io_uring_sqe *sqe;
110 for (std::size_t i = readPos; i < readPos + batchSize; ++i) {
111 sqe = io_uring_get_sqe(&fRing);
112 if (!sqe) {
113 throw std::runtime_error("batch " + std::to_string(batch) + ": "
114 + "get SQE failed for read request '" + std::to_string(i)
115 + "', error: " + std::string(strerror(errno)));
116 }
117 if (readEvents[i].fFileDes == -1) {
118 throw std::runtime_error("batch " + std::to_string(batch) + ": "
119 + "bad fd (-1) for read request '" + std::to_string(i) + "'");
120 }
121 if (readEvents[i].fBuffer == nullptr) {
122 throw std::runtime_error("batch " + std::to_string(batch) + ": "
123 + "null read buffer for read request '" + std::to_string(i) + "'");
124 }
125 io_uring_prep_read(sqe,
126 readEvents[i].fFileDes,
127 readEvents[i].fBuffer,
128 readEvents[i].fSize,
129 readEvents[i].fOffset
130 );
131 sqe->flags |= IOSQE_ASYNC; // maximize read event throughput
132 sqe->user_data = i;
133 }
134
135 // todo(max) check for any difference between submit vs. submit and wait for large nReq
136 int submitted = io_uring_submit_and_wait(&fRing, batchSize);
137 if (submitted <= 0) {
138 throw std::runtime_error("batch " + std::to_string(batch) + ": "
139 "ring submit failed, error: " + std::string(strerror(errno)));
140 }
141 if (submitted != static_cast<int>(batchSize)) {
142 throw std::runtime_error("ring submitted " + std::to_string(submitted) +
143 " events but requested " + std::to_string(batchSize));
144 }
145 // reap reads
146 struct io_uring_cqe *cqe;
147 int ret;
148 for (int i = 0; i < submitted; ++i) {
149 ret = io_uring_wait_cqe(&fRing, &cqe);
150 if (ret < 0) {
151 throw std::runtime_error("wait cqe failed, error: " + std::string(std::strerror(-ret)));
152 }
153 auto index = reinterpret_cast<std::size_t>(io_uring_cqe_get_data(cqe));
154 if (index >= nReads) {
155 throw std::runtime_error("bad cqe user data: " + std::to_string(index));
156 }
157 if (cqe->res < 0) {
158 throw std::runtime_error("batch " + std::to_string(batch) + ": "
159 + "read failed for ReadEvent[" + std::to_string(index) + "], "
160 "error: " + std::string(std::strerror(-cqe->res)));
161 }
162 readEvents[index].fOutBytes = static_cast<std::size_t>(cqe->res);
163 io_uring_cqe_seen(&fRing, cqe);
164 }
165 readPos += batchSize;
166 batch += 1;
167 }
168 return;
169 }
170};
171
172} // namespace Internal
173} // namespace ROOT
174
175#endif
size_t fSize
std::string fBuffer
struct io_uring fRing
Definition RIoUring.hxx:27
RIoUring(std::uint32_t entriesHint)
Definition RIoUring.hxx:57
std::uint32_t GetQueueDepth()
Definition RIoUring.hxx:74
RIoUring(const RIoUring &)=delete
RIoUring & operator=(const RIoUring &)=delete
void SubmitReadsAndWait(RReadEvent *readEvents, unsigned int nReads)
Submit a number of read events and wait for completion.
Definition RIoUring.hxx:99
struct io_uring * GetRawRing()
Access the raw io_uring instance.
Definition RIoUring.hxx:79
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Basic read event composed of IO data and a target file descriptor.
Definition RIoUring.hxx:84
int fFileDes
The file descriptor.
Definition RIoUring.hxx:94
std::uint64_t fOffset
The file offset.
Definition RIoUring.hxx:88
void * fBuffer
The destination for reading.
Definition RIoUring.hxx:86
std::size_t fOutBytes
The number of actually read bytes, set by the RIoUring instance.
Definition RIoUring.hxx:92
std::size_t fSize
The number of desired bytes.
Definition RIoUring.hxx:90