21namespace MultiProcess {
69 throw std::logic_error(
"calling Communicator::to_master_queue from slave process");
82 JobTask job_task{job_object_id, state_id, task_id};
97 bool popped =
pop(job_task);
118 std::size_t mq_index;
125 sigemptyset(&sigmask);
126 sigaddset(&sigmask, SIGTERM);
136 for (
auto readable_socket : poll_result) {
138 if (readable_socket.first == mq_index) {
142 auto this_worker_id = readable_socket.first - 1;
151 }
catch (std::logic_error &) {
152 printf(
"queue loop got unhandleable ZMQ::ppoll_error_t\n");
158 printf(
"EINTR in queue loop but no SIGTERM received, continuing\n");
161 printf(
"EAGAIN from ppoll in queue loop, continuing\n");
164 }
catch (zmq::error_t &
e) {
165 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in queue loop with errno %d: %s\n",
e.num(),
e.what());
Messenger & messenger() const
static JobManager * instance()
void send_from_master_to_queue()
value_t receive_from_master_on_queue()
value_t receive_from_worker_on_queue(std::size_t this_worker_id)
void send_from_queue_to_worker(std::size_t this_worker_id)
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
static bool sigterm_received()
void loop()
The queue process's event loop.
void add(JobTask job_task)
Enqueue a task.
void process_master_message(M2Q message)
Helper function for 'Queue::loop()'.
bool pop(JobTask &job_task)
Have a worker ask for a task-message from the queue.
std::size_t N_tasks_at_workers_
void process_worker_message(std::size_t this_worker_id, W2Q message)
Helper function for 'Queue::loop()'.
std::queue< JobTask > queue_
Wrapper class for polling ZeroMQ sockets.
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
combined job_object, state and task identifier type