18#include <liburing/io_uring.h>
35 std::uint32_t queueDepth = 1024;
38 ret = io_uring_queue_init(queueDepth, &
fRing, 0 );
44 throw std::runtime_error(
"Error initializing io_uring: " + std::string(std::strerror(-ret)));
48 if (queueDepth == 0) {
49 throw std::runtime_error(
"Failed to allocate memory for the smallest possible "
50 "io_uring instance. 'memlock' memory has been exhausted for this user");
58 struct io_uring_params params = {};
59 int ret = io_uring_queue_init_params(entriesHint, &
fRing, &params);
61 throw std::runtime_error(
"Error initializing io_uring: " + std::string(std::strerror(-ret)));
63 fDepth = params.sq_entries;
71 io_uring_queue_exit(&
fRing);
100 unsigned int batch = 0;
101 unsigned int batchSize =
fDepth;
102 unsigned int readPos = 0;
104 while (readPos < nReads) {
105 if (readPos + batchSize > nReads) {
106 batchSize = nReads - readPos;
109 struct io_uring_sqe *sqe;
110 for (std::size_t i = readPos; i < readPos + batchSize; ++i) {
111 sqe = io_uring_get_sqe(&
fRing);
113 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
114 +
"get SQE failed for read request '" + std::to_string(i)
115 +
"', error: " + std::string(strerror(errno)));
117 if (readEvents[i].fFileDes == -1) {
118 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
119 +
"bad fd (-1) for read request '" + std::to_string(i) +
"'");
121 if (readEvents[i].
fBuffer ==
nullptr) {
122 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
123 +
"null read buffer for read request '" + std::to_string(i) +
"'");
125 io_uring_prep_read(sqe,
126 readEvents[i].fFileDes,
129 readEvents[i].fOffset
131 sqe->flags |= IOSQE_ASYNC;
136 int submitted = io_uring_submit_and_wait(&
fRing, batchSize);
137 if (submitted <= 0) {
138 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
139 "ring submit failed, error: " + std::string(strerror(errno)));
141 if (submitted !=
static_cast<int>(batchSize)) {
142 throw std::runtime_error(
"ring submitted " + std::to_string(submitted) +
143 " events but requested " + std::to_string(batchSize));
146 struct io_uring_cqe *cqe;
148 for (
int i = 0; i < submitted; ++i) {
149 ret = io_uring_wait_cqe(&
fRing, &cqe);
151 throw std::runtime_error(
"wait cqe failed, error: " + std::string(std::strerror(-ret)));
153 auto index =
reinterpret_cast<std::size_t
>(io_uring_cqe_get_data(cqe));
154 if (index >= nReads) {
155 throw std::runtime_error(
"bad cqe user data: " + std::to_string(index));
158 throw std::runtime_error(
"batch " + std::to_string(batch) +
": "
159 +
"read failed for ReadEvent[" + std::to_string(index) +
"], "
160 "error: " + std::string(std::strerror(-cqe->res)));
162 readEvents[index].
fOutBytes =
static_cast<std::size_t
>(cqe->res);
163 io_uring_cqe_seen(&
fRing, cqe);
165 readPos += batchSize;
RIoUring(std::uint32_t entriesHint)
std::uint32_t GetQueueDepth()
RIoUring(const RIoUring &)=delete
RIoUring & operator=(const RIoUring &)=delete
void SubmitReadsAndWait(RReadEvent *readEvents, unsigned int nReads)
Submit a number of read events and wait for completion.
struct io_uring * GetRawRing()
Access the raw io_uring instance.
tbb::task_arena is an alias of tbb::interface7::task_arena, which does not allow forward-declaring tb...
Basic read event composed of IO data and a target file descriptor.
int fFileDes
The file descriptor.
std::uint64_t fOffset
The file offset.
void * fBuffer
The destination for reading.
std::size_t fOutBytes
The number of bytes actually read, set by the RIoUring instance.
std::size_t fSize
The number of desired bytes.