20#include "RConfigure.h"
29#include <netinet/in.h>
51#include "compiledata.h"
76Bool_t TXProofServSigPipeHandler::Notify()
78 fServ->HandleSigPipe();
95Bool_t TXProofServTerminationHandler::Notify()
97 Printf(
"Received SIGTERM: terminating");
99 fServ->HandleTermination();
116Bool_t TXProofServSegViolationHandler::Notify()
119 Printf(
"**** Segmentation violation: terminating ****");
121 fServ->HandleTermination();
139Bool_t TXProofServInputHandler::Notify()
141 fServ->HandleSocketInput();
143 ((
TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
187 Info(
"CreateServer",
"starting%s server creation", (xtest ?
" test" :
""));
193 Error(
"CreateServer",
"resolving the log file description number");
212 Error(
"CreateServer",
"test: socket setup by xpd undefined");
219 Error(
"CreateServer",
"test: sending protocol number");
226 Error(
"CreateServer",
"socket setup by xpd undefined");
237 if (opensock && strlen(opensock) > 0) {
239 sockfd = (
Int_t) strtol(opensock, 0, 10);
242 Warning(
"CreateServer",
"socket descriptor: wrong conversion from '%s'", opensock);
245 Info(
"CreateServer",
"using open connection (descriptor %d)", sockfd);
251 Error(
"CreateServer",
"Session ID undefined");
258 Error(
"CreateServer",
"Failed to open connection to XrdProofd coordinator");
286 Error(
"CreateServer",
"Client ID undefined");
308 Info(
"CreateServer",
"Service: %s, ConfDir: %s, IsMaster: %d",
392 Error(
"CreateServer",
"no plugin manager found");
401 Error(
"CreateServer",
"no plugin found for TProof with a"
409 if (
h->LoadPlugin() == -1) {
410 Error(
"CreateServer",
"plugin for TProof could not be loaded");
439 Error(
"CreateServer",
"plugin for TProof could not be executed");
464 msg.
Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
465 " This may generate compatibility problems between streamed objects.\n"
466 " The advise is to move to ROOT >= 5.21/02 .");
478 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
480 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
509 Error(
"HandleUrgentData",
"error receiving interrupt");
514 Info(
"HandleUrgentData",
"got interrupt: %d\n", iLev);
523 Info(
"HandleUrgentData",
"*** Ping");
529 Info(
"HandleUrgentData",
"%d slaves did not reply to ping",nbad);
555 Error(
"HandleUrgentData",
"problems updating status path: %s (errno: %d)",
fAdminPath.
Data(), -uss_rc);
558 Info(
"HandleUrgentData",
"admin path undefined");
564 Info(
"HandleUrgentData",
"*** Hard Interrupt");
579 Info(
"HandleUrgentData",
"Soft Interrupt");
594 Info(
"HandleUrgentData",
"Shutdown Interrupt");
602 Error(
"HandleUrgentData",
"unexpected type: %d", iLev);
617 Info(
"HandleSigPipe",
"got sigpipe ... do nothing");
637 timeout = (timeout > 20) ? timeout : 20;
644 Warning(
"HandleTermination",
"processing could not be stopped");
669 Error(
"Setup",
"failed to send proof server startup message");
675 Error(
"Setup",
"remote proof protocol missing");
704 Error(
"Setup",
"Session tag missing");
722 Error(
"Setup",
"top session tag missing");
734 }
else if (nd != 1) {
745 Error(
"Setup",
"Session dir missing");
756 Error(
"Setup",
"common setup failed");
776 Info(
"Setup",
"successfully completed");
810 if (dynamicStartup) {
815 if (doto > 0 && --dynto < 0)
break;
844 if (!dynamicStartup && (nwrks > 0)) {
848 msg.
Form(
"+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
850 msg.
Form(
"+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
868 Error(
"GetWorkers",
"no appropriate master line got from coordinator");
877 while (fl.
Tokenize(tok, from,
"&")) {
879 if (nwrks == -1 || nwrks > 0) {
882 if (pernode && nodecnt) {
892 Info(
"GetWorkers",
"%p: name: %s (%s) val: %d (nwrks: %d)",
895 if (workers) workers->
Add(ni);
906 if (nwrks != -1) nwrks--;
941 "%p: connection to local coordinator re-established",
this);
946 Printf(
"TXProofServ::HandleError: %p: got called ...",
this);
958 Printf(
"TXProofServ::HandleError: %p: DONE ... ",
this);
970 Printf(
"TXProofServ::HandleInput %p, in: %p",
this, in);
982 Info(
"HandleInput",
"kXPD_flush: flushing log file (stdout)");
1004 Info(
"HandleInput",
"kXPD_urgent: unknown type: %d",
type);
1010 Warning(
"HandleInput",
"kXPD_inflate: obsolete message type");
1019 Info(
"HandleInput",
"kXPD_priority: group %s priority set to %f",
1029 Info(
"HandleInput",
"kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1072 Info(
"Terminate",
"starting session termination operations ...");
1082 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1126 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.
Data());
1142 Printf(
"Terminate: termination operations ended: quitting!");
1157 Info(
"LockSession",
"locker space undefined");
1164 TRegexp re(
"session-.*-.*-.*");
1167 Info(
"LockSession",
"bad format: %s", sessiontag);
1179 parlog = parlog.
Remove(parlog.
Index(
"master-")+strlen(
"master-"));
1182 Info(
"LockSession",
"parent still running: do nothing");
1192 if (((*lck)->Lock()) < 0) {
1193 Info(
"LockSession",
"problems locking query lock file");
1209 if (
gDebug > 2)
Info(
"ReleaseWorker",
"releasing: %s", ord);
R__EXTERN Int_t gProofDebugLevel
static volatile Int_t gProofServDebug
const char *const kPROOF_WorkDir
const Int_t kPROOF_Protocol
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
@ kSigSegmentationViolation
R__EXTERN TSystem * gSystem
static volatile Int_t gProofServDebug
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
const char *const XPD_GW_QueryEnqueued
const char *const XPD_GW_Static
#define EnvPutInt(name, val)
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
virtual Long_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=0)
Process a single command line, either a C++ statement or an interpreter command starting with a "....
virtual Long_t ProcessFile(const char *file, Int_t *error=0, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Bool_t IsEmpty() const
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
Set the value of a resource or create a new resource.
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
virtual void Add(TObject *obj)
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
virtual const char * GetName() const
Returns name of object.
Collectable string class.
const char * GetName() const
Returns name of object.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Named parameter, streamable and storable.
void SetVal(const AParamType &val)
const AParamType & GetVal() const
const char * GetName() const
Returns name of object.
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Int_t Unlock()
Unlock the directory.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetNodeName() const
TQueryResult version adapted to PROOF neeeds.
Class providing the PROOF server.
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
static Long_t fgResMemMax
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TProofLockPath * fQueryLock
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
TVirtualProofPlayer * fPlayer
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
TShutdownTimer * fShutdownTimer
static Long_t fgVirtMemMax
TIdleTOTimer * fIdleTOTimer
static Int_t fgLogToSysLog
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty Return kTRUE if it can be unlinked,...
TQueryResultManager * fQMgr
static TString fgSysLogEntity
void LogToMaster(Bool_t on=kTRUE)
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
This class controls a Parallel ROOT Facility, PROOF, cluster.
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Bool_t IsEndMaster() const
void Close(Option_t *option="")
Close all open slave servers.
void InterruptCurrentMonitor()
If in active in a monitor set ready state.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
void SetActive(Bool_t=kTRUE)
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'.
Int_t Ping(ESlaves list)
Ping PROOF slaves. Returns the number of slaves that responded.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
static const char * GetMacroPath()
Get macro search path. Static utility function.
Regular expression class.
virtual Bool_t Notify()
Notify when signal occurs.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
virtual Int_t GetDescriptor() const
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
virtual Bool_t IsValid() const
virtual Int_t Reconnect()
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
TString & Insert(Ssiz_t pos, const char *s)
Int_t Atoi() const
Return integer value of string.
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
const char * Data() const
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
TString & ReplaceAll(const TString &s1, const TString &s2)
Ssiz_t Last(char c) const
Find last occurrence of a character c.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
TString & Remove(Ssiz_t pos)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
virtual void AddFileHandler(TFileHandler *fh)
Add a file handler to the list of system file handlers.
virtual void Syslog(ELogLevel level, const char *mess)
Send mess to syslog daemon.
static void ResetErrno()
Static function resetting system error number.
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
static Int_t GetErrno()
Static function returning system error number.
virtual int GetPid()
Get process id.
virtual const char * Getenv(const char *env)
Get environment variable.
virtual int MakeDirectory(const char *name)
Make a directory.
virtual Int_t Exec(const char *shellcmd)
Execute a command.
virtual TFileHandler * RemoveFileHandler(TFileHandler *fh)
Remove a file handler from the list of file handlers.
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
virtual void ExitLoop()
Exit from event loop.
virtual Bool_t ChangeDirectory(const char *path)
Change directory.
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
virtual const char * HostName()
Return the system's host name.
virtual void Sleep(UInt_t milliSec)
Sleep milliSec milli seconds.
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
virtual TString GetDirName(const char *pathname)
Return the directory name in pathname.
virtual int Unlink(const char *name)
Unlink, i.e.
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
virtual void Start(Long_t milliSec=-1, Bool_t singleShot=kFALSE)
Starts the timer with a milliSec timeout.
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
void EnableTimeout()
Enable read timeout on the underlying socket.
Int_t Setup()
Print the ProofServ logo on standard output.
void Terminate(Int_t status)
Terminate the proof server.
void HandleTermination()
Called when the client is not alive anymore; terminate the session.
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual ~TXProofServ()
Cleanup.
void DisableTimeout()
Disable read timeout on the underlying socket.
void HandleSigPipe()
Called when the client is not alive anymore; terminate the session.
Bool_t HandleError(const void *in=0)
Handle error on the input socket.
Bool_t HandleInput(const void *in=0)
Handle asynchronous input on the input socket.
TXProofServInterruptHandler * fInterruptHandler
void ReleaseWorker(const char *ord)
Send message to intermediate coordinator to release worker of last ordinal ord.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
TXSocketHandler * fInputHandler
Int_t CreateServer()
Finalize the server setup.
void HandleUrgentData()
Handle high priority data sent by the master or client.
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'.
High level handler of connections to XProofD.
static void SetLocation(const char *loc="")
Set location string.
Implementation of TXSocket using PF_UNIX sockets.