Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Messenger.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
16
17#include <TSystem.h>
18
19#include <csignal> // sigprocmask etc
20
21namespace RooFit {
22namespace MultiProcess {
23
25{
26 int optval = 1;
27 socket->set(zmq::sockopt::immediate, optval);
28}
29
30/** \class Messenger
31 *
32 * \brief Manages ZeroMQ sockets and wraps send and receive calls
33 *
34 * This class is used for all interprocess communication between the master,
35 * queue and worker processes. It sets up ZeroMQ sockets between all processes
36 * over IPC socket files stored in the system temporary directory (gSystem->TempDirectory(), typically /tmp).
37 *
38 * Several sockets are used for communication between different places for
39 * different purposes:
40 * - Master and queue processes each have a PUSH-PULL socket pair to directly
41 * send/receive data between only the master and queue processes. This is
42 * currently used mainly for sending tasks to the queue from master. The
43 * socket from queue back to master is used only to test connections and may
44 * be removed in the future.
45 * - The queue process also has a PUSH-PULL socket pair with each worker
46 * process. These are used by the workers to obtain tasks from the queue.
47 * - The master has a PUB socket that the workers subscribe to with SUB
48 * sockets. These are used to update state. Note that to ensure robust
49 * reception of all messages on the SUB socket, it's important to send over
50 * state in as few messages as possible. For instance, it's best to send
51 * arrays over in a single big message instead of sending over each element
52 * separately. This also improves performance, since each message has some
53 * fixed overhead.
54 * - Each worker has a PUSH socket connected to a PULL socket on master that
55 * is used to send back task results from workers to master in
56 * 'JobManager::retrieve()'.
57 *
58 * @param process_manager ProcessManager instance which manages the master,
59 * queue and worker processes that we want to set up
60 * communication for in this Messenger.
61 */
62
63Messenger::Messenger(const ProcessManager &process_manager)
64{
65 sigemptyset(&ppoll_sigmask);
66
67 auto makeAddrPrefix = [](pid_t pid) -> std::string {
68 std::string tmpPath = gSystem->TempDirectory();
69 return "ipc://" + tmpPath + "/roofit_" + std::to_string(pid) + "_roofitMP";
70 };
71
72 // high water mark for master-queue sending, which can be quite a busy channel, especially at the start of a run
73 int hwm = 0;
74 // create zmq connections and pollers where necessary
75 // Note: zmq context is automatically created in the ZeroMQSvc class and maintained as singleton.
76 // It is reset in the ProcessManager, if necessary. Do not do that here, see comments in ProcessManager
77 // constructor.
78 try {
79 if (process_manager.is_master()) {
80 auto addrBase = makeAddrPrefix(getpid());
81
82 mq_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
83 mq_push_->set(zmq::sockopt::sndhwm, hwm);
84 bindAddr(mq_push_, addrBase + "_from_master_to_queue");
85
86 mq_push_poller_.register_socket(*mq_push_, zmq::event_flags::pollout);
87
88 mq_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
89 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
90 bindAddr(mq_pull_, addrBase + "_from_queue_to_master");
91
92 mq_pull_poller_.register_socket(*mq_pull_, zmq::event_flags::pollin);
93
94 mw_pub_.reset(zmqSvc().socket_ptr(zmq::socket_type::pub));
95 mw_pub_->set(zmq::sockopt::sndhwm, hwm);
96 bindAddr(mw_pub_, addrBase + "_from_master_to_workers");
97
98 wm_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
99 wm_pull_->set(zmq::sockopt::rcvhwm, hwm);
100 bindAddr(wm_pull_, addrBase + "_from_workers_to_master");
101 wm_pull_poller_.register_socket(*wm_pull_, zmq::event_flags::pollin);
102
104
105 // make sure all subscribers are connected
106 ZmqLingeringSocketPtr<> subscriber_ping_socket{zmqSvc().socket_ptr(zmq::socket_type::pull)};
107 bindAddr(subscriber_ping_socket, addrBase + "_subscriber_ping_socket");
108 ZeroMQPoller subscriber_ping_poller;
109 subscriber_ping_poller.register_socket(*subscriber_ping_socket, zmq::event_flags::pollin);
110 std::size_t N_subscribers_confirmed = 0;
111 while (N_subscribers_confirmed < process_manager.N_workers()) {
112 zmqSvc().send(*mw_pub_, false);
113 auto poll_results = subscriber_ping_poller.poll(0);
114 for (std::size_t ix = 0; ix < poll_results.size(); ++ix) {
115 auto request = zmqSvc().receive<std::string>(*subscriber_ping_socket, zmq::recv_flags::dontwait);
116 assert(request == "present");
117 ++N_subscribers_confirmed;
118 }
119 }
120 zmqSvc().send(*mw_pub_, true);
121
122 } else if (process_manager.is_queue()) {
123 auto addrBase = makeAddrPrefix(getppid());
124
125 // first the queue-worker sockets
126 // do resize instead of reserve so that the unique_ptrs are initialized
127 // (to nullptr) so that we can do reset below, alternatively you can do
128 // push/emplace_back with move or something
129 qw_push_.resize(process_manager.N_workers());
130 qw_pull_.resize(process_manager.N_workers());
131 qw_push_poller_.resize(process_manager.N_workers());
132 qw_pull_poller_.resize(process_manager.N_workers());
133 for (std::size_t ix = 0; ix < process_manager.N_workers(); ++ix) {
134 // push
135 qw_push_[ix].reset(zmqSvc().socket_ptr(zmq::socket_type::push));
136 bindAddr(qw_push_[ix], addrBase + "_from_queue_to_worker_" + std::to_string(ix));
137
138 qw_push_poller_[ix].register_socket(*qw_push_[ix], zmq::event_flags::pollout);
139
140 // pull
141 qw_pull_[ix].reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
142 bindAddr(qw_pull_[ix], addrBase + "_from_worker_" + std::to_string(ix) + "_to_queue");
143
144 qw_pull_poller_[ix].register_socket(*qw_pull_[ix], zmq::event_flags::pollin);
145 }
146
147 // then the master-queue sockets
148 mq_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
149 mq_push_->set(zmq::sockopt::sndhwm, hwm);
150 mq_push_->connect(addrBase + "_from_queue_to_master");
151
152 mq_push_poller_.register_socket(*mq_push_, zmq::event_flags::pollout);
153
154 mq_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
155 mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
156 mq_pull_->connect(addrBase + "_from_master_to_queue");
157
158 mq_pull_poller_.register_socket(*mq_pull_, zmq::event_flags::pollin);
159
162 } else if (process_manager.is_worker()) {
163 auto addrBase = makeAddrPrefix(getppid());
164
165 // we only need one queue-worker pipe on the worker
166 qw_push_poller_.resize(1);
167 qw_pull_poller_.resize(1);
168
169 // push
170 this_worker_qw_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
171 auto addr = addrBase + "_from_worker_" + std::to_string(process_manager.worker_id()) + "_to_queue";
172 this_worker_qw_push_->connect(addr);
173
174 qw_push_poller_[0].register_socket(*this_worker_qw_push_, zmq::event_flags::pollout);
175
176 // pull
177 this_worker_qw_pull_.reset(zmqSvc().socket_ptr(zmq::socket_type::pull));
178 addr = addrBase + "_from_queue_to_worker_" + std::to_string(process_manager.worker_id());
179 this_worker_qw_pull_->connect(addr);
180
181 qw_pull_poller_[0].register_socket(*this_worker_qw_pull_, zmq::event_flags::pollin);
182
183 mw_sub_.reset(zmqSvc().socket_ptr(zmq::socket_type::sub));
184 mw_sub_->set(zmq::sockopt::rcvhwm, hwm);
185 mw_sub_->set(zmq::sockopt::subscribe, "");
186 mw_sub_->connect(addrBase + "_from_master_to_workers");
187 mw_sub_poller_.register_socket(*mw_sub_, zmq::event_flags::pollin);
188
189 wm_push_.reset(zmqSvc().socket_ptr(zmq::socket_type::push));
190 wm_push_->set(zmq::sockopt::sndhwm, hwm);
191 wm_push_->connect(addrBase + "_from_workers_to_master");
192
193 // check publisher connection and then wait until all subscribers are connected
194 ZmqLingeringSocketPtr<> subscriber_ping_socket{zmqSvc().socket_ptr(zmq::socket_type::push)};
195 subscriber_ping_socket->connect(addrBase + "_subscriber_ping_socket");
196 auto all_connected = zmqSvc().receive<bool>(*mw_sub_);
197 zmqSvc().send(*subscriber_ping_socket, "present");
198
199 while (!all_connected) {
200 all_connected = zmqSvc().receive<bool>(*mw_sub_);
201 }
202
204 } else {
205 // should never get here
206 throw std::runtime_error("Messenger ctor: I'm neither master, nor queue, nor a worker");
207 }
208 } catch (zmq::error_t &e) {
209 std::cerr << e.what() << " -- errnum: " << e.num() << std::endl;
210 throw;
211 };
212}
213
215{
217 try {
218 mq_push_.reset();
219 mq_pull_.reset();
220 mw_pub_.reset();
221 wm_pull_.reset();
222 // remove bound files
223 for (const auto &address : bound_ipc_addresses_) {
224 // no need to check return value, they are only zero byte /tmp files, the OS should eventually clean them up
225 remove(address.substr(6).c_str());
226 }
227 } catch (const std::exception &e) {
228 std::cerr << "WARNING: something in Messenger dtor threw an exception! Original exception message:\n"
229 << e.what() << std::endl;
230 }
231 }
233 this_worker_qw_push_.reset();
234 this_worker_qw_pull_.reset();
235 mw_sub_.reset();
236 wm_push_.reset();
237 }
239 for (auto &socket : qw_push_) {
240 socket.reset();
241 }
242 for (auto &socket : qw_pull_) {
243 socket.reset();
244 }
245 }
246 // Dev note: do not call zmqSvc()::close_context from here! The Messenger
247 // is (a member of) a static variable (JobManager) and ZeroMQSvc is static
248 // as well (the singleton returned by zmqSvc()). Because of the "static
249 // destruction order fiasco", it is not guaranteed that ZeroMQSvc singleton
250 // state is still available at time of destruction of the Messenger. Instead
251 // of a compile time error, this will lead to segfaults at runtime when
252 // exiting the program (on some platforms), because even though the ZeroMQSvc
253 // singleton pointer may be overwritten with random data, it will usually
254 // not randomly become nullptr, which means the nullptr check in the getter
255 // will still pass and the randomized pointer will be dereferenced.
256 // Instead, we close context in any new ProcessManager that may be created,
257 // which means the Messenger will get a fresh context anyway.
258}
259
260void Messenger::test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
261{
262 try {
263 switch (snd_pipe) {
264 case test_snd_pipes::M2Q: {
265 send_from_master_to_queue(ping_value);
266 break;
267 }
268 case test_snd_pipes::Q2M: {
269 send_from_queue_to_master(ping_value);
270 break;
271 }
272 case test_snd_pipes::Q2W: {
273 send_from_queue_to_worker(worker_id, ping_value);
274 break;
275 }
276 case test_snd_pipes::W2Q: {
277 send_from_worker_to_queue(ping_value);
278 break;
279 }
280 }
281 } catch (zmq::error_t &e) {
282 if (e.num() == EAGAIN) {
283 throw std::runtime_error("Messenger::test_connections: SEND over master-queue connection timed out!");
284 } else {
285 throw;
286 }
287 }
288}
289
290void Messenger::test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
291{
292 X2X handshake = X2X::initial_value;
293
294 std::size_t max_tries = 3, tries = 0;
295 bool carry_on = true;
296 while (carry_on && (tries++ < max_tries)) {
297 try {
298 switch (rcv_pipe) {
300 handshake = receive_from_master_on_queue<X2X>();
301 break;
302 }
304 handshake = receive_from_queue_on_master<X2X>();
305 break;
306 }
308 handshake = receive_from_queue_on_worker<X2X>();
309 break;
310 }
312 handshake = receive_from_worker_on_queue<X2X>(worker_id);
313 break;
314 }
315 }
316 carry_on = false;
317 } catch (ZMQ::ppoll_error_t &e) {
318 auto response = handle_zmq_ppoll_error(e);
319 if (response == zmq_ppoll_error_response::abort) {
320 throw std::runtime_error("EINTR in test_receive and SIGTERM received, aborting\n");
321 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
322 printf("EINTR in test_receive but no SIGTERM received, try %zu\n", tries);
323 continue;
324 } else if (response == zmq_ppoll_error_response::retry) {
325 printf("EAGAIN in test_receive, try %zu\n", tries);
326 continue;
327 }
328 } catch (zmq::error_t &e) {
329 if (e.num() == EAGAIN) {
330 throw std::runtime_error("Messenger::test_connections: RECEIVE over master-queue connection timed out!");
331 } else {
332 printf("unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
333 e.num(), e.what());
334 throw;
335 }
336 }
337 }
338
339 if (handshake != expected_ping_value) {
340 throw std::runtime_error(
341 "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
342 }
343}
344
345/// \brief Test whether push-pull sockets are working
346///
347/// \note This function tests the PUSH-PULL socket pairs only. The PUB-SUB sockets are already tested in the
348/// constructor.
349///
350/// \param process_manager ProcessManager object used to instantiate this object. Used to identify which process we are
351/// running on and hence which sockets need to be tested.
352void Messenger::test_connections(const ProcessManager &process_manager)
353{
354 if (process_manager.is_queue() || process_manager.is_worker()) {
355 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
356 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
357 sigset_t sigmask;
358 sigemptyset(&sigmask);
359 sigaddset(&sigmask, SIGTERM);
360 int rc = sigprocmask(SIG_BLOCK, &sigmask, &ppoll_sigmask);
361 if (rc < 0) {
362 throw std::runtime_error("sigprocmask failed in test_connections");
363 }
364 }
365
366 if (process_manager.is_master()) {
370 // make sure to always receive last on master, so that master knows when queue is done,
371 // which means workers are done as well, so if master is done everything is done:
373 } else if (process_manager.is_queue()) {
374 ZeroMQPoller poller;
375 std::size_t mq_index;
376 std::tie(poller, mq_index) = create_queue_poller();
377
378 for (std::size_t ix = 0; ix < process_manager.N_workers(); ++ix) {
380 }
382
383 while (!process_manager.sigterm_received() && (poller.size() > 0)) {
384 // poll: wait until status change (-1: infinite timeout)
385 std::vector<std::pair<size_t, zmq::event_flags>> poll_result;
386 bool abort;
387 std::tie(poll_result, abort) = careful_ppoll(poller, ppoll_sigmask);
388 if (abort)
389 break;
390
391 // then process incoming messages from sockets
392 for (auto readable_socket : poll_result) {
393 // message comes from the master/queue socket (first element):
394 if (readable_socket.first == mq_index) {
398 } else { // from a worker socket
399 // TODO: dangerous assumption for this_worker_id, may become invalid if we allow multiple queue_loops on
400 // the same process!
401 auto this_worker_id = readable_socket.first - 1; // TODO: replace with a more reliable lookup
402
405 test_send(X2X::pong, test_snd_pipes::Q2W, this_worker_id);
406
407 poller.unregister_socket(*qw_pull_[this_worker_id]);
408 }
409 }
410 }
412
413 } else if (process_manager.is_worker()) {
418 } else {
419 // should never get here
420 throw std::runtime_error("Messenger::test_connections: I'm neither master, nor queue, nor a worker");
421 }
422
423 if (process_manager.is_queue() || process_manager.is_worker()) {
424 // clean up signal management modifications
425 sigprocmask(SIG_SETMASK, &ppoll_sigmask, nullptr);
426 }
427}
428
429/// Helper function that creates a poller for Queue::loop()
430std::pair<ZeroMQPoller, std::size_t> Messenger::create_queue_poller()
431{
432 ZeroMQPoller poller;
433 std::size_t mq_index = poller.register_socket(*mq_pull_, zmq::event_flags::pollin);
434 for (auto &s : qw_pull_) {
435 poller.register_socket(*s, zmq::event_flags::pollin);
436 }
437 return {std::move(poller), mq_index};
438}
439
440/// Helper function that creates a poller for worker_loop()
441std::pair<ZeroMQPoller, std::size_t> Messenger::create_worker_poller()
442{
443 ZeroMQPoller poller;
444 poller.register_socket(*this_worker_qw_pull_, zmq::event_flags::pollin);
445 std::size_t mw_sub_index = poller.register_socket(*mw_sub_, zmq::event_flags::pollin);
446 return {std::move(poller), mw_sub_index};
447}
448
449// -- WORKER - QUEUE COMMUNICATION --
450
452
453void Messenger::send_from_queue_to_worker(std::size_t /*this_worker_id*/) {}
454
455// -- QUEUE - MASTER COMMUNICATION --
456
458
460
461/// Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination
462void Messenger::set_send_flag(zmq::send_flags flag)
463{
464 send_flag_ = flag;
465}
466
467// -- MASTER - WORKER COMMUNICATION --
468
470
471// for debugging
472#define PROCESS_VAL(p) \
473 case (p): s = #p; break;
474
475std::ostream &operator<<(std::ostream &out, const M2Q value)
476{
477 std::string s;
478 switch (value) {
480 default: s = std::to_string(static_cast<int>(value));
481 }
482 return out << s;
483}
484
485std::ostream &operator<<(std::ostream &out, const W2Q value)
486{
487 std::string s;
488 switch (value) {
490 default: s = std::to_string(static_cast<int>(value));
491 }
492 return out << s;
493}
494
495std::ostream &operator<<(std::ostream &out, const Q2W value)
496{
497 std::string s;
498 switch (value) {
501 default: s = std::to_string(static_cast<int>(value));
502 }
503 return out << s;
504}
505
506std::ostream &operator<<(std::ostream &out, const X2X value)
507{
508 std::string s;
509 switch (value) {
512 default: s = std::to_string(static_cast<int>(value));
513 }
514 return out << s;
515}
516
517#undef PROCESS_VAL
518
519/// Function called from send and receive template functions in debug builds
520/// used to monitor the messages that are going to be sent or are received.
521/// By defining this in the implementation file, compilation is a lot faster
522/// during debugging of Messenger or communication protocols.
523void Messenger::debug_print(std::string /*s*/)
524{
525 // print 's' when debugging
526}
527
528} // namespace MultiProcess
529} // namespace RooFit
#define PROCESS_VAL(p)
#define e(i)
Definition RSha256.hxx:103
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
Definition TBuffer.h:399
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
R__EXTERN TSystem * gSystem
Definition TSystem.h:560
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
Definition ZeroMQSvc.h:80
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
std::vector< ZmqLingeringSocketPtr<> > qw_push_
std::vector< std::string > bound_ipc_addresses_
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
ZmqLingeringSocketPtr wm_push_
std::vector< ZeroMQPoller > qw_pull_poller_
Messenger(const ProcessManager &process_manager)
Definition Messenger.cxx:63
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
ZmqLingeringSocketPtr mw_pub_
void bindAddr(T &socket, std::string &&addr)
ZmqLingeringSocketPtr mq_pull_
ZmqLingeringSocketPtr wm_pull_
std::vector< ZeroMQPoller > qw_push_poller_
ZmqLingeringSocketPtr this_worker_qw_push_
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
ZmqLingeringSocketPtr mw_sub_
void send_from_queue_to_worker(std::size_t this_worker_id)
ZmqLingeringSocketPtr this_worker_qw_pull_
ZmqLingeringSocketPtr mq_push_
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
Fork processes for queue and workers.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Definition TSystem.cxx:1469
Wrapper class for polling ZeroMQ sockets.
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
size_t size() const
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
Definition ZeroMQSvc.h:205
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
Definition ZeroMQSvc.h:159
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
Definition Messenger.cxx:24
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
Definition util.cxx:95
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition JSONIO.h:26