Logo ROOT  
Reference Guide
TMPClient.cxx
Go to the documentation of this file.
1/* @(#)root/multiproc:$Id$ */
2// Author: Enrico Guiraud July 2015
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "MPCode.h"
13#include "TGuiFactory.h" //gGuiFactory
14#include "TError.h" //gErrorIgnoreLevel
15#include "TMPClient.h"
16#include "TMPWorker.h"
17#include "TROOT.h" //gROOT
18#include "TSocket.h"
19#include "TSystem.h" //gSystem
20#include "TVirtualX.h" //gVirtualX
21#include <errno.h> //errno, used by socketpair
22#include <memory> //unique_ptr
23#include <sys/socket.h> //socketpair
24#include <sys/wait.h> // waitpid
25#include <unistd.h> // close, fork
26#include <dlfcn.h>
27
28//////////////////////////////////////////////////////////////////////////
29///
30/// \class TMPClient
31///
32/// Base class for multiprocess applications' clients. It provides a
33/// simple interface to fork a ROOT session into server/worker sessions
34/// and exchange messages with them. Multiprocessing applications can build
35/// on TMPClient and TMPWorker: the class providing multiprocess
36/// functionalities to users should inherit (possibly privately) from
37/// TMPClient, and the workers executing tasks should inherit from TMPWorker.
38///
39//////////////////////////////////////////////////////////////////////////
40
41//////////////////////////////////////////////////////////////////////////
42/// Class constructor.
43/// \param nWorkers
44/// \parblock
45/// the number of children processes that will be created by
46/// Fork, i.e. the number of workers that will be available after this call.
47/// The default value (0) means that a number of workers equal to the number
48/// of cores of the machine is going to be spawned. If that information is
49/// not available, 2 workers are created instead.
50/// \endparblock
51TMPClient::TMPClient(unsigned nWorkers) : fIsParent(true), fWorkerPids(), fMon(), fNWorkers(0)
52{
53 // decide on number of workers
54 if (nWorkers) {
55 fNWorkers = nWorkers;
56 } else {
57 SysInfo_t si;
58 if (gSystem->GetSysInfo(&si) == 0)
59 fNWorkers = si.fCpus;
60 else
61 fNWorkers = 2;
62 }
63}
64
65
66//////////////////////////////////////////////////////////////////////////
67/// Class destructor.
68/// This method is in charge of shutting down any remaining worker,
69/// closing off connections and reaping the terminated children processes.
71{
74 l->Delete();
75 delete l;
77 l->Delete();
78 delete l;
81}
82
namespace ROOT {
   namespace Internal {
      /// Scope guard that acquires the Python GIL on construction and releases
      /// it on destruction where it applies, i.e. if libPython is loaded and
      /// the interpreter is initialized. The Python entry points are looked up
      /// by name with dlsym, so this code has no link-time dependency on
      /// libPython; when the symbols are not found the guard does nothing.
      class TGILRAII {
         // Signatures of the CPython functions resolved at run time.
         // PyGILState_STATE is a small C enum (PyGILState_LOCKED == 0,
         // PyGILState_UNLOCKED == 1), carried here as an int.
         using Py_IsInitialized_type = int (*)(void);
         using PyGILState_Ensure_type = int (*)(void);
         using PyGILState_Release_type = void (*)(int);

         int fPyGILState_STATE = 0; ///< Value returned by PyGILState_Ensure
         bool fAcquired = false;    ///< True only if PyGILState_Ensure was actually called

         /// Look up `name` among the symbols visible to dlsym(nullptr, ...)
         /// and return it cast to the requested function pointer type
         /// (null if the symbol is not found).
         template<class FPTYPE>
         static FPTYPE GetSymT(const char* name) {return reinterpret_cast<FPTYPE>(dlsym(nullptr, name));}
      public:
         /// Acquire the GIL if a Python interpreter is present and initialized.
         TGILRAII()
         {
            auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>("Py_IsInitialized");
            if (!Py_IsInitialized || !Py_IsInitialized()) return;
            auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>("PyGILState_Ensure");
            if (!PyGILState_Ensure) return;
            fPyGILState_STATE = PyGILState_Ensure();
            // Remember the acquisition separately from the returned state: the
            // state PyGILState_LOCKED has the value 0, so the state itself
            // cannot double as an "acquired" flag.
            fAcquired = true;
         }

         // A copy would release the same GIL state a second time.
         TGILRAII(const TGILRAII &) = delete;
         TGILRAII &operator=(const TGILRAII &) = delete;

