#include "RConfigure.h"
#include "RConfig.h"
#include "Riostream.h"
#ifdef WIN32
#include <io.h>
typedef long off_t;
#include <snprintf.h>
#else
#include <netinet/in.h>
#endif
#include <sys/types.h>
#include <cstdlib>
#include "TProofServLite.h"
#include "TObjString.h"
#include "TEnv.h"
#include "TError.h"
#include "TException.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TMessage.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TProofPlayer.h"
#include "TProofQueryResult.h"
#include "TRegexp.h"
#include "TClass.h"
#include "TROOT.h"
#include "TSystem.h"
#include "TPluginManager.h"
#include "TSocket.h"
#include "TTimeStamp.h"
#include "compiledata.h"
using namespace std;
static volatile Int_t gProofServDebug = 1;
class TProofServLiteInterruptHandler : public TSignalHandler {
TProofServLite *fServ;
public:
TProofServLiteInterruptHandler(TProofServLite *s)
: TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
Bool_t Notify();
};
Bool_t TProofServLiteInterruptHandler::Notify()
{
fServ->HandleUrgentData();
if (TROOT::Initialized()) {
Throw(GetSignal());
}
return kTRUE;
}
class TProofServLiteSigPipeHandler : public TSignalHandler {
TProofServLite *fServ;
public:
TProofServLiteSigPipeHandler(TProofServLite *s) : TSignalHandler(kSigPipe, kFALSE)
{ fServ = s; }
Bool_t Notify();
};
Bool_t TProofServLiteSigPipeHandler::Notify()
{
fServ->HandleSigPipe();
return kTRUE;
}
class TProofServLiteTerminationHandler : public TSignalHandler {
TProofServLite *fServ;
public:
TProofServLiteTerminationHandler(TProofServLite *s)
: TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
Bool_t Notify();
};
Bool_t TProofServLiteTerminationHandler::Notify()
{
Printf("TProofServLiteTerminationHandler::Notify: wake up!");
fServ->HandleTermination();
return kTRUE;
}
class TProofServLiteSegViolationHandler : public TSignalHandler {
TProofServLite *fServ;
public:
TProofServLiteSegViolationHandler(TProofServLite *s)
: TSignalHandler(kSigSegmentationViolation, kFALSE) { fServ = s; }
Bool_t Notify();
};
Bool_t TProofServLiteSegViolationHandler::Notify()
{
Printf("**** ");
Printf("**** Segmentation violation: terminating ****");
Printf("**** ");
fServ->HandleTermination();
return kTRUE;
}
class TProofServLiteInputHandler : public TFileHandler {
TProofServLite *fServ;
public:
TProofServLiteInputHandler(TProofServLite *s, Int_t fd) : TFileHandler(fd, 1)
{ fServ = s; }
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
};
Bool_t TProofServLiteInputHandler::Notify()
{
fServ->HandleSocketInput();
return kTRUE;
}
ClassImp(TProofServLite)
extern "C" {
TApplication *GetTProofServLite(Int_t *argc, char **argv, FILE *flog)
{ return new TProofServLite(argc, argv, flog); }
}
TProofServLite::TProofServLite(Int_t *argc, char **argv, FILE *flog)
: TProofServ(argc, argv, flog)
{
fInterruptHandler = 0;
fTerminated = kFALSE;
}
Int_t TProofServLite::CreateServer()
{
if (gProofDebugLevel > 0)
Info("CreateServer", "starting server creation");
if (fLogFile) {
if ((fLogFileDes = fileno(fLogFile)) < 0) {
Error("CreateServer", "resolving the log file description number");
return -1;
}
}
fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
if (fSockPath.Length() <= 0) {
Error("CreateServer", "Socket setup by xpd undefined");
return -1;
}
TString entity = gEnv->GetValue("ProofServ.Entity", "");
if (entity.Length() > 0)
fSockPath.Insert(0,TString::Format("%s/", entity.Data()));
fSocket = new TSocket(fSockPath);
if (!fSocket || !(fSocket->IsValid())) {
Error("CreateServer", "Failed to open connection to the client");
return -1;
}
TMessage msg;
msg << fOrdinal;
fSocket->Send(msg);
Int_t sock = fSocket->GetDescriptor();
fInterruptHandler = new TProofServLiteInterruptHandler(this);
gSystem->AddSignalHandler(fInterruptHandler);
gSystem->AddFileHandler(new TProofServLiteInputHandler(this, sock));
if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
while (gProofServDebug)
;
}
if (gProofDebugLevel > 0)
Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
if (Setup() == -1) {
Terminate(0);
SendLogFile();
return -1;
}
if (!fLogFile) {
RedirectOutput();
if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
Terminate(0);
SendLogFile(-98);
return -1;
}
}
ProcessLine("#include <iostream>", kTRUE);
ProcessLine("#include <string>",kTRUE);
const char *logon;
logon = gEnv->GetValue("Proof.Load", (char *)0);
if (logon) {
char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
if (mac)
ProcessLine(TString::Format(".L %s", logon), kTRUE);
delete [] mac;
}
logon = gEnv->GetValue("Proof.Logon", (char *)0);
if (logon && !NoLogOpt()) {
char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
if (mac)
ProcessFile(logon);
delete [] mac;
}
gInterpreter->SaveContext();
gInterpreter->SaveGlobalsContext();
FlushLogFile();
return 0;
}
TProofServLite::~TProofServLite()
{
delete fSocket;
}
void TProofServLite::HandleSigPipe()
{
Terminate(0);
}
void TProofServLite::HandleTermination()
{
Terminate(0);
}
Int_t TProofServLite::Setup()
{
char str[512];
if (IsMaster()) {
snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
} else {
snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
}
if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
Error("Setup", "failed to send proof server startup message");
return -1;
}
if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
Error("Setup", "remote proof protocol missing");
return -1;
}
UserGroup_t *pw = gSystem->GetUserInfo();
if (pw) {
fUser = pw->fUser;
delete pw;
}
fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
Info("Setup", "fWorkDir: %s", fWorkDir.Data());
fTopSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1");
fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), gSystem->HostName(),
(Long_t)TTimeStamp().GetSec(), gSystem->GetPid());
if (gProofDebugLevel > 0)
Info("Setup", "session tag is %s", fSessionTag.Data());
if (fTopSessionTag.IsNull()) fTopSessionTag = fSessionTag;
TMessage m(kPROOF_SESSIONTAG);
m << fSessionTag;
fSocket->Send(m);
if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
Error("Setup", "Session dir missing");
return -1;
}
if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
TString logfile = gSystem->Getenv("ROOTPROOFLOGFILE");
Int_t iord = logfile.Index(TString::Format("-%s", fOrdinal.Data()));
if (iord != kNPOS) logfile.Remove(iord);
logfile += TString::Format("-%s.log", fSessionTag.Data());
gSystem->Symlink(gSystem->Getenv("ROOTPROOFLOGFILE"), logfile);
}
char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
fWorkDir = workdir;
delete [] workdir;
if (gProofDebugLevel > 0)
Info("Setup", "working directory set to %s", fWorkDir.Data());
if (SetupCommon() != 0) {
Error("Setup", "common setup failed");
return -1;
}
fSocket->SetOption(kKeepAlive, 1);
gSystem->AddSignalHandler(new TProofServLiteSigPipeHandler(this));
gSystem->AddSignalHandler(new TProofServLiteTerminationHandler(this));
gSystem->AddSignalHandler(new TProofServLiteSegViolationHandler(this));
return 0;
}
void TProofServLite::Terminate(Int_t status)
{
if (fTerminated)
exit(1);
fTerminated = kTRUE;
Info("Terminate", "starting session termination operations ...");
if (status == 0) {
gSystem->ChangeDirectory("/");
gSystem->MakeDirectory(fSessionDir+"/.delete");
gSystem->Exec(TString::Format("%s %s", kRM, fSessionDir.Data()));
}
if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
if (UnlinkDataDir(fDataDir))
Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
}
gSystem->RemoveSignalHandler(fInterruptHandler);
gSystem->ExitLoop();
Printf("Terminate: termination operations ended: quitting!");
}
void TProofServLite::HandleFork(TMessage *mess)
{
if (!mess) {
Error("HandleFork", "empty message!");
return;
}
TString clones;
(*mess) >> clones;
PDB(kGlobal, 1)
Info("HandleFork", "cloning to %s", clones.Data());
TString clone;
Int_t from = 0;
while (clones.Tokenize(clone, from, " ")) {
Int_t rc = 0;
if ((rc = Fork()) < 0) {
Error("HandleFork", "failed to fork %s", clone.Data());
return;
}
if (rc == 0) {
SetupOnFork(clone.Data());
return;
}
}
return;
}
Int_t TProofServLite::SetupOnFork(const char *ord)
{
if (gProofDebugLevel > 0)
Info("SetupOnFork", "finalizing setup of %s", ord);
fOrdinal = ord;
TString sord;
sord.Form("-%s", fOrdinal.Data());
if (fLogFile) {
fclose(fLogFile);
fLogFileDes = -1;
}
TString sdir = gSystem->DirName(fSessionDir.Data());
RedirectOutput(sdir.Data(), "a");
if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
Terminate(0);
return -1;
}
FlushLogFile();
void *dirp = gSystem->OpenDirectory(sdir);
if (dirp) {
TString ent;
const char *e = 0;
while ((e = gSystem->GetDirEntry(dirp))) {
ent.Form("%s/%s", sdir.Data(), e);
FileStat_t st;
if (gSystem->GetPathInfo(ent.Data(), st) == 0) {
if (st.fIsLink && ent.Contains(sord)) {
PDB(kGlobal, 1)
Info("SetupOnFork","unlinking: %s", ent.Data());
gSystem->Unlink(ent);
}
}
}
gSystem->FreeDirectory(dirp);
}
fSessionTag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
TString logfile = gSystem->Getenv("ROOTPROOFLOGFILE");
logfile.ReplaceAll("-0.0", sord.Data());
gSystem->Setenv("ROOTPROOFLOGFILE", logfile);
Int_t iord = logfile.Index(sord.Data());
if (iord != kNPOS) logfile.Remove(iord + sord.Length());
logfile += TString::Format("-%s.log", fSessionTag.Data());
gSystem->Symlink(gSystem->Getenv("ROOTPROOFLOGFILE"), logfile);
fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
if (fSockPath.Length() <= 0) {
Error("CreateServer", "Socket setup by xpd undefined");
return -1;
}
TString entity = gEnv->GetValue("ProofServ.Entity", "");
if (entity.Length() > 0)
fSockPath.Insert(0, TString::Format("%s/", entity.Data()));
fSocket = new TSocket(fSockPath);
if (!fSocket || !(fSocket->IsValid())) {
Error("CreateServer", "Failed to open connection to the client");
return -1;
}
TMessage msg;
msg << fOrdinal;
fSocket->Send(msg);
Int_t sock = fSocket->GetDescriptor();
fInterruptHandler = new TProofServLiteInterruptHandler(this);
gSystem->AddSignalHandler(fInterruptHandler);
gSystem->AddFileHandler(new TProofServLiteInputHandler(this, sock));
if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
while (gProofServDebug)
;
}
if (gProofDebugLevel > 0)
Info("SetupOnFork", "Service: %s, ConfDir: %s, IsMaster: %d",
fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
if (Setup() == -1) {
Terminate(0);
SendLogFile();
return -1;
}
ProcessLine("#define ROOT_Rtypes 0", kTRUE);
ProcessLine("#define ROOT_TError 0", kTRUE);
ProcessLine("#define ROOT_TGenericClassInfo 0", kTRUE);
gInterpreter->SaveContext();
gInterpreter->SaveGlobalsContext();
return 0;
}