Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Messenger.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
16
17#include <RooFit/Common.h>
18
19#include <csignal> // sigprocmask etc
20
21namespace RooFit {
22namespace MultiProcess {
23
25{
26 int optval = 1;
27 socket->set(zmq::sockopt::immediate, optval);
28}
29
30/** \class Messenger
31 *
32 * \brief Manages ZeroMQ sockets and wraps send and receive calls
33 *
34 * This class is used for all interprocess communication between the master,
35 * queue and worker processes. It sets up ZeroMQ sockets between all processes
36 * over IPC socket files stored in /tmp on the filesystem.
37 *
38 * Several sockets are used for communication between different places for
39 * different purposes:
40 * - Master and queue processes each have a PUSH-PULL socket pair to directly
41 * send/receive data between only the master and queue processes. This is
42 * currently used mainly for sending tasks to the queue from master. The
43 * socket from queue back to master is used only to test connections and may
44 * be removed in the future.
45 * - The queue process also has a PUSH-PULL socket pair with each worker
46 * process. These are used by the workers to obtain tasks from the queue.
47 * - The master has a PUB socket that the workers subscribe to with SUB
48 * sockets. These are used to update state. Note that to ensure robust
49 * reception of all messages on the SUB socket, it's important to send over
50 * state in as few messages as possible. For instance, it's best to send
51 * arrays over in a single big message instead of sending over each element
52 * separately. This also improves performance, since each message has some
53 * fixed overhead.
54 * - Each worker has a PUSH socket connected to a PULL socket on master that
55 * is used to send back task results from workers to master in
56 * 'JobManager::retrieve()'.
57 *
58 * @param process_manager ProcessManager instance which manages the master,
59 * queue and worker processes that we want to set up
60 * communication for in this Messenger.
61 */
62
63Messenger::Messenger(const ProcessManager &process_manager)
64{
65 sigemptyset(&ppoll_sigmask);
66
67 auto makeAddrPrefix = [](pid_t pid) -> std::string {
68 return "ipc://" + RooFit::tmpPath() + std::to_string(pid) + "_roofitMP";
69 };
70
71 // high water mark for master-queue sending, which can be quite a busy channel, especially at the start of a run
72 int hwm = 0;
73 // create zmq connections and pollers where necessary
74 // Note: zmq context is automatically created in the ZeroMQSvc class and maintained as singleton.
75 // It is reset in the ProcessManager, if necessary. Do not do that here, see comments in ProcessManager
76 // constructor.
77 try {
78 if (process_manager.is_master()) {
79 auto addrBase = makeAddrPrefix(getpid());
80
81 mq_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
82 mq_push_->set(zmq::sockopt::sndhwm, hwm);
83 bindAddr(mq_push_, addrBase + "_from_master_to_queue");
84
85 mq_push_poller_.register_socket(*mq_push_, zmq::event_flags::pollout);
86
87 mq_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
88 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
89 bindAddr(mq_pull_, addrBase + "_from_queue_to_master");
90
91 mq_pull_poller_.register_socket(*mq_pull_, zmq::event_flags::pollin);
92
93 mw_pub_.reset(zmqSvc().socket_ptr(zmq::socket_type::pub));
94 mw_pub_->set(zmq::sockopt::sndhwm, hwm);
95 bindAddr(mw_pub_, addrBase + "_from_master_to_workers");
96
97 wm_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
98 wm_pull_->set(zmq::sockopt::rcvhwm, hwm);
99 bindAddr(wm_pull_, addrBase + "_from_workers_to_master");
100 wm_pull_poller_.register_socket(*wm_pull_, zmq::event_flags::pollin);
101
103
104 // make sure all subscribers are connected
105 ZmqLingeringSocketPtr<> subscriber_ping_socket{zmqSvc().socket_ptr(zmq::socket_type::pull)};
106 bindAddr(subscriber_ping_socket, addrBase + "_subscriber_ping_socket");
107 ZeroMQPoller subscriber_ping_poller;
108 subscriber_ping_poller.register_socket(*subscriber_ping_socket, zmq::event_flags::pollin);
109 std::size_t N_subscribers_confirmed = 0;
110 while (N_subscribers_confirmed < process_manager.N_workers()) {
111 zmqSvc().send(*mw_pub_, false);
112 auto poll_results = subscriber_ping_poller.poll(0);
113 for (std::size_t ix = 0; ix < poll_results.size(); ++ix) {
114 auto request = zmqSvc().receive<std::string>(*subscriber_ping_socket, zmq::recv_flags::dontwait);
115 assert(request == "present");
116 ++N_subscribers_confirmed;
117 }
118 }
119 zmqSvc().send(*mw_pub_, true);
120
121 } else if (process_manager.is_queue()) {
122 auto addrBase = makeAddrPrefix(getppid());
123
124 // first the queue-worker sockets
125 // do resize instead of reserve so that the unique_ptrs are initialized
126 // (to nullptr) so that we can do reset below, alternatively you can do
127 // push/emplace_back with move or something
128 qw_push_.resize(process_manager.N_workers());
129 qw_pull_.resize(process_manager.N_workers());
130 qw_push_poller_.resize(process_manager.N_workers());
131 qw_pull_poller_.resize(process_manager.N_workers());
132 for (std::size_t ix = 0; ix < process_manager.N_workers(); ++ix) {
133 // push
134 qw_push_[ix].reset(zmqSvc().socket_ptr(zmq::socket_type::push));
135 bindAddr(qw_push_[ix], addrBase + "_from_queue_to_worker_" + std::to_string(ix));
136
137 qw_push_poller_[ix].register_socket(*qw_push_[ix], zmq::event_flags::pollout);
138
139 // pull
140 qw_pull_[ix].reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
141 bindAddr(qw_pull_[ix], addrBase + "_from_worker_" + std::to_string(ix) + "_to_queue");
142
143 qw_pull_poller_[ix].register_socket(*qw_pull_[ix], zmq::event_flags::pollin);
144 }
145
146 // then the master-queue sockets
147 mq_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
148 mq_push_->set(zmq::sockopt::sndhwm, hwm);
149 mq_push_->connect(addrBase + "_from_queue_to_master");
150
151 mq_push_poller_.register_socket(*mq_push_, zmq::event_flags::pollout);
152
153 mq_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
154 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
155 mq_pull_->connect(addrBase + "_from_master_to_queue");
156
157 mq_pull_poller_.register_socket(*mq_pull_, zmq::event_flags::pollin);
158
161 } else if (process_manager.is_worker()) {
162 auto addrBase = makeAddrPrefix(getppid());
163
164 // we only need one queue-worker pipe on the worker
165 qw_push_poller_.resize(1);
166 qw_pull_poller_.resize(1);
167
168 // push
169 this_worker_qw_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
170 auto addr = addrBase + "_from_worker_" + std::to_string(process_manager.worker_id()) + "_to_queue";
171 this_worker_qw_push_->connect(addr);
172
173 qw_push_poller_[0].register_socket(*this_worker_qw_push_, zmq::event_flags::pollout);
174
175 // pull
176 this_worker_qw_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
177 addr = addrBase + "_from_queue_to_worker_" + std::to_string(process_manager.worker_id());
178 this_worker_qw_pull_->connect(addr);
179
180 qw_pull_poller_[0].register_socket(*this_worker_qw_pull_, zmq::event_flags::pollin);
181
182 mw_sub_.reset(zmqSvc().socket_ptr(zmq::socket_type::sub));
183 mw_sub_->set(zmq::sockopt::rcvhwm, hwm);
184 mw_sub_->set(zmq::sockopt::subscribe, "");
185 mw_sub_->connect(addrBase + "_from_master_to_workers");
186 mw_sub_poller_.register_socket(*mw_sub_, zmq::event_flags::pollin);
187
188 wm_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
189 wm_push_->set(zmq::sockopt::sndhwm, hwm);
190 wm_push_->connect(addrBase + "_from_workers_to_master");
191
192 // check publisher connection and then wait until all subscribers are connected
193 ZmqLingeringSocketPtr<> subscriber_ping_socket{zmqSvc().socket_ptr(zmq::socket_type::push)};
194 subscriber_ping_socket->connect(addrBase + "_subscriber_ping_socket");
195 auto all_connected = zmqSvc().receive<bool>(*mw_sub_);
196 zmqSvc().send(*subscriber_ping_socket, "present");
197
198 while (!all_connected) {
199 all_connected = zmqSvc().receive<bool>(*mw_sub_);
200 }
201
203 } else {
204 // should never get here
205 throw std::runtime_error("Messenger ctor: I'm neither master, nor queue, nor a worker");
206 }
207 } catch (zmq::error_t &e) {
208 std::cerr << e.what() << " -- errnum: " << e.num() << std::endl;
209 throw;
210 };
211}
212
214{
216 try {
217 mq_push_.reset(nullptr);
218 mq_pull_.reset(nullptr);
219 mw_pub_.reset(nullptr);
220 wm_pull_.reset(nullptr);
221 // remove bound files
222 for (const auto &address : bound_ipc_addresses_) {
223 // no need to check return value, they are only zero byte /tmp files, the OS should eventually clean them up
224 remove(address.substr(6).c_str());
225 }
226 } catch (const std::exception &e) {
227 std::cerr << "WARNING: something in Messenger dtor threw an exception! Original exception message:\n"
228 << e.what() << std::endl;
229 }
230 }
232 this_worker_qw_push_.reset(nullptr);
233 this_worker_qw_pull_.reset(nullptr);
234 mw_sub_.reset(nullptr);
235 wm_push_.reset(nullptr);
236 }
238 for (auto &socket : qw_push_) {
239 socket.reset(nullptr);
240 }
241 for (auto &socket : qw_pull_) {
242 socket.reset(nullptr);
243 }
244 }
245 // Dev note: do not call zmqSvc()::close_context from here! The Messenger
246 // is (a member of) a static variable (JobManager) and ZeroMQSvc is static
247 // as well (the singleton returned by zmqSvc()). Because of the "static
248 // destruction order fiasco", it is not guaranteed that ZeroMQSvc singleton
249 // state is still available at time of destruction of the Messenger. Instead
250 // of a compile time error, this will lead to segfaults at runtime when
251 // exiting the program (on some platforms), because even though the ZeroMQSvc
252 // singleton pointer may be overwritten with random data, it will usually
253 // not randomly become nullptr, which means the nullptr check in the getter
254 // will still pass and the randomized pointer will be dereferenced.
255 // Instead, we close context in any new ProcessManager that may be created,
256 // which means the Messenger will get a fresh context anyway.
257}
258
259void Messenger::test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
260{
261 try {
262 switch (snd_pipe) {
263 case test_snd_pipes::M2Q: {
264 send_from_master_to_queue(ping_value);
265 break;
266 }
267 case test_snd_pipes::Q2M: {
268 send_from_queue_to_master(ping_value);
269 break;
270 }
271 case test_snd_pipes::Q2W: {
272 send_from_queue_to_worker(worker_id, ping_value);
273 break;
274 }
275 case test_snd_pipes::W2Q: {
276 send_from_worker_to_queue(ping_value);
277 break;
278 }
279 }
280 } catch (zmq::error_t &e) {
281 if (e.num() == EAGAIN) {
282 throw std::runtime_error("Messenger::test_connections: SEND over master-queue connection timed out!");
283 } else {
284 throw;
285 }
286 }
287}
288
289void Messenger::test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
290{
291 X2X handshake = X2X::initial_value;
292
293 std::size_t max_tries = 3, tries = 0;
294 bool carry_on = true;
295 while (carry_on && (tries++ < max_tries)) {
296 try {
297 switch (rcv_pipe) {
299 handshake = receive_from_master_on_queue<X2X>();
300 break;
301 }
303 handshake = receive_from_queue_on_master<X2X>();
304 break;
305 }
307 handshake = receive_from_queue_on_worker<X2X>();
308 break;
309 }
311 handshake = receive_from_worker_on_queue<X2X>(worker_id);
312 break;
313 }
314 }
315 carry_on = false;
316 } catch (ZMQ::ppoll_error_t &e) {
317 auto response = handle_zmq_ppoll_error(e);
318 if (response == zmq_ppoll_error_response::abort) {
319 throw std::runtime_error("EINTR in test_receive and SIGTERM received, aborting\n");
320 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
321 printf("EINTR in test_receive but no SIGTERM received, try %zu\n", tries);
322 continue;
323 } else if (response == zmq_ppoll_error_response::retry) {
324 printf("EAGAIN in test_receive, try %zu\n", tries);
325 continue;
326 }
327 } catch (zmq::error_t &e) {
328 if (e.num() == EAGAIN) {
329 throw std::runtime_error("Messenger::test_connections: RECEIVE over master-queue connection timed out!");
330 } else {
331 printf("unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
332 e.num(), e.what());
333 throw;
334 }
335 }
336 }
337
338 if (handshake != expected_ping_value) {
339 throw std::runtime_error(
340 "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
341 }
342}
343
344/// \brief Test whether push-pull sockets are working
345///
346/// \note This function tests the PUSH-PULL socket pairs only. The PUB-SUB sockets are already tested in the
347/// constructor.
348///
349/// \param process_manager ProcessManager object used to instantiate this object. Used to identify which process we are
350/// running on and hence which sockets need to be tested.
351void Messenger::test_connections(const ProcessManager &process_manager)
352{
353 if (process_manager.is_queue() || process_manager.is_worker()) {
354 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
355 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
356 sigset_t sigmask;
357 sigemptyset(&sigmask);
358 sigaddset(&sigmask, SIGTERM);
359 int rc = sigprocmask(SIG_BLOCK, &sigmask, &ppoll_sigmask);
360 if (rc < 0) {
361 throw std::runtime_error("sigprocmask failed in test_connections");
362 }
363 }
364
365 if (process_manager.is_master()) {
369 // make sure to always receive last on master, so that master knows when queue is done,
370 // which means workers are done as well, so if master is done everything is done:
372 } else if (process_manager.is_queue()) {
373 ZeroMQPoller poller;
374 std::size_t mq_index;
375 std::tie(poller, mq_index) = create_queue_poller();
376
377 for (std::size_t ix = 0; ix < process_manager.N_workers(); ++ix) {
379 }
381
382 while (!process_manager.sigterm_received() && (poller.size() > 0)) {
383 // poll: wait until status change (-1: infinite timeout)
384 std::vector<std::pair<size_t, zmq::event_flags>> poll_result;
385 bool abort;
386 std::tie(poll_result, abort) = careful_ppoll(poller, ppoll_sigmask);
387 if (abort)
388 break;
389
390 // then process incoming messages from sockets
391 for (auto readable_socket : poll_result) {
392 // message comes from the master/queue socket (first element):
393 if (readable_socket.first == mq_index) {
397 } else { // from a worker socket
398 // TODO: dangerous assumption for this_worker_id, may become invalid if we allow multiple queue_loops on
399 // the same process!
400 auto this_worker_id = readable_socket.first - 1; // TODO: replace with a more reliable lookup
401
404 test_send(X2X::pong, test_snd_pipes::Q2W, this_worker_id);
405
406 poller.unregister_socket(*qw_pull_[this_worker_id]);
407 }
408 }
409 }
411
412 } else if (process_manager.is_worker()) {
417 } else {
418 // should never get here
419 throw std::runtime_error("Messenger::test_connections: I'm neither master, nor queue, nor a worker");
420 }
421
422 if (process_manager.is_queue() || process_manager.is_worker()) {
423 // clean up signal management modifications
424 sigprocmask(SIG_SETMASK, &ppoll_sigmask, nullptr);
425 }
426}
427
428/// Helper function that creates a poller for Queue::loop()
429std::pair<ZeroMQPoller, std::size_t> Messenger::create_queue_poller()
430{
431 ZeroMQPoller poller;
432 std::size_t mq_index = poller.register_socket(*mq_pull_, zmq::event_flags::pollin);
433 for (auto &s : qw_pull_) {
434 poller.register_socket(*s, zmq::event_flags::pollin);
435 }
436 return {std::move(poller), mq_index};
437}
438
439/// Helper function that creates a poller for worker_loop()
440std::pair<ZeroMQPoller, std::size_t> Messenger::create_worker_poller()
441{
442 ZeroMQPoller poller;
443 poller.register_socket(*this_worker_qw_pull_, zmq::event_flags::pollin);
444 std::size_t mw_sub_index = poller.register_socket(*mw_sub_, zmq::event_flags::pollin);
445 return {std::move(poller), mw_sub_index};
446}
447
448// -- WORKER - QUEUE COMMUNICATION --
449
451
452void Messenger::send_from_queue_to_worker(std::size_t /*this_worker_id*/) {}
453
454// -- QUEUE - MASTER COMMUNICATION --
455
457
459
460/// Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination
461void Messenger::set_send_flag(zmq::send_flags flag)
462{
463 send_flag_ = flag;
464}
465
466// -- MASTER - WORKER COMMUNICATION --
467
469
// For debugging: expands to a switch case that stores the spelling of the enumerator `p`
// (exactly as written at the point of use) in a variable named `s` that must be in scope.
#define PROCESS_VAL(p) \
   case (p):           \
      s = #p;          \
      break;
