#include "TProofLite.h"
#ifdef WIN32
# include <io.h>
# include "snprintf.h"
#endif
#include "RConfigure.h"
#include "TDSet.h"
#include "TEnv.h"
#include "TError.h"
#include "TFile.h"
#include "TFileCollection.h"
#include "TFileInfo.h"
#include "THashList.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TObjString.h"
#include "TPluginManager.h"
#include "TDataSetManager.h"
#include "TDataSetManagerFile.h"
#include "TParameter.h"
#include "TPRegexp.h"
#include "TProofQueryResult.h"
#include "TProofServ.h"
#include "TQueryResultManager.h"
#include "TROOT.h"
#include "TServerSocket.h"
#include "TSlave.h"
#include "TSortedList.h"
#include "TTree.h"
#include "TVirtualProofPlayer.h"
#include "TSelector.h"
ClassImp(TProofLite)
Int_t TProofLite::fgWrksMax = -2;
TProofLite::TProofLite(const char *url, const char *conffile, const char *confdir,
Int_t loglevel, const char *alias, TProofMgr *mgr)
{
fUrl.SetUrl(url);
fServSock = 0;
fCacheLock = 0;
fQueryLock = 0;
fQMgr = 0;
fDataSetManager = 0;
fDataSetStgRepo = 0;
fReInvalid = new TPMERegexp("[^A-Za-z0-9._-]");
InitMembers();
fManager = mgr;
fServType = TProofMgr::kProofLite;
fQueryMode = kSync;
fMasterServ = kTRUE;
if (fManager) SetBit(TProof::kIsClient);
SetBit(TProof::kIsMaster);
if (!gSystem->Getenv("ROOTPROOFCLIENT")) gSystem->Setenv("ROOTPROOFCLIENT","");
fUrl.SetProtocol("proof");
fUrl.SetHost("__lite__");
fUrl.SetPort(1093);
if (strlen(fUrl.GetUser()) <= 0) {
UserGroup_t *pw = gSystem->GetUserInfo();
if (pw) {
fUrl.SetUser(pw->fUser);
delete pw;
}
}
fMaster = gSystem->HostName();
ParseConfigField(conffile);
if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
TString stup;
if (gProofServ) {
Int_t port = gEnv->GetValue("ProofServ.XpdPort", 1093);
stup.Form("%s @ %s:%d ", gProofServ->GetOrdinal(), gSystem->HostName(), port);
}
Printf(" +++ Starting PROOF-Lite %swith %d workers +++", stup.Data(), fNWorkers);
Init(url, conffile, confdir, loglevel, alias);
}
if (!gROOT->GetListOfProofs()->FindObject(this))
gROOT->GetListOfProofs()->Add(this);
gProof = this;
}
Int_t TProofLite::Init(const char *, const char *conffile,
const char *confdir, Int_t loglevel, const char *)
{
R__ASSERT(gSystem);
fValid = kFALSE;
fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
if (TestBit(TProof::kIsMaster)) {
if (!conffile || !conffile[0])
fConfFile = kPROOF_ConfFile;
if (!confdir || !confdir[0])
fConfDir = kPROOF_ConfDir;
} else {
fConfDir = confdir;
fConfFile = conffile;
}
if (CreateSandbox() != 0) {
Error("Init", "could not create/assert sandbox for this session");
return 0;
}
TString sockpathdir = gEnv->GetValue("ProofLite.SockPathDir", gSystem->TempDirectory());
if (sockpathdir.IsNull()) sockpathdir = gSystem->TempDirectory();
if (sockpathdir(sockpathdir.Length()-1) == '/') sockpathdir.Remove(sockpathdir.Length()-1);
fSockPath.Form("%s/plite-%d", sockpathdir.Data(), gSystem->GetPid());
if (fSockPath.Length() > 104) {
Error("Init", "Unix socket path '%s' is too long (%d bytes):",
fSockPath.Data(), fSockPath.Length());
Error("Init", "use 'ProofLite.SockPathDir' to create it under a directory different"
" from '%s'", sockpathdir.Data());
return 0;
}
fLogLevel = loglevel;
fProtocol = kPROOF_Protocol;
fSendGroupView = kTRUE;
fImage = "<local>";
fIntHandler = 0;
fStatus = 0;
fRecvMessages = new TList;
fRecvMessages->SetOwner(kTRUE);
fSlaveInfo = 0;
fChains = new TList;
fAvailablePackages = 0;
fEnabledPackages = 0;
fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
fInputData = 0;
ResetBit(TProof::kNewInputData);
fCollectTimeout = gEnv->GetValue("Proof.CollectTimeout", -1);
fProgressDialog = 0;
fProgressDialogStarted = kFALSE;
fRedirLog = kFALSE;
if (TestBit(TProof::kIsClient)) {
fLogFileName = Form("%s/session-%s.log", fWorkDir.Data(), GetName());
if ((fLogFileW = fopen(fLogFileName.Data(), "w")) == 0)
Error("Init", "could not create temporary logfile %s", fLogFileName.Data());
if ((fLogFileR = fopen(fLogFileName.Data(), "r")) == 0)
Error("Init", "could not open logfile %s for reading", fLogFileName.Data());
}
fLogToWindowOnly = kFALSE;
fCacheLock = new TProofLockPath(TString::Format("%s/%s%s", gSystem->TempDirectory(),
kPROOF_CacheLockFile,
TString(fCacheDir).ReplaceAll("/","%").Data()));
fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s", gSystem->TempDirectory(),
kPROOF_QueryLockFile, GetName(),
TString(fQueryDir).ReplaceAll("/","%").Data()));
fQueryLock->Lock();
fQMgr = new TQueryResultManager(fQueryDir, GetName(), fWorkDir,
fQueryLock, fLogFileW);
Int_t maxq = gEnv->GetValue("ProofLite.MaxQueriesSaved", 10);
if (fQMgr && fQMgr->ApplyMaxQueries(maxq) != 0)
Warning("Init", "problems applying fMaxQueries");
if (InitDataSetManager() != 0)
Warning("Init", "problems initializing the dataset manager");
fNotIdle = 0;
fSync = kTRUE;
fQueries = 0;
fOtherQueries = 0;
fDrawQueries = 0;
fMaxDrawQueries = 1;
fSeqNum = 0;
fSessionID = -1;
fWaitingSlaves = 0;
fPlayer = 0;
MakePlayer("lite");
fFeedback = new TList;
fFeedback->SetOwner();
fFeedback->SetName("FeedbackList");
AddInput(fFeedback);
fSlaves = new TSortedList(kSortDescending);
fActiveSlaves = new TList;
fInactiveSlaves = new TList;
fUniqueSlaves = new TList;
fAllUniqueSlaves = new TList;
fNonUniqueMasters = new TList;
fBadSlaves = new TList;
fAllMonitor = new TMonitor;
fActiveMonitor = new TMonitor;
fUniqueMonitor = new TMonitor;
fAllUniqueMonitor = new TMonitor;
fCurrentMonitor = 0;
fServSock = 0;
fTerminatedSlaveInfos = new TList;
fTerminatedSlaveInfos->SetOwner(kTRUE);
fForkStartup = kFALSE;
if (gEnv->GetValue("ProofLite.ForkStartup", 0) != 0) {
#ifndef WIN32
fForkStartup = kTRUE;
#else
Warning("Init", "fork-based workers startup is not available on Windows - ignoring");
#endif
}
fPackageLock = 0;
fEnabledPackagesOnClient = 0;
fLoadedMacros = 0;
fGlobalPackageDirList = 0;
if (TestBit(TProof::kIsClient)) {
TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
if (globpack.Length() > 0) {
Int_t ng = 0;
Int_t from = 0;
TString ldir;
while (globpack.Tokenize(ldir, from, ":")) {
TProofServ::ResolveKeywords(ldir);
if (gSystem->AccessPathName(ldir, kReadPermission)) {
Warning("Init", "directory for global packages %s does not"
" exist or is not readable", ldir.Data());
} else {
TString key = Form("G%d", ng++);
if (!fGlobalPackageDirList) {
fGlobalPackageDirList = new THashList();
fGlobalPackageDirList->SetOwner();
}
fGlobalPackageDirList->Add(new TNamed(key,ldir));
}
}
}
TString lockpath(fPackageDir);
lockpath.ReplaceAll("/", "%");
lockpath.Insert(0, TString::Format("%s/%s", gSystem->TempDirectory(), kPROOF_PackageLockFile));
fPackageLock = new TProofLockPath(lockpath.Data());
fEnabledPackagesOnClient = new TList;
fEnabledPackagesOnClient->SetOwner();
}
if (SetupWorkers(0) != 0) {
Error("Init", "problems setting up workers");
return 0;
}
fValid = kTRUE;
fAllMonitor->DeActivateAll();
GoParallel(-1, kFALSE);
SendInitialState();
SetActive(kFALSE);
if (IsValid()) {
ActivateAsyncInput();
SetRunStatus(TProof::kRunning);
}
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Add(this);
AskParallel();
return fActiveSlaves->GetSize();
}
TProofLite::~TProofLite()
{
RemoveWorkers(0);
if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
gSystem->MakeDirectory(fQueryDir+"/.delete");
gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
}
if (fQueryLock) {
gSystem->Unlink(fQueryLock->GetName());
fQueryLock->Unlock();
}
SafeDelete(fReInvalid);
SafeDelete(fDataSetManager);
SafeDelete(fDataSetStgRepo);
SafeDelete(fServSock);
gSystem->Unlink(fSockPath);
}
Int_t TProofLite::GetNumberOfWorkers(const char *url)
{
Bool_t notify = kFALSE;
if (fgWrksMax == -2) {
TString sysname = "system.rootrc";
#ifdef ROOTETCDIR
char *s = gSystem->ConcatFileName(ROOTETCDIR, sysname);
#else
TString etc = gRootDir;
#ifdef WIN32
etc += "\\etc";
#else
etc += "/etc";
#endif
char *s = gSystem->ConcatFileName(etc, sysname);
#endif
TEnv sysenv(0);
sysenv.ReadFile(s, kEnvGlobal);
fgWrksMax = sysenv.GetValue("ProofLite.MaxWorkers", -1);
notify = kTRUE;
if (s) delete[] s;
}
if (fgWrksMax == 0) {
::Error("TProofLite::GetNumberOfWorkers",
"PROOF-Lite disabled by the system administrator: sorry!");
return 0;
}
TString nw;
Int_t nWorkers = -1;
Bool_t urlSetting = kFALSE;
if (url && strlen(url)) {
nw = url;
Int_t in = nw.Index("workers=");
if (in != kNPOS) {
nw.Remove(0, in + strlen("workers="));
while (!nw.IsDigit())
nw.Remove(nw.Length()-1);
if (!nw.IsNull()) {
if ((nWorkers = nw.Atoi()) <= 0) {
::Warning("TProofLite::GetNumberOfWorkers",
"number of workers specified by 'workers='"
" is non-positive: using default");
} else {
urlSetting = kFALSE;
}
}
}
}
if (!urlSetting && fgProofEnvList) {
TNamed *nm = (TNamed *) fgProofEnvList->FindObject("PROOF_NWORKERS");
if (nm) {
nw = nm->GetTitle();
if (nw.IsDigit()) {
if ((nWorkers = nw.Atoi()) == 0) {
::Warning("TProofLite::GetNumberOfWorkers",
"number of workers specified by 'workers='"
" is non-positive: using default");
}
}
}
}
if (nWorkers <= 0) {
nWorkers = gEnv->GetValue("ProofLite.Workers", -1);
if (nWorkers <= 0) {
SysInfo_t si;
if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
nWorkers = si.fCpus;
} else {
nWorkers = 2;
}
if (notify) notify = kFALSE;
}
}
if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
if (notify)
::Warning("TProofLite::GetNumberOfWorkers", "number of PROOF-Lite workers limited by"
" the system administrator to %d", fgWrksMax);
nWorkers = fgWrksMax;
}
return nWorkers;
}
Int_t TProofLite::SetupWorkers(Int_t opt, TList *startedWorkers)
{
if (!fServSock) {
if ((fServSock = new TServerSocket(fSockPath))) {
R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(fServSock);
}
}
if (!fServSock || !fServSock->IsValid()) {
Error("SetupWorkers",
"unable to create server socket for internal communications");
SetBit(kInvalidObject);
return -1;
}
TMonitor *mon = new TMonitor;
mon->Add(fServSock);
TList started;
TSlave *wrk = 0;
Int_t nWrksDone = 0, nWrksTot = -1;
TString fullord;
if (opt == 0) {
nWrksTot = fForkStartup ? 1 : fNWorkers;
Int_t ord = 0;
for (; ord < nWrksTot; ord++) {
const char *o = (gProofServ) ? gProofServ->GetOrdinal() : "0";
fullord.Form("%s.%d", o, ord);
SetProofServEnv(fullord);
if ((wrk = CreateSlave("lite", fullord, 100, fImage, fWorkDir)))
started.Add(wrk);
NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
}
} else {
if (!fForkStartup) {
Warning("SetupWorkers", "standard startup: workers already started");
return -1;
}
nWrksTot = fNWorkers - 1;
TString clones;
Int_t ord = 0;
for (; ord < nWrksTot; ord++) {
const char *o = (gProofServ) ? gProofServ->GetOrdinal() : "0";
fullord.Form("%s.%d", o, ord + 1);
if (!clones.IsNull()) clones += " ";
clones += fullord;
if ((wrk = CreateSlave("lite", fullord, -1, fImage, fWorkDir)))
started.Add(wrk);
NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
}
TMessage m(kPROOF_FORK);
m << clones;
Broadcast(m, kActive);
}
nWrksDone = 0;
nWrksTot = started.GetSize();
Int_t nSelects = 0;
Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
while (started.GetSize() > 0 && nSelects < nWrksTot) {
TSocket *xs = mon->Select(to);
nSelects++;
if (xs == (TSocket *) -1) continue;
TSocket *s = fServSock->Accept();
if (s && s->IsValid()) {
TMessage *msg = 0;
if (s->Recv(msg) < 0) {
Warning("SetupWorkers", "problems receiving message from accepted socket!");
} else {
if (msg) {
TString ord;
*msg >> ord;
if ((wrk = (TSlave *) started.FindObject(ord))) {
started.Remove(wrk);
wrk->SetSocket(s);
{ R__LOCKGUARD2(gROOTMutex);
gROOT->GetListOfSockets()->Remove(s);
}
if (wrk->IsValid()) {
wrk->SetInputHandler(new TProofInputHandler(this, wrk->GetSocket()));
wrk->fParallel = 1;
wrk->SetupServ(TSlave::kSlave, 0);
}
fSlaves->Add(wrk);
if (wrk->IsValid()) {
if (opt == 1) fActiveSlaves->Add(wrk);
fAllMonitor->Add(wrk->GetSocket());
if (startedWorkers) startedWorkers->Add(wrk);
NotifyStartUp("Setting up worker servers", ++nWrksDone, nWrksTot);
} else {
fBadSlaves->Add(wrk);
}
}
} else {
Warning("SetupWorkers", "received empty message from accepted socket!");
}
}
}
}
mon->DeActivateAll();
delete mon;
if (!gROOT->IsBatch() && !fProgressDialog) {
if ((fProgressDialog =
gROOT->GetPluginManager()->FindHandler("TProofProgressDialog")))
if (fProgressDialog->LoadPlugin() == -1)
fProgressDialog = 0;
}
if (opt == 1) {
Collect(kActive);
SendGroupView();
SetParallel(-1, 0);
}
return 0;
}
void TProofLite::NotifyStartUp(const char *action, Int_t done, Int_t tot)
{
Int_t frac = (Int_t) (done*100.)/tot;
char msg[512] = {0};
if (frac >= 100) {
snprintf(msg, 512, "%s: OK (%d workers) \n",
action, tot);
} else {
snprintf(msg, 512, "%s: %d out of %d (%d %%)\r",
action, done, tot, frac);
}
fprintf(stderr,"%s", msg);
}
Int_t TProofLite::SetProofServEnv(const char *ord)
{
if (!ord || strlen(ord) <= 0) {
Error("SetProofServEnv", "ordinal string undefined");
return -1;
}
TString rcfile(Form("%s/worker-%s.rootrc", fWorkDir.Data(), ord));
FILE *frc = fopen(rcfile.Data(), "w");
if (!frc) {
Error("SetProofServEnv", "cannot open rc file %s", rcfile.Data());
return -1;
}
fprintf(frc,"# The session working dir\n");
fprintf(frc,"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);
fprintf(frc,"# Session tag\n");
fprintf(frc,"ProofServ.SessionTag: %s\n", GetName());
fprintf(frc,"# Proof Log/Debug level\n");
fprintf(frc,"Proof.DebugLevel: %d\n", gDebug);
fprintf(frc,"# Ordinal number\n");
fprintf(frc,"ProofServ.Ordinal: %s\n", ord);
fprintf(frc,"# ROOT Version tag\n");
fprintf(frc,"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());
TString sandbox = fSandbox;
if (GetSandbox(sandbox, kFALSE, "ProofServ.Sandbox") != 0)
Warning("SetProofServEnv", "problems getting sandbox string for worker");
fprintf(frc,"# Users sandbox\n");
fprintf(frc, "ProofServ.Sandbox: %s\n", sandbox.Data());
fprintf(frc,"# Users cache\n");
fprintf(frc, "ProofServ.CacheDir: %s\n", fCacheDir.Data());
fprintf(frc,"# Users packages\n");
fprintf(frc, "ProofServ.PackageDir: %s\n", fPackageDir.Data());
fprintf(frc,"# Server image\n");
fprintf(frc, "ProofServ.Image: %s\n", fImage.Data());
fprintf(frc,"# Open socket\n");
fprintf(frc, "ProofServ.OpenSock: %s\n", fSockPath.Data());
fprintf(frc,"# Client Protocol\n");
fprintf(frc, "ProofServ.ClientVersion: %d\n", kPROOF_Protocol);
fclose(frc);
TString envfile(Form("%s/worker-%s.env", fWorkDir.Data(), ord));
FILE *fenv = fopen(envfile.Data(), "w");
if (!fenv) {
Error("SetProofServEnv", "cannot open env file %s", envfile.Data());
return -1;
}
#ifdef R__HAVE_CONFIG
fprintf(fenv, "export ROOTSYS=%s\n", ROOTPREFIX);
#else
fprintf(fenv, "export ROOTSYS=%s\n", gSystem->Getenv("ROOTSYS"));
#endif
#ifdef R__HAVE_CONFIG
fprintf(fenv, "export ROOTCONFDIR=%s\n", ROOTETCDIR);
#else
fprintf(fenv, "export ROOTCONFDIR=%s\n", gSystem->Getenv("ROOTSYS"));
#endif
fprintf(fenv, "export TMPDIR=%s\n", gSystem->TempDirectory());
TString logfile(Form("%s/worker-%s.log", fWorkDir.Data(), ord));
fprintf(fenv, "export ROOTPROOFLOGFILE=%s\n", logfile.Data());
fprintf(fenv, "export ROOTRCFILE=%s\n", rcfile.Data());
fprintf(fenv, "export ROOTVERSIONTAG=%s\n", gROOT->GetVersion());
fprintf(fenv, "export ROOTPROOFLITE=%d\n", fNWorkers);
fprintf(fenv, "export LOCALDATASERVER=\"file://\"\n");
if (fgProofEnvList) {
TString namelist;
TIter nxenv(fgProofEnvList);
TNamed *env = 0;
while ((env = (TNamed *)nxenv())) {
TString senv(env->GetTitle());
ResolveKeywords(senv, ord, logfile.Data());
fprintf(fenv, "export %s=%s\n", env->GetName(), senv.Data());
if (namelist.Length() > 0)
namelist += ',';
namelist += env->GetName();
}
fprintf(fenv, "export PROOF_ALLVARS=%s\n", namelist.Data());
}
fclose(fenv);
return 0;
}
void TProofLite::ResolveKeywords(TString &s, const char *ord,
const char *logfile)
{
if (!logfile) return;
if (s.Contains("<logfilewrk>") && logfile) {
TString lfr(logfile);
if (lfr.EndsWith(".log")) lfr.Remove(lfr.Last('.'));
s.ReplaceAll("<logfilewrk>", lfr.Data());
}
if (gSystem->Getenv("USER") && s.Contains("<user>")) {
s.ReplaceAll("<user>", gSystem->Getenv("USER"));
}
if (gSystem->Getenv("ROOTSYS") && s.Contains("<rootsys>")) {
s.ReplaceAll("<rootsys>", gSystem->Getenv("ROOTSYS"));
}
if (s.Contains("<cpupin>")) {
TString o = ord;
Int_t n = o.Index('.');
if (n != kNPOS) {
o.Remove(0, n+1);
n = o.Atoi();
TString cpuPinList;
{
const TList *envVars = GetEnvVars();
TNamed *var;
if (envVars) {
var = dynamic_cast<TNamed *>(envVars->FindObject("PROOF_SLAVE_CPUPIN_ORDER"));
if (var) cpuPinList = var->GetTitle();
}
}
UInt_t nCpus = 1;
{
SysInfo_t si;
if (gSystem->GetSysInfo(&si) == 0 && (si.fCpus > 0))
nCpus = si.fCpus;
else nCpus = 1;
}
if (cpuPinList.IsNull() || (cpuPinList == "*")) {
n = n % nCpus;
}
else {
n = n % (cpuPinList.CountChar('+')+1);
TString tok;
Ssiz_t from = 0;
for (Int_t i=0; cpuPinList.Tokenize(tok, from, "\\+"); i++) {
if (i == n) {
n = (tok.Atoi() % nCpus);
break;
}
}
}
o.Form("%d", n);
}
else {
o = "0";
}
s.ReplaceAll("<cpupin>", o);
}
}
Int_t TProofLite::CreateSandbox()
{
if (GetSandbox(fSandbox, kTRUE, "ProofLite.Sandbox") != 0) return -1;
fPackageDir = gEnv->GetValue("Proof.PackageDir", "");
if (fPackageDir.IsNull())
fPackageDir.Form("%s/%s", fSandbox.Data(), kPROOF_PackDir);
if (AssertPath(fPackageDir, kTRUE) != 0) return -1;
fCacheDir = gEnv->GetValue("Proof.CacheDir", "");
if (fCacheDir.IsNull())
fCacheDir.Form("%s/%s", fSandbox.Data(), kPROOF_CacheDir);
if (AssertPath(fCacheDir, kTRUE) != 0) return -1;
fDataSetDir = gEnv->GetValue("Proof.DataSetDir", "");
if (fDataSetDir.IsNull())
fDataSetDir.Form("%s/%s", fSandbox.Data(), kPROOF_DataSetDir);
if (AssertPath(fDataSetDir, kTRUE) != 0) return -1;
TString stag;
stag.Form("%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
SetName(stag.Data());
Int_t subpath = gEnv->GetValue("ProofLite.SubPath", 1);
TString sessdir;
if (subpath != 0) {
sessdir = gSystem->WorkingDirectory();
sessdir.ReplaceAll(gSystem->HomeDirectory(),"");
sessdir.ReplaceAll("/","-");
sessdir.Replace(0,1,"/",1);
sessdir.Insert(0, fSandbox.Data());
} else {
sessdir = fSandbox;
}
fWorkDir.Form("%s/session-%s", sessdir.Data(), stag.Data());
if (AssertPath(fWorkDir, kTRUE) != 0) return -1;
TString lastsess;
lastsess.Form("%s/last-lite-session", sessdir.Data());
gSystem->Unlink(lastsess);
gSystem->Symlink(fWorkDir, lastsess);
fQueryDir = gEnv->GetValue("Proof.QueryDir", "");
if (fQueryDir.IsNull())
fQueryDir.Form("%s/%s", sessdir.Data(), kPROOF_QueryDir);
if (AssertPath(fQueryDir, kTRUE) != 0) return -1;
CleanupSandbox();
return 0;
}
void TProofLite::Print(Option_t *option) const
{
TString ord;
if (gProofServ) ord.Form("%s ", gProofServ->GetOrdinal());
if (IsParallel())
Printf("*** PROOF-Lite cluster %s(parallel mode, %d workers):", ord.Data(), GetParallel());
else
Printf("*** PROOF-Lite cluster %s(sequential mode)", ord.Data());
if (gProofServ) {
TString url(gSystem->HostName());
Int_t port = gEnv->GetValue("ProofServ.XpdPort", 1093);
if (port > -1) url.Form("%s:%d",gSystem->HostName(), port);
Printf("URL: %s", url.Data());
} else {
Printf("Host name: %s", gSystem->HostName());
}
Printf("User: %s", GetUser());
TString ver(gROOT->GetVersion());
ver += TString::Format("|%s", gROOT->GetGitCommit());
if (gSystem->Getenv("ROOTVERSIONTAG"))
ver += TString::Format("|%s", gSystem->Getenv("ROOTVERSIONTAG"));
Printf("ROOT version|rev|tag: %s", ver.Data());
Printf("Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
gSystem->GetBuildCompilerVersion());
Printf("Protocol version: %d", GetClientProtocol());
Printf("Working directory: %s", gSystem->WorkingDirectory());
Printf("Communication path: %s", fSockPath.Data());
Printf("Log level: %d", GetLogLevel());
Printf("Number of workers: %d", GetNumberOfSlaves());
Printf("Number of active workers: %d", GetNumberOfActiveSlaves());
Printf("Number of unique workers: %d", GetNumberOfUniqueSlaves());
Printf("Number of inactive workers: %d", GetNumberOfInactiveSlaves());
Printf("Number of bad workers: %d", GetNumberOfBadSlaves());
Printf("Total MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
Printf("Total real time used (s): %.3f", GetRealTime());
Printf("Total CPU time used (s): %.3f", GetCpuTime());
if (TString(option).Contains("a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
Printf("List of workers:");
TIter nextslave(fSlaves);
while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
if (sl->IsValid())
sl->Print(option);
}
}
}
TProofQueryResult *TProofLite::MakeQueryResult(Long64_t nent, const char *opt,
Long64_t fst, TDSet *dset,
const char *selec)
{
Int_t seqnum = -1;
if (fQMgr) {
fQMgr->IncrementSeqNum();
seqnum = fQMgr->SeqNum();
}
TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt,
fPlayer->GetInputList(), nent,
fst, dset, selec,
(dset ? dset->GetEntryList() : 0));
pqr->SetTitle(GetName());
return pqr;
}
void TProofLite::SetQueryRunning(TProofQueryResult *pq)
{
fflush(fLogFileW);
Int_t startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
Printf(" ");
Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
TString parlist = "";
TIter nxp(fEnabledPackagesOnClient);
TObjString *os= 0;
while ((os = (TObjString *)nxp())) {
if (parlist.Length() <= 0)
parlist = os->GetName();
else
parlist += Form(";%s",os->GetName());
}
pq->SetRunning(startlog, parlist, GetParallel());
AskStatistics();
pq->SetProcessInfo(pq->GetEntries(), GetCpuTime(), GetBytesRead());
}
Long64_t TProofLite::DrawSelect(TDSet *dset, const char *varexp,
const char *selection, Option_t *option,
Long64_t nentries, Long64_t first)
{
if (!IsValid()) return -1;
if (!IsIdle()) {
Info("DrawSelect","not idle, asynchronous Draw not supported");
return -1;
}
TString opt(option);
Int_t idx = opt.Index("ASYN", 0, TString::kIgnoreCase);
if (idx != kNPOS)
opt.Replace(idx,4,"");
fVarExp = varexp;
fSelection = selection;
return Process(dset, "draw:", opt, nentries, first);
}
Long64_t TProofLite::Process(TDSet *dset, const char *selector, Option_t *option,
Long64_t nentries, Long64_t first)
{
TString opt(option), optfb, outfile;
if (opt.Contains("fb=") || opt.Contains("feedback=")) SetFeedback(opt, optfb, 0);
if (HandleOutputOptions(opt, outfile, 0) != 0) return -1;
fSync = (GetQueryMode(opt) == kSync);
if (!fSync) {
Info("Process","asynchronous mode not yet supported in PROOF-Lite");
return -1;
}
if (!IsIdle()) {
Info("Process", "not idle: cannot accept queries");
return -1;
}
if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
fRunningDSets->SetOwner(kTRUE);
fRunningDSets->Delete();
}
if (!IsValid() || !fQMgr || !fPlayer) {
Error("Process", "invalid sesion or query-result manager undefined!");
return -1;
}
if (!fPlayer->GetInputList()->FindObject("PROOF_MaxSlavesPerNode"))
SetParameter("PROOF_MaxSlavesPerNode", (Long_t)fNWorkers);
Bool_t hasNoData = (!dset || (dset && dset->TestBit(TDSet::kEmpty))) ? kTRUE : kFALSE;
TString emsg;
if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
Error("Process", "from AssertDataSet: %s", emsg.Data());
return -1;
}
if (dset->GetListOfElements()->GetSize() == 0) {
Error("Process", "no files to process!");
return -1;
}
} else if (hasNoData) {
TNamed *ftp = dynamic_cast<TNamed *>(fPlayer->GetInputList()->FindObject("PROOF_FilesToProcess"));
if (ftp) {
TString dsn(ftp->GetTitle());
if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
dsn.ReplaceAll("dataset:", "");
if (!fDataSetManager) {
emsg.Form("dataset manager not initialized!");
} else {
TFileCollection *fc = 0;
if (!(fc = fDataSetManager->GetDataSet(dsn))) {
emsg.Form("requested dataset '%s' does not exists", dsn.Data());
} else {
TMap *fcmap = TProofServ::GetDataSetNodeMap(fc, emsg);
if (fcmap) {
fPlayer->GetInputList()->Remove(ftp);
delete ftp;
fcmap->SetOwner(kTRUE);
fcmap->SetName("PROOF_FilesToProcess");
fPlayer->GetInputList()->Add(fcmap);
}
}
}
if (!emsg.IsNull()) {
Error("HandleProcess", "%s", emsg.Data());
return -1;
}
}
}
}
TString selec(selector), varexp, selection, objname;
if (selec.BeginsWith("draw:")) {
varexp = fVarExp;
selection = fSelection;
if (fPlayer->GetDrawArgs(varexp, selection, opt, selec, objname) != 0) {
Error("Process", "draw query: error parsing arguments '%s', '%s', '%s'",
varexp.Data(), selection.Data(), opt.Data());
return -1;
}
}
TProofQueryResult *pq = MakeQueryResult(nentries, opt, first, 0, selec);
Bool_t savequeries =
(!strcmp(gEnv->GetValue("ProofLite.AutoSaveQueries", "off"), "on")) ? kTRUE : kFALSE;
Int_t memqueries = gEnv->GetValue("ProofLite.MaxQueriesMemory", 10);
if (!(pq->IsDraw())) {
if (fQMgr->Queries()) {
if (memqueries > 0 && fQMgr->Queries()->GetSize() >= memqueries) {
TObject *qfst = fQMgr->Queries()->First();
fQMgr->Queries()->Remove(qfst);
}
if (memqueries >= 0) fQMgr->Queries()->Add(pq);
}
if (savequeries) fQMgr->SaveQuery(pq);
}
fSeqNum = pq->GetSeqNum();
SetQueryRunning(pq);
if (!(pq->IsDraw())) {
if (savequeries) fQMgr->SaveQuery(pq);
} else {
fQMgr->IncrementDrawQueries();
}
if (!gROOT->IsBatch()) {
Int_t dsz = (dset && dset->GetListOfElements()) ? dset->GetListOfElements()->GetSize() : -1;
if (fProgressDialog &&
!TestBit(kUsingSessionGui) && TestBit(kUseProgressDialog)) {
if (!fProgressDialogStarted) {
fProgressDialog->ExecPlugin(5, this, selec.Data(), dsz,
first, nentries);
fProgressDialogStarted = kTRUE;
} else {
ResetProgressDialog(selec.Data(), dsz, first, nentries);
}
}
ResetBit(kUsingSessionGui);
}
if (!(pq->IsDraw()))
fPlayer->AddQueryResult(pq);
fPlayer->SetCurrentQuery(pq);
TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
if (qtag) {
qtag->SetTitle(Form("%s:%s",pq->GetTitle(),pq->GetName()));
} else {
TObject *o = fPlayer->GetInputList()->FindObject("PROOF_QueryTag");
if (o) fPlayer->GetInputList()->Remove(o);
fPlayer->AddInput(new TNamed("PROOF_QueryTag",
Form("%s:%s",pq->GetTitle(),pq->GetName())));
}
SetRunStatus(TProof::kRunning);
TSignalHandler *sh = 0;
if (fSync) {
if (gApplication)
sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
}
fOutputList.Clear();
TList *startedWorkers = 0;
if (fForkStartup) {
startedWorkers = new TList;
startedWorkers->SetOwner(kFALSE);
SetupWorkers(1, startedWorkers);
}
Long64_t rv = 0;
if (!(pq->IsDraw())) {
if (selector && strlen(selector)) {
rv = fPlayer->Process(dset, selec, opt, nentries, first);
} else {
rv = fPlayer->Process(dset, fSelector, opt, nentries, first);
}
} else {
rv = fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
}
if (!optfb.IsNull()) SetFeedback(opt, optfb, 1);
if (fSync) {
if (fForkStartup && startedWorkers) {
RemoveWorkers(startedWorkers);
SafeDelete(startedWorkers);
}
if (sh)
gSystem->AddSignalHandler(sh);
if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
Bool_t abort = (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted)
? kTRUE : kFALSE;
if (abort) fPlayer->StopProcess(kTRUE);
Emit("StopProcess(Bool_t)", abort);
}
pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
UpdateDialog();
if (rv == 0 && dset && pq->GetInputList()) {
pq->GetInputList()->Add(dset);
if (dset->GetEntryList())
pq->GetInputList()->Add(dset->GetEntryList());
}
if (fDataSetManager && fPlayer->GetOutputList()) {
TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
if (psr) {
TString err;
if (TProofServ::RegisterDataSets(fPlayer->GetInputList(),
fPlayer->GetOutputList(), fDataSetManager, err) != 0)
Warning("ProcessNext", "problems registering produced datasets: %s", err.Data());
fPlayer->GetOutputList()->Remove(psr);
delete psr;
}
}
AskStatistics();
if (!(pq->IsDraw())) {
if (fQMgr->FinalizeQuery(pq, this, fPlayer)) {
if (savequeries) fQMgr->SaveQuery(pq, -1);
}
}
if (fPlayer && fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
if (fPlayer->GetListOfResults()) fPlayer->GetListOfResults()->Remove(pq);
if (fQMgr) fQMgr->RemoveQuery(pq);
} else {
QueryResultReady(Form("%s:%s", pq->GetTitle(), pq->GetName()));
if (!(pq->IsDraw()) && memqueries >= 0) {
if (fQMgr && fQMgr->Queries()) {
TQueryResult *pqr = pq->CloneInfo();
if (pqr) fQMgr->Queries()->Add(pqr);
fQMgr->Queries()->Remove(pq);
}
}
TString msg;
msg.Form("Lite-0: all output objects have been merged ");
fprintf(stderr, "%s\n", msg.Data());
}
if (!fPerfTree.IsNull()) {
if (SavePerfTree() != 0) Error("Process", "saving performance info ...");
SetPerfTree(0);
}
}
if (HandleOutputOptions(opt, outfile, 1) != 0) return -1;
if (rv >= 0) {
TParameter<Long64_t> *sst =
(TParameter<Long64_t> *) fOutputList.FindObject("PROOF_SelectorStatus");
if (sst) rv = sst->GetVal();
}
return rv;
}
Int_t TProofLite::CreateSymLinks(TList *files)
{
Int_t rc = 0;
if (files) {
TIter nxf(files);
TObjString *os = 0;
while ((os = (TObjString *) nxf())) {
TString tgt(os->GetName());
gSystem->ExpandPathName(tgt);
TIter nxw(fActiveSlaves);
TSlave *wrk = 0;
while ((wrk = (TSlave *) nxw())) {
TString lnk = Form("%s/%s", wrk->GetWorkDir(), gSystem->BaseName(os->GetName()));
gSystem->Unlink(lnk);
if (gSystem->Symlink(tgt, lnk) != 0) {
rc++;
Warning("CreateSymLinks", "problems creating sym link: %s", lnk.Data());
}
}
}
} else {
Warning("CreateSymLinks", "files list is undefined");
}
return rc;
}
Int_t TProofLite::InitDataSetManager()
{
fDataSetManager = 0;
TString user("???"), group("default");
UserGroup_t *pw = gSystem->GetUserInfo();
if (pw) {
user = pw->fUser;
delete pw;
}
TPluginHandler *h = 0;
TString dsm = gEnv->GetValue("Proof.DataSetManager", "");
if (!dsm.IsNull()) {
if (gROOT->GetPluginManager()) {
h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
if (h && h->LoadPlugin() != -1) {
fDataSetManager =
reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, group.Data(),
user.Data(), dsm.Data()));
}
}
}
if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
Warning("InitDataSetManager", "dataset manager plug-in initialization failed");
SafeDelete(fDataSetManager);
}
if (!fDataSetManager) {
TString opts("Av:");
TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
if (dsetdir.IsNull()) {
dsetdir = fDataSetDir;
opts += "Sb:";
}
if (!h) {
h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
if (h && h->LoadPlugin() == -1) h = 0;
}
if (h) {
fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
group.Data(), user.Data(),
Form("dir:%s opt:%s", dsetdir.Data(), opts.Data())));
}
if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
Warning("InitDataSetManager", "default dataset manager plug-in initialization failed");
SafeDelete(fDataSetManager);
}
}
if (gDebug > 0 && fDataSetManager) {
Info("InitDataSetManager", "datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
fDataSetManager->TestBit(TDataSetManager::kCheckQuota),
fDataSetManager->TestBit(TDataSetManager::kAllowRegister),
fDataSetManager->TestBit(TDataSetManager::kAllowVerify),
fDataSetManager->TestBit(TDataSetManager::kTrustInfo),
fDataSetManager->TestBit(TDataSetManager::kIsSandbox));
}
TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
if (!dsReqCfg.IsNull()) {
TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
if (reReqDir.Match(dsReqCfg) == 5) {
TString dsDirFmt;
dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_", dsDirFmt);
if (fDataSetStgRepo && fDataSetStgRepo->TestBit(TObject::kInvalidObject)) {
Warning("InitDataSetManager", "failed init of dataset staging requests repository");
SafeDelete(fDataSetStgRepo);
}
} else {
Warning("InitDataSetManager", "specify, with [dir:]<path>, a valid path for staging requests");
}
} else if (gDebug > 0) {
Warning("InitDataSetManager", "no repository for staging requests available");
}
return (fDataSetManager ? 0 : -1);
}
void TProofLite::ShowCache(Bool_t)
{
if (!IsValid()) return;
Printf("*** Local file cache %s ***", fCacheDir.Data());
gSystem->Exec(Form("%s %s", kLS, fCacheDir.Data()));
}
void TProofLite::ClearCache(const char *file)
{
if (!IsValid()) return;
fCacheLock->Lock();
if (!file || strlen(file) <= 0) {
gSystem->Exec(Form("%s %s/*", kRM, fCacheDir.Data()));
} else {
gSystem->Exec(Form("%s %s/%s", kRM, fCacheDir.Data(), file));
}
fCacheLock->Unlock();
}
Int_t TProofLite::Load(const char *macro, Bool_t notOnClient, Bool_t uniqueOnly,
TList *wrks)
{
if (!IsValid()) return -1;
if (!macro || !macro[0]) {
Error("Load", "need to specify a macro name");
return -1;
}
TString macs(macro), mac;
Int_t from = 0;
while (macs.Tokenize(mac, from, ",")) {
if (CopyMacroToCache(mac) < 0) return -1;
}
return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
}
Int_t TProofLite::CopyMacroToCache(const char *macro, Int_t headerRequired,
TSelector **selector, Int_t opt)
{
TString cacheDir = fCacheDir;
gSystem->ExpandPathName(cacheDir);
TProofLockPath *cacheLock = fCacheLock;
TString name = macro;
TString acmode, args, io;
name = gSystem->SplitAclicMode(name, acmode, args, io);
PDB(kGlobal,1)
Info("CopyMacroToCache", "enter: names: %s, %s", macro, name.Data());
if (gSystem->AccessPathName(name, kReadPermission)) {
Error("CopyMacroToCache", "file %s not found or not readable", name.Data());
return -1;
}
TString mp(TROOT::GetMacroPath());
TString np(gSystem->DirName(name));
if (!np.IsNull()) {
np += ":";
if (!mp.BeginsWith(np) && !mp.Contains(":"+np)) {
Int_t ip = (mp.BeginsWith(".:")) ? 2 : 0;
mp.Insert(ip, np);
TROOT::SetMacroPath(mp);
if (gDebug > 0)
Info("CopyMacroToCache", "macro path set to '%s'", TROOT::GetMacroPath());
}
}
Int_t dot = name.Last('.');
const char *hext[] = { ".h", ".hh", "" };
TString hname, checkedext;
Int_t i = 0;
while (strlen(hext[i]) > 0) {
hname = name(0, dot);
hname += hext[i];
if (!gSystem->AccessPathName(hname, kReadPermission))
break;
if (!checkedext.IsNull()) checkedext += ",";
checkedext += hext[i];
hname = "";
i++;
}
if (hname.IsNull() && headerRequired == 1) {
Error("CopyMacroToCache", "header file for %s not found or not readable "
"(checked extensions: %s)", name.Data(), checkedext.Data());
return -1;
}
if (headerRequired < 0)
hname = "";
cacheLock->Lock();
Bool_t useCacheBinaries = kFALSE;
TString cachedname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(name));
TString cachedhname;
if (!hname.IsNull())
cachedhname = Form("%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
if (!gSystem->AccessPathName(cachedname, kReadPermission)) {
TMD5 *md5 = TMD5::FileChecksum(name);
TMD5 *md5cache = TMD5::FileChecksum(cachedname);
if (md5 && md5cache && (*md5 == *md5cache))
useCacheBinaries = kTRUE;
if (!hname.IsNull()) {
if (!gSystem->AccessPathName(cachedhname, kReadPermission)) {
TMD5 *md5h = TMD5::FileChecksum(hname);
TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
if (md5h && md5hcache && (*md5h != *md5hcache))
useCacheBinaries = kFALSE;
SafeDelete(md5h);
SafeDelete(md5hcache);
}
}
SafeDelete(md5);
SafeDelete(md5cache);
}
TString vername(Form(".%s", name.Data()));
dot = vername.Last('.');
if (dot != kNPOS)
vername.Remove(dot);
vername += ".binversion";
Bool_t savever = kFALSE;
if (useCacheBinaries) {
TString v, r;
FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "r");
if (f) {
v.Gets(f);
r.Gets(f);
fclose(f);
}
if (!f || v != gROOT->GetVersion() || r != gROOT->GetGitCommit())
useCacheBinaries = kFALSE;
}
TString binname = gSystem->BaseName(name);
dot = binname.Last('.');
if (dot != kNPOS)
binname.Replace(dot,1,"_");
binname += ".";
FileStat_t stlocal, stcache;
void *dirp = 0;
if (useCacheBinaries) {
dirp = gSystem->OpenDirectory(cacheDir);
if (dirp) {
const char *e = 0;
while ((e = gSystem->GetDirEntry(dirp))) {
if (!strncmp(e, binname.Data(), binname.Length())) {
TString fncache = Form("%s/%s", cacheDir.Data(), e);
Bool_t docp = kTRUE;
if (!gSystem->GetPathInfo(fncache, stcache)) {
Int_t rc = gSystem->GetPathInfo(e, stlocal);
if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
docp = kFALSE;
if (docp) {
gSystem->Exec(Form("%s %s", kRM, e));
PDB(kGlobal,2)
Info("CopyMacroToCache",
"retrieving %s from cache", fncache.Data());
gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
}
}
}
}
gSystem->FreeDirectory(dirp);
}
}
cacheLock->Unlock();
if (selector) {
if (!(*selector = TSelector::GetSelector(macro))) {
Error("CopyMacroToCache", "could not create a selector from %s", macro);
return -1;
}
}
cacheLock->Lock();
TList *cachedFiles = new TList;
dirp = gSystem->OpenDirectory(".");
if (dirp) {
const char *e = 0;
while ((e = gSystem->GetDirEntry(dirp))) {
if (!strncmp(e, binname.Data(), binname.Length())) {
Bool_t docp = kTRUE;
if (!gSystem->GetPathInfo(e, stlocal)) {
TString fncache = Form("%s/%s", cacheDir.Data(), e);
Int_t rc = gSystem->GetPathInfo(fncache, stcache);
if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
docp = kFALSE;
if (docp) {
gSystem->Exec(Form("%s %s", kRM, fncache.Data()));
PDB(kGlobal,2)
Info("CopyMacroToCache","caching %s ...", e);
gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
savever = kTRUE;
}
if (opt & kCpBin)
cachedFiles->Add(new TObjString(fncache.Data()));
}
}
}
gSystem->FreeDirectory(dirp);
}
if (savever) {
FILE *f = fopen(Form("%s/%s", cacheDir.Data(), vername.Data()), "w");
if (f) {
fputs(gROOT->GetVersion(), f);
fputs(Form("\n%s", gROOT->GetGitCommit()), f);
fclose(f);
}
}
if (!useCacheBinaries) {
gSystem->Exec(Form("%s %s", kRM, cachedname.Data()));
PDB(kGlobal,2)
Info("CopyMacroToCache","caching %s ...", name.Data());
gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
if (!hname.IsNull()) {
gSystem->Exec(Form("%s %s", kRM, cachedhname.Data()));
PDB(kGlobal,2)
Info("CopyMacroToCache","caching %s ...", hname.Data());
gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
}
}
if (opt & kCp) {
cachedFiles->Add(new TObjString(cachedname.Data()));
if (!hname.IsNull())
cachedFiles->Add(new TObjString(cachedhname.Data()));
}
cacheLock->Unlock();
if (opt & (kCp | kCpBin))
CreateSymLinks(cachedFiles);
cachedFiles->SetOwner();
delete cachedFiles;
return 0;
}
Int_t TProofLite::CleanupSandbox()
{
Int_t maxold = gEnv->GetValue("Proof.MaxOldSessions", 1);
if (maxold < 0) return 0;
TSortedList *olddirs = new TSortedList(kFALSE);
TString sandbox = gSystem->DirName(fWorkDir.Data());
void *dirp = gSystem->OpenDirectory(sandbox);
if (dirp) {
const char *e = 0;
while ((e = gSystem->GetDirEntry(dirp))) {
if (!strncmp(e, "session-", 8) && !strstr(e, GetName())) {
TString d(e);
Int_t i = d.Last('-');
if (i != kNPOS) d.Remove(i);
i = d.Last('-');
if (i != kNPOS) d.Remove(0,i+1);
TString path = Form("%s/%s", sandbox.Data(), e);
olddirs->Add(new TNamed(d, path));
}
}
gSystem->FreeDirectory(dirp);
}
Bool_t notify = kTRUE;
while (olddirs->GetSize() > maxold) {
if (notify && gDebug > 0)
Printf("Cleaning sandbox at: %s", sandbox.Data());
notify = kFALSE;
TNamed *n = (TNamed *) olddirs->Last();
if (n) {
gSystem->Exec(Form("%s %s", kRM, n->GetTitle()));
olddirs->Remove(n);
delete n;
}
}
olddirs->SetOwner();
delete olddirs;
return 0;
}
TList *TProofLite::GetListOfQueries(Option_t *opt)
{
Bool_t all = ((strchr(opt,'A') || strchr(opt,'a'))) ? kTRUE : kFALSE;
TList *ql = new TList;
Int_t ntot = 0, npre = 0, ndraw= 0;
if (fQMgr) {
if (all) {
TString qdir = fQueryDir;
Int_t idx = qdir.Index("session-");
if (idx != kNPOS)
qdir.Remove(idx);
fQMgr->ScanPreviousQueries(qdir);
if (fQMgr->PreviousQueries()) {
TIter nxq(fQMgr->PreviousQueries());
TProofQueryResult *pqr = 0;
while ((pqr = (TProofQueryResult *)nxq())) {
ntot++;
pqr->fSeqNum = ntot;
ql->Add(pqr);
}
}
}
npre = ntot;
if (fQMgr->Queries()) {
TIter nxq(fQMgr->Queries());
TProofQueryResult *pqr = 0;
TQueryResult *pqm = 0;
while ((pqr = (TProofQueryResult *)nxq())) {
ntot++;
if ((pqm = pqr->CloneInfo())) {
pqm->fSeqNum = ntot;
ql->Add(pqm);
} else {
Warning("GetListOfQueries", "unable to clone TProofQueryResult '%s:%s'",
pqr->GetName(), pqr->GetTitle());
}
}
}
ndraw = fQMgr->DrawQueries();
}
fOtherQueries = npre;
fDrawQueries = ndraw;
if (fQueries) {
fQueries->Delete();
delete fQueries;
fQueries = 0;
}
fQueries = ql;
return fQueries;
}
Bool_t TProofLite::RegisterDataSet(const char *uri,
TFileCollection *dataSet, const char* optStr)
{
if (!fDataSetManager) {
Info("RegisterDataSet", "dataset manager not available");
return kFALSE;
}
if (!uri || strlen(uri) <= 0) {
Info("RegisterDataSet", "specifying a dataset name is mandatory");
return kFALSE;
}
Bool_t parallelverify = kFALSE;
TString sopt(optStr);
if (sopt.Contains("V") && !sopt.Contains("S")) {
parallelverify = kTRUE;
sopt.ReplaceAll("V", "");
}
sopt.ReplaceAll("S", "");
Bool_t result = kTRUE;
if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
if (!dataSet || dataSet->GetList()->GetSize() == 0) {
Error("RegisterDataSet", "can not save an empty list.");
result = kFALSE;
}
result = (fDataSetManager->RegisterDataSet(uri, dataSet, sopt) == 0)
? kTRUE : kFALSE;
} else {
Info("RegisterDataSet", "dataset registration not allowed");
result = kFALSE;
}
if (!result)
Error("RegisterDataSet", "dataset was not saved");
if (!parallelverify) return result;
sopt += "V";
if (VerifyDataSet(uri, sopt) < 0){
Error("RegisterDataSet", "problems verifying dataset '%s'", uri);
return kFALSE;
}
return kTRUE;
}
Int_t TProofLite::SetDataSetTreeName(const char *dataset, const char *treename)
{
if (!fDataSetManager) {
Info("ExistsDataSet", "dataset manager not available");
return kFALSE;
}
if (!dataset || strlen(dataset) <= 0) {
Info("SetDataSetTreeName", "specifying a dataset name is mandatory");
return -1;
}
if (!treename || strlen(treename) <= 0) {
Info("SetDataSetTreeName", "specifying a tree name is mandatory");
return -1;
}
TUri uri(dataset);
TString fragment(treename);
if (!fragment.BeginsWith("/")) fragment.Insert(0, "/");
uri.SetFragment(fragment);
return fDataSetManager->ScanDataSet(uri.GetUri().Data(),
(UInt_t)TDataSetManager::kSetDefaultTree);
}
Bool_t TProofLite::ExistsDataSet(const char *uri)
{
if (!fDataSetManager) {
Info("ExistsDataSet", "dataset manager not available");
return kFALSE;
}
if (!uri || strlen(uri) <= 0) {
Error("ExistsDataSet", "dataset name missing");
return kFALSE;
}
return fDataSetManager->ExistsDataSet(uri);
}
TMap *TProofLite::GetDataSets(const char *uri, const char *srvex)
{
if (!fDataSetManager) {
Info("GetDataSets", "dataset manager not available");
return (TMap *)0;
}
if (srvex && strlen(srvex) > 0) {
return fDataSetManager->GetSubDataSets(uri, srvex);
} else {
UInt_t opt = (UInt_t)TDataSetManager::kExport;
return fDataSetManager->GetDataSets(uri, opt);
}
}
void TProofLite::ShowDataSets(const char *uri, const char *opt)
{
if (!fDataSetManager) {
Info("GetDataSet", "dataset manager not available");
return;
}
fDataSetManager->ShowDataSets(uri, opt);
}
TFileCollection *TProofLite::GetDataSet(const char *uri, const char *)
{
if (!fDataSetManager) {
Info("GetDataSet", "dataset manager not available");
return (TFileCollection *)0;
}
if (!uri || strlen(uri) <= 0) {
Info("GetDataSet", "specifying a dataset name is mandatory");
return 0;
}
return fDataSetManager->GetDataSet(uri);
}
Int_t TProofLite::RemoveDataSet(const char *uri, const char *)
{
if (!fDataSetManager) {
Info("RemoveDataSet", "dataset manager not available");
return -1;
}
if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
if (!fDataSetManager->RemoveDataSet(uri)) {
return -1;
}
} else {
Info("RemoveDataSet", "dataset creation / removal not allowed");
return -1;
}
return 0;
}
Bool_t TProofLite::RequestStagingDataSet(const char *dataset)
{
if (!dataset) {
Error("RequestStagingDataSet", "invalid dataset specified");
return kFALSE;
}
if (!fDataSetStgRepo) {
Error("RequestStagingDataSet", "no dataset staging request repository available");
return kFALSE;
}
TString dsUser, dsGroup, dsName, dsTree;
TString validUri = dataset;
while (fReInvalid->Substitute(validUri, "_")) {}
if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
Warning("RequestStagingDataSet", "staging of %s already requested", dataset);
return kFALSE;
}
TFileCollection *fc = fDataSetManager->GetDataSet(dataset);
if (!fc || (fc->GetNFiles() == 0)) {
Error("RequestStagingDataSet", "empty dataset or no dataset returned");
if (fc) delete fc;
return kFALSE;
}
TIter it(fc->GetList());
TFileInfo *fi;
while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
fi->ResetBit(TFileInfo::kStaged);
Int_t nToErase = fi->GetNUrls() - 1;
for (Int_t i=0; i<nToErase; i++)
fi->RemoveUrlAt(0);
}
fc->Update();
fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser, dsName, fc) == 0) {
Error("RequestStagingDataSet", "can't register staging request for %s", dataset);
delete fc;
return kFALSE;
}
Info("RequestStagingDataSet", "Staging request registered for %s", dataset);
delete fc;
return kTRUE;
}
Bool_t TProofLite::CancelStagingDataSet(const char *dataset)
{
if (!dataset) {
Error("CancelStagingDataSet", "invalid dataset specified");
return kFALSE;
}
if (!fDataSetStgRepo) {
Error("CancelStagingDataSet", "no dataset staging request repository available");
return kFALSE;
}
TString validUri = dataset;
while (fReInvalid->Substitute(validUri, "_")) {}
if (!fDataSetStgRepo->RemoveDataSet(validUri.Data()))
return kFALSE;
return kTRUE;
}
TFileCollection *TProofLite::GetStagingStatusDataSet(const char *dataset)
{
if (!dataset) {
Error("GetStagingStatusDataSet", "invalid dataset specified");
return 0;
}
if (!fDataSetStgRepo) {
Error("GetStagingStatusDataSet", "no dataset staging request repository available");
return 0;
}
TString validUri = dataset;
while (fReInvalid->Substitute(validUri, "_")) {}
TFileCollection *fc = fDataSetStgRepo->GetDataSet(validUri.Data());
if (!fc) {
Info("GetStagingStatusDataSet", "no pending staging request for %s", dataset);
return 0;
}
return fc;
}
Int_t TProofLite::VerifyDataSet(const char *uri, const char *optStr)
{
if (!fDataSetManager) {
Info("VerifyDataSet", "dataset manager not available");
return -1;
}
Int_t rc = -1;
TString sopt(optStr);
if (sopt.Contains("S")) {
if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
rc = fDataSetManager->ScanDataSet(uri);
} else {
Info("VerifyDataSet", "dataset verification not allowed");
rc = -1;
}
return rc;
}
return VerifyDataSetParallel(uri, optStr);
}
void TProofLite::ClearDataSetCache(const char *dataset)
{
if (fDataSetManager) fDataSetManager->ClearCache(dataset);
return;
}
void TProofLite::ShowDataSetCache(const char *dataset)
{
if (fDataSetManager) fDataSetManager->ShowCache(dataset);
return;
}
void TProofLite::SendInputDataFile()
{
TString dataFile;
PrepareInputDataFile(dataFile);
if (dataFile.Length() > 0) {
if (!dataFile.BeginsWith(fCacheDir)) {
TString dst;
dst.Form("%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
if (!gSystem->AccessPathName(dst))
gSystem->Unlink(dst);
if (gSystem->CopyFile(dataFile, dst) != 0)
Warning("SendInputDataFile", "problems copying '%s' to '%s'",
dataFile.Data(), dst.Data());
}
AddInput(new TNamed("PROOF_InputDataFile", Form("%s", gSystem->BaseName(dataFile))));
}
}
Int_t TProofLite::Remove(const char *ref, Bool_t all)
{
PDB(kGlobal, 1)
Info("Remove", "Enter: %s, %d", ref, all);
if (all) {
if (fPlayer)
fPlayer->RemoveQueryResult(ref);
}
TString queryref(ref);
if (queryref == "cleanupdir") {
Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
Info("Remove", "%d directories removed", nd);
return 0;
}
if (fQMgr) {
TProofLockPath *lck = 0;
if (fQMgr->LockSession(queryref, &lck) == 0) {
fQMgr->RemoveQuery(queryref, 0);
if (lck) {
gSystem->Unlink(lck->GetName());
SafeDelete(lck);
}
return 0;
}
} else {
Warning("Remove", "query result manager undefined!");
}
Info("Remove",
"query %s could not be removed (unable to lock session)", queryref.Data());
return -1;
}
TTree *TProofLite::GetTreeHeader(TDSet *dset)
{
TTree *t = 0;
if (!dset) {
Error("GetTreeHeader", "undefined TDSet");
return t;
}
dset->Reset();
TDSetElement *e = dset->Next();
Long64_t entries = 0;
TFile *f = 0;
if (!e) {
PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
} else {
f = TFile::Open(e->GetFileName());
t = 0;
if (f) {
t = (TTree*) f->Get(e->GetObjName());
if (t) {
t->SetMaxVirtualSize(0);
t->DropBaskets();
entries = t->GetEntries();
while ((e = dset->Next()) != 0) {
TFile *f1 = TFile::Open(e->GetFileName());
if (f1) {
TTree *t1 = (TTree*) f1->Get(e->GetObjName());
if (t1) {
entries += t1->GetEntries();
delete t1;
}
delete f1;
}
}
t->SetMaxEntryLoop(entries);
}
}
}
return t;
}
void TProofLite::FindUniqueSlaves()
{
fUniqueSlaves->Clear();
fUniqueMonitor->RemoveAll();
fAllUniqueSlaves->Clear();
fAllUniqueMonitor->RemoveAll();
fNonUniqueMasters->Clear();
if (fActiveSlaves->GetSize() <= 0) return;
TSlave *wrk = dynamic_cast<TSlave*>(fActiveSlaves->First());
if (!wrk) {
Error("FindUniqueSlaves", "first object in fActiveSlaves not a TSlave: embarrasing!");
return;
}
fUniqueSlaves->Add(wrk);
fAllUniqueSlaves->Add(wrk);
fUniqueMonitor->Add(wrk->GetSocket());
fAllUniqueMonitor->Add(wrk->GetSocket());
fUniqueMonitor->DeActivateAll();
fAllUniqueMonitor->DeActivateAll();
}
void TProofLite::ShowData()
{
if (!IsValid()) return;
TList *wrki = GetListOfSlaveInfos();
TSlaveInfo *wi = 0;
TIter nxwi(wrki);
while ((wi = (TSlaveInfo *) nxwi())) {
ShowDataDir(wi->GetDataDir());
}
}
void TProofLite::ShowDataDir(const char *dirname)
{
if (!dirname) return;
FileStat_t dirst;
if (gSystem->GetPathInfo(dirname, dirst) != 0) return;
if (!R_ISDIR(dirst.fMode)) return;
void *dirp = gSystem->OpenDirectory(dirname);
TString fn;
const char *ent = 0;
while ((ent = gSystem->GetDirEntry(dirp))) {
fn.Form("%s/%s", dirname, ent);
FileStat_t st;
if (gSystem->GetPathInfo(fn.Data(), st) == 0) {
if (R_ISREG(st.fMode)) {
Printf("lite:0| %s", fn.Data());
} else if (R_ISREG(st.fMode)) {
ShowDataDir(fn.Data());
}
}
}
return;
}