20#include "RConfigure.h"
29#include <netinet/in.h>
51#include "compiledata.h"
75Bool_t TXProofServSigPipeHandler::Notify()
77 fServ->HandleSigPipe();
94Bool_t TXProofServTerminationHandler::Notify()
96 Printf(
"Received SIGTERM: terminating");
98 fServ->HandleTermination();
115Bool_t TXProofServSegViolationHandler::Notify()
118 Printf(
"**** Segmentation violation: terminating ****");
120 fServ->HandleTermination();
138Bool_t TXProofServInputHandler::Notify()
140 fServ->HandleSocketInput();
142 ((
TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
186 Info(
"CreateServer",
"starting%s server creation", (xtest ?
" test" :
""));
192 Error(
"CreateServer",
"resolving the log file description number");
211 Error(
"CreateServer",
"test: socket setup by xpd undefined");
218 Error(
"CreateServer",
"test: sending protocol number");
225 Error(
"CreateServer",
"socket setup by xpd undefined");
236 if (opensock && strlen(opensock) > 0) {
238 sockfd = (
Int_t) strtol(opensock, 0, 10);
241 Warning(
"CreateServer",
"socket descriptor: wrong conversion from '%s'", opensock);
244 Info(
"CreateServer",
"using open connection (descriptor %d)", sockfd);
250 Error(
"CreateServer",
"Session ID undefined");
257 Error(
"CreateServer",
"Failed to open connection to XrdProofd coordinator");
285 Error(
"CreateServer",
"Client ID undefined");
307 Info(
"CreateServer",
"Service: %s, ConfDir: %s, IsMaster: %d",
391 Error(
"CreateServer",
"no plugin manager found");
400 Error(
"CreateServer",
"no plugin found for TProof with a"
408 if (
h->LoadPlugin() == -1) {
409 Error(
"CreateServer",
"plugin for TProof could not be loaded");
438 Error(
"CreateServer",
"plugin for TProof could not be executed");
463 msg.
Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
464 " This may generate compatibility problems between streamed objects.\n"
465 " The advise is to move to ROOT >= 5.21/02 .");
477 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
479 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
508 Error(
"HandleUrgentData",
"error receiving interrupt");
513 Info(
"HandleUrgentData",
"got interrupt: %d\n", iLev);
522 Info(
"HandleUrgentData",
"*** Ping");
528 Info(
"HandleUrgentData",
"%d slaves did not reply to ping",nbad);
554 Error(
"HandleUrgentData",
"problems updating status path: %s (errno: %d)",
fAdminPath.
Data(), -uss_rc);
557 Info(
"HandleUrgentData",
"admin path undefined");
563 Info(
"HandleUrgentData",
"*** Hard Interrupt");
578 Info(
"HandleUrgentData",
"Soft Interrupt");
593 Info(
"HandleUrgentData",
"Shutdown Interrupt");
601 Error(
"HandleUrgentData",
"unexpected type: %d", iLev);
616 Info(
"HandleSigPipe",
"got sigpipe ... do nothing");
636 timeout = (timeout > 20) ? timeout : 20;
643 Warning(
"HandleTermination",
"processing could not be stopped");
668 Error(
"Setup",
"failed to send proof server startup message");
674 Error(
"Setup",
"remote proof protocol missing");
703 Error(
"Setup",
"Session tag missing");
721 Error(
"Setup",
"top session tag missing");
733 }
else if (nd != 1) {
744 Error(
"Setup",
"Session dir missing");
757 Error(
"Setup",
"common setup failed");
777 Info(
"Setup",
"successfully completed");
811 if (dynamicStartup) {
816 if (doto > 0 && --dynto < 0)
break;
839 if (
s.EndsWith(
"x")) {
841 s.ReplaceAll(
"x",
"");
845 if (!dynamicStartup && (nwrks > 0)) {
849 msg.
Form(
"+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
851 msg.
Form(
"+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
869 Error(
"GetWorkers",
"no appropriate master line got from coordinator");
878 while (fl.
Tokenize(tok, from,
"&")) {
880 if (nwrks == -1 || nwrks > 0) {
883 if (pernode && nodecnt) {
893 Info(
"GetWorkers",
"%p: name: %s (%s) val: %d (nwrks: %d)",
896 if (workers) workers->
Add(ni);
907 if (nwrks != -1) nwrks--;
942 "%p: connection to local coordinator re-established",
this);
947 Printf(
"TXProofServ::HandleError: %p: got called ...",
this);
959 Printf(
"TXProofServ::HandleError: %p: DONE ... ",
this);
971 Printf(
"TXProofServ::HandleInput %p, in: %p",
this, in);
983 Info(
"HandleInput",
"kXPD_flush: flushing log file (stdout)");
1005 Info(
"HandleInput",
"kXPD_urgent: unknown type: %d",
type);
1011 Warning(
"HandleInput",
"kXPD_inflate: obsolete message type");
1020 Info(
"HandleInput",
"kXPD_priority: group %s priority set to %f",
1030 Info(
"HandleInput",
"kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1073 Info(
"Terminate",
"starting session termination operations ...");
1083 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1127 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.
Data());
1143 Printf(
"Terminate: termination operations ended: quitting!");
1158 Info(
"LockSession",
"locker space undefined");
1165 TRegexp re(
"session-.*-.*-.*");
1168 Info(
"LockSession",
"bad format: %s", sessiontag);
1180 parlog = parlog.
Remove(parlog.
Index(
"master-")+strlen(
"master-"));
1183 Info(
"LockSession",
"parent still running: do nothing");
1193 if (((*lck)->Lock()) < 0) {
1194 Info(
"LockSession",
"problems locking query lock file");
1210 if (
gDebug > 2)
Info(
"ReleaseWorker",
"releasing: %s", ord);
R__EXTERN Int_t gProofDebugLevel
const char *const kPROOF_WorkDir
const Int_t kPROOF_Protocol
char * Form(const char *fmt,...)
@ kSigSegmentationViolation
R__EXTERN TSystem * gSystem
static volatile Int_t gProofServDebug
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
const char *const XPD_GW_QueryEnqueued
const char *const XPD_GW_Static
#define EnvPutInt(name, val)
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Bool_t IsEmpty() const
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
virtual void Add(TObject *obj)
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
virtual const char * GetName() const
Returns name of object.
Collectable string class.
const char * GetName() const
Returns name of object.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Named parameter, streamable and storable.
void SetVal(const AParamType &val)
const AParamType & GetVal() const
const char * GetName() const
Returns name of object.
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Int_t Unlock()
Unlock the directory.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetNodeName() const
TQueryResult version adapted to PROOF needs.
Class providing the PROOF server.
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
static Long_t fgResMemMax
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TProofLockPath * fQueryLock
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
TVirtualProofPlayer * fPlayer
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
TShutdownTimer * fShutdownTimer
static Long_t fgVirtMemMax
TIdleTOTimer * fIdleTOTimer
static Int_t fgLogToSysLog
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty. Return kTRUE if it can be unlinked,...
TQueryResultManager * fQMgr
static TString fgSysLogEntity
void LogToMaster(Bool_t on=kTRUE)
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
This class controls a Parallel ROOT Facility, PROOF, cluster.
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Bool_t IsEndMaster() const
void Close(Option_t *option="")
Close all open slave servers.
void InterruptCurrentMonitor()
If active in a monitor, set the ready state.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
void SetActive(Bool_t=kTRUE)
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'.
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
static const char * GetMacroPath()
Get macro search path. Static utility function.
Regular expression class.
virtual Bool_t Notify()
Notify when signal occurs.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
virtual Int_t GetDescriptor() const
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
virtual Bool_t IsValid() const
virtual Int_t Reconnect()
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
TString & Insert(Ssiz_t pos, const char *s)
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
const char * Data() const
TString & ReplaceAll(const TString &s1, const TString &s2)
Ssiz_t Last(char c) const
Find last occurrence of a character c.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
TString & Remove(Ssiz_t pos)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
static void ResetErrno()
Static function resetting system error number.
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
static Int_t GetErrno()
Static function returning system error number.
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
virtual int GetPid()
Get process id.
virtual const char * Getenv(const char *env)
Get environment variable.
virtual int MakeDirectory(const char *name)
Make a directory.
virtual Int_t Exec(const char *shellcmd)
Execute a command.
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
virtual void ExitLoop()
Exit from event loop.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
virtual const char * HostName()
Return the system's host name.
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
virtual int Unlink(const char *name)
Unlink, i.e.
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
void EnableTimeout()
Enable read timeout on the underlying socket.
Int_t Setup()
Print the ProofServ logo on standard output.
void Terminate(Int_t status)
Terminate the proof server.
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual ~TXProofServ()
Cleanup.
void DisableTimeout()
Disable read timeout on the underlying socket.
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
TXProofServInterruptHandler * fInterruptHandler
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
TXSocketHandler * fInputHandler
Int_t CreateServer()
Finalize the server setup.
void HandleUrgentData()
Handle high priority data sent by the master or client.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'.
High level handler of connections to XProofD.
static void SetLocation(const char *loc="")
Set location string.
Implementation of TXSocket using PF_UNIX sockets.
static constexpr double s
static constexpr double pc
static constexpr double pi