22#include "RConfigure.h"
48#if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
49 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
50 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
52#define lockf(fd, op, sz) flock((fd), (op))
54#define F_LOCK (LOCK_EX | LOCK_NB)
57#define F_ULOCK LOCK_UN
89#include "compiledata.h"
128static const char *__crashreporter_info__ = 0;
129asm(
".desc ___crashreporter_info__, 0x10");
167 Printf(
"Received SIGTERM: terminating");
259 Error(
"TProofServLogHandler",
"executing command in pipe");
263 Error(
"TProofServLogHandler",
264 "undefined command (%p) or socket (%p)", (
int *)cmd, s);
282 Error(
"TProofServLogHandler",
"undefined file (%p) or socket (%p)",
f, s);
295 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
313 if ((plf = strchr(
line,
'\n')))
363 Error(
"TProofServLogHandlerGuard",
"invalid handler");
367 Error(
"TProofServLogHandlerGuard",
"undefined command");
383 Error(
"TProofServLogHandlerGuard",
"invalid handler");
387 Error(
"TProofServLogHandlerGuard",
"undefined file");
421 printf(
"TShutdownTimer::Notify: checking activity on the input socket\n");
431 printf(
"TShutdownTimer::Notify: input socket: %p: did not show any activity"
432 " during the last %d mins: aborting\n", xs,
fTimeout);
438 printf(
"TShutdownTimer::Notify: input socket: %p: show activity"
439 " %ld secs ago\n", xs, dt / 60000);
469 spid.
Form(
"%d", pid);
489 pid = waitpid(
p->GetVal(), &status, WNOHANG);
490 }
while (pid < 0 && errno == EINTR);
493 pid = _cwait(&status, (intptr_t)
p->GetVal(), 0);
495 if (pid > 0 && pid ==
p->GetVal()) {
519 Info (
"Notify",
"session idle for more then %lld secs: terminating",
Long64_t(
fTime)/1000);
525 Warning(
"Notify",
"problems updating session status (errno: %d)", -uss_rc);
529 msg.
Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
530 "// Please IGNORE any error message possibly displayed below\n//",
533 msg.
Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
541 Warning(
"Notify",
"fProofServ undefined!");
568 Printf(
"proofserv: command line testing: OK");
580 if (fgVirtMemMax < 0 && gSystem->Getenv(
"PROOF_VIRTMEMMAX")) {
582 if (mmx < kMaxLong && mmx > 0)
586 if (fgVirtMemMax < 0 && gSystem->Getenv(
"ROOTPROOFASHARD")) {
588 if (mmx < kMaxLong && mmx > 0)
593 if (fgResMemMax < 0 && gSystem->Getenv(
"PROOF_RESMEMMAX")) {
595 if (mmx < kMaxLong && mmx > 0)
603 Warning(
"TProofServ",
"requested memory fraction threshold to stop processing"
604 " (MemStop) out of range [0,1] - ignoring");
608 Warning(
"TProofServ",
"requested memory fraction threshold for warning and finer monitoring"
609 " (MemHWM) out of range [0,MemStop] - ignoring");
623 if (argc && *argc >= 4)
624 if (!strcmp(argv[3],
"test"))
628 if (argc && *argc < 2) {
629 Error(
"TProofServ",
"Must have at least 1 arguments (see proofd).");
737 Warning(
"TProofServ",
"bad formatted log file size limit ignored: '%s'", logmx.
Data());
757 char c = (slog[0] ==
'M' || slog[0] ==
'm') ?
'm' :
'a';
758 c = (slog[0] ==
'W' || slog[0] ==
'w') ?
'w' :
c;
765 Warning(
"TProofServ",
"request for syslog logging ineffective!");
780 if (enableSchemaEvolution) {
783 Info(
"TProofServ",
"automatic schema evolution in TMessage explicitly disabled");
796 if (opensock.
Length() <= 0)
800 Fatal(
"CreateServer",
"Invalid socket descriptor number (%d)", sock);
824 Info(
"CreateServer",
"Service %s ConfDir %s IsMaster %d\n",
911 TString master =
"proof://__master__";
915 master +=
a.GetPort();
921 Error(
"CreateServer",
"no plugin manager found");
930 Error(
"CreateServer",
"no plugin found for TProof with a"
938 if (
h->LoadPlugin() == -1) {
939 Error(
"CreateServer",
"plugin for TProof could not be loaded");
951 Error(
"CreateServer",
"plugin for TProof could not be executed");
974 msg.
Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
975 " This may generate compatibility problems between streamed objects.\n"
976 " The advise is to move to ROOT >= 5.21/02 .");
988 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
990 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
1034 motdname +=
"/etc/proof/noproof";
1036 if ((motd = fopen(motdname,
"r"))) {
1039 while ((
c = getc(motd)) != EOF)
1051 Long_t id, flags, modtime, lasttime = 0;
1056 if (time(0) - lasttime > (time_t)86400)
1065 motdname +=
"/etc/proof/motd";
1068 if (modtime > lasttime || show) {
1069 if ((motd = fopen(motdname,
"r"))) {
1072 while ((
c = getc(motd)) != EOF)
1082 Int_t fd = creat(lastname.
Data(), 0600);
1083 if (fd >= 0) close(fd);
1096 Error(
"Get",
"problems sending request");
1114 Error(
"Get",
"command %d cannot be executed while processing",
what);
1115 }
else if (xrc == -2) {
1116 Error(
"Get",
"unknown command %d ! Protocol error?",
what);
1134 Info(
"RestartComputeTime",
"compute time restarted after %f secs (%d entries)",
1163 Error(
"GetNextPacket",
"no progress status object");
1180 req << cacheSize << learnent;
1185 req << totalEntries;
1192 Info(
"GetNextPacket",
"cacheSize: %lld, learnent: %d", cacheSize, learnent);
1200 << bytesRead << totalEntries;
1208 Error(
"GetNextPacket",
"Send() failed, returned %d", rc);
1216 Warning(
"GetNextPacket",
"problems saving partial results");
1227 Error(
"GetNextPacket",
"Recv() failed, returned %d", rc);
1243 PDB(kLoop, 2)
Info(
"GetNextPacket",
"'%s' '%s' '%s' %lld %lld",
1244 e->GetFileName(),
e->GetDirectory(),
1245 e->GetObjName(),
e->GetFirst(),
e->GetNum());
1247 PDB(kLoop, 2)
Info(
"GetNextPacket",
"Done");
1257 PDB(kLoop, 2)
Info(
"GetNextPacket:kPROOF_STOPPROCESS",
"received");
1263 Error(
"GetNextPacket",
"command %d cannot be executed while processing",
what);
1264 }
else if (xrc == -2) {
1265 Error(
"GetNextPacket",
"unknown command %d ! Protocol error?",
what);
1284 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3],
"test")) ?
kTRUE :
kFALSE;
1287 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1288 Printf(
"proofserv: command line testing: OK");
1292 if (!argc || (argc && *argc <= 1)) {
1293 Fatal(
"GetOptions",
"Must be started from proofd with arguments");
1297 if (!strcmp(argv[1],
"proofserv")) {
1300 }
else if (!strcmp(argv[1],
"proofslave")) {
1304 Fatal(
"GetOptions",
"Must be started as 'proofserv' or 'proofslave'");
1312 Fatal(
"GetOptions",
"ROOTCONFDIR shell variable not set");
1343 Error(
"HandleSocketInput",
"retrieving message from input socket");
1364 emsg.
Form(
"HandleSocketInput: command %d cannot be executed while processing",
what);
1365 }
else if (rc == -3) {
1366 emsg.
Form(
"HandleSocketInput: message %d undefined! Protocol error?",
what);
1368 emsg.
Form(
"HandleSocketInput: unknown command %d! Protocol error?",
what);
1371 }
else if (rc == 2) {
1375 Info(
"HandleSocketInput",
"message of type %d enqueued; sz: %d",
1385 Info(
"HandleSocketInput",
"processing enqueued message of type %d; left: %d",
1395 }
catch (std::bad_alloc &) {
1397 exmsg.
Form(
"caught exception 'bad_alloc' (memory leak?) %s %lld",
1399 }
catch (std::exception &exc) {
1401 exmsg.
Form(
"caught standard exception '%s' %s %lld",
1405 exmsg.
Form(
"caught exception throwing %d %s %lld",
1407 }
catch (
const char *str) {
1409 exmsg.
Form(
"caught exception throwing '%s' %s %lld",
1413 exmsg.
Form(
"caught exception <unknown> %s %lld",
1420 Error(
"HandleSocketInput",
"%s", exmsg.
Data());
1431 exmsg.
Form(
"high-memory footprint detected during Process(...) - terminating");
1432 Error(
"HandleSocketInput",
"%s", exmsg.
Data());
1447 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1448 SendAsynMessage(
" *** No workers left: cannot continue! Terminating ... *** ");
1475 if (!mess)
return -3;
1479 Info(
"HandleSocketInput",
"processing message type %d from '%s'",
1484 Int_t rc = 0, lirc = 0;
1502 Info(
"HandleSocketInput:kMESS_CINT",
"processing: %s...", str);
1521 if (pslb) slb = str;
1558 Info(
"HandleSocketInput:kPROOF_LOGLEVEL",
"debug level set to %d (mask: 0x%x)",
1589 Warning(
"HandleSocketInput:kPROOF_STATUS",
1590 "kPROOF_STATUS message is obsolete");
1592 Warning(
"HandleSocketInput:kPROOF_STATUS",
"problem sending of request");
1609 Info(
"HandleSocketInput:kPROOF_STOP",
"request for worker %s", ord.
Data());
1613 Info(
"HandleSocketInput:kPROOF_STOP",
"got request to terminate");
1626 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
"enter");
1633 Info(
"HandleSocketInput:kPROOF_STOPPROCESS",
1634 "recursive mode: enter %d, %ld", aborted, timeout);
1648 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_PROCESS",
"enter");
1658 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_SENDOUTPUT",
1659 "worker was asked to send output to master");
1662 Error(
"HandleSocketInput:kPROOF_SENDOUTPUT",
"problems sending output list");
1707 Info(
"HandleSocketInput:kPROOF_MAXQUERIES",
"Enter");
1719 Info(
"HandleSocketInput:kPROOF_CLEANUPSESSION",
"Enter");
1723 Printf(
"Session %s cleaned up", stag.
Data());
1725 Printf(
"Could not cleanup session %s", stag.
Data());
1735 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Enter");
1743 (*mess) >> isTree >>
filename >> dir >> objname;
1744 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1745 "Report size of object %s (%s) in dir %s in file %s",
1746 objname.Data(), isTree ?
"T" :
"O",
1749 PDB(kGlobal, 2)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
1750 "Found %lld %s", entries, isTree ?
"entries" :
"objects");
1755 answ << entries << objname;
1758 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETENTRIES",
"Done");
1783 sscanf(str,
"%1023s %d %ld %d",
name, &bin, &
size, &fw);
1785 sscanf(str,
"%1023s %d %ld",
name, &bin, &
size);
1812 Info(
"HandleSocketInput",
"forwarding file: %s", fnam.
Data());
1813 if (
fProof->
SendFile(fnam, opt, (copytocache ?
"cache" :
"")) < 0) {
1814 Error(
"HandleSocketInput",
"forwarding file: %s", fnam.
Data());
1828 (*mess) >> start >> end;
1830 Info(
"HandleSocketInput:kPROOF_LOGFILE",
1831 "Logfile request - byte range: %d - %d", start, end);
1862 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_CACHE",
"enter");
1875 Warning(
"HandleSocketInput:kPROOF_WORKERLISTS",
1876 "Action meaning-less on worker nodes: protocol error?");
1887 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Enter");
1901 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1902 "adding a list of worker nodes returned: %d", ret);
1905 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1906 "getting list of worker nodes returned: %d", retVal);
1926 answ << (
TList *)info;
1932 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
"Done");
1943 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Enter");
1947 p->HandleGetTreeHeader(mess);
1950 Error(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"could not create TProofPlayer instance!");
1953 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"Done");
1963 {
PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Enter");
1964 TList* outputList = 0;
1968 outputList =
new TList();
1970 outputList =
new TList();
1975 while ( (o = next()) ) {
1985 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_GETOUTPUTLIST",
"Done");
1992 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Enter");
2005 Info(
"HandleSocketInput:kPROOF_VALIDATE_DSET",
"Done");
2015 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Enter");
2018 Long64_t totalbytes = 0, bytesready = 0;
2020 answ << dataready << totalbytes << bytesready;
2022 Error(
"HandleSocketInput:kPROOF_DATA_READY",
2023 "This message should not be sent to slaves");
2027 PDB(kGlobal, 1)
Info(
"HandleSocketInput:kPROOF_DATA_READY",
"Done");
2043 Error(
"HandleSocketInput",
"old client: no or incompatible dataset support");
2068 Info(
"HandleSocketInput:kPROOF_REALTIMELOG",
2069 "setting real-time logging %s", (
on ?
"ON" :
"OFF"));
2092 Error(
"HandleSocketInput",
"no queries enqueued");
2105 Error(
"HandleSocketInput",
"adding a list of worker nodes returned: %d", ret);
2118 Error(
"HandleSocketInput",
"error getting list of worker nodes");
2120 Warning(
"HandleSocketInput",
"query was re-queued!");
2122 Error(
"HandleSocketInput",
"unexpected answer: %d", retVal);
2143 " idle or undefined player - ignoring");
2169 smsg.
Form(
"Echo response from %s:%s: %s",
2180 Error(
"HandleSocketInput",
"Can't redirect output");
2195 smsg.
Form(
"*** Echo response from %s:%s ***\n",
2218 Error(
"HandleSocketInput",
"unknown command %d",
what);
2243 Int_t mergedWorkers = 0;
2245 PDB(kSubmerger, 1)
Info(
"AcceptResults",
"enter");
2253 Int_t numworkers = 0;
2258 Info(
"AcceptResults",
"interrupt!");
2266 if (sw && sw != (
TSocket *)(-1)) {
2270 Info(
"AcceptResults",
"connection from a worker accepted on merger %s ",
2273 if (++numworkers >= connections)
2277 Info(
"AcceptResults",
"spurious signal found of merging socket");
2280 if (s->
Recv(mess) < 0) {
2281 Error(
"AcceptResults",
"problems receiving message");
2285 Info(
"AcceptResults",
"message received: %d ", (mess ? mess->
What() : 0));
2287 Error(
"AcceptResults",
"message received: %p ", mess);
2296 PDB(kSubmerger, 2)
Info(
"AcceptResults",
" type %d ",
type);
2300 Info(
"AcceptResults",
2301 "a new worker has been mergerd. Total merged workers: %d",
2307 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"removing %p (has been merged)", o);
2310 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"%p not merged yet", o);
2320 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"closing socket");
2328 PDB(kSubmerger, 2)
Info(
"AcceptResults",
"exit: %d",
result);
2338 Int_t n, nch, wasted = 0;
2340 const Int_t kBufSize = 1024;
2341 char waste[kBufSize];
2347 Info(
"HandleUrgentData",
"handling oob...");
2368 if (nch > kBufSize) nch = kBufSize;
2371 Error(
"HandleUrgentData",
"error receiving waste");
2376 Error(
"HandleUrgentData",
"error receiving OOB");
2382 Info(
"HandleUrgentData",
"got OOB byte: %d\n", oob_byte);
2389 Info(
"HandleUrgentData",
"*** Hard Interrupt");
2406 Error(
"HandleUrgentData",
"error sending OOB");
2417 if (nch > kBufSize) nch = kBufSize;
2420 Error(
"HandleUrgentData",
"error receiving waste (2)");
2430 Info(
"HandleUrgentData",
"Soft Interrupt");
2437 Error(
"HandleUrgentData",
"soft interrupt flushed stream");
2448 Info(
"HandleUrgentData",
"Shutdown Interrupt");
2459 Error(
"HandleUrgentData",
"unexpected OOB byte");
2479 Info(
"HandleSigPipe",
"keepAlive probe failed");
2488 Info(
"HandleSigPipe",
"keepAlive probe failed");
2531 if ((freopen(logfile,
mode, stdout)) == 0)
2532 SysError(
"RedirectOutput",
"could not freopen stdout (%s)", logfile);
2534 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2535 SysError(
"RedirectOutput",
"could not redirect stderr");
2537 if ((
fLogFile = fopen(logfile,
"r")) == 0)
2538 SysError(
"RedirectOutput",
"could not open logfile '%s'", logfile);
2542 Warning(
"RedirectOutput",
"no way to tell master (or client) where"
2543 " to upload packages");
2558 dd.
Replace(0, ic,
"proofserv");
2582 if (
size <= 0)
return 0;
2585 Int_t fd = open(
file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2587 SysError(
"ReceiveFile",
"error opening file %s",
file);
2591 const Int_t kMAXBUF = 16384;
2592 char buf[kMAXBUF], cpy[kMAXBUF];
2597 while (filesize <
size) {
2610 Int_t k = 0, i = 0, j = 0;
2617 cpy[j++] = buf[i++];
2621 w = write(fd,
q,
r);
2623 w = write(fd,
p,
r);
2627 SysError(
"ReceiveFile",
"error writing to file %s",
file);
2635 Error(
"ReceiveFile",
"error during receiving file %s",
file);
2643 if (chmod(
file, 0644) != 0)
2644 Warning(
"ReceiveFile",
"error setting mode 0644 on file %s",
file);
2683 off_t ltot=0, lnow=0;
2688 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2691 if (ltot >= 0 && lnow >= 0) {
2694 if (end <= start || end > ltot)
2696 left = (
Int_t)(end - start);
2701 left = (
Int_t)(ltot - lnow);
2708 SysError(
"SendLogFile",
"error sending kPROOF_LOGFILE");
2712 const Int_t kMAXBUF = 32768;
2714 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2722 SysError(
"SendLogFile",
"error reading log file");
2726 if (end == ltot &&
len == wanted)
2730 SysError(
"SendLogFile",
"error sending log file");
2736 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2738 }
while (
len > 0 && left > 0);
2742 if (adhoc && lnow >=0 )
2749 mess << status << (
Int_t) 1;
2752 SysError(
"SendLogFile",
"error sending kPROOF_LOGDONE");
2756 PDB(kGlobal, 1)
Info(
"SendLogFile",
"kPROOF_LOGDONE sent");
2773 mess << bytesread << realtime << cputime << workdir;
2784 Int_t nparallel = 0;
2787 Info(
"SendParallel",
"Will invoke AskParallel()");
2790 Info(
"SendParallel",
"Will invoke GetParallel()");
2797 mess << nparallel << async;
2816 Error(
"Setup",
"failed to send proof server startup message");
2824 Error(
"Setup",
"failed to receive remote proof protocol");
2828 Error(
"Setup",
"failed to send local proof protocol");
2836 Error(
"Setup",
"OldAuthSetup: failed to setup authentication");
2855 Error(
"Setup",
"failed to receive ordinal and config info");
2884 if (tmpWorkDir !=
"")
2888 Info(
"Setup",
"invalid config file %s (missing or unreadable",
2934 Error(
"Setup",
"common setup failed");
2967 if (paths.
Length() > 0) {
2971 else if (paths.
Contains(
"<compiler>"))
2980 if (!bindir.
IsNull()) bindir +=
":";
2982 }
else if (icomp == -1) {
2983 if (!path.
IsNull()) path +=
":";
2991 else if (paths.
Contains(
"<sysbin>"))
2995 if (!bindir.
IsNull()) bindir +=
":";
2996 bindir +=
"/bin:/usr/bin:/usr/local/bin";
2997 }
else if (isysb == -1) {
2998 if (!path.
IsNull()) path +=
":";
2999 path +=
"/bin:/usr/bin:/usr/local/bin";
3004 if (!bindir.
IsNull()) bindir +=
":";
3012 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3021 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3056 const char *k = (
IsMaster()) ?
"Mst" :
"Wrk";
3060 Info(
"SetupCommon",
"package directory set to %s", packdir.
Data());
3075 Warning(
"SetupCommon",
"problems creating path '%s' (errno: %d)",
3084 if (!dataDirOpts.
IsNull()) {
3092 if (dataDirOpts.
Contains(
"g"))
m = 0775;
3095 Info(
"SetupCommon",
"requested mode for data directories is '%o'",
m);
3102 if (subp.
IsNull())
continue;
3107 Warning(
"SetupCommon",
"problems setting mode '%o' on path '%s' (errno: %d)",
3114 Warning(
"SetupCommon",
"problems stat-ing path '%s' (errno: %d; datadir: %s)",
3127 Info(
"SetupCommon",
" %d global package directories registered", nglb);
3136 Error(
"SetupCommon",
"can not change to working directory '%s'",
3189 while (dsms.
Tokenize(dsm, from,
",")) {
3191 Warning(
"SetupCommon",
"a valid dataset manager already initialized");
3192 Warning(
"SetupCommon",
"support for multiple managers not yet available");
3196 if (
gROOT->GetPluginManager()) {
3198 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
3199 if (
h &&
h->LoadPlugin() != -1) {
3209 Warning(
"SetupCommon",
"dataset manager plug-in initialization failed");
3210 SendAsynMessage(
"TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3226 h =
gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
3227 if (
h &&
h->LoadPlugin() == -1)
h = 0;
3236 Warning(
"SetupCommon",
"default dataset manager plug-in initialization failed");
3242 if (!dsReqCfg.
IsNull()) {
3243 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
3245 if (reReqDir.
Match(dsReqCfg) == 5) {
3247 dsDirFmt.
Form(
"dir:%s perms:open", reReqDir[3].Data());
3253 "failed init of dataset staging requests repository");
3258 "specify, with [dir:]<path>, a valid path for staging requests");
3261 Warning(
"SetupCommon",
"no repository for staging requests available");
3275 while (quotas.
Tokenize(tok, from,
" ")) {
3283 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.
Data());
3286 const char *ksz[2] = {
"hwmsz=",
"maxsz="};
3287 for (
Int_t j = 0; j < 2; j++) {
3294 const char *s[3] = {
"k",
"m",
"g"};
3295 Int_t i = 0, ki = 1024;
3310 TString ssz(ksz[j], strlen(ksz[j])-1);
3311 Info(
"SetupCommon",
"parsing '%s' : ignoring token %s", ssz.
Data(), tok.
Data());
3321 Warning(
"SetupCommon",
"problems applying fMaxQueries");
3341 if (!
name.IsNull()) {
3363 Info(
"SetupCommon",
"successfully completed");
3383 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3417 Info(
"Terminate",
"data directory '%s' has been removed",
fDataDir.
Data());
3424 while ((fh = next())) {
3442 if (!path || strlen(path) <= 0)
return kFALSE;
3448 const char *ent = 0;
3450 if (!strcmp(ent,
".") || !strcmp(ent,
".."))
continue;
3451 fpath.
Form(
"%s/%s", path, ent);
3468 Warning(
"UnlinkDataDir",
"data directory '%s' is empty but could not be removed", path);
3499 if (!oldAuthSetupHook) {
3501 TString authlib =
"libRootAuth";
3507 Error(
"OldAuthSetup",
"can't load %s",authlib.
Data());
3511 Error(
"OldAuthSetup",
"can't locate %s",authlib.
Data());
3520 Error(
"OldAuthSetup",
"can't find OldProofServAuthSetup");
3536 TDSet *dset,
const char *selec,
3553 fst, dset, selec, elist);
3571 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3575 Info(
"SetQueryRunning",
"starting query: %d", pq->
GetSeqNum());
3603 Info(
"HandleArchive",
"Enter");
3607 (*mess) >> queryref >> path;
3609 if (slb) slb->
Form(
"%s %s", queryref.
Data(), path.
Data());
3612 if (queryref ==
"Default") {
3614 Info(
"HandleArchive",
3624 if (path.
Length() <= 0) {
3626 Info(
"HandleArchive",
3627 "archive paths are not defined - do nothing");
3631 path.
Form(
"%s/session-%s-%d.root",
3642 if (!pqr || qry < 0) {
3644 fout +=
"/query-result.root";
3650 TIter nxk(
f->GetListOfKeys());
3652 while ((k = (
TKey *)nxk())) {
3662 Info(
"HandleArchive",
3663 "file cannot be open (%s)",fout.
Data());
3670 PDB(kGlobal, 1)
Info(
"HandleArchive",
3671 "archive path for query #%d: %s",
3678 if (!farc || !(farc->
IsOpen())) {
3679 Info(
"HandleArchive",
3680 "archive file cannot be open (%s)",path.
Data());
3694 if (qry > -1 &&
fQMgr)
3698 Info(
"HandleArchive",
3699 "results of query %s archived to file %s",
3719 emsg.
Form(
"file collection undefined!");
3762 Info(
"HandleProcess",
"Enter");
3787 if ((!hasNoData) && elist)
3798 Error(
"HandleProcess",
"AssertDataSet: %s", emsg.
Data());
3803 }
else if (hasNoData) {
3805 TNamed *ftp =
dynamic_cast<TNamed *
>(
input->FindObject(
"PROOF_FilesToProcess"));
3813 emsg.
Form(
"dataset manager not initialized!");
3818 emsg.
Form(
"requested dataset '%s' does not exists", dsn.
Data());
3825 fcmap->
SetName(
"PROOF_FilesToProcess");
3833 Error(
"HandleProcess",
"%s", emsg.
Data());
3849 if (dset)
input->Add(dset);
3850 if (elist)
input->Add(elist);
3854 input->Clear(
"nodelete");
3859 Warning(
"HandleProcess",
"could not save input data: %s", emsg.
Data());
3885 Error(
"HandleProcess",
"error getting list of worker nodes");
3892 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3896 Error(
"HandleProcess",
"Adding a list of worker nodes returned: %d",
3906 Error(
"HandleProcess",
"error getting list of worker nodes");
3913 Info(
"HandleProcess",
"query %d enqueued", pq->
GetSeqNum());
3915 Error(
"HandleProcess",
"unknown return value: %d", retVal);
3926 if (!sync || enqueued) {
3934 Info(
"HandleProcess",
4001 Warning(
"HandleProcess",
"could not get input data: %s", emsg.
Data());
4005 Warning(
"HandleProcess",
"could not get query sequential number!");
4009 while ((nord =
input->FindObject(
"PROOF_Ordinal")))
4010 input->Remove(nord);
4016 while ((o = next())) {
4017 PDB(kGlobal, 2)
Info(
"HandleProcess",
"adding: %s", o->
GetName());
4025 while ((obj = nxt())){
4029 Info(
"HandleProcess",
"selector obj for '%s' found", selector_obj->
ClassName());
4045 Info(
"HandleProcess",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4049 Info(
"HandleProcess",
"calling fPlayer->Process() with selector name: %s",
filename.Data());
4061 m << status << abort;
4074 Info(
"TProofServ::Handleprocess",
4075 "worker %s has finished processing with %d objects in output list",
4095 if (!isSubMerging) {
4115 Info(
"HandleProcess",
"controlled mode: worker %s has finished,"
4122 Info(
"HandleProcess",
"submerging disabled because of high-memory case");
4125 PDB(kGlobal, 2)
Info(
"HandleProcess",
"merging mode check: %d", isSubMerging);
4141 Int_t merge_port = 0;
4144 Info(
"HandleProcess",
"possible port for merging connections: %d",
4148 msg_osize << merge_port;
4162 PDB(kGlobal, 2)
Info(
"HandleProcess",
"sending result directly to master");
4164 Warning(
"HandleProcess",
"problems sending output list");
4186 Warning(
"HandleProcess",
"the output list is empty!");
4188 Warning(
"HandleProcess",
"problems sending output list");
4205 while ((obj = nex())) {
4213 TList *added =
dynamic_cast<TList *
>(
input->FindObject(
"PROOF_InputObjsFromFile"));
4221 while ((o = nxo())) {
input->Remove(o); }
4222 input->Remove(added);
4238 PDB(kGlobal, 1)
Info(
"HandleProcess",
"done");
4249 PDB(kOutput, 2)
Info(
"SendResults",
"enter");
4260 msg.
Form(
"%s: merging output objects ... done ",
4264 msg.
Form(
"%s: objects merged; sending output: %d objs",
fPrefix.
Data(), olsz);
4269 if (sock->
Send(mbuf) < 0)
return -1;
4275 Int_t totsz = 0, objsz = 0;
4277 while ((o = nxo())) {
4281 "message has %d bytes: limit of %lld bytes reached - sending ...",
4294 msg.
Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4298 if (sock->
Send(mbuf) < 0)
return -1;
4305 mbuf << (
Int_t) ((ns >= olsz) ? 2 : 1);
4320 msg.
Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4324 if (sock->
Send(mbuf) < 0)
return -1;
4328 msg.
Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4340 msg.
Form(
"%s: merging output objects ... done ",
4344 msg.
Form(
"%s: objects merged; sending output: %d objs",
fPrefix.
Data(), olsz);
4349 if (sock->
Send(mbuf) < 0)
return -1;
4353 Int_t totsz = 0, objsz = 0;
4356 while ((o = nxo())) {
4373 msg.
Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4377 if (sock->
Send(mbuf) < 0)
return -1;
4382 msg.
Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4396 msg.
Form(
"%s: sending output: %d objs, %d bytes",
fPrefix.
Data(), olsz, blen);
4398 if (sock->
Send(mbuf) < 0)
return -1;
4402 PDB(kGlobal, 2)
Info(
"SendResults",
"sending output list");
4404 PDB(kGlobal, 2)
Info(
"SendResults",
"notifying failure or abort");
4409 PDB(kOutput,2)
Info(
"SendResults",
"done");
4461 Error(
"ProcessNext",
"no TDset object: cannot continue");
4483 while ((obj = nxt())){
4487 Info(
"ProcessNext",
"found object for selector '%s'", obj->
ClassName());
4494 Error(
"ProcessNext",
"empty waiting queries list!");
4540 if (
gEnv->
Lookup(
"Proof.UseMergers") && !
input->FindObject(
"PROOF_UseMergers")) {
4544 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"PROOF_UseMergers set to %d", smg);
4550 if ((o =
input->FindObject(
"PROOF_MergersByHost"))) {
input->Remove(o);
delete o; }
4552 PDB(kSubmerger, 2)
Info(
"ProcessNext",
"submergers setup by host/node");
4561 while ((o = next())) {
4567 if ((o =
input->FindObject(
"MissingFiles")))
input->Remove(o);
4572 Info(
"ProcessNext",
"calling fPlayer->Process() with selector object: %s", selector_obj->
ClassName());
4576 Info(
"ProcessNext",
"calling fPlayer->Process() with selector name: %s",
filename.Data());
4591 m << status << abort;
4607 Warning(
"ProcessNext",
"problems registering produced datasets: %s", emsg.
Data());
4628 while ((xo = nxo())) {