ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TMPClient.cxx
Go to the documentation of this file.
1 #include "TMPClient.h"
2 #include "TMPWorker.h"
3 #include "MPCode.h"
4 #include "TSocket.h"
5 #include "TGuiFactory.h" //gGuiFactory
6 #include "TVirtualX.h" //gVirtualX
7 #include "TSystem.h" //gSystem
8 #include "TROOT.h" //gROOT
9 #include "TError.h" //gErrorIgnoreLevel
10 #include <unistd.h> // close, fork
11 #include <sys/wait.h> // waitpid
12 #include <errno.h> //errno, used by socketpair
13 #include <sys/socket.h> //socketpair
14 #include <memory> //unique_ptr
15 #include <iostream>
16 
17 //////////////////////////////////////////////////////////////////////////
18 ///
19 /// \class TMPClient
20 ///
21 /// Base class for multiprocess applications' clients. It provides a
22 /// simple interface to fork a ROOT session into server/worker sessions
23 /// and exchange messages with them. Multiprocessing applications can build
24 /// on TMPClient and TMPWorker: the class providing multiprocess
25 /// functionalities to users should inherit (possibly privately) from
26 /// TMPClient, and the workers executing tasks should inherit from TMPWorker.
27 ///
28 //////////////////////////////////////////////////////////////////////////
29 
30 //////////////////////////////////////////////////////////////////////////
31 /// Class constructor.
32 /// \param nWorkers
33 /// \parblock
34 /// the number of children processes that will be created by
35 /// Fork, i.e. the number of workers that will be available after this call.
36 /// The default value (0) means that a number of workers equal to the number
37 /// of cores of the machine is going to be spawned. If that information is
38 /// not available, 2 workers are created instead.
39 /// \endparblock
40 TMPClient::TMPClient(unsigned nWorkers) : fIsParent(true), fWorkerPids(), fMon(), fNWorkers(0)
41 {
42  // decide on number of workers
43  if (nWorkers) {
44  fNWorkers = nWorkers;
45  } else {
46  SysInfo_t si;
47  if (gSystem->GetSysInfo(&si) == 0)
48  fNWorkers = si.fCpus;
49  else
50  fNWorkers = 2;
51  }
52 }
53 
54 
55 //////////////////////////////////////////////////////////////////////////
56 /// Class destructor.
57 /// This method is in charge of shutting down any remaining worker,
58 /// closing off connections and reap the terminated children processes.
60 {
63  l->Delete();
64  delete l;
66  l->Delete();
67  delete l;
68  fMon.RemoveAll();
69  ReapWorkers();
70 }
71 
72 
73 //////////////////////////////////////////////////////////////////////////
74 /// This method forks the ROOT session into fNWorkers children processes.
75 /// The ROOT sessions spawned in this way will not have graphical
76 /// capabilities and will not read from standard input, but will be
77 /// connected to the original (interactive) session through TSockets.
78 /// The children processes' PIDs are added to the fWorkerPids vector.
79 /// The parent session can then communicate with the children using the
80 /// Broadcast and MPSend methods, and receive messages through MPRecv.\n
81 /// \param server
82 /// \parblock
83 /// A pointer to an instance of the class that will take control
84 /// of the subprocesses after forking. Applications should implement their
85 /// own class inheriting from TMPWorker. Behaviour can be customized
86 /// overriding TMPWorker::HandleInput.
87 /// \endparblock
88 /// \return true if Fork succeeded, false otherwise
90 {
91  std::string basePath = "/tmp/ROOTMP-";
92 
93  //fork as many times as needed and save pids
94  pid_t pid = 1; //must be positive to handle the case in which fNWorkers is 0
95  int sockets[2]; //sockets file descriptors
96  unsigned nWorker = 0;
97  for (; nWorker < fNWorkers; ++nWorker) {
98  //create socket pair
99  int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
100  if (ret != 0) {
101  std::cerr << "[E][C] Could not create socketpair. Error n. " << errno << ". Now retrying.\n";
102  --nWorker;
103  continue;
104  }
105 
106  //fork
107  pid = fork();
108 
109  if (!pid) {
110  //child process, exit loop. sockets[1] is the fd that should be used
111  break;
112  } else {
113  //parent process, create TSocket with current value of sockets[0]
114  close(sockets[1]); //we don't need this
115  TSocket *s = new TSocket(sockets[0], (std::to_string(pid)).c_str()); //TSocket's constructor with this signature seems much faster than TSocket(int fd)
116  if (s && s->IsValid()) {
117  fMon.Add(s);
118  fWorkerPids.push_back(pid);
119  } else {
120  std::cerr << "[E][C] Could not connect to worker with pid " << pid << ". Giving up.\n";
121  delete s;
122  }
123  }
124  }
125 
126  if (pid) {
127  //parent returns here
128  return true;
129  } else {
130  //CHILD/WORKER
131  fIsParent = false;
132 
133  //override signal handler (make the servers exit on SIGINT)
134  TSeqCollection *signalHandlers = gSystem->GetListOfSignalHandlers();
135  TSignalHandler *sh = nullptr;
136  if (signalHandlers && signalHandlers->GetSize() > 0)
137  sh = (TSignalHandler *)signalHandlers->First();
138  if (sh)
140 
141  //remove stdin from eventloop and close it
142  TSeqCollection *fileHandlers = gSystem->GetListOfFileHandlers();
143  if (fileHandlers) {
144  for (auto h : *fileHandlers) {
145  if (h && ((TFileHandler *)h)->GetFd() == 0) {
147  break;
148  }
149  }
150  }
151  close(0);
152 
153  //disable graphics
154  //these instructions were copied from TApplication::MakeBatch
155  gROOT->SetBatch();
157  delete gGuiFactory;
159 #ifndef R__WIN32
160  if (gVirtualX != gGXBatch)
161  delete gVirtualX;
162 #endif
164 
165  //prepare server and add it to eventloop
166  server.Init(sockets[1], nWorker);
167 
168  //enter worker loop
169  server.Run();
170  }
171 
172  //control should never reach here
173  return true;
174 }
175 
176 
177 //////////////////////////////////////////////////////////////////////////
178 /// Send a message with the specified code to at most nMessages workers.
179 /// Sockets can either be in an "active" or "non-active" state. This method
180 /// activates all the sockets through which the client is connected to the
181 /// workers, and deactivates them when a message is sent to the corresponding
182 /// worker. This way the sockets pertaining to workers who have been left
183 /// idle will be the only ones in the active list
184 /// (TSocket::GetMonitor()->GetListOfActives()) after execution.
185 /// \param code the code to send (e.g. EMPCode)
186 /// \param nMessages
187 /// \parblock
188 /// the maximum number of messages to send.
189 /// If `nMessages == 0 || nMessage > fNWorkers`, send a message to every worker.
190 /// \endparblock
191 /// \return the number of messages successfully sent
192 unsigned TMPClient::Broadcast(unsigned code, unsigned nMessages)
193 {
194  if (nMessages == 0)
195  nMessages = fNWorkers;
196  unsigned count = 0;
197  fMon.ActivateAll();
198 
199  //send message to all sockets
200  std::unique_ptr<TList> lp(fMon.GetListOfActives());
201  for (auto s : *lp) {
202  if (count == nMessages)
203  break;
204  if (MPSend((TSocket *)s, code)) {
205  fMon.DeActivate((TSocket *)s);
206  ++count;
207  } else {
208  std::cerr << "[E] Could not send message to server\n";
209  }
210  }
211 
212  return count;
213 }
214 
215 
216 //////////////////////////////////////////////////////////////////////////
217 /// DeActivate a certain socket.
218 /// This does not remove it from the monitor: it will be reactivated by
219 /// the next call to Broadcast() (or possibly other methods that are
220 /// specified to do so).\n
221 /// A socket should be DeActivated when the corresponding
222 /// worker is done *for now* and we want to stop listening to this worker's
223 /// socket. If the worker is done *forever*, Remove() should be used instead.
224 /// \param s the socket to be deactivated
226 {
227  fMon.DeActivate(s);
228 }
229 
230 
231 //////////////////////////////////////////////////////////////////////////
232 /// Remove a certain socket from the monitor.
233 /// A socket should be Removed from the monitor when the
234 /// corresponding worker is done *forever*. For example HandleMPCode()
235 /// calls this method on sockets pertaining to workers which sent an
236 /// MPCode::kShutdownNotice.\n
237 /// If the worker is done *for now*, DeActivate should be used instead.
238 /// \param s the socket to be removed from the monitor fMon
240 {
241  fMon.Remove(s);
242  delete s;
243 }
244 
245 
246 //////////////////////////////////////////////////////////////////////////
247 /// Wait on worker processes and remove their pids from fWorkerPids.
248 /// A blocking waitpid is called, but this should actually not block
249 /// execution since ReapWorkers should only be called when all workers
250 /// have already quit. ReapWorkers is then called not to leave zombie
251 /// processes hanging around, and to clean-up fWorkerPids.
253 {
254  for (auto &pid : fWorkerPids) {
255  waitpid(pid, nullptr, 0);
256  }
257  fWorkerPids.clear();
258 }
259 
260 
261 //////////////////////////////////////////////////////////////////////////
262 /// Handle messages containing an EMPCode.
263 /// This method should be called upon receiving a message with a code >= 1000
264 /// (i.e. EMPCode). It handles the most generic types of messages.\n
265 /// Classes inheriting from TMPClient should implement a similar method
266 /// to handle message codes specific to the application they're part of.\n
267 /// \param msg the MPCodeBufPair returned by a MPRecv call
268 /// \param s
269 /// \parblock
270 /// a pointer to the socket from which the message has been received is passed.
271 /// This way HandleMPCode knows which socket to reply on.
272 /// \endparblock
274 {
275  unsigned code = msg.first;
276  //message contains server's pid. retrieve it
277  const char *str = ReadBuffer<const char*>(msg.second.get());
278 
279  if (code == MPCode::kMessage) {
280  std::cerr << "[I][C] message received: " << str << "\n";
281  } else if (code == MPCode::kError) {
282  std::cerr << "[E][C] error message received:\n" << str << "\n";
283  } else if (code == MPCode::kShutdownNotice || code == MPCode::kFatalError) {
284  if (gDebug > 0) //generally users don't want to know this
285  std::cerr << "[I][C] shutdown notice received from " << str << "\n";
286  Remove(s);
287  } else
288  std::cerr << "[W][C] unknown code received. code=" << code << "\n";
289 
290  delete [] str;
291 }
R__EXTERN TGuiFactory * gBatchGuiFactory
Definition: TGuiFactory.h:69
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
Definition: TMPClient.h:48
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
Definition: TList.cxx:405
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Definition: TMPClient.cxx:89
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
Definition: TMPClient.h:49
virtual Bool_t IsValid() const
Definition: TSocket.h:162
TH1 * h
Definition: legend2.C:5
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
Definition: TMPWorker.h:20
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
Definition: MPSendRecv.cxx:21
#define gROOT
Definition: TROOT.h:344
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
virtual TSeqCollection * GetListOfFileHandlers() const
Definition: TSystem.h:372
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
Error message.
Definition: MPCode.h:30
virtual void RemoveAll()
Remove all sockets from the monitor.
Definition: TMonitor.cxx:241
~TMPClient()
Class destructor.
Definition: TMPClient.cxx:59
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
Definition: TSystem.cxx:568
unsigned fNWorkers
The number of workers that should be spawned upon forking.
Definition: TMPClient.h:50
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Definition: TMPClient.cxx:239
Sequenceable collection abstract base class.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
Definition: TMonitor.cxx:515
void DeActivate(TSocket *s)
DeActivate a certain socket.
Definition: TMPClient.cxx:225
R__EXTERN TGuiFactory * gGuiFactory
Definition: TGuiFactory.h:68
Used by the workers to notify client of shutdown.
Definition: MPCode.h:33
A doubly linked list.
Definition: TList.h:47
Fatal error: whoever sends this message is terminating execution.
Definition: MPCode.h:31
R__EXTERN TVirtualX * gGXBatch
Definition: TVirtualX.h:365
Int_t fCpus
Definition: TSystem.h:165
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
TLine * l
Definition: textangle.C:4
#define gVirtualX
Definition: TVirtualX.h:362
void Run()
Definition: TMPWorker.cxx:63
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
Definition: MPSendRecv.h:20
TMPClient(unsigned nWorkers=0)
Class constructor.
Definition: TMPClient.cxx:40
Used by the client to tell servers to shutdown.
Definition: MPCode.h:32
virtual TSeqCollection * GetListOfSignalHandlers() const
Definition: TSystem.h:369
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
Definition: TSystem.cxx:546
virtual Int_t GetSize() const
Definition: TCollection.h:95
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process...
Definition: TMPClient.h:47
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
Definition: TMPClient.cxx:273
Generic message.
Definition: MPCode.h:29
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
Definition: TMPWorker.cxx:55
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
Definition: TProof.cxx:183
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
virtual void ActivateAll()
Activate all de-activated sockets.
Definition: TMonitor.cxx:268
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
Definition: TMPClient.cxx:192
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
Definition: TMPClient.cxx:252
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
Definition: TSystem.cxx:2360
virtual TObject * First() const =0