50# include <sys/types.h>
57#include "RConfigure.h"
133 a = Getline(
"\nSwitch to asynchronous mode not supported remotely:"
134 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
136 a = Getline(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
137 " any other key to continue: ");
139 if (
a[0] ==
'Q' ||
a[0] ==
'S' ||
a[0] ==
'q' ||
a[0] ==
's') {
141 Info(
"Notify",
"Processing interrupt signal ... %c",
a[0]);
162 fSocket(s), fProof(
p)
193 while (myord && otherord) {
194 Int_t myval = atoi(myord);
195 Int_t otherval = atoi(otherord);
196 if (myval < otherval)
return 1;
197 if (myval > otherval)
return -1;
198 myord = strchr(myord,
'.');
200 otherord = strchr(otherord,
'.');
201 if (otherord) otherord++;
203 if (myord)
return -1;
204 if (otherord)
return 1;
301 Error(
"SetMergedWorker",
"all workers have been already merged before!");
314 Error(
"AddWorker",
"all workers have been already assigned to this merger");
368 if( 0 == _cluster->
Length() ) {
369 Error(
"PoDCheckUrl",
"PoD server is not running");
409 if (!masterurl || strlen(masterurl) <= 0) {
412 }
else if (!(strstr(masterurl,
"://"))) {
449 }
else if (
fMaster ==
"prooflite") {
458 Init(masterurl, conffile, confdir, loglevel, alias);
463 if (
Exec(
"gProofServ->GetUser()",
"0",
kTRUE) == 0) {
470 emsg =
"could not find 'const char *' string in macro log";
473 emsg =
"could not retrieve user info";
482 Warning(
"TProof",
"%s: using local default %s", emsg.
Data(), usr.
Data());
492 gROOT->GetListOfSockets()->Remove(mgr);
493 gROOT->GetListOfSockets()->Add(mgr);
498 if (!
gROOT->GetListOfProofs()->FindObject(
this))
499 gROOT->GetListOfProofs()->Add(
this);
518 if (!
gROOT->GetListOfProofs()->FindObject(
this))
519 gROOT->GetListOfProofs()->Add(
this);
626 while (envs.Tokenize(env, from,
",")) {
629 Warning(
"Init",
"request for sending over undefined environemnt variable '%s' - ignoring", env.
Data());
631 if (!envsfound.
IsNull()) envsfound +=
",";
639 Warning(
"Init",
"none of the requested env variables were found: '%s'", envs.Data());
641 Info(
"Init",
"the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.
Data());
730 gROOT->GetListOfProofs()->Remove(
this);
746 Emit(
"CloseWindow()");
758 const char *confdir,
Int_t loglevel,
const char *alias)
782 if (!conffile || !conffile[0])
784 if (!confdir || !confdir[0])
840 Error(
"Init",
"could not create temporary logfile");
842 Error(
"Init",
"could not open temp logfile for reading");
904 if (enableSchemaEvolution) {
907 Info(
"TProof",
"automatic schema evolution in TMessage explicitly disabled");
917 Error(
"Init",
"failure asserting sandbox directory %s", sandbox.
Data());
926 Error(
"Init",
"failure asserting directory %s", packdir.
Data());
931 Info(
"Init",
"package directory set to %s", packdir.
Data());
940 Info(
"Init",
" %d global package directories registered", nglb);
962 GetRC(
"Proof.DynamicStartup", dyn);
1002 gROOT->GetListOfSockets()->Add(
this);
1027 }
else if (sb ==
"..") {
1052 const char *cq = (
IsLite()) ?
"\"" :
"";
1053 while (sconf.Tokenize(opt, from,
",")) {
1054 if (opt.
IsNull())
continue;
1061 TString mst, top, sub, wrk, all;
1066 all =
n->GetTitle();
1068 mst =
n->GetTitle();
1070 top =
n->GetTitle();
1072 sub =
n->GetTitle();
1074 wrk =
n->GetTitle();
1076 if (all !=
"" && mst ==
"") mst = all;
1077 if (all !=
"" && top ==
"") top = all;
1078 if (all !=
"" && sub ==
"") sub = all;
1079 if (all !=
"" && wrk ==
"") wrk = all;
1080 if (all !=
"" && all.
BeginsWith(
"valgrind_opts:")) {
1082 Info(
"ParseConfigField",
"valgrind run: resetting 'PROOF_WRAPPERCMD':"
1083 " must be set again for next run , if any");
1087 cmd.
Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1088 TString mstlab(
"NO"), wrklab(
"NO");
1089 Bool_t doMaster = (opt ==
"valgrind" || (opt.
Contains(
"master") &&
1095 if (mst ==
"" || mst.
BeginsWith(
"valgrind_opts:")) {
1097 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), mst.
Data());
1100 }
else if (mst !=
"") {
1106 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1108 if (!opt.
Contains(
"workers"))
return;
1110 if (opt ==
"valgrind" || opt ==
"valgrind=") opt =
"valgrind=workers";
1115 if (top ==
"" || top.
BeginsWith(
"valgrind_opts:")) {
1117 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), top.
Data());
1120 }
else if (top !=
"") {
1126 if (sub ==
"" || sub.
BeginsWith(
"valgrind_opts:")) {
1128 var.
Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.
Data(), sub.
Data());
1131 }
else if (sub !=
"") {
1137 if (wrk ==
"" || wrk.
BeginsWith(
"valgrind_opts:")) {
1139 var.
Form(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.
Data(), wrk.
Data(), cq);
1144 nwrks = opt(inw+1, opt.
Length());
1145 if (!nwrks.
IsDigit()) nwrks =
"2";
1157 }
else if (wrk !=
"") {
1171 Printf(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.
Data());
1173 Printf(
" ---> Starting a debug run with valgrind (workers:%s)", wrklab.
Data());
1175 Printf(
" ---> Please be patient: startup may be VERY slow ...");
1176 Printf(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1177 Printf(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1186 Printf(
"*** Requested IgProf performance profiling ***");
1187 TString addLogExt =
"__igprof.pp__.log";
1188 TString addLogFmt =
"igprof -pk -pp -t proofserv.exe -o %s.%s";
1196 tmp.
Form(addLogFmt.
Data(),
"<logfilemst>", addLogExt.
Data());
1199 tmp.
Form(addLogFmt.
Data(),
"<logfilewrk>", addLogExt.
Data());
1225 if ((
c !=
'+') && ((
c <
'0') || (
c >
'9')))
1246 if (
IsLite() && cpuPin) {
1247 Printf(
"*** Requested CPU pinning ***");
1249 const char *pinCmd =
"taskset -c <cpupin>";
1252 if (ev && (
p =
dynamic_cast<TNamed *
>(ev->
FindObject(
"PROOF_SLAVE_WRAPPERCMD")))) {
1258 val.
Form(
"\"%s\"", pinCmd);
1271 if (!inpath || strlen(inpath) <= 0) {
1272 Error(
"AssertPath",
"undefined input path");
1281 Error(
"AssertPath",
"could not create path %s", path.
Data());
1288 Error(
"AssertPath",
"could not make path %s writable", path.
Data());
1307 gROOT->GetListOfSockets()->Remove(mgr);
1308 gROOT->GetListOfSockets()->Add(mgr);
1323 Error(
"AddWorkers",
"AddWorkers can only be called on the master!");
1327 if (!workerList || !(workerList->
GetSize())) {
1328 Error(
"AddWorkers",
"empty list of workers!");
1349 if (!addedWorkers) {
1351 Error(
"AddWorkers",
"cannot create new list for the workers to be added");
1359 while ((to = next())) {
1408 addedWorkers->
Add(slave);
1416 Info(
"AddWorkers",
"worker on host %s created"
1422 m <<
TString(
"Opening connections to workers") << nSlaves
1423 << nSlavesDone << slaveOk;
1437 TIter nxsl(addedWorkers);
1439 while ((sl = (
TSlave *) nxsl())) {
1450 Info(
"AddWorkers",
"worker on host %s finalized"
1460 m <<
TString(
"Setting up worker servers") << nSlaves
1461 << nSlavesDone << slaveOk;
1480 Info(
"AddWorkers",
"will invoke GoMoreParallel()");
1483 Info(
"AddWorkers",
"GoMoreParallel()=%d", nw);
1489 Info(
"AddWorkers",
"will invoke GoParallel()");
1498 Info(
"AddWorkers",
"will invoke SaveWorkerInfo()");
1504 Info(
"AddWorkers",
"will invoke SendParallel()");
1507 if (goMoreParallel &&
fPlayer) {
1512 Info(
"AddWorkers",
"will send the PROCESS message to selected workers");
1520 delete addedWorkers;
1533 if (packs && packs->
GetSize() > 0) {
1536 while ((pck = (
TPair *) nxp())) {
1542 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on added workers");
1547 Info(
"SetupWorkersEnv",
"will invoke UploadPackage() and EnablePackage() on all workers");
1556 delete server_packs;
1565 Info(
"SetupWorkersEnv",
"will invoke Load() on selected workers");
1577 Info(
"SetupWorkersEnv",
"will invoke AddDynamicPath() on selected workers");
1585 Info(
"SetupWorkersEnv",
"will invoke AddIncludePath() on selected workers");
1600 Error(
"RemoveWorkers",
"RemoveWorkers can only be called on the master!");
1610 while ((sl = (
TSlave *) nxsl())) {
1616 if (!(workerList->
GetSize())) {
1617 Error(
"RemoveWorkers",
"The list of workers should not be empty!");
1625 while ((to = next())) {
1627 if (!strcmp(to->
ClassName(),
"TProofNodeInfo")) {
1631 while ((sl = (
TSlave *) nxsl())) {
1639 Warning(
"RemoveWorkers",
"unknown object type: %s - it should be"
1640 " TProofNodeInfo or inheriting from TSlave", to->
ClassName());
1670 TString emsg(
"no resource currently available for this session: please retry later");
1682 Printf(
"Starting master: opening connection ...");
1688 fprintf(stderr,
"Starting master:"
1689 " connection open: setting up server ... \r");
1703 Printf(
"Starting master: OK ");
1709 Error(
"StartSlaves",
1710 "client and remote protocols not compatible (%d and %d)",
1729 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1732 if (slStatus == -99)
1733 Error(
"StartSlaves",
"no resources available or problems setting up workers (check logs)");
1734 else if (slStatus == -98)
1735 Error(
"StartSlaves",
"could not setup output redirection on master");
1737 Error(
"StartSlaves",
"setting up master");
1748 Error(
"StartSlaves",
1749 "failed to setup connection with PROOF master server");
1755 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1761 Printf(
"Starting master: failure");
1766 Printf(
"Starting master: OK ");
1771 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1784 Error(
"StartSlaves",
"failed to create (or connect to) the PROOF master server");
1800 { std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
1809 while ((sl = (
TSlave *)nxs()))
1823 gROOT->GetListOfSockets()->Remove(
this);
1835 gROOT->GetListOfProofs()->Remove(
this);
1854 Int_t perf,
const char *image,
const char *workdir)
1876 const char *image,
const char *msd,
Int_t nwk)
1896 while ((sl = (
TSlave *)next())) {
1923 if (
fImage == sl->fImage) {
1933 TSlave *replace_slave = 0;
1936 if (sl->fImage == sl2->fImage) {
1941 replace_slave = sl2;
1948 Error(
"FindUniqueSlaves",
"TSlave is neither Master nor Slave");
1961 if (replace_slave) {
2053 if (s.
Contains(
"Total MB's processed:")) {
2056 }
else if (s.
Contains(
"Total real time used (s):")) {
2057 s.
ReplaceAll(
"Total real time used (s):",
"");
2059 }
else if (s.
Contains(
"Total CPU time used (s):")) {
2060 s.
ReplaceAll(
"Total CPU time used (s):",
"");
2069 Printf(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2173 Printf(
"+++ Options: \"A\" show all queries known to server");
2174 Printf(
"+++ \"L\" show retrieved queries");
2175 Printf(
"+++ \"F\" full listing of query info");
2176 Printf(
"+++ \"H\" print this menu");
2178 Printf(
"+++ (case insensitive)");
2180 Printf(
"+++ Use Retrieve(<#>) to retrieve the full"
2181 " query results from the master");
2182 Printf(
"+++ e.g. Retrieve(8)");
2212 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2214 while ((pq = nxq()))
2221 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2228 Printf(
"+++ Queries available locally: %d", listlocal->
GetSize());
2229 TIter nxlq(listlocal);
2230 while ((pq = nxlq()))
2246 while (
TSlave *sl =
dynamic_cast<TSlave*
>(nextSlave())) {
2256 if (submasters.
GetSize() > 0) {
2264 EmitVA(
"IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2267 Info(
"IsDataReady",
"%lld / %lld (%s)",
2268 bytesready, totalbytes,
fDataReady?
"READY":
"NOT READY");
2286 if (slaves->
GetSize() == 0)
return;
2291 while ((sl = (
TSlave *)next())) {
2310 Int_t nparallel = 0;
2311 while (
TSlave* sl =
dynamic_cast<TSlave*
>(nextSlave()))
2312 if (sl->GetParallel() >= 0)
2313 nparallel += sl->GetParallel();
2336 while ((slave = (
TSlave *) next()) != 0) {
2346 while ((activeslave = (
TSlave *) nextactive())) {
2355 while ((badslave = (
TSlave *) nextbad())) {
2364 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2372 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2377 Error(
"GetSlaveInfo",
"TSlave is neither Master nor Slave");
2398 while ((sl = (
TSlave*) next())) {
2428 if (workers->
GetSize() == 0)
return 0;
2431 TIter next(workers);
2434 while ((wrk = (
TSlave *)next())) {
2437 MarkBad(wrk,
"could not send group priority");
2479 if (!slaves || slaves->
GetSize() == 0)
return 0;
2485 while ((sl = (
TSlave *)next())) {
2488 MarkBad(sl,
"could not broadcast request");
2571 if (slaves->
GetSize() == 0)
return 0;
2577 while ((sl = (
TSlave *)next())) {
2580 MarkBad(sl,
"could not send broadcast-raw request");
2614 if (wrks->
GetSize() == 0)
return 0;
2620 while ((wrk = (
TSlave *)next())) {
2623 Error(
"BroadcastFile",
2624 "problems sending file to worker %s (%s)",
2684 rc =
Collect(mon, timeout, endtype, deactonfail);
2710 while ((sl = (
TSlave*) next())) {
2715 rc =
Collect(mon, timeout, endtype, deactonfail);
2742 rc =
Collect(mon, timeout, endtype, deactonfail);
2761 Info(
"Collect",
">>>>>> Entering collect responses #%04d", collectId);
2791 int cnt = 0, rc = 0;
2796 Info(
"Collect",
"#%04d: active: %d", collectId, mon->
GetActive());
2808 while ((nact = mon->
GetActive(sto)) && (nto < 0 || nto > 0)) {
2814 if (al && al->
GetSize() > 0) {
2815 Info(
"Collect",
" %d node(s) still active:", al->
GetSize());
2818 while ((xs = (
TSocket *)nxs())) {
2840 Info(
"Collect",
"#%04d: now active: %d", collectId, mon->
GetActive());
2845 Info(
"Collect",
"Will invoke Select() #%04d", collectId);
2848 if (s && s != (
TSocket *)(-1)) {
2851 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2856 Info(
"Collect",
"#%04d: deactivating %p (active: %d, %p)", collectId,
2860 }
else if (rc == 2) {
2867 Info(
"Collect",
"save monitor: deactivating %p (active: %d, %p)",
2885 if (s == (
TSocket *)(-1) && nto > 0)
2897 while (mxws && (wrk = (
TSlave *) nxwr())) {
2902 Info(
"Collect",
"worker %s was asked to send its output to master",
2927 if (al && al->
GetSize() > 0) {
2929 Info(
"Collect",
" %d node(s) went in timeout:", al->
GetSize());
2932 while ((xs = (
TSocket *)nxs())) {
2961 Info(
"Collect",
"<<<<<< Exiting collect responses #%04d", collectId);
2978 Error(
"PollForNewWorkers",
"Can't invoke: not on a master -- should not happen!");
2982 Error(
"PollForNewWorkers",
"No ProofServ available -- should not happen!");
2992 TIter next(reqWorkers);
3003 while (( sl =
dynamic_cast<TSlave *
>(nextInner()) )) {
3010 if (found)
delete ni;
3012 newWorkers->
Add(ni);
3014 Info(
"PollForNewWorkers",
"New worker found: %s:%s",
3024 if (nNewWorkers > 0) {
3026 Info(
"PollForNewWorkers",
"Requesting to add %d new worker(s)", newWorkers->
GetEntries());
3029 Error(
"PollForNewWorkers",
"Call to AddWorkers() failed (got %d < 0)", rv);
3036 Info(
"PollForNewWorkers",
"No new worker found");
3065 if ((recvrc = s->
Recv(mess)) < 0) {
3067 Info(
"CollectInputFrom",
"%p: got %d from Recv()", s, recvrc);
3078 MarkBad(s,
"problems receiving a message in TProof::CollectInputFrom(...)");
3084 MarkBad(s,
"undefined message in TProof::CollectInputFrom(...)");
3092 if (rc == 1 && (endtype >= 0) && (
what != endtype))
3112 Warning(
"HandleInputMessage",
"given an empty message or undefined worker");
3118 Warning(
"HandleInputMessage",
"worker socket is undefined");
3145 MarkBad(s,
"received kPROOF_FATAL");
3158 Info(
"HandleInputMessage",
"received kPROOF_STOP from %s: disabling any further collection this worker",
3190 Info(
"HandleInputMessage",
"%s: kPROOF_GETPACKET", sl->
GetOrdinal());
3240 Info(
"HandleInputMessage",
"%s: kPROOF_LOGDONE: status %d parallel %d",
3274 Info(
"HandleInputMessage",
3290 rc = (async) ? 0 : 1;
3315 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: enter");
3325 Error(
"HandleInputMessage",
3326 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3335 Error(
"HandleInputMessage",
3336 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3340 Error(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: unknown type: %d",
type);
3353 Info(
"HandleInputMessage",
"kPROOF_SENDOUTPUT: enter (%s)", sl->
GetOrdinal());
3369 Info(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: enter");
3373 Info(
"HandleInputMessage",
"finalization on %s started ...", prefix);
3399 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: query result missing");
3401 }
else if (
type > 0) {
3411 }
else if (
IsTty() || changed) {
3412 fprintf(stderr,
"%s\r", msg.
Data());
3432 while ((xo = nxin()))
3444 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: player undefined!");
3455 Info(
"HandleInputMessage",
"%s: kPROOF_OUTPUTLIST: enter", sl->
GetOrdinal());
3477 Info(
"HandleInputMessage",
3478 "%s: kPROOF_OUTPUTLIST: query result missing", sl->
GetOrdinal());
3487 Info(
"HandleInputMessage",
3488 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->
GetOrdinal());
3492 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->
GetOrdinal());
3502 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYLIST: enter");
3515 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_RETRIEVE: enter");
3524 Info(
"HandleInputMessage",
3525 "kPROOF_RETRIEVE: query result missing or player undefined");
3532 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MAXQUERIES: enter");
3536 Printf(
"Number of queries fully kept remotely: %d", max);
3542 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SERVERSTARTED: enter");
3544 UInt_t tot = 0, done = 0;
3548 (*mess) >> action >> tot >> done >> st;
3555 char msg[512] = {0};
3557 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3560 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3561 action.
Data(), done, tot, frac);
3564 fprintf(stderr,
"%s", msg);
3574 m << action << tot << done << st;
3582 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATASET_STATUS: enter");
3584 UInt_t tot = 0, done = 0;
3588 (*mess) >> action >> tot >> done >> st;
3594 char msg[512] = {0};
3596 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3599 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3600 action.
Data(), done, tot, frac);
3603 fprintf(stderr,
"%s", msg);
3613 m << action << tot << done << st;
3621 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STARTPROCESS: enter");
3641 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"Preparation time: %f s",
fPrepTime);
3646 (*mess) >> selec >> dsz >>
first >> nent;
3648 if (!
gROOT->IsBatch()) {
3667 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_ENDINIT: enter");
3686 Info(
"HandleInputMessage",
"%s: got kPROOF_SETIDLE", sl->
GetOrdinal());
3689 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3703 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_QUERYSUBMITTED: enter");
3728 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SESSIONTAG: enter");
3751 Info(
"HandleInputMessage",
"kPROOF_FEEDBACK: enter");
3764 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_AUTOBIN: enter");
3783 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_PROGRESS: enter");
3792 Float_t initTime, procTime, evtrti, mbrti;
3793 (*mess) >>
total >> processed >> bytesread
3794 >> initTime >> procTime
3798 initTime, procTime, evtrti, mbrti);
3803 (*mess) >>
total >> processed;
3817 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_STOPPROCESS: enter");
3824 (*mess) >> status >> abort;
3826 (*mess) >> events >> abort;
3832 TList *listOfMissingFiles = 0;
3833 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
3834 listOfMissingFiles =
new TList();
3835 listOfMissingFiles->
SetName(
"MissingFiles");
3854 Emit(
"StopProcess(Bool_t)", abort);
3860 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_SUBMERGER: enter");
3867 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: enter");
3874 Error(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: no list received!");
3918 Info(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: enter");
3922 Error(
"HandleInputMessage",
"kPROOF_VALIDATE_DSET: fDSet not set");
3931 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_DATA_READY: enter");
3934 (*mess) >> dataready >> totalbytes >> bytesready;
3947 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_MESSAGE: enter");
3960 fprintf(stderr,
"%s%c", msg.
Data(), (lfeed ?
'\n' :
'\r'));
3968 fprintf(stderr,
"%s%c", msg.
Data(), (lfeed ?
'\n' :
'\r'));
3984 PDB(kGlobal,2)
Info(
"HandleInputMessage",
"kPROOF_VERSARCHCOMP: %s", vac.
Data());
3997 Error(
"HandleInputMessage",
"unknown command received from '%s' (what = %d)",
4025 Int_t merger_id = -1;
4026 (*mess) >> merger_id;
4029 Info(
"HandleSubmerger",
"kOutputSent: Worker %s:%d:%s had sent its output to merger #%d",
4033 Error(
"HandleSubmerger",
"kOutputSize: #%d not in list ", merger_id);
4046 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"all mergers removed ... ");
4050 PDB(kSubmerger, 2)
Error(
"HandleSubmerger",
"kOutputSent: received not on endmaster!");
4057 Int_t merger_id = -1;
4058 (*mess) >> merger_id;
4060 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown: #%d ", merger_id);
4063 Error(
"HandleSubmerger",
"kMergerDown: #%d not in list ", merger_id);
4086 while ((o = nxo())) {
4089 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
"kMergerDown:%d: exit", merger_id);
4097 Info(
"HandleSubmerger",
"worker %s reported as finished ", sl->
GetOrdinal());
4101 Info(
"HandleSubmerger",
"finalization on %s started ...", prefix);
4105 Int_t output_size = 0;
4106 Int_t merging_port = 0;
4107 (*mess) >> output_size >> merging_port;
4109 PDB(kSubmerger, 2)
Info(
"HandleSubmerger",
4110 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4126 msg.
Form(
"%s: Invalid request: cannot start %d mergers for %d workers",
4136 if (activeWorkers > 1) {
4142 msg.
Form(
"%s: Number of mergers set dynamically to %d (for %d workers)",
4145 msg.
Form(
"%s: No mergers will be used for %d workers",
4146 prefix, activeWorkers);
4155 if (activeWorkers > 1) {
4160 while ((wrk = nxwk())) {
4168 msg.
Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host",
4171 msg.
Form(
"%s: No mergers will be used for %d workers",
4172 prefix, activeWorkers);
4180 msg.
Form(
"%s: Number of mergers set by user to %d (for %d workers)",
4248 Error(
"HandleSubMerger",
"kOutputSize received not on endmaster!");
4260 Int_t merger_id = -1;
4274 if (merger_id == -1) {
4281 Info(
"RedirectWorker",
"redirecting worker %s to merger %d", sl->
GetOrdinal(), merger_id);
4283 PDB(kSubmerger, 2)
Info(
"RedirectWorker",
"redirecting output to merger #%d", merger_id);
4285 Error(
"RedirectWorker",
"#%d not in list ", merger_id);
4291 sendoutput << merger_id;
4292 sendoutput << hname;
4294 s->
Send(sendoutput);
4306 while (fLastAssignedMerger < fMergers->GetSize() &&
4318 while (fLastAssignedMerger < fMergers->GetSize() &&
4339 PDB(kSubmerger, 2)
Info(
"AskForOutput",
4340 "worker %s was asked to send its output to master",
4344 sendoutput <<
TString(
"master");
4360 Info(
"UpdateDialog",
4361 "processing was aborted - %lld events processed",
4376 Info(
"UpdateDialog",
4377 "processing was stopped - %lld events processed",
4394 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4399 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4415 while ((sl = (
TSlave*) next()))
4428 while ((sl = (
TSlave*) next()))
4440 Int_t active_mergers = 0;
4445 if (mi->
IsActive()) active_mergers++;
4448 return active_mergers;
4457 Info(
"CreateMerger",
"worker %s will be merger ", sl->
GetOrdinal());
4459 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"Begin");
4463 Info(
"CreateMerger",
"cannot create merger on port %d - exit", port);
4480 Int_t workersOnHost = 0;
4484 workers = workersOnHost - 1;
4488 msg.
Form(
"worker %s on host %s will be merger for %d additional workers", sl->
GetOrdinal(), sl->
GetName(), workers);
4500 bemerger << workers;
4503 PDB(kSubmerger,2)
Info(
"CreateMerger",
4504 "merger #%d (port: %d) for %d workers started",
4512 PDB(kSubmerger, 2)
Info(
"CreateMerger",
"exit");
4523 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4529 Error(
"MarkBad",
"worker instance undefined: protocol error? ");
4550 msg.
Form(
"\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4552 (reason && strlen(reason)) ? reason :
"unknown");
4557 msg +=
TString::Format(
"\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4562 msg +=
TString::Format(
" +++ Please check the session logs for error messages either using\n");
4566 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4570 msg +=
TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4571 "Display(\"*\")\n\n", thisurl.
Data());
4574 }
else if (reason) {
4576 Info(
"MarkBad",
"worker %s at %s:%d asked to terminate",
4584 TList *listOfMissingFiles = 0;
4585 if (!(listOfMissingFiles = (
TList *)
GetOutput(
"MissingFiles"))) {
4586 listOfMissingFiles =
new TList();
4587 listOfMissingFiles->
SetName(
"MissingFiles");
4596 packetizer->
MarkBad(wrk, 0, &listOfMissingFiles);
4652 Int_t mergersCount = -1;
4654 if (mc) mergersCount = mc->
GetVal();
4656 if (mergersCount == 0) {
4658 if (activeWorkers > 1) {
4684 std::lock_guard<std::recursive_mutex> lock(
fCloseMutex);
4699 Warning(
"TerminateWorker",
"worker instance undefined: protocol error? ");
4709 Info(
"TerminateWorker",
"connection to worker is already down: cannot"
4710 " send termination message");
4722 if (ord && strlen(ord) > 0) {
4727 while ((wrk = (
TSlave *)nxw())) {
4728 if (all || !strcmp(wrk->
GetOrdinal(), ord)) {