22namespace MultiProcess {
27 socket->set(zmq::sockopt::immediate, optval);
67 auto makeAddrPrefix = [](pid_t pid) -> std::string {
68 return "ipc://" +
RooFit::tmpPath() + std::to_string(pid) +
"_roofitMP";
79 auto addrBase = makeAddrPrefix(getpid());
82 mq_push_->set(zmq::sockopt::sndhwm, hwm);
88 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
94 mw_pub_->set(zmq::sockopt::sndhwm, hwm);
98 wm_pull_->set(zmq::sockopt::rcvhwm, hwm);
106 bindAddr(subscriber_ping_socket, addrBase +
"_subscriber_ping_socket");
108 subscriber_ping_poller.
register_socket(*subscriber_ping_socket, zmq::event_flags::pollin);
109 std::size_t N_subscribers_confirmed = 0;
110 while (N_subscribers_confirmed < process_manager.
N_workers()) {
112 auto poll_results = subscriber_ping_poller.
poll(0);
113 for (std::size_t ix = 0; ix < poll_results.size(); ++ix) {
114 auto request =
zmqSvc().
receive<std::string>(*subscriber_ping_socket, zmq::recv_flags::dontwait);
115 assert(request ==
"present");
116 ++N_subscribers_confirmed;
121 }
else if (process_manager.
is_queue()) {
122 auto addrBase = makeAddrPrefix(getppid());
132 for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
135 bindAddr(
qw_push_[ix], addrBase +
"_from_queue_to_worker_" + std::to_string(ix));
141 bindAddr(
qw_pull_[ix], addrBase +
"_from_worker_" + std::to_string(ix) +
"_to_queue");
148 mq_push_->set(zmq::sockopt::sndhwm, hwm);
149 mq_push_->connect(addrBase +
"_from_queue_to_master");
154 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
155 mq_pull_->connect(addrBase +
"_from_master_to_queue");
161 }
else if (process_manager.
is_worker()) {
162 auto addrBase = makeAddrPrefix(getppid());
170 auto addr = addrBase +
"_from_worker_" + std::to_string(process_manager.
worker_id()) +
"_to_queue";
177 addr = addrBase +
"_from_queue_to_worker_" + std::to_string(process_manager.
worker_id());
183 mw_sub_->set(zmq::sockopt::rcvhwm, hwm);
184 mw_sub_->set(zmq::sockopt::subscribe,
"");
185 mw_sub_->connect(addrBase +
"_from_master_to_workers");
189 wm_push_->set(zmq::sockopt::sndhwm, hwm);
190 wm_push_->connect(addrBase +
"_from_workers_to_master");
194 subscriber_ping_socket->connect(addrBase +
"_subscriber_ping_socket");
196 zmqSvc().
send(*subscriber_ping_socket,
"present");
198 while (!all_connected) {
205 throw std::runtime_error(
"Messenger ctor: I'm neither master, nor queue, nor a worker");
207 }
catch (zmq::error_t &
e) {
208 std::cerr <<
e.what() <<
" -- errnum: " <<
e.num() << std::endl;
224 remove(address.substr(6).c_str());
226 }
catch (
const std::exception &
e) {
227 std::cerr <<
"WARNING: something in Messenger dtor threw an exception! Original exception message:\n"
228 <<
e.what() << std::endl;
280 }
catch (zmq::error_t &
e) {
281 if (
e.num() == EAGAIN) {
282 throw std::runtime_error(
"Messenger::test_connections: SEND over master-queue connection timed out!");
293 std::size_t max_tries = 3, tries = 0;
294 bool carry_on =
true;
295 while (carry_on && (tries++ < max_tries)) {
299 handshake = receive_from_master_on_queue<X2X>();
303 handshake = receive_from_queue_on_master<X2X>();
307 handshake = receive_from_queue_on_worker<X2X>();
311 handshake = receive_from_worker_on_queue<X2X>(worker_id);
319 throw std::runtime_error(
"EINTR in test_receive and SIGTERM received, aborting\n");
321 printf(
"EINTR in test_receive but no SIGTERM received, try %zu\n", tries);
324 printf(
"EAGAIN in test_receive, try %zu\n", tries);
327 }
catch (zmq::error_t &
e) {
328 if (
e.num() == EAGAIN) {
329 throw std::runtime_error(
"Messenger::test_connections: RECEIVE over master-queue connection timed out!");
331 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
338 if (handshake != expected_ping_value) {
339 throw std::runtime_error(
340 "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
357 sigemptyset(&sigmask);
358 sigaddset(&sigmask, SIGTERM);
361 throw std::runtime_error(
"sigprocmask failed in test_connections");
372 }
else if (process_manager.
is_queue()) {
374 std::size_t mq_index;
377 for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
384 std::vector<std::pair<size_t, zmq::event_flags>> poll_result;
391 for (
auto readable_socket : poll_result) {
393 if (readable_socket.first == mq_index) {
400 auto this_worker_id = readable_socket.first - 1;
412 }
else if (process_manager.
is_worker()) {
419 throw std::runtime_error(
"Messenger::test_connections: I'm neither master, nor queue, nor a worker");
436 return {std::move(poller), mq_index};
445 return {std::move(poller), mw_sub_index};
471#define PROCESS_VAL(p) \
472 case (p): s = #p; break;
479 default: s = std::to_string(
static_cast<int>(value));
489 default: s = std::to_string(
static_cast<int>(value));
500 default: s = std::to_string(
static_cast<int>(value));
511 default: s = std::to_string(
static_cast<int>(value));
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
bool close_QW_container_on_destruct_
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
ZeroMQPoller mq_push_poller_
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
zmq::send_flags send_flag_
void send_from_queue_to_master()
Messenger(const ProcessManager &process_manager)
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
bool close_this_QW_on_destruct_
void send_from_worker_to_master()
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
void send_from_master_to_queue()
ZeroMQPoller mw_sub_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
ZeroMQPoller mq_pull_poller_
bool close_MQ_on_destruct_
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
void send_from_worker_to_queue()
ZeroMQPoller wm_pull_poller_
Fork processes for queue and workers.
std::size_t N_workers() const
static bool sigterm_received()
std::size_t worker_id() const
Wrapper class for polling ZeroMQ sockets.
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
std::string const & tmpPath()
Returns the path to the directory that should be used for temporary RooFit files (e....