#include "RConfigure.h"
#include "RConfig.h"
#include "Riostream.h"
#ifdef WIN32
#include <process.h>
#include <io.h>
#include "snprintf.h"
typedef long off_t;
#endif
#include <errno.h>
#include <time.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#ifndef WIN32
#include <sys/wait.h>
#endif
#include <cstdlib>
#include <exception>
#include <new>
using namespace std;
#if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
(defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
(MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
#include <sys/file.h>
#define lockf(fd, op, sz) flock((fd), (op))
#ifndef F_LOCK
#define F_LOCK (LOCK_EX | LOCK_NB)
#endif
#ifndef F_ULOCK
#define F_ULOCK LOCK_UN
#endif
#endif
#include "TProofServ.h"
#include "TDSetProxy.h"
#include "TEnv.h"
#include "TError.h"
#include "TEventList.h"
#include "TEntryList.h"
#include "TException.h"
#include "TFile.h"
#include "THashList.h"
#include "TInterpreter.h"
#include "TKey.h"
#include "TMessage.h"
#include "TVirtualPerfStats.h"
#include "TProofDebug.h"
#include "TProof.h"
#include "TVirtualProofPlayer.h"
#include "TProofQueryResult.h"
#include "TQueryResultManager.h"
#include "TRegexp.h"
#include "TROOT.h"
#include "TSocket.h"
#include "TStopwatch.h"
#include "TSystem.h"
#include "TTimeStamp.h"
#include "TUrl.h"
#include "TPluginManager.h"
#include "TObjString.h"
#include "compiledata.h"
#include "TProofResourcesStatic.h"
#include "TProofNodeInfo.h"
#include "TFileInfo.h"
#include "TMutex.h"
#include "TClass.h"
#include "TSQLServer.h"
#include "TSQLResult.h"
#include "TSQLRow.h"
#include "TPRegexp.h"
#include "TParameter.h"
#include "TMap.h"
#include "TSortedList.h"
#include "TParameter.h"
#include "TFileCollection.h"
#include "TLockFile.h"
#include "TDataSetManagerFile.h"
#include "TProofProgressStatus.h"
#include "TServerSocket.h"
#include "TMonitor.h"
#include "TFunction.h"
#include "TMethodArg.h"
#include "TMethodCall.h"
#include "TProofOutputFile.h"
#include "TSelector.h"
TProofServ *gProofServ = 0;
static volatile Int_t gProofServDebug = 1;
Int_t TProofServ::fgLogToSysLog = 0;
TString TProofServ::fgSysLogService("proof");
TString TProofServ::fgSysLogEntity("undef:default");
FILE *TProofServ::fgErrorHandlerFile = 0;
#ifdef __APPLE__
extern "C" {
static const char *__crashreporter_info__ = 0;
asm(".desc ___crashreporter_info__, 0x10");
}
#endif
Int_t TProofServ::fgRecursive = 0;
TString TProofServ::fgLastMsg("<undef>");
Long64_t TProofServ::fgLastEntry = -1;
Long_t TProofServ::fgVirtMemMax = -1;
Long_t TProofServ::fgResMemMax = -1;
Float_t TProofServ::fgMemHWM = 0.80;
Float_t TProofServ::fgMemStop = 0.95;
class TProofServTerminationHandler : public TSignalHandler {
TProofServ *fServ;
public:
TProofServTerminationHandler(TProofServ *s)
: TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
Bool_t Notify();
};
Bool_t TProofServTerminationHandler::Notify()
{
Printf("Received SIGTERM: terminating");
fServ->HandleTermination();
return kTRUE;
}
class TProofServInterruptHandler : public TSignalHandler {
TProofServ *fServ;
public:
TProofServInterruptHandler(TProofServ *s)
: TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
Bool_t Notify();
};
Bool_t TProofServInterruptHandler::Notify()
{
fServ->HandleUrgentData();
if (TROOT::Initialized()) {
Throw(GetSignal());
}
return kTRUE;
}
class TProofServSigPipeHandler : public TSignalHandler {
TProofServ *fServ;
public:
TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
{ fServ = s; }
Bool_t Notify();
};
Bool_t TProofServSigPipeHandler::Notify()
{
fServ->HandleSigPipe();
return kTRUE;
}
class TProofServInputHandler : public TFileHandler {
TProofServ *fServ;
public:
TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
{ fServ = s; }
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
};
Bool_t TProofServInputHandler::Notify()
{
fServ->HandleSocketInput();
return kTRUE;
}
TString TProofServLogHandler::fgPfx = "";
Int_t TProofServLogHandler::fgCmdRtn = 0;
TProofServLogHandler::TProofServLogHandler(const char *cmd,
TSocket *s, const char *pfx)
: TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
{
ResetBit(kFileIsPipe);
fgCmdRtn = 0;
fFile = 0;
if (s && cmd) {
fFile = gSystem->OpenPipe(cmd, "r");
if (fFile) {
SetFd(fileno(fFile));
Notify();
SetBit(kFileIsPipe);
} else {
fSocket = 0;
Error("TProofServLogHandler", "executing command in pipe");
fgCmdRtn = -1;
}
} else {
Error("TProofServLogHandler",
"undefined command (%p) or socket (%p)", (int *)cmd, s);
}
}
TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s, const char *pfx)
: TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
{
ResetBit(kFileIsPipe);
fgCmdRtn = 0;
fFile = 0;
if (s && f) {
fFile = f;
SetFd(fileno(fFile));
Notify();
} else {
Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
}
}
TProofServLogHandler::~TProofServLogHandler()
{
if (TestBit(kFileIsPipe) && fFile) {
Int_t rc = gSystem->ClosePipe(fFile);
#ifdef WIN32
fgCmdRtn = rc;
#else
fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
#endif
}
fFile = 0;
fSocket = 0;
ResetBit(kFileIsPipe);
}
Bool_t TProofServLogHandler::Notify()
{
if (IsValid()) {
TMessage m(kPROOF_MESSAGE);
char line[4096];
char *plf = 0;
while (fgets(line, sizeof(line), fFile)) {
if ((plf = strchr(line, '\n')))
*plf = 0;
TString log;
if (fPfx.Length() > 0) {
log.Form("%s: %s", fPfx.Data(), line);
} else if (fgPfx.Length() > 0) {
log.Form("%s: %s", fgPfx.Data(), line);
} else {
log = line;
}
m.Reset(kPROOF_MESSAGE);
m << log;
fSocket->Send(m);
}
}
return kTRUE;
}
void TProofServLogHandler::SetDefaultPrefix(const char *pfx)
{
fgPfx = pfx;
}
Int_t TProofServLogHandler::GetCmdRtn()
{
return fgCmdRtn;
}
TProofServLogHandlerGuard::TProofServLogHandlerGuard(const char *cmd, TSocket *s,
const char *pfx, Bool_t on)
{
fExecHandler = 0;
if (cmd && on) {
fExecHandler = new TProofServLogHandler(cmd, s, pfx);
if (fExecHandler->IsValid()) {
gSystem->AddFileHandler(fExecHandler);
} else {
Error("TProofServLogHandlerGuard","invalid handler");
}
} else {
if (on)
Error("TProofServLogHandlerGuard","undefined command");
}
}
TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
const char *pfx, Bool_t on)
{
fExecHandler = 0;
if (f && on) {
fExecHandler = new TProofServLogHandler(f, s, pfx);
if (fExecHandler->IsValid()) {
gSystem->AddFileHandler(fExecHandler);
} else {
Error("TProofServLogHandlerGuard","invalid handler");
}
} else {
if (on)
Error("TProofServLogHandlerGuard","undefined file");
}
}
TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
{
if (fExecHandler && fExecHandler->IsValid()) {
gSystem->RemoveFileHandler(fExecHandler);
SafeDelete(fExecHandler);
}
}
TShutdownTimer::TShutdownTimer(TProofServ *p, Int_t delay)
: TTimer(delay, kFALSE), fProofServ(p)
{
fTimeout = gEnv->GetValue("ProofServ.ShutdownTimeout", 20);
fTimeout = gEnv->GetValue("ProofServ.ShutdonwTimeout", fTimeout);
}
Bool_t TShutdownTimer::Notify()
{
if (gDebug > 0)
printf("TShutdownTimer::Notify: checking activity on the input socket\n");
TSocket *xs = 0;
if (fProofServ && (xs = fProofServ->GetSocket())) {
TTimeStamp now;
TTimeStamp ts = xs->GetLastUsage();
Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
(Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
if (dt > fTimeout * 60000) {
printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
" during the last %d mins: aborting\n", xs, fTimeout);
gSystem->Abort();
} else {
if (gDebug > 0)
printf("TShutdownTimer::Notify: input socket: %p: show activity"
" %ld secs ago\n", xs, dt / 60000);
}
}
Reset();
return kTRUE;
}
TReaperTimer::~TReaperTimer()
{
if (fChildren) {
fChildren->SetOwner(kTRUE);
delete fChildren;
fChildren = 0;
}
}
void TReaperTimer::AddPid(Int_t pid)
{
if (pid > 0) {
if (!fChildren)
fChildren = new TList;
TString spid;
spid.Form("%d", pid);
fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
TurnOn();
}
}
Bool_t TReaperTimer::Notify()
{
if (fChildren) {
TIter nxp(fChildren);
TParameter<Int_t> *p = 0;
while ((p = (TParameter<Int_t> *)nxp())) {
int status;
#ifndef WIN32
pid_t pid;
do {
pid = waitpid(p->GetVal(), &status, WNOHANG);
} while (pid < 0 && errno == EINTR);
#else
intptr_t pid;
pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
#endif
if (pid > 0 && pid == p->GetVal()) {
fChildren->Remove(p);
delete p;
}
}
}
if (!fChildren || fChildren->GetSize() <= 0) {
Stop();
} else {
Reset();
}
return kTRUE;
}
Bool_t TIdleTOTimer::Notify()
{
Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
if (fProofServ) {
Int_t uss_rc = -1;
if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
TString msg;
if (fProofServ->GetProtocol() < 29) {
msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
"// Please IGNORE any error message possibly displayed below\n//",
gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
} else {
msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
}
fProofServ->SendAsynMessage(msg.Data());
fProofServ->Terminate(0);
Reset();
Stop();
} else {
Warning("Notify", "fProofServ undefined!");
Start(-1, kTRUE);
}
return kTRUE;
}
ClassImp(TProofServ)
extern "C" {
TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
{ return new TProofServ(argc, argv, flog); }
}
TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
: TApplication("proofserv", argc, argv, 0, -1)
{
Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
if (xtest) {
Printf("proofserv: command line testing: OK");
exit(0);
}
TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
: "session.rootrc";
if (!gSystem->AccessPathName(rcfile, kReadPermission))
gEnv->ReadFile(rcfile, kEnvChange);
fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
if (mmx < kMaxLong && mmx > 0)
fgVirtMemMax = mmx * 1024;
}
if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
if (mmx < kMaxLong && mmx > 0)
fgVirtMemMax = mmx * 1024;
}
fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
if (mmx < kMaxLong && mmx > 0)
fgResMemMax = mmx * 1024;
}
fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
if (fgVirtMemMax > 0 || fgResMemMax > 0) {
if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
Warning("TProofServ", "requested memory fraction threshold to stop processing"
" (MemStop) out of range [0,1] - ignoring");
fgMemStop = 0.95;
}
if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
" (MemHWM) out of range [0,MemStop] - ignoring");
fgMemHWM = 0.80;
}
}
Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
(gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
while (gProofServDebug)
;
}
if (argc && *argc >= 4)
if (!strcmp(argv[3], "test"))
fService = "prooftest";
if (argc && *argc < 2) {
Error("TProofServ", "Must have at least 1 arguments (see proofd).");
exit(1);
}
gProofServ = this;
fSendLogToMaster = kFALSE;
gErrorAbortLevel = kSysError + 1;
SetErrorHandlerFile(stderr);
SetErrorHandler(ErrorHandler);
fNcmd = 0;
fGroupPriority = 100;
fInterrupt = kFALSE;
fProtocol = 0;
fOrdinal = gEnv->GetValue("ProofServ.Ordinal", "-1");
fGroupId = -1;
fGroupSize = 0;
fRealTime = 0.0;
fCpuTime = 0.0;
fProof = 0;
fPlayer = 0;
fSocket = 0;
fEnabledPackages = new TList;
fEnabledPackages->SetOwner();
fTotSessions = -1;
fActSessions = -1;
fEffSessions = -1.;
fGlobalPackageDirList = 0;
fLogFile = flog;
fLogFileDes = -1;
fArchivePath = "";
fPackageLock = 0;
fCacheLock = 0;
fQueryLock = 0;
fQMgr = 0;
fQMtx = new TMutex(kTRUE);
fWaitingQueries = new TList;
fIdle = kTRUE;
fQuerySeqNum = -1;
fQueuedMsg = new TList;
fRealTimeLog = kFALSE;
fShutdownTimer = 0;
fReaperTimer = 0;
fIdleTOTimer = 0;
fDataSetManager = 0;
fDataSetStgRepo = 0;
fInputHandler = 0;
fMaxQueries = -1;
fMaxBoxSize = -1;
fHWMBoxSize = -1;
fMergingSocket = 0;
fMergingMonitor = 0;
fMergedWorkers = 0;
ResetBit(TProofServ::kHighMemory);
fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
fCompressMsg = gEnv->GetValue("ProofServ.CompressMessage", 0);
gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
fLogLevel = gProofDebugLevel;
gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue("Proof.DebugMask",~0);
if (gProofDebugLevel > 0)
Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
fLogFileMaxSize = -1;
TString logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
if (!logmx.IsNull()) {
Long64_t xf = 1;
if (!logmx.IsDigit()) {
if (logmx.EndsWith("K")) {
xf = 1024;
logmx.Remove(TString::kTrailing, 'K');
} else if (logmx.EndsWith("M")) {
xf = 1024*1024;
logmx.Remove(TString::kTrailing, 'M');
} if (logmx.EndsWith("G")) {
xf = 1024*1024*1024;
logmx.Remove(TString::kTrailing, 'G');
}
}
if (logmx.IsDigit()) {
fLogFileMaxSize = logmx.Atoi() * xf;
if (fLogFileMaxSize > 0)
Info("TProofServ", "keeping the log file size within %lld bytes", fLogFileMaxSize);
} else {
logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
Warning("TProofServ", "bad formatted log file size limit ignored: '%s'", logmx.Data());
}
}
GetOptions(argc, argv);
fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
if (test) fPrefix = "Test";
if (fOrdinal != "-1")
fPrefix += fOrdinal;
TProofServLogHandler::SetDefaultPrefix(fPrefix);
TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
if (!(slog.IsNull())) {
if (slog.IsDigit()) {
fgLogToSysLog = slog.Atoi();
} else {
char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
(c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
if (dosyslog) {
slog.Remove(0,1);
if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
if (fgLogToSysLog <= 0)
Warning("TProofServ", "request for syslog logging ineffective!");
}
}
}
if (fgLogToSysLog > 0) {
fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
gSystem->Openlog(fgSysLogService, kLogPid | kLogCons, kLogLocal5);
}
Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
if (enableSchemaEvolution) {
TMessage::EnableSchemaEvolutionForAll();
} else {
Info("TProofServ", "automatic schema evolution in TMessage explicitly disabled");
}
}
Int_t TProofServ::CreateServer()
{
TString opensock = gSystem->Getenv("ROOTOPENSOCK");
if (opensock.Length() <= 0)
opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
Int_t sock = opensock.Atoi();
if (sock <= 0) {
Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
return -1;
}
fSocket = new TSocket(sock);
fSocket->SetCompressionSettings(fCompressMsg);
if (IsMaster()) {
if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
while (gProofServDebug)
;
}
} else {
if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
while (gProofServDebug)
;
}
}
if (gProofDebugLevel > 0)
Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
GetService(), GetConfDir(), (Int_t)fMasterServ);
if (Setup() != 0) {
LogToMaster();
SendLogFile();
Terminate(0);
return -1;
}
TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
pfx += GetOrdinal();
TProofServLogHandler::SetDefaultPrefix(pfx);
if (!fLogFile) {
RedirectOutput();
if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
LogToMaster();
SendLogFile(-98);
Terminate(0);
return -1;
}
} else {
if ((fLogFileDes = fileno(fLogFile)) < 0) {
LogToMaster();
SendLogFile(-98);
Terminate(0);
return -1;
}
}
if (IsMaster()) {
if (CatMotd() == -1) {
LogToMaster();
SendLogFile(-99);
Terminate(0);
return -1;
}
}
ProcessLine("#include <iostream>", kTRUE);
ProcessLine("#include <string>",kTRUE);
const char *logon;
logon = gEnv->GetValue("Proof.Load", (char *)0);
if (logon) {
char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
if (mac)
ProcessLine(TString::Format(".L %s", logon), kTRUE);
delete [] mac;
}
logon = gEnv->GetValue("Proof.Logon", (char *)0);
if (logon && !NoLogOpt()) {
char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
if (mac)
ProcessFile(logon);
delete [] mac;
}
gInterpreter->SaveContext();
gInterpreter->SaveGlobalsContext();
gSystem->AddSignalHandler(new TProofServTerminationHandler(this));
gSystem->AddSignalHandler(new TProofServInterruptHandler(this));
fInputHandler = new TProofServInputHandler(this, sock);
gSystem->AddFileHandler(fInputHandler);
if (IsMaster()) {
TString master = "proof://__master__";
TInetAddress a = gSystem->GetSockName(sock);
if (a.IsValid()) {
master += ":";
master += a.GetPort();
}
TPluginManager *pm = gROOT->GetPluginManager();
if (!pm) {
Error("CreateServer", "no plugin manager found");
SendLogFile(-99);
Terminate(0);
return -1;
}
TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
if (!h) {
Error("CreateServer", "no plugin found for TProof with a"
" config file of '%s'", fConfFile.Data());
SendLogFile(-99);
Terminate(0);
return -1;
}
if (h->LoadPlugin() == -1) {
Error("CreateServer", "plugin for TProof could not be loaded");
SendLogFile(-99);
Terminate(0);
return -1;
}
fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
fConfFile.Data(),
GetConfDir(),
fLogLevel, 0));
if (!fProof || !fProof->IsValid()) {
Error("CreateServer", "plugin for TProof could not be executed");
SafeDelete(fProof);
SendLogFile(-99);
Terminate(0);
return -1;
}
fEndMaster = fProof->IsEndMaster();
SendLogFile();
}
if (!fShutdownTimer) {
fShutdownTimer = new TShutdownTimer(this, 300000);
fShutdownTimer->Start(-1, kFALSE);
}
if (fProtocol <= 17) {
TString msg;
msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
" This may generate compatibility problems between streamed objects.\n"
" The advise is to move to ROOT >= 5.21/02 .");
SendAsynMessage(msg.Data());
}
if (IsMaster() && !fIdleTOTimer) {
Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
if (idle_to > 0) {
fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
fIdleTOTimer->Start(-1, kTRUE);
if (gProofDebugLevel > 0)
Info("CreateServer", " idle timer started (%d secs)", idle_to);
} else if (gProofDebugLevel > 0) {
Info("CreateServer", " idle timer not started (no idle timeout requested)");
}
}
return 0;
}
TProofServ::~TProofServ()
{
SafeDelete(fWaitingQueries);
SafeDelete(fQMtx);
SafeDelete(fEnabledPackages);
SafeDelete(fSocket);
SafeDelete(fPackageLock);
SafeDelete(fCacheLock);
SafeDelete(fQueryLock);
SafeDelete(fGlobalPackageDirList);
SafeDelete(fDataSetManager);
SafeDelete(fDataSetStgRepo);
close(fLogFileDes);
}
Int_t TProofServ::CatMotd()
{
TString lastname;
FILE *motd;
Bool_t show = kFALSE;
TString motdname(GetConfDir());
if (gSystem->Getenv("PROOFNOPROOF")) {
motdname = gSystem->Getenv("PROOFNOPROOF");
} else {
motdname += "/etc/proof/noproof";
}
if ((motd = fopen(motdname, "r"))) {
Int_t c;
printf("\n");
while ((c = getc(motd)) != EOF)
putchar(c);
fclose(motd);
printf("\n");
return -1;
}
lastname = TString(GetWorkDir()) + "/.prooflast";
char *last = gSystem->ExpandPathName(lastname.Data());
Long64_t size;
Long_t id, flags, modtime, lasttime = 0;
if (gSystem->GetPathInfo(last, &id, &size, &flags, &lasttime) == 1)
lasttime = 0;
if (time(0) - lasttime > (time_t)86400)
show = kTRUE;
if (gSystem->Getenv("PROOFMOTD")) {
motdname = gSystem->Getenv("PROOFMOTD");
} else {
motdname = GetConfDir();
motdname += "/etc/proof/motd";
}
if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
if (modtime > lasttime || show) {
if ((motd = fopen(motdname, "r"))) {
Int_t c;
printf("\n");
while ((c = getc(motd)) != EOF)
putchar(c);
fclose(motd);
printf("\n");
}
}
}
if (lasttime)
gSystem->Unlink(last);
Int_t fd = creat(last, 0600);
if (fd >= 0) close(fd);
delete [] last;
return 0;
}
TObject *TProofServ::Get(const char *namecycle)
{
if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
Error("Get", "problems sending request");
return (TObject *)0;
}
TObject *idcur = 0;
Bool_t notdone = kTRUE;
while (notdone) {
TMessage *mess = 0;
if (fSocket->Recv(mess) < 0)
return 0;
Int_t what = mess->What();
if (what == kMESS_OBJECT) {
idcur = mess->ReadObject(mess->GetClass());
notdone = kFALSE;
} else {
Int_t xrc = HandleSocketInput(mess, kFALSE);
if (xrc == -1) {
Error("Get", "command %d cannot be executed while processing", what);
} else if (xrc == -2) {
Error("Get", "unknown command %d ! Protocol error?", what);
}
}
delete mess;
}
return idcur;
}
void TProofServ::RestartComputeTime()
{
fCompute.Stop();
if (fPlayer) {
TProofProgressStatus *status = fPlayer->GetProgressStatus();
if (status) status->SetLearnTime(fCompute.RealTime());
Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
fCompute.RealTime(), fPlayer->GetLearnEntries());
}
fCompute.Start(kFALSE);
}
TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
{
Long64_t bytesRead = 0;
if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
if (fCompute.Counter() > 0)
fCompute.Stop();
TMessage req(kPROOF_GETPACKET);
Double_t cputime = fCompute.CpuTime();
Double_t realtime = fCompute.RealTime();
if (fProtocol > 18) {
req << fLatency.RealTime();
TProofProgressStatus *status = 0;
if (fPlayer) {
fPlayer->UpdateProgressInfo();
status = fPlayer->GetProgressStatus();
} else {
Error("GetNextPacket", "no progress status object");
return 0;
}
if (status->GetEntries() > 0) {
PDB(kLoop, 2) status->Print(GetOrdinal());
status->IncProcTime(realtime);
status->IncCPUTime(cputime);
}
if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
req << status;
Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
req << cacheSize << learnent;
req << totalEntries;
if (fProtocol > 34) req << fSaveOutput.RealTime();
PDB(kLoop, 1) {
PDB(kLoop, 2) status->Print();
Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
}
status->ResetBit(TProofProgressStatus::kFileNotOpen);
status->ResetBit(TProofProgressStatus::kFileCorrupted);
status = 0;
} else {
req << fLatency.RealTime() << realtime << cputime
<< bytesRead << totalEntries;
if (fPlayer)
req << fPlayer->GetEventsProcessed();
}
fLatency.Start();
Int_t rc = fSocket->Send(req);
if (rc <= 0) {
Error("GetNextPacket","Send() failed, returned %d", rc);
return 0;
}
if (fPlayer) {
fSaveOutput.Start();
if (fPlayer->SavePartialResults(kFALSE) < 0)
Warning("GetNextPacket", "problems saving partial results");
fSaveOutput.Stop();
}
TDSetElement *e = 0;
Bool_t notdone = kTRUE;
while (notdone) {
TMessage *mess;
if ((rc = fSocket->Recv(mess)) <= 0) {
fLatency.Stop();
Error("GetNextPacket","Recv() failed, returned %d", rc);
return 0;
}
Int_t xrc = 0;
TString file, dir, obj;
Int_t what = mess->What();
switch (what) {
case kPROOF_GETPACKET:
fLatency.Stop();
(*mess) >> e;
if (e != 0) {
fCompute.Start();
PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
e->GetFileName(), e->GetDirectory(),
e->GetObjName(), e->GetFirst(),e->GetNum());
} else {
PDB(kLoop, 2) Info("GetNextPacket", "Done");
}
notdone = kFALSE;
break;
case kPROOF_STOPPROCESS:
fLatency.Stop();
PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
break;
default:
xrc = HandleSocketInput(mess, kFALSE);
if (xrc == -1) {
Error("GetNextPacket", "command %d cannot be executed while processing", what);
} else if (xrc == -2) {
Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
}
break;
}
delete mess;
}
return e;
}
void TProofServ::GetOptions(Int_t *argc, char **argv)
{
Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
Printf("proofserv: command line testing: OK");
exit(0);
}
if (!argc || (argc && *argc <= 1)) {
Fatal("GetOptions", "Must be started from proofd with arguments");
exit(1);
}
if (!strcmp(argv[1], "proofserv")) {
fMasterServ = kTRUE;
fEndMaster = kTRUE;
} else if (!strcmp(argv[1], "proofslave")) {
fMasterServ = kFALSE;
fEndMaster = kFALSE;
} else {
Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
exit(1);
}
fService = argv[1];
if (!(gSystem->Getenv("ROOTCONFDIR"))) {
Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
exit(1);
}
fConfDir = gSystem->Getenv("ROOTCONFDIR");
}
void TProofServ::HandleSocketInput()
{
TIdleTOTimerGuard itg(fIdleTOTimer);
Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
fgRecursive++;
TMessage *mess;
Int_t rc = 0;
TString exmsg;
TruncateLogFile();
try {
if (fSocket->Recv(mess) <= 0 || !mess) {
Error("HandleSocketInput", "retrieving message from input socket");
Terminate(0);
return;
}
Int_t what = mess->What();
PDB(kCollect, 1)
Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
fNcmd++;
if (fProof) fProof->SetActive();
Bool_t doit = kTRUE;
while (doit) {
rc = HandleSocketInput(mess, all);
if (rc < 0) {
TString emsg;
if (rc == -1) {
emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
} else if (rc == -3) {
emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
} else {
emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
}
SendAsynMessage(emsg.Data());
} else if (rc == 2) {
fQueuedMsg->Add(mess);
PDB(kGlobal, 1)
Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
what, fQueuedMsg->GetSize());
mess = 0;
}
doit = 0;
if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
PDB(kCollect, 1)
Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
what, fQueuedMsg->GetSize());
all = 1;
SafeDelete(mess);
mess = (TMessage *) fQueuedMsg->First();
if (mess) fQueuedMsg->Remove(mess);
doit = 1;
}
}
} catch (std::bad_alloc &) {
exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s %lld",
fgLastMsg.Data(), fgLastEntry);
} catch (std::exception &exc) {
exmsg.Form("caught standard exception '%s' %s %lld",
exc.what(), fgLastMsg.Data(), fgLastEntry);
} catch (int i) {
exmsg.Form("caught exception throwing %d %s %lld",
i, fgLastMsg.Data(), fgLastEntry);
} catch (const char *str) {
exmsg.Form("caught exception throwing '%s' %s %lld",
str, fgLastMsg.Data(), fgLastEntry);
} catch (...) {
exmsg.Form("caught exception <unknown> %s %lld",
fgLastMsg.Data(), fgLastEntry);
}
if (!exmsg.IsNull()) {
Error("HandleSocketInput", "%s", exmsg.Data());
SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
Terminate(0);
}
if (TestBit(TProofServ::kHighMemory)) {
exmsg.Form("high-memory footprint detected during Process(...) - terminating");
Error("HandleSocketInput", "%s", exmsg.Data());
SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
Terminate(0);
}
fgRecursive--;
if (fProof) {
Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
Int_t ngwrks = fProof->GetListOfActiveSlaves()->GetSize() + fProof->GetListOfInactiveSlaves()->GetSize();
if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
Terminate(0);
}
fProof->SetActive(kFALSE);
fProof->SetRunStatus(TProof::kRunning);
}
SafeDelete(mess);
}
Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
{
static TStopwatch timer;
char str[2048];
Bool_t aborted = kFALSE;
if (!mess) return -3;
Int_t what = mess->What();
PDB(kCollect, 1)
Info("HandleSocketInput", "processing message type %d from '%s'",
what, fSocket->GetTitle());
timer.Start();
Int_t rc = 0;
TString slb;
TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
switch (what) {
case kMESS_CINT:
if (all) {
mess->ReadString(str, sizeof(str));
TString fn;
if (TProof::GetFileInCmd(str, fn))
CopyFromCache(fn, 1);
if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
fProof->SendCommand(str);
} else {
PDB(kGlobal, 1)
Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
ProcessLine(str);
}
LogToMaster();
} else {
rc = -1;
}
SendLogFile();
if (pslb) slb = str;
break;
case kMESS_STRING:
if (all) {
mess->ReadString(str, sizeof(str));
} else {
rc = -1;
}
break;
case kMESS_OBJECT:
if (all) {
mess->ReadObject(mess->GetClass());
} else {
rc = -1;
}
break;
case kPROOF_GROUPVIEW:
if (all) {
mess->ReadString(str, sizeof(str));
sscanf(str, "%d %d", &fGroupId, &fGroupSize);
} else {
rc = -1;
}
break;
case kPROOF_LOGLEVEL:
{ UInt_t mask;
mess->ReadString(str, sizeof(str));
sscanf(str, "%d %u", &fLogLevel, &mask);
Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
gProofDebugLevel = fLogLevel;
gProofDebugMask = (TProofDebug::EProofDebugMask) mask;
if (levelchanged)
Info("HandleSocketInput:kPROOF_LOGLEVEL", "debug level set to %d (mask: 0x%x)",
gProofDebugLevel, gProofDebugMask);
if (IsMaster())
fProof->SetLogLevel(fLogLevel, mask);
}
break;
case kPROOF_PING:
{ if (IsMaster())
fProof->Ping();
}
break;
case kPROOF_PRINT:
mess->ReadString(str, sizeof(str));
Print(str);
LogToMaster();
SendLogFile();
break;
case kPROOF_RESET:
if (all) {
mess->ReadString(str, sizeof(str));
Reset(str);
} else {
rc = -1;
}
break;
case kPROOF_STATUS:
Warning("HandleSocketInput:kPROOF_STATUS",
"kPROOF_STATUS message is obsolete");
if (fSocket->Send(fProof->GetParallel(), kPROOF_STATUS) < 0)
Warning("HandleSocketInput:kPROOF_STATUS", "problem sending of request");
break;
case kPROOF_GETSTATS:
SendStatistics();
break;
case kPROOF_GETPARALLEL:
SendParallel();
break;
case kPROOF_STOP:
if (all) {
if (IsMaster()) {
TString ord;
*mess >> ord;
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
if (fProof) fProof->TerminateWorker(ord);
} else {
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
Terminate(0);
}
} else {
rc = -1;
}
break;
case kPROOF_STOPPROCESS:
if (all) {
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
} else {
Long_t timeout = -1;
(*mess) >> aborted;
if (fProtocol > 9)
(*mess) >> timeout;
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_STOPPROCESS",
"recursive mode: enter %d, %ld", aborted, timeout);
if (fProof)
fProof->StopProcess(aborted, timeout);
else
if (fPlayer)
fPlayer->StopProcess(aborted, timeout);
}
break;
case kPROOF_PROCESS:
{
TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
HandleProcess(mess, pslb);
}
break;
case kPROOF_SENDOUTPUT:
{
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
"worker was asked to send output to master");
Int_t sorc = 0;
if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
Error("HandleSocketInput:kPROOF_SENDOUTPUT", "problems sending output list");
sorc = 1;
}
fSocket->Send(kPROOF_SETIDLE);
SetIdle(kTRUE);
DeletePlayer();
SendLogFile(sorc);
}
break;
case kPROOF_QUERYLIST:
{
HandleQueryList(mess);
SendLogFile();
}
break;
case kPROOF_REMOVE:
{
HandleRemove(mess, pslb);
SendLogFile();
}
break;
case kPROOF_RETRIEVE:
{
HandleRetrieve(mess, pslb);
SendLogFile();
}
break;
case kPROOF_ARCHIVE:
{
HandleArchive(mess, pslb);
SendLogFile();
}
break;
case kPROOF_MAXQUERIES:
{ PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
TMessage m(kPROOF_MAXQUERIES);
m << fMaxQueries;
fSocket->Send(m);
SendLogFile();
}
break;
case kPROOF_CLEANUPSESSION:
if (all) {
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
TString stag;
(*mess) >> stag;
if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
Printf("Session %s cleaned up", stag.Data());
} else {
Printf("Could not cleanup session %s", stag.Data());
}
} else {
rc = -1;
}
SendLogFile();
break;
case kPROOF_GETENTRIES:
{ PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
Bool_t isTree;
TString filename;
TString dir;
TString objname("undef");
Long64_t entries = -1;
if (all) {
(*mess) >> isTree >> filename >> dir >> objname;
PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
"Report size of object %s (%s) in dir %s in file %s",
objname.Data(), isTree ? "T" : "O",
dir.Data(), filename.Data());
entries = TDSet::GetEntries(isTree, filename, dir, objname);
PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
"Found %lld %s", entries, isTree ? "entries" : "objects");
} else {
rc = -1;
}
TMessage answ(kPROOF_GETENTRIES);
answ << entries << objname;
SendLogFile();
fSocket->Send(answ);
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
}
break;
case kPROOF_CHECKFILE:
if (!all && fProtocol <= 19) {
rc = 2;
} else {
HandleCheckFile(mess, pslb);
}
break;
case kPROOF_SENDFILE:
if (!all && fProtocol <= 19) {
rc = 2;
} else {
mess->ReadString(str, sizeof(str));
Long_t size;
Int_t bin, fw = 1;
char name[1024];
if (fProtocol > 5) {
sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
} else {
sscanf(str, "%1023s %d %ld", name, &bin, &size);
}
TString fnam(name);
Bool_t copytocache = kTRUE;
if (fnam.BeginsWith("cache:")) {
fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
copytocache = kFALSE;
}
Int_t rfrc = 0;
if (size > 0) {
rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
} else {
if (!fnam.BeginsWith(fCacheDir.Data())) {
fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
}
}
if (rfrc == 0) {
if (copytocache && size > 0 &&
strncmp(fPackageDir, name, fPackageDir.Length()))
CopyToCache(name, 0);
if (IsMaster() && fw == 1) {
Int_t opt = TProof::kForward | TProof::kCp;
if (bin)
opt |= TProof::kBinary;
PDB(kGlobal, 1)
Info("HandleSocketInput","forwarding file: %s", fnam.Data());
if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
}
}
if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
} else {
SendLogFile(1);
}
}
break;
case kPROOF_LOGFILE:
{
Int_t start, end;
(*mess) >> start >> end;
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_LOGFILE",
"Logfile request - byte range: %d - %d", start, end);
LogToMaster();
SendLogFile(0, start, end);
}
break;
case kPROOF_PARALLEL:
if (all) {
if (IsMaster()) {
Int_t nodes;
Bool_t random = kFALSE;
(*mess) >> nodes;
if ((mess->BufferSize() > mess->Length()))
(*mess) >> random;
if (fProof) fProof->SetParallel(nodes, random);
rc = 1;
}
} else {
rc = -1;
}
SendLogFile();
break;
case kPROOF_CACHE:
if (!all && fProtocol <= 19) {
rc = 2;
} else {
TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
Int_t hcrc = HandleCache(mess, pslb);
SendLogFile(hcrc);
}
break;
case kPROOF_WORKERLISTS:
{ Int_t wlrc = -1;
if (all) {
if (IsMaster())
wlrc = HandleWorkerLists(mess);
else
Warning("HandleSocketInput:kPROOF_WORKERLISTS",
"Action meaning-less on worker nodes: protocol error?");
} else {
rc = -1;
}
SendLogFile(wlrc);
}
break;
case kPROOF_GETSLAVEINFO:
if (all) {
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
if (IsMaster()) {
Bool_t ok = kTRUE;
if (fProof->UseDynamicStartup()) {
ok = kFALSE;
Int_t pc = 0;
TList* workerList = new TList();
EQueryAction retVal = GetWorkers(workerList, pc);
if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
if (Int_t ret = fProof->AddWorkers(workerList) < 0) {
Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
"adding a list of worker nodes returned: %d", ret);
}
} else {
Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
"getting list of worker nodes returned: %d", retVal);
}
ok = kTRUE;
}
if (ok) {
TList *info = fProof->GetListOfSlaveInfos();
TMessage answ(kPROOF_GETSLAVEINFO);
answ << info;
fSocket->Send(answ);
if (IsMaster() && fProof->UseDynamicStartup()) fProof->RemoveWorkers(0);
}
} else {
TMessage answ(kPROOF_GETSLAVEINFO);
TList *info = new TList;
TSlaveInfo *wi = new TSlaveInfo(GetOrdinal(), TUrl(gSystem->HostName()).GetHostFQDN(), 0, "", GetDataDir());
SysInfo_t si;
gSystem->GetSysInfo(&si);
wi->SetSysInfo(si);
info->Add(wi);
answ << (TList *)info;
fSocket->Send(answ);
info->SetOwner(kTRUE);
delete info;
}
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
} else {
TMessage answ(kPROOF_GETSLAVEINFO);
answ << (TList *)0;
fSocket->Send(answ);
rc = -1;
}
break;
case kPROOF_GETTREEHEADER:
if (all) {
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
if (p) {
p->HandleGetTreeHeader(mess);
delete p;
} else {
Error("HandleSocketInput:kPROOF_GETTREEHEADER", "could not create TProofPlayer instance!");
}
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
} else {
TMessage answ(kPROOF_GETTREEHEADER);
answ << TString("Failed") << (TObject *)0;
fSocket->Send(answ);
rc = -1;
}
break;
case kPROOF_GETOUTPUTLIST:
{ PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
TList* outputList = 0;
if (IsMaster()) {
outputList = fProof->GetOutputList();
if (!outputList)
outputList = new TList();
} else {
outputList = new TList();
if (fProof->GetPlayer()) {
TList *olist = fProof->GetPlayer()->GetOutputList();
TIter next(olist);
TObject *o;
while ( (o = next()) ) {
outputList->Add(new TNamed(o->GetName(), ""));
}
}
}
outputList->SetOwner();
TMessage answ(kPROOF_GETOUTPUTLIST);
answ << outputList;
fSocket->Send(answ);
delete outputList;
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
}
break;
case kPROOF_VALIDATE_DSET:
if (all) {
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
TDSet* dset = 0;
(*mess) >> dset;
if (IsMaster()) fProof->ValidateDSet(dset);
else dset->Validate();
TMessage answ(kPROOF_VALIDATE_DSET);
answ << dset;
fSocket->Send(answ);
delete dset;
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
} else {
rc = -1;
}
SendLogFile();
break;
case kPROOF_DATA_READY:
if (all) {
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
TMessage answ(kPROOF_DATA_READY);
if (IsMaster()) {
Long64_t totalbytes = 0, bytesready = 0;
Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
answ << dataready << totalbytes << bytesready;
} else {
Error("HandleSocketInput:kPROOF_DATA_READY",
"This message should not be sent to slaves");
answ << kFALSE << Long64_t(0) << Long64_t(0);
}
fSocket->Send(answ);
PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
} else {
TMessage answ(kPROOF_DATA_READY);
answ << kFALSE << Long64_t(0) << Long64_t(0);
fSocket->Send(answ);
rc = -1;
}
SendLogFile();
break;
case kPROOF_DATASETS:
{ Int_t dsrc = -1;
if (fProtocol > 16) {
dsrc = HandleDataSets(mess, pslb);
} else {
Error("HandleSocketInput", "old client: no or incompatible dataset support");
}
SendLogFile(dsrc);
}
break;
case kPROOF_SUBMERGER:
{ HandleSubmerger(mess);
}
break;
case kPROOF_LIB_INC_PATH:
if (all) {
HandleLibIncPath(mess);
} else {
rc = -1;
}
SendLogFile();
break;
case kPROOF_REALTIMELOG:
{ Bool_t on;
(*mess) >> on;
PDB(kGlobal, 1)
Info("HandleSocketInput:kPROOF_REALTIMELOG",
"setting real-time logging %s", (on ? "ON" : "OFF"));
fRealTimeLog = on;
if (IsMaster())
fProof->SetRealTimeLog(on);
}
break;
case kPROOF_FORK:
if (all) {
HandleFork(mess);
LogToMaster();
} else {
rc = -1;
}
SendLogFile();
break;
case kPROOF_STARTPROCESS:
if (all) {
if (WaitingQueries() == 0) {
Error("HandleSocketInput", "no queries enqueued");
break;
}
TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
Int_t pc = 0;
EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
if (retVal == TProofServ::kQueryOK) {
Int_t ret = 0;
if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
} else {
ProcessNext(pslb);
SetIdle(kTRUE);
TMessage m(kPROOF_SETIDLE);
Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
m << waiting;
fSocket->Send(m);
}
} else {
if (retVal == TProofServ::kQueryStop) {
Error("HandleSocketInput", "error getting list of worker nodes");
} else if (retVal != TProofServ::kQueryEnqueued) {
Warning("HandleSocketInput", "query was re-queued!");
} else {
Error("HandleSocketInput", "unexpected answer: %d", retVal);
break;
}
}
}
break;
case kPROOF_GOASYNC:
{
if (!IsIdle() && fPlayer) {
TProofQueryResult *pq = (TProofQueryResult *) fPlayer->GetCurrentQuery();
TMessage m(kPROOF_QUERYSUBMITTED);
m << pq->GetSeqNum() << kFALSE;
fSocket->Send(m);
} else {
SendAsynMessage("Processing request to go asynchronous:"
" idle or undefined player - ignoring");
}
}
break;
case kPROOF_ECHO:
{
TObject *obj = mess->ReadObject(0x0);
if (IsMaster()) {
if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
fProof->Echo(obj);
}
}
TMessage rmsg(kPROOF_MESSAGE);
TString smsg;
if (obj->InheritsFrom(TObjString::Class())) {
smsg.Form("Echo response from %s:%s: %s",
gSystem->HostName(), GetOrdinal(),
((TObjString *)obj)->String().Data());
}
else {
TString tmpfn = "echo-out-";
FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
Error("HandleSocketInput", "Can't redirect output");
if (tf) {
fclose(tf);
gSystem->Unlink(tmpfn);
}
rc = -1;
delete obj;
break;
}
obj->Print();
gSystem->RedirectOutput(0x0);
fclose(tf);
smsg.Form("*** Echo response from %s:%s ***\n",
gSystem->HostName(), GetOrdinal());
TMacro *fr = new TMacro();
fr->ReadFile(tmpfn);
TIter nextLine(fr->GetListOfLines());
TObjString *line;
while (( line = (TObjString *)nextLine() )) {
smsg.Append( line->String() );
}
delete fr;
gSystem->Unlink(tmpfn);
}
rmsg << smsg;
GetSocket()->Send(rmsg);
delete obj;
}
break;
default:
Error("HandleSocketInput", "unknown command %d", what);
rc = -2;
break;
}
fRealTime += (Float_t)timer.RealTime();
fCpuTime += (Float_t)timer.CpuTime();
if (!(slb.IsNull()) || fgLogToSysLog > 1) {
TString s;
s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
what, timer.RealTime(), timer.CpuTime(), slb.Data());
gSystem->Syslog(kLogNotice, s.Data());
}
return rc;
}
Bool_t TProofServ::AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
{
TMessage *mess = new TMessage();
Int_t mergedWorkers = 0;
PDB(kSubmerger, 1) Info("AcceptResults", "enter");
Bool_t result = kTRUE;
fMergingMonitor = new TMonitor();
fMergingMonitor->Add(fMergingSocket);
Int_t numworkers = 0;
while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
TSocket *s = fMergingMonitor->Select();
if (!s) {
Info("AcceptResults", "interrupt!");
result = kFALSE;
break;
}
if (s == fMergingSocket) {
TSocket *sw = fMergingSocket->Accept();
if (sw && sw != (TSocket *)(-1)) {
fMergingMonitor->Add(sw);
PDB(kSubmerger, 2)
Info("AcceptResults", "connection from a worker accepted on merger %s ",
fOrdinal.Data());
if (++numworkers >= connections)
fMergingMonitor->Remove(fMergingSocket);
} else {
PDB(kSubmerger, 1)
Info("AcceptResults", "spurious signal found of merging socket");
}
} else {
if (s->Recv(mess) < 0) {
Error("AcceptResults", "problems receiving message");
continue;
}
PDB(kSubmerger, 2)
Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
if (!mess) {
Error("AcceptResults", "message received: %p ", mess);
continue;
}
Int_t type = 0;
while ((mess->BufferSize() > mess->Length())) {
(*mess) >> type;
PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
if (type == 2) {
mergedWorkers++;
PDB(kSubmerger, 2)
Info("AcceptResults",
"a new worker has been mergerd. Total merged workers: %d",
mergedWorkers);
}
TObject *o = mess->ReadObject(TObject::Class());
if (mergerPlayer->AddOutputObject(o) == 1) {
PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
SafeDelete(o);
} else
PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
}
}
}
fMergingMonitor->DeActivateAll();
TList* sockets = fMergingMonitor->GetListOfDeActives();
Int_t size = sockets->GetSize();
for (Int_t i =0; i< size; ++i){
((TSocket*)(sockets->At(i)))->Close();
PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
delete ((TSocket*)(sockets->At(i)));
}
fMergingMonitor->RemoveAll();
SafeDelete(fMergingMonitor);
PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
return result;
}
void TProofServ::HandleUrgentData()
{
char oob_byte;
Int_t n, nch, wasted = 0;
const Int_t kBufSize = 1024;
char waste[kBufSize];
TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
PDB(kGlobal, 5)
Info("HandleUrgentData", "handling oob...");
while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
if (n == -2) {
fSocket->GetOption(kBytesToRead, nch);
if (nch == 0) {
gSystem->Sleep(1000);
continue;
}
if (nch > kBufSize) nch = kBufSize;
n = fSocket->RecvRaw(waste, nch);
if (n <= 0) {
Error("HandleUrgentData", "error receiving waste");
break;
}
wasted = 1;
} else {
Error("HandleUrgentData", "error receiving OOB");
return;
}
}
PDB(kGlobal, 5)
Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
if (fProof) fProof->SetActive();
switch (oob_byte) {
case TProof::kHardInterrupt:
Info("HandleUrgentData", "*** Hard Interrupt");
if (IsMaster())
fProof->Interrupt(TProof::kHardInterrupt);
while (1) {
Int_t atmark;
fSocket->GetOption(kAtMark, atmark);
if (atmark) {
n = fSocket->SendRaw(&oob_byte, 1, kOob);
if (n <= 0)
Error("HandleUrgentData", "error sending OOB");
break;
}
fSocket->GetOption(kBytesToRead, nch);
if (nch == 0) {
gSystem->Sleep(1000);
continue;
}
if (nch > kBufSize) nch = kBufSize;
n = fSocket->RecvRaw(waste, nch);
if (n <= 0) {
Error("HandleUrgentData", "error receiving waste (2)");
break;
}
}
SendLogFile();
break;
case TProof::kSoftInterrupt:
Info("HandleUrgentData", "Soft Interrupt");
if (IsMaster())
fProof->Interrupt(TProof::kSoftInterrupt);
if (wasted) {
Error("HandleUrgentData", "soft interrupt flushed stream");
break;
}
Interrupt();
SendLogFile();
break;
case TProof::kShutdownInterrupt:
Info("HandleUrgentData", "Shutdown Interrupt");
if (IsMaster())
fProof->Interrupt(TProof::kShutdownInterrupt);
Terminate(0);
break;
default:
Error("HandleUrgentData", "unexpected OOB byte");
break;
}
if (fProof) fProof->SetActive(kFALSE);
}
void TProofServ::HandleSigPipe()
{
TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
if (IsMaster()) {
if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
Info("HandleSigPipe", "keepAlive probe failed");
fProof->SetActive();
fProof->Interrupt(TProof::kShutdownInterrupt);
fProof->SetActive(kFALSE);
Terminate(0);
}
} else {
Info("HandleSigPipe", "keepAlive probe failed");
Terminate(0);
}
}
Bool_t TProofServ::IsParallel() const
{
if (IsMaster() && fProof)
return fProof->IsParallel() || fProof->UseDynamicStartup() ;
return kFALSE;
}
void TProofServ::Print(Option_t *option) const
{
if (IsMaster() && fProof)
fProof->Print(option);
else
Printf("This is worker %s", gSystem->HostName());
}
void TProofServ::RedirectOutput(const char *dir, const char *mode)
{
char logfile[512];
TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
if (IsMaster()) {
snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
} else {
snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
}
if ((freopen(logfile, mode, stdout)) == 0)
SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
if ((dup2(fileno(stdout), fileno(stderr))) < 0)
SysError("RedirectOutput", "could not redirect stderr");
if ((fLogFile = fopen(logfile, "r")) == 0)
SysError("RedirectOutput", "could not open logfile '%s'", logfile);
if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
Warning("RedirectOutput", "no way to tell master (or client) where"
" to upload packages");
}
}
void TProofServ::Reset(const char *dir)
{
TString dd(dir);
if (!dd.BeginsWith("proofserv")) {
Int_t ic = dd.Index(":");
if (ic != kNPOS)
dd.Replace(0, ic, "proofserv");
}
gDirectory->cd(dd.Data());
gROOT->Reset();
if (gDirectory != gROOT) {
gDirectory->Delete();
}
if (IsMaster()) fProof->SendCurrentState();
}
Int_t TProofServ::ReceiveFile(const char *file, Bool_t bin, Long64_t size)
{
if (size <= 0) return 0;
Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
if (fd < 0) {
SysError("ReceiveFile", "error opening file %s", file);
return -1;
}
const Int_t kMAXBUF = 16384;
char buf[kMAXBUF], cpy[kMAXBUF];
Int_t left, r;
Long64_t filesize = 0;
while (filesize < size) {
left = Int_t(size - filesize);
if (left > kMAXBUF)
left = kMAXBUF;
r = fSocket->RecvRaw(&buf, left);
if (r > 0) {
char *p = buf;
filesize += r;
while (r) {
Int_t w;
if (!bin) {
Int_t k = 0, i = 0, j = 0;
char *q;
while (i < r) {
if (p[i] == '\r') {
i++;
k++;
}
cpy[j++] = buf[i++];
}
q = cpy;
r -= k;
w = write(fd, q, r);
} else {
w = write(fd, p, r);
}
if (w < 0) {
SysError("ReceiveFile", "error writing to file %s", file);
close(fd);
return -1;
}
r -= w;
p += w;
}
} else if (r < 0) {
Error("ReceiveFile", "error during receiving file %s", file);
close(fd);
return -1;
}
}
close(fd);
if (chmod(file, 0644) != 0)
Warning("ReceiveFile", "error setting mode 0644 on file %s", file);
return 0;
}
void TProofServ::Run(Bool_t retrn)
{
if (CreateServer() == 0) {
TApplication::Run(retrn);
}
}
void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
{
fflush(stdout);
if (!IsMaster()) {
if (!fSendLogToMaster) {
FlushLogFile();
} else {
LogToMaster(kFALSE);
}
}
off_t ltot=0, lnow=0;
Int_t left = -1;
Bool_t adhoc = kFALSE;
if (fLogFileDes > -1) {
ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
if (ltot >= 0 && lnow >= 0) {
if (start > -1) {
lseek(fLogFileDes, (off_t) start, SEEK_SET);
if (end <= start || end > ltot)
end = ltot;
left = (Int_t)(end - start);
if (end < ltot)
left++;
adhoc = kTRUE;
} else {
left = (Int_t)(ltot - lnow);
}
}
}
if (left > 0) {
if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
SysError("SendLogFile", "error sending kPROOF_LOGFILE");
return;
}
const Int_t kMAXBUF = 32768;
char buf[kMAXBUF];
Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
Int_t len;
do {
while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
TSystem::GetErrno() == EINTR)
TSystem::ResetErrno();
if (len < 0) {
SysError("SendLogFile", "error reading log file");
break;
}
if (end == ltot && len == wanted)
buf[len-1] = '\n';
if (fSocket->SendRaw(buf, len) < 0) {
SysError("SendLogFile", "error sending log file");
break;
}
left -= len;
wanted = (left > kMAXBUF) ? kMAXBUF : left;
} while (len > 0 && left > 0);
}
if (adhoc && lnow >=0 )
lseek(fLogFileDes, lnow, SEEK_SET);
TMessage mess(kPROOF_LOGDONE);
if (IsMaster())
mess << status << (fProof ? fProof->GetParallel() : 0);
else
mess << status << (Int_t) 1;
if (fSocket->Send(mess) < 0) {
SysError("SendLogFile", "error sending kPROOF_LOGDONE");
return;
}
PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
}
void TProofServ::SendStatistics()
{
Long64_t bytesread = TFile::GetFileBytesRead();
Float_t cputime = fCpuTime, realtime = fRealTime;
if (IsMaster()) {
bytesread = fProof->GetBytesRead();
cputime = fProof->GetCpuTime();
}
TMessage mess(kPROOF_GETSTATS);
TString workdir = gSystem->WorkingDirectory();
mess << bytesread << realtime << cputime << workdir;
if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
mess << TString(gProofServ->GetImage());
fSocket->Send(mess);
}
void TProofServ::SendParallel(Bool_t async)
{
Int_t nparallel = 0;
if (IsMaster()) {
PDB(kGlobal, 2)
Info("SendParallel", "Will invoke AskParallel()");
fProof->AskParallel();
PDB(kGlobal, 2)
Info("SendParallel", "Will invoke GetParallel()");
nparallel = fProof->GetParallel();
} else {
nparallel = 1;
}
TMessage mess(kPROOF_GETPARALLEL);
mess << nparallel << async;
fSocket->Send(mess);
}
Int_t TProofServ::UnloadPackage(const char *package)
{
TPair *pack = (TPair *) fEnabledPackages->FindObject(package);
if (pack) {
TString aclicincpath = gSystem->GetIncludePath();
TString cintincpath = gInterpreter->GetIncludePath();
aclicincpath.Remove(aclicincpath.Length() - cintincpath.Length() - 1);
aclicincpath.ReplaceAll(TString(" -I") + package, "");
gSystem->SetIncludePath(aclicincpath);
delete fEnabledPackages->Remove(pack);
PDB(kPackage, 1)
Info("UnloadPackage",
"package %s successfully unloaded", package);
}
if (!gSystem->AccessPathName(package))
if (gSystem->Unlink(package) != 0)
Warning("UnloadPackage", "unable to remove symlink to %s", package);
return 0;
}
Int_t TProofServ::UnloadPackages()
{
TIter nextpackage(fEnabledPackages);
while (TPair *pck = dynamic_cast<TPair *>(nextpackage()))
if (UnloadPackage(pck->GetName()) != 0)
return -1;
PDB(kPackage, 1)
Info("UnloadPackages",
"packages successfully unloaded");
return 0;
}
Int_t TProofServ::Setup()
{
char str[512];
if (IsMaster()) {
snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
} else {
snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
}
if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
Error("Setup", "failed to send proof server startup message");
return -1;
}
Int_t what;
if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
Error("Setup", "failed to receive remote proof protocol");
return -1;
}
if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
Error("Setup", "failed to send local proof protocol");
return -1;
}
if (fProtocol < 5) {
TString wconf;
if (OldAuthSetup(wconf) != 0) {
Error("Setup", "OldAuthSetup: failed to setup authentication");
return -1;
}
if (IsMaster()) {
fConfFile = wconf;
fWorkDir.Form("~/%s", kPROOF_WorkDir);
} else {
if (fProtocol < 4) {
fWorkDir.Form("~/%s", kPROOF_WorkDir);
} else {
fWorkDir = wconf;
if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
}
}
} else {
TMessage *mess;
if ((fSocket->Recv(mess) <= 0) || !mess) {
Error("Setup", "failed to receive ordinal and config info");
return -1;
}
if (IsMaster()) {
(*mess) >> fUser >> fOrdinal >> fConfFile;
fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
} else {
(*mess) >> fUser >> fOrdinal >> fWorkDir;
if (fWorkDir.IsNull())
fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
}
if (fOrdinal != "-1")
fPrefix += fOrdinal;
TProofServLogHandler::SetDefaultPrefix(fPrefix);
delete mess;
}
if (IsMaster()) {
TString conffile = fConfFile;
conffile.Remove(0, 1 + conffile.Index(":"));
TProofResourcesStatic resources(fConfDir, conffile);
if (resources.IsValid()) {
if (resources.GetMaster()) {
TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
if (tmpWorkDir != "")
fWorkDir = tmpWorkDir;
}
} else {
Info("Setup", "invalid config file %s (missing or unreadable",
resources.GetFileName().Data());
}
}
gSystem->Setenv("HOME", gSystem->HomeDirectory());
if (fWorkDir.BeginsWith("/") &&
!fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
if (!fWorkDir.EndsWith("/"))
fWorkDir += "/";
UserGroup_t *u = gSystem->GetUserInfo();
if (u) {
fWorkDir += u->fUser;
delete u;
}
}
char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
fWorkDir = workdir;
delete [] workdir;
if (gProofDebugLevel > 0)
Info("Setup", "working directory set to %s", fWorkDir.Data());
TString host = gSystem->HostName();
if (host.Index(".") != kNPOS)
host.Remove(host.Index("."));
fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
(Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
fTopSessionTag = fSessionTag;
fSessionDir = fWorkDir;
if (IsMaster())
fSessionDir += "/master-";
else
fSessionDir += "/slave-";
fSessionDir += fSessionTag;
if (SetupCommon() != 0) {
Error("Setup", "common setup failed");
return -1;
}
fSocket->SetOption(kProcessGroup, gSystem->GetPid());
fSocket->SetOption(kNoDelay, 1);
fSocket->SetOption(kKeepAlive, 1);
return 0;
}
Int_t TProofServ::SetupCommon()
{
gSystem->Umask(022);
#ifdef R__UNIX
TString path(gSystem->Getenv("PATH"));
TString bindir;
# ifdef ROOTBINDIR
bindir = ROOTBINDIR;
# else
bindir = gSystem->Getenv("ROOTSYS");
if (!bindir.IsNull()) bindir += "/bin";
# endif
TString paths = gEnv->GetValue("ProofServ.BinPaths", "");
if (paths.Length() > 0) {
Int_t icomp = 0;
if (paths.Contains("^<compiler>"))
icomp = 1;
else if (paths.Contains("<compiler>"))
icomp = -1;
if (icomp != 0) {
# ifdef COMPILER
TString compiler = COMPILER;
if (compiler.Index("is ") != kNPOS)
compiler.Remove(0, compiler.Index("is ") + 3);
compiler = gSystem->DirName(compiler);
if (icomp == 1) {
if (!bindir.IsNull()) bindir += ":";
bindir += compiler;
} else if (icomp == -1) {
if (!path.IsNull()) path += ":";
path += compiler;
}
#endif
}
Int_t isysb = 0;
if (paths.Contains("^<sysbin>"))
isysb = 1;
else if (paths.Contains("<sysbin>"))
isysb = -1;
if (isysb != 0) {
if (isysb == 1) {
if (!bindir.IsNull()) bindir += ":";
bindir += "/bin:/usr/bin:/usr/local/bin";
} else if (isysb == -1) {
if (!path.IsNull()) path += ":";
path += "/bin:/usr/bin:/usr/local/bin";
}
}
}
if (!bindir.IsNull()) bindir += ":";
path.Insert(0, bindir);
gSystem->Setenv("PATH", path);
#endif
if (gSystem->AccessPathName(fWorkDir)) {
gSystem->mkdir(fWorkDir, kTRUE);
if (!gSystem->ChangeDirectory(fWorkDir)) {
Error("SetupCommon", "can not change to PROOF directory %s",
fWorkDir.Data());
return -1;
}
} else {
if (!gSystem->ChangeDirectory(fWorkDir)) {
gSystem->Unlink(fWorkDir);
gSystem->mkdir(fWorkDir, kTRUE);
if (!gSystem->ChangeDirectory(fWorkDir)) {
Error("SetupCommon", "can not change to PROOF directory %s",
fWorkDir.Data());
return -1;
}
}
}
fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
TString::Format("%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
ResolveKeywords(fCacheDir);
if (gSystem->AccessPathName(fCacheDir))
gSystem->mkdir(fCacheDir, kTRUE);
if (gProofDebugLevel > 0)
Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
fCacheLock =
new TProofLockPath(TString::Format("%s/%s%s",
gSystem->TempDirectory(), kPROOF_CacheLockFile,
TString(fCacheDir).ReplaceAll("/","%").Data()));
fPackageDir = gEnv->GetValue("ProofServ.PackageDir",
TString::Format("%s/%s", fWorkDir.Data(), kPROOF_PackDir));
ResolveKeywords(fPackageDir);
if (gSystem->AccessPathName(fPackageDir))
gSystem->mkdir(fPackageDir, kTRUE);
if (gProofDebugLevel > 0)
Info("SetupCommon", "package directory set to %s", fPackageDir.Data());
fPackageLock =
new TProofLockPath(TString::Format("%s/%s%s",
gSystem->TempDirectory(), kPROOF_PackageLockFile,
TString(fPackageDir).ReplaceAll("/","%").Data()));
fDataDir = gEnv->GetValue("ProofServ.DataDir","");
Ssiz_t isep = kNPOS;
if (fDataDir.IsNull()) {
fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
} else if ((isep = fDataDir.Last(' ')) != kNPOS) {
fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
fDataDir.Remove(isep);
}
ResolveKeywords(fDataDir);
if (gSystem->AccessPathName(fDataDir))
if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
Warning("SetupCommon", "problems creating path '%s' (errno: %d)",
fDataDir.Data(), TSystem::GetErrno());
}
if (gProofDebugLevel > 0)
Info("SetupCommon", "data directory set to %s", fDataDir.Data());
TString dataDirOpts = gEnv->GetValue("ProofServ.DataDirOpts","");
if (!dataDirOpts.IsNull()) {
Bool_t doit = kTRUE;
if ((IsMaster() && !dataDirOpts.Contains("M")) ||
(!IsMaster() && !dataDirOpts.Contains("W"))) doit = kFALSE;
if (doit) {
UInt_t m = 0755;
if (dataDirOpts.Contains("g")) m = 0775;
if (dataDirOpts.Contains("a") || dataDirOpts.Contains("o")) m = 0777;
if (gProofDebugLevel > 0)
Info("SetupCommon", "requested mode for data directories is '%o'", m);
FileStat_t st;
TString p, subp;
Int_t from = 0;
if (fDataDir.BeginsWith("/")) p = "/";
while (fDataDir.Tokenize(subp, from, "/")) {
if (subp.IsNull()) continue;
p += subp;
if (gSystem->GetPathInfo(p, st) == 0) {
if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
if (gSystem->Chmod(p.Data(), m) != 0) {
Warning("SetupCommon", "problems setting mode '%o' on path '%s' (errno: %d)",
m, p.Data(), TSystem::GetErrno());
break;
}
}
p += "/";
} else {
Warning("SetupCommon", "problems stat-ing path '%s' (errno: %d; datadir: %s)",
p.Data(), TSystem::GetErrno(), fDataDir.Data());
break;
}
}
}
}
TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
if (globpack.Length() > 0) {
Int_t ng = 0;
Int_t from = 0;
TString ldir;
while (globpack.Tokenize(ldir, from, ":")) {
if (gSystem->AccessPathName(ldir, kReadPermission)) {
Warning("SetupCommon", "directory for global packages %s does not"
" exist or is not readable", ldir.Data());
} else {
TString key;
key.Form("G%d", ng++);
if (!fGlobalPackageDirList) {
fGlobalPackageDirList = new THashList();
fGlobalPackageDirList->SetOwner();
}
ResolveKeywords(ldir);
fGlobalPackageDirList->Add(new TNamed(key,ldir));
Info("SetupCommon", "directory for global packages %s added to the list",
ldir.Data());
FlushLogFile();
}
}
}
if (fSessionDir != gSystem->WorkingDirectory()) {
ResolveKeywords(fSessionDir);
if (gSystem->AccessPathName(fSessionDir))
gSystem->mkdir(fSessionDir, kTRUE);
if (!gSystem->ChangeDirectory(fSessionDir)) {
Error("SetupCommon", "can not change to working directory '%s'",
fSessionDir.Data());
return -1;
}
}
gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
if (gProofDebugLevel > 0)
Info("SetupCommon", "session dir is '%s'", fSessionDir.Data());
if (IsMaster()) {
fQueryDir = fWorkDir;
fQueryDir += TString("/") + kPROOF_QueryDir;
ResolveKeywords(fQueryDir);
if (gSystem->AccessPathName(fQueryDir))
gSystem->mkdir(fQueryDir, kTRUE);
fQueryDir += TString("/session-") + fTopSessionTag;
if (gSystem->AccessPathName(fQueryDir))
gSystem->mkdir(fQueryDir, kTRUE);
if (gProofDebugLevel > 0)
Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
gSystem->TempDirectory(),
kPROOF_QueryLockFile, fSessionTag.Data(),
TString(fQueryDir).ReplaceAll("/","%").Data()));
fQueryLock->Lock();
fQMgr = new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
fQueryLock, 0);
}
fImage = gEnv->GetValue("ProofServ.Image", "");
if (IsMaster()) {
TMessage m(kPROOF_SESSIONTAG);
m << fTopSessionTag << fGroup << fUser;
fSocket->Send(m);
fGroupPriority = GetPriority();
TPluginHandler *h = 0;
TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
if (!dsms.IsNull()) {
TString dsm;
Int_t from = 0;
while (dsms.Tokenize(dsm, from, ",")) {
if (fDataSetManager && !fDataSetManager->TestBit(TObject::kInvalidObject)) {
Warning("SetupCommon", "a valid dataset manager already initialized");
Warning("SetupCommon", "support for multiple managers not yet available");
break;
}
if (gROOT->GetPluginManager()) {
h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
if (h && h->LoadPlugin() != -1) {
fDataSetManager =
reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
fUser.Data(), dsm.Data()));
}
}
}
if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
Warning("SetupCommon", "dataset manager plug-in initialization failed");
SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
SafeDelete(fDataSetManager);
}
} else {
TString opts("Av:");
TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
if (dsetdir.IsNull()) {
dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
if (gSystem->AccessPathName(fDataSetDir))
gSystem->MakeDirectory(fDataSetDir);
opts += "Sb:";
}
if (!h) {
h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
if (h && h->LoadPlugin() == -1) h = 0;
}
if (h) {
TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
fGroup.Data(), fUser.Data(), oo.Data()));
}
if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
Warning("SetupCommon", "default dataset manager plug-in initialization failed");
SafeDelete(fDataSetManager);
}
}
TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
if (!dsReqCfg.IsNull()) {
TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
if (reReqDir.Match(dsReqCfg) == 5) {
TString dsDirFmt;
dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_",
dsDirFmt);
if (fDataSetStgRepo &&
fDataSetStgRepo->TestBit(TObject::kInvalidObject)) {
Warning("SetupCommon",
"failed init of dataset staging requests repository");
SafeDelete(fDataSetStgRepo);
}
} else {
Warning("SetupCommon",
"specify, with [dir:]<path>, a valid path for staging requests");
}
} else if (gProofDebugLevel > 0) {
Warning("SetupCommon", "no repository for staging requests available");
}
}
TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
if (quotas.IsNull())
quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
if (quotas.IsNull())
quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
if (!quotas.IsNull()) {
TString tok;
Ssiz_t from = 0;
while (quotas.Tokenize(tok, from, " ")) {
if (tok.BeginsWith("maxquerykept=")) {
tok.ReplaceAll("maxquerykept=","");
if (tok.IsDigit())
fMaxQueries = tok.Atoi();
else
Info("SetupCommon",
"parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
}
const char *ksz[2] = {"hwmsz=", "maxsz="};
for (Int_t j = 0; j < 2; j++) {
if (tok.BeginsWith(ksz[j])) {
tok.ReplaceAll(ksz[j],"");
Long64_t fact = -1;
if (!tok.IsDigit()) {
tok.ToLower();
const char *s[3] = {"k", "m", "g"};
Int_t i = 0, k = 1024;
while (fact < 0) {
if (tok.EndsWith(s[i]))
fact = k;
else
k *= 1024;
}
tok.Remove(tok.Length()-1);
}
if (tok.IsDigit()) {
if (j == 0)
fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
else
fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
} else {
TString ssz(ksz[j], strlen(ksz[j])-1);
Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
}
}
}
}
}
if (IsMaster() && fQMgr)
if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
Warning("SetupCommon", "problems applying fMaxQueries");
if (fProtocol > 12) {
TString vac = gROOT->GetVersion();
vac += TString::Format(":%s", gROOT->GetGitCommit());
TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
if (rtag.Length() > 0)
vac += TString::Format(":%s", rtag.Data());
vac += TString::Format("|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
TMessage m(kPROOF_VERSARCHCOMP);
m << vac;
fSocket->Send(m);
}
TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
TString name;
Int_t from = 0;
while (all_vars.Tokenize(name, from, ",")) {
if (!name.IsNull()) {
TString value = gSystem->Getenv(name);
TProof::AddEnvVar(name, value);
}
}
if (fgLogToSysLog > 0) {
if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
} else if (!(fUser.IsNull()) && fGroup.IsNull()) {
fgSysLogEntity.Form("%s:default", fUser.Data());
} else if (fUser.IsNull() && !(fGroup.IsNull())) {
fgSysLogEntity.Form("undef:%s", fGroup.Data());
}
TString s;
s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
gSystem->Syslog(kLogNotice, s.Data());
}
if (gProofDebugLevel > 0)
Info("SetupCommon", "successfully completed");
return 0;
}
void TProofServ::Terminate(Int_t status)
{
if (fgLogToSysLog > 0) {
TString s;
s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
gSystem->Syslog(kLogNotice, s.Data());
}
ProcInfo_t pi;
if (!gSystem->GetProcInfo(&pi)){
Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
}
if (status == 0) {
gSystem->ChangeDirectory("/");
gSystem->MakeDirectory(fSessionDir+"/.delete");
gSystem->Exec(TString::Format("%s %s", kRM, fSessionDir.Data()));
}
if (IsMaster()) {
if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
gSystem->ChangeDirectory("/");
gSystem->MakeDirectory(fQueryDir+"/.delete");
gSystem->Exec(TString::Format("%s %s", kRM, fQueryDir.Data()));
if (fQueryLock)
gSystem->Unlink(fQueryLock->GetName());
}
if (fQueryLock)
fQueryLock->Unlock();
}
if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
if (UnlinkDataDir(fDataDir))
Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
}
TIter next(gSystem->GetListOfFileHandlers());
TObject *fh = 0;
while ((fh = next())) {
TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
if (ih)
gSystem->RemoveFileHandler(ih);
}
gSystem->ExitLoop();
}
Bool_t TProofServ::UnlinkDataDir(const char *path)
{
if (!path || strlen(path) <= 0) return kFALSE;
Bool_t dorm = kTRUE;
void *dirp = gSystem->OpenDirectory(path);
if (dirp) {
TString fpath;
const char *ent = 0;
while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
fpath.Form("%s/%s", path, ent);
FileStat_t st;
if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
dorm = UnlinkDataDir(fpath);
} else {
dorm = kFALSE;
}
}
gSystem->FreeDirectory(dirp);
} else {
dorm = kFALSE;
}
if (dorm && gSystem->Unlink(path) != 0)
Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
return dorm;
}
Bool_t TProofServ::IsActive()
{
return gProofServ ? kTRUE : kFALSE;
}
TProofServ *TProofServ::This()
{
return gProofServ;
}
Int_t TProofServ::OldAuthSetup(TString &conf)
{
OldProofServAuthSetup_t oldAuthSetupHook = 0;
if (!oldAuthSetupHook) {
TString authlib = "libRootAuth";
char *p = 0;
if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
delete[] p;
if (gSystem->Load(authlib) == -1) {
Error("OldAuthSetup", "can't load %s",authlib.Data());
return kFALSE;
}
} else {
Error("OldAuthSetup", "can't locate %s",authlib.Data());
return -1;
}
Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
if (f)
oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
else {
Error("OldAuthSetup", "can't find OldProofServAuthSetup");
return -1;
}
}
return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
fUser, fOrdinal, conf);
}
TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
const char *opt,
TList *inlist, Long64_t fst,
TDSet *dset, const char *selec,
TObject *elist)
{
Int_t seqnum = -1;
if (fQMgr) {
fQMgr->IncrementSeqNum();
seqnum = fQMgr->SeqNum();
}
Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
if (olds)
dset->SetWriteV3(kFALSE);
TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
fst, dset, selec, elist);
pqr->SetTitle(gSystem->BaseName(fQueryDir));
if (olds)
dset->SetWriteV3(kTRUE);
return pqr;
}
void TProofServ::SetQueryRunning(TProofQueryResult *pq)
{
fflush(stdout);
Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
Printf(" ");
Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
TString parlist = "";
TIter nxp(fEnabledPackages);
TPair *pck= 0;
while ((pck = (TPair *)nxp())) {
if (parlist.Length() <= 0)
parlist = pck->GetName();
else
parlist += TString::Format(";%s", pck->GetName());
}
if (fProof) {
pq->SetRunning(startlog, parlist, fProof->GetParallel());
pq->SetProcessInfo(pq->GetEntries(),
fProof->GetCpuTime(), fProof->GetBytesRead());
} else {
pq->SetRunning(startlog, parlist, -1);
pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
}
}
void TProofServ::HandleArchive(TMessage *mess, TString *slb)
{
PDB(kGlobal, 1)
Info("HandleArchive", "Enter");
TString queryref;
TString path;
(*mess) >> queryref >> path;
if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
if (queryref == "Default") {
fArchivePath = path;
Info("HandleArchive",
"default path set to %s", fArchivePath.Data());
return;
}
Int_t qry = -1;
TString qdir;
TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
TProofQueryResult *pqm = pqr;
if (path.Length() <= 0) {
if (fArchivePath.Length() <= 0) {
Info("HandleArchive",
"archive paths are not defined - do nothing");
return;
}
if (qry > 0) {
path.Form("%s/session-%s-%d.root",
fArchivePath.Data(), fTopSessionTag.Data(), qry);
} else {
path = queryref;
path.ReplaceAll(":q","-");
path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
path += ".root";
}
}
if (!pqr || qry < 0) {
TString fout = qdir;
fout += "/query-result.root";
TFile *f = TFile::Open(fout,"READ");
pqr = 0;
if (f) {
f->ReadKeys();
TIter nxk(f->GetListOfKeys());
TKey *k = 0;
while ((k = (TKey *)nxk())) {
if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
pqr = (TProofQueryResult *) f->Get(k->GetName());
if (pqr)
break;
}
}
f->Close();
delete f;
} else {
Info("HandleArchive",
"file cannot be open (%s)",fout.Data());
return;
}
}
if (pqr) {
PDB(kGlobal, 1) Info("HandleArchive",
"archive path for query #%d: %s",
qry, path.Data());
TFile *farc = 0;
if (gSystem->AccessPathName(path))
farc = TFile::Open(path,"NEW");
else
farc = TFile::Open(path,"UPDATE");
if (!farc || !(farc->IsOpen())) {
Info("HandleArchive",
"archive file cannot be open (%s)",path.Data());
return;
}
farc->cd();
pqr->SetArchived(path);
if (pqm)
pqm->SetArchived(path);
pqr->Write();
if (qry > -1 && fQMgr)
fQMgr->SaveQuery(pqr);
Info("HandleArchive",
"results of query %s archived to file %s",
queryref.Data(), path.Data());
}
return;
}
TMap *TProofServ::GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
{
TMap *fcmap = 0;
emsg = "";
if (!fc) {
emsg.Form("file collection undefined!");
return fcmap;
}
fcmap = new TMap();
TIter nxf(fc->GetList());
TFileInfo *fiind = 0;
TString key;
while ((fiind = (TFileInfo *)nxf())) {
TUrl *xurl = fiind->GetCurrentUrl();
key.Form("%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
if (xurl->GetPort() > 0)
key += TString::Format(":%d", xurl->GetPort());
TPair *ent = 0;
THashList* l = 0;
if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
l = (THashList *) ent->Value();
} else {
l = new THashList;
l->SetOwner(kTRUE);
fcmap->Add(new TObjString(key.Data()), l);
}
l->Add(fiind);
}
return fcmap;
}
void TProofServ::HandleProcess(TMessage *mess, TString *slb)
{
PDB(kGlobal, 1)
Info("HandleProcess", "Enter");
if (!IsTopMaster() && !IsIdle())
return;
TDSet *dset;
TString filename, opt;
TList *input;
Long64_t nentries, first;
TEventList *evl = 0;
TEntryList *enl = 0;
Bool_t sync;
(*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
(*mess) >> enl;
Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
if (enl && evl)
SafeDelete(evl);
if ((!hasNoData) && elist)
dset->SetEntryList(elist);
if (IsTopMaster()) {
TString emsg;
if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
fPrefix.Data(), emsg.Data()));
Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
if (sync) SendLogFile();
return;
}
} else if (hasNoData) {
TNamed *ftp = dynamic_cast<TNamed *>(input->FindObject("PROOF_FilesToProcess"));
if (ftp) {
TString dsn(ftp->GetTitle());
if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
dsn.ReplaceAll("dataset:", "");
if (!fDataSetManager) {
emsg.Form("dataset manager not initialized!");
} else {
TFileCollection *fc = 0;
if (!(fc = fDataSetManager->GetDataSet(dsn))) {
emsg.Form("requested dataset '%s' does not exists", dsn.Data());
} else {
TMap *fcmap = GetDataSetNodeMap(fc, emsg);
if (fcmap) {
input->Remove(ftp);
delete ftp;
fcmap->SetOwner(kTRUE);
fcmap->SetName("PROOF_FilesToProcess");
input->Add(fcmap);
}
}
}
if (!emsg.IsNull()) {
SendAsynMessage(TString::Format("HandleProcess on %s: %s",
fPrefix.Data(), emsg.Data()));
Error("HandleProcess", "%s", emsg.Data());
if (sync) SendLogFile();
return;
}
}
}
}
TProofQueryResult *pq = 0;
pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
if (dset) input->Add(dset);
if (elist) input->Add(elist);
pq->SetInputList(input, kTRUE);
input->Clear("nodelete");
SafeDelete(input);
if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
Warning("HandleProcess", "could not save input data: %s", emsg.Data());
if (!(pq->IsDraw())) {
if (fQMgr) {
if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
fQMgr->SaveQuery(pq);
}
}
QueueQuery(pq);
Bool_t enqueued = kFALSE;
Int_t pc = 0;
if (fProof->UseDynamicStartup()) {
TList* workerList = new TList();
EQueryAction retVal = GetWorkers(workerList, pc);
if (retVal == TProofServ::kQueryStop) {
Error("HandleProcess", "error getting list of worker nodes");
if (sync) SendLogFile();
return;
} else if (retVal == TProofServ::kQueryEnqueued) {
enqueued = kTRUE;
Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
} else if (Int_t ret = fProof->AddWorkers(workerList) < 0) {
Error("HandleProcess", "Adding a list of worker nodes returned: %d",
ret);
if (sync) SendLogFile();
return;
}
} else {
EQueryAction retVal = GetWorkers(0, pc);
if (retVal == TProofServ::kQueryStop) {
Error("HandleProcess", "error getting list of worker nodes");
if (sync) SendLogFile();
return;
} else if (retVal == TProofServ::kQueryEnqueued) {
enqueued = kTRUE;
Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
} else if (retVal != TProofServ::kQueryOK) {
Error("HandleProcess", "unknown return value: %d", retVal);
if (sync) SendLogFile();
return;
}
}
TMessage m(kPROOF_QUERYSUBMITTED);
if (!sync || enqueued) {
m << pq->GetSeqNum() << kFALSE;
fSocket->Send(m);
}
if (!IsIdle()) {
Info("HandleProcess",
"query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
return;
}
Bool_t doprocess = kFALSE;
while (WaitingQueries() > 0 && !enqueued) {
doprocess = kTRUE;
ProcessNext(slb);
if (fProof->UseDynamicStartup())
enqueued = kTRUE;
}
SetIdle(kTRUE);
fProof->ResetMergers();
if (!sync) SendLogFile();
if (doprocess) {
m.Reset(kPROOF_SETIDLE);
Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
m << waiting;
fSocket->Send(m);
}
if (sync) SendLogFile();
SetIdle(kTRUE);
} else {
fCompute.Reset();
fCompute.Start();
SetIdle(kFALSE);
Bool_t deleteplayer = kTRUE;
MakePlayer();
if (dset && (dset->IsA() == TDSetProxy::Class()))
((TDSetProxy*)dset)->SetProofServ(this);
TString emsg;
if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
Warning("HandleProcess", "could not get input data: %s", emsg.Data());
if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
Warning("HandleProcess", "could not get query sequential number!");
TObject *nord = 0;
while ((nord = input->FindObject("PROOF_Ordinal")))
input->Remove(nord);
input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
TIter next(input);
TObject *o = 0;
while ((o = next())) {
PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
fPlayer->AddInput(o);
}
TObject *obj = 0;
TSelector *selector_obj = 0;
TIter nxt(input);
while ((obj = nxt())){
if (obj->InheritsFrom("TSelector")) {
selector_obj = (TSelector *) obj;
filename = selector_obj->ClassName();
Info("HandleProcess", "selector obj for '%s' found", selector_obj->ClassName());
break;
}
}
fSocket->Send(kPROOF_STARTPROCESS);
fLatency.Reset();
fSaveOutput.Reset();
PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
if (selector_obj){
Info("HandleProcess", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
fPlayer->Process(dset, selector_obj, opt, nentries, first);
}
else {
Info("HandleProcess", "calling fPlayer->Process() with selector name: %s", filename.Data());
fPlayer->Process(dset, filename, opt, nentries, first);
}
TMessage m(kPROOF_STOPPROCESS);
Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
if (fProtocol > 18) {
TProofProgressStatus* status =
new TProofProgressStatus(fPlayer->GetEventsProcessed(),
gPerfStats?gPerfStats->GetBytesRead():0);
if (status)
m << status << abort;
if (slb)
slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
status->GetEntries(), status->GetBytesRead());
SafeDelete(status);
} else {
m << fPlayer->GetEventsProcessed() << abort;
if (slb)
slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
}
fSocket->Send(m);
PDB(kGlobal, 2)
Info("TProofServ::Handleprocess",
"worker %s has finished processing with %d objects in output list",
GetOrdinal(), fPlayer->GetOutputList()->GetEntries());
SafeDelete(dset);
SafeDelete(enl);
SafeDelete(evl);
Bool_t outok = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted &&
fPlayer->GetOutputList()) ? kTRUE : kFALSE;
if (outok) {
Int_t cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
if (TProof::GetParameter(input, "PROOF_ControlSendOutput", cso) != 0)
cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
if (cso > 0) {
TMessage msg(kPROOF_SENDOUTPUT);
fSocket->Send(msg);
SetIdle(kTRUE);
deleteplayer = kFALSE;
PDB(kGlobal, 1)
Info("HandleProcess", "controlled mode: worker %s has finished,"
" sizes sent to master", fOrdinal.Data());
} else {
Bool_t isInMergingMode = kFALSE;
if (!(TestBit(TProofServ::kHighMemory))) {
Int_t nm = 0;
if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
isInMergingMode = (nm >= 0) ? kTRUE : kFALSE;
}
}
PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isInMergingMode);
if (!IsMaster() && isInMergingMode) {
TMessage msg_osize(kPROOF_SUBMERGER);
msg_osize << Int_t(TProof::kOutputSize);
msg_osize << fPlayer->GetOutputList()->GetEntries();
fMergingSocket = new TServerSocket(0);
Int_t merge_port = 0;
if (fMergingSocket) {
PDB(kGlobal, 2)
Info("HandleProcess", "possible port for merging connections: %d",
fMergingSocket->GetLocalPort());
merge_port = fMergingSocket->GetLocalPort();
}
msg_osize << merge_port;
fSocket->Send(msg_osize);
SetIdle(kTRUE);
deleteplayer = kFALSE;
PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
} else {
PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
Warning("HandleProcess","problems sending output list");
if (IsMaster()) fProof->ResetMergers();
fSocket->Send(kPROOF_SETIDLE);
SetIdle(kTRUE);
SendLogFile();
}
}
} else {
if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
Warning("HandleProcess","the output list is empty!");
if (SendResults(fSocket) != 0)
Warning("HandleProcess", "problems sending output list");
if (IsMaster()) fProof->ResetMergers();
fSocket->Send(kPROOF_SETIDLE);
SetIdle(kTRUE);
SendLogFile();
}
TIter nex(input);
while ((obj = nex())) {
if (obj->InheritsFrom("TSelector")) input->Remove(obj);
}
fPlayer->GetInputList()->SetOwner(0);
TList *added = dynamic_cast<TList *>(input->FindObject("PROOF_InputObjsFromFile"));
if (added) {
if (added->GetSize() > 0) {
TFile *f = dynamic_cast<TFile *>(added->Last());
if (f) {
added->Remove(f);
TIter nxo(added);
while ((o = nxo())) { input->Remove(o); }
input->Remove(added);
added->SetOwner(kFALSE);
added->Clear();
f->Close();
delete f;
}
}
SafeDelete(added);
}
input->SetOwner();
SafeDelete(input);
if (deleteplayer) DeletePlayer();
}
PDB(kGlobal, 1) Info("HandleProcess", "done");
return;
}
Int_t TProofServ::SendResults(TSocket *sock, TList *outlist, TQueryResult *pq)
{
PDB(kOutput, 2) Info("SendResults", "enter");
TString msg;
if (fProtocol > 23 && outlist) {
TMessage mbuf(kPROOF_OUTPUTOBJECT);
Int_t olsz = outlist->GetSize();
if (IsTopMaster() && pq) {
msg.Form("%s: merging output objects ... done ",
fPrefix.Data());
SendAsynMessage(msg.Data());
msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
SendAsynMessage(msg.Data(), kFALSE);
mbuf << (Int_t) 0;
mbuf.WriteObject(pq);
if (sock->Send(mbuf) < 0) return -1;
}
Int_t ns = 0, np = 0;
TIter nxo(outlist);
TObject *o = 0;
Int_t totsz = 0, objsz = 0;
mbuf.Reset();
while ((o = nxo())) {
if (mbuf.Length() > fMsgSizeHWM) {
PDB(kOutput, 1)
Info("SendResults",
"message has %d bytes: limit of %lld bytes reached - sending ...",
mbuf.Length(), fMsgSizeHWM);
if (GetCompressionLevel() > 0) {
mbuf.SetCompressionSettings(fCompressMsg);
mbuf.Compress();
objsz = mbuf.CompLength();
} else {
objsz = mbuf.Length();
}
totsz += objsz;
if (IsTopMaster()) {
msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
fPrefix.Data(), ns, olsz, objsz);
SendAsynMessage(msg.Data(), kFALSE);
}
if (sock->Send(mbuf) < 0) return -1;
mbuf.Reset();
np = 0;
}
ns++;
np++;
mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
mbuf << o;
}
if (np > 0) {
if (GetCompressionLevel() > 0) {
mbuf.SetCompressionSettings(fCompressMsg);
mbuf.Compress();
objsz = mbuf.CompLength();
} else {
objsz = mbuf.Length();
}
totsz += objsz;
if (IsTopMaster()) {
msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
fPrefix.Data(), ns, olsz, objsz);
SendAsynMessage(msg.Data(), kFALSE);
}
if (sock->Send(mbuf) < 0) return -1;
}
if (IsTopMaster()) {
msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
fPrefix.Data(), olsz, totsz);
SendAsynMessage(msg.Data());
}
} else if (fProtocol > 10 && outlist) {
TMessage mbuf(kPROOF_OUTPUTOBJECT);
Int_t olsz = outlist->GetSize();
if (IsTopMaster() && pq) {
msg.Form("%s: merging output objects ... done ",
fPrefix.Data());
SendAsynMessage(msg.Data());
msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
SendAsynMessage(msg.Data(), kFALSE);
mbuf << (Int_t) 0;
mbuf.WriteObject(pq);
if (sock->Send(mbuf) < 0) return -1;
}
Int_t ns = 0;
Int_t totsz = 0, objsz = 0;
TIter nxo(fPlayer->GetOutputList());
TObject *o = 0;
while ((o = nxo())) {
ns++;
mbuf.Reset();
Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
mbuf << type;
mbuf.WriteObject(o);
if (GetCompressionLevel() > 0) {
mbuf.SetCompressionSettings(fCompressMsg);
mbuf.Compress();
objsz = mbuf.CompLength();
} else {
objsz = mbuf.Length();
}
totsz += objsz;
if (IsTopMaster()) {
msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
fPrefix.Data(), ns, olsz, objsz);
SendAsynMessage(msg.Data(), kFALSE);
}
if (sock->Send(mbuf) < 0) return -1;
}
if (IsTopMaster()) {
msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
fPrefix.Data(), olsz, totsz);
SendAsynMessage(msg.Data());
}
} else if (IsTopMaster() && fProtocol > 6 && outlist) {
TMessage mbuf(kPROOF_OUTPUTLIST);
mbuf.WriteObject(pq);
Int_t blen = mbuf.CompLength();
Int_t olsz = outlist->GetSize();
msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
SendAsynMessage(msg.Data(), kFALSE);
if (sock->Send(mbuf) < 0) return -1;
} else {
if (outlist) {
PDB(kGlobal, 2) Info("SendResults", "sending output list");
} else {
PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
}
if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
}
PDB(kOutput,2) Info("SendResults", "done");
return 0;
}
void TProofServ::ProcessNext(TString *slb)
{
TDSet *dset = 0;
TString filename, opt;
TList *input = 0;
Long64_t nentries = -1, first = 0;
TProofQueryResult *pq = 0;
TObject* obj = 0;
TSelector* selector_obj = 0;
fCompute.Reset();
fCompute.Start();
pq = NextQuery();
if (pq) {
SetIdle(kFALSE);
opt = pq->GetOptions();
input = pq->GetInputList();
nentries = pq->GetEntries();
first = pq->GetFirst();
filename = pq->GetSelecImp()->GetName();
Ssiz_t id = opt.Last('#');
if (id != kNPOS && id < opt.Length() - 1) {
filename += opt(id + 1, opt.Length());
opt.Remove(id);
}
TObject *o = 0;
if ((o = pq->GetInputObject("TDSet"))) {
dset = (TDSet *) o;
} else {
Error("ProcessNext", "no TDset object: cannot continue");
return;
}
if (pq->GetSelecImp()) {
gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
}
if (pq->GetSelecHdr() &&
!strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
}
TIter nxt(input);
while ((obj = nxt())){
if (obj->InheritsFrom("TSelector") &&
!strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
selector_obj = (TSelector *) obj;
Info("ProcessNext", "found object for selector '%s'", obj->ClassName());
break;
}
}
} else {
Error("ProcessNext", "empty waiting queries list!");
return;
}
SetQueryRunning(pq);
if (fQMgr) {
if (!(pq->IsDraw()))
fQMgr->SaveQuery(pq);
else
fQMgr->IncrementDrawQueries();
fQMgr->ResetTime();
}
TMessage m(kPROOF_STARTPROCESS);
m << TString(pq->GetSelecImp()->GetName())
<< dset->GetNumOfFiles()
<< pq->GetFirst() << pq->GetEntries();
fSocket->Send(m);
MakePlayer();
fPlayer->AddQueryResult(pq);
fPlayer->SetCurrentQuery(pq);
if (dset->IsA() == TDSetProxy::Class())
((TDSetProxy*)dset)->SetProofServ(this);
TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
fQuerySeqNum = pq->GetSeqNum();
input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
if (smg >= 0) {
input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
if (gEnv->Lookup("Proof.MergersByHost")) {
Int_t mbh = gEnv->GetValue("Proof.MergersByHost", 0);
if (mbh != 0) {
TObject *o = 0;
if ((o = input->FindObject("PROOF_MergersByHost"))) { input->Remove(o); delete o; }
input->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
}
}
}
}
TIter next(input);
TObject *o = 0;
while ((o = next())) {
PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
fPlayer->AddInput(o);
}
if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
if (selector_obj){
Info("ProcessNext", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
fPlayer->Process(dset, selector_obj, opt, nentries, first);
}
else {
Info("ProcessNext", "calling fPlayer->Process() with selector name: %s", filename.Data());
fPlayer->Process(dset, filename, opt, nentries, first);
}
Bool_t abort =
(fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
m.Reset(kPROOF_STOPPROCESS);
if (fProtocol > 18) {
TProofProgressStatus* status = fPlayer->GetProgressStatus();
m << status << abort;
status = 0;
} else if (fProtocol > 8) {
m << fPlayer->GetEventsProcessed() << abort;
} else {
m << fPlayer->GetEventsProcessed();
}
fSocket->Send(m);
}
if (fDataSetManager && fPlayer->GetOutputList()) {
TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
if (psr) {
TString emsg;
if (RegisterDataSets(input, fPlayer->GetOutputList(), fDataSetManager, emsg) != 0)
Warning("ProcessNext", "problems registering produced datasets: %s", emsg.Data());
do {
fPlayer->GetOutputList()->Remove(psr);
delete psr;
} while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet")));
}
}
if (fQMgr && !pq->IsDraw()) {
if (!abort) fProof->AskStatistics();
if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
fQMgr->SaveQuery(pq, fMaxQueries);
}
if (IsTopMaster() && fPlayer->GetOutputList()) {
Bool_t save = kTRUE;
TIter nxo(fPlayer->GetOutputList());
TObject *xo = 0;
while ((xo = nxo())) {
if (xo->InheritsFrom("TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
save = kFALSE;
break;
}
}
if (save) {
TNamed *nof = (TNamed *) input->FindObject("PROOF_DefaultOutputOption");
if (nof) {
TString oopt(nof->GetTitle());
if (oopt.BeginsWith("of:")) {
oopt.Replace(0, 3, "");
if (!oopt.IsNull()) fPlayer->SetOutputFilePath(oopt);
fPlayer->SavePartialResults(kTRUE, kTRUE);
}
}
}
}
TQueryResult *pqr = pq->CloneInfo();
Info("ProcessNext", "adding info about dataset '%s' in the light query result", dset->GetName());
TList rin;
TDSet *ds = new TDSet(dset->GetName(), dset->GetObjName());
rin.Add(ds);
if (pqr) pqr->SetInputList(&rin, kTRUE);
if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
PDB(kGlobal, 2)
Info("ProcessNext", "sending results");
TQueryResult *xpq = (pqr && fProtocol > 10) ? pqr : pq;
if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
Warning("ProcessNext", "problems sending output list");
if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
pq->GetBytes(), pq->GetUsedCPU());
} else {
if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
Warning("ProcessNext","the output list is empty!");
if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
Warning("ProcessNext", "problems sending output list");
if (slb) slb->Form("%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
}
if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
if (pqr) SafeDelete(pqr);
if (fQMgr) fQMgr->RemoveQuery(pq);
} else {
if (!(pq->IsDraw()) && pqr) {
if (fQMgr && fQMgr->Queries()) {
fQMgr->Queries()->Add(pqr);
fQMgr->Queries()->Remove(pq);
}
fPlayer->RemoveQueryResult(TString::Format("%s:%s",
pq->GetTitle(), pq->GetName()));
}
}
DeletePlayer();
if (IsMaster() && fProof->UseDynamicStartup())
fProof->RemoveWorkers(0);
}
Int_t TProofServ::RegisterDataSets(TList *in, TList *out,
TDataSetManager *dsm, TString &msg)
{
PDB(kDataset, 1)
::Info("TProofServ::RegisterDataSets",
"enter: %d objs in the output list", (out ? out->GetSize() : -1));
if (!in || !out || !dsm) {
::Error("TProofServ::RegisterDataSets", "invalid inputs: %p, %p, %p", in, out, dsm);
return 0;
}
msg = "";
THashList tags;
TList torm;
TIter nxo(out);
TObject *o = 0;
while ((o = nxo())) {
TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
if (ds) {
ds->SetTitle(gSystem->HostName());
TNamed *fcn = 0;
TString tag = TString::Format("DATASET_%s", ds->GetName());
if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
if (tags.FindObject(tag)) {
torm.Add(o);
continue;
}
TString regopt(fcn->GetTitle());
if (regopt.Contains(":sortidx:")) {
ds->Sort(kTRUE);
regopt.ReplaceAll(":sortidx:", "");
}
if (dsm->TestBit(TDataSetManager::kAllowRegister)) {
if (ds->GetList()->GetSize() > 0) {
const char *vfmsg = regopt.Contains("V") ? " and verifying" : "";
msg.Form("Registering%s dataset '%s' ... ", vfmsg, ds->GetName());
Bool_t allowVerify = dsm->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
if (regopt.Contains("V") && !allowVerify) dsm->SetBit(TDataSetManager::kAllowVerify);
Int_t rc = dsm->RegisterDataSet(ds->GetName(), ds, regopt);
if (regopt.Contains("V") && !allowVerify) dsm->ResetBit(TDataSetManager::kAllowVerify);
if (rc != 0) {
::Warning("TProofServ::RegisterDataSets",
"failure registering or verifying dataset '%s'", ds->GetName());
msg.Form("Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->GetName());
} else {
::Info("TProofServ::RegisterDataSets", "dataset '%s' successfully registered%s",
ds->GetName(), (strlen(vfmsg) > 0) ? " and verified" : "");
msg.Form("Registering%s dataset '%s' ... OK", vfmsg, ds->GetName());
tags.Add(new TObjString(tag));
}
PDB(kDataset, 2) {
::Info("TProofServ::RegisterDataSets", "printing collection");
ds->Print("F");
}
} else {
::Warning("TProofServ::RegisterDataSets", "collection '%s' is empty", o->GetName());
}
} else {
::Info("TProofServ::RegisterDataSets", "dataset registration not allowed");
return -1;
}
}
}
TIter nxrm(&torm);
while ((o = nxrm())) out->Remove(o);
torm.SetOwner(kTRUE);
TIter nxtg(&tags);
while((o = nxtg())) {
TObject *oo = 0;
while ((oo = out->FindObject(o->GetName()))) { out->Remove(oo); }
}
tags.SetOwner(kTRUE);
PDB(kDataset, 1) ::Info("TProofServ::RegisterDataSets", "exit");
return 0;
}
void TProofServ::HandleQueryList(TMessage *mess)
{
PDB(kGlobal, 1)
Info("HandleQueryList", "Enter");
Bool_t all;
(*mess) >> all;
TList *ql = new TList;
Int_t ntot = 0, npre = 0, ndraw= 0;
if (fQMgr) {
if (all) {
TString qdir = fQueryDir;
Int_t idx = qdir.Index("session-");
if (idx != kNPOS)
qdir.Remove(idx);
fQMgr->ScanPreviousQueries(qdir);
if (fQMgr->PreviousQueries()) {
TIter nxq(fQMgr->PreviousQueries());
TProofQueryResult *pqr = 0;
while ((pqr = (TProofQueryResult *)nxq())) {
ntot++;
pqr->fSeqNum = ntot;
ql->Add(pqr);
}
}
}
npre = ntot;
if (fQMgr->Queries()) {
TIter nxq(fQMgr->Queries());
TProofQueryResult *pqr = 0;
TQueryResult *pqm = 0;
while ((pqr = (TProofQueryResult *)nxq())) {
ntot++;
if ((pqm = pqr->CloneInfo())) {
pqm->fSeqNum = ntot;
ql->Add(pqm);
} else {
Warning("HandleQueryList", "unable to clone TProofQueryResult '%s:%s'",
pqr->GetName(), pqr->GetTitle());
}
}
}
ndraw = fQMgr->DrawQueries();
}
TMessage m(kPROOF_QUERYLIST);
m << npre << ndraw << ql;
fSocket->Send(m);
delete ql;
return;
}
void TProofServ::HandleRemove(TMessage *mess, TString *slb)
{
PDB(kGlobal, 1)
Info("HandleRemove", "Enter");
TString queryref;
(*mess) >> queryref;
if (slb) *slb = queryref;
if (queryref == "cleanupqueue") {
Int_t pend = CleanupWaitingQueries();
Info("HandleRemove", "%d queries removed from the waiting list", pend);
return;
}
if (queryref == "cleanupdir") {
Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
Info("HandleRemove", "%d directories removed", nd);
return;
}
if (fQMgr) {
TProofLockPath *lck = 0;
if (fQMgr->LockSession(queryref, &lck) == 0) {
TList qtorm;
fQMgr->RemoveQuery(queryref, &qtorm);
CleanupWaitingQueries(kFALSE, &qtorm);
if (lck) {
gSystem->Unlink(lck->GetName());
SafeDelete(lck);
}
return;
}
} else {
Warning("HandleRemove", "query result manager undefined!");
}
Info("HandleRemove",
"query %s could not be removed (unable to lock session)", queryref.Data());
return;
}
void TProofServ::HandleRetrieve(TMessage *mess, TString *slb)
{
PDB(kGlobal, 1)
Info("HandleRetrieve", "Enter");
TString queryref;
(*mess) >> queryref;
if (slb) *slb = queryref;
Int_t qry = -1;
TString qdir;
if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
TString fout = qdir;
fout += "/query-result.root";
TFile *f = TFile::Open(fout,"READ");
TProofQueryResult *pqr = 0;
if (f) {
f->ReadKeys();
TIter nxk(f->GetListOfKeys());
TKey *k = 0;
while ((k = (TKey *)nxk())) {
if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
pqr = (TProofQueryResult *) f->Get(k->GetName());
if (pqr && fProtocol < 13) {
TDSet *d = 0;
TObject *o = 0;
TIter nxi(pqr->GetInputList());
while ((o = nxi()))
if ((d = dynamic_cast<TDSet *>(o)))
break;
d->SetWriteV3(kTRUE);
}
if (pqr) {
Float_t qsz = (Float_t) f->GetSize();
Int_t ilb = 0;
static const char *clb[4] = { "bytes", "KB", "MB", "GB" };
while (qsz > 1000. && ilb < 3) {
qsz /= 1000.;
ilb++;
}
SendAsynMessage(TString::Format("%s: sending result of %s:%s (%.1f %s)",
fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
qsz, clb[ilb]));
fSocket->SendObject(pqr, kPROOF_RETRIEVE);
} else {
Info("HandleRetrieve",
"query not found in file %s",fout.Data());
fSocket->SendObject(0, kPROOF_RETRIEVE);
}
break;
}
}
f->Close();
delete f;
} else {
Info("HandleRetrieve",
"file cannot be open (%s)",fout.Data());
fSocket->SendObject(0, kPROOF_RETRIEVE);
return;
}
return;
}
void TProofServ::HandleLibIncPath(TMessage *mess)
{
TString type;
Bool_t add;
TString path;
(*mess) >> type >> add >> path;
if ((type != "lib") && (type != "inc")) {
Error("HandleLibIncPath","unknown action type: %s", type.Data());
return;
}
path.ReplaceAll(","," ");
TObjArray *op = 0;
if (path.Length() > 0 && path != "-") {
if (!(op = path.Tokenize(" "))) {
Error("HandleLibIncPath","decomposing path %s", path.Data());
return;
}
}
if (add) {
if (type == "lib") {
TIter nxl(op, kIterBackward);
TObjString *lib = 0;
while ((lib = (TObjString *) nxl())) {
TString xlib = lib->GetName();
gSystem->ExpandPathName(xlib);
if (!gSystem->AccessPathName(xlib, kReadPermission)) {
TString newlibpath = gSystem->GetDynamicPath();
Int_t pos = 0;
if (newlibpath.BeginsWith(".:"))
pos = 2;
if (newlibpath.Index(xlib) == kNPOS) {
newlibpath.Insert(pos,TString::Format("%s:", xlib.Data()));
gSystem->SetDynamicPath(newlibpath);
}
} else {
Info("HandleLibIncPath",
"libpath %s does not exist or cannot be read - not added", xlib.Data());
}
}
if (IsMaster())
fProof->AddDynamicPath(path);
} else {
TIter nxi(op);
TObjString *inc = 0;
while ((inc = (TObjString *) nxi())) {
TString xinc = inc->GetName();
gSystem->ExpandPathName(xinc);
if (!gSystem->AccessPathName(xinc, kReadPermission)) {
TString curincpath = gSystem->GetIncludePath();
if (curincpath.Index(xinc) == kNPOS)
gSystem->AddIncludePath(TString::Format("-I%s", xinc.Data()));
} else
Info("HandleLibIncPath",
"incpath %s does not exist or cannot be read - not added", xinc.Data());
}
if (IsMaster())
fProof->AddIncludePath(path);
}
} else {
if (type == "lib") {
TIter nxl(op);
TObjString *lib = 0;
while ((lib = (TObjString *) nxl())) {
TString xlib = lib->GetName();
gSystem->ExpandPathName(xlib);
TString newlibpath = gSystem->GetDynamicPath();
newlibpath.ReplaceAll(TString::Format("%s:", xlib.Data()),"");
gSystem->SetDynamicPath(newlibpath);
}
if (IsMaster())
fProof->RemoveDynamicPath(path);
} else {
TIter nxi(op);
TObjString *inc = 0;
while ((inc = (TObjString *) nxi())) {
TString newincpath = gSystem->GetIncludePath();
newincpath.ReplaceAll(TString::Format("-I%s", inc->GetName()),"");
newincpath.ReplaceAll(gInterpreter->GetIncludePath(),"");
gSystem->SetIncludePath(newincpath);
}
if (IsMaster())
fProof->RemoveIncludePath(path);
}
}
}
void TProofServ::HandleCheckFile(TMessage *mess, TString *slb)
{
TString filenam;
TMD5 md5;
UInt_t opt = TProof::kUntar;
TMessage reply(kPROOF_CHECKFILE);
(*mess) >> filenam >> md5;
if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
(*mess) >> opt;
if (slb) *slb = filenam;
if (filenam.BeginsWith("-")) {
Int_t st = 0;
Bool_t err = kFALSE;
filenam = filenam.Strip(TString::kLeading, '-');
TString packnam = filenam;
packnam.Remove(packnam.Length() - 4);
fPackageLock->Lock();
TMD5 *md5local = TMD5::FileChecksum(fPackageDir + "/" + filenam);
if (md5local && md5 == (*md5local)) {
if ((opt & TProof::kRemoveOld)) {
st = gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
packnam.Data()));
if (st)
Error("HandleCheckFile", "failure executing: %s %s/%s",
kRM, fPackageDir.Data(), packnam.Data());
}
char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
kExecutePermission);
if (gunzip) {
st = gSystem->Exec(TString::Format(kUNTAR, gunzip, fPackageDir.Data(),
filenam.Data(), fPackageDir.Data()));
if (st)
Error("HandleCheckFile", "failure executing: %s",
TString::Format(kUNTAR, gunzip, fPackageDir.Data(),
filenam.Data(), fPackageDir.Data()).Data());
delete [] gunzip;
} else
Error("HandleCheckFile", "%s not found", kGUNZIP);
if (gSystem->AccessPathName(fPackageDir + "/" + packnam, kWritePermission)) {
reply << (Int_t)0;
if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
err = kTRUE;
Error("HandleCheckFile", "package %s did not unpack into %s",
filenam.Data(), packnam.Data());
} else {
TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
TMD5::WriteChecksum(md5f, md5local);
reply << (Int_t)1;
PDB(kPackage, 1)
Info("HandleCheckFile",
"package %s installed on node", filenam.Data());
}
} else {
reply << (Int_t)0;
if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
err = kTRUE;
PDB(kPackage, 1)
Info("HandleCheckFile",
"package %s not yet on node", filenam.Data());
}
if (err) {
gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
filenam.Data()));
fPackageLock->Unlock();
} else if (IsMaster()) {
fPackageLock->Unlock();
if (fProof->UploadPackage(fPackageDir + "/" + filenam,
(TProof::EUploadPackageOpt)opt) != 0)
Info("HandleCheckFile",
"problems uploading package %s", filenam.Data());
} else {
fPackageLock->Unlock();
}
delete md5local;
fSocket->Send(reply);
} else if (filenam.BeginsWith("+")) {
filenam = filenam.Strip(TString::kLeading, '+');
TString packnam = filenam;
packnam.Remove(packnam.Length() - 4);
TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
fPackageLock->Lock();
TMD5 *md5local = TMD5::ReadChecksum(md5f);
fPackageLock->Unlock();
if (md5local && md5 == (*md5local)) {
reply << (Int_t)1;
PDB(kPackage, 1)
Info("HandleCheckFile",
"package %s already on node", filenam.Data());
if (IsMaster())
if (fProof->UploadPackage(fPackageDir + "/" + filenam) != 0)
Info("HandleCheckFile",
"problems uploading package %s", filenam.Data());
} else {
reply << (Int_t)0;
if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
PDB(kPackage, 1)
Info("HandleCheckFile",
"package %s not yet on node", filenam.Data());
}
delete md5local;
fSocket->Send(reply);
} else if (filenam.BeginsWith("=")) {
filenam = filenam.Strip(TString::kLeading, '=');
TString packnam = filenam;
packnam.Remove(packnam.Length() - 4);
TString md5f = fPackageDir + "/" + packnam + "/PROOF-INF/md5.txt";
fPackageLock->Lock();
TMD5 *md5local = TMD5::ReadChecksum(md5f);
fPackageLock->Unlock();
if (md5local && md5 == (*md5local)) {
reply << (Int_t)1;
PDB(kPackage, 1)
Info("HandleCheckFile",
"package %s already on node", filenam.Data());
if (IsMaster())
if (fProof->UploadPackage(fPackageDir + "/" + filenam) != 0)
Info("HandleCheckFile",
"problems with uploading package %s", filenam.Data());
} else {
reply << (Int_t)0;
if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
PDB(kPackage, 1)
Info("HandleCheckFile",
"package %s not yet on node", filenam.Data());
}
delete md5local;
fSocket->Send(reply);
} else {
TString cachef = fCacheDir + "/" + filenam;
fCacheLock->Lock();
TMD5 *md5local = TMD5::FileChecksum(cachef);
if (md5local && md5 == (*md5local)) {
Bool_t cp = ((opt & TProof::kCp || opt & TProof::kCpBin) || (fProtocol <= 19)) ? kTRUE : kFALSE;
if (cp) {
Bool_t cpbin = (opt & TProof::kCpBin) ? kTRUE : kFALSE;
CopyFromCache(filenam, cpbin);
}
reply << (Int_t)1;
PDB(kCache, 1)
Info("HandleCheckFile", "file %s already on node", filenam.Data());
} else {
reply << (Int_t)0;
if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
PDB(kCache, 1)
Info("HandleCheckFile", "file %s not yet on node", filenam.Data());
}
delete md5local;
fSocket->Send(reply);
fCacheLock->Unlock();
}
}
Int_t TProofServ::HandleCache(TMessage *mess, TString *slb)
{
PDB(kGlobal, 1)
Info("HandleCache", "Enter");
Int_t status = 0;
Int_t type = 0;
Bool_t all = kFALSE;
TMessage msg;
Bool_t fromglobal = kFALSE;
Int_t chkveropt = TProof::kCheckROOT;
TString noth;
const char *k = (IsMaster()) ? "Mst" : "Wrk";
noth.Form("%s-%s", k, fOrdinal.Data());
TList *optls = 0;
TString packagedir(fPackageDir), package, pdir, ocwd, file;
(*mess) >> type;
switch (type) {
case TProof::kShowCache:
(*mess) >> all;
printf("*** File cache %s:%s ***\n", gSystem->HostName(),
fCacheDir.Data());
fflush(stdout);
PDB(kCache, 1) {
gSystem->Exec(TString::Format("%s -a %s", kLS, fCacheDir.Data()));
} else {
gSystem->Exec(TString::Format("%s %s", kLS, fCacheDir.Data()));
}
if (IsMaster() && all)
fProof->ShowCache(all);
LogToMaster();
if (slb) slb->Form("%d %d", type, all);
break;
case TProof::kClearCache:
file = "";
if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
fCacheLock->Lock();
if (file.IsNull() || file == "*") {
gSystem->Exec(TString::Format("%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
} else {
gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
}
fCacheLock->Unlock();
if (IsMaster())
fProof->ClearCache(file);
if (slb) slb->Form("%d %s", type, file.Data());
break;
case TProof::kShowPackages:
(*mess) >> all;
if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
TIter nxd(fGlobalPackageDirList);
TNamed *nm = 0;
while ((nm = (TNamed *)nxd())) {
printf("*** Global Package cache %s %s:%s ***\n",
nm->GetName(), gSystem->HostName(), nm->GetTitle());
fflush(stdout);
gSystem->Exec(TString::Format("%s %s", kLS, nm->GetTitle()));
printf("\n");
fflush(stdout);
}
}
printf("*** Package cache %s:%s ***\n", gSystem->HostName(),
fPackageDir.Data());
fflush(stdout);
gSystem->Exec(TString::Format("%s %s", kLS, fPackageDir.Data()));
if (IsMaster() && all)
fProof->ShowPackages(all);
LogToMaster();
if (slb) slb->Form("%d %d", type, all);
break;
case TProof::kClearPackages:
status = UnloadPackages();
if (status == 0) {
fPackageLock->Lock();
gSystem->Exec(TString::Format("%s %s/*", kRM, fPackageDir.Data()));
fPackageLock->Unlock();
if (IsMaster())
status = fProof->ClearPackages();
}
if (slb) slb->Form("%d %d", type, status);
break;
case TProof::kClearPackage:
(*mess) >> package;
status = UnloadPackage(package);
if (status == 0) {
fPackageLock->Lock();
gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
package.Data()));
if (IsMaster())
gSystem->Exec(TString::Format("%s %s/%s.par", kRM, fPackageDir.Data(),
package.Data()));
fPackageLock->Unlock();
if (IsMaster())
status = fProof->ClearPackage(package);
}
if (slb) slb->Form("%d %s %d", type, package.Data(), status);
break;
case TProof::kBuildPackage:
(*mess) >> package;
if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
pdir = fPackageDir + "/" + package;
fromglobal = kFALSE;
if (gSystem->AccessPathName(pdir, kReadPermission) ||
gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
TIter nxd(fGlobalPackageDirList);
TNamed *nm = 0;
while ((nm = (TNamed *)nxd())) {
pdir.Form("%s/%s", nm->GetTitle(), package.Data());
if (!gSystem->AccessPathName(pdir, kReadPermission) &&
!gSystem->AccessPathName(pdir + "/PROOF-INF", kReadPermission)) {
fromglobal = kTRUE;
packagedir = nm->GetTitle();
break;
}
pdir = "";
}
if (pdir.Length() <= 0) {
SendAsynMessage(TString::Format("%s: kBuildPackage: failure locating %s ...",
noth.Data(), package.Data()));
status = -1;
break;
}
}
}
if (IsMaster() && !fromglobal) {
if (fProof->UploadPackage(pdir + ".par") != 0) {
Warning("HandleCache",
"kBuildPackage: problems forwarding package %s to workers", package.Data());
SendAsynMessage(TString::Format("%s: kBuildPackage: problems forwarding package %s to workers ...",
noth.Data(), package.Data()));
}
}
fPackageLock->Lock();
if (!status) {
PDB(kPackage, 1)
Info("HandleCache",
"kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
ocwd = gSystem->WorkingDirectory();
gSystem->ChangeDirectory(pdir);
if (IsMaster())
fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);
if (!gSystem->AccessPathName("PROOF-INF/BUILD.sh")) {
SendAsynMessage(TString::Format("%s: building %s ...", noth.Data(), package.Data()));
Bool_t goodver = kTRUE;
Bool_t savever = kFALSE;
TString v, r;
FILE *f = fopen("PROOF-INF/proofvers.txt", "r");
if (f) {
v.Gets(f);
r.Gets(f);
fclose(f);
if (chkveropt == TProof::kCheckROOT || chkveropt == TProof::kCheckSVN) {
if (v != gROOT->GetVersion()) goodver = kFALSE;
if (goodver && chkveropt == TProof::kCheckSVN)
if (r != gROOT->GetGitCommit()) goodver = kFALSE;
}
}
if (!f || !goodver) {
if (!fromglobal || !gSystem->AccessPathName(pdir, kWritePermission)) {
savever = kTRUE;
SendAsynMessage(TString::Format("%s: %s: version change (current: %s:%s,"
" build: %s:%s): cleaning ... ",
noth.Data(), package.Data(), gROOT->GetVersion(),
gROOT->GetGitCommit(), v.Data(), r.Data()));
gSystem->ChangeDirectory(packagedir);
gSystem->Exec(TString::Format("%s %s", kRM, pdir.Data()));
char *gunzip = gSystem->Which(gSystem->Getenv("PATH"), kGUNZIP,
kExecutePermission);
if (gunzip) {
TString par;
par.Form("%s.par", pdir.Data());
TString cmd;
cmd.Form(kUNTAR3, gunzip, par.Data());
status = gSystem->Exec(cmd);
if (status) {
Error("HandleCache", "kBuildPackage: failure executing: %s", cmd.Data());
} else {
TMD5 *md5local = TMD5::FileChecksum(par);
if (md5local) {
TString md5f = packagedir + "/" + package + "/PROOF-INF/md5.txt";
TMD5::WriteChecksum(md5f, md5local);
gSystem->ChangeDirectory(pdir);
SafeDelete(md5local);
} else {
Warning("HandleCache", "kBuildPackage: failure calculating/saving MD5sum for '%s'", par.Data());
}
}
delete [] gunzip;
} else {
Error("HandleCache", "kBuildPackage: %s not found", kGUNZIP);
status = -1;
}
} else {
SendAsynMessage(TString::Format("%s: %s: ROOT version inconsistency (current: %s, build: %s):"
" global package: cannot re-build!!! ",
noth.Data(), package.Data(), gROOT->GetVersion(), v.Data()));
status = -1;
}
}
if (!status) {
TString ipath(gSystem->GetIncludePath());
ipath.ReplaceAll("\"","");
TString cmd;
cmd.Form("export ROOTINCLUDEPATH=\"%s\" ; PROOF-INF/BUILD.sh", ipath.Data());
{
TProofServLogHandlerGuard hg(cmd, fSocket);
}
if (!(status = TProofServLogHandler::GetCmdRtn())) {
if (savever) {
f = fopen("PROOF-INF/proofvers.txt", "w");
if (f) {
fputs(gROOT->GetVersion(), f);
fputs(TString::Format("\n%s", gROOT->GetGitCommit()), f);
fclose(f);
}
}
}
}
} else {
PDB(kPackage, 1)
Info("HandleCache", "no PROOF-INF/BUILD.sh found for package %s", package.Data());
}
gSystem->ChangeDirectory(ocwd);
}
fPackageLock->Unlock();
if (status) {
SendAsynMessage(TString::Format("%s: failure building %s ... (status: %d)", noth.Data(), package.Data(), status));
} else {
if (IsMaster())
status = fProof->BuildPackage(package, TProof::kCollectBuildResults);
PDB(kPackage, 1)
Info("HandleCache", "package %s successfully built", package.Data());
}
if (slb) slb->Form("%d %s %d %d", type, package.Data(), status, chkveropt);
break;
case TProof::kLoadPackage:
(*mess) >> package;
if (fEnabledPackages->FindObject(package)) {
Info("HandleCache",
"package %s already loaded", package.Data());
break;
}
pdir = fPackageDir + "/" + package;
if (gSystem->AccessPathName(pdir, kReadPermission)) {
if (fGlobalPackageDirList && fGlobalPackageDirList->GetSize() > 0) {
TIter nxd(fGlobalPackageDirList);
TNamed *nm = 0;
while ((nm = (TNamed *)nxd())) {
pdir.Form("%s/%s", nm->GetTitle(), package.Data());
if (!gSystem->AccessPathName(pdir, kReadPermission)) {
break;
}
pdir = "";
}
if (pdir.Length() <= 0) {
SendAsynMessage(TString::Format("%s: kLoadPackage: failure locating %s ...",
noth.Data(), package.Data()));
status = -1;
break;
}
}
}
ocwd = gSystem->WorkingDirectory();
gSystem->ChangeDirectory(pdir);
fPackageLock->Lock();
if (!gSystem->AccessPathName("PROOF-INF/SETUP.C")) {
TString setup, setupfn;
setup.Form("SETUP_ganis_%d_%x", gSystem->GetPid(), package.Hash());
const char *tmpDir = gSystem->TempDirectory();
if (tmpDir[strlen(tmpDir)-1] == '/') {
setupfn.Form("%s%s.C", tmpDir, setup.Data());
} else {
setupfn.Form("%s/%s.C", tmpDir, setup.Data());
}
TMacro setupmc("PROOF-INF/SETUP.C");
TObjString *setupline = setupmc.GetLineWith("SETUP(");
if (setupline) {
TString setupstring(setupline->GetString());
setupstring.ReplaceAll("SETUP(", TString::Format("%s(", setup.Data()));
setupline->SetString(setupstring);
} else {
SendAsynMessage(TString::Format("%s: warning: macro '%s/PROOF-INF/SETUP.C' does not contain a SETUP()"
" function", noth.Data(), package.Data()));
}
setupmc.SaveSource(setupfn.Data());
if (gROOT->LoadMacro(setupfn.Data()) != 0) {
SendAsynMessage(TString::Format("%s: error: macro '%s/PROOF-INF/SETUP.C' could not be loaded:"
" cannot continue",
noth.Data(), package.Data()));
status = -1;
} else {
TFunction *fun = (TFunction *) gROOT->GetListOfGlobalFunctions()->FindObject(setup);
if (!fun) {
SendAsynMessage(TString::Format("%s: error: function SETUP() not found in macro '%s/PROOF-INF/SETUP.C':"
" cannot continue",
noth.Data(), package.Data()));
status = -1;
} else {
TMethodCall callEnv;
if (fun->GetNargs() == 0) {
callEnv.Init(fun);
if ((mess->BufferSize() > mess->Length())) {
(*mess) >> optls;
SendAsynMessage(TString::Format("%s: warning: loaded SETUP() does not take any argument:"
" the specified argument will be ignored", noth.Data()));
}
} else if (fun->GetNargs() == 1) {
TMethodArg *arg = (TMethodArg *) fun->GetListOfMethodArgs()->First();
if (arg) {
callEnv.Init(fun);
if ((mess->BufferSize() > mess->Length())) (*mess) >> optls;
TString argsig(arg->GetTitle());
if (argsig.BeginsWith("TList")) {
callEnv.ResetParam();
callEnv.SetParam((Long_t) optls);
} else if (argsig.BeginsWith("const char")) {
callEnv.ResetParam();
TObjString *os = optls ? dynamic_cast<TObjString *>(optls->First()) : 0;
if (os) {
callEnv.SetParam((Long_t) os->GetName());
} else {
if (optls && optls->First()) {
SendAsynMessage(TString::Format("%s: warning: found object argument of type %s:"
" SETUP expects 'const char *': ignoring",
noth.Data(), optls->First()->ClassName()));
}
callEnv.SetParam((Long_t) 0);
}
} else {
SendAsynMessage(TString::Format("%s: error: unsupported SETUP signature: SETUP(%s)"
" cannot continue", noth.Data(), arg->GetTitle()));
status = -1;
}
} else {
SendAsynMessage(TString::Format("%s: error: cannot get information about the SETUP() argument:"
" cannot continue", noth.Data()));
status = -1;
}
} else if (fun->GetNargs() > 1) {
SendAsynMessage(TString::Format("%s: error: function SETUP() can have at most a 'TList *' argument:"
" cannot continue", noth.Data()));
status = -1;
}
Long_t setuprc = (status == 0) ? 0 : -1;
if (status == 0) {
callEnv.Execute(setuprc);
if (setuprc < 0) status = -1;
}
}
}
if (!gSystem->AccessPathName(setupfn.Data())) gSystem->Unlink(setupfn.Data());
}
fPackageLock->Unlock();
gSystem->ChangeDirectory(ocwd);
if (status < 0) {
SendAsynMessage(TString::Format("%s: failure loading %s ...", noth.Data(), package.Data()));
} else {
gSystem->Symlink(pdir, package);
gSystem->AddIncludePath(TString("-I") + package);
gROOT->ProcessLine(TString(".I ") + package);
TPair *pck = (optls && optls->GetSize() > 0) ? new TPair(new TObjString(package), optls->Clone())
: new TPair(new TObjString(package), 0);
fEnabledPackages->Add(pck);
if (IsMaster()) {
if (optls && optls->GetSize() > 0) {
status = fProof->LoadPackage(package, kFALSE, optls);
} else {
status = fProof->LoadPackage(package);
}
}
PDB(kPackage, 1)
Info("HandleCache", "package %s successfully loaded", package.Data());
}
if (slb) slb->Form("%d %s %d", type, package.Data(), status);
break;
case TProof::kShowEnabledPackages:
(*mess) >> all;
if (IsMaster()) {
if (all)
printf("*** Enabled packages on master %s on %s\n",
fOrdinal.Data(), gSystem->HostName());
else
printf("*** Enabled packages ***\n");
} else {
printf("*** Enabled packages on slave %s on %s\n",
fOrdinal.Data(), gSystem->HostName());
}
{
TIter next(fEnabledPackages);
while (TPair *pck = (TPair *) next())
printf("%s\n", pck->GetName());
}
if (IsMaster() && all)
fProof->ShowEnabledPackages(all);
LogToMaster();
if (slb) slb->Form("%d %d", type, all);
break;
case TProof::kShowSubCache:
(*mess) >> all;
if (IsMaster() && all)
fProof->ShowCache(all);
LogToMaster();
if (slb) slb->Form("%d %d", type, all);
break;
case TProof::kClearSubCache:
file = "";
if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
if (IsMaster())
fProof->ClearCache(file);
if (slb) slb->Form("%d %s", type, file.Data());
break;
case TProof::kShowSubPackages:
(*mess) >> all;
if (IsMaster() && all)
fProof->ShowPackages(all);
LogToMaster();
if (slb) slb->Form("%d %d", type, all);
break;
case TProof::kDisableSubPackages:
if (IsMaster())
fProof->DisablePackages();
if (slb) slb->Form("%d", type);
break;
case TProof::kDisableSubPackage:
(*mess) >> package;
if (IsMaster())
fProof->DisablePackage(package);
if (slb) slb->Form("%d %s", type, package.Data());
break;
case TProof::kBuildSubPackage:
(*mess) >> package;
if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
if (IsMaster())
fProof->BuildPackage(package, TProof::kBuildAll, chkveropt);
if (slb) slb->Form("%d %s %d", type, package.Data(), chkveropt);
break;
case TProof::kUnloadPackage:
(*mess) >> package;
status = UnloadPackage(package);
if (IsMaster() && status == 0)
status = fProof->UnloadPackage(package);
if (slb) slb->Form("%d %s %d", type, package.Data(), status);
break;
case TProof::kDisablePackage:
(*mess) >> package;
fPackageLock->Lock();
gSystem->Exec(TString::Format("%s %s/%s", kRM, fPackageDir.Data(),
package.Data()));
gSystem->Exec(TString::Format("%s %s/%s.par", kRM, fPackageDir.Data(),
package.Data()));
fPackageLock->Unlock();
if (IsMaster())
fProof->DisablePackage(package);
if (slb) slb->Form("%d %s", type, package.Data());
break;
case TProof::kUnloadPackages:
status = UnloadPackages();
if (IsMaster() && status == 0)
status = fProof->UnloadPackages();
if (slb) slb->Form("%d %s %d", type, package.Data(), status);
break;
case TProof::kDisablePackages:
fPackageLock->Lock();
gSystem->Exec(TString::Format("%s %s/*", kRM, fPackageDir.Data()));
fPackageLock->Unlock();
if (IsMaster())
fProof->DisablePackages();
if (slb) slb->Form("%d %s", type, package.Data());
break;
case TProof::kListEnabledPackages:
msg.Reset(kPROOF_PACKAGE_LIST);
{ TList *epl = new TList;
if (fEnabledPackages->GetSize() > 0) {
TIter nxp(fEnabledPackages);
TObject *o = 0;
while ((o = nxp())) { epl->Add(new TObjString(o->GetName()));}
}
msg << type << epl;
fSocket->Send(msg);
epl->SetOwner();
delete epl;
}
if (slb) slb->Form("%d", type);
break;
case TProof::kListPackages:
{
TList *pack = new TList;
void *dir = gSystem->OpenDirectory(fPackageDir);
if (dir) {
TString pac(gSystem->GetDirEntry(dir));
while (pac.Length() > 0) {
if (pac.EndsWith(".par")) {
pac.ReplaceAll(".par","");
pack->Add(new TObjString(pac.Data()));
}
pac = gSystem->GetDirEntry(dir);
}
}
gSystem->FreeDirectory(dir);
msg.Reset(kPROOF_PACKAGE_LIST);
msg << type << pack;
fSocket->Send(msg);
}
if (slb) slb->Form("%d", type);
break;
case TProof::kLoadMacro:
{
(*mess) >> package;
if (IsMaster())
fProof->Load(package, kFALSE, kTRUE);
fCacheLock->Lock();
TString fn;
Ssiz_t from = 0;
while ((package.Tokenize(fn, from, ",")))
CopyFromCache(fn, kTRUE);
TString pack(package);
if ((from = pack.Index(",")) != kNPOS) pack.Remove(from);
Info("HandleCache", "loading macro %s ...", pack.Data());
gROOT->ProcessLine(TString::Format(".L %s", pack.Data()));
from = 0;
while ((package.Tokenize(fn, from, ",")))
CopyToCache(fn, 1);
fCacheLock->Unlock();
if (IsMaster())
fProof->Load(package, kFALSE, kFALSE);
LogToMaster();
if (slb) slb->Form("%d %s", type, package.Data());
}
break;
default:
Error("HandleCache", "unknown type %d", type);
break;
}
return status;
}
Int_t TProofServ::HandleWorkerLists(TMessage *mess)
{
PDB(kGlobal, 1)
Info("HandleWorkerLists", "Enter");
Int_t type = 0, rc = 0;
TString ord;
(*mess) >> type;
switch (type) {
case TProof::kActivateWorker:
(*mess) >> ord;
if (ord != "*" && !ord.BeginsWith(GetOrdinal()) && ord != "restore") break;
if (fProof) {
Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
Int_t nactmax = fProof->GetListOfSlaves()->GetSize() -
fProof->GetListOfBadSlaves()->GetSize();
if (nact < nactmax || !IsEndMaster()) {
Int_t nwc = fProof->ActivateWorker(ord);
Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
if (ord == "*") {
if (nactnew == nactmax) {
PDB(kGlobal, 1) Info("HandleWorkerList", "all workers (re-)activated");
} else {
if (IsEndMaster())
PDB(kGlobal, 1) Info("HandleWorkerList", "%d workers could not be (re-)activated", nactmax - nactnew);
}
} else if (ord.BeginsWith(GetOrdinal())) {
if (nactnew == (nact + nwc)) {
if (nwc > 0)
PDB(kGlobal, 1) Info("HandleWorkerList","worker(s) %s (re-)activated", ord.Data());
} else {
if (nwc != -2 && IsEndMaster()) {
Error("HandleWorkerList", "some worker(s) could not be (re-)activated;"
" # of actives: %d --> %d (nwc: %d)",
nact, nactnew, nwc);
}
rc = (nwc < 0) ? nwc : -1;
}
}
} else {
PDB(kGlobal, 1) Info("HandleWorkerList","all workers are already active");
}
} else {
Warning("HandleWorkerList","undefined PROOF session: protocol error?");
}
break;
case TProof::kDeactivateWorker:
(*mess) >> ord;
if (ord != "*" && !ord.BeginsWith(GetOrdinal()) && ord != "restore") break;
if (fProof) {
Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
if (nact > 0) {
Int_t nwc = fProof->DeactivateWorker(ord);
Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
if (ord == "*") {
if (nactnew == 0) {
PDB(kGlobal, 1) Info("HandleWorkerList","all workers deactivated");
} else {
if (IsEndMaster())
PDB(kGlobal, 1) Info("HandleWorkerList","%d workers could not be deactivated", nactnew);
}
} else {
if (nactnew == (nact - nwc)) {
if (nwc > 0)
PDB(kGlobal, 1) Info("HandleWorkerList","worker(s) %s deactivated", ord.Data());
} else {
if (nwc != -2 && IsEndMaster()) {
Error("HandleWorkerList", "some worker(s) could not be deactivated:"
" # of actives: %d --> %d (nwc: %d)",
nact, nactnew, nwc);
}
rc = (nwc < 0) ? nwc : -1;
}
}
} else {
PDB(kGlobal, 1) Info("HandleWorkerList","all workers are already inactive");
}
} else {
Warning("HandleWorkerList","undefined PROOF session: protocol error?");
}
break;
default:
Warning("HandleWorkerList","unknown action type (%d)", type);
rc = -1;
}
return rc;
}
TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
Int_t & ,
Bool_t )
{
TProofResourcesStatic *resources =
new TProofResourcesStatic(fConfDir, fConfFile);
fConfFile = resources->GetFileName();
PDB(kGlobal,1)
Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());
TProofNodeInfo *master = resources->GetMaster();
if (!master) {
PDB(kAll,1)
Info("GetWorkers",
"no appropriate master line found in %s", fConfFile.Data());
return kQueryStop;
} else {
if (fImage.IsNull() && strlen(master->GetImage()) > 0)
fImage = master->GetImage();
}
if (workers) {
if (resources->GetSubmasters() && resources->GetSubmasters()->GetSize() > 0) {
PDB(kAll,1)
resources->GetSubmasters()->Print();
TProofNodeInfo *ni = 0;
TIter nw(resources->GetSubmasters());
while ((ni = (TProofNodeInfo *) nw()))
workers->Add(new TProofNodeInfo(*ni));
} else if (resources->GetWorkers() && resources->GetWorkers()->GetSize() > 0) {
PDB(kAll,1)
resources->GetWorkers()->Print();
TProofNodeInfo *ni = 0;
TIter nw(resources->GetWorkers());
while ((ni = (TProofNodeInfo *) nw()))
workers->Add(new TProofNodeInfo(*ni));
}
}
return kQueryOK;
}
FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
{
FILE *oldferr = fgErrorHandlerFile;
fgErrorHandlerFile = (ferr) ? ferr : stderr;
return oldferr;
}
void TProofServ::ErrorHandler(Int_t level, Bool_t abort, const char *location,
const char *msg)
{
if (gErrorIgnoreLevel == kUnset) {
gErrorIgnoreLevel = 0;
if (gEnv) {
TString lvl = gEnv->GetValue("Root.ErrorIgnoreLevel", "Print");
if (!lvl.CompareTo("Print", TString::kIgnoreCase))
gErrorIgnoreLevel = kPrint;
else if (!lvl.CompareTo("Info", TString::kIgnoreCase))
gErrorIgnoreLevel = kInfo;
else if (!lvl.CompareTo("Warning", TString::kIgnoreCase))
gErrorIgnoreLevel = kWarning;
else if (!lvl.CompareTo("Error", TString::kIgnoreCase))
gErrorIgnoreLevel = kError;
else if (!lvl.CompareTo("Break", TString::kIgnoreCase))
gErrorIgnoreLevel = kBreak;
else if (!lvl.CompareTo("SysError", TString::kIgnoreCase))
gErrorIgnoreLevel = kSysError;
else if (!lvl.CompareTo("Fatal", TString::kIgnoreCase))
gErrorIgnoreLevel = kFatal;
}
}
if (level < gErrorIgnoreLevel)
return;
if (level >= kError && gProofServ)
gProofServ->LogToMaster();
Bool_t tosyslog = (fgLogToSysLog > 2) ? kTRUE : kFALSE;
const char *type = 0;
ELogLevel loglevel = kLogInfo;
Int_t ipos = (location) ? strlen(location) : 0;
if (level >= kPrint) {
loglevel = kLogInfo;
type = "Print";
}
if (level >= kInfo) {
loglevel = kLogInfo;
char *ps = location ? (char *) strrchr(location, '|') : (char *)0;
if (ps) {
ipos = (int)(ps - (char *)location);
type = "SvcMsg";
} else {
type = "Info";
}
}
if (level >= kWarning) {
loglevel = kLogWarning;
type = "Warning";
}
if (level >= kError) {
loglevel = kLogErr;
type = "Error";
}
if (level >= kBreak) {
loglevel = kLogErr;
type = "*** Break ***";
}
if (level >= kSysError) {
loglevel = kLogErr;
type = "SysError";
}
if (level >= kFatal) {
loglevel = kLogErr;
type = "Fatal";
}
TString buf;
TTimeStamp ts;
TString st(ts.AsString("lc"),19);
if (!location || ipos == 0 ||
(level >= kPrint && level < kInfo) ||
(level >= kBreak && level < kSysError)) {
fprintf(fgErrorHandlerFile, "%s %5d %s | %s: %s\n", st(11,8).Data(),
gSystem->GetPid(),
(gProofServ ? gProofServ->GetPrefix() : "proof"),
type, msg);
if (tosyslog)
buf.Form("%s: %s:%s", fgSysLogEntity.Data(), type, msg);
} else {
fprintf(fgErrorHandlerFile, "%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
gSystem->GetPid(),
(gProofServ ? gProofServ->GetPrefix() : "proof"),
type, ipos, location, msg);
if (tosyslog)
buf.Form("%s: %s:<%.*s>: %s", fgSysLogEntity.Data(), type, ipos, location, msg);
}
fflush(fgErrorHandlerFile);
if (tosyslog)
gSystem->Syslog(loglevel, buf);
#ifdef __APPLE__
if (__crashreporter_info__)
delete [] __crashreporter_info__;
__crashreporter_info__ = StrDup(buf);
#endif
if (abort) {
static Bool_t recursive = kFALSE;
if (gProofServ != 0 && !recursive) {
recursive = kTRUE;
if (gProofServ->GetSocket()) gProofServ->GetSocket()->Send(kPROOF_FATAL);
recursive = kFALSE;
}
fprintf(fgErrorHandlerFile, "aborting\n");
fflush(fgErrorHandlerFile);
gSystem->StackTrace();
gSystem->Abort();
}
}
Int_t TProofServ::CopyFromCache(const char *macro, Bool_t cpbin)
{
if (!macro || strlen(macro) <= 0)
return -1;
TString name = macro;
TString acmode, args, io;
name = gSystem->SplitAclicMode(name, acmode, args, io);
PDB(kGlobal,1)
Info("CopyFromCache","enter: names: %s, %s", macro, name.Data());
Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
if (!locked) fCacheLock->Lock();
Bool_t assertfile = kFALSE;
TString srcname(name);
Int_t dot = srcname.Last('.');
if (dot != kNPOS) {
srcname.Remove(dot);
srcname += "*";
} else {
assertfile = kTRUE;
}
srcname.Insert(0, TString::Format("%s/",fCacheDir.Data()));
dot = (dot != kNPOS) ? srcname.Last('.') : dot;
if (assertfile) {
if (gSystem->AccessPathName(srcname)) {
PDB(kCache,1)
Info("CopyFromCache", "file %s not in cache", srcname.Data());
if (!locked) fCacheLock->Unlock();
return 0;
}
}
PDB(kCache,1)
Info("CopyFromCache", "retrieving %s from cache", srcname.Data());
gSystem->Exec(TString::Format("%s %s .", kCP, srcname.Data()));
if (!cpbin) {
if (!locked) fCacheLock->Unlock();
return 0;
}
TString binname = name;
dot = binname.Last('.');
if (dot != kNPOS) {
binname.Replace(dot,1,"_");
binname += ".";
} else {
PDB(kCache,1)
Info("CopyFromCache",
"non-standard name structure: %s ('.' missing)", name.Data());
if (!locked) fCacheLock->Unlock();
return 0;
}
TString vername;
vername.Form(".%s", name.Data());
Int_t dotv = vername.Last('.');
if (dotv != kNPOS)
vername.Remove(dotv);
vername += ".binversion";
TString v, r;
Bool_t okfil = kFALSE;
FILE *f = fopen(TString::Format("%s/%s", fCacheDir.Data(), vername.Data()), "r");
if (f) {
v.Gets(f);
r.Gets(f);
fclose(f);
okfil = kTRUE;
}
Bool_t okver = (v != gROOT->GetVersion()) ? kFALSE : kTRUE;
Bool_t okrev = (r != gROOT->GetGitCommit()) ? kFALSE : kTRUE;
if (!okfil || !okver || !okrev) {
PDB(kCache,1)
Info("CopyFromCache",
"removing binaries: 'file': %s, 'ROOT version': %s, 'ROOT revision': %s",
(okfil ? "OK" : "not OK"), (okver ? "OK" : "not OK"), (okrev ? "OK" : "not OK") );
binname += "*";
gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
if (!locked) fCacheLock->Unlock();
return 0;
}
void *dirp = gSystem->OpenDirectory(fCacheDir);
if (dirp) {
const char *e = 0;
while ((e = gSystem->GetDirEntry(dirp))) {
if (!strncmp(e, binname.Data(), binname.Length())) {
TString fncache;
fncache.Form("%s/%s", fCacheDir.Data(), e);
Bool_t docp = kTRUE;
FileStat_t stlocal, stcache;
if (!gSystem->GetPathInfo(fncache, stcache)) {
Int_t rc = gSystem->GetPathInfo(e, stlocal);
if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
docp = kFALSE;
if (docp) {
TMD5 *md5local = TMD5::FileChecksum(e);
TMD5 *md5cache = TMD5::FileChecksum(fncache);
if (md5local && md5cache && md5local == md5cache) docp = kFALSE;
SafeDelete(md5local);
SafeDelete(md5cache);
}
if (docp) {
gSystem->Exec(TString::Format("%s %s", kRM, e));
PDB(kCache,1)
Info("CopyFromCache",
"retrieving %s from cache", fncache.Data());
gSystem->Exec(TString::Format("%s %s %s", kCP, fncache.Data(), e));
}
}
}
}
gSystem->FreeDirectory(dirp);
}
if (!locked) fCacheLock->Unlock();
return 0;
}
Int_t TProofServ::CopyToCache(const char *macro, Int_t opt)
{
if (!macro || strlen(macro) <= 0 || opt < 0 || opt > 1)
return -1;
TString name = macro;
TString acmode, args, io;
name = gSystem->SplitAclicMode(name, acmode, args, io);
PDB(kGlobal,1)
Info("CopyToCache","enter: opt: %d, names: %s, %s", opt, macro, name.Data());
TString binname = name;
Int_t dot = binname.Last('.');
if (dot != kNPOS)
binname.Replace(dot,1,"_");
TString vername;
vername.Form(".%s", name.Data());
dot = vername.Last('.');
if (dot != kNPOS)
vername.Remove(dot);
vername += ".binversion";
Bool_t savever = kFALSE;
Bool_t locked = (fCacheLock->IsLocked()) ? kTRUE : kFALSE;
if (!locked) fCacheLock->Lock();
if (opt == 0) {
PDB(kCache,1)
Info("CopyToCache",
"caching %s/%s ...", fCacheDir.Data(), name.Data());
gSystem->Exec(TString::Format("%s %s %s", kCP, name.Data(), fCacheDir.Data()));
if (dot != kNPOS) {
binname += "*";
PDB(kCache,1)
Info("CopyToCache", "opt = 0: removing binaries '%s'", binname.Data());
gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), binname.Data()));
gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), vername.Data()));
}
} else if (opt == 1) {
if (dot != kNPOS) {
void *dirp = gSystem->OpenDirectory(".");
if (dirp) {
const char *e = 0;
while ((e = gSystem->GetDirEntry(dirp))) {
if (!strncmp(e, binname.Data(), binname.Length())) {
Bool_t docp = kTRUE;
FileStat_t stlocal, stcache;
if (!gSystem->GetPathInfo(e, stlocal)) {
TString fncache;
fncache.Form("%s/%s", fCacheDir.Data(), e);
Int_t rc = gSystem->GetPathInfo(fncache, stcache);
if (rc == 0 && (stlocal.fMtime <= stcache.fMtime)) {
docp = kFALSE;
if (rc == 0) rc = -1;
}
if (docp) {
TMD5 *md5local = TMD5::FileChecksum(e);
TMD5 *md5cache = TMD5::FileChecksum(fncache);
if (md5local && md5cache && md5local == md5cache) docp = kFALSE;
SafeDelete(md5local);
SafeDelete(md5cache);
if (!docp) rc = -2;
}
if (docp) {
gSystem->Exec(TString::Format("%s %s", kRM, fncache.Data()));
PDB(kCache,1)
Info("CopyToCache","caching %s ... (reason: %d)", e, rc);
gSystem->Exec(TString::Format("%s %s %s", kCP, e, fncache.Data()));
savever = kTRUE;
}
}
}
}
gSystem->FreeDirectory(dirp);
}
if (savever) {
PDB(kCache,1)
Info("CopyToCache","updating version file %s ...", vername.Data());
FILE *f = fopen(TString::Format("%s/%s", fCacheDir.Data(), vername.Data()), "w");
if (f) {
fputs(gROOT->GetVersion(), f);
fputs(TString::Format("\n%s", gROOT->GetGitCommit()), f);
fclose(f);
}
}
}
}
if (!locked) fCacheLock->Unlock();
return 0;
}
void TProofServ::MakePlayer()
{
TVirtualProofPlayer *p = 0;
DeletePlayer();
if (IsParallel()) {
p = fProof->MakePlayer();
} else {
p = TVirtualProofPlayer::Create("slave", 0, fSocket);
if (IsMaster())
fProof->SetPlayer(p);
}
fPlayer = p;
}
void TProofServ::DeletePlayer()
{
if (IsMaster()) {
PDB(kGlobal, 1) {
fCompute.Stop();
Printf(" +++ Latest processing times: %f s (CPU: %f s)",
fCompute.RealTime(), fCompute.CpuTime());
}
if (fProof) fProof->SetPlayer(0);
} else {
SafeDelete(fPlayer);
}
fPlayer = 0;
}
Int_t TProofServ::GetPriority()
{
TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB","");
TString sqluser = gEnv->GetValue("ProofServ.QueryLogUser","");
TString sqlpass = gEnv->GetValue("ProofServ.QueryLogPasswd","");
Int_t priority = 100;
if (sqlserv == "")
return priority;
TString sql;
sql.Form("SELECT priority WHERE group='%s' FROM proofpriority", fGroup.Data());
TSQLServer *db = TSQLServer::Connect(sqlserv, sqluser, sqlpass);
if (!db || db->IsZombie()) {
Error("GetPriority", "failed to connect to SQL server %s as %s %s",
sqlserv.Data(), sqluser.Data(), sqlpass.Data());
printf("%s\n", sql.Data());
} else {
TSQLResult *res = db->Query(sql);
if (!res) {
Error("GetPriority", "query into proofpriority failed");
Printf("%s", sql.Data());
} else {
TSQLRow *row = res->Next();
if (row) {
priority = atoi(row->GetField(0));
delete row;
} else {
Error("GetPriority", "first row is header is NULL");
}
}
delete res;
}
delete db;
return priority;
}
Int_t TProofServ::SendAsynMessage(const char *msg, Bool_t lf)
{
static TMessage m(kPROOF_MESSAGE);
PDB(kAsyn,1)
Info("SendAsynMessage","%s", (msg ? msg : "(null)"));
if (fSocket && msg) {
m.Reset(kPROOF_MESSAGE);
m << TString(msg) << lf;
return fSocket->Send(m);
}
return -1;
}
void TProofServ::FlushLogFile()
{
off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
if (lend >= 0) lseek(fLogFileDes, lend, SEEK_SET);
}
void TProofServ::TruncateLogFile()
{
#ifndef WIN32
TString emsg;
if (fLogFileMaxSize > 0 && fLogFileDes > 0) {
fflush(stdout);
struct stat st;
if (fstat(fLogFileDes, &st) == 0) {
if (st.st_size >= fLogFileMaxSize) {
off_t truncsz = (off_t) (( fLogFileMaxSize * 80 ) / 100 );
if (truncsz < 100) {
emsg.Form("+++ WARNING +++: %s: requested truncate size too small"
" (%lld,%lld) - ignore ", fPrefix.Data(), (Long64_t) truncsz, fLogFileMaxSize);
SendAsynMessage(emsg.Data());
return;
}
TSystem::ResetErrno();
while (ftruncate(fileno(stdout), truncsz) != 0 &&
(TSystem::GetErrno() == EINTR)) {
TSystem::ResetErrno();
}
if (TSystem::GetErrno() > 0) {
Error("TruncateLogFile", "truncating to %lld bytes; file size is %lld bytes (errno: %d)",
(Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
emsg.Form("+++ WARNING +++: %s: problems truncating log file to %lld bytes; file size is %lld bytes"
" (errno: %d)", fPrefix.Data(), (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
SendAsynMessage(emsg.Data());
} else {
Info("TruncateLogFile", "file truncated to %lld bytes (80%% of %lld); file size was %lld bytes ",
(Long64_t)truncsz, fLogFileMaxSize, (Long64_t)st.st_size);
emsg.Form("+++ WARNING +++: %s: log file truncated to %lld bytes (80%% of %lld)",
fPrefix.Data(), (Long64_t)truncsz, fLogFileMaxSize);
SendAsynMessage(emsg.Data());
}
}
} else {
emsg.Form("+++ WARNING +++: %s: could not stat log file descriptor"
" for truncation (errno: %d)", fPrefix.Data(), TSystem::GetErrno());
SendAsynMessage(emsg.Data());
}
}
#endif
}
void TProofServ::HandleException(Int_t sig)
{
Error("HandleException", "caugth exception triggered by signal '%d' %s %lld",
sig, fgLastMsg.Data(), fgLastEntry);
TString emsg;
emsg.Form("%s: caught exception triggered by signal '%d' %s %lld",
GetOrdinal(), sig, fgLastMsg.Data(), fgLastEntry);
SendAsynMessage(emsg.Data());
gSystem->Exit(sig);
}
Int_t TProofServ::HandleDataSets(TMessage *mess, TString *slb)
{
if (gDebug > 0)
Info("HandleDataSets", "enter");
if (!fDataSetManager) {
Warning("HandleDataSets", "no data manager is available to fullfil the request");
return -1;
}
TString dsUser, dsGroup, dsName, dsTree, uri, opt;
Int_t rc = 0;
TPMERegexp reInvalid("[^A-Za-z0-9._-]");
Int_t type = 0;
(*mess) >> type;
switch (type) {
case TProof::kCheckDataSetName:
{
(*mess) >> uri;
if (slb) slb->Form("%d %s", type, uri.Data());
if (fDataSetManager->ExistsDataSet(uri))
return -1;
}
break;
case TProof::kRegisterDataSet:
{
if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
(*mess) >> uri;
(*mess) >> opt;
if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
TFileCollection *dataSet =
dynamic_cast<TFileCollection*> ((mess->ReadObject(TFileCollection::Class())));
if (!dataSet || dataSet->GetList()->GetSize() == 0) {
Error("HandleDataSets", "can not save an empty list.");
return -1;
}
rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
delete dataSet;
return rc;
} else {
Info("HandleDataSets", "dataset registration not allowed");
if (slb) slb->Form("%d notallowed", type);
return -1;
}
}
break;
case TProof::kRequestStaging:
{
(*mess) >> uri;
if (!fDataSetStgRepo) {
Error("HandleDataSets",
"no dataset staging request repository available");
return -1;
}
TString validUri = uri;
while (reInvalid.Substitute(validUri, "_")) {}
if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
Warning("HandleDataSets", "staging of %s already requested",
uri.Data());
return -1;
}
TFileCollection *fc = fDataSetManager->GetDataSet(uri.Data());
if (!fc || (fc->GetNFiles() == 0)) {
Error("HandleDataSets", "empty dataset or no dataset returned");
if (fc) delete fc;
return -1;
}
TIter it(fc->GetList());
TFileInfo *fi;
while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
fi->ResetBit(TFileInfo::kStaged);
Int_t nToErase = fi->GetNUrls() - 1;
for (Int_t i=0; i<nToErase; i++)
fi->RemoveUrlAt(0);
}
fc->Update();
fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser,
dsName, fc) == 0) {
Error("HandleDataSets",
"can't register staging request for %s", uri.Data());
delete fc;
return -1;
}
Info("HandleDataSets",
"Staging request registered for %s", uri.Data());
delete fc;
return 0;
}
break;
case TProof::kStagingStatus:
{
if (!fDataSetStgRepo) {
Error("HandleDataSets",
"no dataset staging request repository available");
return -1;
}
(*mess) >> uri;
while (reInvalid.Substitute(uri, "_")) {}
TFileCollection *fc = fDataSetStgRepo->GetDataSet(uri.Data());
if (fc) {
fSocket->SendObject(fc, kMESS_OK);
delete fc;
return 0;
}
else {
Info("HandleDataSets", "no pending staging request for %s",
uri.Data());
return 0;
}
}
break;
case TProof::kCancelStaging:
{
if (!fDataSetStgRepo) {
Error("HandleDataSets",
"no dataset staging request repository available");
return -1;
}
(*mess) >> uri;
while (reInvalid.Substitute(uri, "_")) {}
if (!fDataSetStgRepo->RemoveDataSet(uri.Data()))
return -1;
return 0;
}
break;
case TProof::kShowDataSets:
{
(*mess) >> uri >> opt;
if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
fDataSetManager->ShowDataSets(uri, opt);
}
break;
case TProof::kGetDataSets:
{
(*mess) >> uri >> opt;
if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
UInt_t omsk = (UInt_t)TDataSetManager::kExport;
Ssiz_t kLite = opt.Index(":lite:", 0, TString::kIgnoreCase);
if (kLite != kNPOS) {
omsk |= (UInt_t)TDataSetManager::kReadShort;
opt.Remove(kLite, strlen(":lite:"));
}
TMap *returnMap = fDataSetManager->GetDataSets(uri, omsk);
if (returnMap && !opt.IsNull()) {
TMap *rmap = new TMap;
TObject *k = 0;
TFileCollection *fc = 0, *xfc = 0;
TIter nxd(returnMap);
while ((k = nxd()) && (fc = (TFileCollection *) returnMap->GetValue(k))) {
if ((xfc = fc->GetFilesOnServer(opt.Data()))) {
rmap->Add(new TObjString(k->GetName()), xfc);
}
}
returnMap->DeleteAll();
if (rmap->GetSize() > 0) {
returnMap = rmap;
} else {
Info("HandleDataSets", "no dataset found on server '%s'", opt.Data());
delete rmap;
returnMap = 0;
}
}
if (returnMap) {
fSocket->SendObject(returnMap, kMESS_OK);
returnMap->DeleteAll();
} else {
return -1;
}
}
break;
case TProof::kGetDataSet:
{
(*mess) >> uri >> opt;
if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
TFileCollection *fileList = fDataSetManager->GetDataSet(uri,opt);
if (fileList) {
fSocket->SendObject(fileList, kMESS_OK);
delete fileList;
} else {
return -1;
}
}
break;
case TProof::kRemoveDataSet:
{
if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
(*mess) >> uri;
if (slb) slb->Form("%d %s", type, uri.Data());
if (!fDataSetManager->RemoveDataSet(uri)) {
return -1;
}
} else {
Info("HandleDataSets", "dataset creation / removal not allowed");
if (slb) slb->Form("%d notallowed", type);
return -1;
}
}
break;
case TProof::kVerifyDataSet:
{
if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
(*mess) >> uri >> opt;
if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
TProofServLogHandlerGuard hg(fLogFile, fSocket);
rc = fDataSetManager->ScanDataSet(uri, opt);
} else {
Info("HandleDataSets", "dataset verification not allowed");
return -1;
}
}
break;
case TProof::kGetQuota:
{
if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
if (slb) slb->Form("%d", type);
TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
if (groupQuotaMap) {
fSocket->SendObject(groupQuotaMap, kMESS_OK);
} else {
return -1;
}
} else {
Info("HandleDataSets", "quota control disabled");
if (slb) slb->Form("%d disabled", type);
return -1;
}
}
break;
case TProof::kShowQuota:
{
if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
if (slb) slb->Form("%d", type);
(*mess) >> opt;
fDataSetManager->ShowQuota(opt);
} else {
Info("HandleDataSets", "quota control disabled");
if (slb) slb->Form("%d disabled", type);
}
}
break;
case TProof::kSetDefaultTreeName:
{
if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
(*mess) >> uri;
if (slb) slb->Form("%d %s", type, uri.Data());
rc = fDataSetManager->ScanDataSet(uri, (UInt_t)TDataSetManager::kSetDefaultTree);
} else {
Info("HandleDataSets", "kSetDefaultTreeName: modification of dataset info not allowed");
if (slb) slb->Form("%d notallowed", type);
return -1;
}
}
break;
case TProof::kCache:
{
(*mess) >> uri >> opt;
if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
if (opt == "show") {
fDataSetManager->ShowCache(uri);
} else if (opt == "clear") {
fDataSetManager->ClearCache(uri);
} else {
Error("HandleDataSets", "kCache: unknown action: %s", opt.Data());
}
}
break;
default:
rc = -1;
Error("HandleDataSets", "unknown type %d", type);
break;
}
return rc;
}
void TProofServ::HandleSubmerger(TMessage *mess)
{
Int_t type = 0;
(*mess) >> type;
TString msg;
switch (type) {
case TProof::kOutputSize:
break;
case TProof::kSendOutput:
{
Bool_t deleteplayer = kTRUE;
if (!IsMaster()) {
if (fMergingMonitor) {
Info("HandleSubmerger", "kSendOutput: interrupting ...");
fMergingMonitor->Interrupt();
}
if (fMergingSocket) {
if (fMergingMonitor) fMergingMonitor->Remove(fMergingSocket);
fMergingSocket->Close();
SafeDelete(fMergingSocket);
}
TString name;
Int_t port = 0;
Int_t merger_id = -1;
(*mess) >> merger_id >> name >> port;
PDB(kSubmerger, 1)
Info("HandleSubmerger","worker %s redirected to merger #%d %s:%d", fOrdinal.Data(), merger_id, name.Data(), port);
TSocket *t = 0;
if (name.Length() > 0 && port > 0 && (t = new TSocket(name, port)) && t->IsValid()) {
PDB(kSubmerger, 2) Info("HandleSubmerger",
"kSendOutput: worker asked for sending output to merger #%d %s:%d",
merger_id, name.Data(), port);
if (SendResults(t, fPlayer->GetOutputList()) != 0) {
msg.Form("worker %s cannot send results to merger #%d at %s:%d", GetPrefix(), merger_id, name.Data(), port);
PDB(kSubmerger, 2) Info("HandleSubmerger",
"kSendOutput: %s - inform the master", msg.Data());
SendAsynMessage(msg);
TMessage answ(kPROOF_SUBMERGER);
answ << Int_t(TProof::kMergerDown);
answ << merger_id;
fSocket->Send(answ);
} else {
TMessage answ(kPROOF_SUBMERGER);
answ << Int_t(TProof::kOutputSent);
answ << merger_id;
fSocket->Send(answ);
PDB(kSubmerger, 2) Info("HandleSubmerger", "kSendOutput: worker sent its output");
fSocket->Send(kPROOF_SETIDLE);
SetIdle(kTRUE);
SendLogFile();
}
} else {
if (name == "master") {
PDB(kSubmerger, 2) Info("HandleSubmerger",
"kSendOutput: worker was asked for sending output to master");
if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
Warning("HandleSubmerger", "problems sending output list");
fSocket->Send(kPROOF_SETIDLE);
SetIdle(kTRUE);
SendLogFile();
} else if (!t || !(t->IsValid())) {
msg.Form("worker %s could not open a valid socket to merger #%d at %s:%d",
GetPrefix(), merger_id, name.Data(), port);
PDB(kSubmerger, 2) Info("HandleSubmerger",
"kSendOutput: %s - inform the master", msg.Data());
SendAsynMessage(msg);
TMessage answ(kPROOF_SUBMERGER);
answ << Int_t(TProof::kMergerDown);
answ << merger_id;
fSocket->Send(answ);
deleteplayer = kFALSE;
}
if (t) SafeDelete(t);
}
} else {
Error("HandleSubmerger", "kSendOutput: received not on worker");
}
if (deleteplayer) DeletePlayer();
}
break;
case TProof::kBeMerger:
{
Bool_t deleteplayer = kTRUE;
if (!IsMaster()) {
Int_t merger_id = -1;
Int_t connections = 0;
(*mess) >> merger_id >> connections;
PDB(kSubmerger, 2)
Info("HandleSubmerger", "worker %s established as merger", fOrdinal.Data());
PDB(kSubmerger, 2)
Info("HandleSubmerger",
"kBeMerger: worker asked for being merger #%d for %d connections",
merger_id, connections);
TVirtualProofPlayer *mergerPlayer = TVirtualProofPlayer::Create("remote",fProof,0);
if (mergerPlayer) {
PDB(kSubmerger, 2) Info("HandleSubmerger",
"kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
mergerPlayer->SetBit(TVirtualProofPlayer::kIsSubmerger);
if (AcceptResults(connections, mergerPlayer)) {
PDB(kSubmerger, 2)
Info("HandleSubmerger", "kBeMerger: all outputs from workers accepted");
PDB(kSubmerger, 2)
Info("","adding own output to the list on %s", fOrdinal.Data());
TIter nxo(fPlayer->GetOutputList());
TObject * o = 0;
while ((o = nxo())) {
if ((mergerPlayer->AddOutputObject(o) != 1)) {
if (fPlayer->GetOutputList()) {
PDB(kSubmerger, 2)
Info("HandleSocketInput", "removing merged object (%p)", o);
fPlayer->GetOutputList()->Remove(o);
}
}
}
PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: own outputs added");
PDB(kSubmerger, 2) Info("HandleSubmerger","starting delayed merging on %s", fOrdinal.Data());
mergerPlayer->MergeOutput();
PDB(kSubmerger, 2) mergerPlayer->GetOutputList()->Print();
PDB(kSubmerger, 2) Info("HandleSubmerger", "delayed merging on %s finished ", fOrdinal.Data());
PDB(kSubmerger, 2) Info("HandleSubmerger", "%s sending results to master ", fOrdinal.Data());
if (SendResults(fSocket, mergerPlayer->GetOutputList()) != 0)
Warning("HandleSubmerger","kBeMerger: problems sending output list");
if (mergerPlayer->GetOutputList())
mergerPlayer->GetOutputList()->SetOwner(kTRUE);
PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: results sent to master");
fSocket->Send(kPROOF_SETIDLE);
SetIdle(kTRUE);
SendLogFile();
} else {
TMessage answ(kPROOF_SUBMERGER);
answ << Int_t(TProof::kMergerDown);
answ << merger_id;
fSocket->Send(answ);
deleteplayer = kFALSE;
}
SafeDelete(mergerPlayer);
} else {
Warning("HandleSubmerger","kBeMerger: problems craeting the merger player!");
TMessage answ(kPROOF_SUBMERGER);
answ << Int_t(TProof::kMergerDown);
answ << merger_id;
fSocket->Send(answ);
deleteplayer = kFALSE;
}
} else {
Error("HandleSubmerger","kSendOutput: received not on worker");
}
if (deleteplayer) DeletePlayer();
}
break;
case TProof::kMergerDown:
break;
case TProof::kStopMerging:
{
PDB(kSubmerger, 2) Info("HandleSubmerger", "kStopMerging");
if (fMergingMonitor) {
Info("HandleSubmerger", "kStopMerging: interrupting ...");
fMergingMonitor->Interrupt();
}
}
break;
case TProof::kOutputSent:
break;
}
}
void TProofServ::HandleFork(TMessage *)
{
Info("HandleFork", "fork cloning not implemented");
}
Int_t TProofServ::Fork()
{
#ifndef WIN32
pid_t pid;
if ((pid = fork()) < 0) {
Error("Fork", "failed to fork");
return pid;
}
if (!pid) return pid;
if (!fReaperTimer) {
fReaperTimer = new TReaperTimer(1000);
fReaperTimer->Start(-1);
}
fReaperTimer->AddPid(pid);
return pid;
#else
Warning("Fork", "Functionality not provided under windows");
return -1;
#endif
}
void TProofServ::ResolveKeywords(TString &fname, const char *path)
{
if (fname.Contains("<user>")) {
if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
fname.ReplaceAll("<user>", gProofServ->GetUser());
} else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
fname.ReplaceAll("<user>", gProof->GetUser());
} else {
fname.ReplaceAll("<user>", "nouser");
}
}
if (fname.Contains("<u>")) {
if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
TString u(gProofServ->GetUser()[0]);
fname.ReplaceAll("<u>", u);
} else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
TString u(gProof->GetUser()[0]);
fname.ReplaceAll("<u>", u);
} else {
fname.ReplaceAll("<u>", "n");
}
}
if (fname.Contains("<group>")) {
if (gProofServ && gProofServ->GetGroup() && strlen(gProofServ->GetGroup())) {
fname.ReplaceAll("<group>", gProofServ->GetGroup());
} else if (gProof && gProof->GetGroup() && strlen(gProof->GetGroup())) {
fname.ReplaceAll("<group>", gProof->GetGroup());
} else {
fname.ReplaceAll("<group>", "default");
}
}
if (fname.Contains("<stag>")) {
if (gProofServ && gProofServ->GetSessionTag() && strlen(gProofServ->GetSessionTag())) {
fname.ReplaceAll("<stag>", gProofServ->GetSessionTag());
} else if (gProof && gProof->GetSessionTag() && strlen(gProof->GetSessionTag())) {
fname.ReplaceAll("<stag>", gProof->GetSessionTag());
} else {
::Warning("TProofServ::ResolveKeywords", "session tag undefined: ignoring");
}
}
if (fname.Contains("<ord>")) {
if (gProofServ && gProofServ->GetOrdinal() && strlen(gProofServ->GetOrdinal()))
fname.ReplaceAll("<ord>", gProofServ->GetOrdinal());
else
::Warning("TProofServ::ResolveKeywords", "ordinal number undefined: ignoring");
}
if (fname.Contains("<qnum>")) {
if (gProofServ && gProofServ->GetQuerySeqNum() && gProofServ->GetQuerySeqNum() > 0)
fname.ReplaceAll("<qnum>", TString::Format("%d", gProofServ->GetQuerySeqNum()).Data());
else
::Warning("TProofServ::ResolveKeywords", "query seqeuntial number undefined: ignoring");
}
if (fname.Contains("<file>") && path && strlen(path) > 0) {
fname.ReplaceAll("<file>", path);
}
if (fname.Contains("<rver>")) {
TString v = TString::Format("%d", gROOT->GetVersionInt());
fname.ReplaceAll("<rver>", v);
}
if (fname.Contains("<build>")) {
TString b = TString::Format("%d_%s_%s", gROOT->GetVersionInt(), gSystem->GetBuildArch(),
gSystem->GetBuildCompilerVersion());
fname.ReplaceAll("<build>", b);
}
}
Int_t TProofServ::GetSessionStatus()
{
R__LOCKGUARD(fQMtx);
Int_t st = (fIdle) ? 0 : 1;
if (fIdle && fWaitingQueries->GetSize() > 0) st = 3;
return st;
}
Int_t TProofServ::UpdateSessionStatus(Int_t xst)
{
FILE *fs = fopen(fAdminPath.Data(), "w");
if (fs) {
Int_t st = (xst < 0) ? GetSessionStatus() : xst;
fprintf(fs, "%d", st);
fclose(fs);
PDB(kGlobal, 2)
Info("UpdateSessionStatus", "status (=%d) update in path: %s", st, fAdminPath.Data());
} else {
return -errno;
}
return 0;
}
Bool_t TProofServ::IsIdle()
{
R__LOCKGUARD(fQMtx);
return fIdle;
}
void TProofServ::SetIdle(Bool_t st)
{
R__LOCKGUARD(fQMtx);
fIdle = st;
}
Bool_t TProofServ::IsWaiting()
{
R__LOCKGUARD(fQMtx);
if (fIdle && fWaitingQueries->GetSize() > 0) return kTRUE;
return kFALSE;
}
Int_t TProofServ::WaitingQueries()
{
R__LOCKGUARD(fQMtx);
return fWaitingQueries->GetSize();
}
Int_t TProofServ::QueueQuery(TProofQueryResult *pq)
{
R__LOCKGUARD(fQMtx);
fWaitingQueries->Add(pq);
return fWaitingQueries->GetSize();
}
TProofQueryResult *TProofServ::NextQuery()
{
R__LOCKGUARD(fQMtx);
TProofQueryResult *pq = (TProofQueryResult *) fWaitingQueries->First();
fWaitingQueries->Remove(pq);
return pq;
}
Int_t TProofServ::CleanupWaitingQueries(Bool_t del, TList *qls)
{
R__LOCKGUARD(fQMtx);
Int_t ncq = 0;
if (qls) {
TIter nxq(qls);
TObject *o = 0;
while ((o = nxq())) {
if (fWaitingQueries->FindObject(o)) ncq++;
fWaitingQueries->Remove(o);
if (del) delete o;
}
} else {
ncq = fWaitingQueries->GetSize();
fWaitingQueries->SetOwner(del);
fWaitingQueries->Delete();
}
return ncq;
}
void TProofServ::SetLastMsg(const char *lastmsg)
{
fgLastMsg = lastmsg;
}
void TProofServ::SetLastEntry(Long64_t entry)
{
fgLastEntry = entry;
}
Long_t TProofServ::GetVirtMemMax()
{
return fgVirtMemMax;
}
Long_t TProofServ::GetResMemMax()
{
return fgResMemMax;
}
Float_t TProofServ::GetMemHWM()
{
return fgMemHWM;
}
Float_t TProofServ::GetMemStop()
{
return fgMemStop;
}
void TProofServ::GetLocalServer(TString &dsrv)
{
if (gSystem->Getenv("LOCALDATASERVER")) {
dsrv = gSystem->Getenv("LOCALDATASERVER");
if (!dsrv.EndsWith("/")) dsrv += "/";
}
return;
}
void TProofServ::FilterLocalroot(TString &path, const char *dsrv)
{
TUrl u(path, kTRUE);
if (!strcmp(u.GetProtocol(), "file")) {
TString pfx = gEnv->GetValue("Path.Localroot","");
if (!pfx.IsNull() && !strncmp(u.GetFile(), pfx.Data(), pfx.Length())) {
TString srvp = TUrl(dsrv).GetProtocol();
if (srvp == "root" || srvp == "xrd") path.Remove(0, pfx.Length());
}
}
return;
}
Int_t TProofLockPath::Lock()
{
const char *pname = GetName();
if (gSystem->AccessPathName(pname))
fLockId = open(pname, O_CREAT|O_RDWR, 0644);
else
fLockId = open(pname, O_RDWR);
if (fLockId == -1) {
SysError("Lock", "cannot open lock file %s", pname);
return -1;
}
PDB(kPackage, 2)
Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), pname);
#if !defined(R__WIN32) && !defined(R__WINGCC)
if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
SysError("Lock", "error locking %s", pname);
close(fLockId);
fLockId = -1;
return -1;
}
#endif
PDB(kPackage, 2)
Info("Lock", "%d: file %s locked", gSystem->GetPid(), pname);
return 0;
}
Int_t TProofLockPath::Unlock()
{
if (!IsLocked())
return 0;
PDB(kPackage, 2)
Info("Lock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());
lseek(fLockId, 0, SEEK_SET);
#if !defined(R__WIN32) && !defined(R__WINGCC)
if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
SysError("Unlock", "error unlocking %s", GetName());
close(fLockId);
fLockId = -1;
return -1;
}
#endif
PDB(kPackage, 2)
Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());
close(fLockId);
fLockId = -1;
return 0;
}