ROOT Reference Guide
JobManager.cxx
/*
 * Project: RooFit
 * Authors:
 *   PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
 *   IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
 *
 * Copyright (c) 2021, CERN
 *
 * Redistribution and use in source and binary forms,
 * with or without modification, are permitted according to the terms
 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
 */

#include "RooFit/MultiProcess/Queue.h" // complete type for JobManager::queue()

namespace RooFit {
namespace MultiProcess {

/** \class JobManager
 *
 * \brief Main point of access for all MultiProcess infrastructure
 *
 * This class mainly serves as the access point to the multi-process infrastructure
 * for 'Job's. It is meant to be used as a singleton that holds and connects the other
 * infrastructural classes: the messenger, process manager, worker and queue loops.
 *
 * It is important that the user of this class, particularly the one that calls
 * 'instance()' first, calls 'activate()' soon after, because everything that is
 * done between 'instance()' and 'activate()' will be executed on all processes.
 * This may be useful in some cases, but in general, one will probably want to always
 * use the 'JobManager' in its full capacity, including the queue and worker loops.
 * This is the way the Job class uses this class, see 'Job::get_manager()'.
 *
 * The default number of worker processes is set using 'std::thread::hardware_concurrency()'.
 * To change it, use 'Config::setDefaultNWorkers()' to set it to a different value
 * before creation of a new JobManager instance.
 */

// static function
JobManager *JobManager::instance()
{
   if (!instance_) {
      instance_.reset(new JobManager(Config::getDefaultNWorkers())); // can't use make_unique, because ctor is private
      instance_->messenger().test_connections(instance_->process_manager());
      // set send to non blocking on all processes after checking the connections are working:
      instance_->messenger().set_send_flag(zmq::send_flags::dontwait);
   }
   return instance_.get();
}

// static function
bool JobManager::is_instantiated()
{
   return static_cast<bool>(instance_);
}

// (private) constructor
/// Don't construct JobManager objects manually; use the static instance() instead,
/// also when you need to run multiple jobs.
JobManager::JobManager(std::size_t N_workers)
{
   queue_ptr_ = std::make_unique<Queue>();
   process_manager_ptr_ = std::make_unique<ProcessManager>(N_workers);
   messenger_ptr_ = std::make_unique<Messenger>(*process_manager_ptr_);
}

JobManager::~JobManager()
{
   // The instance typically gets created by some Job. Once all Jobs are gone, the
   // JobManager will get destroyed. In this case, the job_objects_ map should have
   // been emptied.
   // The second case is when the program ends, at which time the static instance
   // is destroyed. Jobs may still be present; for instance, the Job subclass
   // RooFit::TestStatistics::LikelihoodGradientJob will have been put into
   // RooMinimizer::_theFitter->fObjFunction as the gradient member. Because
   // _theFitter is also a global static member, we cannot guarantee destruction
   // order, so the JobManager may be destroyed before all Jobs are. We therefore
   // cannot guarantee that the first condition is met. However, the Job objects
   // stuck in _theFitter are not meant to be run again, because the program is
   // ending anyway, so in this case, too, we can safely shut down.
   // There used to be an assert statement that checked whether the job_objects_
   // map was empty at destruction time, but that neglected the second possibility
   // and led to assertion failures, which left the Messenger and ProcessManager
   // objects intact, causing the forked processes and their ZeroMQ resources to
   // remain after the main/master/parent process exited.
   messenger_ptr_.reset(nullptr);
   process_manager_ptr_.reset(nullptr);
   queue_ptr_.reset(nullptr);
}

// static function
/// \return job_id for added job_object
std::size_t JobManager::add_job_object(Job *job_object)
{
   if (instance_) {
      if (instance_->process_manager().is_initialized()) {
         std::stringstream ss;
         ss << "Cannot add Job to JobManager instantiation, forking has already taken place! Instance object at raw "
               "ptr "
            << instance_.get() << ". Call terminate() on the instance before adding new Jobs.";
         throw std::logic_error(ss.str());
      }
   }
   std::size_t job_id = job_counter_++;
   job_objects_[job_id] = job_object;
   return job_id;
}

// static function
Job *JobManager::get_job_object(std::size_t job_object_id)
{
   return job_objects_[job_object_id];
}

// static function
/// \return Returns 'true' when removed successfully, 'false' otherwise.
bool JobManager::remove_job_object(std::size_t job_object_id)
{
   bool removed_successfully = job_objects_.erase(job_object_id) == 1;
   if (job_objects_.empty()) {
      instance_.reset(nullptr);
   }
   return removed_successfully;
}

ProcessManager &JobManager::process_manager() const
{
   return *process_manager_ptr_;
}

Messenger &JobManager::messenger() const
{
   return *messenger_ptr_;
}

Queue &JobManager::queue() const
{
   return *queue_ptr_;
}

/// Retrieve results for a Job
///
/// \param requesting_job_id ID number of the Job in the JobManager's Job list
void JobManager::retrieve(std::size_t requesting_job_id)
{
   if (process_manager().is_master()) {
      bool job_fully_retrieved = false;
      while (not job_fully_retrieved) {
         try {
            auto task_result_message = messenger().receive_from_worker_on_master<zmq::message_t>();
            auto job_object_id = *reinterpret_cast<std::size_t *>(
               task_result_message.data()); // job_id must always be the first element of the result message!
            bool this_job_fully_retrieved =
               JobManager::get_job_object(job_object_id)->receive_task_result_on_master(task_result_message);
            if (requesting_job_id == job_object_id) {
               job_fully_retrieved = this_job_fully_retrieved;
            }
         } catch (ZMQ::ppoll_error_t &e) {
            zmq_ppoll_error_response response;
            try {
               response = handle_zmq_ppoll_error(e);
            } catch (std::logic_error &) {
               printf("JobManager::retrieve got unhandleable ZMQ::ppoll_error_t\n");
               throw;
            }
            if (response == zmq_ppoll_error_response::abort) {
               throw std::logic_error("in JobManager::retrieve: master received a SIGTERM, aborting");
            } else if (response == zmq_ppoll_error_response::unknown_eintr) {
               printf("EINTR in JobManager::retrieve, continuing\n");
               continue;
            } else if (response == zmq_ppoll_error_response::retry) {
               printf("EAGAIN from ppoll in JobManager::retrieve, continuing\n");
               continue;
            }
         } catch (zmq::error_t &e) {
            printf("unhandled zmq::error_t (not a ppoll_error_t) in JobManager::retrieve with errno %d: %s\n", e.num(),
                   e.what());
            throw;
         }
      }
   }
}

/// \brief Start queue and worker loops on child processes
///
/// This function exists purely because activation from the constructor is
/// impossible; the constructor must return a constructed instance, which it
/// can't do if it's stuck in an infinite loop. This means that whoever first
/// creates the JobManager instance (typically a Job, but possibly any other
/// user of this class) must also activate it.
/// It should be called soon after the instance is created, because everything
/// between construction and activation gets executed both on the master
/// process and on the child (queue and worker) processes.
void JobManager::activate()
{
   activated_ = true;

   if (process_manager().is_queue()) {
      queue().loop();
      std::_Exit(0);
   }

   if (!is_worker_loop_running() && process_manager().is_worker()) {
      worker_loop();
      std::_Exit(0);
   }
}

bool JobManager::is_activated() const
{
   return activated_;
}

// initialize static members
std::map<std::size_t, Job *> JobManager::job_objects_;
std::size_t JobManager::job_counter_ = 0;
std::unique_ptr<JobManager> JobManager::instance_{nullptr};

} // namespace MultiProcess
} // namespace RooFit
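The class comment and `instance()` above describe a lazily created singleton whose constructor is private, which is why `instance_` is filled with `reset(new ...)` rather than `std::make_unique`. The following is a minimal sketch of only that mechanism, assuming a hypothetical `Manager` class with a fixed worker count of 4 and a hypothetical `destroy()` helper; the connection test and the ZeroMQ send flag are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>

// Hypothetical stand-in for JobManager: only the lazy-singleton mechanics.
class Manager {
public:
   static Manager *instance()
   {
      if (!instance_) {
         // std::make_unique<Manager>(4) would not compile here: it calls the
         // private constructor from inside the standard library.
         instance_.reset(new Manager(4));
      }
      return instance_.get();
   }

   static bool is_instantiated() { return static_cast<bool>(instance_); }

   // Hypothetical helper for this sketch; JobManager instead destroys its
   // instance when the last Job is removed.
   static void destroy() { instance_.reset(nullptr); }

   std::size_t n_workers() const { return n_workers_; }

private:
   explicit Manager(std::size_t n_workers) : n_workers_(n_workers) {}

   std::size_t n_workers_;
   static std::unique_ptr<Manager> instance_;
};

// Out-of-class definition of the static member, as at the end of JobManager.cxx.
std::unique_ptr<Manager> Manager::instance_{nullptr};
```

Constructing with `new` inside a static member function works because access control is checked at the point of the `new` expression, which is inside the class.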
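`~JobManager()` tears down its members with explicit `reset()` calls instead of relying on the implicit reverse-declaration-order destruction of members. Below is a minimal sketch of that pattern with hypothetical stub types (`ProcessManagerStub`, `MessengerStub`, `QueueStub`) that log their destruction. That the messenger keeps a reference to the process manager is an assumption, modelled on `Messenger` being constructed from `*process_manager_ptr_`.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Records the order in which the stubs are destroyed.
std::vector<std::string> teardown_log;

struct ProcessManagerStub {
   ~ProcessManagerStub() { teardown_log.push_back("process_manager"); }
};

struct MessengerStub {
   // Assumed dependency: the messenger must not outlive the process manager.
   explicit MessengerStub(const ProcessManagerStub &pm) : pm_(pm) {}
   ~MessengerStub() { teardown_log.push_back("messenger"); }
   const ProcessManagerStub &pm_;
};

struct QueueStub {
   ~QueueStub() { teardown_log.push_back("queue"); }
};

struct Owner {
   Owner()
   {
      queue_ptr_ = std::make_unique<QueueStub>();
      process_manager_ptr_ = std::make_unique<ProcessManagerStub>();
      messenger_ptr_ = std::make_unique<MessengerStub>(*process_manager_ptr_);
   }
   ~Owner()
   {
      // Explicit order, mirroring ~JobManager(): the dependent messenger goes first,
      // regardless of the order in which the members are declared.
      messenger_ptr_.reset(nullptr);
      process_manager_ptr_.reset(nullptr);
      queue_ptr_.reset(nullptr);
   }

   std::unique_ptr<ProcessManagerStub> process_manager_ptr_;
   std::unique_ptr<MessengerStub> messenger_ptr_;
   std::unique_ptr<QueueStub> queue_ptr_;
};
```

The explicit resets make the shutdown order visible in one place, so reordering the member declarations in the header cannot silently change it.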
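`add_job_object()` and `remove_job_object()` tie the singleton's lifetime to its registered jobs: ids come from a static counter that is never reset, adding is refused once forking has happened, and removing the last job destroys the instance. The sketch below reproduces that bookkeeping in a hypothetical `Registry` class with a placeholder `Job` struct; `mark_forked()` is an assumed stand-in for `ProcessManager::is_initialized()` becoming true.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <stdexcept>

struct Job {}; // hypothetical placeholder for RooFit::MultiProcess::Job

class Registry {
public:
   static Registry *instance()
   {
      if (!instance_) {
         instance_.reset(new Registry());
      }
      return instance_.get();
   }

   static bool is_instantiated() { return static_cast<bool>(instance_); }

   // Mirrors add_job_object(): refuse new jobs once the instance has "forked".
   static std::size_t add_job(Job *job)
   {
      if (instance_ && instance_->forked_) {
         throw std::logic_error("Cannot add Job, forking has already taken place");
      }
      std::size_t id = counter_++; // ids are never reused, even across instances
      jobs_[id] = job;
      return id;
   }

   // Mirrors remove_job_object(): the singleton is destroyed with its last job.
   static bool remove_job(std::size_t id)
   {
      bool removed = jobs_.erase(id) == 1;
      if (jobs_.empty()) {
         instance_.reset(nullptr);
      }
      return removed;
   }

   // Hypothetical stand-in for the process manager having forked.
   void mark_forked() { forked_ = true; }

private:
   Registry() = default;

   bool forked_ = false;
   static std::map<std::size_t, Job *> jobs_;
   static std::size_t counter_;
   static std::unique_ptr<Registry> instance_;
};

std::map<std::size_t, Job *> Registry::jobs_;
std::size_t Registry::counter_ = 0;
std::unique_ptr<Registry> Registry::instance_{nullptr};
```

Keeping the map and counter static (outside the instance) lets jobs be registered before the singleton exists and keeps ids unique after it has been destroyed and recreated.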
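`retrieve()` relies on every result message starting with the job id, and it dispatches results for any job while waiting for the requesting one to be complete. Below is a simplified, single-process sketch of that routing loop, assuming a hypothetical `Message` type (a byte vector instead of `zmq::message_t`) and a hypothetical `CountingJob` that is done after a fixed number of results. It decodes the id with `std::memcpy`, which, unlike the `reinterpret_cast` dereference in the listing, makes no alignment assumptions about the buffer. The real loop blocks on the socket; this version also stops when its inbox is empty.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in for zmq::message_t: a raw byte buffer.
using Message = std::vector<unsigned char>;

// Build a result message whose first bytes hold the job id, the layout retrieve() relies on.
Message make_result(std::size_t job_id, int value)
{
   Message msg(sizeof(job_id) + sizeof(value));
   std::memcpy(msg.data(), &job_id, sizeof(job_id));
   std::memcpy(msg.data() + sizeof(job_id), &value, sizeof(value));
   return msg;
}

// Hypothetical job that is fully retrieved after `expected` results,
// playing the role of Job::receive_task_result_on_master().
struct CountingJob {
   std::size_t expected;
   std::vector<int> results;

   bool receive(const Message &msg)
   {
      int value;
      std::memcpy(&value, msg.data() + sizeof(std::size_t), sizeof(value));
      results.push_back(value);
      return results.size() == expected;
   }
};

// Dispatch messages to their jobs until the requesting job reports completion.
void retrieve(std::size_t requesting_job_id, std::deque<Message> &inbox, std::map<std::size_t, CountingJob> &jobs)
{
   bool job_fully_retrieved = false;
   while (!job_fully_retrieved && !inbox.empty()) {
      Message msg = std::move(inbox.front());
      inbox.pop_front();
      std::size_t job_id;
      std::memcpy(&job_id, msg.data(), sizeof(job_id)); // no alignment assumption
      bool this_job_fully_retrieved = jobs.at(job_id).receive(msg);
      if (job_id == requesting_job_id) {
         job_fully_retrieved = this_job_fully_retrieved;
      }
   }
}
```

The sketch looks jobs up with `jobs.at()`, so an unknown id throws `std::out_of_range` instead of default-inserting an empty entry the way `operator[]` does.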
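The `catch (ZMQ::ppoll_error_t &)` branch in `retrieve()` maps a polling error to one of three responses and then either aborts or retries the receive. Below is a generic sketch of that retry policy, assuming a hypothetical `PollError` exception that already carries the response (in the listing, `handle_zmq_ppoll_error()` computes it) and an enum whose enumerator names are copied from `zmq_ppoll_error_response`.

```cpp
#include <cassert>
#include <stdexcept>

// Enumerator names copied from zmq_ppoll_error_response in the listing above.
enum class PollErrorResponse { abort, unknown_eintr, retry };

// Hypothetical exception type carrying an already-classified poll error.
struct PollError {
   PollErrorResponse response;
};

// Call `receive` until it succeeds: retry on EINTR/EAGAIN-style errors and
// turn an abort (SIGTERM) into std::logic_error, as retrieve() does.
template <typename Receive>
auto receive_with_retry(Receive receive) -> decltype(receive())
{
   while (true) {
      try {
         return receive();
      } catch (const PollError &e) {
         if (e.response == PollErrorResponse::abort) {
            throw std::logic_error("received a SIGTERM, aborting");
         }
         // unknown_eintr and retry: loop and call receive() again
      }
   }
}
```

Separating the classification of the error from the decision to retry keeps the receive call itself free of signal-handling details.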
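In `activate()`, the queue and worker processes enter their event loops and then call `std::_Exit(0)`, so a child never returns into code after `activate()` that is meant for the master. Below is a POSIX-only sketch of that pattern using `fork()` and `waitpid()`. The hypothetical `child_loop()` stands in for `queue().loop()` / `worker_loop()` and returns 7 only so that the parent can observe the child's exit status.

```cpp
#include <cassert>
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical stand-in for queue().loop() / worker_loop().
int child_loop()
{
   return 7;
}

// Fork one child, run the loop there, and end the child with std::_Exit so it
// never returns into the parent's code path. Returns the child's exit status
// as seen by the parent, or -1 on error.
int run_child_and_wait()
{
   pid_t pid = fork();
   if (pid < 0) {
      return -1; // fork failed
   }
   if (pid == 0) {
      int status = child_loop();
      std::_Exit(status); // no atexit handlers, no static destructors in the child
   }
   int wstatus = 0;
   if (waitpid(pid, &wstatus, 0) != pid) {
      return -1;
   }
   return WIFEXITED(wstatus) ? WEXITSTATUS(wstatus) : -1;
}
```

`std::_Exit` ends the process without running `atexit` handlers or destructors of static objects. In the child, this presumably keeps its copies of statics such as `JobManager::instance_` from running their shutdown logic, although the listing does not state the reason.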