Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
Queue.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
17#include "RooFit/MultiProcess/Job.h" // complete Job object for JobManager::get_job_object()
19
20namespace RooFit {
21namespace MultiProcess {
22
23/** \class Queue
24 * \brief Keeps a queue of tasks for workers and manages the queue process through its event loop
25 *
26 * The Queue maintains a set of tasks on the queue process by receiving them
27 * from the master process. Worker processes can request to pop them off the
28 * queue. The communication between these processes is handled inside
29 * 'Queue::loop()', the queue process's event loop that polls the Messenger's
30 * sockets for incoming messages and handles them when they come.
31 *
32 * The reason for this class is to get automatic load balancing between
33 * workers. By allowing workers to request tasks whenever they are ready to
34 * do work, we don't need to manually distribute work over workers and they
35 * will always have something to do until all tasks have been completed.
36 * The alternative simple strategy of just distributing all tasks evenly over
37 * workers will be suboptimal when tasks have different or even varying
38 * runtimes (this simple strategy could be implemented with a PUSH-PULL
39 * ZeroMQ socket from master to workers, which would distribute tasks in a
40 * round-robin fashion, which, indeed, does not do load balancing).
41 */
42
43/// Have a worker ask for a task-message from the queue
44///
45/// \param[out] job_task JobTask reference to put the Job ID and the task index into.
46/// \return true if a task was popped from the queue successfully, false if the queue was empty.
47bool Queue::pop(JobTask &job_task)
48{
49 if (queue_.empty()) {
50 return false;
51 } else {
52 job_task = queue_.front();
53 queue_.pop();
54 return true;
55 }
56}
57
58/// Enqueue a task
59///
60/// \param[in] job_task JobTask object that contains the Job ID, the state ID and the task index.
///
/// What happens depends on the calling process:
/// - master: the task is passed on by a single call whose first line is missing
///   from this copy (presumably a send to the queue process, where
///   process_master_message() handles M2Q::enqueue);
/// - queue: the task is pushed onto queue_;
/// - any other process: std::logic_error is thrown.
///
/// \throws std::logic_error if called from neither the master nor the queue process.
61void Queue::add(JobTask job_task)
62{
63 if (JobManager::instance()->process_manager().is_master()) {
// NOTE(review): original line 64 -- the head of the call that forwards job_task -- is
// missing from this copy; the next line is only that call's final argument.
65 job_task.task_id);
66 } else if (JobManager::instance()->process_manager().is_queue()) {
67 queue_.push(job_task);
68 } else {
// NOTE(review): this message names Communicator::to_master_queue, which does not match
// this function (Queue::add); it looks stale and should probably be updated.
69 throw std::logic_error("calling Communicator::to_master_queue from slave process");
70 }
71}
72
73/// Helper function for 'Queue::loop()'
///
/// Handles one message received on the queue process from the master.
/// For M2Q::enqueue it reads the job object ID from the master, combines it with
/// the state and task IDs into a JobTask, enqueues that via add() and increments
/// N_tasks_. Other M2Q values are not handled by the switch and are ignored.
// NOTE(review): the signature line (original line 74; per the generated documentation
// `void Queue::process_master_message(M2Q message)`) is missing from this copy.
75{
76 switch (message) {
77 case M2Q::enqueue: {
78 // enqueue task
79 auto job_object_id = JobManager::instance()->messenger().receive_from_master_on_queue<std::size_t>();
// NOTE(review): original lines 80-81, which must declare the `state_id` and `task_id`
// used below (presumably also received from the master), are missing from this copy.
82 JobTask job_task{job_object_id, state_id, task_id};
83 add(job_task);
84 N_tasks_++;
85 break;
86 }
87 }
88}
89
90/// Helper function for 'Queue::loop()'
///
/// Handles one message received on the queue process from worker `this_worker_id`.
/// For W2Q::dequeue it tries to pop() a task. On success, a reply carrying
/// Q2W::dequeue_accepted and the task's job, state and task IDs is sent to that
/// worker. Otherwise the (missing) else-branch presumably sends a rejection.
/// Other W2Q values are not handled by the switch and are ignored.
///
/// \param[in] this_worker_id Index of the worker that sent the message.
/// \param[in] message        The request read from that worker's socket.
91void Queue::process_worker_message(std::size_t this_worker_id, W2Q message)
92{
93 switch (message) {
94 case W2Q::dequeue: {
95 // dequeue task
96 JobTask job_task;
97 bool popped = pop(job_task);
98 if (popped) {
99 // Note: below two commands should be run atomically for thread safety (if that ever becomes an issue)
// NOTE(review): original line 100 -- the head of the call that replies to the worker
// (presumably the messenger's send_from_queue_to_worker) -- is missing from this copy;
// the next line holds only its arguments.
101 this_worker_id, Q2W::dequeue_accepted, job_task.job_id, job_task.state_id, job_task.task_id);
// NOTE(review): original line 102 (the second of the "two commands" mentioned above;
// presumably bookkeeping such as ++N_tasks_at_workers_) is missing from this copy.
103 } else {
// NOTE(review): original line 104 (presumably the reply telling the worker that its
// dequeue request was rejected because the queue is empty) is missing from this copy.
105 }
106 break;
107 }
108 }
109}
110
111/// \brief The queue process's event loop
112///
113/// Polls for incoming messages from other processes and handles them.
///
/// Must run on the queue process (asserted). SIGTERM is blocked for the duration of
/// the loop, with the previous signal mask saved in the messenger's ppoll_sigmask.
/// Each iteration ppoll()s the master and worker sockets and dispatches every
/// readable socket to process_master_message() or process_worker_message().
/// ppoll errors are triaged by handle_zmq_ppoll_error(). On exit, the saved signal
/// mask is restored.
// NOTE(review): the signature line (original line 114; per the generated documentation
// `void Queue::loop()`) is missing from this copy.
115{
116 assert(JobManager::instance()->process_manager().is_queue());
117 ZeroMQPoller poller;
118 std::size_t mq_index;
119 std::tie(poller, mq_index) = JobManager::instance()->messenger().create_queue_poller();
120
121 // Before blocking SIGTERM, set the signal handler, so we can also check after blocking whether a signal occurred
122 // In our case, we already set it in the ProcessManager after forking to the queue and worker processes.
123
124 sigset_t sigmask;
125 sigemptyset(&sigmask);
126 sigaddset(&sigmask, SIGTERM);
127 sigprocmask(SIG_BLOCK, &sigmask, &JobManager::instance()->messenger().ppoll_sigmask);
// The pre-block mask is now stored in ppoll_sigmask. ppoll() below temporarily installs
// that mask while waiting. Assuming SIGTERM was not blocked before, SIGTERM can then only
// be delivered while the process sits in ppoll(), where it shows up as a ppoll error.
128
129 // Before doing anything, check whether we have received a terminate signal while blocking signals!
130 // In this case, we also do that in the while condition.
// NOTE(review): original line 131 is missing from this copy. It is presumably the `while (...)`
// header that re-checks for a received SIGTERM; it matches the closing brace on line 168 and
// the break/continue statements below.
132 try { // watch for zmq_error from ppoll caused by SIGTERM from master
133 // poll: wait until status change (-1: infinite timeout)
134 auto poll_result = poller.ppoll(-1, &JobManager::instance()->messenger().ppoll_sigmask);
135 // then process incoming messages from sockets
136 for (auto readable_socket : poll_result) {
137 // message comes from the master/queue socket (first element):
138 if (readable_socket.first == mq_index) {
// NOTE(review): original line 139, which must declare `message` (presumably an M2Q read
// from the master via the messenger), is missing from this copy.
140 process_master_message(message);
141 } else { // from a worker socket
// Worker sockets follow the master socket in the poller, hence the offset of one.
142 auto this_worker_id = readable_socket.first - 1;
143 auto message = JobManager::instance()->messenger().receive_from_worker_on_queue<W2Q>(this_worker_id);
144 process_worker_message(this_worker_id, message);
145 }
146 }
147 } catch (ZMQ::ppoll_error_t &e) {
// NOTE(review): original line 148, which must declare `response` (presumably of type
// zmq_ppoll_error_response, the return type of handle_zmq_ppoll_error), is missing here.
149 try {
150 response = handle_zmq_ppoll_error(e);
151 } catch (std::logic_error &) {
152 printf("queue loop got unhandleable ZMQ::ppoll_error_t\n");
153 throw;
154 }
// abort: leave the event loop. unknown_eintr / retry: log and poll again.
155 if (response == zmq_ppoll_error_response::abort) {
156 break;
157 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
158 printf("EINTR in queue loop but no SIGTERM received, continuing\n");
159 continue;
160 } else if (response == zmq_ppoll_error_response::retry) {
161 printf("EAGAIN from ppoll in queue loop, continuing\n");
162 continue;
163 }
164 } catch (zmq::error_t &e) {
// Any other ZeroMQ error is fatal for the queue loop: log errno and message, then rethrow.
165 printf("unhandled zmq::error_t (not a ppoll_error_t) in queue loop with errno %d: %s\n", e.num(), e.what());
166 throw;
167 }
168 }
169
170 // clean up signal management modifications
// Restore the signal mask that was saved by the SIG_BLOCK call at the top of the loop.
171 sigprocmask(SIG_SETMASK, &JobManager::instance()->messenger().ppoll_sigmask, nullptr);
172}
173
174} // namespace MultiProcess
175} // namespace RooFit
#define e(i)
Definition RSha256.hxx:103
static JobManager * instance()
value_t receive_from_worker_on_queue(std::size_t this_worker_id)
Definition Messenger.h:43
void send_from_queue_to_worker(std::size_t this_worker_id)
std::pair< ZeroMQPoller, std::size_t > create_queue_poller()
Helper function that creates a poller for Queue::loop()
void loop()
The queue process's event loop.
Definition Queue.cxx:114
void add(JobTask job_task)
Enqueue a task.
Definition Queue.cxx:61
void process_master_message(M2Q message)
Helper function for 'Queue::loop()'.
Definition Queue.cxx:74
bool pop(JobTask &job_task)
Have a worker ask for a task-message from the queue.
Definition Queue.cxx:47
std::size_t N_tasks_at_workers_
Definition Queue.h:37
void process_worker_message(std::size_t this_worker_id, W2Q message)
Helper function for 'Queue::loop()'.
Definition Queue.cxx:91
std::queue< JobTask > queue_
Definition Queue.h:35
Wrapper class for polling ZeroMQ sockets.
std::vector< std::pair< size_t, zmq::event_flags > > ppoll(int timeo, const sigset_t *sigmask)
Poll the sockets with ppoll.
std::size_t State
Definition types.h:23
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
std::size_t Task
Definition types.h:22
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition Common.h:18
combined job_object, state and task identifier type
Definition types.h:25