20 #include "RConfigure.h"
28 #include <sys/types.h>
29 #include <netinet/in.h>
51 #include "compiledata.h"
55 #include <XrdClient/XrdClientConst.hh>
56 #include <XrdClient/XrdClientEnv.hh>
77 fServ->HandleSigPipe();
96 Printf(
"Received SIGTERM: terminating");
98 fServ->HandleTermination();
118 Printf(
"**** Segmentation violation: terminating ****");
120 fServ->HandleTermination();
140 fServ->HandleSocketInput();
142 ((
TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
186 Info(
"CreateServer",
"starting%s server creation", (xtest ?
" test" :
""));
192 Error(
"CreateServer",
"resolving the log file description number");
211 Error(
"CreateServer",
"test: socket setup by xpd undefined");
217 if (
write(fpw, &proto,
sizeof(proto)) !=
sizeof(proto)) {
218 Error(
"CreateServer",
"test: sending protocol number");
225 Error(
"CreateServer",
"socket setup by xpd undefined");
236 if (opensock && strlen(opensock) > 0) {
238 sockfd = (
Int_t) strtol(opensock, 0, 10);
241 Warning(
"CreateServer",
"socket descriptor: wrong conversion from '%s'", opensock);
244 Info(
"CreateServer",
"using open connection (descriptor %d)", sockfd);
250 Error(
"CreateServer",
"Session ID undefined");
257 Error(
"CreateServer",
"Failed to open connection to XrdProofd coordinator");
285 Error(
"CreateServer",
"Client ID undefined");
295 while (gProofServDebug)
301 while (gProofServDebug)
307 Info(
"CreateServer",
"Service: %s, ConfDir: %s, IsMaster: %d",
391 Error(
"CreateServer",
"no plugin manager found");
400 Error(
"CreateServer",
"no plugin found for TProof with a"
409 Error(
"CreateServer",
"plugin for TProof could not be loaded");
438 Error(
"CreateServer",
"plugin for TProof could not be executed");
463 msg.
Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
464 " This may generate compatibility problems between streamed objects.\n"
465 " The advise is to move to ROOT >= 5.21/02 .");
477 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
479 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
508 Error(
"HandleUrgentData",
"error receiving interrupt");
513 Info(
"HandleUrgentData",
"got interrupt: %d\n", iLev);
522 Info(
"HandleUrgentData",
"*** Ping");
528 Info(
"HandleUrgentData",
"%d slaves did not reply to ping",nbad);
554 Error(
"HandleUrgentData",
"problems updating status path: %s (errno: %d)",
fAdminPath.
Data(), -uss_rc);
557 Info(
"HandleUrgentData",
"admin path undefined");
563 Info(
"HandleUrgentData",
"*** Hard Interrupt");
578 Info(
"HandleUrgentData",
"Soft Interrupt");
593 Info(
"HandleUrgentData",
"Shutdown Interrupt");
601 Error(
"HandleUrgentData",
"unexpected type: %d", iLev);
616 Info(
"HandleSigPipe",
"got sigpipe ... do nothing");
636 timeout = (timeout > 20) ? timeout : 20;
643 Warning(
"HandleTermination",
"processing could not be stopped");
662 snprintf(str, 512,
"**** Welcome to the PROOF server @ %s ****",
gSystem->
HostName());
664 snprintf(str, 512,
"**** PROOF worker server @ %s started ****",
gSystem->
HostName());
668 Error(
"Setup",
"failed to send proof server startup message");
674 Error(
"Setup",
"remote proof protocol missing");
703 Error(
"Setup",
"Session tag missing");
721 Error(
"Setup",
"top session tag missing");
733 }
else if (nd != 1) {
744 Error(
"Setup",
"Session dir missing");
757 Error(
"Setup",
"common setup failed");
777 Info(
"Setup",
"successfully completed");
811 if (dynamicStartup) {
816 if (doto > 0 && --dynto < 0)
break;
839 if (s.EndsWith(
"x")) {
841 s.ReplaceAll(
"x",
"");
845 if (!dynamicStartup && (nwrks > 0)) {
849 msg.
Form(
"+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
851 msg.
Form(
"+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
865 if (fl.Tokenize(tok, from,
"&")) {
869 Error(
"GetWorkers",
"no appropriate master line got from coordinator");
878 while (fl.Tokenize(tok, from,
"&")) {
880 if (nwrks == -1 || nwrks > 0) {
883 if (pernode && nodecnt) {
893 Info(
"GetWorkers",
"%p: name: %s (%s) val: %d (nwrks: %d)",
896 if (workers) workers->
Add(ni);
907 if (nwrks != -1) nwrks--;
942 "%p: connection to local coordinator re-established",
this);
947 Printf(
"TXProofServ::HandleError: %p: got called ...",
this);
959 Printf(
"TXProofServ::HandleError: %p: DONE ... ",
this);
971 Printf(
"TXProofServ::HandleInput %p, in: %p",
this, in);
983 Info(
"HandleInput",
"kXPD_flush: flushing log file (stdout)");
1005 Info(
"HandleInput",
"kXPD_urgent: unknown type: %d", type);
1011 Warning(
"HandleInput",
"kXPD_inflate: obsolete message type");
1020 Info(
"HandleInput",
"kXPD_priority: group %s priority set to %f",
1030 Info(
"HandleInput",
"kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1073 Info(
"Terminate",
"starting session termination operations ...");
1083 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1127 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.
Data());
1143 Printf(
"Terminate: termination operations ended: quitting!");
1158 Info(
"LockSession",
"locker space undefined");
1165 TRegexp re(
"session-.*-.*-.*");
1168 Info(
"LockSession",
"bad format: %s", sessiontag);
1180 parlog = parlog.
Remove(parlog.
Index(
"master-")+strlen(
"master-"));
1183 Info(
"LockSession",
"parent still running: do nothing");
1193 if (((*lck)->Lock()) < 0) {
1194 Info(
"LockSession",
"problems locking query lock file");
1210 if (
gDebug > 2)
Info(
"ReleaseWorker",
"releasing: %s", ord);
const char * GetName() const
Returns name of object.
TXSocketHandler * fInputHandler
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Ssiz_t Last(char c) const
Find last occurrence of a character c.
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
Int_t Setup()
Print the ProofServ logo on standard output.
static void SetLocation(const char *loc="")
Set location string.
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "...
virtual int GetPid()
Get process id.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
void LogToMaster(Bool_t on=kTRUE)
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
static Long_t fgResMemMax
void SetCompressionSettings(Int_t settings=1)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level...
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
Int_t CreateServer()
Finalize the server setup.
Collectable string class.
virtual Bool_t IsValid() const
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'. ...
virtual Int_t Reconnect()
void InterruptCurrentMonitor()
If active in a monitor, set the ready state.
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'...
TString & ReplaceAll(const TString &s1, const TString &s2)
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TVirtualProofPlayer * fPlayer
Bool_t UnlinkDataDir(const char *path)
Recursively scan the datadir and unlink it if empty. Return kTRUE if it can be unlinked, kFALSE otherwise.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
const char *const kPROOF_WorkDir
virtual int MakeDirectory(const char *name)
Make a directory.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Regular expression class.
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
const char *const XPD_GW_QueryEnqueued
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
void SetVal(const AParamType &val)
Int_t LoadPlugin()
Load the plugin library for this handler.
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
Long_t ExecPlugin(int nargs, const T &...params)
void HandleUrgentData()
Handle high priority data sent by the master or client.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
void DisableTimeout()
Disable read timeout on the underlying socket.
TString & Insert(Ssiz_t pos, const char *s)
void EnableTimeout()
Enable read timeout on the underlying socket.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
virtual Int_t GetDescriptor() const
static Long_t fgVirtMemMax
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
static const char * GetMacroPath()
Get macro search path. Static utility function.
static Int_t GetErrno()
Static function returning system error number.
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
Implementation of TXSocket using PF_UNIX sockets.
const char * Data() const
virtual Bool_t Notify()
Notify when signal occurs.
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
virtual Bool_t IsEmpty() const
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
TXProofServInterruptHandler * fInterruptHandler
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
virtual const char * Getenv(const char *env)
Get environment variable.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
if(pyself &&pyself!=Py_None)
virtual void ExitLoop()
Exit from event loop.
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
const TString & GetOrdinal() const
const char Int_t const char TProof Int_t const char * workdir
void Terminate(Int_t status)
Terminate the proof server.
Named parameter, streamable and storable.
static Int_t fgLogToSysLog
The purpose of this class is to provide a complete node description for masters, submasters and worke...
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
R__EXTERN TSystem * gSystem
Class providing the PROOF server.
const char * GetName() const
Returns name of object.
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
virtual ~TXProofServ()
Cleanup.
High level handler of connections to XProofD.
void SetActive(Bool_t=kTRUE)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
TIdleTOTimer * fIdleTOTimer
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
char * Form(const char *fmt,...)
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
This class implements a plugin library manager.
virtual const char * GetName() const
Returns name of object.
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
virtual Int_t Exec(const char *shellcmd)
Execute a command.
const Int_t kPROOF_Protocol
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
TShutdownTimer * fShutdownTimer
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
TString & Remove(Ssiz_t pos)
void Close(Option_t *option="")
Close all open slave servers.
virtual Int_t GetSize() const
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
virtual const char * HostName()
Return the system's host name.
This class controls a Parallel ROOT Facility, PROOF, cluster.
ClassImp(TXProofServ) extern"C"
Int_t Unlock()
Unlock the directory.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
TQueryResult version adapted to PROOF needs.
static TString fgSysLogEntity
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
const char *const XPD_GW_Static
TXProofServ(Int_t *argc, char **argv, FILE *flog=0)
Main constructor.
virtual void Add(TObject *obj)
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
R__EXTERN Int_t gProofDebugLevel
const TString & GetNodeName() const
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
static void ResetErrno()
Static function resetting system error number.
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
const AParamType & GetVal() const
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
const TString & GetImage() const
virtual void SetTitle(const char *title="")
Change (i.e. set) the title of the TNamed.
static volatile Int_t gProofServDebug
Bool_t IsEndMaster() const
TQueryResultManager * fQMgr
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
TProofLockPath * fQueryLock
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.