Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Messenger.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
16
17#include <TSystem.h>
18
19#include <csignal> // sigprocmask etc
20
21namespace RooFit {
22namespace MultiProcess {
23
25{
26 int optval = 1;
27 socket->set(zmq::sockopt::immediate, optval);
28}
29
30/** \class Messenger
31 *
32 * \brief Manages ZeroMQ sockets and wraps send and receive calls
33 *
34 * This class is used for all interprocess communication between the master,
35 * queue and worker processes. It sets up ZeroMQ sockets between all processes
36 * over IPC socket files stored in gSystem->TempDirectory() (typically /tmp).
37 *
38 * Several sockets are used for communication between different places for
39 * different purposes:
40 * - Master and queue processes each have a PUSH-PULL socket pair to directly
41 * send/receive data between only the master and queue processes. This is
42 * currently used mainly for sending tasks to the queue from master. The
43 * socket from queue back to master is used only to test connections and may
44 * be removed in the future.
45 * - The queue process also has a PUSH-PULL socket pair with each worker
46 * process. These are used by the workers to obtain tasks from the queue.
47 * - The master has a PUB socket that the workers subscribe to with SUB
48 * sockets. These are used to update state. Note that to ensure robust
49 * reception of all messages on the SUB socket, it's important to send over
50 * state in as few messages as possible. For instance, it's best to send
51 * arrays over in a single big message instead of sending over each element
52 * separately. This also improves performance, since each message has some
53 * fixed overhead.
54 * - Each worker has a PUSH socket connected to a PULL socket on master that
55 * is used to send back task results from workers to master in
56 * 'JobManager::retrieve()'.
57 *
58 * @param process_manager ProcessManager instance which manages the master,
59 * queue and worker processes that we want to set up
60 * communication for in this Messenger.
61 */
62
63Messenger::Messenger(const ProcessManager &process_manager)
64{
65 sigemptyset(&ppoll_sigmask);
66
67 auto makeAddrPrefix = [](pid_t pid) -> std::string {
68 std::string tmpPath = gSystem->TempDirectory();
69 return "ipc://" + tmpPath + "/roofit_" + std::to_string(pid) + "_roofitMP";
70 };
71
72 // high water mark for master-queue sending, which can be quite a busy channel, especially at the start of a run
73 int hwm = 0;
74 // create zmq connections and pollers where necessary
75 // Note: zmq context is automatically created in the ZeroMQSvc class and maintained as singleton.
76 // It is reset in the ProcessManager, if necessary. Do not do that here, see comments in ProcessManager
77 // constructor.
78 try {
79 if (process_manager.is_master()) {
80 auto addrBase = makeAddrPrefix(getpid());
81
82 mq_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
83 mq_push_->set(zmq::sockopt::sndhwm, hwm);
84 bindAddr(mq_push_, addrBase + "_from_master_to_queue");
85
86 mq_push_poller_.register_socket(*mq_push_, zmq::event_flags::pollout);
87
88 mq_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
89 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
90 bindAddr(mq_pull_, addrBase + "_from_queue_to_master");
91
92 mq_pull_poller_.register_socket(*mq_pull_, zmq::event_flags::pollin);
93
94 mw_pub_.reset(zmqSvc().socket_ptr(zmq::socket_type::pub));
95 mw_pub_->set(zmq::sockopt::sndhwm, hwm);
96 bindAddr(mw_pub_, addrBase + "_from_master_to_workers");
97
98 wm_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
99 wm_pull_->set(zmq::sockopt::rcvhwm, hwm);
100 bindAddr(wm_pull_, addrBase + "_from_workers_to_master");
101 wm_pull_poller_.register_socket(*wm_pull_, zmq::event_flags::pollin);
102
104
105 // make sure all subscribers are connected
106 ZmqLingeringSocketPtr<> subscriber_ping_socket{zmqSvc().socket_ptr(zmq::socket_type::pull)};
107 bindAddr(subscriber_ping_socket, addrBase + "_subscriber_ping_socket");
108 ZeroMQPoller subscriber_ping_poller;
109 subscriber_ping_poller.register_socket(*subscriber_ping_socket, zmq::event_flags::pollin);
110 std::size_t N_subscribers_confirmed = 0;
111 while (N_subscribers_confirmed < process_manager.N_workers()) {
112 zmqSvc().send(*mw_pub_, false);
113 auto poll_results = subscriber_ping_poller.poll(0);
114 for (std::size_t ix = 0; ix < poll_results.size(); ++ix) {
115 auto request = zmqSvc().receive<std::string>(*subscriber_ping_socket, zmq::recv_flags::dontwait);
116 assert(request == "present");
117 ++N_subscribers_confirmed;
118 }
119 }
120 zmqSvc().send(*mw_pub_, true);
121
122 } else if (process_manager.is_queue()) {
123 auto addrBase = makeAddrPrefix(getppid());
124
125 // first the queue-worker sockets
126 // do resize instead of reserve so that the unique_ptrs are initialized
127 // (to nullptr) so that we can do reset below, alternatively you can do
128 // push/emplace_back with move or something
129 qw_push_.resize(process_manager.N_workers());
130 qw_pull_.resize(process_manager.N_workers());
131 qw_push_poller_.resize(process_manager.N_workers());
132 qw_pull_poller_.resize(process_manager.N_workers());
133 for (std::size_t ix = 0; ix < process_manager.N_workers(); ++ix) {
134 // push
135 qw_push_[ix].reset(zmqSvc().socket_ptr(zmq::socket_type::push));
136 bindAddr(qw_push_[ix], addrBase + "_from_queue_to_worker_" + std::to_string(ix));
137
138 qw_push_poller_[ix].register_socket(*qw_push_[ix], zmq::event_flags::pollout);
139
140 // pull
141 qw_pull_[ix].reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
142 bindAddr(qw_pull_[ix], addrBase + "_from_worker_" + std::to_string(ix) + "_to_queue");
143
144 qw_pull_poller_[ix].register_socket(*qw_pull_[ix], zmq::event_flags::pollin);
145 }
146
147 // then the master-queue sockets
148 mq_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
149 mq_push_->set(zmq::sockopt::sndhwm, hwm);
150 mq_push_->connect(addrBase + "_from_queue_to_master");
151
152 mq_push_poller_.register_socket(*mq_push_, zmq::event_flags::pollout);
153
154 mq_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
155 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
156 mq_pull_->connect(addrBase + "_from_master_to_queue");
157
158 mq_pull_poller_.register_socket(*mq_pull_, zmq::event_flags::pollin);
159
162 } else if (process_manager.is_worker()) {
163 auto addrBase = makeAddrPrefix(getppid());
164
165 // we only need one queue-worker pipe on the worker
166 qw_push_poller_.resize(1);
167 qw_pull_poller_.resize(1);
168
169 // push
170 this_worker_qw_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
171 auto addr = addrBase + "_from_worker_" + std::to_string(process_manager.worker_id()) + "_to_queue";
172 this_worker_qw_push_->connect(addr);
173
174 qw_push_poller_[0].register_socket(*this_worker_qw_push_, zmq::event_flags::pollout);
175
176 // pull
177 this_worker_qw_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
178 addr = addrBase + "_from_queue_to_worker_" + std::to_string(process_manager.worker_id());
179 this_worker_qw_pull_->connect(addr);
180
181 qw_pull_poller_[0].register_socket(*this_worker_qw_pull_, zmq::event_flags::pollin);
182
183 mw_sub_.reset(zmqSvc().socket_ptr(zmq::socket_type::sub));
184 mw_sub_->set(zmq::sockopt::rcvhwm, hwm);
185 mw_sub_->set(zmq::sockopt::subscribe, "");
186 mw_sub_->connect(addrBase + "_from_master_to_workers");
187 mw_sub_poller_.register_socket(*mw_sub_, zmq::event_flags::pollin);
188
189 wm_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
190 wm_push_->set(zmq::sockopt::sndhwm, hwm);
191 wm_push_->connect(addrBase + "_from_workers_to_master");
192
193 // check publisher connection and then wait until all subscribers are connected
194 ZmqLingeringSocketPtr<> subscriber_ping_socket{zmqSvc().socket_ptr(zmq::socket_type::push)};
195 subscriber_ping_socket->connect(addrBase + "_subscriber_ping_socket");
196 auto all_connected = zmqSvc().receive<bool>(*mw_sub_);
197 zmqSvc().send(*subscriber_ping_socket, "present");
198
199 while (!all_connected) {
200 all_connected = zmqSvc().receive<bool>(*mw_sub_);
201 }
202
204 } else {
205 // should never get here
206 throw std::runtime_error("Messenger ctor: I'm neither master, nor queue, nor a worker");
207 }
208 } catch (zmq::error_t &e) {
209 std::cerr << e.what() << " -- errnum: " << e.num() << std::endl;
210 throw;
211 };
212}
213
215{
217 try {
218 mq_push_.reset();
219 mq_pull_.reset();
220 mw_pub_.reset();
221 wm_pull_.reset();
222 // remove bound files
223 for (const auto &address : bound_ipc_addresses_) {
224 // no need to check return value, they are only zero byte /tmp files, the OS should eventually clean them up
225 remove(address.substr(6).c_str());
226 }
227 } catch (const std::exception &e) {
228 std::cerr << "WARNING: something in Messenger dtor threw an exception! Original exception message:\n"
229 << e.what() << std::endl;
230 }
231 }
233 this_worker_qw_push_.reset();
234 this_worker_qw_pull_.reset();
235 mw_sub_.reset();
236 wm_push_.reset();
237 }
239 for (auto &socket : qw_push_) {
240 socket.reset();
241 }
242 for (auto &socket : qw_pull_) {
243 socket.reset();
244 }
245 }
246 // Dev note: do not call zmqSvc()::close_context from here! The Messenger
247 // is (a member of) a static variable (JobManager) and ZeroMQSvc is static
248 // as well (the singleton returned by zmqSvc()). Because of the "static
249 // destruction order fiasco", it is not guaranteed that ZeroMQSvc singleton
250 // state is still available at time of destruction of the Messenger. Instead
251 // of a compile time error, this will lead to segfaults at runtime when
252 // exiting the program (on some platforms), because even though the ZeroMQSvc
253 // singleton pointer may be overwritten with random data, it will usually
254 // not randomly become nullptr, which means the nullptr check in the getter
255 // will still pass and the randomized pointer will be dereferenced.
256 // Instead, we close context in any new ProcessManager that may be created,
257 // which means the Messenger will get a fresh context anyway.
258}
259
260void Messenger::test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
261{
262 try {
263 switch (snd_pipe) {
264 case test_snd_pipes::M2Q: {
265 send_from_master_to_queue(ping_value);
266 break;
267 }
268 case test_snd_pipes::Q2M: {
269 send_from_queue_to_master(ping_value);
270 break;
271 }
272 case test_snd_pipes::Q2W: {
273 send_from_queue_to_worker(worker_id, ping_value);
274 break;
275 }
276 case test_snd_pipes::W2Q: {
277 send_from_worker_to_queue(ping_value);
278 break;
279 }
280 }
281 } catch (zmq::error_t &e) {
282 if (e.num() == EAGAIN) {
283 throw std::runtime_error("Messenger::test_connections: SEND over master-queue connection timed out!");
284 } else {
285 throw;
286 }
287 }
288}
289
290void Messenger::test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
291{
292 X2X handshake = X2X::initial_value;
293
294 std::size_t max_tries = 3;
295 std::size_t tries = 0;
296 bool carry_on = true;
297 while (carry_on && (tries++ < max_tries)) {
298 try {
299 switch (rcv_pipe) {
301 handshake = receive_from_master_on_queue<X2X>();
302 break;
303 }
305 handshake = receive_from_queue_on_master<X2X>();
306 break;
307 }
309 handshake = receive_from_queue_on_worker<X2X>();
310 break;
311 }
313 handshake = receive_from_worker_on_queue<X2X>(worker_id);
314 break;
315 }
316 }
317 carry_on = false;
318 } catch (ZMQ::ppoll_error_t &e) {
319 auto response = handle_zmq_ppoll_error(e);
320 if (response == zmq_ppoll_error_response::abort) {
321 throw std::runtime_error("EINTR in test_receive and SIGTERM received, aborting\n");
322 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
323 printf("EINTR in test_receive but no SIGTERM received, try %zu\n", tries);
324 continue;
325 } else if (response == zmq_ppoll_error_response::retry) {
326 printf("EAGAIN in test_receive, try %zu\n", tries);
327 continue;
328 }
329 } catch (zmq::error_t &e) {
330 if (e.num() == EAGAIN) {
331 throw std::runtime_error("Messenger::test_connections: RECEIVE over master-queue connection timed out!");
332 } else {
333 printf("unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
334 e.num(), e.what());
335 throw;
336 }
337 }
338 }
339
340 if (handshake != expected_ping_value) {
341 throw std::runtime_error(
342 "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
343 }
344}
345
346/// \brief Test whether push-pull sockets are working
347///
348/// \note This function tests the PUSH-PULL socket pairs only. The PUB-SUB sockets are already tested in the
349/// constructor.
350///
351/// \param process_manager ProcessManager object used to instantiate this object. Used to identify which process we are
352/// running on and hence which sockets need to be tested.
353void Messenger::test_connections(const ProcessManager &process_manager)
354{
355 if (process_manager.is_queue() || process_manager.is_worker()) {
356 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
357 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
358 sigset_t sigmask;
359 sigemptyset(&sigmask);
360 sigaddset(&sigmask, SIGTERM);
361 int rc = sigprocmask(SIG_BLOCK, &sigmask, &ppoll_sigmask);
362 if (rc < 0) {
363 throw std::runtime_error("sigprocmask failed in test_connections");
364 }
365 }
366
367 if (process_manager.is_master()) {
371 // make sure to always receive last on master, so that master knows when queue is done,
372 // which means workers are done as well, so if master is done everything is done:
374 } else if (process_manager.is_queue()) {
375 ZeroMQPoller poller;
376 std::size_t mq_index;
377 std::tie(poller, mq_index) = create_queue_poller();
378
379 for (std::size_t ix = 0; ix < process_manager.N_workers(); ++ix) {
381 }
383
384 while (!process_manager.sigterm_received() && (poller.size() > 0)) {
385 // poll: wait until status change (-1: infinite timeout)
386 std::vector<std::pair<size_t, zmq::event_flags>> poll_result;
387 bool abort;
388 std::tie(poll_result, abort) = careful_ppoll(poller, ppoll_sigmask);
389 if (abort)
390 break;
391
392 // then process incoming messages from sockets
393 for (auto readable_socket : poll_result) {
394 // message comes from the master/queue socket (first element):
395 if (readable_socket.first == mq_index) {
399 } else { // from a worker socket
400 // TODO: dangerous assumption for this_worker_id, may become invalid if we allow multiple queue_loops on
401 // the same process!
402 auto this_worker_id = readable_socket.first - 1; // TODO: replace with a more reliable lookup
403
406 test_send(X2X::pong, test_snd_pipes::Q2W, this_worker_id);
407
408 poller.unregister_socket(*qw_pull_[this_worker_id]);
409 }
410 }
411 }
413
414 } else if (process_manager.is_worker()) {
419 } else {
420 // should never get here
421 throw std::runtime_error("Messenger::test_connections: I'm neither master, nor queue, nor a worker");
422 }
423
424 if (process_manager.is_queue() || process_manager.is_worker()) {
425 // clean up signal management modifications
426 sigprocmask(SIG_SETMASK, &ppoll_sigmask, nullptr);
427 }
428}
429
430/// Helper function that creates a poller for Queue::loop()
431std::pair<ZeroMQPoller, std::size_t> Messenger::create_queue_poller()
432{
433 ZeroMQPoller poller;
434 std::size_t mq_index = poller.register_socket(*mq_pull_, zmq::event_flags::pollin);
435 for (auto &s : qw_pull_) {
436 poller.register_socket(*s, zmq::event_flags::pollin);
437 }
438 return {std::move(poller), mq_index};
439}
440
441/// Helper function that creates a poller for worker_loop()
442std::pair<ZeroMQPoller, std::size_t> Messenger::create_worker_poller()
443{
444 ZeroMQPoller poller;
445 poller.register_socket(*this_worker_qw_pull_, zmq::event_flags::pollin);
446 std::size_t mw_sub_index = poller.register_socket(*mw_sub_, zmq::event_flags::pollin);
447 return {std::move(poller), mw_sub_index};
448}
449
450// -- WORKER - QUEUE COMMUNICATION --
451
453
454void Messenger::send_from_queue_to_worker(std::size_t /*this_worker_id*/) {}
455
456// -- QUEUE - MASTER COMMUNICATION --
457
459
461
462/// Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination
463void Messenger::set_send_flag(zmq::send_flags flag)
464{
465 send_flag_ = flag;
466}
467
468// for debugging
469#define PROCESS_VAL(p) \
470 case (p): s = #p; break;
471
472std::ostream &operator<<(std::ostream &out, const M2Q value)
473{
474 std::string s;
475 switch (value) {
477 default: s = std::to_string(static_cast<int>(value));
478 }
479 return out << s;
480}
481
482std::ostream &operator<<(std::ostream &out, const W2Q value)
483{
484 std::string s;
485 switch (value) {
487 default: s = std::to_string(static_cast<int>(value));
488 }
489 return out << s;
490}
491
492std::ostream &operator<<(std::ostream &out, const Q2W value)
493{
494 std::string s;
495 switch (value) {
498 default: s = std::to_string(static_cast<int>(value));
499 }
500 return out << s;
501}
502
503std::ostream &operator<<(std::ostream &out, const X2X value)
504{
505 std::string s;
506 switch (value) {
509 default: s = std::to_string(static_cast<int>(value));
510 }
511 return out << s;
512}
513
514#undef PROCESS_VAL
515
516/// Function called from send and receive template functions in debug builds
517/// used to monitor the messages that are going to be sent or are received.
518/// By defining this in the implementation file, compilation is a lot faster
519/// during debugging of Messenger or communication protocols.
520void Messenger::debug_print(std::string /*s*/)
521{
522 // print 's' when debugging
523}
524
525} // namespace MultiProcess
526} // namespace RooFit
#define PROCESS_VAL(p)
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:397
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
R__EXTERN TSystem * gSystem
Definition TSystem.h:561
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition ZeroMQSvc.h:74
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
Messenger(const ProcessManager &process_manager)
Definition Messenger.cxx:63
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
Fork processes for queue and workers.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition TSystem.cxx:1482
Wrapper class for polling ZeroMQ sockets.
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
size_t size() const
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
Definition ZeroMQSvc.h:199
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
Definition ZeroMQSvc.h:153
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
Definition Messenger.cxx:24
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
Definition util.cxx:95
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition CodegenImpl.h:64