473
474std::ostream &operator<<(std::ostream &out, const M2Q value)
475{
476 std::string s;
477 switch (value) {
479 default: s = std::to_string(static_cast<int>(value));
480 }
481 return out << s;
482}
483
484std::ostream &operator<<(std::ostream &out, const W2Q value)
485{
486 std::string s;
487 switch (value) {
489 default: s = std::to_string(static_cast<int>(value));
490 }
491 return out << s;
492}
493
494std::ostream &operator<<(std::ostream &out, const Q2W value)
495{
496 std::string s;
497 switch (value) {
500 default: s = std::to_string(static_cast<int>(value));
501 }
502 return out << s;
503}
504
505std::ostream &operator<<(std::ostream &out, const X2X value)
506{
507 std::string s;
508 switch (value) {
511 default: s = std::to_string(static_cast<int>(value));
512 }
513 return out << s;
514}
515
516#undef PROCESS_VAL
517
518/// Function called from send and receive template functions in debug builds
519/// used to monitor the messages that are going to be sent or are received.
520/// By defining this in the implementation file, compilation is a lot faster
521/// during debugging of Messenger or communication protocols.
522void Messenger::debug_print(std::string /*s*/)
523{
524 // print 's' when debugging
525}
526
527} // namespace MultiProcess
528} // namespace RooFit
#define PROCESS_VAL(p)
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:399
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition ZeroMQSvc.h:80
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
Messenger(const ProcessManager &process_manager)
Definition Messenger.cxx:63
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
Fork processes for queue and workers.
Wrapper class for polling ZeroMQ sockets.
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
size_t size() const
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
Definition ZeroMQSvc.h:205
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
Definition ZeroMQSvc.h:159
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
Definition Messenger.cxx:24
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
Definition util.cxx:95
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition Common.h:18
std::string const & tmpPath()
Returns the path to the directory that should be used for temporary RooFit files (e....
Definition Common.cxx:24