27namespace MultiProcess {
47 bool dequeue_acknowledged =
true;
50 std::size_t mw_sub_index;
58 sigemptyset(&sigmask);
59 sigaddset(&sigmask, SIGTERM);
68 if (dequeue_acknowledged) {
70 dequeue_acknowledged =
false;
81 bool skip_sub =
false;
83 for (
auto readable_socket : poll_result) {
85 if (readable_socket.first == mw_sub_index) {
92 switch (message_q2w) {
94 dequeue_acknowledged =
true;
98 dequeue_acknowledged =
true;
106 auto job_id_for_state =
124 }
catch (std::logic_error &) {
125 printf(
"worker loop at PID %d got unhandleable ZMQ::ppoll_error_t\n", getpid());
131 printf(
"EINTR in worker loop at PID %d but no SIGTERM received, continuing\n", getpid());
134 printf(
"EAGAIN from ppoll in worker loop at PID %d, continuing\n", getpid());
137 }
catch (zmq::error_t &
e) {
138 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in worker loop at PID %d with errno %d: %s\n", getpid(),
Messenger & messenger() const
static JobManager * instance()
static Job * get_job_object(std::size_t job_object_id)
std::size_t get_state_id()
Get the current state identifier.
virtual void send_back_task_result_from_worker(std::size_t task)=0
virtual void update_state()
Virtual function to update any necessary state on workers.
virtual void evaluate_task(std::size_t task)=0
value_t receive_from_master_on_worker(bool *more=nullptr)
value_t receive_from_queue_on_worker()
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
void send_from_worker_to_queue()
static bool sigterm_received()
Wrapper class for polling ZeroMQ sockets.
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
void worker_loop()
The worker processes' event loop.
bool is_worker_loop_running()
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
static bool worker_loop_running
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or other types of arguments).