Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
ProcessManager.cxx
Go to the documentation of this file.
1/*
2 * Project: RooFit
3 * Authors:
4 * PB, Patrick Bos, Netherlands eScience Center, p.bos@esciencecenter.nl
5 * IP, Inti Pelupessy, Netherlands eScience Center, i.pelupessy@esciencecenter.nl
6 *
7 * Copyright (c) 2021, CERN
8 *
9 * Redistribution and use in source and binary forms,
10 * with or without modification, are permitted according to the terms
11 * listed in LICENSE (http://roofit.sourceforge.net/license.txt)
12 */
13
17
18#include <cstring> // for strsignal
19#include <sys/wait.h> // for wait
20#include <iostream>
21#include <unordered_set>
22
23namespace RooFit {
24namespace MultiProcess {
25
26/// \class ProcessManager
27/// \brief Fork processes for queue and workers
28///
29/// This class manages three types of processes:
30/// 1. master: the initial main process. It defines and enqueues tasks
31/// and processes results.
32/// 2. workers: a pool of processes that will try to take tasks from the
33/// queue. These are forked from master.
34/// 3. queue: This process runs the queue_loop and maintains the queue of
35/// tasks. It is also forked from master.
36///
37/// \param N_workers Number of worker processes to spawn.
38ProcessManager::ProcessManager(std::size_t N_workers) : N_workers_(N_workers)
39{
40 // Note: zmq context is automatically created in the ZeroMQSvc class and maintained as singleton,
41 // but we must close any possibly existing state before reusing it. This assumes that our Messenger
42 // is the only user of ZeroMQSvc and that there is only one Messenger at a time. Beware that
43 // this must be designed more carefully if either of these assumptions changes! Note also that this
44 // call must be done before the ProcessManager forks new processes, otherwise the master process'
45 // context that will be cloned to all forked processes will be closed multiple times, which will
46 // hang, because the ZeroMQ context creates threads and these will not be cloned along with the
47 // fork. See the ZeroMQ documentation for more details on this. In principle, one could design this
48 // in a more fine-grained way by keeping the context on the master process and only recreating it
49 // on child processes (while avoiding calling the destructor on the child processes!). This
50 // approach may offer more flexibility if this is needed in the future.
   // NOTE(review): listing lines 51-52 are absent from this extract. Presumably they hold the call
   // that closes the ZeroMQ context described above (the page's symbol index lists zmqSvc() and
   // close_context()) -- confirm against the original ProcessManager.cxx.
53}
54
56{
57 if (is_master()) {
58 terminate();
59 } else {
61 }
62}
63
64// static member initialization
65volatile sig_atomic_t ProcessManager::sigterm_received_ = 0;
66
67// static function
68/// We need this to tell the children to die, because we can't talk
69/// to them anymore during JobManager destruction, because that kills
70/// the Messenger first. We do that with SIGTERMs. sigterm_received()
71/// should be checked in message loops so that they stop once it returns true.
73{
75}
76
77// static function
79{
80 if (sigterm_received_ > 0) {
81 return true;
82 } else {
83 return false;
84 }
85}
86
88{
89 pid_t child_pid = fork();
90 int retries = 0;
91 while (child_pid == -1) {
92 if (retries < 3) {
93 ++retries;
94 printf("fork returned with error number %d, retrying after 1 second...\n", errno);
95 sleep(1);
96 child_pid = fork();
97 } else {
98 printf("fork returned with error number %d\n", errno);
99 throw std::runtime_error("fork returned with error 3 times, aborting!");
100 }
101 }
102 return child_pid;
103}
104
105/// \brief Fork processes and activate CPU pinning
106///
107/// \param cpu_pinning Activate CPU pinning if true. Effective on Linux only.
109{
110 // Initialize processes;
111 // ... first workers:
112 worker_pids_.resize(N_workers_);
113 pid_t child_pid{};
114 for (std::size_t ix = 0; ix < N_workers_; ++ix) {
115 child_pid = fork_and_handle_errors();
116 if (!child_pid) { // we're on the worker
117 is_worker_ = true;
118 worker_id_ = ix;
119 break;
120 } else { // we're on master
121 worker_pids_[ix] = child_pid;
122 }
123 }
124
125 // ... then queue:
126 if (child_pid) { // we're on master
128 if (!queue_pid_) { // we're now on queue
129 is_queue_ = true;
130 } else {
131 is_master_ = true;
132 }
133 }
134
135 // set the sigterm handler on the child processes
136 if (!is_master_) {
137 struct sigaction sa;
138 memset(&sa, '\0', sizeof(sa));
139 sa.sa_handler = ProcessManager::handle_sigterm;
140
141 if (sigaction(SIGTERM, &sa, NULL) < 0) {
142 std::perror("sigaction failed");
143 std::exit(1);
144 }
145 }
146
147 if (cpu_pinning) {
148#if defined(__APPLE__)
149#ifndef NDEBUG
150 static bool affinity_warned = false;
151 if (is_master() & !affinity_warned) {
152 std::cout << "CPU affinity cannot be set on macOS" << std::endl;
153 affinity_warned = true;
154 }
155#endif // NDEBUG
156#elif defined(_WIN32)
157#ifndef NDEBUG
158 if (is_master())
159 std::cerr << "WARNING: CPU affinity setting not implemented on Windows, continuing..." << std::endl;
160#endif // NDEBUG
161#else
162 cpu_set_t mask;
163 // zero all bits in mask
164 CPU_ZERO(&mask);
165 // set correct bit
166 std::size_t set_cpu;
167 if (is_master()) {
168 set_cpu = N_workers() + 1;
169 } else if (is_queue()) {
170 set_cpu = N_workers();
171 } else {
172 set_cpu = worker_id();
173 }
174 CPU_SET(set_cpu, &mask);
175#ifndef NDEBUG
176 // sched_setaffinity returns 0 on success
177 if (sched_setaffinity(0, sizeof(mask), &mask) == -1) {
178 std::cerr << "WARNING: Could not set CPU affinity, continuing..." << std::endl;
179 } else {
180 std::cerr << "CPU affinity set to cpu " << set_cpu << " in process " << getpid() << std::endl;
181 }
182#endif // NDEBUG
183#endif
184 }
185
186#ifndef NDEBUG
188#endif // NDEBUG
189
190 initialized_ = true;
191}
192
194{
195 return initialized_;
196}
197
198/// Shut down forked processes if on master and if this process manager is initialized
200{
201 try {
202 if (is_master() && is_initialized()) {
204 }
205 } catch (const std::exception &e) {
206 std::cerr << "WARNING: something in ProcessManager::terminate threw an exception! Original exception message:\n"
207 << e.what() << std::endl;
208 }
209}
210
212{
213 if (!is_master()) {
214 while (!sigterm_received()) {
215 }
216 std::_Exit(0);
217 }
218}
219
221{
222 int status = 0;
223 pid_t pid;
224 do {
225 pid = wait(&status);
226 } while (-1 == pid && EINTR == errno); // retry on interrupted system call
227
228 if (0 != status) {
229 if (WIFEXITED(status)) {
230 printf("exited, status=%d\n", WEXITSTATUS(status));
231 } else if (WIFSIGNALED(status)) {
232 if (WTERMSIG(status) != SIGTERM) {
233 printf("killed by signal %d\n", WTERMSIG(status));
234 }
235 } else if (WIFSTOPPED(status)) {
236 printf("stopped by signal %d\n", WSTOPSIG(status));
237 } else if (WIFCONTINUED(status)) {
238 printf("continued\n");
239 }
240 }
241
242 if (-1 == pid) {
243 if (errno == ECHILD) {
244 printf("chill_wait: no children (got ECHILD error code from wait call), done\n");
245 } else {
246 throw std::runtime_error(std::string("chill_wait: error in wait call: ") + strerror(errno) +
247 std::string(", errno ") + std::to_string(errno));
248 }
249 }
250
251 return pid;
252}
253
254/// Shut down forked processes if on master
256{
257 if (is_master()) {
258 // terminate all children
259 std::unordered_set<pid_t> children;
260 children.insert(queue_pid_);
261 kill(queue_pid_, SIGTERM);
262 for (auto pid : worker_pids_) {
263 kill(pid, SIGTERM);
264 children.insert(pid);
265 }
266 // then wait for them to actually die and clean out the zombies
267 while (!children.empty()) {
268 pid_t pid = chill_wait();
269 children.erase(pid);
270 }
271 }
272
273 initialized_ = false;
274}
275
276// Getters
277
279{
280 return is_master_;
281}
282
284{
285 return is_queue_;
286}
287
289{
290 return is_worker_;
291}
292
/// \brief Get the index stored for this worker process.
///
/// \return The value of worker_id_. On a worker process the forking loop sets it to that
///         worker's loop index (0 .. N_workers-1).
///         NOTE(review): its value on the master and queue processes is not visible in this
///         listing -- confirm the member's default before relying on it there.
293std::size_t ProcessManager::worker_id() const
294{
295 return worker_id_;
296}
297
/// \brief Get the number of worker processes this manager was configured with.
///
/// \return The value of N_workers_, i.e. the N_workers argument passed to the constructor.
298std::size_t ProcessManager::N_workers() const
299{
300 return N_workers_;
301}
302
303/// Print to stdout which type of process we are on and what its PID is (for debugging)
305{
306 if (is_worker_) {
307 printf("I'm a worker, PID %d\n", getpid());
308 } else if (is_master_) {
309 printf("I'm master, PID %d\n", getpid());
310 } else if (is_queue_) {
311 printf("I'm queue, PID %d\n", getpid());
312 } else {
313 printf("I'm not master, queue or worker, weird! PID %d\n", getpid());
314 }
315}
316
317} // namespace MultiProcess
318} // namespace RooFit
#define e(i)
Definition RSha256.hxx:103
R__EXTERN C unsigned int sleep(unsigned int seconds)
ZeroMQSvc & zmqSvc()
Get singleton object of this class.
Definition ZeroMQSvc.cpp:34
void initialize_processes(bool cpu_pinning=true)
Fork processes and activate CPU pinning.
static void handle_sigterm(int signum)
We need this to tell the children to die, because we can't talk to them anymore during JobManager des...
void identify_processes() const
Print to stdout which type of process we are on and what its PID is (for debugging)
void terminate() noexcept
Shutdown forked processes if on master and if this process manager is initialized.
void shutdown_processes()
Shutdown forked processes if on master.
static volatile sig_atomic_t sigterm_received_
void close_context() const
The namespace RooFit contains mostly switches that change the behaviour of functions of PDFs (or othe...
Definition Common.h:18