Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
worker.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
15
21
22#include <unistd.h> // getpid, pid_t
23#include <cerrno> // EINTR
24#include <csignal> // sigprocmask etc
25
26namespace RooFit {
27namespace MultiProcess {
28
29static bool worker_loop_running = false;
30
32{
34}
35
36/// \brief The worker processes' event loop
37///
38/// Asks the queue process for tasks, polls for incoming messages from other
39/// processes and handles them.
41{
42 assert(JobManager::instance()->process_manager().is_worker());
44 Q2W message_q2w;
45
46 // use a flag to not ask twice
47 bool dequeue_acknowledged = true;
48
49 ZeroMQPoller poller;
50 std::size_t mw_sub_index;
51
52 std::tie(poller, mw_sub_index) = JobManager::instance()->messenger().create_worker_poller();
53
54 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
55 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
56
57 sigset_t sigmask;
58 sigemptyset(&sigmask);
59 sigaddset(&sigmask, SIGTERM);
60 sigprocmask(SIG_BLOCK, &sigmask, &JobManager::instance()->messenger().ppoll_sigmask);
61
62 // Before doing anything, check whether we have received a terminate signal while blocking signals!
63 // In this case, we also do that in the while condition.
65 try { // watch for error from ppoll (which is called inside receive functions) caused by SIGTERM from master
66
67 // try to dequeue a task
68 if (dequeue_acknowledged) { // don't ask twice
70 dequeue_acknowledged = false;
71 }
72
73 // wait for handshake from queue or update from SUB socket
74 auto poll_result = poller.ppoll(-1, &JobManager::instance()->messenger().ppoll_sigmask);
75 // because the poller may now have a waiting update from master over the SUB socket,
76 // but the queue socket could be first in the poll_result vector, and during handling
77 // of a new task it is possible we need to already receive the updated state over SUB,
78 // we have to then flip this boolean so that in the for loop when we reach the SUB
79 // socket's result, we can skip it (otherwise we will hang there, because no more
80 // updated state will be coming):
81 bool skip_sub = false;
82 // then process incoming messages from sockets
83 for (auto readable_socket : poll_result) {
84 // message comes from the master-worker SUB socket (first element):
85 if (readable_socket.first == mw_sub_index) {
86 if (!skip_sub) {
87 auto job_id = JobManager::instance()->messenger().receive_from_master_on_worker<std::size_t>();
89 }
90 } else { // from queue socket
92 switch (message_q2w) {
94 dequeue_acknowledged = true;
95 break;
96 }
98 dequeue_acknowledged = true;
99 auto job_id = JobManager::instance()->messenger().receive_from_queue_on_worker<std::size_t>();
102
103 // while loop, because multiple jobs may have updated state coming
104 while (state_id != JobManager::get_job_object(job_id)->get_state_id()) {
105 skip_sub = true;
106 auto job_id_for_state =
108 JobManager::get_job_object(job_id_for_state)->update_state();
109 }
110
113
114 break;
115 }
116 }
117 }
118 }
119
120 } catch (ZMQ::ppoll_error_t &e) {
122 try {
123 response = handle_zmq_ppoll_error(e);
124 } catch (std::logic_error &) {
125 printf("worker loop at PID %d got unhandleable ZMQ::ppoll_error_t\n", getpid());
126 throw;
127 }
128 if (response == zmq_ppoll_error_response::abort) {
129 break;
130 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
131 printf("EINTR in worker loop at PID %d but no SIGTERM received, continuing\n", getpid());
132 continue;
133 } else if (response == zmq_ppoll_error_response::retry) {
134 printf("EAGAIN from ppoll in worker loop at PID %d, continuing\n", getpid());
135 continue;
136 }
137 } catch (zmq::error_t &e) {
138 printf("unhandled zmq::error_t (not a ppoll_error_t) in worker loop at PID %d with errno %d: %s\n", getpid(),
139 e.num(), e.what());
140 throw;
141 }
142 }
143 // clean up signal management modifications
144 sigprocmask(SIG_SETMASK, &JobManager::instance()->messenger().ppoll_sigmask, nullptr);
145
146 worker_loop_running = false;
147}
148
149} // namespace MultiProcess
150} // namespace RooFit
#define e(i)
Definition RSha256.hxx:103
static JobManager * instance()
static Job * get_job_object(std::size_t job_object_id)
std::size_t get_state_id()
Get the current state identifier.
Definition Job.cxx:145
virtual void send_back_task_result_from_worker(std::size_t task)=0
virtual void update_state()
Virtual function to update any necessary state on workers.
Definition Job.cxx:142
virtual void evaluate_task(std::size_t task)=0
value_t receive_from_master_on_worker(bool *more=nullptr)
Definition Messenger.h:176
std::pair< ZeroMQPoller, std::size_t > create_worker_poller()
Helper function that creates a poller for worker_loop()
Wrapper class for polling ZeroMQ sockets.
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
void worker_loop()
The worker processes' event loop.
Definition worker.cxx:40
bool is_worker_loop_running()
Definition worker.cxx:31
std::size_t State
Definition types.h:23
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
static bool worker_loop_running
Definition worker.cxx:29
std::size_t Task
Definition types.h:22
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or other types of arguments).
Definition Common.h:18