Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
JobManager.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
18#include "RooFit/MultiProcess/Queue.h" // complete type for JobManager::queue()
19#include "FIFOQueue.h" // complete type for JobManager::queue()
20#include "PriorityQueue.h" // complete type for JobManager::queue()
24
25namespace RooFit {
26namespace MultiProcess {
27
28/** \class JobManager
29 *
30 * \brief Main point of access for all MultiProcess infrastructure
31 *
32 * This class mainly serves as the access point to the multi-process infrastructure
33 * for 'Job's. It is meant to be used as a singleton that holds and connects the other
34 * infrastructural classes: the messenger, process manager, worker and queue loops.
35 *
36 * It is important that the user of this class, particularly the one that calls
37 * 'instance()' first, calls 'activate()' soon after, because everything that is
38 * done in between 'instance()' and 'activate()' will be executed on all processes.
39 * This may be useful in some cases, but in general, one will probably want to always
40 * use the 'JobManager' in its full capacity, including the queue and worker loops.
41 * This is the way the Job class uses this class, see 'Job::get_manager()'.
42 *
43 * The default number of processes is set using 'std::thread::hardware_concurrency()'.
44 * To change it, use 'Config::setDefaultNWorkers()' to set it to a different value
45 * before creation of a new JobManager instance.
46 */
47
48// static function
50{
52 instance_.reset(new JobManager(Config::getDefaultNWorkers())); // can't use make_unique, because ctor is private
53 instance_->messenger().test_connections(instance_->process_manager());
54 // set send to non blocking on all processes after checking the connections are working:
55 instance_->messenger().set_send_flag(zmq::send_flags::dontwait);
56 }
57 return instance_.get();
58}
59
60// static function
62{
63 return static_cast<bool>(instance_);
64}
65
66// (private) constructor
67/// Don't construct JobManager objects manually, use the static instance if
68/// you need to run multiple jobs.
69JobManager::JobManager(std::size_t N_workers)
70{
73 queue_ptr_ = std::make_unique<FIFOQueue>();
74 break;
75 }
77 queue_ptr_ = std::make_unique<PriorityQueue>();
78 break;
79 }
80 }
81 process_manager_ptr_ = std::make_unique<ProcessManager>(N_workers);
82 messenger_ptr_ = std::make_unique<Messenger>(*process_manager_ptr_);
83}
84
86{
87 // The instance typically gets created by some Job. Once all Jobs are gone, the
88 // JM will get destroyed. In this case, the job_objects map should have
89 // been emptied.
90 // The second case is when the program ends, at which time the static instance
91 // is destroyed. Jobs may still be present, for instance, the Job subclass
92 // RooFit::TestStatistics::LikelihoodGradientJob, will have
93 // been put into RooMinimizer::_theFitter->fObjFunction, as the gradient
94 // member. Because _theFitter is also a global static member, we cannot
95 // guarantee destruction order, and so the JobManager may be destroyed before
96 // all Jobs are destroyed. We cannot therefore make sure that the first
97 // condition is met. However, the Job objects stuck in _theFitter are not
98 // meant to be run again, because the program is ending anyway. So also in this
99 // case, we can safely shut down.
100 // There used to be an assert statement that checked whether the job_objects
101 // map was empty at destruction time, but that neglected the second possibility
102 // and led to assertion failures, which left the Messenger and ProcessManager
103 // objects intact, causing the forked processes and their ZeroMQ resources
104 // to remain after the main/master/parent process had exited.
105 messenger_ptr_.reset(nullptr);
106 process_manager_ptr_.reset(nullptr);
107 queue_ptr_.reset(nullptr);
108}
109
110// static function
111/// \return job_id for added job_object
112std::size_t JobManager::add_job_object(Job *job_object)
113{
115 if (instance_->process_manager().is_initialized()) {
116 std::stringstream ss;
117 ss << "Cannot add Job to JobManager instantiation, forking has already taken place! Instance object at raw "
118 "ptr "
119 << instance_.get();
120 throw std::logic_error("Cannot add Job to JobManager instantiation, forking has already taken place! Call "
121 "terminate() on the instance before adding new Jobs.");
122 }
123 }
124 std::size_t job_id = job_counter_++;
125 job_objects_[job_id] = job_object;
126 return job_id;
127}
128
129// static function
130Job *JobManager::get_job_object(std::size_t job_object_id)
131{
132 return job_objects_[job_object_id];
133}
134
135// static function
136/// \return Returns 'true' when removed successfully, 'false' otherwise.
137bool JobManager::remove_job_object(std::size_t job_object_id)
138{
139 bool removed_successfully = job_objects_.erase(job_object_id) == 1;
140 if (job_objects_.empty()) {
141 instance_.reset(nullptr);
142 }
143 return removed_successfully;
144}
145
147{
148 return *process_manager_ptr_;
149}
150
152{
153 return *messenger_ptr_;
154}
155
157{
158 return queue_ptr_.get();
159}
160
161/// Retrieve results for a Job
162///
163/// \param requesting_job_id ID number of the Job in the JobManager's Job list
164void JobManager::retrieve(std::size_t requesting_job_id)
165{
166 if (process_manager().is_master()) {
167 bool job_fully_retrieved = false;
168 while (not job_fully_retrieved) {
169 try {
170 auto task_result_message = messenger().receive_from_worker_on_master<zmq::message_t>();
171 auto job_object_id = *reinterpret_cast<std::size_t *>(
172 task_result_message.data()); // job_id must always be the first element of the result message!
173 bool this_job_fully_retrieved =
174 JobManager::get_job_object(job_object_id)->receive_task_result_on_master(task_result_message);
175 if (requesting_job_id == job_object_id) {
176 job_fully_retrieved = this_job_fully_retrieved;
177 }
178 } catch (ZMQ::ppoll_error_t &e) {
180 try {
181 response = handle_zmq_ppoll_error(e);
182 } catch (std::logic_error &) {
183 printf("JobManager::retrieve got unhandleable ZMQ::ppoll_error_t\n");
184 throw;
185 }
186 if (response == zmq_ppoll_error_response::abort) {
187 throw std::logic_error("in JobManager::retrieve: master received a SIGTERM, aborting");
188 } else if (response == zmq_ppoll_error_response::unknown_eintr) {
189 printf("EINTR in JobManager::retrieve, continuing\n");
190 continue;
191 } else if (response == zmq_ppoll_error_response::retry) {
192 printf("EAGAIN from ppoll in JobManager::retrieve, continuing\n");
193 continue;
194 }
195 } catch (zmq::error_t &e) {
196 printf("unhandled zmq::error_t (not a ppoll_error_t) in JobManager::retrieve with errno %d: %s\n", e.num(),
197 e.what());
198 throw;
199 }
200 }
201 }
202}
203
204/// \brief Start queue and worker loops on child processes
205///
206/// This function exists purely because activation from the constructor is
207/// impossible; the constructor must return a constructed instance, which it
208/// can't do if it's stuck in an infinite loop. This means the Job (or any
209/// other user of this class) that first creates the JobManager instance must
210/// also activate it.
211/// This should be called soon after creation of instance, because everything
212/// between construction and activation gets executed both on the master
213/// process and on the slaves.
215{
216 activated_ = true;
217
218 if (process_manager().is_queue()) {
219 queue()->loop();
220 std::_Exit(0);
221 }
222
223 if (!is_worker_loop_running() && process_manager().is_worker()) {
225 std::_Exit(0);
226 }
227}
228
230{
231 return activated_;
232}
233
234// initialize static members
235std::map<std::size_t, Job *> JobManager::job_objects_;
236std::size_t JobManager::job_counter_ = 0;
237std::unique_ptr<JobManager> JobManager::instance_{nullptr};
238
239} // namespace MultiProcess
240} // namespace RooFit
#define e(i)
Definition RSha256.hxx:103
static unsigned int getDefaultNWorkers()
Definition Config.cxx:92
Main point of access for all MultiProcess infrastructure.
Definition JobManager.h:30
std::unique_ptr< Messenger > messenger_ptr_
Definition JobManager.h:54
std::unique_ptr< Queue > queue_ptr_
Definition JobManager.h:55
static std::size_t add_job_object(Job *job_object)
static JobManager * instance()
static std::size_t job_counter_
Definition JobManager.h:59
static Job * get_job_object(std::size_t job_object_id)
ProcessManager & process_manager() const
JobManager(std::size_t N_workers)
Don't construct JobManager objects manually, use the static instance if you need to run multiple jobs...
std::unique_ptr< ProcessManager > process_manager_ptr_
Definition JobManager.h:53
static std::map< std::size_t, Job * > job_objects_
Definition JobManager.h:58
static bool remove_job_object(std::size_t job_object_id)
static std::unique_ptr< JobManager > instance_
Definition JobManager.h:60
void retrieve(std::size_t requesting_job_id)
Retrieve results for a Job.
void activate()
Start queue and worker loops on child processes.
interface class for defining the actual work that must be done
Definition Job.h:25
virtual bool receive_task_result_on_master(const zmq::message_t &message)=0
Manages ZeroMQ sockets and wraps send and receive calls.
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
Fork processes for queue and workers.
Keeps a queue of tasks for workers and manages the queue process through its event loop.
Definition Queue.h:22
void loop()
The queue process's event loop.
Definition Queue.cxx:83
void worker_loop()
The worker processes' event loop.
Definition worker.cxx:43
bool is_worker_loop_running()
Definition worker.cxx:34
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
Definition util.cxx:64
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition Common.h:18
static QueueType getQueueType()
Definition Config.cxx:107