23#include <sys/socket.h>
92 template<
class FPTYPE>
97 auto Py_IsInitialized = GetSymT<Py_IsInitialized_type>(
"Py_IsInitialized");
98 if (!Py_IsInitialized || !Py_IsInitialized())
return;
99 auto PyGILState_Ensure = GetSymT<PyGILState_Ensure_type>(
"PyGILState_Ensure");
105 auto PyGILState_Release = GetSymT<PyGILState_Release_type>(
"PyGILState_Release");
130 std::string basePath =
"/tmp/ROOTMP-";
135 unsigned nWorker = 0;
138 int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sockets);
140 Error(
// Report the failing errno inline, right after "Error n.", and keep the
// newline as the last character so the number is not printed on its own
// stray line after the message.
"TMPClient::Fork",
"[E][C] Could not create socketpair. Error n. %d. Now retrying.\n", errno);
157 TSocket *s =
new TSocket(sockets[0], (std::to_string(pid)).c_str());
162 Error(
"TMPClient::Fork",
"[E][C] Could not connect to worker with pid %d. Giving up.\n", pid);
179 if (signalHandlers && signalHandlers->
GetSize() > 0)
187 for (
auto h : *fileHandlers) {
197 while (lofact && (lofact->GetSize() > 0)) {
199 lofact.reset(
nullptr);
206 while (lofdeact && (lofdeact->GetSize() > 0)) {
208 lofdeact.reset(
nullptr);
227 server.
Init(sockets[1], nWorker);
263 if (count == nMessages)
269 Error(
// Location label uses the same "Class::Method" form as the other Error
// calls in this file, so log filtering by location stays consistent.
"TMPClient::Broadcast",
"[E] Could not send message to server\n");
316 waitpid(pid,
nullptr, 0);
336 unsigned code = msg.first;
338 const char *str = ReadBuffer<const char*>(msg.second.get());
341 Error(
"TMPClient::HandleMPCode",
"[I][C] message received: %s\n", str);
343 Error(
"TMPClient::HandleMPCode",
"[E][C] error message received: %s\n", str);
346 Error(
"TMPClient::HandleMPCode",
"[I][C] shutdown notice received from %s\n", str);
349 Error(
// `code` is declared unsigned, so it is formatted with %u; %d would
// misprint values above INT_MAX and mismatches the argument type.
"TMPClient::HandleMPCode",
"[W][C] unknown code received. code=%u\n", code);
std::pair< unsigned, std::unique_ptr< TBufferFile > > MPCodeBufPair
A std::pair that wraps the code and the optional object contained in a message.
int MPSend(TSocket *s, unsigned code)
Send a message with the specified code on the specified socket.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
R__EXTERN TGuiFactory * gBatchGuiFactory
R__EXTERN TGuiFactory * gGuiFactory
R__EXTERN TSystem * gSystem
R__EXTERN TVirtualX * gGXBatch
Class to acquire and release the Python GIL where it applies, i.e.
void *(*)(void) PyGILState_Ensure_type
void(*)(void *) PyGILState_Release_type
int(*)(void) Py_IsInitialized_type
FPTYPE GetSymT(const char *name)
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
unsigned fNWorkers
The number of workers that should be spawned upon forking.
TMPClient(unsigned nWorkers=0)
Class constructor.
unsigned Broadcast(unsigned code, unsigned nMessages=0)
Send a message with the specified code to at most nMessages workers.
std::vector< pid_t > fWorkerPids
A vector containing the PIDs of children processes/workers.
~TMPClient()
Class destructor.
TMonitor fMon
This object manages the sockets and detects socket events via TMonitor::Select.
void HandleMPCode(MPCodeBufPair &msg, TSocket *sender)
Handle messages containing an EMPCode.
void ReapWorkers()
Wait on worker processes and remove their pids from fWorkerPids.
void Remove(TSocket *s)
Remove a certain socket from the monitor.
bool Fork(TMPWorker &server)
This method forks the ROOT session into fNWorkers children processes.
bool fIsParent
This is true if this is the parent/client process, false if this is a child/worker process.
void DeActivate(TSocket *s)
DeActivate a certain socket.
This class works in conjunction with TMPClient, reacting to messages received from it as specified by ...
virtual void Init(int fd, unsigned workerN)
This method is called by children processes right after forking.
virtual void RemoveAll()
Remove all sockets from the monitor.
virtual void ActivateAll()
Activate all de-activated sockets.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
TList * GetListOfDeActives() const
Returns a list with all de-active sockets.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
virtual void Delete(Option_t *option="")
Delete this object.
Sequenceable collection abstract base class.
virtual TObject * First() const =0
virtual Bool_t IsValid() const
virtual int GetSysInfo(SysInfo_t *info) const
Returns static system info, like OS type, CPU type, number of CPUs, RAM size, etc. into the SysInfo_t s...
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
virtual TSeqCollection * GetListOfFileHandlers() const
virtual TSeqCollection * GetListOfSignalHandlers() const
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
@ kMessage
Generic message.
@ kFatalError
Fatal error: whoever sends this message is terminating execution.
@ kShutdownOrder
Used by the client to tell servers to shutdown.
@ kShutdownNotice
Used by the workers to notify client of shutdown.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...