21#include <unordered_set>
24namespace MultiProcess {
89 pid_t child_pid = fork();
91 while (child_pid == -1) {
94 printf(
"fork returned with error number %d, retrying after 1 second...\n", errno);
98 printf(
"fork returned with error number %d\n", errno);
99 throw std::runtime_error(
"fork returned with error 3 times, aborting!");
114 for (std::size_t ix = 0; ix <
N_workers_; ++ix) {
138 memset(&sa,
'\0',
sizeof(sa));
141 if (sigaction(SIGTERM, &sa, NULL) < 0) {
142 std::perror(
"sigaction failed");
148#if defined(__APPLE__)
150 static bool affinity_warned =
false;
152 std::cout <<
"CPU affinity cannot be set on macOS" << std::endl;
153 affinity_warned =
true;
159 std::cerr <<
"WARNING: CPU affinity setting not implemented on Windows, continuing..." << std::endl;
174 CPU_SET(set_cpu, &mask);
177 if (sched_setaffinity(0,
sizeof(mask), &mask) == -1) {
178 std::cerr <<
"WARNING: Could not set CPU affinity, continuing..." << std::endl;
180 std::cerr <<
"CPU affinity set to cpu " << set_cpu <<
" in process " << getpid() << std::endl;
205 }
catch (
const std::exception &
e) {
206 std::cerr <<
"WARNING: something in ProcessManager::terminate threw an exception! Original exception message:\n"
207 <<
e.what() << std::endl;
226 }
while (-1 == pid && EINTR == errno);
229 if (WIFEXITED(status)) {
230 printf(
"exited, status=%d\n", WEXITSTATUS(status));
231 }
else if (WIFSIGNALED(status)) {
232 if (WTERMSIG(status) != SIGTERM) {
233 printf(
"killed by signal %d\n", WTERMSIG(status));
235 }
else if (WIFSTOPPED(status)) {
236 printf(
"stopped by signal %d\n", WSTOPSIG(status));
237 }
else if (WIFCONTINUED(status)) {
238 printf(
"continued\n");
243 if (errno == ECHILD) {
244 printf(
"chill_wait: no children (got ECHILD error code from wait call), done\n");
246 throw std::runtime_error(std::string(
"chill_wait: error in wait call: ") + strerror(errno) +
247 std::string(
", errno ") + std::to_string(errno));
259 std::unordered_set<pid_t> children;
264 children.insert(pid);
267 while (!children.empty()) {
307 printf(
"I'm a worker, PID %d\n", getpid());
309 printf(
"I'm master, PID %d\n", getpid());
311 printf(
"I'm queue, PID %d\n", getpid());
313 printf(
"I'm not master, queue or worker, weird! PID %d\n", getpid());
R__EXTERN C unsigned int sleep(unsigned int seconds)
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
void initialize_processes(bool cpu_pinning=true)
Fork processes and activate CPU pinning.
std::vector< pid_t > worker_pids_
std::size_t N_workers() const
static void handle_sigterm(int signum)
We need this to tell the children to die, because we can't talk to them anymore during JobManager des...
bool is_initialized() const
static bool sigterm_received()
void wait_for_sigterm_then_exit()
void identify_processes() const
Print to stdout which type of process we are on and what its PID is (for debugging)
void terminate() noexcept
Shutdown forked processes if on master and if this process manager is initialized.
ProcessManager(std::size_t N_workers)
void shutdown_processes()
Shutdown forked processes if on master.
static volatile sig_atomic_t sigterm_received_
std::size_t worker_id() const
void close_context() const
pid_t fork_and_handle_errors()
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...