14#ifndef ZEROMQ_IZEROMQSVC_H
15#define ZEROMQ_IZEROMQSVC_H 1
51template <
int PERIOD = 0>
61 socket->set(zmq::sockopt::linger, PERIOD);
63 }
catch (zmq::error_t &
e) {
64 if (++tries == max_tries ||
e.num() == EINVAL ||
e.num() == ETERM ||
67 std::cerr <<
"ERROR in ZeroMQSvc::socket: " <<
e.what() <<
" (errno: " <<
e.num() <<
")\n";
70 std::cerr <<
"RETRY " << tries <<
"/" << (max_tries - 1)
71 <<
" in ZmqLingeringSocketPtrDeleter: call interrupted (errno: " <<
e.num() <<
")\n";
79template <
int PERIOD = 0>
84template <
typename... args_t>
91 return socket.send(args...);
92 }
catch (zmq::error_t &
e) {
93 if (++tries == max_tries ||
e.num() != EINTR
97 std::cerr <<
"RETRY " << tries <<
"/" << (max_tries - 1) <<
" in ZeroMQSvc::send (retry_send) on pid "
98 << getpid() <<
": " <<
e.what() <<
")\n";
103template <
typename... args_t>
110 return socket.recv(args...);
111 }
catch (zmq::error_t &
e) {
112 if (++tries == max_tries ||
e.num() != EINTR
116 std::cerr <<
"RETRY " << tries <<
"/" << (max_tries - 1) <<
" in ZeroMQSvc::recv (retry_recv) on pid "
117 << getpid() <<
": " <<
e.what() <<
")\n";
132 zmq::context_t &
context()
const;
133 zmq::socket_t
socket(zmq::socket_type
type)
const;
140 T
decode(
const zmq::message_t &msg)
const
143 memcpy(&
object, msg.data(), msg.size());
148 template <class T, typename std::enable_if<std::is_same<T, std::string>::value, T>
::type * =
nullptr>
149 std::string
decode(
const zmq::message_t &msg)
const
151 std::string
r(msg.size() + 1,
char{});
152 r.assign(
static_cast<const char *
>(msg.data()), msg.size());
158 template <
class T,
typename std::enable_if<!(std::is_same<zmq::message_t, T>::value), T>::type * =
nullptr>
159 T
receive(zmq::socket_t &
socket, zmq::recv_flags flags = zmq::recv_flags::none,
bool *more =
nullptr)
const
171 return decode<T>(msg);
175 template <class T, typename std::enable_if<std::is_same<zmq::message_t, T>::value, T>
::type * =
nullptr>
176 T
receive(zmq::socket_t &
socket, zmq::recv_flags flags = zmq::recv_flags::none,
bool *more =
nullptr)
const
192 zmq::message_t
encode(
const T &item, std::function<
size_t(
const T &t)> sizeFun = ZMQ::defaultSizeOf<T>)
const
194 size_t s = sizeFun(item);
195 zmq::message_t msg{s};
196 memcpy((
void *)msg.data(), &item, s);
200 zmq::message_t
encode(
const char *item)
const;
201 zmq::message_t
encode(
const std::string &item)
const;
204 template <class T, typename std::enable_if<!std::is_same<T, zmq::message_t>::value, T>
::type * =
nullptr>
205 zmq::send_result_t
send(zmq::socket_t &
socket,
const T &item, zmq::send_flags flags = zmq::send_flags::none)
const
211 send(zmq::socket_t &
socket,
const char *item, zmq::send_flags flags = zmq::send_flags::none)
const;
213 send(zmq::socket_t &
socket, zmq::message_t &msg, zmq::send_flags flags = zmq::send_flags::none)
const;
215 send(zmq::socket_t &
socket, zmq::message_t &&msg, zmq::send_flags flags = zmq::send_flags::none)
const;
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
auto retry_send(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.send(args...))
auto retry_recv(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.recv(args...))
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Wrapper class for basic ZeroMQ context and socket management.
zmq::context_t * m_context
zmq::message_t encode(const T &item, std::function< size_t(const T &t)> sizeFun=ZMQ::defaultSizeOf< T >) const
Encode an item into a ZMQ message.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
Encoding encoding() const
T decode(const zmq::message_t &msg) const
Decode a ZMQ message, POD version.
zmq::context_t & context() const
Get context.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
Receive a message with ZMQ, general version.
void close_context() const
void setEncoding(const Encoding &e)
Set encoding mode.
std::string decode(const zmq::message_t &msg) const
Decode a ZMQ message, string version.
MoreException(const MoreException &)=default
MoreException & operator=(const MoreException &)=default
TimeOutException(const TimeOutException &)=default
TimeOutException & operator=(const TimeOutException &)=default
~TimeOutException()=default
TimeOutException()=default
void operator()(zmq::socket_t *socket)