13 #include <sys/socket.h>
91 std::string basePath =
"/tmp/ROOTMP-";
99 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
101 std::cerr <<
"[E][C] Could not create socketpair. Error n. " << errno <<
". Now retrying.\n";
115 TSocket *s =
new TSocket(sockets[0], (std::to_string(pid)).c_str());
120 std::cerr <<
"[E][C] Could not connect to worker with pid " << pid <<
". Giving up.\n";
136 if (signalHandlers && signalHandlers->
GetSize() > 0)
144 for (
auto h : *fileHandlers) {
166 server.
Init(sockets[1], nWorker);
202 if (count == nMessages)
208 std::cerr <<
"[E] Could not send message to server\n";
255 waitpid(pid,
nullptr, 0);
275 unsigned code = msg.first;
277 const char *str = ReadBuffer<const char*>(msg.second.get());
280 std::cerr <<
"[I][C] message received: " << str <<
"\n";
282 std::cerr <<
"[E][C] error message received:\n" << str <<
"\n";
285 std::cerr <<
"[I][C] shutdown notice received from " << str <<
"\n";
288 std::cerr <<
"[W][C] unknown code received. code=" << code <<
"\n";
R__EXTERN TGuiFactory * gBatchGuiFactory
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
TMonitor fMon
This object manages the sockets and detect socket events via TMonitor::Select.
virtual Bool_t IsValid() const
This class works in conjuction with TMPClient, reacting to messages received from it as specified by ...
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual TSeqCollection * GetListOfFileHandlers() const
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual void RemoveAll()
Remove all sockets from the monitor.
~TMPClient()
Class destructor.
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
unsigned fNWorkers
The number of workers that should be spawned upon forking.
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Sequenceable collection abstract base class.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
void DeActivate(TSocket *s)
DeActivate a certain socket.
R__EXTERN TGuiFactory * gGuiFactory
Used by the workers to notify client of shutdown.
Fatal error: whoever sends this message is terminating execution.
R__EXTERN TVirtualX * gGXBatch
R__EXTERN TSystem * gSystem
std::pair< unsigned, std::unique_ptr< TBufferFile >> MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
TMPClient(unsigned nWorkers=0)
Class constructor.
Used by the client to tell servers to shutdown.
virtual TSeqCollection * GetListOfSignalHandlers() const
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
virtual Int_t GetSize() const
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process...
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
ClassImp(TSlaveInfo) Int_t TSlaveInfo const TSlaveInfo * si
Used to sort slaveinfos by ordinal.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs RAM size, etc into the SysInfo_t s...
virtual TObject * First() const =0