24namespace MultiProcess {
53 instance_->messenger().set_send_flag(zmq::send_flags::dontwait);
104 if (
instance_->process_manager().is_initialized()) {
105 std::stringstream ss;
106 ss <<
"Cannot add Job to JobManager instantiation, forking has already taken place! Instance object at raw "
109 throw std::logic_error(
"Cannot add Job to JobManager instantiation, forking has already taken place! Call "
110 "terminate() on the instance before adding new Jobs.");
128 bool removed_succesfully =
job_objects_.erase(job_object_id) == 1;
132 return removed_succesfully;
156 bool job_fully_retrieved =
false;
157 while (not job_fully_retrieved) {
160 auto job_object_id = *
reinterpret_cast<std::size_t *
>(
161 task_result_message.data());
162 bool this_job_fully_retrieved =
164 if (requesting_job_id == job_object_id) {
165 job_fully_retrieved = this_job_fully_retrieved;
171 }
catch (std::logic_error &) {
172 printf(
"JobManager::retrieve got unhandleable ZMQ::ppoll_error_t\n");
176 throw std::logic_error(
"in JobManager::retrieve: master received a SIGTERM, aborting");
178 printf(
"EINTR in JobManager::retrieve, continuing\n");
181 printf(
"EAGAIN from ppoll in JobManager::retrieve, continuing\n");
184 }
catch (zmq::error_t &
e) {
185 printf(
"unhandled zmq::error_t (not a ppoll_error_t) in JobManager::retrieve with errno %d: %s\n",
e.num(),
static unsigned int getDefaultNWorkers()
Main point of access for all MultiProcess infrastructure.
std::unique_ptr< Messenger > messenger_ptr_
std::unique_ptr< Queue > queue_ptr_
static std::size_t add_job_object(Job *job_object)
Messenger & messenger() const
static JobManager * instance()
static std::size_t job_counter_
static Job * get_job_object(std::size_t job_object_id)
ProcessManager & process_manager() const
JobManager(std::size_t N_workers)
Don't construct JobManager objects manually; use the static instance if you need to run multiple jobs...
std::unique_ptr< ProcessManager > process_manager_ptr_
static std::map< std::size_t, Job * > job_objects_
static bool remove_job_object(std::size_t job_object_id)
static std::unique_ptr< JobManager > instance_
void retrieve(std::size_t requesting_job_id)
Retrieve results for a Job.
bool is_activated() const
void activate()
Start queue and worker loops on child processes.
static bool is_instantiated()
Interface class for defining the actual work that must be done.
virtual bool receive_task_result_on_master(const zmq::message_t &message)=0
Manages ZeroMQ sockets and wraps send and receive calls.
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
value_t receive_from_worker_on_master()
Fork processes for queue and workers.
Keeps a queue of tasks for workers and manages the queue process through its event loop.
void loop()
The queue process's event loop.
void worker_loop()
The worker processes' event loop.
bool is_worker_loop_running()
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...