17#include <liburing/io_uring.h>
34 catch (
const std::runtime_error& err) {
35 Warning(
"RIoUring",
"io_uring is not available\n%s", err.what());
45 std::uint32_t queueDepth = 1024;
48 ret = io_uring_queue_init(queueDepth, &
fRing, 0 );
54 throw std::runtime_error(
"Error initializing io_uring: " + std::string(std::strerror(-ret)));
58 if (queueDepth == 0) {
59 throw std::runtime_error(
"Fatal Error: failed to allocate memory for the smallest possible "
60 "io_uring instance. 'memlock' memory has been exhausted for this user");
68 struct io_uring_params params = {};
69 int ret = io_uring_queue_init_params(entriesHint, &
fRing, ¶ms);
71 throw std::runtime_error(
"Error initializing io_uring: " + std::string(std::strerror(-ret)));
73 fDepth = params.sq_entries;
81 io_uring_queue_exit(&
fRing);
116 unsigned int batch = 0;
117 unsigned int batchSize =
fDepth;
118 unsigned int readPos = 0;
120 while (readPos < nReads) {
121 if (readPos + batchSize > nReads) {
122 batchSize = nReads - readPos;
125 struct io_uring_sqe *sqe;
126 for (std::size_t i = readPos; i < readPos + batchSize; ++i) {
127 sqe = io_uring_get_sqe(&
fRing);
129 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
130 +
"get SQE failed for read request '" + std::to_string(i)
131 +
"', error: " + std::string(strerror(errno)));
133 if (readEvents[i].fFileDes == -1) {
134 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
135 +
"bad fd (-1) for read request '" + std::to_string(i) +
"'");
137 if (readEvents[i].
fBuffer ==
nullptr) {
138 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
139 +
"null read buffer for read request '" + std::to_string(i) +
"'");
141 io_uring_prep_read(sqe,
142 readEvents[i].fFileDes,
145 readEvents[i].fOffset
147 sqe->flags |= IOSQE_ASYNC;
152 int submitted = io_uring_submit_and_wait(&
fRing, batchSize);
153 if (submitted <= 0) {
154 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
155 "ring submit failed, error: " + std::string(strerror(errno)));
157 if (submitted !=
static_cast<int>(batchSize)) {
158 throw std::runtime_error(
"ring submitted " + std::to_string(submitted) +
159 " events but requested " + std::to_string(batchSize));
162 struct io_uring_cqe *cqe;
164 for (
int i = 0; i < submitted; ++i) {
165 ret = io_uring_wait_cqe(&
fRing, &cqe);
167 throw std::runtime_error(
"wait cqe failed, error: " + std::string(std::strerror(-ret)));
169 auto index =
reinterpret_cast<std::size_t
>(io_uring_cqe_get_data(cqe));
170 if (index >= nReads) {
171 throw std::runtime_error(
"bad cqe user data: " + std::to_string(index));
174 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
175 +
"read failed for ReadEvent[" + std::to_string(index) +
"], "
176 "error: " + std::string(std::strerror(-cqe->res)));
178 readEvents[index].
fOutBytes =
static_cast<std::size_t
>(cqe->res);
179 io_uring_cqe_seen(&
fRing, cqe);
181 readPos += batchSize;
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
static bool CheckIsAvailable()
RIoUring(std::uint32_t entriesHint)
std::uint32_t GetQueueDepth()
RIoUring(const RIoUring &)=delete
RIoUring & operator=(const RIoUring &)=delete
static bool IsAvailable()
Check if io_uring is available on this system.
void SubmitReadsAndWait(RReadEvent *readEvents, unsigned int nReads)
Submit a number of read events and wait for completion.
struct io_uring * GetRawRing()
Access the raw io_uring instance.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Basic read event composed of IO data and a target file descriptor.
int fFileDes
The file descriptor.
std::uint64_t fOffset
The file offset.
void * fBuffer
The destination for reading.
std::size_t fOutBytes
The number of actually read bytes, set by the RIoUring instance.
std::size_t fSize
The number of desired bytes.