Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ZeroMQPoller.cpp
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * RA, Roel Aaij, NIKHEF
5 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
15
16#include "RooFit_ZMQ/ppoll.h"
17#include <iostream>
18
19/** \class ZeroMQPoller
20 * \brief Wrapper class for polling ZeroMQ sockets
21 *
22 * This class simplifies calls to poll or ppoll ZeroMQ sockets. It stores the
23 * list of sockets to be polled, which means they don't have to be separately
24 * carried around by the user. It also parses output and returns an easily
25 * digestible vector of events.
26 */
27
28/**
29 * \brief Poll the sockets
30 *
31 * \param[in] timeo Timeout in milliseconds. 0 means return immediately. -1 means wait for an event indefinitely.
32 * \return A vector of pairs of index and flags; index is the index of the registered fd or socket and flags are 0 (no
33 * events), ZMQ_POLLIN or ZMQ_POLLOUT.
34 *
35 * \note This function can throw (from inside zmq::poll), so wrap in try-catch!
36 */
37std::vector<std::pair<size_t, zmq::event_flags>> ZeroMQPoller::poll(int timeo)
38{
39 std::vector<std::pair<size_t, zmq::event_flags>> r;
40 if (m_items.empty()) {
41 throw std::runtime_error("No sockets registered");
42 }
43 int n = 0;
44 while (true) {
45 try {
46 n = zmq::poll(m_items, std::chrono::milliseconds{timeo});
47 if (n == 0)
48 return r;
49 break;
50 } catch (const zmq::error_t &e) {
51 std::cerr << "in ZeroMQPoller::poll on PID " << getpid() << ": " << e.what() << std::endl;
52 if (e.num() != EINTR) {
53 throw;
54 }
55 }
56 }
57 // TODO: replace this with ranges::v3::zip
58 for (size_t i = 0; i < m_items.size(); ++i) {
59 void *socket = m_items[i].socket;
60 size_t index = 0;
61 zmq::event_flags flags = zmq::event_flags::none;
62 if (socket == nullptr) {
63 // an fd was registered
64 std::tie(index, flags) = m_fds[m_items[i].fd];
65 } else {
66 // a socket was registered
67 const zmq::socket_t *s;
68 std::tie(index, flags, s) = m_sockets[socket];
69 }
70 if (m_items[i].revents & short(flags)) {
71 r.emplace_back(index, flags);
72 }
73 }
74 return r;
75}
76
77/**
78 * \brief Poll the sockets with ppoll
79 *
80 * By polling with ppoll instead of poll, one can pass along a signal mask to
81 * handle POSIX signals properly. See the zmq_ppoll documentation for examples
82 * of when this is useful: http://api.zeromq.org/
83 *
84 * \param[in] timeo Timeout in milliseconds. 0 means return immediately. -1 means wait for an event indefinitely.
85 * \param[in] sigmask A non-NULL pointer to a signal mask must be constructed and passed to 'sigmask'. See the man page
86 * of sigprocmask(2) for more details on this. \return A vector of pairs of index and flags; index is the index of the
87 * registered fd or socket and flags are 0 (no events), ZMQ_POLLIN or ZMQ_POLLOUT.
88 *
89 * \note This function can throw (from inside ZMQ::ppoll), so wrap in try-catch!
90 */
91std::vector<std::pair<size_t, zmq::event_flags>> ZeroMQPoller::ppoll(int timeo, const sigset_t *sigmask)
92{
93 if (m_items.empty()) {
94 throw std::runtime_error("No sockets registered");
95 }
96
97 std::vector<std::pair<size_t, zmq::event_flags>> r;
98
99 auto n = ZMQ::ppoll(m_items, timeo, sigmask);
100 if (n == 0)
101 return r;
102
103 for (auto &m_item : m_items) {
104 size_t index = 0;
105 zmq::event_flags flags = zmq::event_flags::none;
106 if (m_item.socket == nullptr) {
107 // an fd was registered
108 std::tie(index, flags) = m_fds[m_item.fd];
109 } else {
110 // a socket was registered
111 const zmq::socket_t *s;
112 std::tie(index, flags, s) = m_sockets[m_item.socket];
113 }
114 if (m_item.revents & short(flags)) {
115 r.emplace_back(index, flags);
116 }
117 }
118 return r;
119}
120
121size_t ZeroMQPoller::size() const
122{
123 return m_items.size();
124}
125
126/**
127 * \brief Register socket to poll
128 *
129 * Adds the socket to the internal list of sockets to poll.
130 *
131 * \param[in] socket Socket to register.
132 * \param[in] type Type of events to poll for. Can be ZMQ_POLLIN, ZMQ_POLLOUT or a bit-wise combination of the two.
133 * \return The index of the socket in the poller's internal list. Can be used to match with indices returned from
134 * (p)poll.
135 */
136size_t ZeroMQPoller::register_socket(zmq::socket_t &socket, zmq::event_flags type)
137{
138 zmq::socket_t *s = &socket;
139 auto it = m_sockets.find(s);
140 if (it != m_sockets.end()) {
141 return std::get<0>(it->second);
142 }
143 size_t index = m_free.empty() ? m_items.size() : m_free.front();
144 if (!m_free.empty())
145 m_free.pop_front();
146 // NOTE: this uses the conversion-to-void* operator of
147 // zmq::socket_t, which returns the wrapped object
148 m_items.push_back({socket, 0, static_cast<short>(type), 0});
149
150 // We need to lookup by the pointer to the object wrapped by zmq::socket_t
151 m_sockets.emplace(m_items.back().socket, std::make_tuple(index, type, s));
152 return index;
153}
154
155/**
156 * \brief Register socket to poll
157 *
158 * Adds the socket to the internal list of sockets to poll.
159 *
160 * \param[in] fd File descriptor of socket to register.
161 * \param[in] type Type of events to poll for. Can be ZMQ_POLLIN, ZMQ_POLLOUT or a bit-wise combination of the two.
162 * \return The index of the socket in the poller's internal list. Can be used to match with indices returned from
163 * (p)poll.
164 */
165size_t ZeroMQPoller::register_socket(int fd, zmq::event_flags type)
166{
167 auto it = m_fds.find(fd);
168 if (it != m_fds.end()) {
169 return std::get<0>(it->second);
170 }
171 size_t index = m_free.empty() ? m_items.size() : m_free.front();
172 if (!m_free.empty())
173 m_free.pop_front();
174 // NOTE: this uses the conversion-to-void* operator of
175 // zmq::socket_t, which returns the wrapped object
176 m_items.push_back({nullptr, fd, static_cast<short>(type), 0});
177
178 // We need to lookup by the pointer to the object wrapped by zmq::socket_t
179 m_fds.emplace(fd, std::make_tuple(index, type));
180 return index;
181}
182
183/**
184 * \brief Unregister socket from poller
185 *
186 * Removes the socket from the internal list of sockets to poll.
187 *
188 * \param[in] socket Socket to unregister.
189 * \return The index of the socket in the poller's internal list before removal.
190 */
192{
193 if (!m_sockets.count(socket.operator void *())) {
194 throw std::out_of_range("Socket is not registered");
195 }
196 // Remove from m_sockets
197 // Can't search by the key of m_sockets, as that is the wrapped
198 // object, but have to use the pointer to the wrapper
199 // (zmq::socket_t)
200 auto it = std::find_if(begin(m_sockets), end(m_sockets), [&socket](const decltype(m_sockets)::value_type &entry) {
201 return &socket == std::get<2>(entry.second);
202 });
203 auto index = std::get<0>(it->second);
204 m_free.push_back(index);
205 void *it_first = it->first;
206 m_sockets.erase(it);
207
208 // Remove from m_items
209 auto iit = std::find_if(begin(m_items), end(m_items),
210 [&it_first](const zmq::pollitem_t &item) { return it_first == item.socket; });
211 assert(iit != end(m_items));
212 m_items.erase(iit);
213
214 return index;
215}
216
217/**
218 * \brief Unregister socket from poller
219 *
220 * Removes the socket from the internal list of sockets to poll.
221 *
222 * \param[in] fd File descriptor of socket to unregister.
223 * \return The index of the socket in the poller's internal list before removal.
224 */
226{
227 if (!m_fds.count(fd)) {
228 throw std::out_of_range("fileno is not registered");
229 }
230 // Remove from m_fds
231 auto it = m_fds.find(fd);
232 auto index = std::get<0>(it->second);
233 m_free.push_back(index);
234 int it_first = it->first;
235 m_fds.erase(it);
236
237 // Remove from m_items
238 auto iit = std::find_if(begin(m_items), end(m_items),
239 [&it_first](const zmq::pollitem_t &item) { return it_first == item.fd; });
240 assert(iit != end(m_items));
241 m_items.erase(iit);
242
243 return index;
244}
ROOT::R::TRInterface & r
Definition Object.C:4
#define e(i)
Definition RSha256.hxx:103
int type
Definition TGX11.cxx:121
std::vector< zmq::pollitem_t > m_items
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
size_t size() const
sockets_t m_sockets
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
const Int_t n
Definition legend1.C:16
int ppoll(zmq_pollitem_t *items_, size_t nitems_, long timeout_, const sigset_t *sigmask_)
Wrapper around zmq_ppoll This function can throw, so wrap in try-catch!
Definition ppoll.cpp:19