28#include "RConfigure.h"
102 if (!
gSystem->Getenv(
"ROOTPROOFCLIENT"))
gSystem->Setenv(
"ROOTPROOFCLIENT",
"");
105 fUrl.SetProtocol(
"proof");
106 fUrl.SetHost(
"__lite__");
110 if (strlen(
fUrl.GetUser()) <= 0) {
130 Int_t port =
gEnv->GetValue(
"ProofServ.XpdPort", 1093);
135 Init(url, conffile, confdir, loglevel, alias);
139 if (!
gROOT->GetListOfProofs()->FindObject(
this))
140 gROOT->GetListOfProofs()->Add(
this);
155 const char *confdir,
Int_t loglevel,
const char *)
166 if (!conffile || !conffile[0])
168 if (!confdir || !confdir[0])
177 Error(
"Init",
"could not create/assert sandbox for this session");
182 TString sockpathdir =
gEnv->GetValue(
"ProofLite.SockPathDir",
gSystem->TempDirectory());
183 if (sockpathdir.
IsNull()) sockpathdir =
gSystem->TempDirectory();
184 if (sockpathdir(sockpathdir.
Length()-1) ==
'/') sockpathdir.
Remove(sockpathdir.
Length()-1);
188 Error(
"Init",
"Unix socket path '%s' is too long (%d bytes):",
190 Error(
"Init",
"use 'ProofLite.SockPathDir' to create it under a directory different"
191 " from '%s'", sockpathdir.
Data());
221 TString dynconf =
gEnv->GetValue(
"Proof.SimulateDynamicStartup",
"");
222 if (dynconf.
Length() > 0) {
263 Int_t maxq =
gEnv->GetValue(
"ProofLite.MaxQueriesSaved", 10);
264 if (
fQMgr &&
fQMgr->ApplyMaxQueries(maxq) != 0)
265 Warning(
"Init",
"problems applying fMaxQueries");
268 Warning(
"Init",
"problems initializing the dataset manager");
319 if (
gEnv->GetValue(
"ProofLite.ForkStartup", 0) != 0) {
323 Warning(
"Init",
"fork-based workers startup is not available on Windows - ignoring");
331 TString globpack =
gEnv->GetValue(
"Proof.GlobalPackageDirs",
"");
335 Info(
"Init",
" %d global package directories registered", nglb);
340 Error(
"Init",
"problems setting up workers");
366 gROOT->GetListOfSockets()->Add(
this);
411 TString sysname =
"system.rootrc";
421 ::Error(
"TProofLite::GetNumberOfWorkers",
422 "PROOF-Lite disabled by the system administrator: sorry!");
429 if (url && strlen(url)) {
433 nw.
Remove(0, in + strlen(
"workers="));
437 if ((nWorkers = nw.
Atoi()) <= 0) {
438 ::Warning(
"TProofLite::GetNumberOfWorkers",
439 "number of workers specified by 'workers='"
440 " is non-positive: using default");
453 if ((nWorkers = nw.
Atoi()) == 0) {
454 ::Warning(
"TProofLite::GetNumberOfWorkers",
455 "number of workers specified by 'workers='"
456 " is non-positive: using default");
462 nWorkers =
gEnv->GetValue(
"ProofLite.Workers", -1);
471 if (notify) notify =
kFALSE;
477 ::Warning(
"TProofLite::GetNumberOfWorkers",
"number of PROOF-Lite workers limited by"
478 " the system administrator to %d",
fgWrksMax);
500 Error(
"SetupWorkers",
501 "unable to create server socket for internal communications");
512 Int_t nWrksDone = 0, nWrksTot = -1;
520 for (; ord < nWrksTot; ord++) {
524 fullord.
Form(
"%s.%d", o, ord);
534 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
539 Warning(
"SetupWorkers",
"standard startup: workers already started");
547 for (; ord < nWrksTot; ord++) {
551 fullord.
Form(
"%s.%d", o, ord + 1);
552 if (!clones.
IsNull()) clones +=
" ";
560 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
574 Int_t to =
gEnv->GetValue(
"ProofLite.StartupTimeOut", 5) * 1000;
575 while (started.
GetSize() > 0 && nSelects < nWrksTot) {
582 if (xs == (
TSocket *) -1)
continue;
589 if (s->
Recv(msg) < 0) {
590 Warning(
"SetupWorkers",
"problems receiving message from accepted socket!");
607 gROOT->GetListOfSockets()->Remove(s);
625 if (startedWorkers) startedWorkers->
Add(wrk);
627 NotifyStartUp(
"Setting up worker servers", ++nWrksDone, nWrksTot);
634 Warning(
"SetupWorkers",
"received empty message from accepted socket!");
647 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
672 snprintf(msg, 512,
"%s: OK (%d workers) \n",
675 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
676 action, done, tot, frac);
678 fprintf(stderr,
"%s", msg);
687 if (!ord || strlen(ord) <= 0) {
688 Error(
"SetProofServEnv",
"ordinal string undefined");
694 FILE *frc = fopen(rcfile.
Data(),
"w");
696 Error(
"SetProofServEnv",
"cannot open rc file %s", rcfile.
Data());
701 fprintf(frc,
"# The session working dir\n");
702 fprintf(frc,
"ProofServ.SessionDir: %s/worker-%s\n",
fWorkDir.Data(), ord);
705 fprintf(frc,
"# Session tag\n");
706 fprintf(frc,
"ProofServ.SessionTag: %s\n",
GetName());
709 fprintf(frc,
"# Proof Log/Debug level\n");
710 fprintf(frc,
"Proof.DebugLevel: %d\n",
gDebug);
713 fprintf(frc,
"# Ordinal number\n");
714 fprintf(frc,
"ProofServ.Ordinal: %s\n", ord);
717 fprintf(frc,
"# ROOT Version tag\n");
718 fprintf(frc,
"ProofServ.RootVersionTag: %s\n",
gROOT->GetVersion());
723 Warning(
"SetProofServEnv",
"problems getting sandbox string for worker");
724 fprintf(frc,
"# Users sandbox\n");
725 fprintf(frc,
"ProofServ.Sandbox: %s\n", sandbox.
Data());
728 fprintf(frc,
"# Users cache\n");
729 fprintf(frc,
"ProofServ.CacheDir: %s\n",
fCacheDir.Data());
732 fprintf(frc,
"# Users packages\n");
733 fprintf(frc,
"ProofServ.PackageDir: %s\n",
fPackMgr->GetDir());
736 fprintf(frc,
"# Server image\n");
737 fprintf(frc,
"ProofServ.Image: %s\n",
fImage.Data());
740 fprintf(frc,
"# Open socket\n");
741 fprintf(frc,
"ProofServ.OpenSock: %s\n",
fSockPath.Data());
744 fprintf(frc,
"# Client Protocol\n");
752 FILE *fenv = fopen(envfile.
Data(),
"w");
754 Error(
"SetProofServEnv",
"cannot open env file %s", envfile.
Data());
762 fprintf(fenv,
"export TMPDIR=%s\n",
gSystem->TempDirectory());
765 fprintf(fenv,
"export ROOTPROOFLOGFILE=%s\n", logfile.
Data());
767 fprintf(fenv,
"export ROOTRCFILE=%s\n", rcfile.
Data());
769 fprintf(fenv,
"export ROOTVERSIONTAG=%s\n",
gROOT->GetVersion());
771 fprintf(fenv,
"export ROOTPROOFLITE=%d\n",
fNWorkers);
773 fprintf(fenv,
"export LOCALDATASERVER=\"file://\"\n");
779 while ((env = (
TNamed *)nxenv())) {
782 fprintf(fenv,
"export %s=%s\n", env->
GetName(), senv.
Data());
783 if (namelist.
Length() > 0)
787 fprintf(fenv,
"export PROOF_ALLVARS=%s\n", namelist.
Data());
804 if (!logfile)
return;
807 if (s.
Contains(
"<logfilewrk>") && logfile) {
837 var =
dynamic_cast<TNamed *
>(envVars->
FindObject(
"PROOF_SLAVE_CPUPIN_ORDER"));
838 if (var) cpuPinList = var->
GetTitle();
850 if (cpuPinList.
IsNull() || (cpuPinList ==
"*")) {
862 n = (tok.
Atoi() % nCpus);
886 TString packdir =
gEnv->GetValue(
"Proof.PackageDir",
"");
909 Int_t subpath =
gEnv->GetValue(
"ProofLite.SubPath", 1);
913 sessdir =
gSystem->WorkingDirectory();
929 lastsess.
Form(
"%s/last-lite-session", sessdir.
Data());
956 Printf(
"*** PROOF-Lite cluster %s(sequential mode)", ord.
Data());
961 Int_t port =
gEnv->GetValue(
"ProofServ.XpdPort", 1093);
962 if (port > -1) url.
Form(
"%s:%d",
gSystem->HostName(), port);
970 if (
gSystem->Getenv(
"ROOTVERSIONTAG"))
972 Printf(
"ROOT version|rev|tag: %s", ver.
Data());
973 Printf(
"Architecture-Compiler: %s-%s",
gSystem->GetBuildArch(),
974 gSystem->GetBuildCompilerVersion());
976 Printf(
"Working directory: %s",
gSystem->WorkingDirectory());
988 Printf(
"List of workers:");
990 while (
TSlave* sl =
dynamic_cast<TSlave*
>(nextslave())) {
1007 fQMgr->IncrementSeqNum();
1008 seqnum =
fQMgr->SeqNum();
1013 fPlayer->GetInputList(), nent,
1033 Info(
"SetQueryRunning",
"starting query: %d", pq->
GetSeqNum());
1037 fPackMgr->GetEnabledPackages(parlist);
1061 Info(
"DrawSelect",
"not idle, asynchronous Draw not supported");
1091 if (opt.Contains(
"fb=") || opt.Contains(
"feedback="))
SetFeedback(opt, optfb, 0);
1098 Info(
"Process",
"asynchronous mode not yet supported in PROOF-Lite");
1104 Info(
"Process",
"not idle: cannot accept queries");
1115 Error(
"Process",
"invalid sesion or query-result manager undefined!");
1121 if (!
fPlayer->GetInputList()->FindObject(
"PROOF_MaxSlavesPerNode"))
1132 Error(
"Process",
"from AssertDataSet: %s", emsg.
Data());
1136 Error(
"Process",
"no files to process!");
1139 }
else if (hasNoData) {
1148 emsg.
Form(
"dataset manager not initialized!");
1153 emsg.
Form(
"requested dataset '%s' does not exists", dsn.
Data());
1157 fPlayer->GetInputList()->Remove(ftp);
1160 fcmap->
SetName(
"PROOF_FilesToProcess");
1161 fPlayer->GetInputList()->Add(fcmap);
1166 Error(
"HandleProcess",
"%s", emsg.
Data());
1173 TString selec(selector), varexp, selection, objname;
1175 if (selec.BeginsWith(
"draw:")) {
1179 if (
fPlayer->GetDrawArgs(varexp, selection, opt, selec, objname) != 0) {
1180 Error(
"Process",
"draw query: error parsing arguments '%s', '%s', '%s'",
1181 varexp.
Data(), selection.
Data(), opt.Data());
1192 (!strcmp(
gEnv->GetValue(
"ProofLite.AutoSaveQueries",
"off"),
"on")) ?
kTRUE :
kFALSE;
1195 Int_t memqueries =
gEnv->GetValue(
"ProofLite.MaxQueriesMemory", 1);
1199 if (
fQMgr->Queries()) {
1200 if (memqueries != 0)
fQMgr->Queries()->Add(pq);
1201 if (memqueries >= 0 &&
fQMgr->Queries()->GetSize() > memqueries) {
1204 fQMgr->Queries()->Remove(qfst);
1209 if (savequeries)
fQMgr->SaveQuery(pq);
1220 if (savequeries)
fQMgr->SaveQuery(pq);
1222 fQMgr->IncrementDrawQueries();
1226 if (!
gROOT->IsBatch()) {
1254 TObject *o =
fPlayer->GetInputList()->FindObject(
"PROOF_QueryTag");
1255 if (o)
fPlayer->GetInputList()->Remove(o);
1275 TList *startedWorkers = 0;
1277 startedWorkers =
new TList;
1287 if (selector && strlen(selector)) {
1293 rv =
fPlayer->DrawSelect(dset, varexp, selection, opt,
nentries, first);
1319 gSystem->AddSignalHandler(sh);
1326 Emit(
"StopProcess(Bool_t)", abort);
1346 TNamed *psr = (
TNamed *)
fPlayer->GetOutputList()->FindObject(
"PROOFSERV_RegisterDataSet");
1351 Warning(
"ProcessNext",
"problems registering produced datasets: %s", err.
Data());
1352 fPlayer->GetOutputList()->Remove(psr);
1361 if (savequeries)
fQMgr->SaveQuery(pq, -1);
1367 if (
fPlayer->GetListOfResults())
fPlayer->GetListOfResults()->Remove(pq);
1373 if (!(pq->
IsDraw()) && memqueries >= 0) {
1376 if (pqr)
fQMgr->Queries()->Add(pqr);
1378 fQMgr->Queries()->Remove(pq);
1383 msg.
Form(
"Lite-0: all output objects have been merged ");
1384 fprintf(stderr,
"%s\n", msg.
Data());
1400 if (sst) rv = sst->
GetVal();
1426 TString dsm =
gEnv->GetValue(
"Proof.DataSetManager",
"");
1429 if (
gROOT->GetPluginManager()) {
1431 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
1432 if (
h &&
h->LoadPlugin() != -1) {
1436 user.Data(), dsm.
Data()));
1441 Warning(
"InitDataSetManager",
"dataset manager plug-in initialization failed");
1448 TString dsetdir =
gEnv->GetValue(
"ProofServ.DataSetDir",
"");
1456 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
1457 if (
h &&
h->LoadPlugin() == -1)
h = 0;
1462 group.Data(), user.Data(),
1466 Warning(
"InitDataSetManager",
"default dataset manager plug-in initialization failed");
1472 Info(
"InitDataSetManager",
"datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
1481 TString dsReqCfg =
gEnv->GetValue(
"Proof.DataSetStagingRequests",
"");
1482 if (!dsReqCfg.
IsNull()) {
1483 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
1485 if (reReqDir.
Match(dsReqCfg) == 5) {
1487 dsDirFmt.
Form(
"dir:%s perms:open", reReqDir[3].Data());
1490 Warning(
"InitDataSetManager",
"failed init of dataset staging requests repository");
1494 Warning(
"InitDataSetManager",
"specify, with [dir:]<path>, a valid path for staging requests");
1497 Warning(
"InitDataSetManager",
"no repository for staging requests available");
1524 if (!file || strlen(file) <= 0) {
1544 if (!macro || !macro[0]) {
1545 Error(
"Load",
"need to specify a macro name");
1551 while (macs.Tokenize(mac, from,
",")) {
1561 gSystem->ExpandPathName(cacheDir);
1562 void * dirp =
gSystem->OpenDirectory(cacheDir);
1565 while ((
e =
gSystem->GetDirEntry(dirp))) {
1576 return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
1598 gSystem->ExpandPathName(cacheDir);
1607 Info(
"CopyMacroToCache",
"enter: names: %s, %s", macro,
name.Data());
1611 Error(
"CopyMacroToCache",
"file %s not found or not readable",
name.Data());
1631 const char *hext[] = {
".h",
".hh",
"" };
1634 while (strlen(hext[
i]) > 0) {
1635 hname =
name(0, dot);
1639 if (!checkedext.
IsNull()) checkedext +=
",";
1640 checkedext += hext[
i];
1644 if (hname.
IsNull() && headerRequired == 1) {
1645 Error(
"CopyMacroToCache",
"header file for %s not found or not readable "
1646 "(checked extensions: %s)",
name.Data(), checkedext.
Data());
1649 if (headerRequired < 0)
1659 cachedhname =
Form(
"%s/%s", cacheDir.
Data(),
gSystem->BaseName(hname));
1663 if (md5 && md5cache && (*md5 == *md5cache))
1664 useCacheBinaries =
kTRUE;
1669 if (md5h && md5hcache && (*md5h != *md5hcache))
1670 useCacheBinaries =
kFALSE;
1681 dot = vername.
Last(
'.');
1684 vername +=
".binversion";
1688 if (useCacheBinaries) {
1690 FILE *
f = fopen(
Form(
"%s/%s", cacheDir.
Data(), vername.
Data()),
"r");
1696 if (!
f ||
v !=
gROOT->GetVersion() ||
r !=
gROOT->GetGitCommit())
1697 useCacheBinaries =
kFALSE;
1702 dot = binname.
Last(
'.');
1710 if (useCacheBinaries) {
1713 dirp =
gSystem->OpenDirectory(cacheDir);
1716 while ((
e =
gSystem->GetDirEntry(dirp))) {
1717 if (!strncmp(
e, binname.
Data(), binname.
Length()) ||
1721 if (!
gSystem->GetPathInfo(fncache, stcache)) {
1729 Info(
"CopyMacroToCache",
1730 "retrieving %s from cache", fncache.
Data());
1744 Error(
"CopyMacroToCache",
"could not create a selector from %s", macro);
1753 dirp =
gSystem->OpenDirectory(
".");
1756 while ((
e =
gSystem->GetDirEntry(dirp))) {
1757 if (!strncmp(
e, binname.
Data(), binname.
Length()) ||
1760 if (!
gSystem->GetPathInfo(
e, stlocal)) {
1769 Info(
"CopyMacroToCache",
"caching %s ...",
e);
1783 FILE *
f = fopen(
Form(
"%s/%s", cacheDir.
Data(), vername.
Data()),
"w");
1785 fputs(
gROOT->GetVersion(),
f);
1786 fputs(
Form(
"\n%s",
gROOT->GetGitCommit()),
f);
1792 if (!useCacheBinaries) {
1795 Info(
"CopyMacroToCache",
"caching %s ...",
name.Data());
1800 Info(
"CopyMacroToCache",
"caching %s ...", hname.
Data());
1823 Int_t maxold =
gEnv->GetValue(
"Proof.MaxOldSessions", 1);
1825 if (maxold < 0)
return 0;
1831 void *dirp =
gSystem->OpenDirectory(sandbox);
1834 while ((
e =
gSystem->GetDirEntry(dirp))) {
1835 if (!strncmp(
e,
"session-", 8) && !strstr(
e,
GetName())) {
1850 while (olddirs->
GetSize() > maxold) {
1851 if (notify &&
gDebug > 0)
1852 Printf(
"Cleaning sandbox at: %s", sandbox.
Data());
1878 Int_t ntot = 0, npre = 0, ndraw= 0;
1886 fQMgr->ScanPreviousQueries(qdir);
1888 if (
fQMgr->PreviousQueries()) {
1900 if (
fQMgr->Queries()) {
1911 Warning(
"GetListOfQueries",
"unable to clone TProofQueryResult '%s:%s'",
1917 ndraw =
fQMgr->DrawQueries();
1946 Info(
"RegisterDataSet",
"dataset manager not available");
1950 if (!uri || strlen(uri) <= 0) {
1951 Info(
"RegisterDataSet",
"specifying a dataset name is mandatory");
1959 parallelverify =
kTRUE;
1968 if (!dataSet || dataSet->GetList()->GetSize() == 0) {
1969 Error(
"RegisterDataSet",
"can not save an empty list.");
1976 Info(
"RegisterDataSet",
"dataset registration not allowed");
1981 Error(
"RegisterDataSet",
"dataset was not saved");
1984 if (!parallelverify)
return result;
1989 Error(
"RegisterDataSet",
"problems verifying dataset '%s'", uri);
2005 Info(
"ExistsDataSet",
"dataset manager not available");
2009 if (!dataset || strlen(dataset) <= 0) {
2010 Info(
"SetDataSetTreeName",
"specifying a dataset name is mandatory");
2014 if (!treename || strlen(treename) <= 0) {
2015 Info(
"SetDataSetTreeName",
"specifying a tree name is mandatory");
2034 Info(
"ExistsDataSet",
"dataset manager not available");
2038 if (!uri || strlen(uri) <= 0) {
2039 Error(
"ExistsDataSet",
"dataset name missing");
2053 Info(
"GetDataSets",
"dataset manager not available");
2058 if (srvex && strlen(srvex) > 0) {
2073 Info(
"GetDataSet",
"dataset manager not available");
2087 Info(
"GetDataSet",
"dataset manager not available");
2091 if (!uri || strlen(uri) <= 0) {
2092 Info(
"GetDataSet",
"specifying a dataset name is mandatory");
2107 Info(
"RemoveDataSet",
"dataset manager not available");
2117 Info(
"RemoveDataSet",
"dataset creation / removal not allowed");
2134 Error(
"RequestStagingDataSet",
"invalid dataset specified");
2139 Error(
"RequestStagingDataSet",
"no dataset staging request repository available");
2143 TString dsUser, dsGroup, dsName, dsTree;
2147 while (
fReInvalid->Substitute(validUri,
"_")) {}
2151 Warning(
"RequestStagingDataSet",
"staging of %s already requested", dataset);
2158 Error(
"RequestStagingDataSet",
"empty dataset or no dataset returned");
2177 if (
fDataSetStgRepo->WriteDataSet(dsGroup, dsUser, dsName, fc) == 0) {
2179 Error(
"RequestStagingDataSet",
"can't register staging request for %s", dataset);
2184 Info(
"RequestStagingDataSet",
"Staging request registered for %s", dataset);
2198 Error(
"CancelStagingDataSet",
"invalid dataset specified");
2203 Error(
"CancelStagingDataSet",
"no dataset staging request repository available");
2209 while (
fReInvalid->Substitute(validUri,
"_")) {}
2226 Error(
"GetStagingStatusDataSet",
"invalid dataset specified");
2231 Error(
"GetStagingStatusDataSet",
"no dataset staging request repository available");
2237 while (
fReInvalid->Substitute(validUri,
"_")) {}
2243 Info(
"GetStagingStatusDataSet",
"no pending staging request for %s", dataset);
2258 Info(
"VerifyDataSet",
"dataset manager not available");
2269 Info(
"VerifyDataSet",
"dataset verification not allowed");
2317 if (dataFile.
Length() > 0) {
2324 if (!
gSystem->AccessPathName(dst))
2327 if (
gSystem->CopyFile(dataFile, dst) != 0)
2328 Warning(
"SendInputDataFile",
"problems copying '%s' to '%s'",
2343 Info(
"Remove",
"Enter: %s, %d", ref, all);
2348 fPlayer->RemoveQueryResult(ref);
2353 if (queryref ==
"cleanupdir") {
2359 Info(
"Remove",
"%d directories removed", nd);
2367 if (
fQMgr->LockSession(queryref, &lck) == 0) {
2370 fQMgr->RemoveQuery(queryref, 0);
2382 Warning(
"Remove",
"query result manager undefined!");
2387 "query %s could not be removed (unable to lock session)", queryref.
Data());
2401 Error(
"GetTreeHeader",
"undefined TDSet");
2410 PDB(kGlobal, 1)
Info(
"GetTreeHeader",
"empty TDSet");
2415 t = (
TTree*)
f->Get(
e->GetObjName());
2422 while ((
e = dset->
Next()) != 0) {
2427 entries +=
t1->GetEntries();
2462 Error(
"FindUniqueSlaves",
"first object in fActiveSlaves not a TSlave: embarrasing!");
2497 if (!dirname)
return;
2500 if (
gSystem->GetPathInfo(dirname, dirst) != 0)
return;
2503 void *dirp =
gSystem->OpenDirectory(dirname);
2505 const char *ent = 0;
2506 while ((ent =
gSystem->GetDirEntry(dirp))) {
2507 fn.
Form(
"%s/%s", dirname, ent);
2541 Info(
"PollForNewWorkers",
"max reached: %d workers started",
fNWorkers);
2555 Int_t nWrksDone = 0, nWrksTot = -1;
2562 for (; ord < nWrksTot; ord++) {
2565 fullord =
Form(
"0.%d", ord);
2575 Info(
"PollForNewWorkers",
"additional worker '%s' started", fullord.
Data());
2578 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
2591 Int_t to =
gEnv->GetValue(
"ProofLite.StartupTimeOut", 5) * 1000;
2592 while (started.
GetSize() > 0 && nSelects < nWrksTot) {
2599 if (xs == (
TSocket *) -1)
continue;
2606 if (s->
Recv(msg) < 0) {
2607 Warning(
"PollForNewWorkers",
"problems receiving message from accepted socket!");
2623 gROOT->GetListOfSockets()->Remove(s);
2641 addedWorkers->
Add(wrk);
2643 NotifyStartUp(
"Setting up added worker servers", ++nWrksDone, nWrksTot);
2650 Warning(
"PollForNewWorkers",
"received empty message from accepted socket!");
2678 Info(
"PollForNewWorkers",
"Will send the PROCESS message to selected workers");
2679 fPlayer->JoinProcess(addedWorkers);
2686 TIter naw(addedWorkers);
2687 while ((wrk = (
TSlave *)naw())) {
2691 delete addedWorkers;
R__EXTERN TApplication * gApplication
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t np
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
const Bool_t kSortDescending
R__EXTERN TProofServ * gProofServ
const char *const kPROOF_QueryDir
const char *const kPROOF_DataSetDir
const char *const kPROOF_CacheLockFile
const char *const kPROOF_CacheDir
const char *const kPROOF_PackDir
const char *const kPROOF_ConfFile
const char *const kPROOF_QueryLockFile
R__EXTERN TProof * gProof
const char *const kPROOF_ConfDir
const Int_t kPROOF_Protocol
extern TVirtualMutex * gROOTMutex
char * Form(const char *fmt,...)
Formats a string in a circular formatting buffer.
void Printf(const char *fmt,...)
Formats a string in a circular formatting buffer and prints the string.
Bool_t R_ISREG(Int_t mode)
Bool_t R_ISDIR(Int_t mode)
R__EXTERN TSystem * gSystem
#define R__LOCKGUARD(mutex)
void SetName(const char *name)
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
This class implements a data set to be used for PROOF processing.
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
virtual void Reset()
Reset or initialize access to the elements.
TObject * GetEntryList() const
TList * GetListOfElements() const
The TEnv class reads config files, by default named .rootrc.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
virtual Int_t ReadFile(const char *fname, EEnvLevel level)
Read and parse the resource file for a certain level.
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
Int_t Update(Long64_t avgsize=-1)
Update accumulated information about the elements of the collection (e.g.
Long64_t GetNFiles() const
Class describing a generic file including meta information.
Bool_t RemoveUrlAt(Int_t i)
Remove URL at given position. Returns kTRUE on success, kFALSE on error.
A file, usually with extension .root, that stores data and code in the form of serialized objects in ...
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
void Add(TObject *obj) override
TObject * Remove(TObject *obj) override
Remove object from the list.
TObject * Last() const override
Return the last object in the list. Returns 0 when list is empty.
This code implements the MD5 message-digest algorithm.
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
virtual void DeActivateAll()
De-activate all activated sockets.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
const char * GetName() const override
Returns name of object.
const char * GetTitle() const override
Returns title of object.
virtual void SetName(const char *name)
Set the name of the TNamed.
Collectable string class.
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual TObject * FindObject(const char *name) const
Must be redefined in derived classes.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
TObject()
TObject constructor.
@ kInvalidObject
if object ctor succeeded but object should not be used
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex 'this' was created with.
The PROOF package manager contains tools to manage packages.
static Int_t RegisterGlobalPath(const char *paths)
Parse one or more paths as possible sources of packages. Returns the number of paths added, or -1 in case ...
Named parameter, streamable and storable.
const AParamType & GetVal() const
This class starts a PROOF session on the local machine: no daemons, client and master merged,...
void FindUniqueSlaves() override
Add to the fUniqueSlave list the active slaves that have a unique (user) file system image.
Int_t VerifyDataSet(const char *uri, const char *=0) override
Verify if all files in the specified dataset are available.
TProofLockPath * fQueryLock
Long64_t Process(TDSet *dset, const char *sel, Option_t *o="", Long64_t nent=-1, Long64_t fst=0) override
Process a data set (TDSet) using the specified selector (.C) file.
static Int_t GetNumberOfWorkers(const char *url=0)
Static method to determine the number of workers giving priority to users request.
Int_t PollForNewWorkers() override
Simulate dynamic addition, for test purposes.
TFileCollection * GetDataSet(const char *uri, const char *=0) override
Get a list of TFileInfo objects describing the files of the specified dataset.
Int_t SetDataSetTreeName(const char *dataset, const char *treename) override
Set/Change the name of the default tree.
Int_t CreateSandbox()
Create the sandbox for this session.
Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0) override
Copy the specified macro in the cache directory.
TQueryResultManager * fQMgr
Int_t InitDataSetManager()
Initialize the dataset manager from directives or from defaults. Return 0 on success,...
TServerSocket * fServSock
TDataSetManagerFile * fDataSetStgRepo
Int_t SetProofServEnv(const char *ord)
Create environment files for worker 'ord'.
void ClearDataSetCache(const char *dataset=0) override
Clear the content of the dataset cache, if any (matching 'dataset', if defined).
Int_t RemoveDataSet(const char *uri, const char *=0) override
Remove the specified dataset from the PROOF cluster.
void ShowData() override
List contents of the data directory in the sandbox.
void NotifyStartUp(const char *action, Int_t done, Int_t tot)
Notify setting-up operation message.
void ShowDataSets(const char *uri="", const char *=0) override
Shows datasets in locations that match the uri. By default shows the user's datasets and global ones.
void SetQueryRunning(TProofQueryResult *pq)
Set query in running state.
Int_t CleanupSandbox()
Remove old session dirs, keeping at most 'Proof.MaxOldSessions' (default 1).
TDataSetManager * fDataSetManager
void ClearCache(const char *file=0) override
Remove files from all file caches.
Int_t Remove(const char *ref, Bool_t all)
Handle remove request.
Int_t Init(const char *masterurl, const char *conffile, const char *confdir, Int_t loglevel, const char *alias=0)
Start the PROOF environment.
void ShowCache(Bool_t all=kFALSE) override
List contents of file cache.
Bool_t RequestStagingDataSet(const char *dataset) override
Allows users to request staging of a particular dataset.
TList * GetListOfQueries(Option_t *opt="") override
Get the list of queries.
void ShowDataDir(const char *dirname)
List contents of the data directory 'dirname'.
void ResolveKeywords(TString &s, const char *ord, const char *logfile)
Resolve some keywords in 's': <logfilewrk>, <user>, <rootsys>, <cpupin>
TTree * GetTreeHeader(TDSet *tdset) override
Creates a tree header (a tree with nonexisting files) object for the DataSet.
void Print(Option_t *option="") const override
Print status of PROOF-Lite cluster.
TMap * GetDataSets(const char *uri="", const char *=0) override
Lists all datasets that match the given uri.
Bool_t CancelStagingDataSet(const char *dataset) override
Cancels a dataset staging request.
Int_t fDynamicStartupNMax
void SendInputDataFile() override
Make sure that the input data objects are available to the workers in a dedicated file in the cache; ...
Int_t CopyMacroToCache(const char *macro, Int_t headerRequired=0, TSelector **selector=0, Int_t opt=0, TList *wrks=0)
Copy a macro, and its possible associated .h[h] file, to the cache directory, from where the workers ...
Int_t fDynamicStartupStep
Bool_t RegisterDataSet(const char *dsName, TFileCollection *ds, const char *opt="") override
Register the 'dataSet' on the cluster under the current user, group and the given 'dataSetName'.
Bool_t ExistsDataSet(const char *uri) override
Returns kTRUE if 'dataset' described by 'uri' exists, kFALSE otherwise.
TProofQueryResult * MakeQueryResult(Long64_t nent, const char *opt, Long64_t fst, TDSet *dset, const char *selec)
Create a TProofQueryResult instance for this query.
TFileCollection * GetStagingStatusDataSet(const char *dataset) override
Obtains a TFileCollection showing the staging status of the specified dataset.
~TProofLite() override
Destructor.
Int_t SetupWorkers(Int_t opt=0, TList *wrks=0)
Start up PROOF workers.
Long64_t DrawSelect(TDSet *dset, const char *varexp, const char *selection="", Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0) override
Execute the specified drawing action on a data set (TDSet).
TProofLockPath * fCacheLock
void ShowDataSetCache(const char *dataset=0) override
Display the content of the dataset cache, if any (matching 'dataset', if defined).
Int_t Unlock()
Unlock the directory.
Int_t Lock()
Locks the directory.
The PROOF manager interacts with the PROOF server coordinator to create or destroy a PROOF session,...
TQueryResult version adapted to PROOF needs.
void SetRunning(Int_t startlog, const char *par, Int_t nwrks)
Call when running starts.
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e)
Register TFileCollections in 'out' as datasets according to the rules in 'in'.
static TMap * GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
Get a map {server-name, list-of-files} for collection 'fc' to be used in TPacketizerFile.
Int_t GetNumberOfInactiveSlaves() const
Return number of inactive slaves, i.e.
void ActivateAsyncInput()
Activate the a-sync input handler.
TMonitor * fCurrentMonitor
TMonitor * fAllUniqueMonitor
void SetFeedback(TString &opt, TString &optfb, Int_t action)
Extract from opt in optfb information about wanted feedback settings.
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
TList * fTerminatedSlaveInfos
TMonitor * fUniqueMonitor
Bool_t fProgressDialogStarted
Int_t GetClientProtocol() const
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool=kFALSE)
Set up packages, loaded macros, include and lib paths ...
TList * fNonUniqueMasters
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action)
Extract from opt information about output handling settings.
TVirtualProofPlayer * fPlayer
Bool_t IsParallel() const
void AskParallel()
Ask for the number of parallel slaves.
void SetRunStatus(ERunStatus rst)
Long64_t fLastPollWorkers_s
TQueryResult * GetQueryResult(const char *ref=0)
Return pointer to the full TQueryResult instance owned by the player and referenced by 'ref'.
Int_t GetNumberOfSlaves() const
Return number of slaves as described in the config file.
void SetPerfTree(const char *pf="perftree.root", Bool_t withWrks=kFALSE)
Enable/Disable saving of the performance tree.
Int_t AssertPath(const char *path, Bool_t writable)
Make sure that 'path' exists; if 'writable' is kTRUE, make also sure that the path is writable.
Int_t VerifyDataSetParallel(const char *uri, const char *optStr)
Internal function for parallel dataset verification used by TProof::VerifyDataSet and TProofLite::Verify...
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Float_t GetCpuTime() const
Int_t SendInitialState()
Transfer the initial (i.e.
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
friend class TDataSetManager
void PrepareInputDataFile(TString &dataFile)
Prepare the file with the input data objects to be sent to the master; the objects are taken from the de...
void InitMembers()
Default initializations.
void SetParameter(const char *par, const char *value)
Set input list parameter.
void SetActive(Bool_t=kTRUE)
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Long64_t GetBytesRead() const
void UpdateDialog()
Final update of the progress dialog.
void AskStatistics()
Ask for the statistics of the slaves.
TPluginHandler * fProgressDialog
void ParseConfigField(const char *config)
The config file field may contain special instructions which need to be parsed at the beginning,...
Int_t GetNumberOfBadSlaves() const
Return number of bad slaves.
TMonitor * fActiveMonitor
Int_t GoParallel(Int_t nodes, Bool_t accept=kFALSE, Bool_t random=kFALSE)
Go in parallel mode with at most "nodes" slaves.
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
TSignalHandler * fIntHandler
Int_t SavePerfTree(const char *pf=0, const char *qref=0)
Save performance information from TPerfStats to file 'pf'.
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client.
Int_t RemoveWorkers(TList *wrks)
Used for shutting down the workers after a query is finished.
TList * fAvailablePackages
Int_t GetNumberOfActiveSlaves() const
Return number of active slaves, i.e.
TSlave * CreateSlave(const char *url, const char *ord, Int_t perf, const char *image, const char *workdir)
Create a new TSlave of type TSlave::kSlave.
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.
friend class TProofInputHandler
Float_t GetRealTime() const
Int_t SendGroupView()
Send to all active slaves servers the current slave group size and their unique id.
Int_t GetLogLevel() const
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo's. In case of error return 0.
TProofMgr::EServType fServType
static TList * fgProofEnvList
const char * GetUser() const
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
Bool_t fSendGroupView
list returned by kPROOF_GETSLAVEINFO
void ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst, Long64_t ent)
Reset progress dialog.
static const TList * GetEnvVars()
Get environment variables.
TList * fEnabledPackagesOnCluster
Int_t GetSandbox(TString &sb, Bool_t assert=kFALSE, const char *rc=0)
Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
Int_t GetNumberOfUniqueSlaves() const
Return number of unique slaves, i.e.
void AddInput(TObject *obj)
Add objects that might be needed during the processing of the selector (see Process()).
TProofOutputList fOutputList
void QueryResultReady(const char *ref)
Notify availability of a query result.
void Emit(const char *signal, const T &arg)
Activate signal with single parameter.
Class managing the query-result area.
A container class for query results.
Long64_t GetEntries() const
void SetTermTime(Float_t termtime)
TQueryResult * CloneInfo()
Return an instance of TQueryResult containing only the local info fields, i.e.
virtual void SetProcessInfo(Long64_t ent, Float_t cpu=0., Long64_t siz=-1, Float_t inittime=0., Float_t proctime=0.)
Set processing info.
virtual void SetOutputList(TList *out, bool adopt=true)
Set / change the output list.
Int_t fSeqNum
query unique sequential number
static const char * GetMacroPath()
Get macro search path. Static utility function.
static const TString & GetRootSys()
Get the rootsys directory in the installation. Static utility function.
static const TString & GetEtcDir()
Get the sysconfig directory in the installation. Static utility function.
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
static TSelector * GetSelector(const char *filename)
The code in filename is loaded (interpreted or compiled, see below), filename must contain a valid cl...
This class implements server sockets.
const char * GetDataDir() const
TSocket * GetSocket() const
void SetSocket(TSocket *s)
void SetInputHandler(TFileHandler *ih)
Adopt and register input handler for this slave.
virtual Int_t SetupServ(Int_t stype, const char *conffile)
Init a PROOF slave object.
virtual Bool_t IsValid() const
This class implements client sockets.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual Bool_t IsValid() const
A sorted doubly linked list.
void Add(TObject *obj) override
Add object in sorted list.
TString & Insert(Ssiz_t pos, const char *s)
Int_t Atoi() const
Return integer value of string.
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
const char * Data() const
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
TString & ReplaceAll(const TString &s1, const TString &s2)
Ssiz_t Last(char c) const
Find last occurrence of a character c.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Int_t CountChar(Int_t c) const
Return number of times character c occurs in the string.
TString & Remove(Ssiz_t pos)
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
A TTree represents a columnar dataset.
virtual void SetMaxEntryLoop(Long64_t maxev=kMaxEntries)
virtual void SetMaxVirtualSize(Long64_t size=0)
virtual void DropBaskets()
Remove some baskets from memory.
virtual Long64_t GetEntries() const
This class represents a RFC 3986 compatible URI.
Bool_t SetFragment(const TString &fragment)
Set fragment component of URI:
const TString GetUri() const
Returns the whole URI - an implementation of chapter 5.3 component recomposition.