20#include "RConfigure.h"
29#include <netinet/in.h>
51#include "compiledata.h"
78 fServ->HandleSigPipe();
97 Printf(
"Received SIGTERM: terminating");
99 fServ->HandleTermination();
119 Printf(
"**** Segmentation violation: terminating ****");
121 fServ->HandleTermination();
141 fServ->HandleSocketInput();
187 Info(
"CreateServer",
"starting%s server creation", (xtest ?
" test" :
""));
193 Error(
"CreateServer",
"resolving the log file description number");
212 Error(
"CreateServer",
"test: socket setup by xpd undefined");
219 Error(
"CreateServer",
"test: sending protocol number");
226 Error(
"CreateServer",
"socket setup by xpd undefined");
229 TString entity =
gEnv->GetValue(
"ProofServ.Entity",
"");
236 const char *opensock =
gSystem->Getenv(
"ROOTOPENSOCK");
237 if (opensock && strlen(opensock) > 0) {
239 sockfd = (
Int_t) strtol(opensock, 0, 10);
242 Warning(
"CreateServer",
"socket descriptor: wrong conversion from '%s'", opensock);
245 Info(
"CreateServer",
"using open connection (descriptor %d)", sockfd);
249 Int_t psid =
gEnv->GetValue(
"ProofServ.SessionID", -1);
251 Error(
"CreateServer",
"Session ID undefined");
258 Error(
"CreateServer",
"Failed to open connection to XrdProofd coordinator");
284 Int_t cid =
gEnv->GetValue(
"ProofServ.ClientID", -1);
286 Error(
"CreateServer",
"Client ID undefined");
295 if (
gEnv->GetValue(
"Proof.GdbHook",0) == 1) {
301 if (
gEnv->GetValue(
"Proof.GdbHook",0) == 2) {
308 Info(
"CreateServer",
"Service: %s, ConfDir: %s, IsMaster: %d",
347 logon =
gEnv->GetValue(
"Proof.Load", (
char *)0);
356 logon =
gEnv->GetValue(
"Proof.Logon", (
char *)0);
375 master.
Form(
"proof://%s@__master__",
fUser.Data());
378 Int_t port =
gEnv->GetValue(
"ProofServ.XpdPort", -1);
387 gEnv->SetValue(
"Proof.ParallelStartup", 0);
392 Error(
"CreateServer",
"no plugin manager found");
401 Error(
"CreateServer",
"no plugin found for TProof with a"
402 " config file of '%s'",
fConfFile.Data());
409 if (
h->LoadPlugin() == -1) {
410 Error(
"CreateServer",
"plugin for TProof could not be loaded");
439 Error(
"CreateServer",
"plugin for TProof could not be executed");
464 msg.
Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
465 " This may generate compatibility problems between streamed objects.\n"
466 " The advise is to move to ROOT >= 5.21/02 .");
473 Int_t idle_to =
gEnv->GetValue(
"ProofServ.IdleTimeout", -1);
478 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
480 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
509 Error(
"HandleUrgentData",
"error receiving interrupt");
514 Info(
"HandleUrgentData",
"got interrupt: %d\n", iLev);
523 Info(
"HandleUrgentData",
"*** Ping");
529 Info(
"HandleUrgentData",
"%d slaves did not reply to ping",nbad);
542 Info(
"HandleUrgentData",
"problems touching path: %s",
fAdminPath.Data());
555 Error(
"HandleUrgentData",
"problems updating status path: %s (errno: %d)",
fAdminPath.Data(), -uss_rc);
558 Info(
"HandleUrgentData",
"admin path undefined");
564 Info(
"HandleUrgentData",
"*** Hard Interrupt");
579 Info(
"HandleUrgentData",
"Soft Interrupt");
594 Info(
"HandleUrgentData",
"Shutdown Interrupt");
602 Error(
"HandleUrgentData",
"unexpected type: %d", iLev);
617 Info(
"HandleSigPipe",
"got sigpipe ... do nothing");
634 fProof->InterruptCurrentMonitor();
636 Long_t timeout =
gEnv->GetValue(
"Proof.ShutdownTimeout", 60);
637 timeout = (timeout > 20) ? timeout : 20;
644 Warning(
"HandleTermination",
"processing could not be stopped");
663 snprintf(str, 512,
"**** Welcome to the PROOF server @ %s ****",
gSystem->HostName());
665 snprintf(str, 512,
"**** PROOF worker server @ %s started ****",
gSystem->HostName());
668 if (
fSocket->Send(str) != 1+
static_cast<Int_t>(strlen(str))) {
669 Error(
"Setup",
"failed to send proof server startup message");
674 if ((
fProtocol =
gEnv->GetValue(
"ProofServ.ClientVersion", -1)) < 0) {
675 Error(
"Setup",
"remote proof protocol missing");
680 fUser =
gEnv->GetValue(
"ProofServ.Entity",
"");
681 if (
fUser.Length() >= 0) {
682 if (
fUser.Contains(
":"))
684 if (
fUser.Contains(
"@"))
696 TString cf =
gEnv->GetValue(
"ProofServ.ProofConfFile",
"");
703 if ((
fSessionTag =
gEnv->GetValue(
"ProofServ.SessionTag",
"-1")) ==
"-1") {
704 Error(
"Setup",
"Session tag missing");
711 if (
gSystem->Getenv(
"ROOTPROOFLOGFILE")) {
722 Error(
"Setup",
"top session tag missing");
734 }
else if (nd != 1) {
735 Warning(
"Setup",
"Wrong number of '-' in session tag: protocol error? %s",
fSessionTag.Data());
744 if ((
fSessionDir =
gEnv->GetValue(
"ProofServ.SessionDir",
"-1")) ==
"-1") {
745 Error(
"Setup",
"Session dir missing");
752 Info(
"Setup",
"working directory set to %s",
fWorkDir.Data());
756 Error(
"Setup",
"common setup failed");
776 Info(
"Setup",
"successfully completed");
793 if (
gEnv->GetValue(
"ProofServ.UseUserCfg", 0) != 0) {
810 if (dynamicStartup) {
812 Int_t dynto =
gEnv->GetValue(
"Proof.DynamicStartupTimeout", -1);
815 if (doto > 0 && --dynto < 0)
break;
836 if (
gSystem->Getenv(
"PROOF_NWORKERS")) {
844 if (!dynamicStartup && (nwrks > 0)) {
848 msg.
Form(
"+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
850 msg.
Form(
"+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
868 Error(
"GetWorkers",
"no appropriate master line got from coordinator");
877 while (fl.
Tokenize(tok, from,
"&")) {
879 if (nwrks == -1 || nwrks > 0) {
882 if (pernode && nodecnt) {
892 Info(
"GetWorkers",
"%p: name: %s (%s) val: %d (nwrks: %d)",
895 if (workers) workers->
Add(ni);
906 if (nwrks != -1) nwrks--;
941 "%p: connection to local coordinator re-established",
this);
946 Printf(
"TXProofServ::HandleError: %p: got called ...",
this);
958 Printf(
"TXProofServ::HandleError: %p: DONE ... ",
this);
970 Printf(
"TXProofServ::HandleInput %p, in: %p",
this, in);
982 Info(
"HandleInput",
"kXPD_flush: flushing log file (stdout)");
997 fProof->StopProcess(abort, timeout);
1000 fPlayer->StopProcess(abort, timeout);
1004 Info(
"HandleInput",
"kXPD_urgent: unknown type: %d",
type);
1010 Warning(
"HandleInput",
"kXPD_inflate: obsolete message type");
1019 Info(
"HandleInput",
"kXPD_priority: group %s priority set to %f",
1029 Info(
"HandleInput",
"kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1072 Info(
"Terminate",
"starting session termination operations ...");
1081 if (!
gSystem->GetProcInfo(&pi)){
1082 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1093 gSystem->ChangeDirectory(
"/");
1103 gSystem->ChangeDirectory(
"/");
1119 fPlayer->StopProcess(abort,1);
1126 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.Data());
1142 Printf(
"Terminate: termination operations ended: quitting!");
1157 Info(
"LockSession",
"locker space undefined");
1164 TRegexp re(
"session-.*-.*-.*");
1167 Info(
"LockSession",
"bad format: %s", sessiontag);
1179 parlog = parlog.
Remove(parlog.
Index(
"master-")+strlen(
"master-"));
1181 if (!
gSystem->AccessPathName(parlog)) {
1182 Info(
"LockSession",
"parent still running: do nothing");
1190 if (!
gSystem->AccessPathName(qlock)) {
1192 if (((*lck)->Lock()) < 0) {
1193 Info(
"LockSession",
"problems locking query lock file");
1209 if (
gDebug > 2)
Info(
"ReleaseWorker",
"releasing: %s", ord);
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
R__EXTERN Int_t gProofDebugLevel
static volatile Int_t gProofServDebug
const char *const kPROOF_WorkDir
const Int_t kPROOF_Protocol
char * Form(const char *fmt,...)
Formats a string in a circular formatting buffer.
void Printf(const char *fmt,...)
Formats a string in a circular formatting buffer and prints the string.
@ kSigSegmentationViolation
R__EXTERN TSystem * gSystem
TApplication * GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
const char *const XPD_GW_QueryEnqueued
const char *const XPD_GW_Static
#define EnvPutInt(name, val)
This class creates the ROOT Application Environment that interfaces to the windowing system eventloop...
virtual Longptr_t ProcessLine(const char *line, Bool_t sync=kFALSE, Int_t *error=nullptr)
Process a single command line, either a C++ statement or an interpreter command starting with a "....
virtual Longptr_t ProcessFile(const char *file, Int_t *error=nullptr, Bool_t keep=kFALSE)
Process a file containing a C++ macro.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
void Add(TObject *obj) override
Collectable string class.
const char * GetName() const override
Returns name of object.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Named parameter, streamable and storable.
const char * GetName() const override
Returns name of object.
void SetVal(const AParamType &val)
const AParamType & GetVal() const
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=nullptr)
Returns the handler if there exists a handler for the specified URI.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetNodeName() const
TQueryResult version adapted to PROOF neeeds.
Int_t CatMotd()
Print message of the day (in the file pointed by the env PROOFMOTD or from fConfDir/etc/proof/motd).
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
static Long_t fgResMemMax
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TProofLockPath * fQueryLock
void RedirectOutput(const char *dir=0, const char *mode="w")
Redirect stdout to a log file.
Int_t SetupCommon()
Common part (between TProofServ and TXProofServ) of the setup phase.
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
TVirtualProofPlayer * fPlayer
TProofServ(Int_t *argc, char **argv, FILE *flog=0)
Main constructor.
virtual void HandleSocketInput()
Handle input coming from the client or from the master server.
TShutdownTimer * fShutdownTimer
static Long_t fgVirtMemMax
TIdleTOTimer * fIdleTOTimer
static Int_t fgLogToSysLog
Bool_t UnlinkDataDir(const char *path)
Scan recursively the datadir and unlink it if empty Return kTRUE if it can be unlinked,...
TQueryResultManager * fQMgr
static TString fgSysLogEntity
void LogToMaster(Bool_t on=kTRUE)
virtual void SendLogFile(Int_t status=0, Int_t start=-1, Int_t end=-1)
Send log file to master.
Int_t UpdateSessionStatus(Int_t xst=-1)
Update the session status in the relevant file.
This class controls a Parallel ROOT Facility, PROOF, cluster.
static const char * GetMacroPath()
Get macro search path. Static utility function.
Regular expression class.
Int_t Atoi() const
Return integer value of string.
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
const char * Data() const
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
TString & ReplaceAll(const TString &s1, const TString &s2)
Ssiz_t Last(char c) const
Find last occurrence of a character c.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TString & Remove(Ssiz_t pos)
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
static void ResetErrno()
Static function resetting system error number.
static Int_t GetErrno()
Static function returning system error number.
TXProofServSegViolationHandler(TXProofServ *s)
Bool_t Notify()
Notify when signal occurs.
Bool_t Notify()
Notify when signal occurs.
TXProofServSigPipeHandler(TXProofServ *s)
TXProofServTerminationHandler(TXProofServ *s)
Bool_t Notify()
Notify when signal occurs.
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
void ReleaseWorker(const char *ord) override
Send message to intermediate coordinator to release worker of last ordinal ord.
void HandleUrgentData() override
Handle high priority data sent by the master or client.
Int_t Setup() override
Print the ProofServ logo on standard output.
void Terminate(Int_t status) override
Terminate the proof server.
void HandleSigPipe() override
Called when the client is not alive anymore; terminate the session.
~TXProofServ() override
Cleanup.
Bool_t HandleError(const void *in=0) override
Handle error on the input socket.
void EnableTimeout() override
Enable read timeout on the underlying socket.
TXProofServInterruptHandler * fInterruptHandler
EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE) override
Get list of workers to be used from now on.
void DisableTimeout() override
Disable read timeout on the underlying socket.
Int_t CreateServer() override
Finalize the server setup.
Bool_t HandleInput(const void *in=0) override
Handle asynchronous input on the input socket.
Int_t LockSession(const char *sessiontag, TProofLockPath **lck)
Try locking query area of session tagged sessiontag.
TXSocketHandler * fInputHandler
void HandleTermination() override
Called when the client is not alive anymore; terminate the session.
static TXSocketHandler * GetSocketHandler(TFileHandler *h=0, TSocket *s=0)
Get an instance of the input socket handler with 'h' as handler, connected to socket 's'.
High level handler of connections to XProofD.
static void SetLocation(const char *loc="")
Set location string.
Implementation of TXSocket using PF_UNIX sockets.