44# include <sys/types.h>
51#include "RConfigure.h"
127 a = Getline(
"\nSwitch to asynchronous mode not supported remotely:"
128 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
130 a = Getline(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
131 " any other key to continue: ");
133 if (
a[0] ==
'Q' ||
a[0] ==
'S' ||
a[0] ==
'q' ||
a[0] ==
's') {
135 Info(
"Notify",
"Processing interrupt signal ... %c",
a[0]);
156 fSocket(s), fProof(p)
187 while (myord && otherord) {
188 Int_t myval = atoi(myord);
189 Int_t otherval = atoi(otherord);
190 if (myval < otherval)
return 1;
191 if (myval > otherval)
return -1;
192 myord = strchr(myord,
'.');
194 otherord = strchr(otherord,
'.');
195 if (otherord) otherord++;
197 if (myord)
return -1;
198 if (otherord)
return 1;
295 Error(
"SetMergedWorker",
"all workers have been already merged before!");
308 Error(
"AddWorker",
"all workers have been already assigned to this merger");
362 if( 0 == _cluster->
Length() ) {
363 Error(
"PoDCheckUrl",
"PoD server is not running");
403 if (!masterurl || strlen(masterurl) <= 0) {
406 }
else if (!(strstr(masterurl,
"://"))) {
443 }
else if (
fMaster ==
"prooflite") {
452 Init(masterurl, conffile, confdir, loglevel, alias);
457 if (
Exec(
"gProofServ->GetUser()",
"0",
kTRUE) == 0) {
464 emsg =
"could not find 'const char *' string in macro log";
467 emsg =
"could not retrieve user info";
476 Warning(
"TProof",
"%s: using local default %s", emsg.
Data(), usr.
Data());
486 gROOT->GetListOfSockets()->Remove(mgr);
487 gROOT->GetListOfSockets()->Add(mgr);
492 if (!
gROOT->GetListOfProofs()->FindObject(
this))
493 gROOT->GetListOfProofs()->Add(
this);
512 if (!
gROOT->GetListOfProofs()->FindObject(
this))
513 gROOT->GetListOfProofs()->Add(
this);
620 while (envs.Tokenize(env, from,
",")) {
623 Warning(
"Init",
"request for sending over undefined environemnt variable '%s' - ignoring", env.
Data());
625 if (!envsfound.
IsNull()) envsfound +=
",";
633 Warning(
"Init",
"none of the requested env variables were found: '%s'", envs.Data());
635 Info(
"Init",
"the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.
Data());
724 gROOT->GetListOfProofs()->Remove(
this);
740 Emit(
"CloseWindow()");
752 const char *confdir,
Int_t loglevel,
const char *alias)
776 if (!conffile || !conffile[0])
778 if (!confdir || !confdir[0])
834 Error(
"Init",
"could not create temporary logfile");
836 Error(
"Init",
"could not open temp logfile for reading");
898 if (enableSchemaEvolution) {
901 Info(
"TProof",
"automatic schema evolution in TMessage explicitly disabled");
911 Error(
"Init",
"failure asserting sandbox directory %s", sandbox.
Data());
920 Error(
"Init",
"failure asserting directory %s", packdir.
Data());
925 Info(
"Init",
"package directory set to %s", packdir.
Data());
934 Info(
"Init",
" %d global package directories registered", nglb);
956 GetRC(
"Proof.DynamicStartup", dyn);
996 gROOT->GetListOfSockets()->Add(
this);
1021 }
else if (sb ==
"..") {
1044 const char *cq = (
IsLite()) ?
"\"" :
"";
1045 while (sconf.Tokenize(opt, from,
",")) {
1046 if (opt.
IsNull())
continue;
1053 TString mst, top, sub, wrk, all;
1058 all =
n->GetTitle();
1060 mst =
n->GetTitle();
1062 top =
n->GetTitle();
1064 sub =
n->GetTitle();
1066 wrk =
n->GetTitle();
1068 if (all !=
"" && mst ==
"") mst = all;
1069 if (all !=
"" && top ==
"") top = all;
1070 if (all !=
"" && sub ==
"") sub = all;
1071 if (all !=
"" && wrk ==
"") wrk = all;
1072 if (all !=
"" && all.
BeginsWith(
"valgrind_opts:")) {
1074 Info(
"ParseConfigField",
"valgrind run: resetting 'PROOF_WRAPPERCMD':"
1075 " must be set again for next run , if any");
1079 cmd.
Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1080 TString mstlab(
"NO"), wrklab(
"NO");
1081 Bool_t doMaster = (opt ==
"valgrind" || (opt.
Contains(
"master") &&
1087 if (mst ==
"" || mst.
BeginsWith(
"valgrind_opts:")) {
1089 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), mst.
Data());
1092 }
else if (mst !=
"") {
1098 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1100 if (!opt.
Contains(
"workers"))
return;
1102 if (opt ==
"valgrind" || opt ==
"valgrind=") opt =
"valgrind=workers";
1107 if (top ==
"" || top.
BeginsWith(
"valgrind_opts:")) {
1109 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), top.
Data());
1112 }
else if (top !=
"") {
1118 if (sub ==
"" || sub.
BeginsWith(
"valgrind_opts:")) {
1120 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), sub.
Data());
1123 }
else if (sub !=
"") {
1129 if (wrk ==
"" || wrk.
BeginsWith(
"valgrind_opts:")) {
1131 var.
Form(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.
Data(), wrk.
Data(), cq);
1136 nwrks = opt(inw+1, opt.
Length());
1137 if (!nwrks.
IsDigit()) nwrks =
"2";
1149 }
else if (wrk !=
"") {
1163 Printf(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.
Data());
1165 Printf(
" ---> Starting a debug run with valgrind (workers:%s)", wrklab.
Data());
1167 Printf(
" ---> Please be patient: startup may be VERY slow ...");
1168 Printf(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1169 Printf(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1178 Printf(
"*** Requested IgProf performance profiling ***");
1179 TString addLogExt =
"__igprof.pp__.log";
1180 TString addLogFmt =
"igprof -pk -pp -t proofserv.exe -o %s.%s";
1188 tmp.
Form(addLogFmt.
Data(),
"<logfilemst>", addLogExt.
Data());
1191 tmp.
Form(addLogFmt.
Data(),
"<logfilewrk>", addLogExt.
Data());
1217 if ((
c !=
'+') && ((
c <
'0') || (
c >
'9')))
1236 if (
IsLite() && cpuPin) {
1237 Printf(
"*** Requested CPU pinning ***");
1239 const char *pinCmd =
"taskset -c <cpupin>";
1242 if (ev && (p =
dynamic_cast<TNamed *
>(ev->
FindObject(
"PROOF_SLAVE_WRAPPERCMD")))) {
1248 val.
Form(
"\"%s\"", pinCmd);
1261 if (!inpath || strlen(inpath) <= 0) {
1262 Error(
"AssertPath",
"undefined input path");
1271 Error(
"AssertPath",
"could not create path %s", path.
Data());
1278 Error(
"AssertPath",
"could not make path %s writable", path.
Data());
1297 gROOT->GetListOfSockets()->Remove(mgr);
1298 gROOT->GetListOfSockets()->Add(mgr);
1313 Error(
"AddWorkers",
"AddWorkers can only be called on the master!");
1317 if (!workerList || !(workerList->
GetSize())) {
1318 Error(
"AddWorkers",
"empty list of workers!");
1339 if (!addedWorkers) {
1341 Error(
"AddWorkers",
"cannot create new list for the workers to be added");
1349 while ((to = next())) {
1398 addedWorkers->
Add(slave);
1406 Info(
"AddWorkers",
"worker on host %s created"
1412 m <<
TString(
"Opening connections to workers") << nSlaves
1413 << nSlavesDone << slaveOk;
1427 TIter nxsl(addedWorkers);
1429 while ((sl = (
TSlave *) nxsl())) {
1440 Info(
"AddWorkers",
"worker on host %s finalized"
1450 m <<
TString(
"Setting up worker servers") << nSlaves
1451 << nSlavesDone << slaveOk;
1470 Info(
"AddWorkers",
"will invoke GoMoreParallel()");
1473 Info(
"AddWorkers",
"GoMoreParallel()=%d", nw);
1479 Info(
"AddWorkers",
"will invoke GoParallel()");
1488 Info(
"AddWorkers",
"will invoke SaveWorkerInfo()");
1494 Info(
"AddWorkers",
"will invoke SendParallel()");
1497 if (goMoreParallel &&
fPlayer) {
1502 Info(
"AddWorkers",
"will send the PROCESS message to selected workers");
1510 delete addedWorkers;
1523 if (packs && packs->
GetSize() > 0) {
1526 while ((pck = (
TPair *) nxp())) {
1532 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on added workers");
1537 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on all workers");
1546 delete server_packs;
1555 Info(
"SetupWorkersEnv",
"will invoke Load() on selected workers");
1567 Info(
"SetupWorkersEnv",
"will invoke AddDynamicPath() on selected workers");
1575 Info(
"SetupWorkersEnv",
"will invoke AddIncludePath() on selected workers");
1590 Error(
"RemoveWorkers",
"RemoveWorkers can only be called on the master!");
1600 while ((sl = (
TSlave *) nxsl())) {
1606 if (!(workerList->
GetSize())) {
1607 Error(
"RemoveWorkers",
"The list of workers should not be empty!");
1615 while ((to = next())) {
1617 if (!strcmp(to->
ClassName(),
"TProofNodeInfo")) {
1621 while ((sl = (
TSlave *) nxsl())) {
1629 Warning(
"RemoveWorkers",
"unknown object type: %s - it should be"
1630 " TProofNodeInfo or inheriting from TSlave", to->
ClassName());
1660 TString emsg(
"no resource currently available for this session: please retry later");
1672 Printf(
"Starting master: opening connection ...");
1678 fprintf(stderr,
"Starting master:"
1679 " connection open: setting up server ... \r");
1693 Printf(
"Starting master: OK ");
1699 Error(
"StartSlaves",
1700 "client and remote protocols not compatible (%d and %d)",
1719 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1722 if (slStatus == -99)
1723 Error(
"StartSlaves",
"no resources available or problems setting up workers (check logs)");
1724 else if (slStatus == -98)
1725 Error(
"StartSlaves",
"could not setup output redirection on master");
1727 Error(
"StartSlaves",
"setting up master");
1738 Error(
"StartSlaves",
1739 "failed to setup connection with PROOF master server");
1745 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1751 Printf(
"Starting master: failure");
1756 Printf(
"Starting master: OK ");
1761 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1774 Error(
"StartSlaves",
"failed to create (or connect to) the PROOF master server");
1790 { std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
1799 while ((sl = (
TSlave *)nxs()))
1813 gROOT->GetListOfSockets()->Remove(
this);
1825 gROOT->GetListOfProofs()->Remove(
this);
1844 Int_t perf,
const char *image,
const char *workdir)
1866 const char *image,
const char *msd,
Int_t nwk)
1886 while ((sl = (
TSlave *)next())) {
1913 if (
fImage == sl->fImage) {
1923 TSlave *replace_slave = 0;
1926 if (sl->fImage == sl2->fImage) {
1931 replace_slave = sl2;
1938 Error(
"FindUniqueSlaves",
"TSlave is neither Master nor Slave");
1951 if (replace_slave) {
2043 if (s.
Contains(
"Total MB's processed:")) {
2046 }
else if (s.
Contains(
"Total real time used (s):")) {
2047 s.
ReplaceAll(
"Total real time used (s):",
"");
2049 }
else if (s.
Contains(
"Total CPU time used (s):")) {
2050 s.
ReplaceAll(
"Total CPU time used (s):",
"");
2059 Printf(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2163 Printf(
"+++ Options: \"A\" show all queries known to server");
2164 Printf(
"+++ \"L\" show retrieved queries");
2165 Printf(
"+++ \"F\" full listing of query info");
2166 Printf(
"+++ \"H\" print this menu");
2168 Printf(
"+++ (case insensitive)");
2170 Printf(
"+++ Use Retrieve(<#>) to retrieve the full"
2171 " query results from the master");
2172 Printf(
"+++ e.g. Retrieve(8)");
2202 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2204 while ((pq = nxq()))
2211 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2218 Printf(
"+++ Queries available locally: %d", listlocal->
GetSize());
2219 TIter nxlq(listlocal);
2220 while ((pq = nxlq()))
2236 while (
TSlave *sl =
dynamic_cast<TSlave*
>(nextSlave())) {
2246 if (submasters.
GetSize() > 0) {
2254 EmitVA(
"IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2257 Info(
"IsDataReady",
"%lld / %lld (%s)",
2258 bytesready, totalbytes,
fDataReady?
"READY":
"NOT READY");
2276 if (slaves->
GetSize() == 0)
return;
2281 while ((sl = (
TSlave *)next())) {
2300 Int_t nparallel = 0;
2301 while (
TSlave* sl =
dynamic_cast<TSlave*
>(nextSlave()))
2302 if (sl->GetParallel() >= 0)
2303 nparallel += sl->GetParallel();
2326 while ((slave = (
TSlave *) next()) != 0) {
2336 while ((activeslave = (
TSlave *) nextactive())) {
2345 while ((badslave = (
TSlave *) nextbad())) {
2354 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2362 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2367 Error(
"GetSlaveInfo",
"TSlave is neither Master nor Slave");
2388 while ((sl = (
TSlave*) next())) {
2418 if (workers->
GetSize() == 0)
return 0;
2421 TIter next(workers);
2424 while ((wrk = (
TSlave *)next())) {
2427 MarkBad(wrk,
"could not send group priority");
2469 if (!slaves || slaves->
GetSize() == 0)
return 0;
2475 while ((sl = (
TSlave *)next())) {
2478 MarkBad(sl,
"could not broadcast request");
2561 if (slaves->
GetSize() == 0)
return 0;
2567 while ((sl = (
TSlave *)next())) {
2570 MarkBad(sl,
"could not send broadcast-raw request");
2604 if (wrks->
GetSize() == 0)
return 0;
2610 while ((wrk = (
TSlave *)next())) {
2613 Error(
"BroadcastFile",
2614 "problems sending file to worker %s (%s)",
2674 rc =
Collect(mon, timeout, endtype, deactonfail);
2700 while ((sl = (
TSlave*) next())) {
2705 rc =
Collect(mon, timeout, endtype, deactonfail);
2732 rc =
Collect(mon, timeout, endtype, deactonfail);
2751 Info(
"Collect",
">>>>>> Entering collect responses #%04d", collectId);
2781 int cnt = 0, rc = 0;
2786 Info(
"Collect",
"#%04d: active: %d", collectId, mon->
GetActive());
2798 while ((nact = mon->
GetActive(sto)) && (nto < 0 || nto > 0)) {
2804 if (al && al->
GetSize() > 0) {
2805 Info(
"Collect",
" %d node(s) still active:", al->
GetSize());
2808 while ((xs = (
TSocket *)nxs())) {
2830 Info(
"Collect",
"#%04d: now active: %d", collectId, mon->
GetActive());
2835 Info(
"Collect",
"Will invoke Select() #%04d", collectId);
2838 if (s && s != (
TSocket *)(-1)) {
2841 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2846 Info(
"Collect",
"#%04d: deactivating %p (active: %d, %p)", collectId,
2850 }
else if (rc == 2) {
2857 Info(
"Collect",
"save monitor: deactivating %p (active: %d, %p)",
2875 if (s == (
TSocket *)(-1) && nto > 0)
2887 while (mxws && (wrk = (
TSlave *) nxwr())) {
2892 Info(
"Collect",
"worker %s was asked to send its output to master",
2917 if (al && al->
GetSize() > 0) {
2919 Info(
"Collect",
" %d node(s) went in timeout:", al->
GetSize());
2922 while ((xs = (
TSocket *)nxs())) {
2951 Info(
"Collect",
"<<<<<< Exiting collect responses #%04d", collectId);
2968 Error(
"PollForNewWorkers",
"Can't invoke: not on a master -- should not happen!");
2972 Error(
"PollForNewWorkers",
"No ProofServ available -- should not happen!");
2982 TIter next(reqWorkers);
2993 while (( sl =
dynamic_cast<TSlave *
>(nextInner()) )) {
3000 if (found)
delete ni;
3002 newWorkers->
Add(ni);
3004 Info(
"PollForNewWorkers",
"New worker found: %s:%s",
3014 if (nNewWorkers > 0) {
3016 Info(
"PollForNewWorkers",
"Requesting to add %d new worker(s)", newWorkers->
GetEntries());
3019 Error(
"PollForNewWorkers",
"Call to AddWorkers() failed (got %d < 0)", rv);
3026 Info(
"PollForNewWorkers",
"No new worker found");
3055 if ((recvrc = s->
Recv(mess)) < 0) {
3057 Info(
"CollectInputFrom",
"%p: got %d from Recv()", s, recvrc);
3068 MarkBad(s,
"problems receiving a message in TProof::CollectInputFrom(...)");
3074 MarkBad(s,
"undefined message in TProof::CollectInputFrom(...)");
3082 if (rc == 1 && (endtype >= 0) && (
what != endtype))
3102 Warning(
"HandleInputMessage",
"given an empty message or undefined worker");
3108 Warning(
"HandleInputMessage",
"worker socket is undefined");
3135 MarkBad(s,
"received kPROOF_FATAL");
3148 Info(
"HandleInputMessage",
"received kPROOF_STOP from %s: disabling any further collection this worker",
3180 Info(
"HandleInputMessage",
"%s: kPROOF_GETPACKET", sl->
GetOrdinal());
3222 Info(
"HandleInputMessage",
"%s: kPROOF_LOGFILE: size: %d", sl->
GetOrdinal(), size);
3230 Info(
"HandleInputMessage",
"%s: kPROOF_LOGDONE: status %d parallel %d",
3264 Info(
"HandleInputMessage",
3280 rc = (async) ? 0 : 1;
3305 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: enter");
3315 Error(
"HandleInputMessage",
3316 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3325 Error(
"HandleInputMessage",
3326 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3330 Error(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: unknown type: %d",
type);
3343 Info(
"HandleInputMessage",
"kPROOF_SENDOUTPUT: enter (%s)", sl->
GetOrdinal());
3359 Info(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: enter");
3363 Info(
"HandleInputMessage",
"finalization on %s started ...", prefix);
3389 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: query result missing");
3391 }
else if (
type > 0) {
3401 }
else if (
IsTty() || changed) {
3402 fprintf(stderr,
"%s\r", msg.
Data());
3422 while ((xo = nxin()))
3434 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: player undefined!");
3445 Info(
"HandleInputMessage",
"%s: kPROOF_OUTPUTLIST: enter", sl->
GetOrdinal());
3467 Info(
"HandleInputMessage",
3468 "%s: kPROOF_OUTPUTLIST: query result missing", sl->
GetOrdinal());
3477 Info(
"HandleInputMessage",
3478 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->
GetOrdinal());
3482 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->
GetOrdinal());
3492 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYLIST: enter");
3505 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_RETRIEVE: enter");
3514 Info(
"HandleInputMessage",
3515 "kPROOF_RETRIEVE: query result missing or player undefined");
3522 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MAXQUERIES: enter");
3526 Printf(
"Number of queries fully kept remotely: %d", max);
3532 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SERVERSTARTED: enter");
3534 UInt_t tot = 0, done = 0;
3538 (*mess) >> action >> tot >> done >> st;
3545 char msg[512] = {0};
3547 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3550 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3551 action.
Data(), done, tot, frac);
3554 fprintf(stderr,
"%s", msg);
3564 m << action << tot << done << st;
3572 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATASET_STATUS: enter");
3574 UInt_t tot = 0, done = 0;
3578 (*mess) >> action >> tot >> done >> st;
3584 char msg[512] = {0};
3586 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3589 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3590 action.
Data(), done, tot, frac);
3593 fprintf(stderr,
"%s", msg);
3603 m << action << tot << done << st;
3611 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STARTPROCESS: enter");
3631 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"Preparation time: %f s",
fPrepTime);
3636 (*mess) >> selec >> dsz >>
first >> nent;
3638 if (!
gROOT->IsBatch()) {
3657 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_ENDINIT: enter");
3676 Info(
"HandleInputMessage",
"%s: got kPROOF_SETIDLE", sl->
GetOrdinal());
3679 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3693 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYSUBMITTED: enter");
3718 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SESSIONTAG: enter");
3741 Info(
"HandleInputMessage",
"kPROOF_FEEDBACK: enter");
3754 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_AUTOBIN: enter");
3773 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PROGRESS: enter");
3782 Float_t initTime, procTime, evtrti, mbrti;
3783 (*mess) >>
total >> processed >> bytesread
3784 >> initTime >> procTime
3788 initTime, procTime, evtrti, mbrti);
3793 (*mess) >>
total >> processed;
3807 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STOPPROCESS: enter");
3814 (*mess) >> status >> abort;
3816 (*mess) >> events >> abort;
3822 TList *listOfMissingFiles = 0;
3823 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
3824 listOfMissingFiles =
new TList();
3825 listOfMissingFiles->
SetName(
"MissingFiles");
3844 Emit(
"StopProcess(Bool_t)", abort);
3850 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SUBMERGER: enter");
3857 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: enter");
3864 Error(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: no list received!");
3908 Info(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: enter");
3912 Error(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: fDSet not set");
3921 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATA_READY: enter");
3924 (*mess) >> dataready >> totalbytes >> bytesready;
3937 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MESSAGE: enter");
3950 fprintf(stderr,
"%s%c", msg.
Data(), (lfeed ?
'\n' :
'\r'));
3958 fprintf(stderr,
"%s%c", msg.
Data(), (lfeed ?
'\n' :
'\r'));
3974 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_VERSARCHCOMP: %s", vac.
Data());
3987 Error(
"HandleInputMessage",
"unknown command received from '%s' (what = %d)",
4015 Int_t merger_id = -1;
4016 (*mess) >> merger_id;
4019 Info(
"HandleSubmerger",
"kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4023 Error(
"HandleSubmerger",
"kOutputSize: #%d not in list ", merger_id);
4036 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"all mergers removed ... ");
4040 PDB(kSubmerger, 2)
Error(
"HandleSubmerger",
"kOutputSent: received not on endmaster!");
4047 Int_t merger_id = -1;
4048 (*mess) >> merger_id;
4050 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown: #%d ", merger_id);
4053 Error(
"HandleSubmerger",
"kMergerDown: #%d not in list ", merger_id);
4076 while ((o = nxo())) {
4079 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown:%d: exit", merger_id);
4087 Info(
"HandleSubmerger",
"worker %s reported as finished ", sl->
GetOrdinal());
4091 Info(
"HandleSubmerger",
"finalization on %s started ...", prefix);
4095 Int_t output_size = 0;
4096 Int_t merging_port = 0;
4097 (*mess) >> output_size >> merging_port;
4099 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
4100 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4116 msg.
Form(
"%s: Invalid request: cannot start %d mergers for %d workers",
4126 if (activeWorkers > 1) {
4132 msg.
Form(
"%s: Number of mergers set dynamically to %d (for %d workers)",
4135 msg.
Form(
"%s: No mergers will be used for %d workers",
4136 prefix, activeWorkers);
4145 if (activeWorkers > 1) {
4150 while ((wrk = nxwk())) {
4158 msg.
Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host",
4161 msg.
Form(
"%s: No mergers will be used for %d workers",
4162 prefix, activeWorkers);
4170 msg.
Form(
"%s: Number of mergers set by user to %d (for %d workers)",
4238 Error(
"HandleSubMerger",
"kOutputSize received not on endmaster!");
4250 Int_t merger_id = -1;
4264 if (merger_id == -1) {
4271 Info(
"RedirectWorker",
"redirecting worker %s to merger %d", sl->
GetOrdinal(), merger_id);
4273 PDB(kSubmerger, 2)
Info(
"RedirectWorker",
"redirecting output to merger #%d", merger_id);
4275 Error(
"RedirectWorker",
"#%d not in list ", merger_id);
4281 sendoutput << merger_id;
4282 sendoutput << hname;
4284 s->
Send(sendoutput);
4296 while (fLastAssignedMerger < fMergers->GetSize() &&
4308 while (fLastAssignedMerger < fMergers->GetSize() &&
4329 PDB(kSubmerger, 2)
Info(
"AskForOutput",
4330 "worker %s was asked to send its output to master",
4334 sendoutput <<
TString(
"master");
4350 Info(
"UpdateDialog",
4351 "processing was aborted - %lld events processed",
4366 Info(
"UpdateDialog",
4367 "processing was stopped - %lld events processed",
4384 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4389 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4405 while ((sl = (
TSlave*) next()))
4418 while ((sl = (
TSlave*) next()))
4430 Int_t active_mergers = 0;
4435 if (mi->
IsActive()) active_mergers++;
4438 return active_mergers;
4447 Info(
"CreateMerger",
"worker %s will be merger ", sl->
GetOrdinal());
4449 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"Begin");
4453 Info(
"CreateMerger",
"cannot create merger on port %d - exit", port);
4470 Int_t workersOnHost = 0;
4474 workers = workersOnHost - 1;
4478 msg.
Form(
"worker %s on host %s will be merger for %d additional workers", sl->
GetOrdinal(), sl->
GetName(), workers);
4490 bemerger << workers;
4493 PDB(kSubmerger,2)
Info(
"CreateMerger",
4494 "merger #%d (port: %d) for %d workers started",
4502 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"exit");
4513 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4519 Error(
"MarkBad",
"worker instance undefined: protocol error? ");
4540 msg.
Form(
"\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4542 (reason && strlen(reason)) ? reason :
"unknown");
4547 msg +=
TString::Format(
"\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4552 msg +=
TString::Format(
" +++ Please check the session logs for error messages either using\n");
4556 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4560 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4561 "Display(\"*\")\n\n", thisurl.
Data());
4564 }
else if (reason) {
4566 Info(
"MarkBad",
"worker %s at %s:%d asked to terminate",
4574 TList *listOfMissingFiles = 0;
4575 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
4576 listOfMissingFiles =
new TList();
4577 listOfMissingFiles->
SetName(
"MissingFiles");
4586 packetizer->
MarkBad(wrk, 0, &listOfMissingFiles);
4642 Int_t mergersCount = -1;
4644 if (mc) mergersCount = mc->
GetVal();
4646 if (mergersCount == 0) {
4648 if (activeWorkers > 1) {
4674 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4689 Warning(
"TerminateWorker",
"worker instance undefined: protocol error? ");
4699 Info(
"TerminateWorker",
"connection to worker is already down: cannot"
4700 " send termination message");
4712 if (ord && strlen(ord) > 0) {
4717 while ((wrk = (
TSlave *)nxw())) {
4718 if (all || !strcmp(wrk->
GetOrdinal(), ord)) {
4750 if (slaves->
GetSize() == 0)
return 0;
4756 while ((sl = (
TSlave *)next())) {
4758 if (sl->
Ping() == -1) {
4759 MarkBad(sl,
"ping unsuccessful");
4776 if (slaves->
GetSize() == 0)
return;
4781 while ((sl = (
TSlave *)next())) {
4799 IsValid() ?
"valid" :
"invalid");
4802 Printf(
"ROOT version|rev: %s|%s",
gROOT->GetVersion(),
gROOT->GetGitCommit());
4809 Printf(
"Security context: %s",
4813 Printf(
"Security context: Error - No connection");
4814 Printf(
"Proofd protocol version: Error - No connection");
4826 Printf(
"*** Master server %s (parallel mode, %d workers):",
4829 Printf(
"*** Master server %s (sequential mode):",
4843 Printf(
"ROOT version|rev|tag: %s", ver.
Data());
4861 Printf(
"List of workers:");
4864 while (
TSlave* sl =
dynamic_cast<TSlave*
>(nextslave())) {
4865 if (!sl->IsValid())
continue;
4872 if (sl->GetSocket()->Send(mess) == -1)
4873 const_cast<TProof*
>(
this)->
MarkBad(sl,
"could not send kPROOF_PRINT request");
4877 Error(
"Print",
"TSlave is neither Master nor Worker");
4929 TString outfile, dsname, stfopt;
4933 while (opt.
Tokenize(oo, from,
"[; ]")) {
4936 iof = opt.
Index(tagf);
4939 iof = opt.
Index(tagf);
4942 iod = opt.
Index(tagd);
4945 iod = opt.
Index(tagd);
4948 ios = opt.
Index(tags);
4950 tags =
"savetofile";
4951 ios = opt.
Index(tags);
4956 Error(
"HandleOutputOptions",
"options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4962 from = iof + tagf.
Length();
4964 Error(
"HandleOutputOptions",
"could not extract output file settings string! (%s)", opt.
Data());
4972 from = iod + tagd.
Length();
4973 if (!opt.
Tokenize(dsname, from,
"[; ]"))
4974 if (
gDebug > 0)
Info(
"HandleOutputOptions",
"no dataset name found: use default");
4983 if (dsname.
IsNull()) dsname =
"dataset_<qtag>";
4987 from = ios + tags.
Length();
4988 if (!opt.
Tokenize(stfopt, from,
"[; ]"))
4989 if (
gDebug > 0)
Info(
"HandleOutputOptions",
"save-to-file not found: use default");
4997 Error(
"HandleOutputOptions",
"save-to-file option must be a digit! (%s)", stfopt.
Data());
5021 Warning(
"HandleOutputOptions",
5022 "directory '%s' for the output file does not exists or is not writable:"
5043 if (
Exec(
"gProofServ->GetDataDir()",
"0",
kTRUE) == 0) {
5048 ddir = os->
GetString()(fst+1, lst-fst-1);
5050 emsg =
"could not find 'const char *' string in macro log! cannot continue";
5053 emsg =
"could not retrieve master data directory info! cannot continue";
5056 Error(
"HandleOutputOptions",
"%s", emsg.
Data());
5060 if (!ddir.
IsNull()) ddir +=
"/";
5062 outfile.
Form(
"%s<file>", ddir.
Data());
5083 Warning(
"HandleOutputOptions",
"Dataset required bu Save-To-File disabled: enabling!");
5084 stfopt.
Form(
"%d", ostf+1);
5099 if (target ==
"ds|V") {
5104 while ((o = nxo())) {
5116 Warning(
"HandleOutputOptions",
"could not retrieve TFileCollection for dataset '%s'", dsname.
Data());
5119 Warning(
"HandleOutputOptions",
"dataset not found!");
5131 Printf(
" Output successfully copied to %s", target.
Data());
5132 targetcopied =
kTRUE;
5134 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.
Data());
5142 while ((o = nxo())) {
5146 if (pof == pf && targetcopied)
continue;
5151 Printf(
" Output successfully copied to %s", target.
Data());
5154 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.
Data());
5164 Warning(
"HandleOutputOptions",
5171 if (!target.
IsNull() && !swapcopied) {
5174 if (!fout || (fout && fout->
IsZombie())) {
5176 Warning(
"HandleOutputOptions",
"problems opening output file %s", target.
Data());
5181 while ((o = nxo())) {
5194 Printf(
" Output saved to %s", target.
Data());
5225 if (action == 0 || (action == 1 && optfb.
IsNull())) {
5230 ifb = opt.
Index(tag);
5232 if (ifb ==
kNPOS)
return;
5233 from = ifb + tag.
Length();
5236 Warning(
"SetFeedback",
"could not extract feedback string! Ignoring ...");
5245 TString nm, startdraw, stopdraw;
5247 while (optfb.
Tokenize(nm, from,
",")) {
5249 if (nm ==
"stats") {
5251 startdraw.
Form(
"gDirectory->Add(new TStatsFeedback((TProof *)%p))",
this);
5258 stopdraw.
Form(
"TObject *o = gDirectory->FindObject(\"%s\"); "
5259 " if (o && strcmp(o->ClassName(), \"TStatsFeedback\")) "
5271 startdraw.
Form(
"gDirectory->Add(new TDrawFeedback((TProof *)%p))",
this);
5276 stopdraw.
Form(
"TObject *o = gDirectory->FindObject(\"%s\"); "
5277 " if (o && strcmp(o->ClassName(), \"TDrawFeedback\")) "
5301 TString opt(option), optfb, outfile;
5303 if (opt.Contains(
"fb=") || opt.Contains(
"feedback="))
SetFeedback(opt, optfb, 0);
5312 Info(
"Process",
"session is in waiting or processing status: switch to asynchronous mode");
5314 opt.ReplaceAll(
"SYNC",
"");
5348 if (selector && strlen(selector)) {
5353 Error(
"Process",
"neither a selecrot file nor a selector object have"
5354 " been specified: cannot process!");
5376 if (sst) rv = sst->
GetVal();
5408 Info(
"Process",
"server version < 5.18/00:"
5409 " processing of TFileCollection not supported");
5420 if (selector && strlen(selector)) {
5425 Error(
"Process",
"neither a selecrot file nor a selector object have"
5426 " been specified: cannot process!");
5498 Info(
"Process",
"processing 'by name' not supported by the server");
5502 TString dsname, fname(dsetname);
5508 const char *separator = (fname.
EndsWith(
",")) ?
"," :
"|";
5514 if (
f && !(
f->IsZombie())) {
5515 const Int_t blen = 8192;
5519 Long64_t len = (rest > blen - 1) ? blen - 1 : rest;
5520 if (
f->ReadBuffer(buf, len)) {
5521 Error(
"Process",
"problems reading from file '%s'", fname.
Data());
5532 if (rest > 0)
return -1;
5534 Error(
"Process",
"could not open file '%s'", fname.
Data());
5546 Info(
"Process",
"processing multi-dataset read from file '%s':", fname.
Data());
5547 Info(
"Process",
" '%s'", dsname.
Data());
5554 Info(
"Process",
"multi-dataset processing not supported by the server");
5561 while (names.Tokenize(
name, from,
"[, |]")) {
5567 if (ienl ==
kNPOS) {
5568 ienl =
name.Index(
"<<");
5569 if (ienl !=
kNPOS) {
5571 ienl += strlen(
"<<");
5575 ienl += strlen(
"?enl=");
5581 if (idxc !=
kNPOS) {
5583 if (idxs !=
kNPOS) {
5584 obj = newname(idxs+1, newname.
Length());
5585 dir = newname(idxc+1, newname.
Length());
5589 obj = newname(idxc+1, newname.
Length());
5594 Error(
"Process",
"bad name syntax (%s): please use"
5595 " a '#' after the dataset name",
name.Data());
5603 }
else if (obj != dsobj || dir != dsdir) {
5605 Warning(
"Process",
"'obj' or 'dir' specification not consistent w/ the first given: ignore");
5608 if (ienl !=
kNPOS) {
5618 if ((el =
dynamic_cast<TEntryList *
>(oel))) {
5629 if (
f && !(
f->IsZombie()) &&
f->GetListOfKeys()) {
5630 TIter nxk(
f->GetListOfKeys());
5632 while ((k = (
TKey *) nxk())) {
5648 Warning(
"Process",
"multiple entry lists found in file '%s': the first one is taken;\n"
5649 "if this is not what you want, load first the content in memory"
5650 "and select it by name ", enl.
Data());
5655 Warning(
"Process",
"file '%s' cannot be open or is empty - ignoring", enl.
Data());
5677 TDSet *dset =
new TDSet(dsname, dsobj, dsdir);
5686 if (selector && strlen(selector)) {
5691 Error(
"Process",
"neither a selector file nor a selector object have"
5692 " been specified: cannot process!");
5716 Info(
"Process",
"server version < 5.17/04: generic processing not supported");
5725 if (selector && strlen(selector)) {
5726 retval =
Process(dset, selector, option,
n);
5730 Error(
"Process",
"neither a selector file nor a selector object have"
5731 " been specified: cannot process!");
5755 Error(
"Process",
"server version < 5.33/02:"
5756 "processing by object not supported");
5760 Error(
"Process",
"selector object undefined!");
5781 Error(
"Process",
"server version < 5.33/02:"
5782 "processing by object not supported");
5786 Error(
"Process",
"selector object undefined!");
5804 Error(
"Process",
"server version < 5.33/02:"
5805 "processing by object not supported");
5809 Error(
"Process",
"selector object undefined!");
5828 Error(
"Process",
"server version < 5.33/02:"
5829 "processing by object not supported");
5833 Error(
"Process",
"selector object undefined!");
5881 Info(
"Finalize",
"query #%d not found", qry);
5915 Info(
"Finalize",
"query already finalized:"
5916 " use Finalize(<qry>,kTRUE) to force new retrieval");
5945 Info(
"Retrieve",
"query #%d not found", qry);
5947 Info(
"Retrieve",
"positive argument required - do nothing");
5974 if (!farc || (farc && !(farc->
IsOpen()))) {
5975 Info(
"Retrieve",
"archive file cannot be open (%s)", path);
5990 Info(
"Retrieve",
"query not found after retrieve");
6010 Info(
"Remove",
"query #%d not found", qry);
6012 Info(
"Remove",
"positive argument required - do nothing");
6054 Info(
"Archive",
"query #%d not found", qry);
6056 Info(
"Archive",
"positive argument required - do nothing");
6113 if (mode && (strlen(mode) > 0)) {
6116 if (
m.Contains(
"ASYN")) {
6118 }
else if (
m.Contains(
"SYNC")) {
6124 Info(
"GetQueryMode",
"query mode is set to: %s", qmode ==
kSync ?
6137 const char *selection,
Option_t *option,
6144 Info(
"DrawSelect",
"not idle, asynchronous Draw not supported");
6173 const char *selection,
Option_t *option,
6177 Info(
"Process",
"processing 'by name' not supported by the server");
6185 if (idxc !=
kNPOS) {
6187 if (idxs !=
kNPOS) {
6198 Error(
"DrawSelect",
"bad name syntax (%s): please use"
6199 " a '#' after the dataset name", dsetname);
6217 Info(
"StopProcess",
"enter %d", abort);
6240 while ((sl = (
TSlave *)next()))
6251 Emit(
"DisableGoAsyn()");
6262 Info(
"GoAsynchronous",
"functionality not supported by the server - ignoring");
6270 Info(
"GoAsynchronous",
"either idle or already in asynchronous mode - ignoring");
6279 const Int_t kMAXBUF = 16384;
6293 Warning(
"RecvLogFile",
"file descriptor for outputs undefined (%d):"
6294 " will not log msgs", fdout);
6297 lseek(fdout, (off_t) 0, SEEK_END);
6303 while (filesize < size) {
6304 left =
Int_t(size - filesize);
6305 if (left >= kMAXBUF)
6308 filesize = (rec > 0) ? (filesize + rec) : filesize;
6317 w = write(fdout, p,
r);
6320 SysError(
"RecvLogFile",
"error writing to unit: %d", fdout);
6326 }
else if (rec < 0) {
6327 Error(
"RecvLogFile",
"error during receiving log file");
6333 EmitVA(
"LogMessage(const char*,Bool_t)", 2, buf,
kFALSE);
6352 if (!msg || (len = strlen(msg)) <= 0)
6356 Int_t lsfx = (sfx) ? strlen(sfx) : 0;
6363 Warning(
"NotifyLogMsg",
"file descriptor for outputs undefined (%d):"
6364 " will not notify msgs", fdout);
6367 lseek(fdout, (off_t) 0, SEEK_END);
6373 char *p = (
char *)msg;
6376 Int_t w = write(fdout, p,
r);
6378 SysError(
"NotifyLogMsg",
"error writing to unit: %d", fdout);
6386 if (write(fdout, sfx, lsfx) != lsfx)
6387 SysError(
"NotifyLogMsg",
"error writing to unit: %d", fdout);
6393 EmitVA(
"LogMessage(const char*,Bool_t)", 2, msg,
kFALSE);
6407 Info(
"LogMessage",
"Enter ... %s, 'all: %s", msg ? msg :
"",
6408 all ?
"true" :
"false");
6410 if (
gROOT->IsBatch()) {
6411 PDB(kGlobal,1)
Info(
"LogMessage",
"GUI not started - use TProof::ShowLog()");
6416 EmitVA(
"LogMessage(const char*,Bool_t)", 2, msg, all);
6422 lseek(fileno(
fLogFileR), (off_t) 0, SEEK_SET);
6424 const Int_t kMAXBUF = 32768;
6428 while ((len = read(fileno(
fLogFileR), buf, kMAXBUF-1)) < 0 &&
6433 Error(
"LogMessage",
"error reading log file");
6439 EmitVA(
"LogMessage(const char*,Bool_t)", 2, buf,
kFALSE);
6463 while ((sl = (
TSlave *)next())) {
6464 snprintf(str, 32,
"%d %d", cnt, size);
6466 MarkBad(sl,
"could not send kPROOF_GROUPVIEW message");
6530 if (!s.
Length())
return 0;
6539 Error(
"Exec",
"file %s could not be transfered", fn);
6550 Error(
"Exec",
"macro %s not found", filename.
Data());
6558 gROOT->ProcessLine(cmd);
6586 if (!s.
Length())
return 0;
6590 gROOT->ProcessLine(cmd);
6598 if (strcmp(ord,
"master") && strcmp(ord,
"0"))
ActivateWorker(ord);
6658 TString cmd =
TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6672 Printf(
"%s: %d", rcenv, env);
6684 TString cmd =
TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6698 Printf(
"%s: %f", rcenv, env);
6710 TString cmd =
TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6719 env = os->
GetString()(fst+1, lst-fst-1);
6801 FileMap_t::const_iterator it;
6808 if ((*md5) != md.
fMD5) {
6831 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
6845 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
6892 slaves =
new TList();
6896 if (slaves->
GetSize() == 0)
return 0;
6912 Error(
"SendFile",
"cannot stat file %s",
file);
6917 Error(
"SendFile",
"empty file %s",
file);
6929 if ((opt &
kCp)) cpopt |=
kCp;
6932 const Int_t kMAXBUF = 32768;
6939 if (fnam ==
"cache") {
6941 }
else if (fnam.
IsNull()) {
6946 while ((sl = (
TSlave *)next())) {
6956 Info(
"SendFile",
"%s sending file %s to: %s:%s (%d)", snd,
6964 snprintf(buf, kMAXBUF,
"%s %d %lld %d", fnam.
Data(), bin, siz, fw);
6966 MarkBad(sl,
"could not send kPROOF_SENDFILE request");
6972 lseek(fd, 0, SEEK_SET);
6980 SysError(
"SendFile",
"error reading from file %s",
file);
6987 SysError(
"SendFile",
"error writing to slave %s:%s (now offline)",
6989 MarkBad(sl,
"sendraw failure");
7010 return (
fStatus != 0) ? -1 : nsl;
7020 if (!
IsValid() || !obj)
return -1;
7046 if (!
IsValid() || !obj)
return -1;
7075 snprintf(str, 32,
"%d %u", level, mask);
7093 Warning(
"SetRealTimeLog",
"session is invalid - do nothing");
7111 PDB(kGlobal,1)
Info(
"SetParallelSilent",
"request all nodes");
7113 PDB(kGlobal,1)
Info(
"SetParallelSilent",
"request %d node%s", nodes,
7114 nodes == 1 ?
"" :
"s");
7117 mess << nodes << random;
7121 PDB(kGlobal,1)
Info(
"SetParallelSilent",
"got %d node%s",
n,
n == 1 ?
"" :
"s");
7140 Printf(
"PROOF set to sequential mode");
7142 TString subfix = (
n == 1) ?
"" :
"s";
7144 subfix +=
", randomly selected";
7145 Printf(
"PROOF set to parallel mode (%d worker%s)",
n, subfix.
Data());
7163 Error(
"GoMoreParallel",
"can't invoke here -- should not happen!");
7167 Error(
"GoMoreParallel",
"no ProofServ available nor Lite -- should not happen!");
7173 Int_t nAddedWorkers = 0;
7175 while (((nAddedWorkers < nWorkersToAdd) || (nWorkersToAdd == -1)) &&
7176 (( sl =
dynamic_cast<TSlave *
>( next() ) ))) {
7181 Error(
"GoMoreParallel",
"TSlave is neither a Master nor a Slave: %s:%s",
7188 (strcmp(
"IGNORE", sl->
GetImage()) == 0)) {
7190 Info(
"GoMoreParallel",
"Worker %s:%s won't be considered",
7197 Info(
"GoMoreParallel",
"Worker %s:%s is already active: skipping",
7213 Info(
"GoMoreParallel",
"Worker %s:%s marked as active!",
7218 Error(
"GoMoreParallel",
"Dynamic addition of master is not supported");
7226 Info(
"GoMoreParallel",
"Will invoke AskStatistics() -- implies a Collect()");
7231 Info(
"GoMoreParallel",
"Will invoke FindUniqueSlaves()");
7236 Info(
"GoMoreParallel",
"Will invoke SendGroupView()");
7240 Info(
"GoMoreParallel",
"Will invoke GetParallel()");
7246 s.
Form(
"PROOF just went more parallel (%d additional worker%s, %d worker%s total)",
7247 nAddedWorkers, (nAddedWorkers == 1) ?
"" :
"s",
7248 nTotalWorkers, (nTotalWorkers == 1) ?
"" :
"s");
7250 Info(
"GoMoreParallel",
"%s", s.
Data());
7252 return nTotalWorkers;
7276 while ((sl = (
TSlave *)nxt())) {
7278 if (strcmp(
"IGNORE", sl->
GetImage()) == 0)
continue;
7281 Error(
"GoParallel",
"TSlave is neither Master nor Slave");
7294 while (cnt < nwrks) {
7304 Error(
"GoParallel",
"attaching to candidate!");
7310 Int_t slavenodes = 0;
7321 Int_t nn = (nodes < 0) ? -1 : nodes-cnt;
7329 MarkBad(sl,
"could not send kPROOF_PARALLEL or kPROOF_LOGFILE request");
7345 MarkBad(sl,
"collect failed after kPROOF_PARALLEL or kPROOF_LOGFILE request");
7372 printf(
"PROOF set to sequential mode\n");
7374 printf(
"PROOF set to parallel mode (%d worker%s)\n",
7375 n,
n == 1 ?
"" :
"s");
7378 PDB(kGlobal,1)
Info(
"GoParallel",
"got %d node%s",
n,
n == 1 ?
"" :
"s");
7414 if (doask && !
Prompt(
"Do you really want to remove all data files"))
return;
7415 if (
fManager->
Rm(
"~/data/*",
"-rf",
"all") < 0)
7416 Warning(
"ClearData",
"problems purging data directory");
7420 if (!dsname || strlen(dsname) <= 0) {
7421 Error(
"ClearData",
"dataset name mandatory when removing a full dataset");
7426 Error(
"ClearData",
"dataset '%s' does not exists", dsname);
7432 Error(
"ClearData",
"could not retrieve info about dataset '%s'", dsname);
7437 " of dataset '%s'", dsname);
7444 Int_t rfiles = 0, nfiles =
fc->GetList()->GetSize();
7450 Error(
"ClearData",
"GetFirstUrl() returns NULL for '%s' - skipping",
7461 while (nurl-- && fi->
NextUrl()) {
7475 Error(
"ClearData",
"problems removing '%s'",
file.Data());
7482 fprintf(stderr,
"\n");
7490 TString outtmp(
"ProofClearData_");
7493 Error(
"ClearData",
"cannot create temp file for logs");
7503 in.open(outtmp.
Data());
7504 if (!in.is_open()) {
7505 Error(
"ClearData",
"could not open temp file for logs: %s", outtmp.
Data());
7516 if (
line.IsNull())
continue;
7520 if (!
line.Tokenize(host, from,
"| "))
continue;
7522 if (!
line.Tokenize(
file, from,
"| "))
continue;
7533 Info(
"ClearData",
"added info for: h:%s, f:%s", host.
Data(),
file.Data());
7535 Warning(
"ClearData",
"found incomplete line: '%s'",
line.Data());
7545 if (!fcmap || (fcmap && fcmap->
GetSize() <= 0)) {
7547 Warning(
"ClearData",
"no dataset beloning to '%s'", sel.
Data());
7566 while (nurl-- && fi->
NextUrl()) {
7575 Info(
"ClearData",
"found: host: %s, file: %s", host.
Data(),
file.Data());
7586 "registered file '%s' not found in the full list!",
7602 Info(
"ClearData",
"%d unregistered files to be removed:", nfiles);
7606 " unregistered data files", nfiles);
7620 Error(
"ClearData",
"problems removing '%s' on host '%s'",
7628 fprintf(stderr,
"\n");
7643 if (!pp.
Contains(
"[y/N]")) pp +=
" [y/N]";
7645 if (
a !=
"\n" &&
a[0] !=
'y' &&
a[0] !=
'Y' &&
a[0] !=
'n' &&
a[0] !=
'N') {
7646 Printf(
"Please answer y, Y, n or N");
7649 }
else if (
a ==
"\n" ||
a[0] ==
'n' ||
a[0] ==
'N') {
7662 fprintf(stderr,
"[TProof::ClearData] Total %5d files\t|", t);
7664 if (
r > 0 && t > 0) {
7666 fprintf(stderr,
"=");
7667 else if (
l == 20*
r/t)
7668 fprintf(stderr,
">");
7669 else if (
l > 20*
r/t)
7670 fprintf(stderr,
".");
7672 fprintf(stderr,
"=");
7674 fprintf(stderr,
"| %.02f %% \r", 100.0*(t ? (
r/t) : 1));
7736 while (fgets(
line, 2048, fin)) {
7739 if (write(fdout,
line,
r) < 0) {
7741 "errno %d writing to file descriptor %d",
7770 Warning(
"ShowPackages",
"file descriptor for outputs undefined (%p):"
7771 " will not log msgs", fout);
7774 lseek(fileno(fout), (off_t) 0, SEEK_END);
7851 if (!package || !package[0]) {
7852 Error(
"ClearPackage",
"need to specify a package name");
7879 if (!pack || strlen(pack) <= 0) {
7880 Error(
"DisablePackage",
"need to specify a package name");
7891 Warning(
"DisablePackage",
"problem removing locally package '%s'", pack);
7901 path.
Form(
"~/packages/%s", pack);
7902 if (
fManager->
Rm(path,
"-rf",
"all") != -1) {
7938 Warning(
"DisablePackages",
"problem removing packages locally");
7947 if (
fManager->
Rm(
"~/packages/*",
"-rf",
"all") != -1) {
7985 if (!package || !package[0]) {
7986 Error(
"BuildPackage",
"need to specify a package name");
8025 if (buildOnClient) {
8031 if (!
IsLite() || !buildOnClient) {
8064 if (!package || !package[0]) {
8065 Error(
"LoadPackage",
"need to specify a package name");
8076 if (
fPackMgr->
Load(package, loadopts) == -1)
return -1;
8080 if (loadopts) mess << loadopts;
8089 Info(
"LoadPackage",
"Sending load message to selected workers only");
8091 if (doCollect)
Collect(workers, -1, -1, deactivateOnFailure);
8108 if (!package || !package[0]) {
8109 Error(
"UnloadPackage",
"need to specify a package name");
8120 Warning(
"UnloadPackage",
"unable to remove symlink to %s", package);
8191 if (loadopts && strlen(loadopts)) {
8209 Warning(
"EnablePackage",
"'checkversion' option unknown from argument: '%s' - ignored", ocv.
Data());
8212 Info(
"EnablePackage",
"setting check version option from argument: %d", cvopt);
8216 if (lcv !=
kNPOS && fcv == 0) ocv += os->
String()[lcv];
8222 if (!optls) optls =
new TList;
8228 Warning(
"EnablePackage",
"remote server does not support options: ignoring the option string");
8253 if (!package || !package[0]) {
8254 Error(
"EnablePackage",
"need to specify a package name");
8272 if (ocv ==
"off" || ocv ==
"0")
8274 else if (ocv ==
"on" || ocv ==
"1")
8277 Warning(
"EnablePackage",
"'checkversion' option unknown from rootrc: '%s' - ignored", ocv.
Data());
8282 chkveropt = pcv->
GetVal();
8288 Info(
"EnablePackage",
"using check version option: %d", chkveropt);
8293 TList *optls = (loadopts && loadopts->
GetSize() > 0) ? loadopts : 0;
8295 Warning(
"EnablePackage",
"remote server does not support options: ignoring the option list");
8299 if (
LoadPackage(pac, notOnClient, optls, workers) == -1)
8327 Error(
"DownloadPackage",
"the manager is undefined!");
8333 if (!parname.EndsWith(
".par")) parname +=
".par";
8334 src.
Form(
"packages/%s", parname.Data());
8335 if (!dstdir || strlen(dstdir) <= 0) {
8336 dst.
Form(
"./%s", parname.Data());
8343 Error(
"DownloadPackage",
8344 "could not create the destination directory '%s' (errno: %d)",
8349 Error(
"DownloadPackage",
8350 "destination path '%s' exist but is not a directory!", dstdir);
8353 dst.
Form(
"%s/%s", dstdir, parname.Data());
8363 Warning(
"DownloadPackage",
"problems restoring output");
8376 if (s.
Contains(
"*** Global Package cache")) {
8382 }
else if (s.
Contains(
"*** Package cache")) {
8387 if (isGlobal && s.
Contains(parname)) {
8388 src.
Form(
"%s/%s", globaldir.
Data(), parname.Data());
8399 Error(
"DownloadPackage",
"problems downloading '%s' (src:%s, dst:%s)",
8403 Info(
"DownloadPackage",
"'%s' cross-checked against master repository (local path: %s)",
8441 if (par.EndsWith(
".par")) {
8443 name = base(0, base.
Length() - strlen(
".par"));
8458 Info(
"UploadPackage",
"global package found (%s): no upload needed",
8461 }
else if (xrc < 0) {
8462 Error(
"UploadPackage",
"PAR file '%s' not found", par.Data());
8494 smsg.
Form(
"+%s", par.Data());
8500 mess << smsg << (*md5);
8503 mess2 << smsg << (*md5);
8506 mess3 << smsg << (*md5);
8521 TIter next(workers);
8523 while ((sl = (
TSlave *) next())) {
8537 Error(
"UploadPackage",
"%s: problems uploading file %s",
8556 Error(
"UploadPackage",
"%s: unpacking of package %s failed",
8566 while ((ma = (
TSlave *) nextmaster())) {
8576 Error(
"UploadPackage",
"package %s did not exist on submaster %s",
8592 if (macro && strlen(macro) > 0) {
8596 gROOT->SetMacroPath(macrop);
8623 if (!macro || !macro[0]) {
8624 Error(
"Load",
"need to specify a macro name");
8634 TString addsname, implname = macro;
8636 if (icom !=
kNPOS) {
8637 addsname = implname(icom + 1, implname.
Length());
8641 TString bmsg(basemacro), acmode, args, io;
8647 Info(
"Load",
"macro '%s' does not contain a '.': do nothing", macro);
8663 Info(
"Load",
"no associated header file found: tried: %s %s",
8664 h.Data(), headname.
Data());
8671 if (!addsname.
IsNull()) {
8674 while (addsname.
Tokenize(fn, from,
",")) {
8676 Error(
"Load",
"additional file '%s' not found", fn.
Data());
8684 }
else if (!addincs.
Contains(dirn)) {
8696 Error(
"Load",
"problems sending implementation file %s", implname.
Data());
8701 Error(
"Load",
"problems sending header file %s", headname.
Data());
8706 TIter nxfn(&addfiles);
8711 Error(
"Load",
"problems sending additional file %s", os->
GetName());
8763 PDB(kGlobal, 1)
Info(
"Load",
"adding loaded macro: %s", macro);
8780 if (uniqueWorkers) {
8796 while ((wrk = (
TSlave *)nxw())) {
8813 PDB(kGlobal, 1)
Info(
"Load",
"adding loaded macro: %s", macro);
8836 if ((!libpath || !libpath[0])) {
8838 Info(
"AddDynamicPath",
"list is empty - nothing to do");
8850 if (libpath && strlen(libpath)) {
8881 if ((!incpath || !incpath[0])) {
8883 Info(
"AddIncludePath",
"list is empty - nothing to do");
8895 if (incpath && strlen(incpath)) {
8925 if ((!libpath || !libpath[0])) {
8927 Info(
"RemoveDynamicPath",
"list is empty - nothing to do");
8939 if (libpath && strlen(libpath))
8959 if ((!incpath || !incpath[0])) {
8961 Info(
"RemoveIncludePath",
"list is empty - nothing to do");
8973 if (incpath && strlen(incpath))
8994 if ((
type !=
"lib") && (
type !=
"inc")) {
8995 Error(
"HandleLibIncPath",
"unknown action type: %s - protocol error?",
type.Data());
9004 if (path.
Length() > 0 && path !=
"-") {
9006 Warning(
"HandleLibIncPath",
"decomposing path %s", path.
Data());
9013 if (
type ==
"lib") {
9035 Info(
"HandleLibIncPath",
9036 "libpath %s does not exist or cannot be read - not added", xlib.
Data());
9056 Info(
"HandleLibIncPath",
9057 "incpath %s does not exist or cannot be read - not added", xinc.
Data());
9064 if (
type ==
"lib") {
9137 (*fPrintProgress)(
total, processed, procTime, bytesread);
9142 fprintf(stderr,
"[TProof::Progress] Total %lld events\t|",
total);
9144 for (
int l = 0;
l < 20;
l++) {
9146 if (
l < 20*processed/
total)
9147 fprintf(stderr,
"=");
9148 else if (
l == 20*processed/
total)
9149 fprintf(stderr,
">");
9150 else if (
l > 20*processed/
total)
9151 fprintf(stderr,
".");
9153 fprintf(stderr,
"=");
9155 Float_t evtrti = (procTime > 0. && processed > 0) ? processed / procTime : -1.;
9156 Float_t mbsrti = (procTime > 0. && bytesread > 0) ? bytesread / procTime : -1.;
9159 Float_t remainingTime = (
total >= processed) ? (
total - processed) / evtrti : -1;
9161 const Float_t toK = 1024., toM = 1048576., toG = 1073741824.;
9162 if (mbsrti >= toG) {
9165 }
else if (mbsrti >= toM) {
9168 }
else if (mbsrti >= toK) {
9172 fprintf(stderr,
"| %.02f %% [%.1f evts/s, %.1f %s, time left: %.1f s]\r",
9173 (
total ? ((100.0*processed)/
total) : 100.0), evtrti, mbsrti, sunit.
Data(), remainingTime);
9175 fprintf(stderr,
"| %.02f %% [%.1f evts/s, time left: %.1f s]\r",
9176 (
total ? ((100.0*processed)/
total) : 100.0), evtrti, remainingTime);
9179 fprintf(stderr,
"| %.02f %%\r",
9180 (
total ? ((100.0*processed)/
total) : 100.0));
9182 if (processed >=
total) {
9183 fprintf(stderr,
"\n Query processing time: %.1f s\n", procTime);
9199 Info(
"Progress",
"%2f (%lld/%lld)", 100.*processed/
total, processed,
total);
9201 if (
gROOT->IsBatch()) {
9206 EmitVA(
"Progress(Long64_t,Long64_t)", 2,
total, processed);
9219 Info(
"Progress",
"%lld %lld %lld %f %f %f %f",
total, processed, bytesread,
9220 initTime, procTime, evtrti, mbrti);
9222 if (
gROOT->IsBatch()) {
9227 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
9228 7,
total, processed, bytesread, initTime, procTime, evtrti, mbrti);
9241 Info(
"Progress",
"%lld %lld %lld %f %f %f %f %d %f",
total, processed, bytesread,
9242 initTime, procTime, evtrti, mbrti, actw, eses);
9244 if (
gROOT->IsBatch()) {
9249 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
9250 10,
total, processed, bytesread, initTime, procTime, evtrti, mbrti, actw, tses, eses);
9267 Emit(
"Feedback(TList *objs)", (
Long_t) objs);
9276 Info(
"CloseProgressDialog",
9283 Emit(
"CloseProgressDialog()");
9293 Info(
"ResetProgressDialog",
"(%s,%d,%lld,%lld)", sel, sz, fst, ent);
9295 EmitVA(
"ResetProgressDialog(const char*,Int_t,Long64_t,Long64_t)",
9296 4, sel, sz, fst, ent);
9305 Info(
"StartupMessage",
"(%s,%d,%d,%d)", msg, st, done,
total);
9307 EmitVA(
"StartupMessage(const char*,Bool_t,Int_t,Int_t)",
9308 4, msg, st, done,
total);
9317 Info(
"DataSetStatus",
"(%s,%d,%d,%d)", msg, st, done,
total);
9319 EmitVA(
"DataSetStatus(const char*,Bool_t,Int_t,Int_t)",
9320 4, msg, st, done,
total);
9333 char msg[512] = {0};
9335 snprintf(msg, 512,
"%s: OK (%d %s) \n",
9336 action,tot,
type.Data());
9338 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
9339 action, done, tot, frac);
9342 fprintf(stderr,
"%s", msg);
9351 mess <<
TString(action) << tot << done << st;
9362 Info(
"QueryResultReady",
"ref: %s", ref);
9364 Emit(
"QueryResultReady(const char*)",ref);
9384 while (
TSlave *sl =
dynamic_cast<TSlave*
>(nextSlave())) {
9389 sllist->
SetName(sl->GetName());
9390 slholder.
Add(sllist);
9393 elemholder.
Add(elemlist);
9394 nodes.
Add(
new TPair(sllist, elemlist));
9396 sllist =
dynamic_cast<TList*
>(p->
Key());
9398 if (sllist) sllist->
Add(sl);
9404 for (
Int_t i = 0; i < 2; i++) {
9408 if (elem->GetValid())
continue;
9440 Warning(
"ValidateDSet",
"invalid values from TPair! Protocol error?");
9448 Warning(
"ValidateDSet",
"no node to allocate TDSetElement to - ignoring");
9456 TIter nextNode(&nodes);
9458 while (
TPair *node =
dynamic_cast<TPair*
>(nextNode())) {
9459 TList *slaves =
dynamic_cast<TList*
>(node->Key());
9460 TList *setelements =
dynamic_cast<TList*
>(node->Value());
9461 if (!slaves || !setelements)
continue;
9465 for (
Int_t i=0; i<nslaves; i++) {
9469 for (
Int_t j = (i*nelements)/nslaves;
9470 j < ((i+1)*nelements)/nslaves;
9487 PDB(kGlobal,1)
Info(
"ValidateDSet",
9488 "Sending TDSet with %d elements to slave %s"
9490 copyset.GetListOfElements()->GetSize(),
9500 Info(
"ValidateDSet",
"Calling Collect");
9577 if (datafile && strlen(datafile) > 0) {
9609 if (dataFile.
Length() > 0) {
9611 Info(
"SendInputDataFile",
"broadcasting %s", dataFile.
Data());
9644 if (
f &&
f->GetListOfKeys() &&
f->GetListOfKeys()->GetSize() > 0)
9658 if (!list_ok && !file_ok)
return;
9661 if (file_ok && !list_ok) {
9664 }
else if (!file_ok && list_ok) {
9674 while ((obj = next())) {
9684 }
else if (file_ok && list_ok) {
9704 while ((obj = next())) {
9710 Error(
"PrepareInputDataFile",
"could not open %s for updating", dataFile.
Data());
9770 !out || (out && out->
GetSize() <= 0))
return o;
9777 while ((o = nxo())) {
9783 if (!
f || (
f &&
f->IsZombie())) {
9784 ::Warning(
"TProof::GetOutput",
"problems opening file %s", fn.
Data());
9787 if (
f && (o =
f->Get(
name)))
return o;
9815 Warning(
"SetParameter",
"player undefined! Ignoring");
9834 Warning(
"SetParameter",
"player undefined! Ignoring");
9853 Warning(
"SetParameter",
"player undefined! Ignoring");
9872 Warning(
"SetParameter",
"player undefined! Ignoring");
9891 Warning(
"SetParameter",
"player undefined! Ignoring");
9911 Warning(
"GetParameter",
"player undefined! Ignoring");
9927 if (!wildcard) wildcard =
"";
9929 Int_t nch = strlen(wildcard);
9935 while ((p = next())) {
9937 if (nch && s != wildcard && s.
Index(re) ==
kNPOS)
continue;
9952 if (!wildcard) wildcard =
"";
9954 Int_t nch = strlen(wildcard);
9959 while ((p = next())) {
9961 if (nch && s != wildcard && s.
Index(re) ==
kNPOS)
continue;
9962 if (p->IsA() == TNamed::Class()) {
9982 Info(
"AddFeedback",
"Adding object \"%s\" to feedback",
name);
10013 Info(
"",
"no feedback requested");
10037 Error(
"GetTreeHeader",
"No connection");
10054 d = soc->
Recv(reply);
10057 Error(
"GetTreeHeader",
"Error getting a replay from the master.Result %d", (
int)
d);
10064 if (
s1 ==
"Success")
10069 Info(
"GetTreeHeader",
"%s, message size: %d, entries: %d",
10072 Info(
"GetTreeHeader",
"tree header retrieval failed");
10236 msg << start << end;
10252 off_t nowlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_CUR);
10255 "problem lseeking log file to current position (errno: %d)",
TSystem::GetErrno());
10260 off_t startlog = nowlog;
10261 off_t endlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_END);
10270 if (tolog <= 0)
return maclog;
10273 if (lseek(fileno(
fLogFileR), startlog, SEEK_SET) < 0) {
10275 "problem lseeking log file to start position (errno: %d)",
TSystem::GetErrno());
10284 Int_t wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10295 wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10299 if (lseek(fileno(
fLogFileR), nowlog, SEEK_SET) < 0) {
10301 "problem lseeking log file to original position (errno: %d)",
TSystem::GetErrno());
10320 EmitVA(
"LogMessage(const char*,Bool_t)", 2,
l->GetName(),
kFALSE);
10340 if (strstr(queryref, qr->
GetTitle()) &&
10341 strstr(queryref, qr->
GetName()))
10364 off_t nowlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_CUR);
10371 off_t startlog = nowlog;
10372 off_t endlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_END);
10378 lseek(fileno(
fLogFileR), nowlog, SEEK_SET);
10381 lseek(fileno(
fLogFileR), (off_t) 0, SEEK_SET);
10382 }
else if (qry != -1) {
10393 }
else if (qry > 0) {
10396 TIter nxq(queries);
10403 TIter nxq(queries);
10414 Info(
"ShowLog",
"query %d not found in list", qry);
10425 lseek(fileno(
fLogFileR), startlog, SEEK_SET);
10431 Int_t wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10440 Int_t w = write(fileno(stdout), p,
r);
10442 SysError(
"ShowLog",
"error writing to stdout");
10449 tolog -= strlen(
line);
10454 const char *opt = Getline(
"More (y/n)? [y]");
10464 wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10473 if (write(fileno(stdout),
"\n", 1) != 1)
10474 SysError(
"ShowLog",
"error writing to stdout");
10479 lseek(fileno(
fLogFileR), nowlog, SEEK_SET);
10491 if (
d->GetProof()) {
10517 Error(
"Detach",
"corrupted worker instance: wrk:%p, sock:%p", sl, s);
10524 if (shutdown && !
IsIdle()) {
10529 timeout = (timeout > 20) ? timeout : 20;
10554 if (
d->GetProof() ==
this) {
10627 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10658 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10676 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10702 Info(
"RegisterDataSet",
10703 "functionality not available: the server does not have dataset support");
10707 if (!dataSetName || strlen(dataSetName) <= 0) {
10708 Info(
"RegisterDataSet",
"specifying a dataset name is mandatory");
10716 parallelverify =
kTRUE;
10724 mess <<
TString(dataSetName);
10732 Error(
"RegisterDataSet",
"dataset was not saved");
10738 if (!parallelverify)
return result;
10743 Error(
"RegisterDataSet",
"problems verifying dataset '%s'", dataSetName);
10760 Info(
"SetDataSetTreeName",
"functionality not supported by the server");
10764 if (!dataset || strlen(dataset) <= 0) {
10765 Info(
"SetDataSetTreeName",
"specifying a dataset name is mandatory");
10769 if (!treename || strlen(treename) <= 0) {
10770 Info(
"SetDataSetTreeName",
"specifying a tree name is mandatory");
10786 Error(
"SetDataSetTreeName",
"some error occured: default tree name not changed");
10802 Info(
"GetDataSets",
10803 "functionality not available: the server does not have dataset support");
10806 if (
fProtocol < 31 && strstr(optStr,
":lite:"))
10807 Warning(
"GetDataSets",
"'lite' option not supported by the server");
10811 mess <<
TString(uri ? uri :
"");
10812 mess <<
TString(optStr ? optStr :
"");
10816 TMap *dataSetMap = 0;
10818 Error(
"GetDataSets",
"error receiving datasets information");
10823 if (!(dataSetMap = (
TMap *)(retMess->
ReadObject(TMap::Class()))))
10824 Error(
"GetDataSets",
"error receiving datasets");
10826 Error(
"GetDataSets",
"message not found or wrong type (%p)", retMess);
10839 Info(
"ShowDataSets",
10840 "functionality not available: the server does not have dataset support");
10846 mess <<
TString(uri ? uri :
"");
10847 mess <<
TString(optStr ? optStr :
"");
10852 Error(
"ShowDataSets",
"error receiving datasets information");
10861 Info(
"ExistsDataSet",
"functionality not available: the server has an"
10862 " incompatible version of TFileInfo");
10866 if (!dataset || strlen(dataset) <= 0) {
10867 Error(
"ExistsDataSet",
"dataset name missing");
10889 Info(
"ClearDataSetCache",
"functionality not available on server");
10907 Info(
"ShowDataSetCache",
"functionality not available on server");
10930 Info(
"GetDataSet",
"functionality not available: the server has an"
10931 " incompatible version of TFileInfo");
10935 if (!uri || strlen(uri) <= 0) {
10936 Info(
"GetDataSet",
"specifying a dataset name is mandatory");
10943 nameMess <<
TString(optStr ? optStr:
"");
10945 Error(
"GetDataSet",
"sending request failed");
10950 Error(
"GetDataSet",
"error receiving datasets information");
10956 Error(
"GetDataSet",
"error reading list of files");
10958 Error(
"GetDataSet",
"message not found or wrong type (%p)", retMess);
10971 fileList->
Print(opt);
10974 Warning(
"ShowDataSet",
"no such dataset: %s", uri);
10985 nameMess <<
TString(uri?uri:
"");
10986 nameMess <<
TString(optStr?optStr:
"");
10988 Error(
"RemoveDataSet",
"sending request failed");
11002 Error (
"FindDataSets",
"not yet implemented");
11003 return (
TList *) 0;
11013 Error(
"RequestStagingDataSet",
11014 "functionality not supported by the server");
11025 Error(
"RequestStagingDataSet",
"staging request was unsuccessful");
11039 Error(
"CancelStagingDataSet",
11040 "functionality not supported by the server");
11051 Error(
"CancelStagingDataSet",
"cancel staging request was unsuccessful");
11066 Error(
"GetStagingStatusDataSet",
11067 "functionality not supported by the server");
11073 nameMess <<
TString(dataset);
11075 Error(
"GetStagingStatusDataSet",
"sending request failed");
11083 Error(
"GetStagingStatusDataSet",
"problem processing the request");
11089 retMess->
ReadObject(TFileCollection::Class()) );
11091 Error(
"GetStagingStatusDataSet",
"error reading list of files");
11094 Error(
"GetStagingStatusDataSet",
11095 "response message not found or wrong type (%p)", retMess);
11123 Info(
"VerifyDataSet",
"functionality not available: the server has an"
11124 " incompatible version of TFileInfo");
11129 if (!uri || (uri && strlen(uri) <= 0)) {
11130 Error(
"VerifyDataSet",
"dataset name is is mandatory");
11134 Int_t nmissingfiles = 0;
11139 Info(
"VerifyDataSet",
"Master-only verification");
11149 Info(
"VerifyDataSet",
"no such dataset %s", uri);
11153 return nmissingfiles;
11158 Error(
"VerifyDataSet",
"PROOF is in sequential mode (no workers): cannot do parallel verification.");
11159 Error(
"VerifyDataSet",
"Either start PROOF with some workers or force sequential adding 'S' as option.");
11173 Int_t nmissingfiles = 0;
11188 Int_t oldifiip = -1;
11193 const char* mss=
"";
11195 const char* stageoption=
"";
11213 if (oldifiip > -1) {
11214 SetParameter(
"PROOF_IncludeFileInfoInPacket", oldifiip);
11222 Int_t ntouched = 0;
11228 while ((obj = nxtout())) {
11233 while ((fiindout = (
TFileInfo*) nxt())) {
11234 lfiindout->
Add(fiindout);
11240 nmissingfiles += pdisappeared->
GetVal();
11243 if (pnopened &&
TString(pnopened->
GetName()).BeginsWith(
"PROOF_NoFilesOpened_")) {
11244 nopened += pnopened->
GetVal();
11247 if (pntouched &&
TString(pntouched->
GetName()).BeginsWith(
"PROOF_NoFilesTouched_")) {
11248 ntouched += pntouched->
GetVal();
11251 if (pchanged_ds &&
TString(pchanged_ds->
GetName()).BeginsWith(
"PROOF_DataSetChanged_")) {
11256 Info(
"VerifyDataSetParallel",
"%s: changed? %d (# files opened = %d, # files touched = %d,"
11257 " # missing files = %d)",
11258 uri, changed_ds, nopened, ntouched, nmissingfiles);
11260 return nmissingfiles;
11269 Info(
"UploadDataSet",
"Lite-session: functionality not implemented");
11275 mess <<
TString(optStr?optStr:
"");
11279 TMap *groupQuotaMap = 0;
11281 Info(
"GetDataSetQuota",
"could not receive quota");
11286 if (!(groupQuotaMap = (
TMap*)(retMess->
ReadObject(TMap::Class()))))
11287 Error(
"GetDataSetQuota",
"error getting quotas");
11289 Error(
"GetDataSetQuota",
"message not found or wrong type (%p)", retMess);
11292 return groupQuotaMap;
11302 Info(
"ShowDataSetQuota",
11303 "functionality not available: the server does not have dataset support");
11308 Info(
"UploadDataSet",
"Lite-session: functionality not implemented");
11319 Error(
"ShowDataSetQuota",
"error receiving quota information");
11382 if (!ord || strlen(ord) <= 0) {
11383 Info(
"ModifyWorkerLists",
11384 "an ordinal number - e.g. \"0.4\" or \"*\" for all - is required as input");
11388 Info(
"ModifyWorkerLists",
"ord: '%s' (add: %d, save: %d)", ord, add, save);
11425 while(oo.Tokenize(o, from,
","))
11435 while ((wrk = (
TSlave *) nxw())) {
11458 if (!allord && ords) {
11459 if (os) ords->
Remove(os);
11460 if (ords->
GetSize() == 0)
break;
11467 if (!fw && ords && ords->
GetSize() > 0) {
11470 while ((os = nxo())) {
11472 while ((wrk = (
TSlave *) nxw()))
11475 if (!oo.
IsNull()) oo +=
",";
11480 Warning(
"ModifyWorkerLists",
"worker(s) '%s' not found!", oo.
Data());
11500 mess << action <<
TString(ord);
11507 Warning(
"ModifyWorkerLists",
"request not completely full filled");
11509 Error(
"ModifyWorkerLists",
"request failed");
11514 if (oo.Contains(
","))
11515 Warning(
"ModifyWorkerLists",
"block request not supported by server: splitting into pieces ...");
11517 while(oo.Tokenize(o, from,
",")) {
11519 mess << action << o;
11574 const char *confdir,
Int_t loglevel)
11576 const char *pn =
"TProof::Open";
11584 ::Error(pn,
"plugin manager not found");
11588 if (
gROOT->IsBatch()) {
11589 ::Error(pn,
"we are in batch mode, cannot show PROOF Session Viewer");
11595 ::Error(pn,
"no plugin found for TSessionViewer");
11599 ::Error(pn,
"plugin for TSessionViewer could not be loaded");
11626 TString sport = opts(it + strlen(
"tunnel="), opts.
Length());
11632 host = sport(0, ic);
11633 sport.
Remove(0, ic + 1);
11644 port = sport.
Atoi();
11647 ::Info(
"TProof::Open",
"using tunnel at %s:%d", host.
Data(), port);
11653 "problems parsing tunnelling info from options: %s", opts.
Data());
11661 if (opts.
Length() > 0) {
11667 locid = opts.
Atoi();
11689 if (!proof || !proof->
IsValid()) {
11691 ::Error(pn,
"new session could not be attached");
11700 if (!proof || !proof->
IsValid()) {
11701 ::Error(pn,
"new session could not be created");
11733 ::Error(
"TProof::Reset",
11734 "unable to initialize a valid manager instance");
11806 Error(
"SaveWorkerInfo",
"gProofServ undefined");
11812 Warning(
"SaveWorkerInfo",
"all relevant worker lists is undefined");
11818 FILE *fwrk = fopen(fnwrk.
Data(),
"w");
11820 Error(
"SaveWorkerInfo",
11821 "cannot open %s for writing (errno: %d)", fnwrk.
Data(), errno);
11831 if (reLogTag.
Match(addlogext) == 2) {
11832 addLogTag = reLogTag[1];
11838 Info(
"SaveWorkerInfo",
"request for additional line with ext: '%s'", addlogext.
Data());
11848 while ((wrk = (
TSlave *) nxa())) {
11851 if (re.
Match(logfile) == 2) logfile = re[1];
11854 fprintf(fwrk,
"%s@%s:%d %d %s %s.log\n",
11858 if (addlogext.
Length() > 0) {
11859 fprintf(fwrk,
"%s@%s:%d %d %s(%s) %s.%s\n",
11869 while ((wrk = (
TSlave *) nxb())) {
11871 if (re.
Match(logfile) == 2) logfile = re[1];
11875 fprintf(fwrk,
"%s@%s:%d 0 %s %s.log\n",
11888 if (re.
Match(logfile) == 2) logfile = re[1];
11890 fprintf(fwrk,
"%s 2 %s %s.log\n",
11893 if (addlogext.
Length() > 0) {
11894 fprintf(fwrk,
"%s 2 %s(%s) %s.%s\n",
11896 logfile.
Data(), addlogext.
Data());
12010 if (!dset || !input || !mgr) {
12011 emsg.
Form(
"invalid inputs (%p, %p, %p)", dset, input, mgr);
12021 TString dsns(dsname), enlname;
12022 Ssiz_t eli = dsns.Index(
"?enl=");
12023 if (eli !=
kNPOS) {
12024 enlname = dsns(eli + strlen(
"?enl="), dsns.Length());
12025 dsns.Remove(eli, dsns.Length()-eli);
12029 if (dsname.
BeginsWith(
"TFileCollection:")) {
12035 emsg.
Form(
"TFileCollection %s not found in input list", dset->
GetName());
12046 input->
Add(
new TNamed(
"PROOF_LookupOpt", lookupopt.
Data()));
12068 if (validEnl && validSdsn && ((
fc = mgr->
GetDataSet(dsns) ))) {
12091 dsns = dsname.
Data();
12094 while (dsns.Tokenize(dsn1, from1,
"[, ]")) {
12097 while (dsn1.
Tokenize(dsn2, from2,
"|")) {
12100 if (ienl !=
kNPOS) {
12101 enlname = dsn2(ienl + 5, dsn2.
Length());
12138 if (!datasets || datasets->
GetSize() <= 0) {
12139 emsg.
Form(
"no dataset(s) found on the master corresponding to: %s", dsname.
Data());
12144 emsg.
Form(
"dataset pointer is null: corruption? - aborting");
12151 lookupopt =
gEnv->
GetValue(
"Proof.LookupOpt",
"stagedOnly");
12152 input->
Add(
new TNamed(
"PROOF_LookupOpt", lookupopt.
Data()));
12172 if (!dsTree.
IsNull() && dsTree !=
"/") {
12175 if (idx !=
kNPOS) {
12177 tree.Remove(0, idx);
12188 TList *srvmapslist = srvmapsref;
12194 if (srvmapsref && !srvmapslist) {
12195 msg.
Form(
"+++ Info: dataset server mapping(s) DISABLED by user");
12196 }
else if (srvmapsref && srvmapslist && srvmapslist != srvmapsref) {
12197 msg.
Form(
"+++ Info: dataset server mapping(s) modified by user");
12198 }
else if (!srvmapsref && srvmapslist) {
12199 msg.
Form(
"+++ Info: dataset server mapping(s) added by user");
12211 TIter nxds(datasets);
12212 while ((pair = (
TPair *) nxds())) {
12222 " entry list %s not found", os->
GetName()));
12227 " no sub-lists in entry-list!"));
12236 ds->SetSrvMaps(srvmapslist);
12237 if (!ds->Add(files, dsTree, availableOnly, missingFiles)) {
12238 emsg.
Form(
"error integrating dataset %s", dataset->
GetName());
12244 if (enl) ds->SetEntryList(enl);
12247 if (!dset->
Add(files, dsTree, availableOnly, missingFiles)) {
12248 emsg.
Form(
"error integrating dataset %s", dataset->
GetName());
12251 if (enl) entrylist = enl;
12253 if (missingFiles) {
12256 TIter next(missingFiles);
12258 while ((
file = next())) {
12260 listOfMissingFiles->
Add(
file);
12263 missingFiles->
Clear();
12269 while ((pair = (
TPair *) nxds())) {
12270 if (pair->
Key())
delete pair->
Key();
12277 if (srvmapslist && srvmapslist != srvmapsref) {
12288 if (listOfMissingFiles && listOfMissingFiles->
GetSize() > 0) {
12289 listOfMissingFiles->
SetName(
"MissingFiles");
12290 input->
Add(listOfMissingFiles);
12307 !cachedir || strlen(cachedir) <= 0)
return 0;
12312 if (!data && !inputdata)
return 0;
12319 if (dstname.BeginsWith(
"cache:")) {
12321 dstname.ReplaceAll(
"cache:",
"");
12322 srcname.
Form(
"%s/%s", cachedir, dstname.Data());
12324 emsg.
Form(
"input data file not found in cache (%s)", srcname.
Data());
12332 emsg.
Form(
"problems copying %s to %s", srcname.
Data(), dstname.Data());
12337 if (inputdata && inputdata->
GetSize() > 0) {
12341 inputdata->
Write();
12345 emsg.
Form(
"could not create %s", dstname.Data());
12349 emsg.
Form(
"no input data!");
12353 ::Info(
"TProof::SaveInputData",
"input data saved to %s", dstname.Data());
12358 input->
Remove(inputdata);
12379 if (!inputdata)
return 0;
12383 emsg.
Form(
"input data file not found in sandbox (%s)", fname.
Data());
12389 emsg.
Form(
"TProof object undefined or invalid: protocol error!");
12406 if (!input || !cachedir || strlen(cachedir) <= 0)
return 0;
12410 if (!inputdata)
return 0;
12415 emsg.
Form(
"input data file not found in cache (%s)", fname.
Data());
12421 added->
SetName(
"PROOF_InputObjsFromFile");
12427 emsg.
Form(
"could not get list of object keys from file");
12432 while ((k = (
TKey *)nxk())) {
12449 emsg.
Form(
"could not open %s", fname.
Data());
12462 if (!
gROOT->IsBatch()) {
12466 gROOT->GetPluginManager()->FindHandler(
"TProofProgressLog"))) {
12469 ::Error(
"TProof::LogViewer",
"cannot load the relevant plug-in");
12476 TString u = (url && strlen(url) <= 0) ?
"lite" : url;
12480 if (url && strlen(url) > 0) {
12481 ::Info(
"TProof::LogViewer",
12482 "batch mode: use TProofLog *pl = TProof::Mgr(\"%s\")->GetSessionLogs(%d)", url, idx);
12483 }
else if (url && strlen(url) <= 0) {
12484 ::Info(
"TProof::LogViewer",
12485 "batch mode: use TProofLog *pl = TProof::Mgr(\"lite\")->GetSessionLogs(%d)", idx);
12487 ::Info(
"TProof::LogViewer",
12488 "batch mode: use TProofLog *pl = TProof::Mgr(\"<master>\")->GetSessionLogs(%d)", idx);
12516 Warning(
"ShowMissingFiles",
"no (last) query found: do nothing");
12523 Info(
"ShowMissingFiles",
"no files missing in query %s:%s", xqr->
GetTitle(), xqr->
GetName());
12527 Int_t nmf = 0, ncf = 0;
12528 Long64_t msz = 0, mszzip = 0, mev = 0;
12531 TIter nxf(missing);
12552 if (msz <= 0) msz = -1;
12553 if (mszzip <= 0) mszzip = -1;
12555 if (msz > 0. || mszzip > 0.) {
12556 Printf(
" +++ %d file(s) missing, %d corrupted, i.e. %lld unprocessed events -->"
12557 " about %.2f%% of the total (%lld bytes, %lld zipped)",
12558 nmf, ncf, mev, xf * 100., msz, mszzip);
12560 Printf(
" +++ %d file(s) missing, %d corrupted, i.e. %lld unprocessed events -->"
12561 " about %.2f%% of the total", nmf, ncf, mev, xf * 100.);
12577 Warning(
"GetMissingFiles",
"no (last) query found: do nothing");
12585 Info(
"ShowMissingFiles",
"no files missing in query %s:%s", xqr->
GetTitle(), xqr->
GetName());
12593 fcname.
Form(
"%s.m0", ds->GetName());
12595 while (
gDirectory->FindObject(fcname) && j < 1000)
12596 fcname.
Form(
"%s.m%d", ds->GetName(), j++);
12599 if (ds)
fc->SetDefaultTreeName(ds->GetObjName());
12602 TIter nxf(missing);
12616 if (pf && strlen(pf) > 0) {
12620 if (withWrks)
SetParameter(
"PROOF_SlaveStatsTrace",
"");
12621 Info(
"SetPerfTree",
"saving of the performance tree enabled (%s)",
fPerfTree.
Data());
12627 Info(
"SetPerfTree",
"saving of the performance tree disabled");
12639 Error(
"SafePerfTree",
"this TProof instance is invalid!");
12645 if (ref && strlen(ref) > 0) {
12647 Error(
"SafePerfTree",
"requested to use query '%s' but player instance undefined!", ref);
12652 Error(
"SafePerfTree",
"TQueryResult instance for query '%s' could not be retrieved", ref);
12656 sref.
Form(
" for requested query '%s'", ref);
12658 if (!outls || (outls && outls->
GetSize() <= 0)) {
12659 Error(
"SafePerfTree",
"outputlist%s undefined or empty", sref.
Data());
12664 if (pf && strlen(pf)) fn = pf;
12665 if (fn.
IsNull()) fn =
"perftree.root";
12667 TFile f(fn,
"RECREATE");
12668 if (
f.IsZombie()) {
12669 Error(
"SavePerfTree",
"could not open file '%s' for writing", fn.
Data());
12674 while ((obj = nxo())) {
12679 if (objname ==
"PROOF_PerfStats" ||
12680 objname ==
"PROOF_PacketsHist" ||
12681 objname ==
"PROOF_EventsHist" ||
12682 objname ==
"PROOF_NodeHist" ||
12683 objname ==
"PROOF_LatencyHist" ||
12684 objname ==
"PROOF_ProcTimeHist" ||
12685 objname ==
"PROOF_CpuTimeHist")
12691 Info(
"SavePerfTree",
"performance information%s saved in %s ...", sref.
Data(), fn.
Data());
static void retrieve(const gsl_integration_workspace *workspace, double *a, double *b, double *r, double *e)
R__EXTERN TApplication * gApplication
const Bool_t kIterBackward
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
static unsigned int total
const Bool_t kSortDescending
R__EXTERN TProofDebug::EProofDebugMask gProofDebugMask
R__EXTERN Int_t gProofDebugLevel
R__EXTERN TProofServ * gProofServ
static Int_t PoDCheckUrl(TString *_cluster)
This a private API function.
const char *const kPROOF_WorkerIdleTO
const char *const kPROOF_PackDir
const char *const kPROOF_ConfFile
const char *const kPROOF_WorkDir
const Long64_t kPROOF_DynWrkPollInt_s
const char *const kPROOF_InputDataFile
R__EXTERN TProof * gProof
const char *const kPROOF_ConfDir
const Int_t kPROOF_Protocol
const char *const kPROOF_TerminateWorker
R__EXTERN TVirtualMutex * gROOTMutex
R__EXTERN TRandom * gRandom
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
Bool_t R_ISLNK(Int_t mode)
Bool_t R_ISDIR(Int_t mode)
R__EXTERN TSystem * gSystem
#define R__LOCKGUARD(mutex)
static struct mg_connection * fc(struct mg_context *ctx)
TSignalHandler * GetSignalHandler() const
Using a TBrowser one can browse all ROOT objects.
TObject * ReadObject(const TClass *cl) override
Read object from I/O buffer.
void WriteString(const char *s) override
Write string to I/O buffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void WriteObject(const TObject *obj, Bool_t cacheReuse=kTRUE) override
Write object to I/O buffer.
A chain is a collection of files containing TTree objects.
Collection abstract base class.
virtual void ls(Option_t *option="") const
List (ls) all objects in this collection.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
void SetName(const char *name)
virtual Int_t GetEntries() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual TObject * Clone(const char *newname="") const
Make a clone of an collection using the Streamer facility.
Bool_t Contains(const char *name) const
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write all objects in this collection.
Manages an element of a TDSet.
const char * GetObjName() const
const char * GetDirectory() const
Return directory where to look for object.
const char * GetMsd() const
const char * GetFileName() const
Long64_t GetFirst() const
This class implements a data set to be used for PROOF processing.
virtual void SetEntryList(TObject *aList)
Set entry (or event) list for this data set.
virtual Bool_t Add(const char *file, const char *objname=0, const char *dir=0, Long64_t first=0, Long64_t num=-1, const char *msd=0)
Add file to list of files to be analyzed.
Bool_t ElementsValid()
Check if all elements are valid.
void SetSrvMaps(TList *srvmaps=0)
Set (or unset) the list for mapping servers coordinate for files.
void Validate()
Validate the TDSet by opening files.
const char * GetType() const
TList * GetListOfElements() const
void SetDirectory(const char *dir)
Set/change directory.
void SetObjName(const char *objname)
Set/change object name.
const char * GetDirectory() const
const char * GetObjName() const
virtual TFileCollection * GetDataSet(const char *uri, const char *server=0)
Utility function used in various methods for user dataset upload.
static TList * GetDataSetSrvMaps()
Static getter for server mapping list.
static TList * ParseDataSetSrvMaps(const TString &srvmaps)
Create a server mapping list from the content of 'srvmaps' Return the list (owned by the caller) or 0...
Bool_t ParseUri(const char *uri, TString *dsGroup=0, TString *dsUser=0, TString *dsName=0, TString *dsTree=0, Bool_t onlyCurrent=kFALSE, Bool_t wildcards=kFALSE)
Parses a (relative) URI that describes a DataSet on the cluster.
Bool_t cd(const char *path=nullptr) override
Change current directory to "this" directory.
Utility class to draw objects in the feedback list during queries.
A List of entry numbers in a TTree or TChain.
virtual TList * GetLists() const
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=nullptr)
Set the value of a resource or create a new resource.
void cd(const char *dir) const
void put(const char *file, const char *remoteName=0)
Class that contains a list of TFileInfo's and accumulated meta data information about its entries.
void Print(Option_t *option="") const
Prints the contents of the TFileCollection.
const char * GetDefaultTreeName() const
Returns the tree set with SetDefaultTreeName if set Returns the name of the first tree in the meta da...
Int_t Add(TFileInfo *info)
Add TFileInfo to the collection.
virtual void Remove()
Remove file event handler from system file handler list.
virtual void Add()
Add file event handler to system file handler list.
Class describing a generic file including meta information.
TUrl * NextUrl()
Iterator function, start iteration by calling ResetUrl().
TUrl * GetFirstUrl() const
TUrl * GetCurrentUrl() const
Return the current url.
TFileInfoMeta * GetMetaData(const char *meta=0) const
Get meta data object with specified name.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
virtual Bool_t IsOpen() const
Returns kTRUE in case file is open and kFALSE if file is not open.
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=nullptr)
Resolve the file type as a function of the protocol field in 'name'.
virtual Bool_t Cp(const char *dst, Bool_t progressbar=kTRUE, UInt_t buffersize=1000000)
Allows to copy this file to the dst URL.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
void Close(Option_t *option="") override
Close a file.
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
TObject * FindObject(const char *name) const
Find object using its name.
TObject * Remove(TObject *obj)
Remove object from the list.
void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
const char * GetHostName() const
Book space in a file, create I/O buffers, to fill them, (un)compress them.
virtual const char * GetClassName() const
virtual void Add(TObject *obj)
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
virtual TObjLink * FirstLink() const
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
virtual void AddAfter(const TObject *after, TObject *obj)
Insert object after object after in the list.
virtual void RecursiveRemove(TObject *obj)
Remove object from this collection and recursively remove the object from all other objects (and coll...
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual void Clear(Option_t *option="")
Remove all objects from the list.
This code implements the MD5 message-digest algorithm.
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
Class supporting a collection of lines with C++ code.
virtual TObjString * AddLine(const char *text)
Add line with text in the list of lines of this macro.
TList * GetListOfLines() const
virtual TObjString * GetLineWith(const char *text) const
Search the first line containing text.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Bool_t AreAllWorkersAssigned()
Return if the determined number of workers has been already assigned to this merger.
Bool_t AreAllWorkersMerged()
Return if merger has already merged all workers, i.e. if it has finished its merging job.
virtual ~TMergerInfo()
Destructor.
void SetMergedWorker()
Increase number of already merged workers by 1.
void AddWorker(TSlave *sl)
Add new worker to the list of workers to be merged by this merger.
void AddMergedObjects(Int_t objects)
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
virtual void RemoveAll()
Remove all sockets from the monitor.
virtual void ActivateAll()
Activate all de-activated sockets.
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void DeActivateAll()
De-activate all activated sockets.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
The TNamed class is the base class for all named ROOT classes.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
virtual void SetName(const char *name)
Set the name of the TNamed.
virtual const char * GetTitle() const
Returns title of object.
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
virtual const char * GetName() const
Returns name of object.
Collectable string class.
const char * GetName() const
Returns name of object.
const TString & GetString() const
Mother of all ROOT objects.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
@ kSingleKey
write collection with single key
virtual const char * GetName() const
Returns name of object.
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
virtual void RecursiveRemove(TObject *obj)
Recursively remove this object from a list.
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
virtual void SysError(const char *method, const char *msgfmt,...) const
Issue system error message.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual TObject * FindObject(const char *name) const
Must be redefined in derived classes.
R__ALWAYS_INLINE Bool_t IsZombie() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual const char * GetTitle() const
Returns title of object.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex 'this' was created with.
The PROOF package manager contains tools to manage packages.
void Show(const char *title=0)
Show available packages.
Int_t Build(const char *pack, Int_t opt=TPackMgr::kCheckROOT)
Method to build a package.
Int_t Install(const char *par, Bool_t rmold=kFALSE)
Install package from par (unpack the file in the directory); par can be an URL for remote retrieval.
void ShowEnabled(const char *title=0)
Show enabled packages.
static Int_t FindParPath(TPackMgr *packmgr, const char *pack, TString &par)
Get the full path to PAR, looking also in the global dirs.
static Int_t RegisterGlobalPath(const char *paths)
Parse one or more paths as possible sources of packages Returns number of paths added; or -1 in case ...
Int_t Load(const char *pack, TList *optls=0)
Method to load a package taking an option list Return -1 on error, 0 otherwise.
Int_t Unload(const char *pack)
Method to unload a package.
TList * GetListOfEnabled() const
Get list of enabled packages Returns a pointer to a TList object, transferring ownership to the calle...
Int_t Remove(const char *pack=0, Bool_t dolock=kTRUE)
Remove package 'pack' If 'pack' is null or empty all packages are cleared.
TMD5 * ReadMD5(const char *pack)
Read MD5 checksum of the PAR file from the PROOF-INF/md5.txt file.
Class used by TMap to store (key,value) pairs.
const char * GetName() const
Returns name of object.
Named parameter, streamable and storable.
const AParamType & GetVal() const
const char * GetName() const
Returns name of object.
Long_t ExecPlugin(int nargs, const T &... params)
Int_t LoadPlugin()
Load the plugin library for this handler.
This class implements a plugin library manager.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Bool_t Notify()
TProof interrupt handler.
const char * Export(Bool_t &changed)
The PROOF manager interacts with the PROOF server coordinator to create or destroy a PROOF session,...
virtual Int_t Reset(Bool_t hard=kFALSE, const char *usr=0)
Send a cleanup request for the sessions associated with the current user.
virtual TList * QuerySessions(Option_t *opt="S")
Get list of sessions accessible to this manager.
static TProofMgr * Create(const char *url, Int_t loglevel=-1, const char *alias=0, Bool_t xpd=kTRUE)
Static method returning the appropriate TProofMgr object using the plugin manager.
virtual Int_t Rm(const char *, const char *=0, const char *=0)
Run 'rm' on 'what'. Locally it is just a call to TSystem::Unlink .
virtual void Find(const char *="~/", const char *=0, const char *=0)
virtual TProof * CreateSession(const char *=0, const char *=0, Int_t=-1)
Create a new remote session (master and associated workers).
virtual Bool_t IsValid() const
virtual Int_t GetFile(const char *, const char *, const char *=0)
virtual TProof * AttachSession(Int_t, Bool_t=kFALSE)
Dummy version provided for completeness.
virtual Bool_t IsProofd() const
virtual TProofDesc * GetProofDesc(Int_t id)
Get TProofDesc instance corresponding to 'id'.
virtual Int_t Stat(const char *, FileStat_t &, const char *=0)
virtual Bool_t IsLite() const
virtual void DiscardSession(TProof *p)
Discard TProofDesc of session 'p' from the internal list.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
const TString & GetMsd() const
const TString & GetImage() const
const TString & GetOrdinal() const
const TString & GetWorkDir() const
Int_t GetPerfIndex() const
const TString & GetNodeName() const
const char * GetName() const
Returns name of object.
Class to steer the merging of files produced on the workers.
const char * GetDir(Bool_t raw=kFALSE) const
const char * GetOutputFileName() const
const char * GetFileName() const
Bool_t IsRetrieve() const
void AttachList(TList *alist)
Attach to list 'alist'.
Container class for processing statistics.
Long64_t GetEntries() const
const char * GetOrdinal() const
const char * GetImage() const
TList * GetEnabledPackages() const
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
virtual void ReleaseWorker(const char *)
const char * GetUser() const
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TSocket * GetSocket() const
const char * GetGroup() const
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asychronous message to the master / client .
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
TPackMgr * GetPackMgr() const
void SendParallel(Bool_t async=kFALSE)
Send number of parallel nodes to master or client.
const char * GetSessionDir() const
const char * GetWorkDir() const
const char * GetPrefix() const
Bool_t IsTopMaster() const
This class controls a Parallel ROOT Facility, PROOF, cluster.
void HandleLibIncPath(const char *what, Bool_t add, const char *dirs)
Handle lib, inc search paths modification request.
const char * GetSessionTag() const
Int_t Exec(const char *cmd, ESlaves list, Bool_t plusMaster)
Send command to be executed on the PROOF master and/or slaves.
Int_t GetNumberOfInactiveSlaves() const
Return number of inactive slaves, i.e.
void ShowPackages(Bool_t all=kFALSE, Bool_t redirlog=kFALSE)
List contents of package directory.
virtual void ShowData()
List contents of the data directory in the sandbox.
Int_t SendPrint(Option_t *option="")
Send print command to master server.
static TProofMgr * Mgr(const char *url)
Get instance of the effective manager for 'url' Return 0 on failure.
Bool_t CreateMerger(TSlave *sl, Int_t port)
Create a new merger.
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
Int_t GetNumberOfQueries()
Number of queries processed by this session.
void ActivateAsyncInput()
Activate the a-sync input handler.
TList * GetOutputNames()
FIXME: to be written.
Bool_t IsEndMaster() const
void DisableGoAsyn()
Signal to disable related switches.
void PutLog(TQueryResult *qr)
Display log of query pq into the log window frame.
void NotifyLogMsg(const char *msg, const char *sfx="\n")
Notify locally 'msg' to the appropriate units (file, stdout, window) If defined, 'sfx' is added after...
void Activate(TList *slaves=0)
Activate slave server list.
Int_t UploadPackage(const char *par, EUploadPackageOpt opt=kUntar, TList *workers=0)
Upload a PROOF archive (PAR file).
TMonitor * fCurrentMonitor
TMonitor * fAllUniqueMonitor
void SetFeedback(TString &opt, TString &optfb, Int_t action)
Extract from opt in optfb information about wanted feedback settings.
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
void Close(Option_t *option="")
Close all open slave servers.
TList * fTerminatedSlaveInfos
TList * GetListOfSlaves() const
TMonitor * fUniqueMonitor
Int_t SetParallelSilent(Int_t nodes, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
Int_t DisablePackages()
Remove all packages.
Bool_t fProgressDialogStarted
Int_t DownloadPackage(const char *par, const char *dstdir=0)
Download a PROOF archive (PAR file) from the master package repository.
TMap * GetDataSetQuota(const char *optStr="")
returns a map of the quotas of all groups
static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg)
Save input data file from 'cachedir' into the sandbox or create a the file with input data objects.
void ShowQueries(Option_t *opt="")
Ask the master for the list of queries.
Int_t BuildPackage(const char *package, EBuildPackageOpt opt=kBuildAll, Int_t chkveropt=TPackMgr::kCheckROOT, TList *workers=0)
Build specified package.
virtual void Print(Option_t *option="") const
Print status of PROOF cluster.
Int_t GetClientProtocol() const
virtual ~TProof()
Clean up PROOF environment.
void RemoveChain(TChain *chain)
Remove chain from data set.
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool=kFALSE)
Set up packages, loaded macros, include and lib paths ...
static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg)
Get the input data from the file defined in the input list.
TList * fNonUniqueMasters
Int_t SendObject(const TObject *obj, ESlaves list=kActive)
Send object to master or slave servers.
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action)
Extract from opt information about output handling settings.
TVirtualProofPlayer * fPlayer
void AddFeedback(const char *name)
Add object to feedback list.
Bool_t IsParallel() const
static void LogViewer(const char *url=0, Int_t sessionidx=0)
Start the log viewer window using the plugin manager.
TVirtualProofPlayer * GetPlayer() const
void CleanGDirectory(TList *ol)
Remove links to objects in list 'ol' from gDirectory.
void DeActivateAsyncInput()
De-activate a-sync input handler.
Int_t BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
Broadcast a raw buffer of specified length to all slaves in the specified list.
static void ResetEnvVars()
Clear the list of environment variables passed to proofserv on the master and slaves.
void AddInputData(TObject *obj, Bool_t push=kFALSE)
Add data objects that might be needed during the processing of the selector (see Process()).
void AskParallel()
Ask for the number of parallel slaves.
virtual void ClearCache(const char *file=0)
Remove file from all file caches.
virtual void ClearDataSetCache(const char *dataset=0)
Clear the content of the dataset cache, if any (matching 'dataset', if defined).
Int_t ActivateWorker(const char *ord, Bool_t save=kTRUE)
Make sure that the worker identified by the ordinal number 'ord' is in the active list.
Int_t CleanupSession(const char *sessiontag)
Send cleanup request for the session specified by tag.
virtual Bool_t CancelStagingDataSet(const char *dataset)
Cancels a dataset staging request.
TObject * GetParameter(const char *par) const
Get specified parameter.
void SetRunStatus(ERunStatus rst)
Long64_t fLastPollWorkers_s
Int_t Archive(Int_t query, const char *url)
Send archive request for the qry-th query in fQueries.
void AskForOutput(TSlave *sl)
Master asks for output from worker sl.
TList * GetListOfPackages()
Get from the master the list of names of the packages available.
TQueryResult * GetQueryResult(const char *ref=0)
Return pointer to the full TQueryResult instance owned by the player and referenced by 'ref'.
Int_t GetNumberOfSlaves() const
Return number of slaves as described in the config file.
void StartupMessage(const char *msg, Bool_t status, Int_t done, Int_t total)
Send startup message.
static void AssertMacroPath(const char *macro)
Make sure that the directory path contained by macro is in the macro path.
Int_t AddIncludePath(const char *incpath, Bool_t onClient=kFALSE, TList *wrks=0, Bool_t doCollect=kTRUE)
Add 'incpath' to the inc path search.
virtual TMap * GetDataSets(const char *uri="", const char *optStr="")
Lists all datasets that match given uri.
PrintProgress_t fPrintProgress
virtual void ShowCache(Bool_t all=kFALSE)
List contents of file cache.
void ClearFeedback()
Clear feedback list.
static TProof * Open(const char *url=0, const char *conffile=0, const char *confdir=0, Int_t loglevel=0)
Start a PROOF session on a specific cluster.
void Browse(TBrowser *b)
Build the PROOF's structure in the browser.
void InterruptCurrentMonitor()
If active in a monitor, set ready state.
void ResetMergePrg()
Reset the merge progress notificator.
void SetPerfTree(const char *pf="perftree.root", Bool_t withWrks=kFALSE)
Enable/Disable saving of the performance tree.
void ClearData(UInt_t what=kUnregistered, const char *dsname=0)
Remove files for the data directory.
void Touch()
Ping PROOF slaves. Returns the number of slaves that responded.
Int_t AssertPath(const char *path, Bool_t writable)
Make sure that 'path' exists; if 'writable' is kTRUE, make also sure that the path is writable.
virtual void ShowStagingStatusDataSet(const char *dataset, const char *optStr="filter:SsCc")
Like GetStagingStatusDataSet, but displays results immediately.
Int_t VerifyDataSetParallel(const char *uri, const char *optStr)
Internal function for parallel dataset verification used by TProof::VerifyDataSet and TProofLite::Verify...
const char * GetConfFile() const
Int_t GetRemoteProtocol() const
Int_t ModifyWorkerLists(const char *ord, Bool_t add, Bool_t save)
Modify the worker active/inactive list by making the worker identified by the ordinal number 'ord' ac...
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
Int_t GoMoreParallel(Int_t nWorkersToAdd)
Add nWorkersToAdd workers to current list of workers.
virtual void ShowDataSetCache(const char *dataset=0)
Display the content of the dataset cache, if any (matching 'dataset', if defined).
const char * GetImage() const
Int_t FindNextFreeMerger()
Return a merger, which is both active and still accepts some workers to be assigned to it.
Int_t GetRC(const char *RCenv, Int_t &env, const char *ord="0")
Get into 'env' the value of integer RC env variable 'rcenv' on node 'ord'.
static void AddEnvVar(const char *name, const char *value)
Add a variable to the list of environment variables passed to proofserv on the master and slaves.
static Bool_t GetFileInCmd(const char *cmd, TString &fn)
Static method to extract the filename (if any) from a CINT command.
virtual void ShowDataSets(const char *uri="", const char *optStr="")
Shows datasets in locations that match the uri.
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
Float_t GetCpuTime() const
void ClearDataProgress(Int_t r, Int_t t)
Progress bar for clear data.
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
Send the input data file to the workers.
void SetPlayer(TVirtualProofPlayer *player)
Set a new PROOF player.
Int_t ClearPackage(const char *package)
Remove a specific package.
Int_t SendInitialState()
Transfer the initial (i.e.
Int_t AddWorkers(TList *wrks)
Works on the master node only.
void GetStatistics(Bool_t verbose=kFALSE)
Get statistics about CPU time, real time and bytes read.
virtual Bool_t RegisterDataSet(const char *name, TFileCollection *dataset, const char *optStr="")
Register the 'dataSet' on the cluster under the current user, group and the given 'dataSetName'.
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
void DeleteParameters(const char *wildcard)
Delete the input list parameters specified by a wildcard (e.g.
void PrepareInputDataFile(TString &dataFile)
Prepare the file with the input data objects to be sent to the master; the objects are taken from the de...
void InitMembers()
Default initializations.
void RedirectWorker(TSocket *s, TSlave *sl, Int_t output_size)
Redirect output of worker sl to some merger.
void HandleSubmerger(TMessage *mess, TSlave *sl)
Process a message of type kPROOF_SUBMERGER.
void SetInputDataFile(const char *datafile)
Set the file to be used to optimally distribute the input data objects.
Int_t ClearPackages()
Remove all packages.
void SetParameter(const char *par, const char *value)
Set input list parameter.
virtual Bool_t StartSlaves(Bool_t attach=kFALSE)
Start up PROOF slaves.
Int_t Ping()
Ping PROOF. Returns 1 if master server responded.
void SetActive(Bool_t=kTRUE)
TSlave * CreateSubmaster(const char *url, const char *ord, const char *image, const char *msd, Int_t nwk=1)
Create a new TSlave of type TSlave::kMaster.
void SaveActiveList()
Save current list of active workers.
const char * GetDataPoolUrl() const
virtual Int_t SetDataSetTreeName(const char *dataset, const char *treename)
Set/Change the name of the default tree.
void ShowMissingFiles(TQueryResult *qr=0)
Show information about missing files during query described by 'qr' or the last query if qr is null (...
Int_t RemoveIncludePath(const char *incpath, Bool_t onClient=kFALSE)
Remove 'incpath' from the inc path search.
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'.
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
Int_t UnloadPackage(const char *package)
Unload specified package.
void RemoveFeedback(const char *name)
Remove object from feedback list.
Int_t Remove(Int_t query, Bool_t all=kFALSE)
Send remove request for the qry-th query in fQueries.
std::recursive_mutex fCloseMutex
Bool_t SendingLogToWindow() const
static TPluginHandler * fgLogViewer
Long64_t GetBytesRead() const
void UpdateDialog()
Final update of the progress dialog.
void AskStatistics()
Ask for the statistics of the slaves.
virtual void SetAlias(const char *alias="")
Set an alias for this session.
Bool_t fFinalizationRunning
void Detach(Option_t *opt="")
Detach this instance from its proofserv.
void PrintProgress(Long64_t total, Long64_t processed, Float_t procTime=-1., Long64_t bytesread=-1)
Print a progress bar on stderr. Used in batch mode.
TPluginHandler * fProgressDialog
Int_t Init(const char *masterurl, const char *conffile, const char *confdir, Int_t loglevel, const char *alias=0)
Start the PROOF environment.
const char * GetConfDir() const
virtual Long64_t DrawSelect(TDSet *dset, const char *varexp, const char *selection="", Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Execute the specified drawing action on a data set (TDSet).
Int_t DeactivateWorker(const char *ord, Bool_t save=kTRUE)
Remove the worker identified by the ordinal number 'ord' from the active list.
void SetRealTimeLog(Bool_t on=kTRUE)
Switch ON/OFF the real-time logging facility.
void cd(Int_t id=-1)
Set session with 'id' the default one.
TFileCollection * GetMissingFiles(TQueryResult *qr=0)
Get a TFileCollection with the files missing in the query described by 'qr' or the last query if qr i...
void ParseConfigField(const char *config)
The config file field may contain special instructions which need to be parsed at the beginning,...
Int_t GetNumberOfBadSlaves() const
Return number of bad slaves.
Int_t Retrieve(Int_t query, const char *path=0)
Send retrieve request for the qry-th query in fQueries.
Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
Broadcast file to all workers in the specified list.
void ShowParameters(const char *wildcard="PROOF_*") const
Show the input list parameters specified by the wildcard.
TMonitor * fActiveMonitor
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
virtual Int_t RemoveDataSet(const char *dataset, const char *optStr="")
Remove the specified dataset from the PROOF cluster.
void LogMessage(const char *msg, Bool_t all)
Log a message into the appropriate window by emitting a signal.
static void Reset(const char *url, Bool_t hard=kFALSE)
Wrapper around TProofMgr::Reset(...).
virtual Int_t PollForNewWorkers()
Asks the PROOF Serv for new workers in Dynamic Startup mode and activates them.
Int_t fLastAssignedMerger
TList * GetQueryResults()
Return pointer to the list of query results in the player.
void ShowFeedback() const
Show items in feedback list.
Int_t GoParallel(Int_t nodes, Bool_t accept=kFALSE, Bool_t random=kFALSE)
Go in parallel mode with at most "nodes" slaves.
void SetQueryMode(EQueryMode mode)
Change query running mode to the one specified by 'mode'.
Int_t UnloadPackages()
Unload all packages.
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
TList * GetListOfActiveSlaves() const
TSignalHandler * fIntHandler
TList * FindDataSets(const char *searchString, const char *optStr="")
Find datasets, returns in a TList all found datasets.
Int_t SavePerfTree(const char *pf=0, const char *qref=0)
Save performance information from TPerfStats to file 'pf'.
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client.
Int_t AddDynamicPath(const char *libpath, Bool_t onClient=kFALSE, TList *wrks=0, Bool_t doCollect=kTRUE)
Add 'libpath' to the lib path search.
TProof()
Protected constructor to be used by classes deriving from TProof (they have to call Init themselves a...
Int_t RemoveWorkers(TList *wrks)
Used for shutting down the workers after a query is finished.
void Progress(Long64_t total, Long64_t processed)
Get query progress information.
Long64_t Finalize(Int_t query=-1, Bool_t force=kFALSE)
Finalize the qry-th query in fQueries.
Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient=kFALSE)
Remove 'libpath' from the lib path search.
void ShowDataSetQuota(Option_t *opt=0)
Shows the quota and usage of all groups; if opt contains "U", also shows the distribution of usage on user-...
void DeleteDrawFeedback(TDrawFeedback *f)
Delete draw feedback object.
virtual TFileCollection * GetStagingStatusDataSet(const char *dataset)
Obtains a TFileCollection showing the staging status of the specified dataset.
Bool_t CheckFile(const char *file, TSlave *sl, Long_t modtime, Int_t cpopt=(kCp|kCpBin))
Check if a file needs to be send to the slave.
Bool_t Prompt(const char *p)
Prompt the question 'p' requiring an answer y,Y,n,N. Return kTRUE if the answer was y or Y,...
void SetProgressDialog(Bool_t on=kTRUE)
Enable/Disable the graphic progress dialog.
void GoAsynchronous()
Send GOASYNC message to the master.
TList * GetOutputList()
Get list with all objects created during processing (see Process()).
TList * fAvailablePackages
void SetManager(TProofMgr *mgr)
Set manager and schedule its destruction after this for clean operations.
TString Getenv(const char *env, const char *ord="0")
Get value of environment variable 'env' on node 'ord'.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
TList * GetEnabledPackages() const
Int_t GetNumberOfActiveSlaves() const
Return number of active slaves, i.e.
TSlave * CreateSlave(const char *url, const char *ord, Int_t perf, const char *image, const char *workdir)
Create a new TSlave of type TSlave::kSlave.
void GetMaxQueries()
Get max number of queries whose full results are kept in the remote sandbox.
Int_t SendFile(const char *file, Int_t opt=(kBinary|kForward|kCp|kCpBin), const char *rfile=0, TSlave *sl=0)
Send a file to master or slave servers.
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.
friend class TProofInputHandler
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
Float_t GetRealTime() const
Int_t UploadDataSet(const char *, TList *, const char *=0, Int_t=0, TList *=0)
*** This function is deprecated and will disappear in future versions *** *** It is just a wrapper ar...
void MarkBad(TSlave *wrk, const char *reason=0)
Add a bad slave server to the bad slave list and remove it from the active list and from the two moni...
Int_t SendGroupView()
Send to all active slaves servers the current slave group size and their unique id.
Int_t RestoreActiveList()
Restore saved list of active workers.
virtual Int_t Echo(const TObject *obj)
Sends an object to master and workers and expects them to send back a message with the output of its T...
void ShowLog(Int_t qry=-1)
Display on screen the content of the temporary log file.
void ClearInputData(TObject *obj=0)
Remove obj from the input data list; if obj is null (default), clear the input data info.
Int_t SendCommand(const char *cmd, ESlaves list=kActive)
Send command to be executed on the PROOF master and/or slaves.
void Feedback(TList *objs)
Get list of feedback objects.
virtual Bool_t RequestStagingDataSet(const char *dataset)
Allows users to request staging of a particular dataset.
Int_t GetLogLevel() const
TObject * GetOutput(const char *name)
Get specified object that has been produced during the processing (see Process()).
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo's. In case of error return 0.
virtual TTree * GetTreeHeader(TDSet *tdset)
Creates a tree header (a tree with nonexisting files) object for the DataSet.
virtual void ValidateDSet(TDSet *dset)
Validate a TDSet.
TProofMgr::EServType fServType
Int_t UploadDataSetFromFile(const char *, const char *, const char *=0, Int_t=0, TList *=0)
*** This function is deprecated and will disappear in future versions *** *** It is just a wrapper ar...
static TList * fgProofEnvList
virtual void FindUniqueSlaves()
Add to the fUniqueSlave list the active slaves that have a unique (user) file system image.
const char * GetUser() const
void TerminateWorker(TSlave *wrk)
Ask an active worker 'wrk' to terminate, i.e. to shutdown.
static void SystemCmd(const char *cmd, Int_t fdout)
Exec system command 'cmd'. If fdout > -1, append the output to fdout.
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
TString fActiveSlavesSaved
Int_t LoadPackage(const char *package, Bool_t notOnClient=kFALSE, TList *loadopts=0, TList *workers=0)
Load specified package.
Bool_t fSendGroupView
list returned by kPROOF_GETSLAVEINFO
void ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst, Long64_t ent)
Reset progress dialog.
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
Set draw feedback option.
friend class TProofInterruptHandler
virtual TFileCollection * GetDataSet(const char *dataset, const char *optStr="")
Get a list of TFileInfo objects describing the files of the specified dataset.
TDrawFeedback * CreateDrawFeedback()
Draw feedback creation proxy.
void AddChain(TChain *chain)
Add chain to data set.
static const TList * GetEnvVars()
Get environment variables.
TSlave * FindSlave(TSocket *s) const
Find slave that has TSocket s. Returns 0 in case slave is not found.
TList * GetListOfEnabledPackages()
Get from the master the list of names of the packages enabled.
TList * GetFeedbackList() const
Return feedback list.
virtual void SendInputDataFile()
Send the input data objects to the master; the objects are taken from the dedicated list and / or the...
virtual TList * GetListOfQueries(Option_t *opt="")
Ask the master for the list of queries.
static void DelEnvVar(const char *name)
Remove a variable from the list of environment variables passed to proofserv on the master and slave...
Int_t BroadcastObject(const TObject *obj, Int_t kind, TList *slaves)
Broadcast an object to all slaves in the specified list.
void SetLogLevel(Int_t level, UInt_t mask=TProofDebug::kAll)
Set server logging level.
TList * fEnabledPackagesOnCluster
virtual Long64_t Process(TDSet *dset, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process a data set (TDSet) using the specified selector (.C) file or TSelector object. Entry- or event...
void ClearInput()
Clear input object list.
TList * GetListOfBadSlaves() const
Int_t GetSandbox(TString &sb, Bool_t assert=kFALSE, const char *rc=0)
Set the sandbox path from 'Proof.Sandbox' or the alternative var 'rc'.
Int_t DisablePackage(const char *package)
Remove a specific package.
Int_t GetQueryReference(Int_t qry, TString &ref)
Get reference for the qry-th query in fQueries (as displayed by ShowQueries).
Int_t GetNumberOfUniqueSlaves() const
Return number of unique slaves, i.e.
virtual Bool_t ExistsDataSet(const char *dataset)
Returns kTRUE if 'dataset' exists, kFALSE otherwise.
void ShowDataSet(const char *dataset="", const char *opt="filter:SsCc")
display meta-info for given dataset usi
Int_t HandleInputMessage(TSlave *wrk, TMessage *m, Bool_t deactonfail=kFALSE)
Analyze the received message.
void SetDSet(TDSet *dset)
void AddInput(TObject *obj)
Add objects that might be needed during the processing of the selector (see Process()).
void RecvLogFile(TSocket *s, Int_t size)
Receive the log file of the slave with socket s.
void CloseProgressDialog()
Close progress dialog.
TProofOutputList fOutputList
Int_t GetActiveMergersCount()
Get the active mergers count.
void DataSetStatus(const char *msg, Bool_t status, Int_t done, Int_t total)
Send dataset preparation status.
TMacro * GetLastLog()
Fill a TMacro with the log lines since the last reading (fLogFileR) Return (TMacro *)0 if no line was...
TList * GetInputList()
Get input list.
void ReleaseMonitor(TMonitor *mon)
Release the used monitor to be used, making sure to delete newly created monitors.
void ShowEnabledPackages(Bool_t all=kFALSE)
List which packages are enabled.
void SetMaxDrawQueries(Int_t max)
Set max number of draw queries whose results are saved.
void GetLog(Int_t start=-1, Int_t end=-1)
Ask for remote logs in the range [start, end].
Int_t CollectInputFrom(TSocket *s, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect and analyze available input from socket s.
const char * GetMaster() const
void QueryResultReady(const char *ref)
Notify availability of a query result.
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Int_t EnablePackage(const char *package, Bool_t notOnClient=kFALSE, TList *workers=0)
Enable specified package.
Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
See if the data is ready to be analyzed.
const char * GetGroup() const
virtual Int_t VerifyDataSet(const char *dataset, const char *optStr="")
Verify if all files in the specified dataset are available.
void Emit(const char *signal, const T &arg)
Activate signal with single parameter.
void EmitVA(const char *signal_name, Int_t, const T &... params)
Emit a signal with a varying number of arguments.
A container class for query results.
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
Long64_t GetEntries() const
void SetTermTime(Float_t termtime)
void SetArchived(const char *archfile)
Set (or update) query in archived state.
void SetPrepTime(Float_t preptime)
virtual void SetInputList(TList *in, Bool_t adopt=kTRUE)
Set / change the input list.
TObject * GetInputObject(const char *classname) const
Return first instance of class 'classname' in the input list.
void AddInput(TObject *obj)
Add obj to the input list.
TMacro * GetLogFile() const
Bool_t IsFinalized() const
static const char * GetMacroPath()
Get macro search path. Static utility function.
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
virtual Double_t Rndm()
Machine independent random number generator.
virtual UInt_t Integer(UInt_t imax)
Returns a random integer uniformly distributed on the interval [ 0, imax-1 ].
Regular expression class.
virtual const char * AsString(TString &out)
Returns short string with relevant information about this security context.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
Sequenceable collection abstract base class.
virtual void Add()
Add signal handler to system signal handler list.
virtual void Remove()
Remove signal handler from system signal handler list.
Int_t Compare(const TObject *obj) const
Used to sort slaveinfos by ordinal.
SysInfo_t GetSysInfo() const
void SetStatus(ESlaveStatus stat)
const char * GetOrdinal() const
void SetSysInfo(SysInfo_t si)
Setter for fSysInfo.
void SetOrdinal(const char *ord)
const char * GetName() const
Returns name of object.
Bool_t IsEqual(const TObject *obj) const
Used to compare slaveinfos by ordinal.
const char * GetDataDir() const
void Print(Option_t *option="") const
Print slave info.
Class describing a PROOF worker server.
virtual void SetAlias(const char *alias)
Set an alias for this session.
const char * GetWorkDir() const
Int_t GetSlaveType() const
Int_t GetParallel() const
const char * GetImage() const
void SetArchCompiler(const char *ac)
TSocket * GetSocket() const
virtual void SetStatus(Int_t st)
virtual void StopProcess(Bool_t abort, Int_t timeout)
Send stop/abort request to PROOF server.
virtual void FlushSocket()
const char * GetUser() const
TFileHandler * GetInputHandler() const
const char * GetMsd() const
static TSlave * Create(const char *url, const char *ord, Int_t perf, const char *image, TProof *proof, Int_t stype, const char *workdir, const char *msd, Int_t nwk=1)
Static method returning the appropriate TSlave object for the remote server.
void SetInputHandler(TFileHandler *ih)
Adopt and register input handler for this slave.
virtual void Interrupt(Int_t type)
Send interrupt OOB byte to master or slave servers.
virtual void SetInterruptHandler(Bool_t)
virtual Int_t SetupServ(Int_t stype, const char *conffile)
Init a PROOF slave object.
virtual Int_t Ping()
Ping the remote master or slave servers.
virtual Int_t SendGroupPriority(const char *, Int_t)
const char * GetName() const
Returns name of object.
virtual Bool_t IsValid() const
void SetSessionTag(const char *st)
const char * GetOrdinal() const
const char * GetProofWorkDir() const
void SetROOTVersion(const char *rv)
virtual void Close(Option_t *opt="")
Close slave socket.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Int_t GetRemoteProtocol() const
TInetAddress GetInetAddress() const
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
TSecContext * GetSecContext() const
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
virtual Bool_t IsValid() const
virtual Int_t Reconnect()
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
A sorted doubly linked list.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Stop()
Stop the stopwatch.
int CompareTo(const char *cs, ECaseCompare cmp=kExact) const
Compare a string to char *cs.
TString & Insert(Ssiz_t pos, const char *s)
Int_t Atoi() const
Return integer value of string.
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
TSubString Strip(EStripType s=kTrailing, char c=' ') const
Return a substring of self stripped at beginning and/or end.
Double_t Atof() const
Return floating-point value contained in string.
Bool_t IsFloat() const
Returns kTRUE if string contains a floating point or integer number.
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
Ssiz_t First(char c) const
Find first occurrence of a character c.
const char * Data() const
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
TString & ReplaceAll(const TString &s1, const TString &s2)
Ssiz_t Last(char c) const
Find last occurrence of a character c.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
TString & Prepend(const char *cs)
TString & Remove(Ssiz_t pos)
TString & Append(const char *cs)
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=nullptr)
Redirect standard output (stdout, stderr) to the specified file.
virtual const char * GetBuildCompilerVersion() const
Return the build compiler version.
static void ResetErrno()
Static function resetting system error number.
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
static Int_t GetErrno()
Static function returning system error number.
virtual void AddIncludePath(const char *includePath)
Add a directory to the already set include path.
virtual int Chmod(const char *file, UInt_t mode)
Set the file permission bits. Returns -1 in case of error, 0 otherwise.
virtual int GetPid()
Get process id.
virtual int CopyFile(const char *from, const char *to, Bool_t overwrite=kFALSE)
Copy a file.
virtual const char * Getenv(const char *env)
Get environment variable.
virtual const char * GetIncludePath()
Get the list of include path.
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method splits a filename of the form:
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
virtual Int_t Exec(const char *shellcmd)
Execute a command.
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
virtual FILE * OpenPipe(const char *command, const char *mode)
Open a pipe.
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
virtual FILE * TempFileName(TString &base, const char *dir=nullptr)
Create a secure temporary file by appending a unique 6 letter string to base.
virtual int ClosePipe(FILE *pipe)
Close the pipe.
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
virtual const char * GetDynamicPath()
Return the dynamic path (used to find shared libraries).
virtual TString GetFromPipe(const char *command)
Execute command and return output in TString.
virtual const char * HostName()
Return the system's host name.
virtual void Unsetenv(const char *name)
Unset environment variable.
virtual Bool_t IsAbsoluteFileName(const char *dir)
Return true if dir is an absolute pathname.
virtual void SetDynamicPath(const char *pathname)
Set the dynamic path to a new value.
virtual const char * WorkingDirectory()
Return working directory.
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
virtual void Setenv(const char *name, const char *value)
Set environment variable.
virtual const char * GetBuildArch() const
Return the build architecture.
virtual TString GetDirName(const char *pathname)
Return the directory name in pathname.
virtual int Unlink(const char *name)
Unlink, i.e.
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
A TTree represents a columnar dataset.
virtual Long64_t GetMaxEntryLoop() const
This class represents a RFC 3986 compatible URI.
Bool_t SetFragment(const TString &fragment)
Set fragment component of URI:
const TString GetUri() const
Returns the whole URI - an implementation of chapter 5.3 component recomposition.
This class represents a WWW compatible URL.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
const char * GetFile() const
void SetUser(const char *user)
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
const char * GetUser() const
const char * GetHost() const
void SetOptions(const char *opt)
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
const char * GetOptions() const
void SetHost(const char *host)
const char * GetProtocol() const
void SetPasswd(const char *pw)
The packetizer is a load balancing object created for each query.
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
virtual void MarkBad(TSlave *, TProofProgressStatus *, TList **)
Abstract interface for the PROOF player.
virtual TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)=0
virtual void AddEventsProcessed(Long64_t ev)=0
virtual void AddInput(TObject *inp)=0
virtual TQueryResult * GetCurrentQuery() const =0
virtual void AddOutput(TList *out)=0
virtual TList * GetInputList() const =0
virtual TObject * GetOutput(const char *name) const =0
virtual void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)=0
virtual void SetMaxDrawQueries(Int_t max)=0
virtual Long64_t GetEventsProcessed() const =0
virtual Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
virtual TQueryResult * GetQueryResult(const char *ref)=0
virtual void SetMerging(Bool_t on=kTRUE)=0
virtual EExitStatus GetExitStatus() const =0
virtual void Progress(Long64_t total, Long64_t processed)=0
virtual void AddQueryResult(TQueryResult *q)=0
static TVirtualProofPlayer * Create(const char *player, TProof *p, TSocket *s=0)
Create a PROOF player.
virtual void UpdateAutoBin(const char *name, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)=0
virtual Int_t AddOutputObject(TObject *obj)=0
virtual Bool_t JoinProcess(TList *workers)=0
virtual TList * GetListOfResults() const =0
virtual void ClearInput()=0
virtual void HandleRecvHisto(TMessage *mess)=0
virtual void RemoveQueryResult(const char *ref)=0
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)=0
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
virtual void StoreFeedback(TObject *slave, TList *out)=0
virtual TDrawFeedback * CreateDrawFeedback(TProof *p)=0
virtual TVirtualPacketizer * GetPacketizer() const
virtual void DeleteDrawFeedback(TDrawFeedback *f)=0
virtual TList * GetOutputList() const =0
virtual void SetInitTime()=0
virtual void SetCurrentQuery(TQueryResult *q)=0
Int_t Nint(T x)
Round to nearest integer. Rounds half integers to the nearest even integer.
Double_t Sqrt(Double_t x)