43 # include <sys/stat.h> 44 # include <sys/types.h> 45 # include "snprintf.h" 51 #include "RConfigure.h" 121 if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
124 fProof->StopProcess(
kTRUE);
129 if (fProof->GetRemoteProtocol() < 22) {
130 a = Getline(
"\nSwitch to asynchronous mode not supported remotely:" 131 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
133 a = Getline(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit," 134 " any other key to continue: ");
136 if (a[0] ==
'Q' || a[0] ==
'S' || a[0] ==
'q' || a[0] ==
's') {
138 Info(
"Notify",
"Processing interrupt signal ... %c", a[0]);
142 fProof->StopProcess(abort);
144 }
else if ((a[0] ==
'A' || a[0] ==
'a') && fProof->GetRemoteProtocol() >= 22) {
146 fProof->GoAsynchronous();
159 fSocket(s), fProof(p)
186 if (!si)
return fOrdinal.CompareTo(obj->GetName());
188 const char *myord = GetOrdinal();
190 while (myord && otherord) {
191 Int_t myval = atoi(myord);
192 Int_t otherval = atoi(otherord);
193 if (myval < otherval)
return 1;
194 if (myval > otherval)
return -1;
195 myord = strchr(myord,
'.');
197 otherord = strchr(otherord,
'.');
198 if (otherord) otherord++;
200 if (myord)
return -1;
201 if (otherord)
return 1;
213 return (strcmp(GetOrdinal(), si->
GetOrdinal()) == 0);
224 TString stat = fStatus == kActive ?
"active" :
225 fStatus == kBad ?
"bad" :
234 if (oo ==
"active" && fStatus != kActive)
return;
235 if (oo ==
"notactive" && fStatus != kNotActive)
return;
236 if (oo ==
"bad" && fStatus != kBad)
return;
240 if (!(fMsd.IsNull())) msd.
Form(
"| msd: %s ", fMsd.Data());
241 if (!(fDataDir.IsNull())) datadir.
Form(
"| datadir: %s ", fDataDir.Data());
242 if (fSysInfo.fCpus > 0) {
243 si.
Form(
"| %s, %d cores, %d MB ram", fHostName.Data(),
244 fSysInfo.fCpus, fSysInfo.fPhysRam);
246 si.
Form(
"| %s", fHostName.Data());
253 std::cout <<
"Slave: " << fOrdinal
254 <<
" hostname: " << fHostName
256 <<
" perf index: " << fPerfIndex
267 fSysInfo.fOS = si.
fOS;
268 fSysInfo.fModel = si.
fModel;
270 fSysInfo.fCpus = si.
fCpus;
288 fWorkers->SetOwner(
kFALSE);
297 if (AreAllWorkersMerged())
298 Error(
"SetMergedWorker",
"all workers have been already merged before!");
309 fWorkers =
new TList();
310 if (fWorkersToMerge == fWorkers->GetSize()) {
311 Error(
"AddWorker",
"all workers have been already assigned to this merger");
322 return (fWorkersToMerge == fMergedWorkers);
333 return (fWorkers->GetSize() == fWorkersToMerge);
365 if( 0 == _cluster->
Length() ) {
366 Error(
"PoDCheckUrl",
"PoD server is not running");
406 if (!masterurl || strlen(masterurl) <= 0) {
409 }
else if (!(strstr(masterurl,
"://"))) {
446 }
else if (
fMaster ==
"prooflite") {
455 Init(masterurl, conffile, confdir, loglevel, alias);
460 if (
Exec(
"gProofServ->GetUser()",
"0",
kTRUE) == 0) {
467 emsg =
"could not find 'const char *' string in macro log";
470 emsg =
"could not retrieve user info";
479 Warning(
"TProof",
"%s: using local default %s", emsg.
Data(), usr.
Data());
489 gROOT->GetListOfSockets()->Remove(mgr);
490 gROOT->GetListOfSockets()->Add(mgr);
495 if (!
gROOT->GetListOfProofs()->FindObject(
this))
496 gROOT->GetListOfProofs()->Add(
this);
515 if (!
gROOT->GetListOfProofs()->FindObject(
this))
516 gROOT->GetListOfProofs()->Add(
this);
623 while (envs.Tokenize(env, from,
",")) {
626 Warning(
"Init",
"request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
628 if (!envsfound.IsNull()) envsfound +=
",";
635 if (envsfound.IsNull()) {
636 Warning(
"Init",
"none of the requested env variables were found: '%s'", envs.Data());
638 Info(
"Init",
"the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
718 gROOT->GetListOfProofs()->Remove(
this);
723 if (gProof && gProof ==
this) {
726 while ((gProof = (
TProof *)pvp())) {
734 Emit(
"CloseWindow()");
746 const char *confdir,
Int_t loglevel,
const char *alias)
770 if (!conffile || !conffile[0])
772 if (!confdir || !confdir[0])
828 Error(
"Init",
"could not create temporary logfile");
830 Error(
"Init",
"could not open temp logfile for reading");
892 if (enableSchemaEvolution) {
895 Info(
"TProof",
"automatic schema evolution in TMessage explicitly disabled");
905 Error(
"Init",
"failure asserting sandbox directory %s", sandbox.
Data());
914 Error(
"Init",
"failure asserting directory %s", packdir.
Data());
919 Info(
"Init",
"package directory set to %s", packdir.
Data());
928 Info(
"Init",
" %d global package directories registered", nglb);
950 GetRC(
"Proof.DynamicStartup", dyn);
967 if (s.IsDigit()) nwrk = s.
Atoi();
990 gROOT->GetListOfSockets()->Add(
this);
1015 }
else if (sb ==
"..") {
1038 const char *cq = (
IsLite()) ?
"\"" :
"";
1039 while (sconf.Tokenize(opt, from,
",")) {
1040 if (opt.
IsNull())
continue;
1047 TString mst, top, sub, wrk, all;
1062 if (all !=
"" && mst ==
"") mst = all;
1063 if (all !=
"" && top ==
"") top = all;
1064 if (all !=
"" && sub ==
"") sub = all;
1065 if (all !=
"" && wrk ==
"") wrk = all;
1066 if (all !=
"" && all.
BeginsWith(
"valgrind_opts:")) {
1068 Info(
"ParseConfigField",
"valgrind run: resetting 'PROOF_WRAPPERCMD':" 1069 " must be set again for next run , if any");
1073 cmd.
Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1074 TString mstlab(
"NO"), wrklab(
"NO");
1075 Bool_t doMaster = (opt ==
"valgrind" || (opt.
Contains(
"master") &&
1081 if (mst ==
"" || mst.
BeginsWith(
"valgrind_opts:")) {
1083 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), mst.
Data());
1086 }
else if (mst !=
"") {
1092 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1094 if (!opt.
Contains(
"workers"))
return;
1096 if (opt ==
"valgrind" || opt ==
"valgrind=") opt =
"valgrind=workers";
1101 if (top ==
"" || top.
BeginsWith(
"valgrind_opts:")) {
1103 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), top.
Data());
1106 }
else if (top !=
"") {
1112 if (sub ==
"" || sub.
BeginsWith(
"valgrind_opts:")) {
1114 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), sub.
Data());
1117 }
else if (sub !=
"") {
1123 if (wrk ==
"" || wrk.
BeginsWith(
"valgrind_opts:")) {
1125 var.
Form(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.
Data(), wrk.
Data(), cq);
1130 nwrks = opt(inw+1, opt.
Length());
1131 if (!nwrks.
IsDigit()) nwrks =
"2";
1143 }
else if (wrk !=
"") {
1157 Printf(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.
Data());
1159 Printf(
" ---> Starting a debug run with valgrind (workers:%s)", wrklab.
Data());
1161 Printf(
" ---> Please be patient: startup may be VERY slow ...");
1162 Printf(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1163 Printf(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1172 Printf(
"*** Requested IgProf performance profiling ***");
1173 TString addLogExt =
"__igprof.pp__.log";
1174 TString addLogFmt =
"igprof -pk -pp -t proofserv.exe -o %s.%s";
1182 tmp.
Form(addLogFmt.
Data(),
"<logfilemst>", addLogExt.
Data());
1185 tmp.
Form(addLogFmt.
Data(),
"<logfilewrk>", addLogExt.
Data());
1211 if ((c !=
'+') && ((c < '0') || (c >
'9')))
1230 if (
IsLite() && cpuPin) {
1231 Printf(
"*** Requested CPU pinning ***");
1233 const char *pinCmd =
"taskset -c <cpupin>";
1236 if (ev && (p = dynamic_cast<TNamed *>(ev->
FindObject(
"PROOF_SLAVE_WRAPPERCMD")))) {
1242 val.
Form(
"\"%s\"", pinCmd);
1255 if (!inpath || strlen(inpath) <= 0) {
1256 Error(
"AssertPath",
"undefined input path");
1265 Error(
"AssertPath",
"could not create path %s", path.
Data());
1272 Error(
"AssertPath",
"could not make path %s writable", path.
Data());
1291 gROOT->GetListOfSockets()->Remove(mgr);
1292 gROOT->GetListOfSockets()->Add(mgr);
1307 Error(
"AddWorkers",
"AddWorkers can only be called on the master!");
1311 if (!workerList || !(workerList->
GetSize())) {
1312 Error(
"AddWorkers",
"empty list of workers!");
1333 if (!addedWorkers) {
1335 Error(
"AddWorkers",
"cannot create new list for the workers to be added");
1343 while ((to = next())) {
1370 if (wn ==
"localhost" || wn.BeginsWith(
"localhost.")) wn =
gSystem->
HostName();
1392 addedWorkers->
Add(slave);
1400 Info(
"AddWorkers",
"worker on host %s created" 1406 m <<
TString(
"Opening connections to workers") << nSlaves
1407 << nSlavesDone << slaveOk;
1421 TIter nxsl(addedWorkers);
1423 while ((sl = (
TSlave *) nxsl())) {
1434 Info(
"AddWorkers",
"worker on host %s finalized" 1444 m <<
TString(
"Setting up worker servers") << nSlaves
1445 << nSlavesDone << slaveOk;
1458 if (s.IsDigit()) nwrk = s.
Atoi();
1464 Info(
"AddWorkers",
"will invoke GoMoreParallel()");
1467 Info(
"AddWorkers",
"GoMoreParallel()=%d", nw);
1473 Info(
"AddWorkers",
"will invoke GoParallel()");
1482 Info(
"AddWorkers",
"will invoke SaveWorkerInfo()");
1488 Info(
"AddWorkers",
"will invoke SendParallel()");
1491 if (goMoreParallel &&
fPlayer) {
1496 Info(
"AddWorkers",
"will send the PROCESS message to selected workers");
1504 delete addedWorkers;
1516 if (packs && packs->
GetSize() > 0) {
1519 while ((pck = (
TPair *) nxp())) {
1525 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on added workers");
1530 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on all workers");
1543 Info(
"SetupWorkersEnv",
"will invoke Load() on selected workers");
1552 dyn.ReplaceAll(
":",
" ");
1553 dyn.ReplaceAll(
"\"",
" ");
1555 Info(
"SetupWorkersEnv",
"will invoke AddDynamicPath() on selected workers");
1563 Info(
"SetupWorkersEnv",
"will invoke AddIncludePath() on selected workers");
1578 Error(
"RemoveWorkers",
"RemoveWorkers can only be called on the master!");
1588 while ((sl = (
TSlave *) nxsl())) {
1594 if (!(workerList->
GetSize())) {
1595 Error(
"RemoveWorkers",
"The list of workers should not be empty!");
1603 while ((to = next())) {
1605 if (!strcmp(to->
ClassName(),
"TProofNodeInfo")) {
1609 while ((sl = (
TSlave *) nxsl())) {
1617 Warning(
"RemoveWorkers",
"unknown object type: %s - it should be" 1618 " TProofNodeInfo or inheriting from TSlave", to->
ClassName());
1648 TString emsg(
"no resource currently available for this session: please retry later");
1660 Printf(
"Starting master: opening connection ...");
1666 fprintf(stderr,
"Starting master:" 1667 " connection open: setting up server ... \r");
1681 Printf(
"Starting master: OK ");
1687 Error(
"StartSlaves",
1688 "client and remote protocols not compatible (%d and %d)",
1707 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1710 if (slStatus == -99)
1711 Error(
"StartSlaves",
"no resources available or problems setting up workers (check logs)");
1712 else if (slStatus == -98)
1713 Error(
"StartSlaves",
"could not setup output redirection on master");
1715 Error(
"StartSlaves",
"setting up master");
1726 Error(
"StartSlaves",
1727 "failed to setup connection with PROOF master server");
1733 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1739 Printf(
"Starting master: failure");
1744 Printf(
"Starting master: OK ");
1749 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1762 Error(
"StartSlaves",
"failed to create (or connect to) the PROOF master server");
1778 { std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
1787 while ((sl = (
TSlave *)nxs()))
1801 gROOT->GetListOfSockets()->Remove(
this);
1813 gROOT->GetListOfProofs()->Remove(
this);
1814 if (gProof && gProof ==
this) {
1817 while ((gProof = (
TProof *)pvp())) {
1832 Int_t perf,
const char *image,
const char *workdir)
1854 const char *image,
const char *msd,
Int_t nwk)
1874 while ((sl = (
TSlave *)next())) {
1900 while (
TSlave *sl = dynamic_cast<TSlave*>(next())) {
1901 if (
fImage == sl->fImage) {
1911 TSlave *replace_slave = 0;
1913 while (
TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1914 if (sl->fImage == sl2->fImage) {
1919 replace_slave = sl2;
1926 Error(
"FindUniqueSlaves",
"TSlave is neither Master nor Slave");
1939 if (replace_slave) {
2031 if (s.Contains(
"Total MB's processed:")) {
2034 }
else if (s.Contains(
"Total real time used (s):")) {
2035 s.ReplaceAll(
"Total real time used (s):",
"");
2037 }
else if (s.Contains(
"Total CPU time used (s):")) {
2038 s.ReplaceAll(
"Total CPU time used (s):",
"");
2039 if (s.IsFloat())
fCpuTime = s.Atof();
2047 Printf(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2151 Printf(
"+++ Options: \"A\" show all queries known to server");
2152 Printf(
"+++ \"L\" show retrieved queries");
2153 Printf(
"+++ \"F\" full listing of query info");
2154 Printf(
"+++ \"H\" print this menu");
2156 Printf(
"+++ (case insensitive)");
2158 Printf(
"+++ Use Retrieve(<#>) to retrieve the full" 2159 " query results from the master");
2160 Printf(
"+++ e.g. Retrieve(8)");
2190 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2192 while ((pq = nxq()))
2199 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2206 Printf(
"+++ Queries available locally: %d", listlocal->
GetSize());
2207 TIter nxlq(listlocal);
2208 while ((pq = nxlq()))
2224 while (
TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2234 if (submasters.
GetSize() > 0) {
2242 EmitVA(
"IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2245 Info(
"IsDataReady",
"%lld / %lld (%s)",
2246 bytesready, totalbytes,
fDataReady?
"READY":
"NOT READY");
2264 if (slaves->
GetSize() == 0)
return;
2269 while ((sl = (
TSlave *)next())) {
2288 Int_t nparallel = 0;
2289 while (
TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2290 if (sl->GetParallel() >= 0)
2291 nparallel += sl->GetParallel();
2314 while ((slave = (
TSlave *) next()) != 0) {
2324 while ((activeslave = (
TSlave *) nextactive())) {
2333 while ((badslave = (
TSlave *) nextbad())) {
2342 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2350 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2355 Error(
"GetSlaveInfo",
"TSlave is neither Master nor Slave");
2376 while ((sl = (
TSlave*) next())) {
2406 if (workers->
GetSize() == 0)
return 0;
2409 TIter next(workers);
2412 while ((wrk = (
TSlave *)next())) {
2415 MarkBad(wrk,
"could not send group priority");
2457 if (!slaves || slaves->
GetSize() == 0)
return 0;
2463 while ((sl = (
TSlave *)next())) {
2466 MarkBad(sl,
"could not broadcast request");
2549 if (slaves->
GetSize() == 0)
return 0;
2555 while ((sl = (
TSlave *)next())) {
2558 MarkBad(sl,
"could not send broadcast-raw request");
2592 if (wrks->
GetSize() == 0)
return 0;
2598 while ((wrk = (
TSlave *)next())) {
2600 if (
SendFile(file, opt, rfile, wrk) < 0)
2601 Error(
"BroadcastFile",
2602 "problems sending file to worker %s (%s)",
2662 rc =
Collect(mon, timeout, endtype, deactonfail);
2688 while ((sl = (
TSlave*) next())) {
2693 rc =
Collect(mon, timeout, endtype, deactonfail);
2720 rc =
Collect(mon, timeout, endtype, deactonfail);
2739 Info(
"Collect",
">>>>>> Entering collect responses #%04d", collectId);
2769 int cnt = 0, rc = 0;
2774 Info(
"Collect",
"#%04d: active: %d", collectId, mon->
GetActive());
2786 while ((nact = mon->
GetActive(sto)) && (nto < 0 || nto > 0)) {
2792 if (al && al->
GetSize() > 0) {
2793 Info(
"Collect",
" %d node(s) still active:", al->
GetSize());
2796 while ((xs = (
TSocket *)nxs())) {
2814 fLastPollWorkers_s = time(0);
2817 Info(
"Collect",
"#%04d: now active: %d", collectId, mon->
GetActive());
2822 Info(
"Collect",
"Will invoke Select() #%04d", collectId);
2825 if (s && s != (
TSocket *)(-1)) {
2828 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2832 Info(
"Collect",
"#%04d: deactivating %p (active: %d, %p)", collectId,
2835 }
else if (rc == 2) {
2841 Info(
"Collect",
"save monitor: deactivating %p (active: %d, %p)",
2858 if (s == (
TSocket *)(-1) && nto > 0)
2870 while (mxws && (wrk = (
TSlave *) nxwr())) {
2875 Info(
"Collect",
"worker %s was asked to send its output to master",
2900 if (al && al->
GetSize() > 0) {
2902 Info(
"Collect",
" %d node(s) went in timeout:", al->
GetSize());
2905 while ((xs = (
TSocket *)nxs())) {
2933 Info(
"Collect",
"<<<<<< Exiting collect responses #%04d", collectId);
2950 Error(
"PollForNewWorkers",
"Can't invoke: not on a master -- should not happen!");
2954 Error(
"PollForNewWorkers",
"No ProofServ available -- should not happen!");
2964 TIter next(reqWorkers);
2967 while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2975 while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2982 if (found)
delete ni;
2984 newWorkers->
Add(ni);
2986 Info(
"PollForNewWorkers",
"New worker found: %s:%s",
2996 if (nNewWorkers > 0) {
2998 Info(
"PollForNewWorkers",
"Requesting to add %d new worker(s)", newWorkers->
GetEntries());
3001 Error(
"PollForNewWorkers",
"Call to AddWorkers() failed (got %d < 0)", rv);
3008 Info(
"PollForNewWorkers",
"No new worker found");
3037 if ((recvrc = s->
Recv(mess)) < 0) {
3039 Info(
"CollectInputFrom",
"%p: got %d from Recv()", s, recvrc);
3050 MarkBad(s,
"problems receiving a message in TProof::CollectInputFrom(...)");
3056 MarkBad(s,
"undefined message in TProof::CollectInputFrom(...)");
3064 if (rc == 1 && (endtype >= 0) && (what != endtype))
3084 Warning(
"HandleInputMessage",
"given an empty message or undefined worker");
3090 Warning(
"HandleInputMessage",
"worker socket is undefined");
3098 Info(
"HandleInputMessage",
"got type %d from '%s'", what, sl->
GetOrdinal());
3117 MarkBad(s,
"received kPROOF_FATAL");
3130 Info(
"HandleInputMessage",
"received kPROOF_STOP from %s: disabling any further collection this worker",
3162 Info(
"HandleInputMessage",
"%s: kPROOF_GETPACKET", sl->
GetOrdinal());
3204 Info(
"HandleInputMessage",
"%s: kPROOF_LOGFILE: size: %d", sl->
GetOrdinal(), size);
3212 Info(
"HandleInputMessage",
"%s: kPROOF_LOGDONE: status %d parallel %d",
3246 Info(
"HandleInputMessage",
3262 rc = (async) ? 0 : 1;
3287 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: enter");
3297 Error(
"HandleInputMessage",
3298 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3307 Error(
"HandleInputMessage",
3308 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3312 Error(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: unknown type: %d", type);
3325 Info(
"HandleInputMessage",
"kPROOF_SENDOUTPUT: enter (%s)", sl->
GetOrdinal());
3341 Info(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: enter");
3345 Info(
"HandleInputMessage",
"finalization on %s started ...", prefix);
3371 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: query result missing");
3373 }
else if (type > 0) {
3383 }
else if (
IsTty() || changed) {
3384 fprintf(stderr,
"%s\r", msg.
Data());
3404 while ((xo = nxin()))
3416 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: player undefined!");
3428 Info(
"HandleInputMessage",
"%s: kPROOF_OUTPUTLIST: enter", sl->
GetOrdinal());
3444 out = (
TList *) out->Clone();
3449 Info(
"HandleInputMessage",
3450 "%s: kPROOF_OUTPUTLIST: query result missing", sl->
GetOrdinal());
3459 Info(
"HandleInputMessage",
3460 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->
GetOrdinal());
3464 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->
GetOrdinal());
3474 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYLIST: enter");
3487 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_RETRIEVE: enter");
3496 Info(
"HandleInputMessage",
3497 "kPROOF_RETRIEVE: query result missing or player undefined");
3504 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MAXQUERIES: enter");
3508 Printf(
"Number of queries fully kept remotely: %d", max);
3514 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SERVERSTARTED: enter");
3516 UInt_t tot = 0, done = 0;
3520 (*mess) >> action >> tot >> done >> st;
3527 char msg[512] = {0};
3529 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3532 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3533 action.
Data(), done, tot, frac);
3536 fprintf(stderr,
"%s", msg);
3546 m << action << tot << done << st;
3554 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATASET_STATUS: enter");
3556 UInt_t tot = 0, done = 0;
3560 (*mess) >> action >> tot >> done >> st;
3566 char msg[512] = {0};
3568 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3571 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3572 action.
Data(), done, tot, frac);
3575 fprintf(stderr,
"%s", msg);
3585 m << action << tot << done << st;
3593 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STARTPROCESS: enter");
3613 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"Preparation time: %f s",
fPrepTime);
3618 (*mess) >> selec >> dsz >> first >> nent;
3620 if (!
gROOT->IsBatch()) {
3625 selec.
Data(), dsz, first, nent);
3639 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_ENDINIT: enter");
3658 Info(
"HandleInputMessage",
"%s: got kPROOF_SETIDLE", sl->
GetOrdinal());
3661 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3675 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYSUBMITTED: enter");
3700 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SESSIONTAG: enter");
3723 Info(
"HandleInputMessage",
"kPROOF_FEEDBACK: enter");
3736 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_AUTOBIN: enter");
3741 (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3747 answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3755 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PROGRESS: enter");
3764 Float_t initTime, procTime, evtrti, mbrti;
3765 (*mess) >> total >> processed >> bytesread
3766 >> initTime >> procTime
3770 initTime, procTime, evtrti, mbrti);
3775 (*mess) >> total >> processed;
3789 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STOPPROCESS: enter");
3796 (*mess) >> status >> abort;
3798 (*mess) >> events >> abort;
3804 TList *listOfMissingFiles = 0;
3805 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
3806 listOfMissingFiles =
new TList();
3807 listOfMissingFiles->
SetName(
"MissingFiles");
3826 Emit(
"StopProcess(Bool_t)", abort);
3832 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SUBMERGER: enter");
3839 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: enter");
3846 Error(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: no list received!");
3848 tmpinfo->SetOwner(
kFALSE);
3890 Info(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: enter");
3894 Error(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: fDSet not set");
3903 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATA_READY: enter");
3906 (*mess) >> dataready >> totalbytes >> bytesready;
3919 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MESSAGE: enter");
3932 fprintf(stderr,
"%s%c", msg.Data(), (lfeed ?
'\n' :
'\r'));
3940 fprintf(stderr,
"%s%c", msg.Data(), (lfeed ?
'\n' :
'\r'));
3956 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_VERSARCHCOMP: %s", vac.Data());
3959 if (vac.Tokenize(vers, from,
"|"))
3969 Error(
"HandleInputMessage",
"unknown command received from '%s' (what = %d)",
3997 Int_t merger_id = -1;
3998 (*mess) >> merger_id;
4001 Info(
"HandleSubmerger",
"kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4005 Error(
"HandleSubmerger",
"kOutputSize: #%d not in list ", merger_id);
4008 TMergerInfo * mi = (TMergerInfo *)
fMergers->
At(merger_id);
4018 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"all mergers removed ... ");
4022 PDB(kSubmerger, 2)
Error(
"HandleSubmerger",
"kOutputSent: received not on endmaster!");
4029 Int_t merger_id = -1;
4030 (*mess) >> merger_id;
4032 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown: #%d ", merger_id);
4035 Error(
"HandleSubmerger",
"kMergerDown: #%d not in list ", merger_id);
4039 TMergerInfo * mi = (TMergerInfo *)
fMergers->
At(merger_id);
4058 while ((o = nxo())) {
4061 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown:%d: exit", merger_id);
4069 Info(
"HandleSubmerger",
"worker %s reported as finished ", sl->
GetOrdinal());
4073 Info(
"HandleSubmerger",
"finalization on %s started ...", prefix);
4077 Int_t output_size = 0;
4078 Int_t merging_port = 0;
4079 (*mess) >> output_size >> merging_port;
4081 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
4082 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4098 msg.Form(
"%s: Invalid request: cannot start %d mergers for %d workers",
4108 if (activeWorkers > 1) {
4114 msg.Form(
"%s: Number of mergers set dynamically to %d (for %d workers)",
4117 msg.Form(
"%s: No mergers will be used for %d workers",
4118 prefix, activeWorkers);
4127 if (activeWorkers > 1) {
4132 while ((wrk = nxwk())) {
4140 msg.Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host",
4143 msg.Form(
"%s: No mergers will be used for %d workers",
4144 prefix, activeWorkers);
4152 msg.Form(
"%s: Number of mergers set by user to %d (for %d workers)",
4198 TMergerInfo *mgi = 0;
4199 while ((mgi = (TMergerInfo *) nxmg())) {
4220 Error(
"HandleSubMerger",
"kOutputSize received not on endmaster!");
4232 Int_t merger_id = -1;
4236 TMergerInfo *mgi = (TMergerInfo *)
fMergers->
At(i);
4246 if (merger_id == -1) {
4253 Info(
"RedirectWorker",
"redirecting worker %s to merger %d", sl->
GetOrdinal(), merger_id);
4255 PDB(kSubmerger, 2)
Info(
"RedirectWorker",
"redirecting output to merger #%d", merger_id);
4257 Error(
"RedirectWorker",
"#%d not in list ", merger_id);
4260 TMergerInfo * mi = (TMergerInfo *)
fMergers->
At(merger_id);
4263 sendoutput << merger_id;
4264 sendoutput << hname;
4266 s->
Send(sendoutput);
4278 while (fLastAssignedMerger < fMergers->GetSize() &&
4290 while (fLastAssignedMerger < fMergers->GetSize() &&
4311 PDB(kSubmerger, 2)
Info(
"AskForOutput",
4312 "worker %s was asked to send its output to master",
4316 sendoutput <<
TString(
"master");
4332 Info(
"UpdateDialog",
4333 "processing was aborted - %lld events processed",
4348 Info(
"UpdateDialog",
4349 "processing was stopped - %lld events processed",
4366 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4371 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4387 while ((sl = (
TSlave*) next()))
4400 while ((sl = (
TSlave*) next()))
4412 Int_t active_mergers = 0;
4415 TMergerInfo *mi = 0;
4416 while ((mi = (TMergerInfo *)mergers())) {
4417 if (mi->
IsActive()) active_mergers++;
4420 return active_mergers;
4429 Info(
"CreateMerger",
"worker %s will be merger ", sl->
GetOrdinal());
4431 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"Begin");
4435 Info(
"CreateMerger",
"cannot create merger on port %d - exit", port);
4452 Int_t workersOnHost = 0;
4456 workers = workersOnHost - 1;
4460 msg.
Form(
"worker %s on host %s will be merger for %d additional workers", sl->
GetOrdinal(), sl->
GetName(), workers);
4467 TMergerInfo * merger =
new TMergerInfo(sl, port, workers);
4472 bemerger << workers;
4475 PDB(kSubmerger,2)
Info(
"CreateMerger",
4476 "merger #%d (port: %d) for %d workers started",
4484 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"exit");
4495 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4501 Error(
"MarkBad",
"worker instance undefined: protocol error? ");
4522 msg.
Form(
"\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4524 (reason && strlen(reason)) ? reason :
"unknown");
4525 Info(
"MarkBad",
"%s", msg.Data());
4529 msg +=
TString::Format(
"\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4534 msg +=
TString::Format(
" +++ Please check the session logs for error messages either using\n");
4538 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->" 4542 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->" 4543 "Display(\"*\")\n\n", thisurl.
Data());
4544 Printf(
"%s", msg.Data());
4546 }
else if (reason) {
4548 Info(
"MarkBad",
"worker %s at %s:%d asked to terminate",
4556 TList *listOfMissingFiles = 0;
4557 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
4558 listOfMissingFiles =
new TList();
4559 listOfMissingFiles->
SetName(
"MissingFiles");
4568 packetizer->
MarkBad(wrk, 0, &listOfMissingFiles);
4575 if (
id !=
kNPOS) ord.Remove(0,
id+1);
4624 Int_t mergersCount = -1;
4626 if (mc) mergersCount = mc->
GetVal();
4628 if (mergersCount == 0) {
4630 if (activeWorkers > 1) {
4656 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4671 Warning(
"TerminateWorker",
"worker instance undefined: protocol error? ");
4681 Info(
"TerminateWorker",
"connection to worker is already down: cannot" 4682 " send termination message");
4694 if (ord && strlen(ord) > 0) {
4699 while ((wrk = (
TSlave *)nxw())) {
4700 if (all || !strcmp(wrk->
GetOrdinal(), ord)) {
4732 if (slaves->
GetSize() == 0)
return 0;
4738 while ((sl = (
TSlave *)next())) {
4740 if (sl->
Ping() == -1) {
4741 MarkBad(sl,
"ping unsuccessful");
4758 if (slaves->
GetSize() == 0)
return;
4763 while ((sl = (
TSlave *)next())) {
4781 IsValid() ?
"valid" :
"invalid");
4784 Printf(
"ROOT version|rev: %s|%s",
gROOT->GetVersion(),
gROOT->GetGitCommit());
4790 if (sl->GetSocket()->GetSecContext())
4791 Printf(
"Security context: %s",
4792 sl->GetSocket()->GetSecContext()->AsString(sc));
4793 Printf(
"Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4795 Printf(
"Security context: Error - No connection");
4796 Printf(
"Proofd protocol version: Error - No connection");
4808 Printf(
"*** Master server %s (parallel mode, %d workers):",
4811 Printf(
"*** Master server %s (sequential mode):",
4825 Printf(
"ROOT version|rev|tag: %s", ver.
Data());
4843 Printf(
"List of workers:");
4846 while (
TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4847 if (!sl->IsValid())
continue;
4854 if (sl->GetSocket()->Send(mess) == -1)
4855 const_cast<TProof*>(
this)->MarkBad(sl,
"could not send kPROOF_PRINT request");
4859 Error(
"Print",
"TSlave is neither Master nor Worker");
4911 TString outfile, dsname, stfopt;
4915 while (opt.
Tokenize(oo, from,
"[; ]")) {
4918 iof = opt.
Index(tagf);
4921 iof = opt.
Index(tagf);
4924 iod = opt.
Index(tagd);
4927 iod = opt.
Index(tagd);
4930 ios = opt.
Index(tags);
4932 tags =
"savetofile";
4933 ios = opt.
Index(tags);
4938 Error(
"HandleOutputOptions",
"options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4944 from = iof + tagf.
Length();
4946 Error(
"HandleOutputOptions",
"could not extract output file settings string! (%s)", opt.
Data());
4954 from = iod + tagd.
Length();
4955 if (!opt.
Tokenize(dsname, from,
"[; ]"))
4956 if (
gDebug > 0)
Info(
"HandleOutputOptions",
"no dataset name found: use default");
4965 if (dsname.
IsNull()) dsname =
"dataset_<qtag>";
4969 from = ios + tags.
Length();
4970 if (!opt.
Tokenize(stfopt, from,
"[; ]"))
4971 if (
gDebug > 0)
Info(
"HandleOutputOptions",
"save-to-file not found: use default");
4979 Error(
"HandleOutputOptions",
"save-to-file option must be a digit! (%s)", stfopt.
Data());
5003 Warning(
"HandleOutputOptions",
5004 "directory '%s' for the output file does not exists or is not writable:" 5025 if (
Exec(
"gProofServ->GetDataDir()",
"0",
kTRUE) == 0) {
5030 ddir = os->
GetString()(fst+1, lst-fst-1);
5032 emsg =
"could not find 'const char *' string in macro log! cannot continue";
5035 emsg =
"could not retrieve master data directory info! cannot continue";
5038 Error(
"HandleOutputOptions",
"%s", emsg.
Data());
5042 if (!ddir.
IsNull()) ddir +=
"/";
5044 outfile.
Form(
"%s<file>", ddir.
Data());
5065 Warning(
"HandleOutputOptions",
"Dataset required bu Save-To-File disabled: enabling!");
5066 stfopt.
Form(
"%d", ostf+1);
5081 if (target ==
"ds|V") {
5086 while ((o = nxo())) {
5098 Warning(
"HandleOutputOptions",
"could not retrieve TFileCollection for dataset '%s'", dsname.
Data());
5101 Warning(
"HandleOutputOptions",
"dataset not found!");
5113 Printf(
" Output successfully copied to %s", target.
Data());
5114 targetcopied =
kTRUE;
5116 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.
Data());
5124 while ((o = nxo())) {
5128 if (pof == pf && targetcopied)
continue;
5133 Printf(
" Output successfully copied to %s", target.
Data());
5136 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.
Data());
5146 Warning(
"HandleOutputOptions",
5153 if (!target.
IsNull() && !swapcopied) {
5156 if (!fout || (fout && fout->IsZombie())) {
5158 Warning(
"HandleOutputOptions",
"problems opening output file %s", target.
Data());
5163 while ((o = nxo())) {
5176 Printf(
" Output saved to %s", target.
Data());
5207 if (action == 0 || (action == 1 && optfb.
IsNull())) {
5212 ifb = opt.
Index(tag);
5214 if (ifb ==
kNPOS)
return;
5215 from = ifb + tag.
Length();
5218 Warning(
"SetFeedback",
"could not extract feedback string! Ignoring ...");
5227 TString nm, startdraw, stopdraw;
5229 while (optfb.
Tokenize(nm, from,
",")) {
5231 if (nm ==
"stats") {
5233 startdraw.
Form(
"gDirectory->Add(new TStatsFeedback((TProof *)%p))",
this);
5240 stopdraw.
Form(
"TObject *o = gDirectory->FindObject(\"%s\"); " 5241 " if (o && strcmp(o->ClassName(), \"TStatsFeedback\")) " 5253 startdraw.
Form(
"gDirectory->Add(new TDrawFeedback((TProof *)%p))",
this);
5258 stopdraw.
Form(
"TObject *o = gDirectory->FindObject(\"%s\"); " 5259 " if (o && strcmp(o->ClassName(), \"TDrawFeedback\")) " 5283 TString opt(option), optfb, outfile;
5285 if (opt.Contains(
"fb=") || opt.Contains(
"feedback="))
SetFeedback(opt, optfb, 0);
5294 Info(
"Process",
"session is in waiting or processing status: switch to asynchronous mode");
5296 opt.ReplaceAll(
"SYNC",
"");
5330 if (selector && strlen(selector)) {
5335 Error(
"Process",
"neither a selecrot file nor a selector object have" 5336 " been specified: cannot process!");
5358 if (sst) rv = sst->
GetVal();
5390 Info(
"Process",
"server version < 5.18/00:" 5391 " processing of TFileCollection not supported");
5402 if (selector && strlen(selector)) {
5403 retval =
Process(dset, selector, option, nentries, first);
5407 Error(
"Process",
"neither a selecrot file nor a selector object have" 5408 " been specified: cannot process!");
5480 Info(
"Process",
"processing 'by name' not supported by the server");
5484 TString dsname, fname(dsetname);
5497 const Int_t blen = 8192;
5501 Long64_t len = (rest > blen - 1) ? blen - 1 : rest;
5503 Error(
"Process",
"problems reading from file '%s'", fname.
Data());
5514 if (rest > 0)
return -1;
5516 Error(
"Process",
"could not open file '%s'", fname.
Data());
5528 Info(
"Process",
"processing multi-dataset read from file '%s':", fname.
Data());
5529 Info(
"Process",
" '%s'", dsname.
Data());
5536 Info(
"Process",
"multi-dataset processing not supported by the server");
5543 while (names.Tokenize(name, from,
"[, |]")) {
5548 Int_t ienl = name.Index(
"?enl=");
5549 if (ienl ==
kNPOS) {
5550 ienl = name.Index(
"<<");
5551 if (ienl !=
kNPOS) {
5552 newname.Remove(ienl);
5553 ienl += strlen(
"<<");
5556 newname.Remove(ienl);
5557 ienl += strlen(
"?enl=");
5562 Int_t idxc = newname.Index(
"#");
5563 if (idxc !=
kNPOS) {
5565 if (idxs !=
kNPOS) {
5566 obj = newname(idxs+1, newname.Length());
5567 dir = newname(idxc+1, newname.Length());
5568 dir.
Remove(dir.Index(
"/") + 1);
5571 obj = newname(idxc+1, newname.Length());
5574 }
else if (newname.Index(
":") !=
kNPOS && newname.Index(
"://") ==
kNPOS) {
5576 Error(
"Process",
"bad name syntax (%s): please use" 5577 " a '#' after the dataset name", name.Data());
5585 }
else if (obj != dsobj || dir != dsdir) {
5587 Warning(
"Process",
"'obj' or 'dir' specification not consistent w/ the first given: ignore");
5590 if (ienl !=
kNPOS) {
5592 enl =
name(ienl, name.Length());
5600 if ((el = dynamic_cast<TEntryList *>(oel))) {
5614 while ((k = (
TKey *) nxk())) {
5615 if (!strcmp(k->GetClassName(),
"TEntryList")) {
5617 if ((el = dynamic_cast<TEntryList *>(f->
Get(k->GetName())))) {
5629 }
else if (strcmp(el->GetName(), k->GetName())) {
5630 Warning(
"Process",
"multiple entry lists found in file '%s': the first one is taken;\n" 5631 "if this is not what you want, load first the content in memory" 5632 "and select it by name ", enl.Data());
5637 Warning(
"Process",
"file '%s' cannot be open or is empty - ignoring", enl.Data());
5646 newname += el->GetName();
5659 TDSet *dset =
new TDSet(dsname, dsobj, dsdir);
5668 if (selector && strlen(selector)) {
5669 retval =
Process(dset, selector, option, nentries, first);
5673 Error(
"Process",
"neither a selector file nor a selector object have" 5674 " been specified: cannot process!");
5698 Info(
"Process",
"server version < 5.17/04: generic processing not supported");
5707 if (selector && strlen(selector)) {
5708 retval =
Process(dset, selector, option, n);
5712 Error(
"Process",
"neither a selector file nor a selector object have" 5713 " been specified: cannot process!");
5737 Error(
"Process",
"server version < 5.33/02:" 5738 "processing by object not supported");
5742 Error(
"Process",
"selector object undefined!");
5763 Error(
"Process",
"server version < 5.33/02:" 5764 "processing by object not supported");
5768 Error(
"Process",
"selector object undefined!");
5786 Error(
"Process",
"server version < 5.33/02:" 5787 "processing by object not supported");
5791 Error(
"Process",
"selector object undefined!");
5795 Long64_t rc =
Process(dsetname, (
const char*)0, option, nentries, first, elist);
5810 Error(
"Process",
"server version < 5.33/02:" 5811 "processing by object not supported");
5815 Error(
"Process",
"selector object undefined!");
5863 Info(
"Finalize",
"query #%d not found", qry);
5897 Info(
"Finalize",
"query already finalized:" 5898 " use Finalize(<qry>,kTRUE) to force new retrieval");
5927 Info(
"Retrieve",
"query #%d not found", qry);
5929 Info(
"Retrieve",
"positive argument required - do nothing");
5956 if (!farc || (farc && !(farc->
IsOpen()))) {
5957 Info(
"Retrieve",
"archive file cannot be open (%s)", path);
5972 Info(
"Retrieve",
"query not found after retrieve");
5992 Info(
"Remove",
"query #%d not found", qry);
5994 Info(
"Remove",
"positive argument required - do nothing");
6036 Info(
"Archive",
"query #%d not found", qry);
6038 Info(
"Archive",
"positive argument required - do nothing");
6095 if (mode && (strlen(mode) > 0)) {
6106 Info(
"GetQueryMode",
"query mode is set to: %s", qmode ==
kSync ?
6119 const char *selection,
Option_t *option,
6126 Info(
"DrawSelect",
"not idle, asynchronous Draw not supported");
6155 const char *selection,
Option_t *option,
6159 Info(
"Process",
"processing 'by name' not supported by the server");
6167 if (idxc !=
kNPOS) {
6169 if (idxs !=
kNPOS) {
6172 dir.Remove(dir.Index(
"/") + 1);
6180 Error(
"DrawSelect",
"bad name syntax (%s): please use" 6181 " a '#' after the dataset name", dsetname);
6199 Info(
"StopProcess",
"enter %d", abort);
6222 while ((sl = (
TSlave *)next()))
6233 Emit(
"DisableGoAsyn()");
6244 Info(
"GoAsynchronous",
"functionality not supported by the server - ignoring");
6252 Info(
"GoAsynchronous",
"either idle or already in asynchronous mode - ignoring");
6261 const Int_t kMAXBUF = 16384;
6275 Warning(
"RecvLogFile",
"file descriptor for outputs undefined (%d):" 6276 " will not log msgs", fdout);
6279 lseek(fdout, (off_t) 0, SEEK_END);
6285 while (filesize < size) {
6286 left =
Int_t(size - filesize);
6287 if (left >= kMAXBUF)
6290 filesize = (rec > 0) ? (filesize + rec) : filesize;
6299 w =
write(fdout, p, r);
6302 SysError(
"RecvLogFile",
"error writing to unit: %d", fdout);
6308 }
else if (rec < 0) {
6309 Error(
"RecvLogFile",
"error during receiving log file");
6315 EmitVA(
"LogMessage(const char*,Bool_t)", 2, buf,
kFALSE);
6334 if (!msg || (len = strlen(msg)) <= 0)
6338 Int_t lsfx = (sfx) ? strlen(sfx) : 0;
6345 Warning(
"NotifyLogMsg",
"file descriptor for outputs undefined (%d):" 6346 " will not notify msgs", fdout);
6349 lseek(fdout, (off_t) 0, SEEK_END);
6355 char *p = (
char *)msg;
6360 SysError(
"NotifyLogMsg",
"error writing to unit: %d", fdout);
6368 if (
write(fdout, sfx, lsfx) != lsfx)
6369 SysError(
"NotifyLogMsg",
"error writing to unit: %d", fdout);
6375 EmitVA(
"LogMessage(const char*,Bool_t)", 2, msg,
kFALSE);
6389 Info(
"LogMessage",
"Enter ... %s, 'all: %s", msg ? msg :
"",
6390 all ?
"true" :
"false");
6392 if (
gROOT->IsBatch()) {
6393 PDB(kGlobal,1)
Info(
"LogMessage",
"GUI not started - use TProof::ShowLog()");
6398 EmitVA(
"LogMessage(const char*,Bool_t)", 2, msg, all);
6404 lseek(fileno(
fLogFileR), (off_t) 0, SEEK_SET);
6406 const Int_t kMAXBUF = 32768;
6410 while ((len =
read(fileno(
fLogFileR), buf, kMAXBUF-1)) < 0 &&
6415 Error(
"LogMessage",
"error reading log file");
6421 EmitVA(
"LogMessage(const char*,Bool_t)", 2, buf,
kFALSE);
6445 while ((sl = (
TSlave *)next())) {
6448 MarkBad(sl,
"could not send kPROOF_GROUPVIEW message");
6512 if (!s.
Length())
return 0;
6521 Error(
"Exec",
"file %s could not be transfered", fn);
6532 Error(
"Exec",
"macro %s not found", filename.
Data());
6540 gROOT->ProcessLine(cmd);
6568 if (!s.
Length())
return 0;
6572 gROOT->ProcessLine(cmd);
6580 if (strcmp(ord,
"master") && strcmp(ord,
"0"))
ActivateWorker(ord);
6640 TString cmd =
TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6654 Printf(
"%s: %d", rcenv, env);
6666 TString cmd =
TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6680 Printf(
"%s: %f", rcenv, env);
6692 TString cmd =
TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6701 env = os->
GetString()(fst+1, lst-fst-1);
6783 FileMap_t::const_iterator it;
6790 if ((*md5) != md.
fMD5) {
6813 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
6827 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
6874 slaves =
new TList();
6878 if (slaves->
GetSize() == 0)
return 0;
6886 SysError(
"SendFile",
"cannot open file %s", file);
6894 Error(
"SendFile",
"cannot stat file %s", file);
6899 Error(
"SendFile",
"empty file %s", file);
6911 if ((opt &
kCp)) cpopt |= kCp;
6912 if ((opt &
kCpBin)) cpopt |= (kCp | kCpBin);
6914 const Int_t kMAXBUF = 32768;
6921 if (fnam ==
"cache") {
6923 }
else if (fnam.
IsNull()) {
6928 while ((sl = (
TSlave *)next())) {
6938 Info(
"SendFile",
"%s sending file %s to: %s:%s (%d)", snd,
6946 snprintf(buf, kMAXBUF,
"%s %d %lld %d", fnam.
Data(), bin, siz, fw);
6948 MarkBad(sl,
"could not send kPROOF_SENDFILE request");
6954 lseek(fd, 0, SEEK_SET);
6962 SysError(
"SendFile",
"error reading from file %s", file);
6969 SysError(
"SendFile",
"error writing to slave %s:%s (now offline)",
6971 MarkBad(sl,
"sendraw failure");
6992 return (
fStatus != 0) ? -1 : nsl;
7002 if (!
IsValid() || !obj)
return -1;
7028 if (!
IsValid() || !obj)
return -1;
7057 snprintf(str, 32,
"%d %u", level, mask);
7075 Warning(
"SetRealTimeLog",
"session is invalid - do nothing");
7093 PDB(kGlobal,1)
Info(
"SetParallelSilent",
"request all nodes");
7095 PDB(kGlobal,1)
Info(
"SetParallelSilent",
"request %d node%s", nodes,
7096 nodes == 1 ?
"" :
"s");
7099 mess << nodes << random;
7103 PDB(kGlobal,1)
Info(
"SetParallelSilent",
"got %d node%s", n, n == 1 ?
"" :
"s");
7122 Printf(
"PROOF set to sequential mode");
7124 TString subfix = (n == 1) ?
"" :
"s";
7126 subfix +=
", randomly selected";
7127 Printf(
"PROOF set to parallel mode (%d worker%s)", n, subfix.
Data());
7145 Error(
"GoMoreParallel",
"can't invoke here -- should not happen!");
7149 Error(
"GoMoreParallel",
"no ProofServ available nor Lite -- should not happen!");
7155 Int_t nAddedWorkers = 0;
7157 while (((nAddedWorkers < nWorkersToAdd) || (nWorkersToAdd == -1)) &&
7158 (( sl = dynamic_cast<TSlave *>( next() ) ))) {
7163 Error(
"GoMoreParallel",
"TSlave is neither a Master nor a Slave: %s:%s",
7170 (strcmp(
"IGNORE", sl->
GetImage()) == 0)) {
7172 Info(
"GoMoreParallel",
"Worker %s:%s won't be considered",
7179 Info(
"GoMoreParallel",
"Worker %s:%s is already active: skipping",
7195 Info(
"GoMoreParallel",
"Worker %s:%s marked as active!",
7200 Error(
"GoMoreParallel",
"Dynamic addition of master is not supported");
7208 Info(
"GoMoreParallel",
"Will invoke AskStatistics() -- implies a Collect()");
7213 Info(
"GoMoreParallel",
"Will invoke FindUniqueSlaves()");
7218 Info(
"GoMoreParallel",
"Will invoke SendGroupView()");
7222 Info(
"GoMoreParallel",
"Will invoke GetParallel()");
7228 s.
Form(
"PROOF just went more parallel (%d additional worker%s, %d worker%s total)",
7229 nAddedWorkers, (nAddedWorkers == 1) ?
"" :
"s",
7230 nTotalWorkers, (nTotalWorkers == 1) ?
"" :
"s");
7232 Info(
"GoMoreParallel",
"%s", s.
Data());
7234 return nTotalWorkers;
7258 while ((sl = (
TSlave *)nxt())) {
7260 if (strcmp(
"IGNORE", sl->
GetImage()) == 0)
continue;
7263 Error(
"GoParallel",
"TSlave is neither Master nor Slave");
7276 while (cnt < nwrks) {
7286 Error(
"GoParallel",
"attaching to candidate!");
7292 Int_t slavenodes = 0;
7303 Int_t nn = (nodes < 0) ? -1 : nodes-cnt;
7311 MarkBad(sl,
"could not send kPROOF_PARALLEL or kPROOF_LOGFILE request");
7327 MarkBad(sl,
"collect failed after kPROOF_PARALLEL or kPROOF_LOGFILE request");
7354 printf(
"PROOF set to sequential mode\n");
7356 printf(
"PROOF set to parallel mode (%d worker%s)\n",
7357 n, n == 1 ?
"" :
"s");
7360 PDB(kGlobal,1)
Info(
"GoParallel",
"got %d node%s", n, n == 1 ?
"" :
"s");
7396 if (doask && !
Prompt(
"Do you really want to remove all data files"))
return;
7397 if (
fManager->
Rm(
"~/data/*",
"-rf",
"all") < 0)
7398 Warning(
"ClearData",
"problems purging data directory");
7402 if (!dsname || strlen(dsname) <= 0) {
7403 Error(
"ClearData",
"dataset name mandatory when removing a full dataset");
7408 Error(
"ClearData",
"dataset '%s' does not exists", dsname);
7414 Error(
"ClearData",
"could not retrieve info about dataset '%s'", dsname);
7419 " of dataset '%s'", dsname);
7431 if (!(fi->GetFirstUrl())) {
7432 Error(
"ClearData",
"GetFirstUrl() returns NULL for '%s' - skipping",
7436 TUrl uf(*(fi->GetFirstUrl()));
7440 Int_t nurl = fi->GetNUrls();
7443 while (nurl-- && fi->NextUrl()) {
7444 up = fi->GetCurrentUrl();
7447 if (opt.BeginsWith(
"node=")) {
7457 Error(
"ClearData",
"problems removing '%s'", file.
Data());
7464 fprintf(stderr,
"\n");
7472 TString outtmp(
"ProofClearData_");
7475 Error(
"ClearData",
"cannot create temp file for logs");
7485 in.open(outtmp.
Data());
7486 if (!in.is_open()) {
7487 Error(
"ClearData",
"could not open temp file for logs: %s", outtmp.
Data());
7498 if (line.
IsNull())
continue;
7502 if (!line.
Tokenize(host, from,
"| "))
continue;
7504 if (!line.
Tokenize(file, from,
"| "))
continue;
7515 Info(
"ClearData",
"added info for: h:%s, f:%s", host.
Data(), file.
Data());
7517 Warning(
"ClearData",
"found incomplete line: '%s'", line.
Data());
7527 if (!fcmap || (fcmap && fcmap->
GetSize() <= 0)) {
7529 Warning(
"ClearData",
"no dataset beloning to '%s'", sel.
Data());
7548 while (nurl-- && fi->
NextUrl()) {
7557 Info(
"ClearData",
"found: host: %s, file: %s", host.
Data(), file.
Data());
7568 "registered file '%s' not found in the full list!",
7584 Info(
"ClearData",
"%d unregistered files to be removed:", nfiles);
7588 " unregistered data files", nfiles);
7602 Error(
"ClearData",
"problems removing '%s' on host '%s'",
7610 fprintf(stderr,
"\n");
7625 if (!pp.
Contains(
"[y/N]")) pp +=
" [y/N]";
7627 if (a !=
"\n" && a[0] !=
'y' && a[0] !=
'Y' && a[0] !=
'n' && a[0] !=
'N') {
7628 Printf(
"Please answer y, Y, n or N");
7631 }
else if (a ==
"\n" || a[0] ==
'n' || a[0] ==
'N') {
7644 fprintf(stderr,
"[TProof::ClearData] Total %5d files\t|", t);
7646 if (r > 0 && t > 0) {
7648 fprintf(stderr,
"=");
7649 else if (
l == 20*r/t)
7650 fprintf(stderr,
">");
7651 else if (
l > 20*r/t)
7652 fprintf(stderr,
".");
7654 fprintf(stderr,
"=");
7656 fprintf(stderr,
"| %.02f %% \r", 100.0*(t ? (r/t) : 1));
7718 while (fgets(line, 2048, fin)) {
7721 if (
write(fdout, line, r) < 0) {
7723 "errno %d writing to file descriptor %d",
7752 Warning(
"ShowPackages",
"file descriptor for outputs undefined (%p):" 7753 " will not log msgs", fout);
7756 lseek(fileno(fout), (off_t) 0, SEEK_END);
7833 if (!package || !package[0]) {
7834 Error(
"ClearPackage",
"need to specify a package name");
7861 if (!pack || strlen(pack) <= 0) {
7862 Error(
"DisablePackage",
"need to specify a package name");
7873 Warning(
"DisablePackage",
"problem removing locally package '%s'", pack);
7883 path.
Form(
"~/packages/%s", pack);
7884 if (
fManager->
Rm(path,
"-rf",
"all") != -1) {
7920 Warning(
"DisablePackages",
"problem removing packages locally");
7929 if (
fManager->
Rm(
"~/packages/*",
"-rf",
"all") != -1) {
7967 if (!package || !package[0]) {
7968 Error(
"BuildPackage",
"need to specify a package name");
8007 if (buildOnClient) {
8013 if (!
IsLite() || !buildOnClient) {
8046 if (!package || !package[0]) {
8047 Error(
"LoadPackage",
"need to specify a package name");
8058 if (
fPackMgr->
Load(package, loadopts) == -1)
return -1;
8062 if (loadopts) mess << loadopts;
8071 Info(
"LoadPackage",
"Sending load message to selected workers only");
8073 if (doCollect)
Collect(workers, -1, -1, deactivateOnFailure);
8090 if (!package || !package[0]) {
8091 Error(
"UnloadPackage",
"need to specify a package name");
8102 Warning(
"UnloadPackage",
"unable to remove symlink to %s", package);
8173 if (loadopts && strlen(loadopts)) {
8191 Warning(
"EnablePackage",
"'checkversion' option unknown from argument: '%s' - ignored", ocv.
Data());
8194 Info(
"EnablePackage",
"setting check version option from argument: %d", cvopt);
8198 if (lcv !=
kNPOS && fcv == 0) ocv += os->
String()[lcv];
8204 if (!optls) optls =
new TList;
8210 Warning(
"EnablePackage",
"remote server does not support options: ignoring the option string");
8235 if (!package || !package[0]) {
8236 Error(
"EnablePackage",
"need to specify a package name");
8254 if (ocv ==
"off" || ocv ==
"0")
8256 else if (ocv ==
"on" || ocv ==
"1")
8259 Warning(
"EnablePackage",
"'checkversion' option unknown from rootrc: '%s' - ignored", ocv.
Data());
8264 chkveropt = pcv->
GetVal();
8270 Info(
"EnablePackage",
"using check version option: %d", chkveropt);
8275 TList *optls = (loadopts && loadopts->
GetSize() > 0) ? loadopts : 0;
8277 Warning(
"EnablePackage",
"remote server does not support options: ignoring the option list");
8281 if (
LoadPackage(pac, notOnClient, optls, workers) == -1)
8309 Error(
"DownloadPackage",
"the manager is undefined!");
8315 if (!parname.EndsWith(
".par")) parname +=
".par";
8316 src.
Form(
"packages/%s", parname.Data());
8317 if (!dstdir || strlen(dstdir) <= 0) {
8318 dst.Form(
"./%s", parname.Data());
8325 Error(
"DownloadPackage",
8326 "could not create the destination directory '%s' (errno: %d)",
8331 Error(
"DownloadPackage",
8332 "destination path '%s' exist but is not a directory!", dstdir);
8335 dst.Form(
"%s/%s", dstdir, parname.Data());
8345 Warning(
"DownloadPackage",
"problems restoring output");
8358 if (s.Contains(
"*** Global Package cache")) {
8360 s.
Remove(0, s.Last(
':') + 1);
8364 }
else if (s.Contains(
"*** Package cache")) {
8369 if (isGlobal && s.Contains(parname)) {
8370 src.
Form(
"%s/%s", globaldir.
Data(), parname.Data());
8381 Error(
"DownloadPackage",
"problems downloading '%s' (src:%s, dst:%s)",
8382 pack, src.Data(), dst.Data());
8385 Info(
"DownloadPackage",
"'%s' cross-checked against master repository (local path: %s)",
8423 if (
par.EndsWith(
".par")) {
8425 name = base(0, base.
Length() - strlen(
".par"));
8440 Info(
"UploadPackage",
"global package found (%s): no upload needed",
8443 }
else if (xrc < 0) {
8444 Error(
"UploadPackage",
"PAR file '%s' not found",
par.Data());
8482 mess << smsg << (*md5);
8485 mess2 << smsg << (*md5);
8488 mess3 << smsg << (*md5);
8503 TIter next(workers);
8505 while ((sl = (
TSlave *) next())) {
8519 Error(
"UploadPackage",
"%s: problems uploading file %s",
8526 if (!ftp.IsZombie()) {
8528 ftp.cd(smsg.
Data());
8538 Error(
"UploadPackage",
"%s: unpacking of package %s failed",
8548 while ((ma = (
TSlave *) nextmaster())) {
8558 Error(
"UploadPackage",
"package %s did not exist on submaster %s",
8574 if (macro && strlen(macro) > 0) {
8576 if (!macrop.Contains(dirn)) {
8578 gROOT->SetMacroPath(macrop);
8605 if (!macro || !macro[0]) {
8606 Error(
"Load",
"need to specify a macro name");
8616 TString addsname, implname = macro;
8618 if (icom !=
kNPOS) {
8619 addsname = implname(icom + 1, implname.
Length());
8623 TString bmsg(basemacro), acmode, args, io;
8629 Info(
"Load",
"macro '%s' does not contain a '.': do nothing", macro);
8645 Info(
"Load",
"no associated header file found: tried: %s %s",
8653 if (!addsname.
IsNull()) {
8656 while (addsname.
Tokenize(fn, from,
",")) {
8658 Error(
"Load",
"additional file '%s' not found", fn.
Data());
8664 if (addincs.IsNull()) {
8665 addincs.
Form(
"-I%s", dirn.Data());
8666 }
else if (!addincs.Contains(dirn)) {
8678 Error(
"Load",
"problems sending implementation file %s", implname.
Data());
8683 Error(
"Load",
"problems sending header file %s", headname.
Data());
8688 TIter nxfn(&addfiles);
8693 Error(
"Load",
"problems sending additional file %s", os->
GetName());
8745 PDB(kGlobal, 1)
Info(
"Load",
"adding loaded macro: %s", macro);
8762 if (uniqueWorkers) {
8778 while ((wrk = (
TSlave *)nxw())) {
8795 PDB(kGlobal, 1)
Info(
"Load",
"adding loaded macro: %s", macro);
8818 if ((!libpath || !libpath[0])) {
8820 Info(
"AddDynamicPath",
"list is empty - nothing to do");
8832 if (libpath && strlen(libpath)) {
8839 m << (
Int_t)doCollect;
8863 if ((!incpath || !incpath[0])) {
8865 Info(
"AddIncludePath",
"list is empty - nothing to do");
8877 if (incpath && strlen(incpath)) {
8884 m << (
Int_t)doCollect;
8907 if ((!libpath || !libpath[0])) {
8909 Info(
"RemoveDynamicPath",
"list is empty - nothing to do");
8921 if (libpath && strlen(libpath))
8941 if ((!incpath || !incpath[0])) {
8943 Info(
"RemoveIncludePath",
"list is empty - nothing to do");
8955 if (incpath && strlen(incpath))
8976 if ((type !=
"lib") && (type !=
"inc")) {
8977 Error(
"HandleLibIncPath",
"unknown action type: %s - protocol error?", type.
Data());
8986 if (path.
Length() > 0 && path !=
"-") {
8988 Warning(
"HandleLibIncPath",
"decomposing path %s", path.
Data());
8995 if (type ==
"lib") {
9017 Info(
"HandleLibIncPath",
9018 "libpath %s does not exist or cannot be read - not added", xlib.
Data());
9038 Info(
"HandleLibIncPath",
9039 "incpath %s does not exist or cannot be read - not added", xinc.
Data());
9046 if (type ==
"lib") {
9119 (*fPrintProgress)(
total, processed, procTime, bytesread);
9124 fprintf(stderr,
"[TProof::Progress] Total %lld events\t|", total);
9126 for (
int l = 0;
l < 20;
l++) {
9128 if (
l < 20*processed/total)
9129 fprintf(stderr,
"=");
9130 else if (
l == 20*processed/total)
9131 fprintf(stderr,
">");
9132 else if (
l > 20*processed/total)
9133 fprintf(stderr,
".");
9135 fprintf(stderr,
"=");
9137 Float_t evtrti = (procTime > 0. && processed > 0) ? processed / procTime : -1.;
9138 Float_t mbsrti = (procTime > 0. && bytesread > 0) ? bytesread / procTime : -1.;
9141 Float_t remainingTime = (total >= processed) ? (total - processed) / evtrti : -1;
9143 const Float_t toK = 1024., toM = 1048576., toG = 1073741824.;
9144 if (mbsrti >= toG) {
9147 }
else if (mbsrti >= toM) {
9150 }
else if (mbsrti >= toK) {
9154 fprintf(stderr,
"| %.02f %% [%.1f evts/s, %.1f %s, time left: %.1f s]\r",
9155 (total ? ((100.0*processed)/total) : 100.0), evtrti, mbsrti, sunit.
Data(), remainingTime);
9157 fprintf(stderr,
"| %.02f %% [%.1f evts/s, time left: %.1f s]\r",
9158 (total ? ((100.0*processed)/total) : 100.0), evtrti, remainingTime);
9161 fprintf(stderr,
"| %.02f %%\r",
9162 (total ? ((100.0*processed)/total) : 100.0));
9164 if (processed >= total) {
9165 fprintf(stderr,
"\n Query processing time: %.1f s\n", procTime);
9181 Info(
"Progress",
"%2f (%lld/%lld)", 100.*processed/total, processed, total);
9183 if (
gROOT->IsBatch()) {
9188 EmitVA(
"Progress(Long64_t,Long64_t)", 2, total, processed);
9201 Info(
"Progress",
"%lld %lld %lld %f %f %f %f", total, processed, bytesread,
9202 initTime, procTime, evtrti, mbrti);
9204 if (
gROOT->IsBatch()) {
9209 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
9210 7, total, processed, bytesread, initTime, procTime, evtrti, mbrti);
9223 Info(
"Progress",
"%lld %lld %lld %f %f %f %f %d %f", total, processed, bytesread,
9224 initTime, procTime, evtrti, mbrti, actw, eses);
9226 if (
gROOT->IsBatch()) {
9231 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
9232 10, total, processed, bytesread, initTime, procTime, evtrti, mbrti, actw, tses, eses);
9249 Emit(
"Feedback(TList *objs)", (
Long_t) objs);
9258 Info(
"CloseProgressDialog",
9265 Emit(
"CloseProgressDialog()");
9275 Info(
"ResetProgressDialog",
"(%s,%d,%lld,%lld)", sel, sz, fst, ent);
9277 EmitVA(
"ResetProgressDialog(const char*,Int_t,Long64_t,Long64_t)",
9278 4, sel, sz, fst, ent);
9287 Info(
"StartupMessage",
"(%s,%d,%d,%d)", msg, st, done, total);
9289 EmitVA(
"StartupMessage(const char*,Bool_t,Int_t,Int_t)",
9290 4, msg, st, done, total);
9299 Info(
"DataSetStatus",
"(%s,%d,%d,%d)", msg, st, done, total);
9301 EmitVA(
"DataSetStatus(const char*,Bool_t,Int_t,Int_t)",
9302 4, msg, st, done, total);
9315 char msg[512] = {0};
9317 snprintf(msg, 512,
"%s: OK (%d %s) \n",
9318 action,tot, type.
Data());
9320 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
9321 action, done, tot, frac);
9324 fprintf(stderr,
"%s", msg);
9333 mess <<
TString(action) << tot << done << st;
9344 Info(
"QueryResultReady",
"ref: %s", ref);
9346 Emit(
"QueryResultReady(const char*)",ref);
9366 while (
TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
9371 sllist->
SetName(sl->GetName());
9372 slholder.
Add(sllist);
9375 elemholder.
Add(elemlist);
9376 nodes.
Add(
new TPair(sllist, elemlist));
9378 sllist =
dynamic_cast<TList*
>(p->
Key());
9380 if (sllist) sllist->
Add(sl);
9386 for (
Int_t i = 0; i < 2; i++) {
9389 while (
TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
9390 if (elem->GetValid())
continue;
9422 Warning(
"ValidateDSet",
"invalid values from TPair! Protocol error?");
9430 Warning(
"ValidateDSet",
"no node to allocate TDSetElement to - ignoring");
9438 TIter nextNode(&nodes);
9440 while (
TPair *node = dynamic_cast<TPair*>(nextNode())) {
9441 TList *slaves =
dynamic_cast<TList*
>(node->Key());
9442 TList *setelements =
dynamic_cast<TList*
>(node->Value());
9443 if (!slaves || !setelements)
continue;
9447 for (
Int_t i=0; i<nslaves; i++) {
9451 for (
Int_t j = (i*nelements)/nslaves;
9452 j < ((i+1)*nelements)/nslaves;
9463 if (copyset.GetListOfElements()->GetSize()>0) {
9469 PDB(kGlobal,1)
Info(
"ValidateDSet",
9470 "Sending TDSet with %d elements to slave %s" 9472 copyset.GetListOfElements()->GetSize(),
9482 Info(
"ValidateDSet",
"Calling Collect");
9559 if (datafile && strlen(datafile) > 0) {
9591 if (dataFile.
Length() > 0) {
9593 Info(
"SendInputDataFile",
"broadcasting %s", dataFile.
Data());
9640 if (!list_ok && !file_ok)
return;
9643 if (file_ok && !list_ok) {
9646 }
else if (!file_ok && list_ok) {
9656 while ((obj = next())) {
9666 }
else if (file_ok && list_ok) {
9686 while ((obj = next())) {
9692 Error(
"PrepareInputDataFile",
"could not open %s for updating", dataFile.
Data());
9751 if (!name || (name && strlen(name) <= 0) ||
9752 !out || (out && out->
GetSize() <= 0))
return o;
9759 while ((o = nxo())) {
9760 if ((pf = dynamic_cast<TProofOutputFile *> (o))) {
9765 if (!f || (f && f->IsZombie())) {
9766 ::Warning(
"TProof::GetOutput",
"problems opening file %s", fn.
Data());
9769 if (f && (o = f->
Get(name)))
return o;
9797 Warning(
"SetParameter",
"player undefined! Ignoring");
9816 Warning(
"SetParameter",
"player undefined! Ignoring");
9835 Warning(
"SetParameter",
"player undefined! Ignoring");
9854 Warning(
"SetParameter",
"player undefined! Ignoring");
9873 Warning(
"SetParameter",
"player undefined! Ignoring");
9893 Warning(
"GetParameter",
"player undefined! Ignoring");
9909 if (!wildcard) wildcard =
"";
9911 Int_t nch = strlen(wildcard);
9917 while ((p = next())) {
9919 if (nch && s != wildcard && s.
Index(re) ==
kNPOS)
continue;
9934 if (!wildcard) wildcard =
"";
9936 Int_t nch = strlen(wildcard);
9941 while ((p = next())) {
9943 if (nch && s != wildcard && s.
Index(re) ==
kNPOS)
continue;
9964 Info(
"AddFeedback",
"Adding object \"%s\" to feedback", name);
9995 Info(
"",
"no feedback requested");
10019 Error(
"GetTreeHeader",
"No connection");
10036 d = soc->
Recv(reply);
10039 Error(
"GetTreeHeader",
"Error getting a replay from the master.Result %d", (
int) d);
10046 if (s1 ==
"Success")
10051 Info(
"GetTreeHeader",
"%s, message size: %d, entries: %d",
10052 s1.Data(), reply->
BufferSize(), (int) t->GetMaxEntryLoop());
10054 Info(
"GetTreeHeader",
"tree header retrieval failed");
10218 msg << start << end;
10234 off_t nowlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_CUR);
10237 "problem lseeking log file to current position (errno: %d)",
TSystem::GetErrno());
10242 off_t startlog = nowlog;
10243 off_t endlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_END);
10252 if (tolog <= 0)
return maclog;
10255 if (lseek(fileno(
fLogFileR), startlog, SEEK_SET) < 0) {
10257 "problem lseeking log file to start position (errno: %d)",
TSystem::GetErrno());
10266 Int_t wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10267 while (fgets(line, wanted,
fLogFileR)) {
10270 if (line[r-1] ==
'\n') line[r-1] =
'\0';
10277 wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10281 if (lseek(fileno(
fLogFileR), nowlog, SEEK_SET) < 0) {
10283 "problem lseeking log file to original position (errno: %d)",
TSystem::GetErrno());
10322 if (strstr(queryref, qr->GetTitle()) &&
10323 strstr(queryref, qr->GetName()))
10346 off_t nowlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_CUR);
10353 off_t startlog = nowlog;
10354 off_t endlog = lseek(fileno(
fLogFileR), (off_t) 0, SEEK_END);
10360 lseek(fileno(
fLogFileR), nowlog, SEEK_SET);
10363 lseek(fileno(
fLogFileR), (off_t) 0, SEEK_SET);
10364 }
else if (qry != -1) {
10375 }
else if (qry > 0) {
10378 TIter nxq(queries);
10385 TIter nxq(queries);
10396 Info(
"ShowLog",
"query %d not found in list", qry);
10408 lseek(fileno(
fLogFileR), startlog, SEEK_SET);
10413 Int_t wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10414 while (fgets(line, wanted,
fLogFileR)) {
10418 if (line[r-1] !=
'\n') line[r-1] =
'\n';
10424 SysError(
"ShowLog",
"error writing to stdout");
10431 tolog -= strlen(line);
10436 const char *opt = Getline(
"More (y/n)? [y]");
10446 wanted = (tolog >
sizeof(
line)) ?
sizeof(
line) : tolog;
10449 if (line[r-1] ==
'\n') line[r-1] = 0;
10455 if (
write(fileno(stdout),
"\n", 1) != 1)
10456 SysError(
"ShowLog",
"error writing to stdout");
10461 lseek(fileno(
fLogFileR), nowlog, SEEK_SET);
10499 Error(
"Detach",
"corrupted worker instance: wrk:%p, sock:%p", sl, s);
10506 if (shutdown && !
IsIdle()) {
10511 timeout = (timeout > 20) ? timeout : 20;
10609 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10640 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10658 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10684 Info(
"RegisterDataSet",
10685 "functionality not available: the server does not have dataset support");
10689 if (!dataSetName || strlen(dataSetName) <= 0) {
10690 Info(
"RegisterDataSet",
"specifying a dataset name is mandatory");
10698 parallelverify =
kTRUE;
10706 mess <<
TString(dataSetName);
10714 Error(
"RegisterDataSet",
"dataset was not saved");
10720 if (!parallelverify)
return result;
10725 Error(
"RegisterDataSet",
"problems verifying dataset '%s'", dataSetName);
10742 Info(
"SetDataSetTreeName",
"functionality not supported by the server");
10746 if (!dataset || strlen(dataset) <= 0) {
10747 Info(
"SetDataSetTreeName",
"specifying a dataset name is mandatory");
10751 if (!treename || strlen(treename) <= 0) {
10752 Info(
"SetDataSetTreeName",
"specifying a tree name is mandatory");
10768 Error(
"SetDataSetTreeName",
"some error occured: default tree name not changed");
10784 Info(
"GetDataSets",
10785 "functionality not available: the server does not have dataset support");
10788 if (
fProtocol < 31 && strstr(optStr,
":lite:"))
10789 Warning(
"GetDataSets",
"'lite' option not supported by the server");
10793 mess <<
TString(uri ? uri :
"");
10794 mess << TString(optStr ? optStr :
"");
10798 TMap *dataSetMap = 0;
10800 Error(
"GetDataSets",
"error receiving datasets information");
10806 Error(
"GetDataSets",
"error receiving datasets");
10808 Error(
"GetDataSets",
"message not found or wrong type (%p)", retMess);
10821 Info(
"ShowDataSets",
10822 "functionality not available: the server does not have dataset support");
10828 mess <<
TString(uri ? uri :
"");
10829 mess << TString(optStr ? optStr :
"");
10834 Error(
"ShowDataSets",
"error receiving datasets information");
10843 Info(
"ExistsDataSet",
"functionality not available: the server has an" 10844 " incompatible version of TFileInfo");
10848 if (!dataset || strlen(dataset) <= 0) {
10849 Error(
"ExistsDataSet",
"dataset name missing");
10871 Info(
"ClearDataSetCache",
"functionality not available on server");
10889 Info(
"ShowDataSetCache",
"functionality not available on server");
10912 Info(
"GetDataSet",
"functionality not available: the server has an" 10913 " incompatible version of TFileInfo");
10917 if (!uri || strlen(uri) <= 0) {
10918 Info(
"GetDataSet",
"specifying a dataset name is mandatory");
10925 nameMess << TString(optStr ? optStr:
"");
10927 Error(
"GetDataSet",
"sending request failed");
10932 Error(
"GetDataSet",
"error receiving datasets information");
10938 Error(
"GetDataSet",
"error reading list of files");
10940 Error(
"GetDataSet",
"message not found or wrong type (%p)", retMess);
10953 fileList->
Print(opt);
10956 Warning(
"ShowDataSet",
"no such dataset: %s", uri);
10967 nameMess <<
TString(uri?uri:
"");
10968 nameMess << TString(optStr?optStr:
"");
10970 Error(
"RemoveDataSet",
"sending request failed");
10984 Error (
"FindDataSets",
"not yet implemented");
10985 return (
TList *) 0;
10995 Error(
"RequestStagingDataSet",
10996 "functionality not supported by the server");
11007 Error(
"RequestStagingDataSet",
"staging request was unsuccessful");
11021 Error(
"CancelStagingDataSet",
11022 "functionality not supported by the server");
11033 Error(
"CancelStagingDataSet",
"cancel staging request was unsuccessful");
11048 Error(
"GetStagingStatusDataSet",
11049 "functionality not supported by the server");
11055 nameMess <<
TString(dataset);
11057 Error(
"GetStagingStatusDataSet",
"sending request failed");
11065 Error(
"GetStagingStatusDataSet",
"problem processing the request");
11073 Error(
"GetStagingStatusDataSet",
"error reading list of files");
11076 Error(
"GetStagingStatusDataSet",
11077 "response message not found or wrong type (%p)", retMess);
11105 Info(
"VerifyDataSet",
"functionality not available: the server has an" 11106 " incompatible version of TFileInfo");
11111 if (!uri || (uri && strlen(uri) <= 0)) {
11112 Error(
"VerifyDataSet",
"dataset name is is mandatory");
11116 Int_t nmissingfiles = 0;
11121 Info(
"VerifyDataSet",
"Master-only verification");
11124 nameMess <<
TString(uri ? uri :
"");
11131 Info(
"VerifyDataSet",
"no such dataset %s", uri);
11135 return nmissingfiles;
11140 Error(
"VerifyDataSet",
"PROOF is in sequential mode (no workers): cannot do parallel verification.");
11141 Error(
"VerifyDataSet",
"Either start PROOF with some workers or force sequential adding 'S' as option.");
11155 Int_t nmissingfiles = 0;
11170 Int_t oldifiip = -1;
11175 const char* mss=
"";
11177 const char* stageoption=
"";
11195 if (oldifiip > -1) {
11196 SetParameter(
"PROOF_IncludeFileInfoInPacket", oldifiip);
11204 Int_t ntouched = 0;
11210 while ((obj = nxtout())) {
11215 while ((fiindout = (
TFileInfo*) nxt())) {
11216 lfiindout->
Add(fiindout);
11222 nmissingfiles += pdisappeared->
GetVal();
11226 nopened += pnopened->
GetVal();
11230 ntouched += pntouched->
GetVal();
11238 Info(
"VerifyDataSetParallel",
"%s: changed? %d (# files opened = %d, # files touched = %d," 11239 " # missing files = %d)",
11240 uri, changed_ds, nopened, ntouched, nmissingfiles);
11242 return nmissingfiles;
11251 Info(
"UploadDataSet",
"Lite-session: functionality not implemented");
11257 mess <<
TString(optStr?optStr:
"");
11261 TMap *groupQuotaMap = 0;
11263 Info(
"GetDataSetQuota",
"could not receive quota");
11269 Error(
"GetDataSetQuota",
"error getting quotas");
11271 Error(
"GetDataSetQuota",
"message not found or wrong type (%p)", retMess);
11274 return groupQuotaMap;
11284 Info(
"ShowDataSetQuota",
11285 "functionality not available: the server does not have dataset support");
11290 Info(
"UploadDataSet",
"Lite-session: functionality not implemented");
11301 Error(
"ShowDataSetQuota",
"error receiving quota information");
11364 if (!ord || strlen(ord) <= 0) {
11365 Info(
"ModifyWorkerLists",
11366 "an ordinal number - e.g. \"0.4\" or \"*\" for all - is required as input");
11370 Info(
"ModifyWorkerLists",
"ord: '%s' (add: %d, save: %d)", ord, add, save);
11407 while(oo.Tokenize(o, from,
","))
11417 while ((wrk = (
TSlave *) nxw())) {
11440 if (!allord && ords) {
11441 if (os) ords->
Remove(os);
11442 if (ords->
GetSize() == 0)
break;
11449 if (!fw && ords && ords->
GetSize() > 0) {
11452 while ((os = nxo())) {
11454 while ((wrk = (
TSlave *) nxw()))
11457 if (!oo.
IsNull()) oo +=
",";
11462 Warning(
"ModifyWorkerLists",
"worker(s) '%s' not found!", oo.
Data());
11482 mess << action <<
TString(ord);
11489 Warning(
"ModifyWorkerLists",
"request not completely full filled");
11491 Error(
"ModifyWorkerLists",
"request failed");
11496 if (oo.Contains(
","))
11497 Warning(
"ModifyWorkerLists",
"block request not supported by server: splitting into pieces ...");
11499 while(oo.Tokenize(o, from,
",")) {
11501 mess << action << o;
11556 const char *confdir,
Int_t loglevel)
11558 const char *pn =
"TProof::Open";
11566 ::Error(pn,
"plugin manager not found");
11570 if (
gROOT->IsBatch()) {
11571 ::Error(pn,
"we are in batch mode, cannot show PROOF Session Viewer");
11577 ::Error(pn,
"no plugin found for TSessionViewer");
11581 ::Error(pn,
"plugin for TSessionViewer could not be loaded");
11605 if (!opts.IsNull()) {
11608 TString sport = opts(it + strlen(
"tunnel="), opts.Length());
11614 host = sport(0, ic);
11615 sport.
Remove(0, ic + 1);
11626 port = sport.
Atoi();
11629 ::Info(
"TProof::Open",
"using tunnel at %s:%d", host.Data(), port);
11635 "problems parsing tunnelling info from options: %s", opts.Data());
11643 if (opts.Length() > 0) {
11648 }
else if (opts.IsDigit()) {
11649 locid = opts.Atoi();
11671 if (!proof || !proof->IsValid()) {
11673 ::Error(pn,
"new session could not be attached");
11682 if (!proof || !proof->IsValid()) {
11683 ::Error(pn,
"new session could not be created");
11716 "unable to initialize a valid manager instance");
11734 if (
gDebug > 0)
::Info(
"TProof::AddEnvVar",
"%s=%s", name, value);
11788 Error(
"SaveWorkerInfo",
"gProofServ undefined");
11794 Warning(
"SaveWorkerInfo",
"all relevant worker lists is undefined");
11801 FILE *fwrk = fopen(fnwrk.
Data(),
"w");
11803 Error(
"SaveWorkerInfo",
11804 "cannot open %s for writing (errno: %d)", fnwrk.
Data(), errno);
11814 if (reLogTag.
Match(addlogext) == 2) {
11815 addLogTag = reLogTag[1];
11821 Info(
"SaveWorkerInfo",
"request for additional line with ext: '%s'", addlogext.
Data());
11831 while ((wrk = (
TSlave *) nxa())) {
11834 if (re.Match(logfile) == 2) logfile = re[1];
11837 fprintf(fwrk,
"%s@%s:%d %d %s %s.log\n",
11841 if (addlogext.
Length() > 0) {
11842 fprintf(fwrk,
"%s@%s:%d %d %s(%s) %s.%s\n",
11852 while ((wrk = (
TSlave *) nxb())) {
11854 if (re.Match(logfile) == 2) logfile = re[1];
11858 fprintf(fwrk,
"%s@%s:%d 0 %s %s.log\n",
11871 if (re.Match(logfile) == 2) logfile = re[1];
11873 fprintf(fwrk,
"%s 2 %s %s.log\n",
11876 if (addlogext.
Length() > 0) {
11877 fprintf(fwrk,
"%s 2 %s(%s) %s.%s\n",
11879 logfile.
Data(), addlogext.
Data());
11993 if (!dset || !input || !mgr) {
11994 emsg.
Form(
"invalid inputs (%p, %p, %p)", dset, input, mgr);
12003 if (dsname.BeginsWith(
"TFileCollection:")) {
12009 emsg.
Form(
"TFileCollection %s not found in input list", dset->
GetName());
12020 input->
Add(
new TNamed(
"PROOF_LookupOpt", lookupopt.
Data()));
12036 TString dsns( dsname.Data() ), enl;
12039 if (eli !=
kNPOS) {
12040 enl = dsns(eli+5, dsns.Length());
12041 dsns.Remove(eli, dsns.Length()-eli);
12051 if (validEnl && validSdsn && (( fc = mgr->
GetDataSet(dsns) ))) {
12074 dsns = dsname.Data();
12077 while (dsns.Tokenize(dsn1, from1,
"[, ]")) {
12080 while (dsn1.
Tokenize(dsn2, from2,
"|")) {
12083 if (ienl !=
kNPOS) {
12084 enl = dsn2(ienl + 5, dsn2.
Length());
12121 if (!datasets || datasets->
GetSize() <= 0) {
12122 emsg.
Form(
"no dataset(s) found on the master corresponding to: %s", dsname.Data());
12127 emsg.
Form(
"dataset pointer is null: corruption? - aborting");
12134 lookupopt =
gEnv->
GetValue(
"Proof.LookupOpt",
"stagedOnly");
12135 input->
Add(
new TNamed(
"PROOF_LookupOpt", lookupopt.
Data()));
12155 if (!dsTree.
IsNull() && dsTree !=
"/") {
12158 if (idx !=
kNPOS) {
12171 TList *srvmapslist = srvmapsref;
12177 if (srvmapsref && !srvmapslist) {
12178 msg.
Form(
"+++ Info: dataset server mapping(s) DISABLED by user");
12179 }
else if (srvmapsref && srvmapslist && srvmapslist != srvmapsref) {
12180 msg.
Form(
"+++ Info: dataset server mapping(s) modified by user");
12181 }
else if (!srvmapsref && srvmapslist) {
12182 msg.
Form(
"+++ Info: dataset server mapping(s) added by user");
12194 TIter nxds(datasets);
12195 while ((pair = (
TPair *) nxds())) {
12205 " entry list %s not found", os->
GetName()));
12210 " no sub-lists in entry-list!"));
12220 if (!ds->
Add(files, dsTree, availableOnly, missingFiles)) {
12221 emsg.
Form(
"error integrating dataset %s", dataset->
GetName());
12230 if (!dset->
Add(files, dsTree, availableOnly, missingFiles)) {
12231 emsg.
Form(
"error integrating dataset %s", dataset->
GetName());
12234 if (enl) entrylist = enl;
12236 if (missingFiles) {
12239 TIter next(missingFiles);
12241 while ((file = next())) {
12243 listOfMissingFiles->
Add(file);
12246 missingFiles->
Clear();
12252 while ((pair = (
TPair *) nxds())) {
12253 if (pair->
Key())
delete pair->
Key();
12260 if (srvmapslist && srvmapslist != srvmapsref) {
12271 if (listOfMissingFiles && listOfMissingFiles->
GetSize() > 0) {
12272 listOfMissingFiles->
SetName(
"MissingFiles");
12273 input->
Add(listOfMissingFiles);
12290 !cachedir || strlen(cachedir) <= 0)
return 0;
12295 if (!data && !inputdata)
return 0;
12302 if (dstname.BeginsWith(
"cache:")) {
12304 dstname.ReplaceAll(
"cache:",
"");
12305 srcname.Form(
"%s/%s", cachedir, dstname.Data());
12307 emsg.
Form(
"input data file not found in cache (%s)", srcname.Data());
12315 emsg.
Form(
"problems copying %s to %s", srcname.Data(), dstname.Data());
12320 if (inputdata && inputdata->
GetSize() > 0) {
12324 inputdata->
Write();
12328 emsg.
Form(
"could not create %s", dstname.Data());
12332 emsg.
Form(
"no input data!");
12336 ::Info(
"TProof::SaveInputData",
"input data saved to %s", dstname.Data());
12341 input->
Remove(inputdata);
12362 if (!inputdata)
return 0;
12366 emsg.
Form(
"input data file not found in sandbox (%s)", fname.Data());
12372 emsg.
Form(
"TProof object undefined or invalid: protocol error!");
12389 if (!input || !cachedir || strlen(cachedir) <= 0)
return 0;
12393 if (!inputdata)
return 0;
12398 emsg.
Form(
"input data file not found in cache (%s)", fname.
Data());
12404 added->
SetName(
"PROOF_InputObjsFromFile");
12410 emsg.
Form(
"could not get list of object keys from file");
12415 while ((k = (
TKey *)nxk())) {
12432 emsg.
Form(
"could not open %s", fname.
Data());
12445 if (!
gROOT->IsBatch()) {
12449 gROOT->GetPluginManager()->FindHandler(
"TProofProgressLog"))) {
12452 ::Error(
"TProof::LogViewer",
"cannot load the relevant plug-in");
12459 TString u = (url && strlen(url) <= 0) ?
"lite" : url;
12463 if (url && strlen(url) > 0) {
12464 ::Info(
"TProof::LogViewer",
12465 "batch mode: use TProofLog *pl = TProof::Mgr(\"%s\")->GetSessionLogs(%d)", url, idx);
12466 }
else if (url && strlen(url) <= 0) {
12467 ::Info(
"TProof::LogViewer",
12468 "batch mode: use TProofLog *pl = TProof::Mgr(\"lite\")->GetSessionLogs(%d)", idx);
12470 ::Info(
"TProof::LogViewer",
12471 "batch mode: use TProofLog *pl = TProof::Mgr(\"<master>\")->GetSessionLogs(%d)", idx);
12499 Warning(
"ShowMissingFiles",
"no (last) query found: do nothing");
12506 Info(
"ShowMissingFiles",
"no files missing in query %s:%s", xqr->
GetTitle(), xqr->
GetName());
12510 Int_t nmf = 0, ncf = 0;
12511 Long64_t msz = 0, mszzip = 0, mev = 0;
12514 TIter nxf(missing);
12535 if (msz <= 0) msz = -1;
12536 if (mszzip <= 0) mszzip = -1;
12538 if (msz > 0. || mszzip > 0.) {
12539 Printf(
" +++ %d file(s) missing, %d corrupted, i.e. %lld unprocessed events -->" 12540 " about %.2f%% of the total (%lld bytes, %lld zipped)",
12541 nmf, ncf, mev, xf * 100., msz, mszzip);
12543 Printf(
" +++ %d file(s) missing, %d corrupted, i.e. %lld unprocessed events -->" 12544 " about %.2f%% of the total", nmf, ncf, mev, xf * 100.);
12560 Warning(
"GetMissingFiles",
"no (last) query found: do nothing");
12568 Info(
"ShowMissingFiles",
"no files missing in query %s:%s", xqr->
GetTitle(), xqr->
GetName());
12578 while (
gDirectory->FindObject(fcname) && j < 1000)
12585 TIter nxf(missing);
12599 if (pf && strlen(pf) > 0) {
12603 if (withWrks)
SetParameter(
"PROOF_SlaveStatsTrace",
"");
12604 Info(
"SetPerfTree",
"saving of the performance tree enabled (%s)",
fPerfTree.
Data());
12610 Info(
"SetPerfTree",
"saving of the performance tree disabled");
12622 Error(
"SafePerfTree",
"this TProof instance is invalid!");
12628 if (ref && strlen(ref) > 0) {
12630 Error(
"SafePerfTree",
"requested to use query '%s' but player instance undefined!", ref);
12635 Error(
"SafePerfTree",
"TQueryResult instance for query '%s' could not be retrieved", ref);
12639 sref.
Form(
" for requested query '%s'", ref);
12641 if (!outls || (outls && outls->
GetSize() <= 0)) {
12642 Error(
"SafePerfTree",
"outputlist%s undefined or empty", sref.
Data());
12647 if (pf && strlen(pf)) fn = pf;
12648 if (fn.
IsNull()) fn =
"perftree.root";
12650 TFile f(fn,
"RECREATE");
12652 Error(
"SavePerfTree",
"could not open file '%s' for writing", fn.
Data());
12657 while ((obj = nxo())) {
12659 if (objname.BeginsWith(
"PROOF_")) {
12662 if (objname ==
"PROOF_PerfStats" ||
12663 objname ==
"PROOF_PacketsHist" ||
12664 objname ==
"PROOF_EventsHist" ||
12665 objname ==
"PROOF_NodeHist" ||
12666 objname ==
"PROOF_LatencyHist" ||
12667 objname ==
"PROOF_ProcTimeHist" ||
12668 objname ==
"PROOF_CpuTimeHist")
12674 Info(
"SavePerfTree",
"performance information%s saved in %s ...", sref.
Data(), fn.
Data());
static Int_t FindParPath(TPackMgr *packmgr, const char *pack, TString &par)
Get the full path to PAR, looking also in the global dirs.
Int_t GetSlaveType() const
virtual TQueryResult * GetQueryResult(const char *ref)=0
void Add(TObject *obj, const char *name=0, Int_t check=-1)
Add object with name to browser.
void AddInputData(TObject *obj, Bool_t push=kFALSE)
Add data objects that might be needed during the processing of the selector (see Process()).
virtual void DeleteDrawFeedback(TDrawFeedback *f)=0
virtual Bool_t cd(const char *path=0)
Change current directory to "this" directory.
Int_t HandleInputMessage(TSlave *wrk, TMessage *m, Bool_t deactonfail=kFALSE)
Analyze the received message.
virtual const char * BaseName(const char *pathname)
Base name of a file name. Base name of /user/root is root.
static void AssertMacroPath(const char *macro)
Make sure that the directory path contained by macro is in the macro path.
virtual const char * GetName() const
Returns name of object.
virtual void SysError(const char *method, const char *msgfmt,...) const
Issue system error message.
virtual void ShowData()
List contents of the data directory in the sandbox.
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write this object to the current directory.
Long64_t GetEntries() const
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
void AddFeedback(const char *name)
Add object to feedback list.
static void SystemCmd(const char *cmd, Int_t fdout)
Exec system command 'cmd'. If fdout > -1, append the output to fdout.
virtual Bool_t AccessPathName(const char *path, EAccessMode mode=kFileExists)
Returns FALSE if one can access a file using the specified access mode.
TList * GetListOfLines() const
void RecvLogFile(TSocket *s, Int_t size)
Receive the log file of the slave with socket s.
void Interrupt(EUrgent type, ESlaves list=kActive)
Send interrupt to master or slave servers.
void SetSessionTag(const char *st)
double read(const std::string &file_name)
reading
virtual Int_t SendGroupPriority(const char *, Int_t)
void Progress(Long64_t total, Long64_t processed)
Get query progress information.
const Long64_t kPROOF_DynWrkPollInt_s
TList * GetListOfBadSlaves() const
TMD5 * ReadMD5(const char *pack)
Read MD5 checksum of the PAR file from the PROOF-INF/md5.txt file.
virtual Bool_t IsAbsoluteFileName(const char *dir)
Return true if dir is an absolute pathname.
virtual void SetAlias(const char *alias="")
Set an alias for this session.
virtual TList * GetInputList() const =0
The PROOF package manager contains tools to manage packages.
virtual void WriteString(const char *s)
Write string to I/O buffer.
virtual Bool_t IsValid() const
virtual void AddOutput(TList *out)=0
virtual int GetPid()
Get process id.
Int_t UnloadPackages()
Unload all packages.
const char * GetName() const
Returns name of object.
virtual void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
void ActivateAsyncInput()
Activate the asynchronous input handler.
const char * GetName() const
Returns name of object.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Int_t GetNumberOfActiveSlaves() const
Return number of active slaves, i.e.
void AskParallel()
Ask for the number of parallel slaves.
virtual TFileCollection * GetStagingStatusDataSet(const char *dataset)
Obtains a TFileCollection showing the staging status of the specified dataset.
Long64_t GetBytesRead() const
static Bool_t GetFileInCmd(const char *cmd, TString &fn)
Static method to extract the filename (if any) from a CINT command.
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
Bool_t R_ISLNK(Int_t mode)
Int_t ClearPackages()
Remove all packages.
virtual const char * GetBuildCompilerVersion() const
Return the build compiler version.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
TMonitor * fAllUniqueMonitor
void MarkBad(TSlave *wrk, const char *reason=0)
Add a bad slave server to the bad slave list and remove it from the active list and from the two moni...
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void AddInput(TObject *inp)=0
const char * GetDefaultTreeName() const
Returns the tree set with SetDefaultTreeName if set. Returns the name of the first tree in the meta da...
void Activate(TList *slaves=0)
Activate slave server list.
void RemoveChain(TChain *chain)
Remove chain from data set.
virtual Bool_t ExistsDataSet(const char *dataset)
Returns kTRUE if 'dataset' exists, kFALSE otherwise.
static double p3(double t, double a, double b, double c, double d)
virtual ~TProof()
Clean up PROOF environment.
void PrepareInputDataFile(TString &dataFile)
Prepare the file with the input data objects to be sent to the master; the objects are taken from the de...
R__EXTERN TProofDebug::EProofDebugMask gProofDebugMask
Int_t EnablePackage(const char *package, Bool_t notOnClient=kFALSE, TList *workers=0)
Enable specified package.
virtual void MarkBad(TSlave *, TProofProgressStatus *, TList **)
void ClearData(UInt_t what=kUnregistered, const char *dsname=0)
Remove files for the data directory.
void SetPerfTree(const char *pf="perftree.root", Bool_t withWrks=kFALSE)
Enable/Disable saving of the performance tree.
virtual const char * WorkingDirectory()
Return working directory.
virtual Bool_t RegisterDataSet(const char *name, TFileCollection *dataset, const char *optStr="")
Register the 'dataSet' on the cluster under the current user, group and the given 'dataSetName'...
virtual Long64_t DrawSelect(TDSet *dset, const char *varexp, const char *selection="", Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Execute the specified drawing action on a data set (TDSet).
static TMD5 * FileChecksum(const char *file)
Returns checksum of specified file.
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
void SetRealTimeLog(Bool_t on=kTRUE)
Switch ON/OFF the real-time logging facility.
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange, Bool_t resume=kFALSE)
Get list of workers to be used from now on.
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
virtual void FlushSocket()
Collectable string class.
virtual TVirtualProofPlayer * MakePlayer(const char *player=0, TSocket *s=0)
Construct a TProofPlayer object.
const char * GetHostName() const
void SetPrepTime(Float_t preptime)
virtual TString SplitAclicMode(const char *filename, TString &mode, TString &args, TString &io) const
This method splits a filename of the form: ~~~ {.cpp} [path/]macro.C[+|++[k|f|g|O|c|s|d|v|-]][(args)]...
void SetMonitor(TMonitor *mon=0, Bool_t on=kTRUE)
Activate (on == TRUE) or deactivate (on == FALSE) all sockets monitored by 'mon'. ...
Int_t GetParallel() const
virtual Int_t Reconnect()
void InterruptCurrentMonitor()
If active in a monitor, set the ready state.
void SetPasswd(const char *pw)
virtual void SetMaxDrawQueries(Int_t max)=0
virtual Bool_t StartSlaves(Bool_t attach=kFALSE)
Start up PROOF slaves.
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
virtual void ShowDataSetCache(const char *dataset=0)
Display the content of the dataset cache, if any (matching 'dataset', if defined).
This class represents a WWW compatible URL.
TString & ReplaceAll(const TString &s1, const TString &s2)
void SetupWorkersEnv(TList *wrks, Bool_t increasingpool=kFALSE)
Set up packages, loaded macros, include and lib paths ...
int GetPathInfo(const char *path, Long_t *id, Long_t *size, Long_t *flags, Long_t *modtime)
Get info about a file: id, size, flags, modification time.
TUrl * GetCurrentUrl() const
Return the current url.
virtual Long64_t DrawSelect(TDSet *set, const char *varexp, const char *selection, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
void SetPlayer(TVirtualProofPlayer *player)
Set a new PROOF player.
TObject * GetParameter(const char *par) const
Get specified parameter.
TSlave * FindSlave(TSocket *s) const
Find slave that has TSocket s. Returns 0 in case slave is not found.
Bool_t TestBit(UInt_t f) const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
const char * GetProtocol() const
This class implements a data set to be used for PROOF processing.
const char * GetGroup() const
TList * GetOutputNames()
FIXME: to be written.
std::istream & ReadLine(std::istream &str, Bool_t skipWhite=kTRUE)
Read a line from stream up to newline skipping any whitespace.
void ShowParameters(const char *wildcard="PROOF_*") const
Show the input list parameters specified by the wildcard.
virtual void SetName(const char *name)
Set the name of the TNamed.
void TerminateWorker(TSlave *wrk)
Ask an active worker 'wrk' to terminate, i.e. to shutdown.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Int_t Unload(const char *pack)
Method to unload a package.
virtual TList * GetLists() const
TProof * GetProof() const
virtual TString GetFromPipe(const char *command)
Execute command and return output in TString.
virtual void FindUniqueSlaves()
Add to the fUniqueSlave list the active slaves that have a unique (user) file system image...
Int_t FindNextFreeMerger()
Return a merger, which is both active and still accepts some workers to be assigned to it...
void SetParameter(const char *par, const char *value)
Set input list parameter.
virtual void StoreFeedback(TObject *slave, TList *out)=0
TUrl * NextUrl()
Iterator function, start iteration by calling ResetUrl().
The PROOF manager interacts with the PROOF server coordinator to create or destroy a PROOF session...
void SetROOTVersion(const char *rv)
TSocket * GetSocket() const
Bool_t IsFloat() const
Returns kTRUE if string contains a floating point or integer number.
const char *const kPROOF_WorkDir
static const TList * GetEnvVars()
Get environment variables.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
virtual EExitStatus GetExitStatus() const =0
virtual void Add()
Add signal handler to system signal handler list.
virtual Int_t GetEntries() const
const char * GetMsd() const
virtual Bool_t ReadBuffer(char *buf, Int_t len)
Read a buffer from the file.
virtual TFileCollection * GetDataSet(const char *dataset, const char *optStr="")
Get a list of TFileInfo objects describing the files of the specified dataset.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
TList * GetOutputList()
Get list with all object created during processing (see Process()).
void SetArchCompiler(const char *ac)
TObject * GetInputObject(const char *classname) const
Return first instance of class 'classname' in the input list.
virtual void AddSignalHandler(TSignalHandler *sh)
Add a signal handler to list of system signal handlers.
Int_t DisablePackage(const char *package)
Remove a specific package.
void ToUpper()
Change string to upper case.
virtual TFileCollection * GetDataSet(const char *uri, const char *server=0)
Utility function used in various methods for user dataset upload.
Regular expression class.
const char * GetName() const
Returns name of object.
TFileCollection * GetMissingFiles(TQueryResult *qr=0)
Get a TFileCollection with the files missing in the query described by 'qr' or the last query if qr i...
virtual TObject * Get(const char *namecycle)
Return pointer to object identified by namecycle.
TPluginHandler * FindHandler(const char *base, const char *uri=0)
Returns the handler if there exists a handler for the specified URI.
Int_t GetPerfIndex() const
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
void Delete(Option_t *option="")
Remove all objects from the list AND delete all heap based objects.
const char * GetOrdinal() const
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Long64_t GetFirst() const
virtual void StopProcess(Bool_t abort, Int_t timeout)
Send stop/abort request to PROOF server.
Class supporting a collection of lines with C++ code.
Int_t LoadPlugin()
Load the plugin library for this handler.
TList * GetListOfSlaves() const
virtual void SetCurrentQuery(TQueryResult *q)=0
const char * GetOrdinal() const
virtual void Add()
Add file event handler to system file handler list.
virtual void Progress(Long64_t total, Long64_t processed)=0
Int_t GetNumberOfSlaves() const
Return number of slaves as described in the config file.
void SetDSet(TDSet *dset)
virtual void SetInterruptHandler(Bool_t)
Int_t GetClientProtocol() const
virtual void SaveWorkerInfo()
Save information about the worker set in the file .workers in the working dir.
Int_t Load(const char *pack, TList *optls=0)
Method to load a package taking an option list. Return -1 on error, 0 otherwise.
virtual const char * DirName(const char *pathname)
Return the directory name in pathname.
void RemoveFeedback(const char *name)
Remove object from feedback list.
virtual Bool_t JoinProcess(TList *workers)=0
void SetProgressDialog(Bool_t on=kTRUE)
Enable/Disable the graphic progress dialog.
R__EXTERN TVirtualMutex * gROOTMutex
void LogMessage(const char *msg, Bool_t all)
Log a message into the appropriate window by emitting a signal.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
void SetQueryMode(EQueryMode mode)
Change query running mode to the one specified by 'mode'.
void SetUser(const char *user)
Int_t Broadcast(const TMessage &mess, TList *slaves)
Broadcast a message to all slaves in the specified list.
const char * GetOptions() const
const TString & GetImage() const
virtual char * Which(const char *search, const char *file, EAccessMode mode=kFileExists)
Find location of file in a search path.
virtual void RemoveAll()
Remove all sockets from the monitor.
void ShowLog(Int_t qry=-1)
Display on screen the content of the temporary log file.
This class represents a RFC 3986 compatible URI.
void SetMaxDrawQueries(Int_t max)
Set max number of draw queries whose results are saved.
TFileHandler * GetInputHandler() const
const char * GetConfFile() const
virtual FILE * OpenPipe(const char *command, const char *mode)
Open a pipe.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
const char * GetName() const
Returns name of object.
static TPluginHandler * fgLogViewer
TString & Prepend(const char *cs)
R__EXTERN TApplication * gApplication
Bool_t Contains(const char *name) const
void EmitVA(const char *signal_name, Int_t, const T &... params)
Int_t ModifyWorkerLists(const char *ord, Bool_t add, Bool_t save)
Modify the worker active/inactive list by making the worker identified by the ordinal number 'ord' ac...
void Show(const char *title=0)
Show available packages.
virtual void DeActivateAll()
De-activate all activated sockets.
Bool_t IsTopMaster() const
TMacro * GetLogFile() const
TObject * FindObject(const char *name) const
Find object using its name.
void PutLog(TQueryResult *qr)
Display log of query pq into the log window frame.
Long64_t GetEntries() const
void CloseProgressDialog()
Close progress dialog.
Int_t GetNumberOfQueries()
Number of queries processed by this session.
Int_t AddIncludePath(const char *incpath, Bool_t onClient=kFALSE, TList *wrks=0, Bool_t doCollect=kTRUE)
Add 'incpath' to the inc path search.
virtual TObject * Clone(const char *newname="") const
Make a clone of a collection using the Streamer facility.
const char * GetDataPoolUrl() const
static void ResolveKeywords(TString &fname, const char *path=0)
Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and <build> placeholders in fname...
static void retrieve(const gsl_integration_workspace *workspace, double *a, double *b, double *r, double *e)
TString & Insert(Ssiz_t pos, const char *s)
virtual void ClearCache(const char *file=0)
Remove file from all file caches.
void DisableGoAsyn()
Signal to disable related switches.
Int_t SendObject(const TObject *obj, ESlaves list=kActive)
Send object to master or slave servers.
Int_t SendCurrentState(ESlaves list=kActive)
Transfer the current state of the master to the active slave servers.
Int_t BroadcastRaw(const void *buffer, Int_t length, TList *slaves)
Broadcast a raw buffer of specified length to all slaves in the specified list.
TList * GetListOfElements() const
TVirtualProofPlayer * fPlayer
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
virtual int mkdir(const char *name, Bool_t recursive=kFALSE)
Make a file system directory.
const char * GetOutputFileName() const
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
void SendParallel(Bool_t async=kFALSE)
Send number of parallel nodes to master or client.
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
const char * GetImage() const
virtual void DiscardSession(TProof *p)
Discard TProofDesc of session 'p' from the internal list.
TString & Replace(Ssiz_t pos, Ssiz_t n, const char *s)
static const char * GetMacroPath()
Get macro search path. Static utility function.
static Int_t GetErrno()
Static function returning system error number.
static TProofMgr * Mgr(const char *url)
Get instance of the effective manager for 'url' Return 0 on failure.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
virtual void SetValue(const char *name, const char *value, EEnvLevel level=kEnvChange, const char *type=0)
Set the value of a resource or create a new resource.
const char * GetFileName() const
virtual Bool_t Cp(const char *dst, Bool_t progressbar=kTRUE, UInt_t buffersize=1000000)
Allows to copy this file to the dst URL.
Int_t RemoveIncludePath(const char *incpath, Bool_t onClient=kFALSE)
Remove 'incpath' from the inc path search.
virtual Long64_t Process(TDSet *set, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)=0
Int_t SavePerfTree(const char *pf=0, const char *qref=0)
Save performance information from TPerfStats to file 'pf'.
Bool_t fFinalizationRunning
const char * GetFile() const
const char * GetConfDir() const
Int_t Install(const char *par, Bool_t rmold=kFALSE)
Install package from par (unpack the file in the directory); par can be an URL for remote retrieval...
TSignalHandler * fIntHandler
Manages an element of a TDSet.
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
void AddMergedObjects(Int_t objects)
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
TList * GetListOfEnabled() const
Get list of enabled packages Returns a pointer to a TList object, transferring ownership to the calle...
static struct mg_connection * fc(struct mg_context *ctx)
Bool_t SendingLogToWindow() const
const char * GetHost() const
virtual TVirtualPacketizer * GetPacketizer() const
Int_t Update(Long64_t avgsize=-1)
Update accumulated information about the elements of the collection (e.g.
TPluginHandler * fProgressDialog
virtual TList * GetOutputList() const =0
Int_t fLastAssignedMerger
virtual int Unlink(const char *name)
Unlink, i.e. remove, a file.
Sequenceable collection abstract base class.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Bool_t fSendGroupView
list returned by kPROOF_GETSLAVEINFO
void Stop()
Stop the stopwatch.
virtual Bool_t CancelStagingDataSet(const char *dataset)
Cancels a dataset staging request.
virtual Bool_t IsValid() const
const char * Export(Bool_t &changed)
virtual void Print(Option_t *option="") const
Print status of PROOF cluster.
void StopProcess(Bool_t abort, Int_t timeout=-1)
Send STOPPROCESS message to master and workers.
virtual TObjString * GetLineWith(const char *text) const
Search the first line containing text.
virtual Bool_t IsProofd() const
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
virtual FILE * TempFileName(TString &base, const char *dir=0)
Create a secure temporary file by appending a unique 6 letter string to base.
void UpdateDialog()
Final update of the progress dialog.
virtual Bool_t IsLite() const
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
TSlave * CreateSlave(const char *url, const char *ord, Int_t perf, const char *image, const char *workdir)
Create a new TSlave of type TSlave::kSlave.
TSocket * GetSocket() const
This code implements the MD5 message-digest algorithm.
The TNamed class is the base class for all named ROOT classes.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
void SetLogLevel(Int_t level, UInt_t mask=TProofDebug::kAll)
Set server logging level.
virtual TDSetElement * GetNextPacket(TSlave *slave, TMessage *r)=0
Int_t DownloadPackage(const char *par, const char *dstdir=0)
Download a PROOF archive (PAR file) from the master package repository.
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
Int_t GetLogLevel() const
void DeActivateAsyncInput()
De-activate a-sync input handler.
virtual void ReleaseWorker(const char *)
virtual void ShowCache(Bool_t all=kFALSE)
List contents of file cache.
static EFileType GetType(const char *name, Option_t *option="", TString *prefix=0)
Resolve the file type as a function of the protocol field in 'name'.
virtual Int_t Rm(const char *, const char *=0, const char *=0)
Run 'rm' on 'what'. Locally it is just a call to TSystem::Unlink .
Bool_t IsEndMaster() const
void ShowEnabled(const char *title=0)
Show enabled packages.
const char *const kPROOF_WorkerIdleTO
virtual const char * GetDynamicPath()
Return the dynamic path (used to find shared libraries).
void SetWhat(UInt_t what)
Using this method one can change the message type a posteriori.
void SetInputDataFile(const char *datafile)
Set the file to be used to optimally distribute the input data objects.
const char * GetGroup() const
void ShowDataSet(const char *dataset="", const char *opt="filter:SsCc")
display meta-info for given dataset usi
static double p2(double t, double a, double b, double c)
static TList * GetDataSetSrvMaps()
Static getter for server mapping list.
virtual const char * Getenv(const char *env)
Get environment variable.
virtual UInt_t Integer(UInt_t imax)
Returns a random integer on [ 0, imax-1 ].
Int_t Collect(const TSlave *sl, Long_t timeout=-1, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect responses from slave sl.
static void LogViewer(const char *url=0, Int_t sessionidx=0)
Start the log viewer window using the plugin manager.
const char * GetDirectory() const
Return directory where to look for object.
void ResetMergePrg()
Reset the merge progress notificator.
A sorted doubly linked list.
void Info(const char *location, const char *msgfmt,...)
virtual TMap * GetDataSets(const char *uri="", const char *optStr="")
Lists all datasets that match given uri.
Int_t GoMoreParallel(Int_t nWorkersToAdd)
Add nWorkersToAdd workers to current list of workers.
Int_t Init(const char *masterurl, const char *conffile, const char *confdir, Int_t loglevel, const char *alias=0)
Start the PROOF environment.
virtual void ShowDataSets(const char *uri="", const char *optStr="")
Shows datasets in locations that match the uri.
virtual void AddEventsProcessed(Long64_t ev)=0
TString & Append(const char *cs)
Bool_t EndsWith(const char *pat, ECaseCompare cmp=kExact) const
Return true if string ends with the specified string.
virtual Int_t PollForNewWorkers()
Asks the PROOF Serv for new workers in Dynamic Startup mode and activates them.
const char * GetSessionTag() const
virtual UserGroup_t * GetUserInfo(Int_t uid)
Returns all user info in the UserGroup_t structure.
TSlave * CreateSubmaster(const char *url, const char *ord, const char *image, const char *msd, Int_t nwk=1)
Create a new TSlave of type TSlave::kMaster.
void Validate()
Validate the TDSet by opening files.
void ClearDataProgress(Int_t r, Int_t t)
Progress bar for clear data.
virtual TProofDesc * GetProofDesc(Int_t id)
Get TProofDesc instance corresponding to 'id'.
const char * GetProofWorkDir() const
void FlushLogFile()
Reposition the read pointer in the log file to the very end.
TQueryResult * GetQueryResult(const char *ref=0)
Return pointer to the full TQueryResult instance owned by the player and referenced by 'ref'...
static TProof * Open(const char *url=0, const char *conffile=0, const char *confdir=0, Int_t loglevel=0)
Start a PROOF session on a specific cluster.
const char * GetDir(Bool_t raw=kFALSE) const
Ssiz_t First(char c) const
Find first occurrence of a character c.
Int_t UploadDataSet(const char *, TList *, const char *=0, Int_t=0, TList *=0)
*** This function is deprecated and will disappear in future versions *** *** It is just a wrapper ar...
virtual void SetOutputList(TList *out, Bool_t adopt=kTRUE)
Set / change the output list.
virtual const char * TempDirectory() const
Return a user configured or systemwide directory to create temporary files in.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
virtual void Interrupt(Int_t type)
Send interrupt OOB byte to master or slave servers.
TList * GetEnabledPackages() const
void SetDirectory(const char *dir)
Set/change directory.
const Bool_t kSortDescending
friend class TProofInputHandler
const char * GetDataDir() const
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
void SetSrvMaps(TList *srvmaps=0)
Set (or unset) the list for mapping servers coordinate for files.
Bool_t IsRetrieve() const
A container class for query results.
TList * GetQueryResults()
Return pointer to the list of query results in the player.
Int_t ActivateWorker(const char *ord, Bool_t save=kTRUE)
Make sure that the worker identified by the ordinal number 'ord' is in the active list...
PrintProgress_t fPrintProgress
TList * GetListOfSlaveInfos()
Returns list of TSlaveInfo's. In case of error return 0.
virtual void Close(Option_t *opt="")
Close slave socket.
void Error(const char *location, const char *msgfmt,...)
static TList * fgProofEnvList
void HandleLibIncPath(const char *what, Bool_t add, const char *dirs)
Handle lib, inc search paths modification request.
Float_t GetRealTime() const
virtual TList * GetListOfQueries(Option_t *opt="")
Ask the master for the list of queries.
TSocket * Select()
Return pointer to socket for which an event is waiting.
void SetManager(TProofMgr *mgr)
Set manager and schedule its destruction after this for clean operations.
static Int_t PoDCheckUrl(TString *_cluster)
This is a private API function.
void Emit(const char *signal)
Activate signal without args.
virtual Int_t GetFile(const char *, const char *, const char *=0)
std::recursive_mutex fCloseMutex
void SetTermTime(Float_t termtime)
virtual TProof * AttachSession(Int_t, Bool_t=kFALSE)
Dummy version provided for completeness.
static TVirtualProofPlayer * Create(const char *player, TProof *p, TSocket *s=0)
Create a PROOF player.
virtual void SetStatus(Int_t st)
void ParseConfigField(const char *config)
The config file field may contain special instructions which need to be parsed at the beginning...
TProof()
Protected constructor to be used by classes deriving from TProof (they have to call Init themselves a...
virtual void ClearInput()=0
Int_t GetNumberOfInactiveSlaves() const
Return number of inactive slaves, i.e.
void AddInput(TObject *obj)
Add obj to the input list.
const char * GetObjName() const
virtual Double_t Rndm()
Machine independent random number generator.
const char * GetUser() const
Int_t CleanupSession(const char *sessiontag)
Send cleanup request for the session specified by tag.
void AddChain(TChain *chain)
Add chain to data set.
TMonitor * fUniqueMonitor
void SetMergedWorker()
Increase number of already merged workers by 1.
TFileInfoMeta * GetMetaData(const char *meta=0) const
Get meta data object with specified name.
const char *const kPROOF_ConfFile
Int_t RestoreActiveList()
Restore saved list of active workers.
const char * GetName() const
Returns name of object.
void cd(Int_t id=-1)
Set session with 'id' the default one.
Bool_t ElementsValid()
Check if all elements are valid.
Int_t Remove(Int_t query, Bool_t all=kFALSE)
Send remove request for the qry-th query in fQueries.
Int_t UploadPackage(const char *par, EUploadPackageOpt opt=kUntar, TList *workers=0)
Upload a PROOF archive (PAR file).
virtual void AddQueryResult(TQueryResult *q)=0
void SendAsynMessage(const char *msg, Bool_t lf=kTRUE)
Send an asynchronous message to the master / client.
Int_t Archive(Int_t query, const char *url)
Send archive request for the qry-th query in fQueries.
Using a TBrowser one can browse all ROOT objects.
void InitMembers()
Default initializations.
Named parameter, streamable and storable.
friend class TProofInterruptHandler
static void Reset(const char *url, Bool_t hard=kFALSE)
Wrapper around TProofMgr::Reset(...).
Int_t Build(const char *pack, Int_t opt=TPackMgr::kCheckROOT)
Method to build a package.
virtual const char * GetBuildArch() const
Return the build architecture.
void DeleteDrawFeedback(TDrawFeedback *f)
Delete draw feedback object.
virtual void Find(const char *="~/", const char *=0, const char *=0)
const char *const kPROOF_TerminateWorker
void QueryResultReady(const char *ref)
Notify availability of a query result.
const TString GetUri() const
Returns the whole URI - an implementation of chapter 5.3 component recomposition. ...
virtual void HandleRecvHisto(TMessage *mess)=0
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
virtual TObject * FindObject(const char *name) const
Must be redefined in derived classes.
virtual void Setenv(const char *name, const char *value)
Set environment variable.
The purpose of this class is to provide a complete node description for masters, submasters and worke...
virtual Long64_t Finalize(Bool_t force=kFALSE, Bool_t sync=kFALSE)=0
virtual TObject * GetOutput(const char *name) const =0
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
const TString & GetNodeName() const
void Touch()
Ping PROOF slaves. Returns the number of slaves that responded.
Int_t DisablePackages()
Remove all packages.
virtual Bool_t RequestStagingDataSet(const char *dataset)
Allows users to request staging of a particular dataset.
R__EXTERN TSystem * gSystem
virtual Int_t Ping()
Ping the remote master or slave servers.
void Print(Option_t *option="") const
Print slave info.
Int_t GetQueryReference(Int_t qry, TString &ref)
Get reference for the qry-th query in fQueries (as displayed by ShowQueries).
const char * GetSessionDir() const
virtual TProof * CreateSession(const char *=0, const char *=0, Int_t=-1)
Create a new remote session (master and associated workers).
void ShowDataSetQuota(Option_t *opt=0)
shows the quota and usage of all groups if opt contains "U" shows also distribution of usage on user-...
TMonitor * fCurrentMonitor
Int_t BroadcastObject(const TObject *obj, Int_t kind, TList *slaves)
Broadcast an object to all slaves in the specified list.
Int_t Add(TFileInfo *info)
Add TFileInfo to the collection.
Long_t ExecPlugin(int nargs, const T &... params)
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
void Print(Option_t *option="") const
Prints the contents of the TFileCollection.
virtual void SetInitTime()=0
virtual TObject * Remove(TObject *obj)
Remove object from the list.
void ShowPackages(Bool_t all=kFALSE, Bool_t redirlog=kFALSE)
List contents of package directory.
const char *const kPROOF_ConfDir
void SaveActiveList()
Save current list of active workers.
virtual Bool_t InheritsFrom(const char *classname) const
Returns kTRUE if object inherits from class "classname".
Bool_t ParseUri(const char *uri, TString *dsGroup=0, TString *dsUser=0, TString *dsName=0, TString *dsTree=0, Bool_t onlyCurrent=kFALSE, Bool_t wildcards=kFALSE)
Parses a (relative) URI that describes a DataSet on the cluster.
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
static int push(struct mg_context *ctx, FILE *fp, SOCKET sock, SSL *ssl, const char *buf, int len, double timeout)
virtual void SetDynamicPath(const char *pathname)
Set the dynamic path to a new value.
TObject * Remove(TObject *obj)
Remove object from the list.
Collection abstract base class.
void SetActive(Bool_t=kTRUE)
Int_t BuildPackage(const char *package, EBuildPackageOpt opt=kBuildAll, Int_t chkveropt=TPackMgr::kCheckROOT, TList *workers=0)
Build specified package.
virtual void ClearDataSetCache(const char *dataset=0)
Clear the content of the dataset cache, if any (matching 'dataset', if defined).
void GetMaxQueries()
Get max number of queries whose full results are kept in the remote sandbox.
virtual Int_t VerifyDataSet(const char *dataset, const char *optStr="")
Verify if all files in the specified dataset are available.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
char * Form(const char *fmt,...)
TObject * GetOutput(const char *name)
Get specified object that has been produced during the processing (see Process()).
Class to steer the merging of files produced on the workers.
Bool_t IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
See if the data is ready to be analyzed.
This class implements a plugin library manager.
TSubString Strip(EStripType s=kTrailing, char c=' ') const
Return a substring of self stripped at beginning and/or end.
virtual void SetMerging(Bool_t on=kTRUE)=0
virtual Int_t Echo(const TObject *obj)
Sends an object to master and workers and expects them to send back a message with the output of its T...
virtual Int_t Stat(const char *, FileStat_t &, const char *=0)
static void AddEnvVar(const char *name, const char *value)
Add a variable to the list of environment variables passed to proofserv on the master and slaves...
static void ResetEnvVars()
Clear the list of environment variables passed to proofserv on the master and slaves.
Bool_t CheckFile(const char *file, TSlave *sl, Long_t modtime, Int_t cpopt=(kCp|kCpBin))
Check if a file needs to be sent to the slave.
virtual Int_t Reset(Bool_t hard=kFALSE, const char *usr=0)
Send a cleanup request for the sessions associated with the current user.
Bool_t SetFragment(const TString &fragment)
Set fragment component of URI: fragment = *( pchar / "/" / "?" ).
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
virtual Long64_t Process(TDSet *dset, const char *selector, Option_t *option="", Long64_t nentries=-1, Long64_t firstentry=0)
Process a data set (TDSet) using the specified selector (.C) file or TSelector object Entry- or event...
Bool_t fProgressDialogStarted
virtual Int_t Exec(const char *shellcmd)
Execute a command.
const Int_t kPROOF_Protocol
Int_t SendGroupView()
Send to all active slaves servers the current slave group size and their unique id.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Int_t Exec(const char *cmd, ESlaves list, Bool_t plusMaster)
Send command to be executed on the PROOF master and/or slaves.
TPackMgr * GetPackMgr() const
write collection with single key
void SetStatus(ESlaveStatus stat)
Int_t RemoveWorkers(TList *wrks)
Used for shutting down the workers after a query is finished.
void SetName(const char *name)
Bool_t IsEqual(const TObject *obj) const
Used to compare slaveinfos by ordinal.
Int_t SendFile(const char *file, Int_t opt=(kBinary|kForward|kCp|kCpBin), const char *rfile=0, TSlave *sl=0)
Send a file to master or slave servers.
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
TList * GetEnabledPackages() const
R__EXTERN TRandom * gRandom
void RedirectWorker(TSocket *s, TSlave *sl, Int_t output_size)
Redirect output of worker sl to some merger.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
void PrintProgress(Long64_t total, Long64_t processed, Float_t procTime=-1., Long64_t bytesread=-1)
Print a progress bar on stderr. Used in batch mode.
void ReleaseMonitor(TMonitor *mon)
Release the used monitor to be used, making sure to delete newly created monitors.
virtual void Unsetenv(const char *name)
Unset environment variable.
void DataSetStatus(const char *msg, Bool_t status, Int_t done, Int_t total)
Send dataset preparation status.
void AddInput(TObject *obj)
Add objects that might be needed during the processing of the selector (see Process()).
virtual int ClosePipe(FILE *pipe)
Close the pipe.
TString GetString() const
virtual void ValidateDSet(TDSet *dset)
Validate a TDSet.
virtual TObjLink * FirstLink() const
const TString & GetMsd() const
const TString & GetOrdinal() const
#define R__LOCKGUARD2(mutex)
TMacro * GetLastLog()
Fill a TMacro with the log lines since the last reading (fLogFileR) Return (TMacro *)0 if no line was...
Int_t VerifyDataSetParallel(const char *uri, const char *optStr)
Internal function for parallel dataset verification used TProof::VerifyDataSet and TProofLite::Verify...
Int_t RemoveDynamicPath(const char *libpath, Bool_t onClient=kFALSE)
Remove 'libpath' from the lib path search.
static TSlave * Create(const char *url, const char *ord, Int_t perf, const char *image, TProof *proof, Int_t stype, const char *workdir, const char *msd, Int_t nwk=1)
Static method returning the appropriate TSlave object for the remote server.
void GoAsynchronous()
Send GOASYNC message to the master.
virtual void RemoveQueryResult(const char *ref)=0
virtual void SetAlias(const char *alias)
Set an alias for this session.
void ClearInputData(TObject *obj=0)
Remove obj from the input data list; if obj is null (default), clear the input data info...
Int_t SendCommand(const char *cmd, ESlaves list=kActive)
Send command to be executed on the PROOF master and/or slaves.
static unsigned int total
Int_t AssertPath(const char *path, Bool_t writable)
Make sure that 'path' exists; if 'writable' is kTRUE, make also sure that the path is writable...
virtual Int_t RedirectOutput(const char *name, const char *mode="a", RedirectHandle_t *h=0)
Redirect standard output (stdout, stderr) to the specified file.
Int_t LoadPackage(const char *package, Bool_t notOnClient=kFALSE, TList *loadopts=0, TList *workers=0)
Load specified package.
void SetHost(const char *host)
TString & Remove(Ssiz_t pos)
The packetizer is a load balancing object created for each query.
TList * GetInputList()
Get input list.
void NotifyLogMsg(const char *msg, const char *sfx="\n")
Notify locally 'msg' to the appropriate units (file, stdout, window) If defined, 'sfx' is added after...
virtual int Chmod(const char *file, UInt_t mode)
Set the file permission bits. Returns -1 in case or error, 0 otherwise.
Int_t HandleOutputOptions(TString &opt, TString &target, Int_t action)
Extract from opt information about output handling settings.
Bool_t AreAllWorkersAssigned()
Return if the determined number of workers has been already assigned to this merger.
Class used by TMap to store (key,value) pairs.
virtual TSignalHandler * RemoveSignalHandler(TSignalHandler *sh)
Remove a signal handler from list of signal handlers.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
void Close(Option_t *option="")
Close all open slave servers.
virtual const char * GetIncludePath()
Get the list of include path.
void SetFeedback(TString &opt, TString &optfb, Int_t action)
Extract from opt in optfb information about wanted feedback settings.
virtual Bool_t IsOpen() const
Returns kTRUE in case file is open and kFALSE if file is not open.
void SetDefaultTreeName(const char *treeName)
const char *const kPROOF_InputDataFile
void ShowFeedback() const
Show items in feedback list.
TDrawFeedback * CreateDrawFeedback()
Draw feedback creation proxy.
virtual Long64_t GetEventsProcessed() const =0
void SetSysInfo(SysInfo_t si)
Setter for fSysInfo.
TVirtualProofPlayer * GetPlayer() const
void DeleteParameters(const char *wildcard)
Delete the input list parameters specified by a wildcard (e.g.
Int_t SetParallelSilent(Int_t nodes, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
Int_t GoParallel(Int_t nodes, Bool_t accept=kFALSE, Bool_t random=kFALSE)
Go in parallel mode with at most "nodes" slaves.
TSignalHandler * GetSignalHandler() const
void SetObjName(const char *objname)
Set/change object name.
virtual const char * HostName()
Return the system's host name.
virtual void ls(Option_t *option="") const
List (ls) all objects in this collection.
Ssiz_t Last(char c) const
Find last occurrence of a character c.
virtual void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)=0
static Int_t GetInputData(TList *input, const char *cachedir, TString &emsg)
Get the input data from the file defined in the input list.
void ClearInput()
Clear input object list.
Bool_t IsFinalized() const
const char * GetMaster() const
virtual void SetEntryList(TObject *aList)
Set entry (or event) list for this data set.
EQueryMode GetQueryMode(Option_t *mode=0) const
Find out the query mode based on the current setting and 'mode'.
virtual void Remove()
Remove signal handler from system signal handler list.
void Feedback(TList *objs)
Get list of feedback objects.
Int_t GetParallel() const
Returns number of slaves active in parallel mode.
TList * fNonUniqueMasters
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
virtual Long64_t GetSize() const
Returns the current file size.
Int_t GetNumberOfBadSlaves() const
Return number of bad slaves.
virtual void SendInputDataFile()
Send the input data objects to the master; the objects are taken from the dedicated list and / or the...
TList * fEnabledPackagesOnCluster
This class controls a Parallel ROOT Facility, PROOF, cluster.
virtual Bool_t IsValid() const
static RooMathCoreReg dummy
void Detach(Option_t *opt="")
Detach this instance to its proofserv.
virtual void SetInputList(TList *in, Bool_t adopt=kTRUE)
Set / change the input list.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual TQueryResult * GetCurrentQuery() const =0
void SetRunStatus(ERunStatus rst)
const char * GetFileName() const
static Int_t RegisterGlobalPath(const char *paths)
Parse one or more paths as possible sources of packages Returns number of paths added; or -1 in case ...
virtual TTree * GetTreeHeader(TDSet *tdset)
Creates a tree header (a tree with nonexisting files) object for the DataSet.
const char * GetWorkDir() const
int CompareTo(const char *cs, ECaseCompare cmp=kExact) const
Compare a string to char *cs2.
void ShowMissingFiles(TQueryResult *qr=0)
Show information about missing files during query described by 'qr' or the last query if qr is null (...
virtual void StopProcess(Bool_t abort, Int_t timeout=-1)=0
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Int_t GetSandbox(TString &sb, Bool_t assert=kFALSE, const char *rc=0)
Set the sandbox path from ' Proof.Sandbox' or the alternative var 'rc'.
void ShowQueries(Option_t *opt="")
Ask the master for the list of queries.
Int_t AddWorkers(TList *wrks)
Works on the master node only.
const char * GetType() const
void SetArchived(const char *archfile)
Set (or update) query in archived state.
virtual void Remove()
Remove file event handler from system file handler list.
Bool_t AreAllWorkersMerged()
Return if merger has already merged all workers, i.e. if it has finished its merging job...
Long64_t Finalize(Int_t query=-1, Bool_t force=kFALSE)
Finalize the qry-th query in fQueries.
Int_t Match(const TString &s, UInt_t start=0)
Runs a match on s against the regex 'this' was created with.
virtual void AddAfter(const TObject *after, TObject *obj)
Insert object after object after in the list.
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
TString Getenv(const char *env, const char *ord="0")
Get value of environment variable 'env' on node 'ord'.
Int_t UnloadPackage(const char *package)
Unload specified package.
Mother of all ROOT objects.
virtual TObject * Clone(const char *newname="") const
Make a clone of an object using the Streamer facility.
virtual const char * GetTitle() const
Returns title of object.
void CleanGDirectory(TList *ol)
Remove links to objects in list 'ol' from gDirectory.
TList * GetListOfEnabledPackages()
Get from the master the list of names of the packages enabled.
TString fActiveSlavesSaved
const char * GetImage() const
virtual Bool_t Add(const char *file, const char *objname=0, const char *dir=0, Long64_t first=0, Long64_t num=-1, const char *msd=0)
Add file to list of files to be analyzed.
Float_t GetCpuTime() const
void ShowEnabledPackages(Bool_t all=kFALSE)
List which packages are enabled.
Int_t GetRC(const char *RCenv, Int_t &env, const char *ord="0")
Get into 'env' the value of integer RC env variable 'rcenv' on node 'ord'.
Bool_t R_ISDIR(Int_t mode)
static void SetMacroPath(const char *newpath)
Set or extend the macro search path.
R__EXTERN TProofServ * gProofServ
const TString & GetWorkDir() const
Int_t UploadDataSetFromFile(const char *, const char *, const char *=0, Int_t=0, TList *=0)
*** This function is deprecated and will disappear in future versions *** *** It is just a wrapper ar...
virtual void Add(TObject *obj)
virtual TList * QuerySessions(Option_t *opt="S")
Get list of sessions accessible to this manager.
Wrapper for PCRE library (Perl Compatible Regular Expressions).
Class that contains a list of TFileInfo's and accumulated meta data information about its entries...
virtual void RecursiveRemove(TObject *obj)
Remove object from this collection and recursively remove the object from all other objects (and coll...
TMap * GetDataSetQuota(const char *optStr="")
returns a map of the quotas of all groups
Utility class to draw objects in the feedback list during queries.
Int_t GetActiveMergersCount()
Get the active mergers count.
TProofOutputList fOutputList
const char * GetWorkDir() const
SysInfo_t GetSysInfo() const
virtual TList * GetListOfKeys() const
Int_t Ping()
Ping PROOF. Returns 1 if master server responded.
const char * GetMsd() const
R__EXTERN Int_t gProofDebugLevel
A chain is a collection of files containing TTree objects.
void WriteObject(const TObject *obj)
Write object to message buffer.
void SetOptions(const char *opt)
void AskStatistics()
Ask for the statistics of the slaves.
const char * GetImage() const
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Int_t GetRemoteProtocol() const
static Int_t SaveInputData(TQueryResult *qr, const char *cachedir, TString &emsg)
Save input data file from 'cachedir' into the sandbox or create the file with input data objects...
const char * GetUser() const
static TList * ParseDataSetSrvMaps(const TString &srvmaps)
Create a server mapping list from the content of 'srvmaps' Return the list (owned by the caller) or 0...
void ClearFeedback()
Clear feedback list.
virtual TObject * FindObject(const char *name) const
Find an object in this collection using its name.
Int_t SetParallel(Int_t nodes=-1, Bool_t random=kFALSE)
Tell PROOF how many slaves to use in parallel.
virtual int CopyFile(const char *from, const char *to, Bool_t overwrite=kFALSE)
Copy a file.
void AddWorker(TSlave *sl)
Add new worker to the list of workers to be merged by this merger.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Int_t Remove(const char *pack=0, Bool_t dolock=kTRUE)
Remove package 'pack' If 'pack' is null or empty all packages are cleared.
Int_t Atoi() const
Return integer value of string.
Int_t AddDynamicPath(const char *libpath, Bool_t onClient=kFALSE, TList *wrks=0, Bool_t doCollect=kTRUE)
Add 'libpath' to the lib path search.
TInetAddress GetInetAddress() const
Bool_t IsParallel() const
TList * GetListOfPackages()
Get from the master the list of names of the packages available.
virtual void AddIncludePath(const char *includePath)
Add includePath to the already set include path.
const char * GetUser() const
Double_t Atof() const
Return floating-point value contained in string.
virtual Int_t AddOutputObject(TObject *obj)=0
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
TList * fAvailablePackages
A TTree object has a header with a name and a title.
const AParamType & GetVal() const
void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
Set draw feedback option.
static void ResetErrno()
Static function resetting system error number.
Class describing a generic file including meta information.
Int_t SendInitialState()
Transfer the initial (i.e.
const char * GetName() const
Returns name of object.
Int_t Retrieve(Int_t query, const char *path=0)
Send retrieve request for the qry-th query in fQueries.
TProofMgr::EServType fServType
void AttachList(TList *alist)
Attach to list 'alist'.
virtual Bool_t ExpandPathName(TString &path)
Expand a pathname getting rid of special shell characters like ~.
virtual void ActivateAll()
Activate all de-activated sockets.
Abstract interface for the PROOF player.
TList * fTerminatedSlaveInfos
Int_t BroadcastGroupPriority(const char *grp, Int_t priority, ESlaves list=kAllUnique)
Broadcast the group priority to all workers in the specified list.
void GetStatistics(Bool_t verbose=kFALSE)
Get statistics about CPU time, real time and bytes read.
TList * FindDataSets(const char *searchString, const char *optStr="")
Find datasets, returns in a TList all found datasets.
Bool_t Prompt(const char *p)
Prompt the question 'p' requiring an answer y,Y,n,N. Return kTRUE if the answer was y or Y...
Double_t Sqrt(Double_t x)
virtual void ShowStagingStatusDataSet(const char *dataset, const char *optStr="filter:SsCc")
Like GetStagingStatusDataSet, but displays results immediately.
virtual Int_t RemoveDataSet(const char *dataset, const char *optStr="")
Remove the specified dataset from the PROOF cluster.
void AskForOutput(TSlave *sl)
Master asks for output from worker sl.
const Bool_t kIterBackward
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
static TProofMgr * Create(const char *url, Int_t loglevel=-1, const char *alias=0, Bool_t xpd=kTRUE)
Static method returning the appropriate TProofMgr object using the plugin manager.
virtual Int_t Load(const char *macro, Bool_t notOnClient=kFALSE, Bool_t uniqueOnly=kTRUE, TList *wrks=0)
Load the specified macro on master, workers and, if notOnClient is kFALSE, on the client...
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
Class describing a PROOF worker server.
const char * GetUser() const
Container class for processing statistics.
virtual Int_t SetDataSetTreeName(const char *dataset, const char *treename)
Set/Change the name of the default tree.
A TSelector object is used by the TTree::Draw, TTree::Scan, TTree::Process to navigate in a TTree and...
const char * GetObjName() const
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
void GetLog(Int_t start=-1, Int_t end=-1)
Ask for remote logs in the range [start, end].
void ResetProgressDialog(const char *sel, Int_t sz, Long64_t fst, Long64_t ent)
Reset progress dialog.
Int_t ClearPackage(const char *package)
Remove a specific package.
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Int_t SendPrint(Option_t *option="")
Send print command to master server.
void StartupMessage(const char *msg, Bool_t status, Int_t done, Int_t total)
Send startup message.
A List of entry numbers in a TTree or TChain.
Int_t DeactivateWorker(const char *ord, Bool_t save=kTRUE)
Remove the worker identified by the ordinal number 'ord' from the active list.
TList * GetListOfActiveSlaves() const
const char * GetOrdinal() const
virtual Int_t SetupServ(Int_t stype, const char *conffile)
Init a PROOF slave object.
static void DelEnvVar(const char *name)
Remove a variable from the list of environment variables passed to proofserv on the master and slave...
TMonitor * fActiveMonitor
virtual Int_t Write(const char *name=0, Int_t option=0, Int_t bufsize=0)
Write all objects in this collection.
virtual TList * GetListOfResults() const =0
TList * GetFeedbackList() const
Return feedback list.
Int_t GetNumberOfUniqueSlaves() const
Return number of unique slaves, i.e.
void Browse(TBrowser *b)
Build the PROOF's structure in the browser.
virtual TObjString * AddLine(const char *text)
Add line with text in the list of lines of this macro.
virtual void UpdateAutoBin(const char *name, Double_t &xmin, Double_t &xmax, Double_t &ymin, Double_t &ymax, Double_t &zmin, Double_t &zmax)=0
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual const char * GetName() const
Return name of this collection.
Int_t CollectInputFrom(TSocket *s, Int_t endtype=-1, Bool_t deactonfail=kFALSE)
Collect and analyze available input from socket s.
virtual TDrawFeedback * CreateDrawFeedback(TProof *p)=0
Long64_t fLastPollWorkers_s
virtual void SetIncludePath(const char *includePath)
IncludePath should contain the list of compiler flags to indicate where to find user defined header f...
Int_t BroadcastFile(const char *file, Int_t opt, const char *rfile, TList *wrks)
Broadcast file to all workers in the specified list.
virtual const char * GetTitle() const
Returns title of object.
const char * GetPrefix() const
static Int_t SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
Send the input data file to the workers.
const char * GetDirectory() const
void HandleSubmerger(TMessage *mess, TSlave *sl)
Process a message of type kPROOF_SUBMERGER.
virtual void Close(Option_t *option="")
Close a file.
const char * Data() const
void SetOrdinal(const char *ord)
const char *const kPROOF_PackDir
void SetInputHandler(TFileHandler *ih)
Adopt and register input handler for this slave.
static Int_t AssertDataSet(TDSet *dset, TList *input, TDataSetManager *mgr, TString &emsg)
Make sure that dataset is in the form to be processed.
Bool_t CreateMerger(TSlave *sl, Int_t port)
Create a new merger.