23#include <sys/socket.h>
// Signatures of the CPython entry points that this file resolves by name at run time.
using Py_IsInitialized_type = int (*)();
using PyGILState_Ensure_type = void *(*)();
using PyGILState_Release_type = void (*)(void *);

// Opaque value returned by PyGILState_Ensure and later handed to PyGILState_Release;
// stays null as long as PyGILState_Ensure has not been called.
void *fPyGILState_STATE{nullptr};
/// Look up the symbol called `name` with dlsym and return it as a function pointer.
///
/// \tparam FPTYPE  Function-pointer type the caller expects the symbol to have.
///                 Nothing checks that the symbol really has this signature.
/// \param  name    NUL-terminated symbol name to search for.
/// \return The symbol's address converted to FPTYPE, or a null pointer when
///         dlsym reports no such symbol.
template <class FPTYPE>
FPTYPE GetSymT(const char *name)
{
   // The handle stays null exactly as before.
   // NOTE(review): on glibc a null handle equals RTLD_DEFAULT - confirm the
   // intended search scope on other platforms.
   void *address = dlsym(nullptr, name);

   // dlsym hands back an object pointer; a named cast makes the
   // object-pointer -> function-pointer conversion explicit and greppable.
   return reinterpret_cast<FPTYPE>(address);
}
97 auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>(
"Py_IsInitialized");
98 if (!Py_IsInitialized || !Py_IsInitialized())
return;
99 auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>(
"PyGILState_Ensure");
100 if (PyGILState_Ensure) fPyGILState_STATE = PyGILState_Ensure();
105 auto PyGILState_Release = GetSymT<PyGILState_Release_type>(
"PyGILState_Release");
106 if (fPyGILState_STATE && PyGILState_Release) PyGILState_Release(fPyGILState_STATE);
130 std::string basePath =
"/tmp/ROOTMP-";
135 unsigned nWorker = 0;
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
140 Error(
"TMPClient::Fork",
"[E][C] Could not create socketpair. Error n. . Now retrying.\n%d", errno);
147 ROOT::Internal::TGILRAII tgilraai;
158 if (
s &&
s->IsValid()) {
162 Error(
"TMPClient::Fork",
"[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
179 if (signalHandlers && signalHandlers->
GetSize() > 0)
187 for (
auto h : *fileHandlers) {
222 server.
Init(sockets[1], nWorker);
258 if (count == nMessages)
264 Error(
"TMPClient:Broadcast",
"[E] Could not send message to server\n");
311 waitpid(pid,
nullptr, 0);
331 unsigned code = msg.first;
333 const char *str = ReadBuffer<const char*>(msg.second.get());
336 Error(
"TMPClient::HandleMPCode",
"[I][C] message received: %s\n", str);
338 Error(
"TMPClient::HandleMPCode",
"[E][C] error message received: %s\n", str);
341 Error(
"TMPClient::HandleMPCode",
"[I][C] shutdown notice received from %s\n", str);
344 Error(
"TMPClient::HandleMPCode",
"[W][C] unknown code received. code=%d\n", code);
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
A std::pair that wraps the code and optional object contained in a message.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TGuiFactory * gBatchGuiFactory
R__EXTERN TGuiFactory * gGuiFactory
typedef void((*Func_t)())
R__EXTERN TSystem * gSystem
R__EXTERN TVirtualX * gGXBatch
virtual Int_t GetSize() const
Return the capacity of the collection.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
unsigned fNWorkers
The number of workers that should be spawned upon forking.
TMPClient(unsigned nWorkers=0)
Class constructor.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
~TMPClient()
Class destructor.
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
void Remove(TSocket *s)
Remove a certain socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process.
void DeActivate(TSocket *s)
DeActivate a certain socket.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual void RemoveAll()
Remove all sockets from the monitor.
virtual void ActivateAll()
Activate all de-activated sockets.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Sequenceable collection abstract base class.
virtual TObject * First() const =0
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc., into the SysInfo_t s...
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
virtual TSeqCollection * GetListOfFileHandlers() const
virtual TSeqCollection * GetListOfSignalHandlers() const
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
@ kMessage
Generic message.
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kShutdownNotice
Used by the workers to notify client of shutdown.
static constexpr double s