Manages ZeroMQ sockets and wraps send and receive calls.
This class is used for all interprocess communication between the master, queue and worker processes. It sets up ZeroMQ sockets between all processes over IPC socket files stored in /tmp on the filesystem.
Several sockets are used for communication between different places for different purposes:
process_manager | ProcessManager instance which manages the master, queue and worker processes that we want to set up communication for in this Messenger. |
Definition at line 33 of file Messenger_decl.h.
Public Types | |
enum class | test_rcv_pipes { fromQonM , fromMonQ , fromWonQ , fromQonW } |
enum class | test_snd_pipes { M2Q , Q2M , Q2W , W2Q } |
Public Member Functions | |
Messenger (const ProcessManager &process_manager) | |
~Messenger () | |
std::pair< ZeroMQPoller, std::size_t > | create_queue_poller () |
Helper function that creates a poller for Queue::loop() | |
std::pair< ZeroMQPoller, std::size_t > | create_worker_poller () |
Helper function that creates a poller for worker_loop() | |
template<typename T > | |
void | publish_from_master_to_workers (T &&item) |
specialization that sends the final message | |
template<typename T , typename T2 , typename... Ts> | |
void | publish_from_master_to_workers (T &&item, T2 &&item2, Ts &&... items) |
specialization that queues first parts of multipart messages | |
template<typename value_t > | |
value_t | receive_from_master_on_queue () |
template<typename value_t > | |
value_t | receive_from_master_on_worker (bool *more=nullptr) |
template<typename value_t > | |
value_t | receive_from_queue_on_master () |
template<typename value_t > | |
value_t | receive_from_queue_on_worker () |
template<typename value_t > | |
value_t | receive_from_worker_on_master () |
template<typename value_t > | |
value_t | receive_from_worker_on_queue (std::size_t this_worker_id) |
void | send_from_master_to_queue () |
template<typename T , typename... Ts> | |
void | send_from_master_to_queue (T item, Ts... items) |
void | send_from_queue_to_master () |
template<typename T , typename... Ts> | |
void | send_from_queue_to_master (T item, Ts... items) |
void | send_from_queue_to_worker (std::size_t this_worker_id) |
template<typename T , typename... Ts> | |
void | send_from_queue_to_worker (std::size_t this_worker_id, T item, Ts... items) |
void | send_from_worker_to_master () |
template<typename T , typename... Ts> | |
void | send_from_worker_to_master (T item, Ts... items) |
void | send_from_worker_to_queue () |
template<typename T , typename... Ts> | |
void | send_from_worker_to_queue (T item, Ts... items) |
void | set_send_flag (zmq::send_flags flag) |
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination. | |
void | test_connections (const ProcessManager &process_manager) |
Test whether push-pull sockets are working. | |
void | test_receive (X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id) |
void | test_send (X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id) |
Public Attributes | |
sigset_t | ppoll_sigmask |
Private Member Functions | |
template<class T > | |
void | bindAddr (T &socket, std::string &&addr) |
void | debug_print (std::string s) |
Function called from send and receive template functions in debug builds used to monitor the messages that are going to be sent or are received. | |
Private Attributes | |
std::vector< std::string > | bound_ipc_addresses_ |
bool | close_MQ_on_destruct_ = false |
bool | close_QW_container_on_destruct_ = false |
bool | close_this_QW_on_destruct_ = false |
ZmqLingeringSocketPtr | mq_pull_ |
ZeroMQPoller | mq_pull_poller_ |
ZmqLingeringSocketPtr | mq_push_ |
ZeroMQPoller | mq_push_poller_ |
ZmqLingeringSocketPtr | mw_pub_ |
ZmqLingeringSocketPtr | mw_sub_ |
ZeroMQPoller | mw_sub_poller_ |
std::vector< ZmqLingeringSocketPtr<> > | qw_pull_ |
std::vector< ZeroMQPoller > | qw_pull_poller_ |
std::vector< ZmqLingeringSocketPtr<> > | qw_push_ |
std::vector< ZeroMQPoller > | qw_push_poller_ |
zmq::send_flags | send_flag_ = zmq::send_flags::none |
ZmqLingeringSocketPtr | this_worker_qw_pull_ |
ZmqLingeringSocketPtr | this_worker_qw_push_ |
ZmqLingeringSocketPtr | wm_pull_ |
ZeroMQPoller | wm_pull_poller_ |
ZmqLingeringSocketPtr | wm_push_ |
|
strong |
Enumerator | |
---|---|
fromQonM | |
fromMonQ | |
fromWonQ | |
fromQonW |
Definition at line 47 of file Messenger_decl.h.
|
strong |
Enumerator | |
---|---|
M2Q | |
Q2M | |
Q2W | |
W2Q |
Definition at line 40 of file Messenger_decl.h.
|
explicit |
Definition at line 63 of file Messenger.cxx.
RooFit::MultiProcess::Messenger::~Messenger | ( | ) |
Definition at line 213 of file Messenger.cxx.
|
inlineprivate |
Definition at line 111 of file Messenger_decl.h.
std::pair< ZeroMQPoller, std::size_t > RooFit::MultiProcess::Messenger::create_queue_poller | ( | ) |
Helper function that creates a poller for Queue::loop()
Definition at line 429 of file Messenger.cxx.
std::pair< ZeroMQPoller, std::size_t > RooFit::MultiProcess::Messenger::create_worker_poller | ( | ) |
Helper function that creates a poller for worker_loop()
Definition at line 440 of file Messenger.cxx.
|
private |
Function called from send and receive template functions in debug builds used to monitor the messages that are going to be sent or are received.
By defining this in the implementation file, compilation is a lot faster during debugging of Messenger or communication protocols.
Definition at line 522 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::publish_from_master_to_workers | ( | T && | item | ) |
specialization that sends the final message
Definition at line 150 of file Messenger.h.
void RooFit::MultiProcess::Messenger::publish_from_master_to_workers | ( | T && | item, |
T2 && | item2, | ||
Ts &&... | items | ||
) |
specialization that queues first parts of multipart messages
Definition at line 163 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_master_on_queue |
Definition at line 132 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_master_on_worker | ( | bool * | more = nullptr | ) |
Definition at line 176 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_queue_on_master |
Definition at line 103 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_queue_on_worker |
Definition at line 72 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_worker_on_master |
Definition at line 205 of file Messenger.h.
value_t RooFit::MultiProcess::Messenger::receive_from_worker_on_queue | ( | std::size_t | this_worker_id | ) |
Definition at line 43 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_master_to_queue | ( | ) |
Definition at line 458 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_master_to_queue | ( | T | item, |
Ts... | items | ||
) |
Definition at line 118 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_queue_to_master | ( | ) |
Definition at line 456 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_queue_to_master | ( | T | item, |
Ts... | items | ||
) |
Definition at line 89 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_queue_to_worker | ( | std::size_t | this_worker_id | ) |
Definition at line 452 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_queue_to_worker | ( | std::size_t | this_worker_id, |
T | item, | ||
Ts... | items | ||
) |
Definition at line 58 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_worker_to_master | ( | ) |
Definition at line 468 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_worker_to_master | ( | T | item, |
Ts... | items | ||
) |
Definition at line 191 of file Messenger.h.
void RooFit::MultiProcess::Messenger::send_from_worker_to_queue | ( | ) |
Definition at line 450 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::send_from_worker_to_queue | ( | T | item, |
Ts... | items | ||
) |
Definition at line 29 of file Messenger.h.
void RooFit::MultiProcess::Messenger::set_send_flag | ( | zmq::send_flags | flag | ) |
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
Definition at line 461 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::test_connections | ( | const ProcessManager & | process_manager | ) |
Test whether push-pull sockets are working.
process_manager | ProcessManager object used to instantiate this object. Used to identify which process we are running on and hence which sockets need to be tested. |
Definition at line 351 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::test_receive | ( | X2X | expected_ping_value, |
test_rcv_pipes | rcv_pipe, | ||
std::size_t | worker_id | ||
) |
Definition at line 289 of file Messenger.cxx.
void RooFit::MultiProcess::Messenger::test_send | ( | X2X | ping_value, |
test_snd_pipes | snd_pipe, | ||
std::size_t | worker_id | ||
) |
Definition at line 259 of file Messenger.cxx.
|
private |
Definition at line 147 of file Messenger_decl.h.
|
private |
Definition at line 141 of file Messenger_decl.h.
|
private |
Definition at line 143 of file Messenger_decl.h.
|
private |
Definition at line 142 of file Messenger_decl.h.
|
private |
Definition at line 126 of file Messenger_decl.h.
|
private |
Definition at line 129 of file Messenger_decl.h.
|
private |
Definition at line 119 of file Messenger_decl.h.
|
private |
Definition at line 122 of file Messenger_decl.h.
|
private |
Definition at line 132 of file Messenger_decl.h.
|
private |
Definition at line 133 of file Messenger_decl.h.
|
private |
Definition at line 134 of file Messenger_decl.h.
sigset_t RooFit::MultiProcess::Messenger::ppoll_sigmask |
Definition at line 103 of file Messenger_decl.h.
|
private |
Definition at line 124 of file Messenger_decl.h.
|
private |
Definition at line 128 of file Messenger_decl.h.
|
private |
Definition at line 117 of file Messenger_decl.h.
|
private |
Definition at line 121 of file Messenger_decl.h.
|
private |
Definition at line 145 of file Messenger_decl.h.
|
private |
Definition at line 125 of file Messenger_decl.h.
|
private |
Definition at line 118 of file Messenger_decl.h.
|
private |
Definition at line 137 of file Messenger_decl.h.
|
private |
Definition at line 138 of file Messenger_decl.h.
|
private |
Definition at line 136 of file Messenger_decl.h.