26namespace MultiProcess {
 
   55      instance_->messenger().set_send_flag(zmq::send_flags::dontwait);
 
   77      queue_ptr_ = std::make_unique<PriorityQueue>();
 
  115      if (
instance_->process_manager().is_initialized()) {
 
  116         std::stringstream ss;
 
  117         ss << 
"Cannot add Job to JobManager instantiation, forking has already taken place! Instance object at raw " 
  120         throw std::logic_error(
"Cannot add Job to JobManager instantiation, forking has already taken place! Call " 
  121                                "terminate() on the instance before adding new Jobs.");
 
  139   bool removed_successfully = 
job_objects_.erase(job_object_id) == 1;
 
  143   return removed_successfully;
 
  167      bool job_fully_retrieved = 
false;
 
  168      while (not job_fully_retrieved) {
 
  171            auto job_object_id = *
reinterpret_cast<std::size_t *
>(
 
  172               task_result_message.data()); 
 
  173            bool this_job_fully_retrieved =
 
  175            if (requesting_job_id == job_object_id) {
 
  176               job_fully_retrieved = this_job_fully_retrieved;
 
  182            } 
catch (std::logic_error &) {
 
  183               printf(
"JobManager::retrieve got unhandleable ZMQ::ppoll_error_t\n");
 
  187               throw std::logic_error(
"in JobManager::retrieve: master received a SIGTERM, aborting");
 
  189               printf(
"EINTR in JobManager::retrieve, continuing\n");
 
  192               printf(
"EAGAIN from ppoll in JobManager::retrieve, continuing\n");
 
  195         } 
catch (zmq::error_t &
e) {
 
  196            printf(
"unhandled zmq::error_t (not a ppoll_error_t) in JobManager::retrieve with errno %d: %s\n", 
e.num(),
 
static unsigned int getDefaultNWorkers()
 
Main point of access for all MultiProcess infrastructure.
 
std::unique_ptr< Messenger > messenger_ptr_
 
std::unique_ptr< Queue > queue_ptr_
 
static std::size_t add_job_object(Job *job_object)
 
Messenger & messenger() const
 
static JobManager * instance()
 
static std::size_t job_counter_
 
static Job * get_job_object(std::size_t job_object_id)
 
ProcessManager & process_manager() const
 
JobManager(std::size_t N_workers)
Don't construct JobManager objects manually; use the static instance if you need to run multiple jobs...
 
std::unique_ptr< ProcessManager > process_manager_ptr_
 
static std::map< std::size_t, Job * > job_objects_
 
static bool remove_job_object(std::size_t job_object_id)
 
static std::unique_ptr< JobManager > instance_
 
void retrieve(std::size_t requesting_job_id)
Retrieve results for a Job.
 
bool is_activated() const
 
void activate()
Start queue and worker loops on child processes.
 
static bool is_instantiated()
 
Interface class for defining the actual work that must be done.
 
virtual bool receive_task_result_on_master(const zmq::message_t &message)=0
 
Manages ZeroMQ sockets and wraps send and receive calls.
 
void test_connections(const ProcessManager &process_manager)
Test whether push-pull sockets are working.
 
value_t receive_from_worker_on_master()
 
Fork processes for queue and workers.
 
Keeps a queue of tasks for workers and manages the queue process through its event loop.
 
void loop()
The queue process's event loop.
 
void worker_loop()
The worker processes' event loop.
 
bool is_worker_loop_running()
 
zmq_ppoll_error_response handle_zmq_ppoll_error(ZMQ::ppoll_error_t &e)
 
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or other types of arguments).
 
static QueueType getQueueType()