23 #include <sys/socket.h> 88 using Py_IsInitialized_type = int (*)(
void);
// Pointer-to-function type for the symbol looked up under the name "PyGILState_Ensure":
// it takes no arguments and hands back an opaque void* value.
89 using PyGILState_Ensure_type =
void* (*)(
void);
// Pointer-to-function type for the symbol looked up under the name "PyGILState_Release":
// it accepts the opaque void* value and returns nothing.
90 using PyGILState_Release_type =
void (*)(
void*);
// Opaque value produced by the PyGILState_Ensure call and later passed to PyGILState_Release.
// It stays nullptr when PyGILState_Ensure was never called, which makes the release step a no-op.
91 void* fPyGILState_STATE =
nullptr;
/// Look up a symbol by name with dlsym and return its address converted to the requested type.
///
/// \tparam FPTYPE Pointer type (typically a function-pointer type) the address is converted to.
/// \param name    NUL-terminated symbol name. A null pointer is tolerated and yields a null result.
/// \return The address reported by dlsym converted to FPTYPE; null when `name` is null or when
///         dlsym reports no such symbol.
template <class FPTYPE>
FPTYPE GetSymT(const char *name)
{
   // Never hand a null name to dlsym; treat it like a symbol that cannot be found.
   if (!name)
      return nullptr;
   // The null handle is forwarded unchanged. The named cast makes the void* -> FPTYPE
   // conversion explicit and greppable instead of hiding it behind a C-style cast.
   return reinterpret_cast<FPTYPE>(dlsym(nullptr, name));
}
97 auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>(
"Py_IsInitialized");
98 if (!Py_IsInitialized || !Py_IsInitialized())
return;
99 auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>(
"PyGILState_Ensure");
100 if (PyGILState_Ensure) fPyGILState_STATE = PyGILState_Ensure();
105 auto PyGILState_Release = GetSymT<PyGILState_Release_type>(
"PyGILState_Release");
106 if (fPyGILState_STATE && PyGILState_Release) PyGILState_Release(fPyGILState_STATE);
130 std::string basePath =
"/tmp/ROOTMP-";
135 unsigned nWorker = 0;
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
140 Error(
"TMPClient::Fork",
"[E][C] Could not create socketpair. Error n. . Now retrying.\n%d", errno);
147 ROOT::Internal::TGILRAII tgilraai;
157 TSocket *s =
new TSocket(sockets[0], (std::to_string(pid)).c_str());
162 Error(
"TMPClient::Fork",
"[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
179 if (signalHandlers && signalHandlers->
GetSize() > 0)
187 for (
auto h : *fileHandlers) {
222 server.
Init(sockets[1], nWorker);
258 if (count == nMessages)
264 Error(
"TMPClient:Broadcast",
"[E] Could not send message to server\n");
311 waitpid(pid,
nullptr, 0);
331 unsigned code = msg.first;
333 const char *str = ReadBuffer<const char*>(msg.second.get());
336 Error(
"TMPClient::HandleMPCode",
"[I][C] message received: %s\n", str);
338 Error(
"TMPClient::HandleMPCode",
"[E][C] error message received: %s\n", str);
341 Error(
"TMPClient::HandleMPCode",
"[I][C] shutdown notice received from %s\n", str);
344 Error(
"TMPClient::HandleMPCode",
"[W][C] unknown code received. code=%d\n", code);
R__EXTERN TGuiFactory * gBatchGuiFactory
virtual Bool_t IsValid() const
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
Namespace for new ROOT classes and functions.
virtual TSeqCollection * GetListOfFileHandlers() const
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual void RemoveAll()
Remove all sockets from the monitor.
~TMPClient()
Class destructor.
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
unsigned fNWorkers
The number of workers that should be spawned upon forking.
void Remove(TSocket *s)
Remove a certain socket from the monitor.
Sequenceable collection abstract base class.
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
virtual void DeActivate(TSocket *sock)
De-activate a socket.
void DeActivate(TSocket *s)
DeActivate a certain socket.
R__EXTERN TGuiFactory * gGuiFactory
Used by the workers to notify client of shutdown.
void Error(const char *location, const char *msgfmt,...)
Fatal error: whoever sends this message is terminating execution.
R__EXTERN TVirtualX * gGXBatch
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
An std::pair that wraps the code and optional object contained in a message.
R__EXTERN TSystem * gSystem
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual TSeqCollection * GetListOfSignalHandlers() const
TMPClient(unsigned nWorkers=0)
Class constructor.
Used by the client to tell servers to shutdown.
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process...
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
typedef void((*Func_t)())
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual void ActivateAll()
Activate all de-activated sockets.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
virtual Int_t GetSize() const
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
virtual TObject * First() const =0