39 std::vector<std::pair<size_t, zmq::event_flags>>
r;
41 throw std::runtime_error(
"No sockets registered");
46 n = zmq::poll(
m_items, std::chrono::milliseconds{timeo});
50 }
catch (
const zmq::error_t &
e) {
51 std::cerr <<
"in ZeroMQPoller::poll on PID " << getpid() <<
": " <<
e.what() << std::endl;
52 if (
e.num() != EINTR) {
58 for (
size_t i = 0; i <
m_items.size(); ++i) {
61 zmq::event_flags flags = zmq::event_flags::none;
67 const zmq::socket_t *s;
70 if (
m_items[i].revents &
short(flags)) {
71 r.emplace_back(index, flags);
91std::vector<std::pair<size_t, zmq::event_flags>>
ZeroMQPoller::ppoll(
int timeo,
const sigset_t *sigmask)
94 throw std::runtime_error(
"No sockets registered");
97 std::vector<std::pair<size_t, zmq::event_flags>>
r;
105 zmq::event_flags flags = zmq::event_flags::none;
106 if (m_item.socket ==
nullptr) {
108 std::tie(index, flags) =
m_fds[m_item.fd];
111 const zmq::socket_t *s;
112 std::tie(index, flags, s) =
m_sockets[m_item.socket];
114 if (m_item.revents &
short(flags)) {
115 r.emplace_back(index, flags);
138 zmq::socket_t *s = &
socket;
141 return std::get<0>(it->second);
167 auto it =
m_fds.find(fd);
168 if (it !=
m_fds.end()) {
169 return std::get<0>(it->second);
176 m_items.push_back({
nullptr, fd,
static_cast<short>(
type), 0});
179 m_fds.emplace(fd, std::make_tuple(index,
type));
194 throw std::out_of_range(
"Socket is not registered");
201 return &
socket == std::get<2>(entry.second);
203 auto index = std::get<0>(it->second);
205 void *it_first = it->first;
210 [&it_first](
const zmq::pollitem_t &item) {
return it_first == item.socket; });
227 if (!
m_fds.count(fd)) {
228 throw std::out_of_range(
"fileno is not registered");
231 auto it =
m_fds.find(fd);
232 auto index = std::get<0>(it->second);
234 int it_first = it->first;
239 [&it_first](
const zmq::pollitem_t &item) {
return it_first == item.fd; });
std::vector< zmq::pollitem_t > m_items
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
int ppoll(zmq_pollitem_t *items_, size_t nitems_, long timeout_, const sigset_t *sigmask_)
Wrapper around zmq_ppoll. This function can throw, so wrap in try-catch!