Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
worker.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
15
23
24#include <string>
25#include <unistd.h> // getpid, pid_t
26#include <cerrno> // EINTR
27#include <csignal> // sigprocmask etc
28
29namespace RooFit {
30namespace MultiProcess {
31
32static bool worker_loop_running = false;
33
38
39/// \brief The worker processes' event loop
40///
41/// Asks the queue process for tasks, polls for incoming messages from other
42/// processes and handles them.
44{
45 assert(JobManager::instance()->process_manager().is_worker());
48
49 // use a flag to not ask twice
50 bool dequeue_acknowledged = true;
51
53 std::size_t mw_sub_index;
54
55 std::tie(poller, mw_sub_index) = JobManager::instance()->messenger().create_worker_poller();
56
57 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
58 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
59
60 sigset_t sigmask;
63 sigprocmask(SIG_BLOCK, &sigmask, &JobManager::instance()->messenger().ppoll_sigmask);
64
65 // Before doing anything, check whether we have received a terminate signal while blocking signals!
66 // In this case, we also do that in the while condition.
68 try { // watch for error from ppoll (which is called inside receive functions) caused by SIGTERM from master
69
70 // try to dequeue a task
71 if (dequeue_acknowledged) { // don't ask twice
72 JobManager::instance()->messenger().send_from_worker_to_queue(W2Q::dequeue);
74 }
75
76 // wait for handshake from queue or update from SUB socket
77 auto poll_result = poller.ppoll(-1, &JobManager::instance()->messenger().ppoll_sigmask);
78 // because the poller may now have a waiting update from master over the SUB socket,
79 // but the queue socket could be first in the poll_result vector, and during handling
80 // of a new task it is possible we need to already receive the updated state over SUB,
81 // we have to then flip this boolean so that in the for loop when we reach the SUB
82 // socket's result, we can skip it (otherwise we will hang there, because no more
83 // updated state will be coming):
84 bool skip_sub = false;
85 // then process incoming messages from sockets
86 for (auto readable_socket : poll_result) {
87 // message comes from the master-worker SUB socket (first element):
88 if (readable_socket.first == mw_sub_index) {
89 if (!skip_sub) {
90 auto job_id = JobManager::instance()->messenger().receive_from_master_on_worker<std::size_t>();
91 JobManager::get_job_object(job_id)->update_state();
92 }
93 } else { // from queue socket
94 message_q2w = JobManager::instance()->messenger().receive_from_queue_on_worker<Q2W>();
95 switch (message_q2w) {
98 break;
99 }
102 auto job_id = JobManager::instance()->messenger().receive_from_queue_on_worker<std::size_t>();
103 auto state_id = JobManager::instance()->messenger().receive_from_queue_on_worker<State>();
104 auto task_id = JobManager::instance()->messenger().receive_from_queue_on_worker<Task>();
105
106 // while loop, because multiple jobs may have updated state coming
107 while (state_id != JobManager::get_job_object(job_id)->get_state_id()) {
108 skip_sub = true;
109 auto job_id_for_state =
110 JobManager::instance()->messenger().receive_from_master_on_worker<std::size_t>();
112 }
113 if (RooFit::MultiProcess::Config::getTimingAnalysis()) ProcessTimer::start_timer("worker:eval_task:" + std::to_string(task_id));
114 JobManager::get_job_object(job_id)->evaluate_task(task_id);
115 if (RooFit::MultiProcess::Config::getTimingAnalysis()) ProcessTimer::end_timer("worker:eval_task:" + std::to_string(task_id));
116 JobManager::get_job_object(job_id)->send_back_task_result_from_worker(task_id);
117
118 break;
119 }
120 }
121 }
122 }
123
124 } catch (ZMQ::ppoll_error_t &e) {
126 try {
127 response = handle_zmq_ppoll_error(e);
128 } catch (std::logic_error &) {
129 printf("worker loop at PID %d got unhandleable ZMQ::ppoll_error_t\n", getpid());
130 throw;
131 }
132 if (response == zmq_ppoll_error_response::abort) {
133 break;
134 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
135 printf("EINTR in worker loop at PID %d but no SIGTERM received, continuing\n", getpid());
136 continue;
137 } else if (response == zmq_ppoll_error_response::retry) {
138 printf("EAGAIN from ppoll in worker loop at PID %d, continuing\n", getpid());
139 continue;
140 }
141 } catch (zmq::error_t &e) {
142 printf("unhandled zmq::error_t (not a ppoll_error_t) in worker loop at PID %d with errno %d: %s\n", getpid(),
143 e.num(), e.what());
144 throw;
145 }
146 }
147
149
150 // clean up signal management modifications
151 sigprocmask(SIG_SETMASK, &JobManager::instance()->messenger().ppoll_sigmask, nullptr);
152
153 worker_loop_running = false;
154}
155
156} // namespace MultiProcess
157} // namespace RooFit
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
static bool getTimingAnalysis()
Definition Config.cxx:87
static JobManager * instance()
static Job * get_job_object(std::size_t job_object_id)
static void start_timer(std::string section_name)
static void end_timer(std::string section_name)
Wrapper class for polling ZeroMQ sockets.
void worker_loop()
The worker processes' event loop.
Definition worker.cxx:43
bool is_worker_loop_running()
Definition worker.cxx:34
std::size_t Task
Definition types.h:22
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
static bool worker_loop_running
Definition worker.cxx:32
std::size_t State
Definition types.h:23
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or other types of arguments).
Definition CodegenImpl.h:64