26namespace MultiProcess {
55 instance_->messenger().set_send_flag(zmq::send_flags::dontwait);
77 queue_ptr_ = std::make_unique<PriorityQueue>();
115 if (
instance_->process_manager().is_initialized()) {
116 std::stringstream ss;
117 ss <<
"Cannot add Job to JobManager instantiation, forking has already taken place! Instance object at raw "
120 throw std::logic_error(
"Cannot add Job to JobManager instantiation, forking has already taken place! Call "
121 "terminate() on the instance before adding new Jobs.");
139 bool removed_successfully =
job_objects_.erase(job_object_id) == 1;
143 return removed_successfully;
167 bool job_fully_retrieved =
false;
168 while (not job_fully_retrieved) {
171 auto job_object_id = *
reinterpret_cast<std::size_t *
>(
172 task_result_message.data());
173 bool this_job_fully_retrieved =
175 if (requesting_job_id == job_object_id) {
176 job_fully_retrieved = this_job_fully_retrieved;
182 }
catch (std::logic_error &) {
183 printf(
"JobManager::retrieve got unhandleable ZMQ::ppoll_error_t\n");
187 throw std::logic_error(
"in JobManager::retrieve: master received a SIGTERM, aborting");
189 printf(
"EINTR in JobManager::retrieve, continuing\n");
192 printf(
"EAGAIN from ppoll in JobManager::retrieve, continuing\n");
195 }
catch (zmq::error_t &
e) {
196 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in JobManager::retrieve with errno %d: %s\n",
e.num(),
static unsigned int getDefaultNWorkers()
Main point of access for all MultiProcess infrastructure.
std::unique_ptr< Messenger > messenger_ptr_
std::unique_ptr< Queue > queue_ptr_
static std::size_t add_job_object(Job *job_object)
Messenger & messenger() const
static JobManager * instance()
static std::size_t job_counter_
static Job * get_job_object(std::size_t job_object_id)
ProcessManager & process_manager() const
JobManager(std::size_t N_workers)
Don't construct JobManager objects manually, use the static instance if you need to run multiple jobs...
std::unique_ptr< ProcessManager > process_manager_ptr_
static std::map< std::size_t, Job * > job_objects_
static bool remove_job_object(std::size_t job_object_id)
static std::unique_ptr< JobManager > instance_
void retrieve(std::size_t requesting_job_id)
Retrieve results for a Job.
bool is_activated() const
void activate()
Start queue and worker loops on child processes.
static bool is_instantiated()
interface class for defining the actual work that must be done
virtual bool receive_task_result_on_master(const zmq::message_t &message)=0
Manages ZeroMQ sockets and wraps send and receive calls.
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
value_t receive_from_worker_on_master(bool *more=nullptr)
Fork processes for queue and workers.
Keeps a queue of tasks for workers and manages the queue process through its event loop.
void loop()
The queue process's event loop.
void worker_loop()
The worker processes' event loop.
bool is_worker_loop_running()
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
static QueueType getQueueType()