22namespace MultiProcess {
 
   27   socket->set(zmq::sockopt::immediate, optval);
 
   67   auto makeAddrPrefix = [](pid_t pid) -> std::string {
 
   68      return "ipc://" + 
RooFit::tmpPath() + std::to_string(pid) + 
"_roofitMP";
 
   79         auto addrBase = makeAddrPrefix(getpid());
 
   82         mq_push_->set(zmq::sockopt::sndhwm, hwm);
 
   88         mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
 
   94         mw_pub_->set(zmq::sockopt::sndhwm, hwm);
 
   98         wm_pull_->set(zmq::sockopt::rcvhwm, hwm);
 
  106         bindAddr(subscriber_ping_socket, addrBase + 
"_subscriber_ping_socket");
 
  108         subscriber_ping_poller.
register_socket(*subscriber_ping_socket, zmq::event_flags::pollin);
 
  109         std::size_t N_subscribers_confirmed = 0;
 
  110         while (N_subscribers_confirmed < process_manager.
N_workers()) {
 
  112            auto poll_results = subscriber_ping_poller.
poll(0);
 
  113            for (std::size_t ix = 0; ix < poll_results.size(); ++ix) {
 
  114               auto request = 
zmqSvc().
receive<std::string>(*subscriber_ping_socket, zmq::recv_flags::dontwait);
 
  115               assert(request == 
"present");
 
  116               ++N_subscribers_confirmed;
 
  121      } 
else if (process_manager.
is_queue()) {
 
  122         auto addrBase = makeAddrPrefix(getppid());
 
  132         for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
 
  135            bindAddr(
qw_push_[ix], addrBase + 
"_from_queue_to_worker_" + std::to_string(ix));
 
  141            bindAddr(
qw_pull_[ix], addrBase + 
"_from_worker_" + std::to_string(ix) + 
"_to_queue");
 
  148         mq_push_->set(zmq::sockopt::sndhwm, hwm);
 
  149         mq_push_->connect(addrBase + 
"_from_queue_to_master");
 
  154         mq_pull_->set(zmq::sockopt::rcvhwm, hwm);
 
  155         mq_pull_->connect(addrBase + 
"_from_master_to_queue");
 
  161      } 
else if (process_manager.
is_worker()) {
 
  162         auto addrBase = makeAddrPrefix(getppid());
 
  170         auto addr = addrBase + 
"_from_worker_" + std::to_string(process_manager.
worker_id()) + 
"_to_queue";
 
  177         addr = addrBase + 
"_from_queue_to_worker_" + std::to_string(process_manager.
worker_id());
 
  183         mw_sub_->set(zmq::sockopt::rcvhwm, hwm);
 
  184         mw_sub_->set(zmq::sockopt::subscribe, 
"");
 
  185         mw_sub_->connect(addrBase + 
"_from_master_to_workers");
 
  189         wm_push_->set(zmq::sockopt::sndhwm, hwm);
 
  190         wm_push_->connect(addrBase + 
"_from_workers_to_master");
 
  194         subscriber_ping_socket->connect(addrBase + 
"_subscriber_ping_socket");
 
  196         zmqSvc().
send(*subscriber_ping_socket, 
"present");
 
  198         while (!all_connected) {
 
  205         throw std::runtime_error(
"Messenger ctor: I'm neither master, nor queue, nor a worker");
 
  207   } 
catch (zmq::error_t &
e) {
 
  208      std::cerr << 
e.what() << 
" -- errnum: " << 
e.num() << std::endl;
 
  224            remove(address.substr(6).c_str());
 
  226      } 
catch (
const std::exception &
e) {
 
  227         std::cerr << 
"WARNING: something in Messenger dtor threw an exception! Original exception message:\n" 
  228                   << 
e.what() << std::endl;
 
  280   } 
catch (zmq::error_t &
e) {
 
  281      if (
e.num() == EAGAIN) {
 
  282         throw std::runtime_error(
"Messenger::test_connections: SEND over master-queue connection timed out!");
 
  293   std::size_t max_tries = 3, tries = 0;
 
  294   bool carry_on = 
true;
 
  295   while (carry_on && (tries++ < max_tries)) {
 
  299            handshake = receive_from_master_on_queue<X2X>();
 
  303            handshake = receive_from_queue_on_master<X2X>();
 
  307            handshake = receive_from_queue_on_worker<X2X>();
 
  311            handshake = receive_from_worker_on_queue<X2X>(worker_id);
 
  319            throw std::runtime_error(
"EINTR in test_receive and SIGTERM received, aborting\n");
 
  321            printf(
"EINTR in test_receive but no SIGTERM received, try %zu\n", tries);
 
  324            printf(
"EAGAIN in test_receive, try %zu\n", tries);
 
  327      } 
catch (zmq::error_t &
e) {
 
  328         if (
e.num() == EAGAIN) {
 
  329            throw std::runtime_error(
"Messenger::test_connections: RECEIVE over master-queue connection timed out!");
 
  331            printf(
"unhandled zmq::error_t (not a ppoll_error_t) in Messenger::test_receive with errno %d: %s\n",
 
  338   if (handshake != expected_ping_value) {
 
  339      throw std::runtime_error(
 
  340         "Messenger::test_connections: RECEIVE over master-queue connection failed, did not receive expected value!");
 
  357      sigemptyset(&sigmask);
 
  358      sigaddset(&sigmask, SIGTERM);
 
  361         throw std::runtime_error(
"sigprocmask failed in test_connections");
 
  372   } 
else if (process_manager.
is_queue()) {
 
  374      std::size_t mq_index;
 
  377      for (std::size_t ix = 0; ix < process_manager.
N_workers(); ++ix) {
 
  384         std::vector<std::pair<size_t, zmq::event_flags>> poll_result;
 
  391         for (
auto readable_socket : poll_result) {
 
  393            if (readable_socket.first == mq_index) {
 
  400               auto this_worker_id = readable_socket.first - 1; 
 
  412   } 
else if (process_manager.
is_worker()) {
 
  419      throw std::runtime_error(
"Messenger::test_connections: I'm neither master, nor queue, nor a worker");
 
  436   return {std::move(poller), mq_index};
 
  445   return {std::move(poller), mw_sub_index};
 
  471#define PROCESS_VAL(p) \ 
  472   case (p): s = #p; break; 
  479   default: s = std::to_string(
static_cast<int>(
value));
 
  489   default: s = std::to_string(
static_cast<int>(
value));
 
  500   default: s = std::to_string(
static_cast<int>(
value));
 
  511   default: s = std::to_string(
static_cast<int>(
value));
 
TBuffer & operator<<(TBuffer &buf, const Tmpl *obj)
 
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void value
 
std::unique_ptr< zmq::socket_t, ZmqLingeringSocketPtrDeleter< PERIOD > > ZmqLingeringSocketPtr
 
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
 
void test_receive(X2X expected_ping_value, test_rcv_pipes rcv_pipe, std::size_t worker_id)
 
bool close_QW_container_on_destruct_
 
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
 
std::vector< ZmqLingeringSocketPtr<> > qw_push_
 
std::vector< std::string > bound_ipc_addresses_
 
void set_send_flag(zmq::send_flags flag)
Set the flag used in all send functions; 0, ZMQ_DONTWAIT, ZMQ_SNDMORE or bitwise combination.
 
ZeroMQPoller mq_push_poller_
 
ZmqLingeringSocketPtr wm_push_
 
std::vector< ZeroMQPoller > qw_pull_poller_
 
zmq::send_flags send_flag_
 
void send_from_queue_to_master()
 
Messenger(const ProcessManager &process_manager)
 
std::vector< ZmqLingeringSocketPtr<> > qw_pull_
 
ZmqLingeringSocketPtr mw_pub_
 
void bindAddr(T &socket, std::string &&addr)
 
ZmqLingeringSocketPtr mq_pull_
 
bool close_this_QW_on_destruct_
 
void send_from_worker_to_master()
 
ZmqLingeringSocketPtr wm_pull_
 
std::vector< ZeroMQPoller > qw_push_poller_
 
void send_from_master_to_queue()
 
ZeroMQPoller mw_sub_poller_
 
ZmqLingeringSocketPtr this_worker_qw_push_
 
void debug_print(std::string s)
Function called from send and receive template functions in debug builds used to monitor the messages...
 
ZeroMQPoller mq_pull_poller_
 
bool close_MQ_on_destruct_
 
ZmqLingeringSocketPtr mw_sub_
 
void send_from_queue_to_worker(std::size_t this_worker_id)
 
ZmqLingeringSocketPtr this_worker_qw_pull_
 
ZmqLingeringSocketPtr mq_push_
 
void test_send(X2X ping_value, test_snd_pipes snd_pipe, std::size_t worker_id)
 
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
 
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
 
void send_from_worker_to_queue()
 
ZeroMQPoller wm_pull_poller_
 
Fork processes for queue and workers.
 
std::size_t N_workers() const
 
static bool sigterm_received()
 
std::size_t worker_id() const
 
Wrapper class for polling ZeroMQ sockets.
 
size_t register_socket(zmq::socket_t &socket, zmq::event_flags type)
Register socket to poll.
 
size_t unregister_socket(zmq::socket_t &socket)
Unregister socket from poller.
 
std::vector< std::pair< size_t, zmq::event_flags > > poll(int timeo=-1)
Poll the sockets.
 
zmq::send_result_t send(zmq::socket_t &socket, const T &item, zmq::send_flags flags=zmq::send_flags::none) const
Send message with ZMQ.
 
zmq::socket_t * socket_ptr(zmq::socket_type type) const
Create and return a new socket by pointer.
 
T receive(zmq::socket_t &socket, zmq::recv_flags flags=zmq::recv_flags::none, bool *more=nullptr) const
receive message with ZMQ, general version
 
void set_socket_immediate(ZmqLingeringSocketPtr<> &socket)
 
std::tuple< std::vector< std::pair< size_t, zmq::event_flags > >, bool > careful_ppoll(ZeroMQPoller &poller, const sigset_t &ppoll_sigmask, std::size_t max_tries=2)
 
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
 
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
 
std::string const & tmpPath()
Returns the path to the directory that should be used for temporary RooFit files (e....