         /// Release the GIL state obtained in the constructor, if any.
         /// Every successful PyGILState_Ensure is matched by one PyGILState_Release.
         ~TGILRAII()
         {
            if (!fAcquired) return;
            auto PyGILState_Release = GetSymT<PyGILState_Release_type>("PyGILState_Release");
            if (PyGILState_Release) PyGILState_Release(fPyGILState_STATE);
         }
      };
   }
}
111
112//////////////////////////////////////////////////////////////////////////
113/// This method forks the ROOT session into fNWorkers children processes.
114/// The ROOT sessions spawned in this way will not have graphical
115/// capabilities and will not read from standard input, but will be
116/// connected to the original (interactive) session through TSockets.
117/// The children processes' PIDs are added to the fWorkerPids vector.
118/// The parent session can then communicate with the children using the
119/// Broadcast and MPSend methods, and receive messages through MPRecv.\n
120/// \param server
121/// \parblock
122/// A pointer to an instance of the class that will take control
123/// of the subprocesses after forking. Applications should implement their
124/// own class inheriting from TMPWorker. Behaviour can be customized
125/// overriding TMPWorker::HandleInput.
126/// \endparblock
127/// \return true if Fork succeeded, false otherwise
129{
130 std::string basePath = "/tmp/ROOTMP-";
131
132 //fork as many times as needed and save pids
133 pid_t pid = 1; //must be positive to handle the case in which fNWorkers is 0
134 int sockets[2]; //sockets file descriptors
135 unsigned nWorker = 0;
136 for (; nWorker < fNWorkers; ++nWorker) {
137 //create socket pair
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
139 if (ret != 0) { //socketpair failed: report errno and retry this worker slot
140 Error("TMPClient::Fork", "[E][C] Could not create socketpair. Error n. %d. Now retrying.\n", errno);
141 --nWorker; //cancels the loop's ++nWorker so that the same worker index is attempted again
142 continue;
143 }
144
145 //fork
146 {
147 ROOT::Internal::TGILRAII tgilraai;
148 pid = fork();
149 }
150
151 if (!pid) {
152 //child process, exit loop. sockets[1] is the fd that should be used
153 break;
154 } else {
155 //parent process, create TSocket with current value of sockets[0]
156 close(sockets[1]); //we don't need this
157 TSocket *s = new TSocket(sockets[0], (std::to_string(pid)).c_str()); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
158 if (s && s->IsValid()) {
159 fMon.Add(s);
160 fWorkerPids.push_back(pid);
161 } else {
162 Error("TMPClient::Fork","[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
163 delete s;
164 }
165 }
166 }
167
168 if (pid) {
169 //parent returns here
170 return true;
171 } else {
172 //CHILD/WORKER
173 fIsParent = false;
174 close(sockets[0]); //we don't need this
175
176 //override signal handler (make the servers exit on SIGINT)
178 TSignalHandler *sh = nullptr;
179 if (signalHandlers && signalHandlers->GetSize() > 0)
180 sh = (TSignalHandler *)signalHandlers->First();
181 if (sh)
183
184 //remove stdin from eventloop and close it
186 if (fileHandlers) {
187 for (auto h : *fileHandlers) {
188 if (h && ((TFileHandler *)h)->GetFd() == 0) {
190 break;
191 }
192 }
193 }
194 close(0);
195 if (fMon.GetListOfActives()) {
196 while (fMon.GetListOfActives()->GetSize() > 0) {
198 fMon.Remove(s);
199 delete s;
200 }
201 }
202 if (fMon.GetListOfDeActives()) {
203 while (fMon.GetListOfDeActives()->GetSize() > 0) {
205 fMon.Remove(s);
206 delete s;
207 }
208 }
209 //disable graphics
210 //these instructions were copied from TApplication::MakeBatch
211 gROOT->SetBatch();
213 delete gGuiFactory;
215#ifndef R__WIN32
216 if (gVirtualX != gGXBatch)
217 delete gVirtualX;
218#endif
220
221 //prepare server and add it to eventloop
222 server.Init(sockets[1], nWorker);
223
224 //enter worker loop
225 server.Run();
226 }
227
228 //control should never reach here
229 return true;
230}
231
232
233//////////////////////////////////////////////////////////////////////////
234/// Send a message with the specified code to at most nMessages workers.
235/// Sockets can either be in an "active" or "non-active" state. This method
236/// activates all the sockets through which the client is connected to the
237/// workers, and deactivates them when a message is sent to the corresponding
238/// worker. This way the sockets pertaining to workers who have been left
239/// idle will be the only ones in the active list
240/// (TSocket::GetMonitor()->GetListOfActives()) after execution.
241/// \param code the code to send (e.g. EMPCode)
242/// \param nMessages
243/// \parblock
244/// the maximum number of messages to send.
245/// If `nMessages == 0 || nMessages > fNWorkers`, send a message to every worker.
246/// \endparblock
247/// \return the number of messages successfully sent
248unsigned TMPClient::Broadcast(unsigned code, unsigned nMessages)
249{
250 if (nMessages == 0)
251 nMessages = fNWorkers;
252 unsigned count = 0;
254
255 //send message to all sockets
256 std::unique_ptr<TList> lp(fMon.GetListOfActives());
257 for (auto s : *lp) {
258 if (count == nMessages)
259 break;
260 if (MPSend((TSocket *)s, code)) {
262 ++count;
263 } else {
264 Error("TMPClient:Broadcast", "[E] Could not send message to server\n");
265 }
266 }
267
268 return count;
269}
270
271
272//////////////////////////////////////////////////////////////////////////
273/// DeActivate a certain socket.
274/// This does not remove it from the monitor: it will be reactivated by
275/// the next call to Broadcast() (or possibly other methods that are
276/// specified to do so).\n
277/// A socket should be DeActivated when the corresponding
278/// worker is done *for now* and we want to stop listening to this worker's
279/// socket. If the worker is done *forever*, Remove() should be used instead.
280/// \param s the socket to be deactivated
282{
284}
285
286
287//////////////////////////////////////////////////////////////////////////
288/// Remove a certain socket from the monitor.
289/// A socket should be Removed from the monitor when the
290/// corresponding worker is done *forever*. For example HandleMPCode()
291/// calls this method on sockets pertaining to workers which sent an
292/// MPCode::kShutdownNotice.\n
293/// If the worker is done *for now*, DeActivate should be used instead.
294/// \param s the socket to be removed from the monitor fMon
296{
297 fMon.Remove(s);
298 delete s;
299}
300
301
302//////////////////////////////////////////////////////////////////////////
303/// Wait on worker processes and remove their pids from fWorkerPids.
304/// A blocking waitpid is called, but this should actually not block
305/// execution since ReapWorkers should only be called when all workers
306/// have already quit. ReapWorkers is then called not to leave zombie
307/// processes hanging around, and to clean-up fWorkerPids.
309{
310 for (auto &pid : fWorkerPids) {
311 waitpid(pid, nullptr, 0);
312 }
313 fWorkerPids.clear();
314}
315
316
317//////////////////////////////////////////////////////////////////////////
318/// Handle messages containing an EMPCode.
319/// This method should be called upon receiving a message with a code >= 1000
320/// (i.e. EMPCode). It handles the most generic types of messages.\n
321/// Classes inheriting from TMPClient should implement a similar method
322/// to handle message codes specific to the application they're part of.\n
323/// \param msg the MPCodeBufPair returned by a MPRecv call
324/// \param s
325/// \parblock
326/// a pointer to the socket from which the message has been received is passed.
327/// This way HandleMPCode knows which socket to reply on.
328/// \endparblock
330{
331 unsigned code = msg.first;
332 //message contains server's pid. retrieve it
333 const char *str = ReadBuffer<const char*>(msg.second.get());
334
335 if (code == MPCode::kMessage) {
336 Error("TMPClient::HandleMPCode", "[I][C] message received: %s\n", str);
337 } else if (code == MPCode::kError) {
338 Error("TMPClient::HandleMPCode", "[E][C] error message received: %s\n", str);
339 } else if (code == MPCode::kShutdownNotice || code == MPCode::kFatalError) {
340 if (gDebug > 0) //generally users don't want to know this
341 Error("TMPClient::HandleMPCode", "[I][C] shutdown notice received from %s\n", str);
342 Remove(s);
343 } else
344 Error("TMPClient::HandleMPCode", "[W][C] unknown code received. code=%d\n", code);
345 delete [] str;
346}
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:31
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:32
#define h(i)
Definition: RSha256.hxx:106
R__EXTERN Int_t gDebug
Definition: Rtypes.h:91
void Error(const char *location, const char *msgfmt,...)
char name[80]
Definition: TGX11.cxx:109
R__EXTERN TGuiFactory * gBatchGuiFactory
Definition: TGuiFactory.h:67
R__EXTERN TGuiFactory * gGuiFactory
Definition: TGuiFactory.h:66
#define gROOT
Definition: TROOT.h:415
typedef void((*Func_t)())
R__EXTERN TSystem * gSystem
Definition: TSystem.h:560
#define gVirtualX
Definition: TVirtualX.h:345
R__EXTERN TVirtualX * gGXBatch
Definition: TVirtualX.h:348
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
A doubly linked list.
Definition: TList.h:44
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:656
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:51
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:248
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:70
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
Definition: TMPClient.h:49
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:329
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:308
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:295
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:128
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process.
Definition: TMPClient.h:47
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:281
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:26
void Run()
Definition: TMPWorker.cxx:64
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
Definition: TMonitor.cxx:515
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
Sequenceable collection abstract base class.
virtual TObject * First() const =0
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc into the SysInfo_t s...
Definition: TSystem.cxx:2503
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:574
virtual TSeqCollection * GetListOfFileHandlers() const
Definition: TSystem.h:384
virtual TSeqCollection * GetListOfSignalHandlers() const
Definition: TSystem.h:381
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:552
@ kMessage
Generic message.
Definition: MPCode.h:46
@ kError
Error message.
Definition: MPCode.h:47
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:48
@ kShutdownOrder
Used by the client to tell servers to shutdown.
Definition: MPCode.h:49
@ kShutdownNotice
Used by the workers to notify client of shutdown.
Definition: MPCode.h:50
VSD Structures.
Definition: StringConv.hxx:21
static constexpr double s
Int_t fCpus
Definition: TSystem.h:155
auto * l
Definition: textangle.C:4