Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
RIoUring.hxx
Go to the documentation of this file.
1/*************************************************************************
2 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. *
3 * All rights reserved. *
4 * *
5 * For the licensing terms see $ROOTSYS/LICENSE. *
6 * For the list of contributors see $ROOTSYS/README/CREDITS. *
7 *************************************************************************/
8
9#ifndef ROOT_RIoUring
10#define ROOT_RIoUring
11
12#include <cstdint>
13#include <cstring>
14#include <stdexcept>
15
16#include <liburing.h>
17#include <liburing/io_uring.h>
18
19#include "TError.h"
20
21namespace ROOT {
22namespace Internal {
23
24class RIoUring {
25private:
26 struct io_uring fRing;
27 std::uint32_t fDepth = 0;
28
29 static bool CheckIsAvailable() {
30 try {
31 RIoUring(1);
32 return true;
33 }
34 catch (const std::runtime_error& err) {
35 Warning("RIoUring", "io_uring is not available\n%s", err.what());
36 }
37 return false;
38 }
39
40public:
41 // Create an io_uring instance. The ring selects an appropriate queue depth. which can be queried
42 // afterwards using GetQueueDepth(). The depth is typically 1024 or lower. Throws an exception if
43 // ring setup fails.
45 std::uint32_t queueDepth = 1024;
46 int ret;
47 while (true) {
48 ret = io_uring_queue_init(queueDepth, &fRing, 0 /* no flags */);
49 if (ret == 0) {
50 fDepth = queueDepth;
51 break; // ring setup succeeded
52 }
53 if (ret != -ENOMEM) {
54 throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
55 }
56 // try again with a smaller queue for ENOMEM
57 queueDepth /= 2;
58 if (queueDepth == 0) {
59 throw std::runtime_error("Fatal Error: failed to allocate memory for the smallest possible "
60 "io_uring instance. 'memlock' memory has been exhausted for this user");
61 }
62 }
63 }
64
65 // Create a io_uring instance that can hold at least `entriesHint` submission entries. The actual
66 // queue depth is rounded up to the next power of 2. Throws an exception if ring setup fails.
67 explicit RIoUring(std::uint32_t entriesHint) {
68 struct io_uring_params params = {}; /* zero initialize param struct, no flags */
69 int ret = io_uring_queue_init_params(entriesHint, &fRing, &params);
70 if (ret != 0) {
71 throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
72 }
73 fDepth = params.sq_entries;
74 }
75
76 RIoUring(const RIoUring&) = delete;
77 RIoUring& operator=(const RIoUring&) = delete;
78
80 // todo(max) try submitting any pending events before exiting
81 io_uring_queue_exit(&fRing);
82 }
83
84 /// Check if io_uring is available on this system.
85 static bool IsAvailable() {
86 static const bool available = RIoUring::CheckIsAvailable();
87 return available;
88 }
89
90 std::uint32_t GetQueueDepth() {
91 return fDepth;
92 }
93
94 /// Access the raw io_uring instance.
95 struct io_uring *GetRawRing() {
96 return &fRing;
97 }
98
99 /// Basic read event composed of IO data and a target file descriptor.
100 struct RReadEvent {
101 /// The destination for reading
102 void *fBuffer = nullptr;
103 /// The file offset
104 std::uint64_t fOffset = 0;
105 /// The number of desired bytes
106 std::size_t fSize = 0;
107 /// The number of actually read bytes, set by the RIoUring instance
108 std::size_t fOutBytes = 0;
109 /// The file descriptor
110 int fFileDes = -1;
111 };
112
113 /// Submit a number of read events and wait for completion. Events are submitted in batches if
114 /// the number of events is larger than the submission queue depth.
115 void SubmitReadsAndWait(RReadEvent* readEvents, unsigned int nReads) {
116 unsigned int batch = 0;
117 unsigned int batchSize = fDepth;
118 unsigned int readPos = 0;
119
120 while (readPos < nReads) {
121 if (readPos + batchSize > nReads) {
122 batchSize = nReads - readPos;
123 }
124 // prep reads
125 struct io_uring_sqe *sqe;
126 for (std::size_t i = readPos; i < readPos + batchSize; ++i) {
127 sqe = io_uring_get_sqe(&fRing);
128 if (!sqe) {
129 throw std::runtime_error("batch " + std::to_string(batch) + ": "
130 + "get SQE failed for read request '" + std::to_string(i)
131 + "', error: " + std::string(strerror(errno)));
132 }
133 if (readEvents[i].fFileDes == -1) {
134 throw std::runtime_error("batch " + std::to_string(batch) + ": "
135 + "bad fd (-1) for read request '" + std::to_string(i) + "'");
136 }
137 if (readEvents[i].fBuffer == nullptr) {
138 throw std::runtime_error("batch " + std::to_string(batch) + ": "
139 + "null read buffer for read request '" + std::to_string(i) + "'");
140 }
141 io_uring_prep_read(sqe,
142 readEvents[i].fFileDes,
143 readEvents[i].fBuffer,
144 readEvents[i].fSize,
145 readEvents[i].fOffset
146 );
147 sqe->flags |= IOSQE_ASYNC; // maximize read event throughput
148 sqe->user_data = i;
149 }
150
151 // todo(max) check for any difference between submit vs. submit and wait for large nReq
152 int submitted = io_uring_submit_and_wait(&fRing, batchSize);
153 if (submitted <= 0) {
154 throw std::runtime_error("batch " + std::to_string(batch) + ": "
155 "ring submit failed, error: " + std::string(strerror(errno)));
156 }
157 if (submitted != static_cast<int>(batchSize)) {
158 throw std::runtime_error("ring submitted " + std::to_string(submitted) +
159 " events but requested " + std::to_string(batchSize));
160 }
161 // reap reads
162 struct io_uring_cqe *cqe;
163 int ret;
164 for (int i = 0; i < submitted; ++i) {
165 ret = io_uring_wait_cqe(&fRing, &cqe);
166 if (ret < 0) {
167 throw std::runtime_error("wait cqe failed, error: " + std::string(std::strerror(-ret)));
168 }
169 auto index = reinterpret_cast<std::size_t>(io_uring_cqe_get_data(cqe));
170 if (index >= nReads) {
171 throw std::runtime_error("bad cqe user data: " + std::to_string(index));
172 }
173 if (cqe->res < 0) {
174 throw std::runtime_error("batch " + std::to_string(batch) + ": "
175 + "read failed for ReadEvent[" + std::to_string(index) + "], "
176 "error: " + std::string(std::strerror(-cqe->res)));
177 }
178 readEvents[index].fOutBytes = static_cast<std::size_t>(cqe->res);
179 io_uring_cqe_seen(&fRing, cqe);
180 }
181 readPos += batchSize;
182 batch += 1;
183 }
184 return;
185 }
186};
187
188} // namespace Internal
189} // namespace ROOT
190
191#endif
size_t fSize
std::string fBuffer
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:231
static bool CheckIsAvailable()
Definition RIoUring.hxx:29
struct io_uring fRing
Definition RIoUring.hxx:26
RIoUring(std::uint32_t entriesHint)
Definition RIoUring.hxx:67
std::uint32_t GetQueueDepth()
Definition RIoUring.hxx:90
RIoUring(const RIoUring &)=delete
RIoUring & operator=(const RIoUring &)=delete
static bool IsAvailable()
Check if io_uring is available on this system.
Definition RIoUring.hxx:85
void SubmitReadsAndWait(RReadEvent *readEvents, unsigned int nReads)
Submit a number of read events and wait for completion.
Definition RIoUring.hxx:115
struct io_uring * GetRawRing()
Access the raw io_uring instance.
Definition RIoUring.hxx:95
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Basic read event composed of IO data and a target file descriptor.
Definition RIoUring.hxx:100
int fFileDes
The file descriptor.
Definition RIoUring.hxx:110
std::uint64_t fOffset
The file offset.
Definition RIoUring.hxx:104
void * fBuffer
The destination for reading.
Definition RIoUring.hxx:102
std::size_t fOutBytes
The number of actually read bytes, set by the RIoUring instance.
Definition RIoUring.hxx:108
std::size_t fSize
The number of desired bytes.
Definition RIoUring.hxx:106