Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ZeroMQSvc.h
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * RA, Roel Aaij, NIKHEF
5 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
14#ifndef ZEROMQ_IZEROMQSVC_H
15#define ZEROMQ_IZEROMQSVC_H 1
16
17#include <zmq.hpp>
18#include "RooFit_ZMQ/Utility.h"
20
21#include <type_traits>
22#include <string>
23#include <vector>
24#include <sstream>
25#include <ios>
26#include <iostream> // std::cerr
27#include <functional>
28#include <memory>
29
30// debugging
31#include <unistd.h> // getpid
32
namespace ZMQ {

/// Exception thrown by ZeroMQSvc::receive when the underlying receive call
/// returns an empty result, i.e. no message was obtained.
///
/// All special members are declared explicitly: the destructor is
/// user-declared, so relying on implicitly generated copy operations would be
/// deprecated behavior.
struct TimeOutException : std::exception {
   TimeOutException() = default;
   TimeOutException(const TimeOutException &) = default;
   TimeOutException &operator=(const TimeOutException &) = default;
   ~TimeOutException() override = default;
};

/// Exception type related to multipart ("more") messages.
///
/// NOTE(review): nothing in this header throws it; presumably it is used by
/// callers that encounter unexpected additional message parts -- confirm.
struct MoreException : std::exception {
   MoreException() = default;
   MoreException(const MoreException &) = default;
   MoreException &operator=(const MoreException &) = default;
   ~MoreException() override = default;
};

} // namespace ZMQ
50
51template <int PERIOD = 0>
53 void operator()(zmq::socket_t *socket)
54 {
55 int tries = 0;
56 int max_tries = 3;
57 while (true) {
58 try {
59 // the actual work this function should do, plus the delete socket below:
60 if (socket)
61 socket->set(zmq::sockopt::linger, PERIOD);
62 break;
63 } catch (zmq::error_t &e) {
64 if (++tries == max_tries || e.num() == EINVAL || e.num() == ETERM ||
65 e.num() == ENOTSOCK // not recoverable from here
66 ) {
67 std::cerr << "ERROR in ZeroMQSvc::socket: " << e.what() << " (errno: " << e.num() << ")\n";
68 throw;
69 }
70 std::cerr << "RETRY " << tries << "/" << (max_tries - 1)
71 << " in ZmqLingeringSocketPtrDeleter: call interrupted (errno: " << e.num() << ")\n";
72 }
73 }
74
75 delete socket;
76 }
77};
78
79template <int PERIOD = 0>
80using ZmqLingeringSocketPtr = std::unique_ptr<zmq::socket_t, ZmqLingeringSocketPtrDeleter<PERIOD>>;
81
82// We retry send and receive only on EINTR, all other errors are either fatal, or can only
83// be handled at the caller.
84template <typename... args_t>
85auto retry_send(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.send(args...))
86{
87 int tries = 0;
88 while (true) {
89 try {
90 // the actual work this function should do, all the rest is error handling:
91 return socket.send(args...);
92 } catch (zmq::error_t &e) {
93 if (++tries == max_tries || e.num() != EINTR // only recoverable error
94 ) {
95 throw;
96 }
97 std::cerr << "RETRY " << tries << "/" << (max_tries - 1) << " in ZeroMQSvc::send (retry_send) on pid "
98 << getpid() << ": " << e.what() << ")\n";
99 }
100 }
101}
102
103template <typename... args_t>
104auto retry_recv(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.recv(args...))
105{
106 int tries = 0;
107 while (true) {
108 try {
109 // the actual work this function should do, all the rest is error handling:
110 return socket.recv(args...);
111 } catch (zmq::error_t &e) {
112 if (++tries == max_tries || e.num() != EINTR // only recoverable error
113 ) {
114 throw;
115 }
116 std::cerr << "RETRY " << tries << "/" << (max_tries - 1) << " in ZeroMQSvc::recv (retry_recv) on pid "
117 << getpid() << ": " << e.what() << ")\n";
118 }
119 }
120}
121
123 // Note on error handling:
124 // Creating message_t can throw, but only when memory ran out (errno ENOMEM),
125 // and that is something only the caller can fix, so we don't catch it here.
126
127public:
128 enum Encoding { Text = 0, Binary };
129
130 Encoding encoding() const;
131 void setEncoding(const Encoding &e);
132 zmq::context_t &context() const;
133 zmq::socket_t socket(zmq::socket_type type) const;
134 zmq::socket_t *socket_ptr(zmq::socket_type type) const;
135 void close_context() const;
136
137 /// decode message with ZMQ, POD version
138 template <class T, typename std::enable_if<!std::is_pointer<T>::value && ZMQ::Detail::is_trivial<T>::value, T>::type
139 * = nullptr>
140 T decode(const zmq::message_t &msg) const
141 {
142 T object;
143 memcpy(&object, msg.data(), msg.size());
144 return object;
145 }
146
147 /// decode ZMQ message, string version
148 template <class T, typename std::enable_if<std::is_same<T, std::string>::value, T>::type * = nullptr>
149 std::string decode(const zmq::message_t &msg) const
150 {
151 std::string r(msg.size() + 1, char{});
152 r.assign(static_cast<const char *>(msg.data()), msg.size());
153 return r;
154 }
155
156 /// receive message with ZMQ, general version
157 // FIXME: what to do with flags=nullptr.... more is a pointer, that might prevent conversion
158 template <class T, typename std::enable_if<!(std::is_same<zmq::message_t, T>::value), T>::type * = nullptr>
159 T receive(zmq::socket_t &socket, zmq::recv_flags flags = zmq::recv_flags::none, bool *more = nullptr) const
160 {
161 // receive message
162 zmq::message_t msg;
163 auto recv_result = retry_recv(socket, 2, std::ref(msg), flags);
164 if (!recv_result) {
165 throw ZMQ::TimeOutException{};
166 }
167 if (more)
168 *more = msg.more();
169
170 // decode message
171 return decode<T>(msg);
172 }
173
174 /// receive message with ZMQ
175 template <class T, typename std::enable_if<std::is_same<zmq::message_t, T>::value, T>::type * = nullptr>
176 T receive(zmq::socket_t &socket, zmq::recv_flags flags = zmq::recv_flags::none, bool *more = nullptr) const
177 {
178 // receive message
179 zmq::message_t msg;
180 auto recv_result = retry_recv(socket, 2, std::ref(msg), flags);
181 if (!recv_result) {
182 throw ZMQ::TimeOutException{};
183 }
184 if (more)
185 *more = msg.more();
186 return msg;
187 }
188
189 /// encode message to ZMQ
190 template <class T, typename std::enable_if<!std::is_pointer<T>::value && ZMQ::Detail::is_trivial<T>::value, T>::type
191 * = nullptr>
192 zmq::message_t encode(const T &item, std::function<size_t(const T &t)> sizeFun = ZMQ::defaultSizeOf<T>) const
193 {
194 size_t s = sizeFun(item);
195 zmq::message_t msg{s};
196 memcpy((void *)msg.data(), &item, s);
197 return msg;
198 }
199
200 zmq::message_t encode(const char *item) const;
201 zmq::message_t encode(const std::string &item) const;
202
203 /// Send message with ZMQ
204 template <class T, typename std::enable_if<!std::is_same<T, zmq::message_t>::value, T>::type * = nullptr>
205 zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags = zmq::send_flags::none) const
206 {
207 return retry_send(socket, 1, encode(item), flags);
208 }
209
210 zmq::send_result_t
211 send(zmq::socket_t &socket, const char *item, zmq::send_flags flags = zmq::send_flags::none) const;
212 zmq::send_result_t
213 send(zmq::socket_t &socket, zmq::message_t &msg, zmq::send_flags flags = zmq::send_flags::none) const;
214 zmq::send_result_t
215 send(zmq::socket_t &socket, zmq::message_t &&msg, zmq::send_flags flags = zmq::send_flags::none) const;
216
217private:
219 mutable zmq::context_t *m_context = nullptr;
220};
221
223
224#endif // ZEROMQ_IZEROMQSVC_H
#define e(i)
Definition RSha256.hxx:103
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition ZeroMQSvc.h:80
auto retry_send(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.send(args...))
Definition ZeroMQSvc.h:85
auto retry_recv(zmq::socket_t &socket, int max_tries, args_t... args) -> decltype(socket.recv(args...))
Definition ZeroMQSvc.h:104
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
Wrapper class for basic ZeroMQ context and socket management.
Definition ZeroMQSvc.h:122
Encoding m_enc
Definition ZeroMQSvc.h:218
zmq::context_t * m_context
Definition ZeroMQSvc.h:219
zmq::message_t encode(const T &item, std::function< size_t(const T &t)> sizeFun=ZMQ::defaultSizeOf< T >) const
encode message to ZMQ
Definition ZeroMQSvc.h:192
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
Definition ZeroMQSvc.h:205
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
Encoding encoding() const
Definition ZeroMQSvc.cpp:43
T decode(const zmq::message_t &msg) const
decode message with ZMQ, POD version
Definition ZeroMQSvc.h:140
zmq::context_t & context() const
Get context.
Definition ZeroMQSvc.cpp:63
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
Definition ZeroMQSvc.h:159
void close_context() const
void setEncoding(const Encoding &e)
Set encoding mode.
Definition ZeroMQSvc.cpp:53
std::string decode(const zmq::message_t &msg) const
decode ZMQ message, string version
Definition ZeroMQSvc.h:149
MoreException(const MoreException &)=default
MoreException()=default
~MoreException()=default
MoreException & operator=(const MoreException &)=default
TimeOutException(const TimeOutException &)=default
TimeOutException & operator=(const TimeOutException &)=default
~TimeOutException()=default
TimeOutException()=default
void operator()(zmq::socket_t *socket)
Definition ZeroMQSvc.h:53
#define encode(otri)
Definition triangle.c:922