27#include "Xrd/XrdBuffer.hh"
29#include "Xrd/XrdScheduler.hh"
30#include "XrdNet/XrdNet.hh"
31#include "XrdOuc/XrdOucRash.hh"
32#include "XrdOuc/XrdOucStream.hh"
34#include "XrdSys/XrdSysPlugin.hh"
54} XpdBroadcastPriority_t;
63#define PutEnv(x,e) { if (e) { putenv(x); } else { delete[] x; } }
84 XPDLOC(SMGR,
"ProofServCron")
90 TRACE(XERR,
"undefined session manager: cannot start");
96 int quickcheckfreq = 5;
100 int lastrun = time(0);
101 int lastcheck = lastrun, ckfreq = mgr->
CheckFrequency(), waitt = 0;
102 int deltat = ((
int)(0.1*ckfreq) >= 1) ? (
int)(0.1*ckfreq) : 1;
103 int maxdelay = 5*ckfreq;
105 TRACE(ALL,
"next full sessions check in "<<ckfreq<<
" secs");
110 waitt = ckfreq - (time(0) - lastcheck);
111 if (waitt > quickcheckfreq || waitt <= 0)
112 waitt = quickcheckfreq;
113 int pollRet = mgr->
Pipe()->
Poll(waitt);
119 if ((rc = mgr->
Pipe()->
Recv(msg)) != 0) {
120 TRACE(XERR,
"problems receiving message; errno: "<<-rc);
127 if ((rc = msg.
Get(fpid)) != 0) {
128 TRACE(XERR,
"kSessionRemoval: problems receiving process ID (buf: '"<<
129 msg.
Buf()<<
"'); errno: "<<-rc);
132 XrdSysMutexHelper mhp(mgr->
Mutex());
140 TRACE(XERR,
"kSessionRemoval: problem posting the scheduler pipe");
144 TRACE(REQ,
"kSessionRemoval: session: "<<fpid<<
145 " has been removed from the active list");
148 TRACE(XERR,
"obsolete type: XrdProofdProofServMgr::kClientDisconnect");
155 rc = (rc == 0) ? msg.
Get(svrtype) : rc;
157 TRACE(XERR,
"kCleanSessions: problems parsing message (buf: '"<<
158 msg.
Buf()<<
"'); errno: "<<-rc);
162 TRACE(REQ,
"kCleanSessions: request for user: '"<<usr<<
"', server type: "<<svrtype);
174 TRACE(XERR,
"unknown type: "<<msg.
Type());
185 if ((now - lastrun) < maxdelay) {
187 lastcheck = now + 5 - ckfreq;
190 TRACE(ALL,
"postponing sessions check (will retry in 5 secs)");
194 TRACE(ALL,
"Max time without checks reached ("<<maxdelay<<
"): force a session check");
205 if (clnlostscale <= 0) {
213 TRACE(ALL, cursess <<
" sessions are currently active");
242 XPDLOC(SMGR,
"ProofServRecover")
247 TRACE(XERR,
"undefined session manager: cannot start");
256 TRACE(ALL,
"timeout recovering sessions: "<<rc<<
" sessions not recovered");
258 TRACE(XERR,
"some problem occured while recovering sessions");
260 TRACE(ALL,
"recovering successfully terminated");
271 XrdProtocol_Config *pi, XrdSysError *
e)
274 XPDLOC(SMGR,
"XrdProofdProofServMgr")
310 TRACE(XERR,
"unable to generate pipe for the session poller");
324 XPDLOC(SMGR,
"ProofServMgr::Config")
328 bool notify = (rcf) ? 0 : 1;
339 TRACE(XERR,
"problems parsing file ");
344 msg = (rcf) ?
"re-configuring" :
"configuring";
352 msg =
"client sessions shutdown after disconnection";
354 XPDFORM(msg,
"client sessions kept %sfor %d secs after disconnection",
388 for ( ; ircs !=
fProofServRCs.end(); ++ircs) { (*ircs).Print(
"rc"); }
393 for ( ; ienvs !=
fProofServEnvs.end(); ++ienvs) { (*ienvs).Print(
"env"); }
398 XPDFORM(msg,
"using %s to start proofserv sessions",
fUseFork ?
"fork()" :
"system()");
405 TRACE(XERR,
"problems trying to recover active sessions");
407 XPDFORM(msg,
"%d active sessions have been recovered", nr);
417 (
void *)&
fManagerCron, 0,
"ProofServMgr cron thread") != 0) {
418 TRACE(XERR,
"could not start cron thread");
421 XPDPRT(
"cron thread started");
433 XPDLOC(SMGR,
"ProofServMgr::AddSession")
435 TRACE(REQ,
"adding new active session ...");
439 TRACE(XERR,
"invalid inputs: "<<(s ?
"" :
"s, ") <<
", "<< (p->
Client() ?
"" :
"p->Client()"));
461 XPDLOC(SMGR,
"ProofServMgr::IsSessionSocket")
463 TRACE(REQ,
"checking "<<fpid<<
" ...");
466 if (!fpid || strlen(fpid) <= 0) {
467 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
472 XrdOucString spath(fpid);
473 if (!spath.endswith(
".sock"))
return 0;
478 XrdOucString apath = spath;
479 apath.replace(
".sock",
"");
483 if (stat(apath.c_str(), &st) != 0 && (errno == ENOENT)) {
486 unlink(spath.c_str());
487 TRACE(REQ,
"missing admin path: removing "<<spath<<
" ...");
500 XPDLOC(SMGR,
"ProofServMgr::MvSession")
502 TRACE(REQ,
"moving "<<fpid<<
" ...");
505 if (!fpid || strlen(fpid) <= 0) {
506 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
511 XrdOucString opath(fpid), npath;
515 opath.replace(
".status",
"");
518 opath.replace(
".status",
"");
525 XrdOucString spath = opath;
527 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
528 TRACE(XERR,
"problems removing the UNIX socket path: "<<spath<<
"; errno: "<<errno);
529 spath.replace(
".sock",
".status");
530 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
531 TRACE(XERR,
"problems removing the status file: "<<spath<<
"; errno: "<<errno);
536 if ((rc = rename(opath.c_str(), npath.c_str())) == 0 || (errno == ENOENT)) {
543 TRACE(XERR,
"session pid file cannot be moved: "<<opath<<
544 "; target file: "<<npath<<
"; errno: "<<errno);
553 XPDLOC(SMGR,
"ProofServMgr::RmSession")
555 TRACE(REQ,
"removing "<<fpid<<
" ...");
558 if (!fpid || strlen(fpid) <= 0) {
559 TRACE(XERR,
"invalid input: "<< (fpid ? fpid :
"<nul>"));
568 if (unlink(path.c_str()) == 0)
571 TRACE(XERR,
"session pid file cannot be unlinked: "<<
572 path<<
"; error: "<<errno);
581 XPDLOC(SMGR,
"ProofServMgr::TouchSession")
583 TRACE(REQ,
"touching "<<(fpid ? fpid :
"<nul>")<<
", "<<(fpath ? fpath :
"<nul>")<<
" ...");
586 if (!fpid || strlen(fpid) <= 0) {
587 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
592 XrdOucString path(fpath);
593 if (!fpath || !fpath[0])
597 if (utime(path.c_str(), 0) == 0)
600 TRACE(XERR,
"time stamps for session pid file cannot be updated: "<<
601 path<<
"; error: "<<errno);
612 int to,
const char *fpath)
614 XPDLOC(SMGR,
"ProofServMgr::VerifySession")
617 if (!fpid || strlen(fpid) <= 0) {
618 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
624 if (fpath && strlen(fpath) > 0)
625 XPDFORM(path,
"%s/%s", fpath, fpid);
635 if (stat(path.c_str(), &st)) {
636 TRACE(XERR,
"session status file cannot be stat'ed: "<<
637 path<<
"; error: "<<errno);
642 deltat = time(0) - st.st_mtime;
644 if (path.endswith(
".status")) {
646 path.erase(path.rfind(
".status"));
649 TRACE(DBG,
"admin path for session "<<fpid<<
" hase not been touched"
650 " since at least "<< xto <<
" secs");
660 TRACE(DBG,
"admin path for session "<<fpid<<
" was touched " <<
661 deltat <<
" secs ago");
671 XPDLOC(SMGR,
"ProofServMgr::DeleteFromSessions")
673 TRACE(REQ,
"session: "<<fpid);
676 if (!fpid || strlen(fpid) <= 0) {
677 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
681 XrdOucString key = fpid;
682 key.replace(
".status",
"");
683 key.erase(0, key.rfind(
'.') + 1);
689 XPDFORM(msg,
"session: %s terminated by peer", fpid);
694 XrdSysMutexHelper mhp(
fMutex);
711 XPDLOC(SMGR,
"ProofServMgr::PrepareSessionRecovering")
719 TRACE(REQ,
"preparing recovering of active sessions ...");
723 struct dirent *ent = 0;
724 while ((ent = (
struct dirent *)readdir(dir))) {
725 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
727 XrdOucString rest,
a;
730 if (
a.length() > 0)
continue;
735 TRACE(DBG,
"found active session: "<<pid);
757 0,
"ProofServMgr session recover thread") != 0) {
758 TRACE(XERR,
"could not start session recover thread");
761 XPDPRT(
"session recover thread started");
782 XPDLOC(SMGR,
"ProofServMgr::RecoverActiveSessions")
788 TRACE(XERR,
"recovering clients list undefined");
794 TRACE(REQ,
"start recovering of "<<nrc<<
" clients");
813 { XrdSysMutexHelper mhp(cls->
fMutex);
823 TRACE(REQ, nrc<<
" clients still to recover");
836 std::list<XpdClientSessions* >::iterator ii =
fRecoverClients->begin();
838 rc += (*ii)->fProofServs.size();
863 XPDLOC(SMGR,
"ProofServMgr::IsClientRecovering")
866 TRACE(XERR,
"invalid inputs: usr: "<<(usr ? usr :
"")<<
", grp:"<<(grp ? grp :
"")<<
" ...");
874 std::list<XpdClientSessions *>::iterator ii =
fRecoverClients->begin();
876 if ((*ii)->fClient && (*ii)->fClient->Match(usr, grp)) {
884 TRACE(DBG,
"checking usr: "<<usr<<
", grp:"<<grp<<
" ... recovering? "<<
885 rc<<
", until: "<<deadline);
899 XPDLOC(SMGR,
"ProofServMgr::CheckActiveSessions")
901 TRACE(REQ,
"checking active sessions ...");
911 struct dirent *ent = 0;
912 while ((ent = (
struct dirent *)readdir(dir))) {
913 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
916 if (strstr(ent->d_name,
".sock") &&
IsSessionSocket(ent->d_name))
continue;
918 XrdOucString rest, key, after;
921 if (after !=
"status")
continue;
927 { XrdSysMutexHelper mhp(
fMutex);
931 bool sessionalive = (
VerifySession(ent->d_name) == 0) ? 1 : 0;
934 if (!xps->
IsValid() || !sessionalive) rmsession = 1;
938 if (sessionalive)
continue;
956 if (!rmsession && verify && !oldvers) {
962 TRACE(REQ,
"session: "<<ent->d_name<<
"; nc: "<<nc<<
"; rm: "<<rmsession);
981 XPDLOC(SMGR,
"ProofServMgr::CheckTerminatedSessions")
983 TRACE(REQ,
"checking terminated sessions ...");
994 struct dirent *ent = 0;
995 while ((ent = (
struct dirent *)readdir(dir))) {
996 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
998 XrdOucString rest,
a;
1003 now = (now > 0) ? now : time(0);
1011 int rcst = stat(path.c_str(), &st);
1012 TRACE(DBG, pid<<
": rcst: "<<rcst<<
", now - mtime: "<<now - st.st_mtime<<
" secs")
1040 XPDLOC(SMGR,
"ProofServMgr::CleanClientSessions")
1042 TRACE(REQ,
"cleaning "<<usr<<
" ...");
1045 bool all = (!usr || strlen(usr) <= 0 || !strcmp(usr,
"all")) ? 1 : 0;
1051 XrdOucString path, rest, key,
a;
1054 XrdSysRecMutex *mtx = 0;
1064 std::list<int> tobedel;
1065 { XrdSysMutexHelper mtxh(mtx);
1073 struct dirent *ent = 0;
1074 while ((ent = (
struct dirent *)readdir(dir))) {
1076 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
1084 if (!all && info.
fUser != usr)
continue;
1111 struct dirent *ent = 0;
1112 while ((ent = (
struct dirent *)readdir(dir))) {
1114 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
1117 if (
a ==
"status")
continue;
1122 if (!all && info.
fUser != usr)
continue;
1131 tobedel.push_back(pid);
1143 std::list<int>::iterator ii = tobedel.begin();
1144 while (ii != tobedel.end()) {
1146 XrdSysMutexHelper mhp(
fMutex);
1149 std::list<XrdProofdProofServ *>::iterator ixps =
fActiveSessions.begin();
1157 if (!active)
fSessions.Del(key.c_str());
1191 char *val, XrdOucStream *cfg,
bool rcf)
1193 XPDLOC(SMGR,
"ProofServMgr::DoDirective")
1199 if (
d->fName ==
"proofservmgr") {
1201 }
else if (
d->fName ==
"putenv") {
1203 }
else if (
d->fName ==
"putrc") {
1205 }
else if (
d->fName ==
"shutdown") {
1208 TRACE(XERR,
"unknown directive: "<<
d->fName);
1218 XPDLOC(SMGR,
"ProofServMgr::DoDirectiveProofServMgr")
1236 XrdOucString tok(val);
1237 if (tok.beginswith(
"checkfq:")) {
1238 tok.replace(
"checkfq:",
"");
1239 checkfq = strtol(tok.c_str(), 0, 10);
1240 }
else if (tok.beginswith(
"termto:")) {
1241 tok.replace(
"termto:",
"");
1242 termto = strtol(tok.c_str(), 0, 10);
1243 }
else if (tok.beginswith(
"verifyto:")) {
1244 tok.replace(
"verifyto:",
"");
1245 verifyto = strtol(tok.c_str(), 0, 10);
1246 }
else if (tok.beginswith(
"recoverto:")) {
1247 tok.replace(
"recoverto:",
"");
1248 recoverto = strtol(tok.c_str(), 0, 10);
1249 }
else if (tok.beginswith(
"checklost:")) {
1250 tok.replace(
"checklost:",
"");
1251 checklost = strtol(tok.c_str(), 0, 10);
1252 }
else if (tok.beginswith(
"usefork:")) {
1253 tok.replace(
"usefork:",
"");
1254 usefork = strtol(tok.c_str(), 0, 10);
1257 val = cfg->GetWord();
1275 XPDFORM(msg,
"checkfq: %d s, termto: %d s, verifyto: %d s, recoverto: %d s, checklost: %d, usefork: %d",
1292 XrdOucString users, groups, rcval, rcnam;
1293 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1295 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1298 int iequ = rcnam.find(
'=');
1299 if (iequ == STR_NPOS)
return -1;
1304 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
1323 XrdOucString users, groups, rcval, rcnam;
1324 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1326 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1330 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
1339 XrdOucString &users, XrdOucString &groups,
1340 XrdOucString &rcval, XrdOucString &rcnam,
1341 int &smi,
int &smx,
int &vmi,
int &vmx,
bool &hex)
1343 XrdOucString ssvn, sver;
1345 while (val && val[0]) {
1346 if (!strncmp(val,
"u:", 2)) {
1349 }
else if (!strncmp(val,
"g:", 2)) {
1352 }
else if (!strncmp(val,
"s:", 2)) {
1355 idash = ssvn.find(
'-');
1356 if (idash != STR_NPOS) {
1357 if (ssvn.isdigit(0, idash-1)) smi = ssvn.atoi(0, idash-1);
1358 if (ssvn.isdigit(idash+1)) smx = ssvn.atoi(idash+1);
1360 if (ssvn.isdigit()) smi = ssvn.atoi();
1362 }
else if (!strncmp(val,
"v:", 2)) {
1366 if (sver.beginswith(
'x')) {
1370 idash = sver.find(
'-');
1371 if (idash != STR_NPOS) {
1372 if (sver.isdigit(0, idash-1)) vmi = sver.atoi(0, idash-1);
1373 if (sver.isdigit(idash+1)) vmx = sver.atoi(idash+1);
1375 if (sver.isdigit()) vmi = sver.atoi();
1378 if (rcval.length() > 0) {
1385 val = cfg->GetWord();
1395 const char *usrs,
const char *grps,
1396 int smi,
int smx,
int vmi,
int vmx,
bool hex)
1398 XPDLOC(SMGR,
"ProofServMgr::FillEnvList")
1401 TRACE(ALL,
"env list undefined!");
1405 XrdOucString users(usrs), groups(grps);
1410 XpdEnv xpe(nam, val, users.c_str(), groups.c_str(), smi, smx, vmi, vmx);
1411 if (users.length() > 0) {
1414 while ((from = users.tokenize(usr, from,
',')) != -1) {
1415 if (usr.length() > 0) {
1416 if (groups.length() > 0) {
1419 while ((fromg = groups.tokenize(grp, from,
',')) != -1) {
1420 if (grp.length() > 0) {
1421 xpe.
Reset(nam, val, usr.c_str(), grp.c_str(), smi, smx, vmi, vmx);
1426 xpe.
Reset(nam, val, usr.c_str(), 0, smi, smx, vmi, vmx);
1432 if (groups.length() > 0) {
1435 while ((fromg = groups.tokenize(grp, fromg,
',')) != -1) {
1436 if (grp.length() > 0) {
1437 xpe.
Reset(nam, val, 0, grp.c_str(), smi, smx, vmi, vmx);
1462 int dp = strtol(val,0,10);
1463 if (dp >= 0 && dp <= 2)
1466 if ((val = cfg->GetWord())) {
1467 int l = strlen(val);
1469 XrdOucString tval = val;
1471 if (val[
l-1] ==
's') {
1473 }
else if (val[
l-1] ==
'm') {
1476 }
else if (val[
l-1] ==
'h') {
1479 }
else if (val[
l-1] < 48 || val[
l-1] > 57) {
1483 int de = strtol(val,0,10);
1506 XPDLOC(SMGR,
"ProofServMgr::Process")
1517 XrdOucString emsg(
"Invalid request code: ");
1522 response->Send(kXR_ServerError,
1523 "ProofServMgr::Process: error posting internal pipe for authorization to proceed");
1527 response->Send(kXR_ServerError,
1528 "ProofServMgr::Process: timed-out waiting for authorization to proceed - retry later");
1550 response->Send(kXR_InvalidRequest, emsg.c_str());
1559 XPDLOC(SMGR,
"ProofServMgr::Attach")
1561 int psid = -1, rc = 0;
1566 TRACEP(p, REQ,
"psid: "<<psid<<
", CID = "<<p->
CID());
1571 TRACEP(p, XERR,
"client instance undefined");
1572 response->Send(kXR_ServerError,
"client instance undefined");
1581 while ((deadline < 0) || (now < deadline)) {
1582 if (!(xps =
c->GetServer(psid)) || !xps->
IsValid()) {
1586 TRACEP(p, XERR,
"session ID not found: "<<psid);
1587 response->Send(kXR_InvalidRequest,
"session ID not found");
1591 deadline = (deadline > 0) ? deadline : defdeadline;
1602 if (!xps || !xps->
IsValid()) {
1603 TRACEP(p, XERR,
"session ID not found: "<<psid);
1604 response->Send(kXR_InvalidRequest,
"session ID not found");
1607 TRACEP(p, DBG,
"xps: "<<xps<<
", status: "<< xps->
Status());
1611 memcpy((
void *)&sid, (
const void *)&(p->
Request()->
header.streamid[0]), 2);
1628 if (!dpu.endswith(
'/'))
1632 (
void *) dpu.c_str(), dpu.length());
1652 unsigned short &sid)
1654 XPDLOC(SMGR,
"ProofServMgr::PrepareProofServ")
1662 memcpy((
void *)&sid, (
const void *)&(p->
Request()->
header.streamid[0]), 2);
1679 XPDFORM(msg,
"++++ Using NON-default ROOT version: %s ++++\n", xps->
ROOT()->
Export());
1680 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) msg.c_str(), msg.length());
1693 XrdOucString &tag, XrdOucString &ord,
1694 XrdOucString &cffile,
1695 XrdOucString &uenvs,
int &intwait)
1697 XPDLOC(SMGR,
"ProofServMgr::ParseCreateBuffer")
1700 char *buf = p->
Argp()->buff;
1704 tag.assign(buf,0,len-1);
1706 TRACEP(p, DBG,
"received buf: "<<tag);
1708 tag.erase(tag.find(
'|'));
1709 xps->
SetTag(tag.c_str());
1710 TRACEP(p, DBG,
"tag: "<<tag);
1715 ord.assign(buf,0,len-1);
1716 int iord = ord.find(
"|ord:");
1717 if (iord != STR_NPOS) {
1718 ord.erase(0,iord+5);
1719 ord.erase(ord.find(
"|"));
1726 cffile.assign(buf,0,len-1);
1727 int icf = cffile.find(
"|cf:");
1728 if (icf != STR_NPOS) {
1729 cffile.erase(0,icf+4);
1730 cffile.erase(cffile.find(
"|"));
1735 XrdOucString plitenwk;
1736 plitenwk.assign(buf,0,len-1);
1737 int inwk = plitenwk.find(
"|plite:");
1738 if (inwk != STR_NPOS) {
1739 plitenwk.erase(0,inwk+7);
1740 plitenwk.erase(plitenwk.find(
"|"));
1741 int nwk = plitenwk.atoi();
1744 TRACEP(p, DBG,
"P-Lite master with "<<nwk<<
" workers (0 means # or cores)");
1749 uenvs.assign(buf,0,len-1);
1750 int ienv = uenvs.find(
"|envs:");
1751 if (ienv != STR_NPOS) {
1752 uenvs.erase(0,ienv+6);
1753 uenvs.erase(uenvs.find(
"|"));
1760 if (uenvs.length() > 0) {
1761 TRACEP(p, DBG,
"user envs: "<<uenvs);
1763 if ((iiw = uenvs.find(
"PROOF_INTWAIT=")) != STR_NPOS) {
1764 XrdOucString s(uenvs, iiw + strlen(
"PROOF_INTWAIT="));
1765 s.erase(s.find(
','));
1768 TRACEP(p, ALL,
"startup internal wait set by user to "<<intwait);
1779 XPDLOC(SMGR,
"ProofServMgr::Create")
1781 int psid = -1, rc = 0;
1792 XrdSysMutexHelper mhp(
fMutex);
1794 TRACEP(p,ALL,
" cursess: "<<cursess);
1795 if (mxsess <= cursess) {
1796 XPDFORM(msg,
" ++++ Max number of sessions reached (%d) - please retry later ++++ \n", cursess);
1797 response->Send(kXR_attn,
kXPD_srvmsg, (
char *) msg.c_str(), msg.length());
1809 TRACEP(p, DBG, nc <<
" threads are creating a new session");
1822 XrdOucString tag, ord, cffile, uenvs;
1826 TRACEP(p, DBG,
"{ord,cfg,psid,cid,log}: {"<<ord<<
","<<cffile<<
","<<psid
1827 <<
","<<p->
CID()<<
","<<loglevel<<
"}");
1835 response->Send(
kXP_ServerError,
"timed-out acquiring fork semaphore");
1845 "unable to create pipes for communication during setup");
1850 ProofServEnv_t in = {xps, loglevel, cffile.c_str(),
"",
"", tag.c_str(),
"",
"", 1};
1855 TRACEP(p, FORK,
"Forking external proofsrv");
1856 if (!(pid =
fMgr->
Sched()->Fork(
"proofsrv"))) {
1869 XrdOucString pmsg =
"*** spawned child process ";
1870 pmsg += (
int) getpid();
1876 TRACE(XERR,
"chown on '"<<in.
fLogFile.c_str()<<
"'; errno: "<<errno);
1879 XrdOucString path, sockpath, emsg;
1882 if (fpc.
Poll() < 0) {
1883 TRACE(XERR,
"error while polling to receive the admin path from parent - EXIT" );
1886 if (fpc.
Recv(xmsg) != 0) {
1887 TRACE(XERR,
"error reading message while waiting for the admin path from parent - EXIT" );
1890 if (xmsg.
Type() < 0) {
1891 TRACE(XERR,
"the parent failed to setup the admin path - EXIT" );
1897 TRACE(FORK,
"admin path: "<<path);
1901 if (fpc.
Poll() < 0) {
1902 TRACE(XERR,
"error while polling to receive the sock path from parent - EXIT" );
1905 if (fpc.
Recv(xmsg) != 0) {
1906 TRACE(XERR,
"error reading message while waiting for the sock path from parent - EXIT" );
1909 if (xmsg.
Type() < 0) {
1910 TRACE(XERR,
"the parent failed to setup the sock path - EXIT" );
1914 sockpath = xmsg.
Buf();
1916 TRACE(FORK,
"UNIX sock path: "<<sockpath);
1919 bool asserdatadir = 1;
1921 TRACE(ALL,
"srvtype = "<< srvtype);
1927 const char *pord = asserdatadir ? ord.c_str() : 0;
1928 const char *ptag = asserdatadir ? in.
fSessionTag.c_str() : 0;
1930 emsg =
"SetUserOwnerships did not return OK - EXIT";
1932 if (fcp.
Post(0, emsg.c_str()) != 0)
1933 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1939 emsg =
"SetUserEnvironment did not return OK - EXIT";
1941 if (fcp.
Post(0, emsg.c_str()) != 0)
1942 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1946 char *argvv[7] = {0};
1950 emsg =
"XrdProofdManager instance undefined!";
1952 if (fcp.
Post(0, emsg.c_str()) != 0)
1953 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1959 size_t len = strlen(
fMgr->
AdminPath()) + strlen(
"xpdpath:") + 1;
1960 sxpd =
new char[len];
1964 sxpd =
new char[10];
1965 snprintf(sxpd, 10,
"%d", getppid());
1969 char slog[10] = {0};
1970 snprintf(slog, 10,
"%d", loglevel);
1973 char ssrv[10] = {0};
1980 argvv[2] = (
char *)
"xpd";
1981 argvv[3] = (
char *)sxpd;
1982 argvv[4] = (
char *)slog;
1983 argvv[5] = (
char *)ssrv;
1988 emsg =
"SetProofServEnv did not return OK - EXIT";
1990 if (fcp.
Post(0, emsg.c_str()) != 0)
1991 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1994 TRACE(FORK, (
int)getpid() <<
": proofserv env set up");
1999 TRACE(XERR,
"cannot write log file path to internal pipe; errno: "<<errno);
2002 TRACE(FORK, (
int)getpid()<<
": log file path communicated");
2006 sigemptyset(&myset);
2007 sigaddset(&myset, SIGUSR1);
2008 sigaddset(&myset, SIGUSR2);
2009 pthread_sigmask(SIG_UNBLOCK, &myset, 0);
2016 ", uid: "<<getuid()<<
", euid:"<<geteuid()<<
2017 ", psrv: "<<xps->
ROOT()->
PrgmSrv()<<
", argvv[1]: "<<argvv[1]);
2022 TRACE(XERR,
"returned from execv: bad, bad sign !!! errno:" << (
int)errno);
2037 TRACEP(p, FORK,
"Parent process: child is "<<pid);
2060 XrdOucString path, sockpath;
2065 struct sockaddr_un unserver;
2066 if (sockpath.length() > (
int)(
sizeof(unserver.sun_path) - 1)) {
2067 emsg =
"socket path very long (";
2068 emsg += sockpath.length();
2069 emsg +=
"): this may lead to stack corruption!";
2070 emsg +=
" Use xpd.sockpathdir to change it";
2071 TRACEP(p, XERR, emsg.c_str());
2076 if ((pathrc = fpc.
Post(0, path.c_str())) != 0) {
2077 emsg =
"failed to communicating path to child";
2079 TRACEP(p, XERR, emsg.c_str());
2082 emsg =
"failed to setup child admin path";
2084 if ((pathrc = fpc.
Post(-1, path.c_str())) != 0) {
2085 emsg +=
": failed communicating failure to child";
2087 TRACEP(p, XERR, emsg.c_str());
2095 emsg =
"failure creating UNIX socket on " ;
2098 TRACEP(p, XERR, emsg.c_str());
2104 emsg =
"failure changing ownership of the UNIX socket on " ;
2106 emsg +=
"; errno: " ;
2109 TRACEP(p, XERR, emsg.c_str());
2115 if ((pathrc = fpc.
Post(0, sockpath.c_str())) != 0) {
2116 emsg =
"failed to communicating path to child";
2118 TRACEP(p, XERR, emsg.c_str());
2121 emsg =
"failed to setup child admin path";
2123 if ((pathrc = fpc.
Post(-1, sockpath.c_str())) != 0) {
2124 emsg +=
": failed communicating failure to child";
2126 TRACEP(p, XERR, emsg.c_str());
2137 emsg.insert(npfx, 0);
2142 TRACEP(p, FORK,
"waiting for client setup status ...");
2144 emsg =
"proofserv setup";
2148 int ntry = 10, prc = 0, rst = -1;
2149 while (prc == 0 && ntry--) {
2151 if ((prc = fcp.
Poll(2)) > 0) {
2154 if (fcp.
Recv(xmsg) != 0) {
2155 emsg =
"error receiving message from pipe";
2157 TRACEP(p, XERR, emsg.c_str());
2164 XrdOucString xbuf = xmsg.
Buf();
2165 if (xbuf.length() <= 0) {
2166 emsg =
"error reading buffer {logfile, error message} from message received on the pipe";
2168 TRACEP(p, XERR, emsg.c_str());
2176 XrdOucString stag(xbuf);
2177 stag.erase(stag.rfind(
'/'));
2178 stag.erase(0, stag.find(
"session-") + strlen(
"session-"));
2179 xps->
SetTag(stag.c_str());
2187 TRACEP(p, XERR, emsg.c_str());
2191 }
else if (prc < 0) {
2192 emsg =
"error receive status-of-setup from pipe";
2194 TRACEP(p, XERR, emsg.c_str());
2197 TRACEP(p, FORK,
"receiving status-of-setup from pipe: waiting 2 s ..."<<pid);
2210 emsg =
"failure setting up proofserv" ;
2211 if (prc == 0) emsg +=
": timed-out receiving status-of-setup from pipe";
2220 TRACEP(p, XERR, emsg.c_str());
2221 emsg.insert(npfx, 0);
2233 (
void *) info.c_str(), info.length());
2237 TRACEP(p, FORK,
"server launched: wait for callback ");
2244 emsg =
"problems accepting callback: ";
2247 emsg +=
"process could not be killed - pid: ";
2249 emsg +=
"process killed - pid: ";
2256 TRACEP(p, XERR, emsg.c_str());
2257 emsg.insert(npfx, 0);
2258 response->Send(kXR_attn,
kXPD_errmsg, (
char *) emsg.c_str(), emsg.length());
2268 TRACEP(p, XERR,
"problems changing child process priority");
2269 }
else if (dp > 0) {
2270 TRACEP(p, DBG,
"priority of the child process changed by " << dp <<
" units");
2274 TRACEP(p, FORK,
"xps: "<<xps<<
", ClientID: "<<(
int *)cid<<
" (sid: "<<sid<<
")"<<
" NClients: "<<xps->
GetNClients(1));
2278 TRACEP(p, REQ,
"problems recording session in sandbox");
2284 XrdOucString key; key += pid;
2285 { XrdSysMutexHelper mh(
fMutex);
2286 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2294 TRACEP(p, XERR,
"PROOF session is invalid: protocol error? " <<emsg);
2309 bool assert = (pid > 0) ? 1 : 0;
2312 if (pid > 0) path += pid;
2314 XPDFORM(emsg,
"failure setting admin path '%s'", path.c_str());
2327 unsigned int seq, XrdOucString &emsg)
2329 XPDLOC(SMGR,
"ProofServMgr::CreateSockPath")
2331 XrdOucString sockpath;
2334 TRACEP(p, ALL,
"socket path: " << sockpath);
2335 struct sockaddr_un unserver;
2336 if (sockpath.length() > (
int)(
sizeof(unserver.sun_path) - 1)) {
2337 XPDFORM(emsg,
"socket path very long (%d): this may lead to stack corruption! ", sockpath.length());
2344 XPDFORM(emsg,
"failure creating UNIX socket on '%s'", sockpath.c_str());
2347 if (chmod(sockpath.c_str(), 0755) != 0) {
2348 XPDFORM(emsg,
"failure changing permissions of the UNIX socket on '%s'; errno: %d",
2349 sockpath.c_str(), (
int)errno);
2362 XPDLOC(SMGR,
"ProofServMgr::SendErrLog")
2364 XrdOucString emsg(
"An error occured: the content of errlog follows:");
2365 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) emsg.c_str(), emsg.length());
2366 emsg =
"------------------------------------------------\n";
2367 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2369 int ierr = open(errlog, O_RDONLY);
2371 XPDFORM(emsg,
"cannot open '%s' (errno: %d)", errlog, errno);
2372 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2376 if (fstat(ierr, &st) != 0) {
2377 XPDFORM(emsg,
"cannot stat '%s' (errno: %d)", errlog, errno);
2378 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2382 off_t len = st.st_size;
2383 TRACE(ALL,
" reading "<<len<<
" bytes from "<<errlog);
2384 ssize_t chunk = 2048, nb, nr;
2388 nb = (left > chunk) ? chunk : left;
2389 if ((nr = read(ierr, buf, nb)) < 0) {
2390 XPDFORM(emsg,
"problems reading from '%s' (errno: %d)", errlog, errno);
2391 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2400 emsg =
"------------------------------------------------";
2401 r->Send(kXR_attn,
kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2412 XPDLOC(SMGR,
"ProofServMgr::ResolveSession")
2414 TRACE(REQ,
"resolving "<< (fpid ? fpid :
"<nul>")<<
" ...");
2432 TRACE(DBG,
"session does not support recovering: protocol "
2441 TRACE(DBG,
"client instance not initialized");
2449 TRACE(DBG,
"server object not initialized");
2467 std::list<XpdClientSessions *>::iterator ii =
fRecoverClients->begin();
2469 if ((*ii)->fClient ==
c)
2474 (*ii)->fProofServs.push_back(xps);
2490 XPDLOC(SMGR,
"ProofServMgr::Recover")
2493 TRACE(XERR,
"invalid input!");
2511 if (emsg ==
"timeout") {
2512 TRACE(DBG,
"timeout while accepting callback");
2514 TRACE(XERR,
"problems accepting callback: "<<emsg);
2518 XrdOucString key; key += xps->
SrvPID();
2519 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2532 " successfully recovered ("<<left<<
" left); pid: "<<pid);
2548 int to, XrdOucString &msg)
2550 XPDLOC(SMGR,
"ProofServMgr::AcceptPeer")
2553 XrdNetPeer peerpsrv;
2557 XPDFORM(msg,
"session pointer undefined or socket invalid: %p", xps);
2560 TRACE(REQ,
"waiting for server callback for "<<to<<
" secs ... on "<<xps->
UNIXSockPath());
2563 if (!(xps->
UNIXSock()->Accept(peerpsrv, XRDNET_NODNTRIM, to))) {
2570 msg =
"could not assert connected peer: ";
2584 XPDLOC(SMGR,
"ProofServMgr::SetupProtocol")
2587 XrdLink *linkpsrv = 0;
2593 if (peerpsrv.InetName)
free(peerpsrv.InetName);
2594 peerpsrv.InetName = XrdSysDNS::getHostName(
"localhost");
2597 if (!(linkpsrv = XrdLink::Alloc(peerpsrv, lnkopts))) {
2598 msg =
"could not allocate network object: ";
2604 peerpsrv.InetBuff = 0;
2605 TRACE(DBG,
"connection accepted: matching protocol ... ");
2608 if (!(xp = p->
Match(linkpsrv))) {
2609 msg =
"match failed: protocol error: ";
2621 if (xp->Process(linkpsrv) != 0) {
2622 msg =
"handshake with internal link failed: ";
2629 msg =
"could not attach new internal link to poller: ";
2641 linkpsrv->setProtocol(xp);
2643 TRACE(REQ,
"Protocol "<<xp<<
" attached to link "<<linkpsrv<<
" ("<< peerpsrv.InetName <<
")");
2663 int to, XrdOucString &msg)
2665 XPDLOC(SMGR,
"ProofServMgr::AcceptPeer")
2671 if (!xps || !xps->UNIXSock()) {
2672 XPDFORM(msg,
"session pointer undefined or socket invalid: %p", xps);
2675 TRACE(REQ,
"waiting for server callback for "<<to<<
" secs ... on "<<xps->
UNIXSockPath());
2678 if (!(xps->
UNIXSock()->Accept(netaddr, 0, to))) {
2685 msg =
"could not assert connected peer: ";
2699 XPDLOC(SMGR,
"ProofServMgr::SetupProtocol")
2702 XrdLink *linkpsrv = 0;
2708 if (!(linkpsrv = XrdLink::Alloc(netaddr, lnkopts))) {
2709 msg =
"could not allocate network object: ";
2714 TRACE(DBG,
"connection accepted: matching protocol ... ");
2717 if (!(xp = p->
Match(linkpsrv))) {
2718 msg =
"match failed: protocol error: ";
2730 if (xp->Process(linkpsrv) != 0) {
2731 msg =
"handshake with internal link failed: ";
2738 msg =
"could not attach new internal link to poller: ";
2750 linkpsrv->setProtocol(xp);
2752 TRACE(REQ,
"Protocol "<<xp<<
" attached to link "<<linkpsrv<<
" ("<< netaddr.Name() <<
")");
2771 XPDLOC(SMGR,
"ProofServMgr::Detach")
2773 int psid = -1, rc = 0;
2778 TRACEP(p, REQ,
"psid: "<<psid);
2783 TRACEP(p, XERR,
"session ID not found: "<<psid);
2784 response->Send(kXR_InvalidRequest,
"session ID not found");
2800 XPDLOC(SMGR,
"ProofServMgr::Destroy")
2802 int psid = -1, rc = 0;
2807 TRACEP(p, REQ,
"psid: "<<psid);
2816 TRACEP(p, XERR,
"reference session ID not found");
2817 response->Send(kXR_InvalidRequest,
"reference session ID not found");
2822 XPDFORM(msg,
"all sessions destroyed by %s", p->
Link()->ID);
2844 XPDLOC(SMGR,
"WriteSessEnvs")
2848 XpdWriteEnv_t *xwe = (XpdWriteEnv_t *)s;
2850 if (env && xwe && xwe->fMgr && xwe->fClient && xwe->
fEnv) {
2851 if (env->
fEnv.length() > 0) {
2853 xwe->fMgr->ResolveKeywords(env->
fEnv, xwe->fClient);
2855 char *ev =
new char[env->
fEnv.length()+1];
2856 strncpy(ev, env->
fEnv.c_str(), env->
fEnv.length());
2857 ev[env->
fEnv.length()] = 0;
2858 fprintf(xwe->fEnv,
"%s\n", ev);
2860 PutEnv(ev, xwe->fExport);
2865 emsg =
"some input undefined";
2869 TRACE(XERR,
"protocol error: "<<emsg);
2879 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnvOld")
2884 if (!p || !p->
Client() || !input) {
2885 TRACE(XERR,
"at leat one input is invalid - cannot continue");
2891 TRACE(XERR,
"problems setting basic environment - exit");
2900 TRACE(XERR,
"unable to get instance of proofserv proxy");
2903 int psid = xps->
ID();
2910 size_t len = strlen(
"ROOTPROOFSESSDIR=") + in->
fWrkDir.length() + 2;
2917 len = strlen(
"ROOTPROOFLOGLEVEL=") + 5;
2924 len = strlen(
"ROOTPROOFORDINAL=")+strlen(xps->
Ordinal()) + 2;
2931 len = strlen(
"ROOTVERSIONTAG=")+strlen(p->
Client()->
ROOT()->
Tag())+2;
2938 TRACE(DBG,
"creating env file");
2939 XrdOucString envfile = in->
fWrkDir;
2941 FILE *fenv = fopen(envfile.c_str(),
"w");
2944 "unable to open env file: "<<envfile);
2947 TRACE(DBG,
"environment file: "<< envfile);
2953 XrdOucString secenvs(getenv(
"XrdSecENVS"));
2954 if (secenvs.length() > 0) {
2958 while ((from = secenvs.tokenize(env, from,
',')) != -1) {
2959 if (env.length() > 0) {
2961 ev =
new char[env.length()+1];
2962 strncpy(ev, env.c_str(), env.length());
2963 ev[env.length()] = 0;
2965 fprintf(fenv,
"%s\n", ev);
2972 XrdSecCredentials *creds = p->
AuthProt()->getCredentials();
2974 len = strlen(
"XrdSecCREDS=")+creds->size;
2975 ev =
new char[len + 1];
2976 strcpy(ev,
"XrdSecCREDS=");
2977 memcpy(ev + strlen(
"XrdSecCREDS="), creds->buffer, creds->size);
2980 TRACE(DBG,
"XrdSecCREDS set");
2982 XrdOucString credsdir = udir;
2983 credsdir +=
"/.creds";
2987 TRACE(DBG,
"problems in saving authentication creds under "<<credsdir);
2990 TRACE(XERR,
"unable to create creds dir: "<<credsdir);
2999 fprintf(fenv,
"ROOTSYS=%s\n", xps->
ROOT()->
Dir());
3002 fprintf(fenv,
"ROOTCONFDIR=%s\n", xps->
ROOT()->
Dir());
3005 fprintf(fenv,
"ROOTTMPDIR=%s\n",
fMgr->
TMPdir());
3008 fprintf(fenv,
"ROOTXPDPORT=%d\n",
fMgr->
Port());
3011 fprintf(fenv,
"ROOTPROOFWORKDIR=%s\n", udir.c_str());
3014 fprintf(fenv,
"ROOTPROOFSESSIONTAG=%s\n", in->
fSessionTag.c_str());
3018 fprintf(fenv,
"ROOTUSEUSERCFG=1\n");
3021 fprintf(fenv,
"ROOTOPENSOCK=%s\n", xps->
UNIXSockPath());
3024 fprintf(fenv,
"ROOTENTITY=%s@%s\n", p->
Client()->
User(), p->
Link()->Host());
3027 fprintf(fenv,
"ROOTSESSIONID=%d\n", psid);
3030 fprintf(fenv,
"ROOTCLIENTID=%d\n", p->
CID());
3033 fprintf(fenv,
"ROOTPROOFCLNTVERS=%d\n", p->
ProofProtocol());
3036 fprintf(fenv,
"ROOTPROOFORDINAL=%s\n", xps->
Ordinal());
3039 if (getenv(
"ROOTVERSIONTAG"))
3040 fprintf(fenv,
"ROOTVERSIONTAG=%s\n", getenv(
"ROOTVERSIONTAG"));
3043 if (in->
fCfg.length() > 0)
3044 fprintf(fenv,
"ROOTPROOFCFGFILE=%s\n", in->
fCfg.c_str());
3047 fprintf(fenv,
"ROOTPROOFLOGFILE=%s\n", in->
fLogFile.c_str());
3054 XrdOucHash<XpdEnv> sessenvs;
3059 if (envmatch >= 0) {
3060 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3064 if (envmatch > envmtcex) {
3067 sessenvs.Rep(env->
fName.c_str(), env, 0, Hash_keepdata);
3072 sessenvs.Add(env->
fName.c_str(), env, 0, Hash_keepdata);
3074 TRACE(HDBG,
"Adding: "<<(*ienvs).fEnv);
3087 XrdOucString env, namelist;
3088 int from = 0, ieq = -1;
3089 while ((from = ue.tokenize(env, from,
',')) != -1) {
3090 if (env.length() > 0 && (ieq = env.find(
'=')) != -1) {
3093 ev =
new char[env.length()+1];
3094 strncpy(ev, env.c_str(), env.length());
3095 ev[env.length()] = 0;
3097 fprintf(fenv,
"%s\n", ev);
3100 if (namelist.length() > 0)
3106 len = strlen(
"PROOF_ALLVARS=") + namelist.length() + 2;
3108 snprintf(ev, len,
"PROOF_ALLVARS=%s", namelist.c_str());
3110 fprintf(fenv,
"%s\n", ev);
3118 TRACE(DBG,
"creating symlink");
3119 XrdOucString syml = udir;
3121 syml +=
"/last-worker-session";
3123 syml +=
"/last-master-session";
3125 TRACE(XERR,
"problems creating symlink to last session (errno: "<<errno<<
")");
3138 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnv")
3143 TRACE(REQ,
"ROOT dir: "<< (
r ?
r->Dir() :
"*** undef ***"));
3146 char *libdir = (
char *)
r->LibDir();
3149 len = 32 + strlen(libdir) + strlen(mgr->
BareLibPath());
3150 ldpath =
new char[len];
3153 len = 32 + strlen(libdir);
3154 ldpath =
new char[len];
3159 char *rootsys = (
char *)
r->Dir();
3160 len = 15 + strlen(rootsys);
3162 snprintf(ev, len,
"ROOTSYS=%s", rootsys);
3166 char *bindir = (
char *)
r->BinDir();
3167 len = 15 + strlen(bindir);
3169 snprintf(ev, len,
"ROOTBINDIR=%s", bindir);
3173 char *confdir = (
char *)
r->DataDir();
3174 len = 20 + strlen(confdir);
3176 snprintf(ev, len,
"ROOTCONFDIR=%s", confdir);
3180 len = 20 + strlen(mgr->
TMPdir());
3190 TRACE(XERR,
"XrdROOT instance undefined!");
3198 const char *sessiondir,
3200 XrdOucString &outfn)
3203 XrdOucString ord = xps->
Ordinal();
3207 if (host.find(
".") != STR_NPOS)
3208 host.erase(host.find(
"."));
3211 else role =
"master";
3216 XPDFORM(outfn,
"%s/%s-%s-%s.%s",
3230 XrdOucString &sesstag, XrdOucString &topsesstag,
3231 XrdOucString &sessiondir, XrdOucString &sesswrkdir)
3233 XPDLOC(SMGR,
"GetTagDirs")
3242 if (host.find(
".") != STR_NPOS)
3243 host.erase(host.find(
"."));
3244 XPDFORM(sesstag,
"%s-%d-", host.c_str(), (
int)time(0));
3249 sessiondir +=
"/session-";
3250 sessiondir += sesstag;
3251 topsesstag = sesstag;
3254 sessiondir += xps->
Tag();
3255 topsesstag = xps->
Tag();
3256 topsesstag.replace(
"session-",
"");
3260 TRACE(XERR,
"problems asserting dir '"<<sessiondir<<
"' - errno: "<<errno);
3265 }
else if (pid > 0) {
3272 topsesstag = sesstag;
3274 xps->
SetTag(sesstag.c_str());
3278 if (pid == (
int) getpid()) {
3286 sesswrkdir = sessiondir;
3288 XPDFORM(sesswrkdir,
"%s/worker-%s-%s", sessiondir.c_str(), xps->
Ordinal(), sesstag.c_str());
3290 XPDFORM(sesswrkdir,
"%s/master-%s-%s", sessiondir.c_str(), xps->
Ordinal(), sesstag.c_str());
3293 TRACE(XERR,
"negative pid ("<<pid<<
"): should not have got here!");
3305 XPDLOC(SMGR,
"WriteSessRCs")
3308 FILE *frc = (FILE *)
f;
3310 XrdOucString rc = erc->
fEnv;
3311 if (rc.length() > 0) {
3312 if (rc.find(
"Proof.DataSetManager") != STR_NPOS) {
3313 TRACE(ALL,
"Proof.DataSetManager ignored: use xpd.datasetsrc to define dataset managers");
3315 fprintf(frc,
"%s\n", rc.c_str());
3321 emsg =
"file or input entry undefined";
3325 TRACE(XERR,
"protocol error: "<<emsg);
3334 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnv")
3337 if (!p || !p->
Client() || !input) {
3338 TRACE(XERR,
"at leat one input is invalid - cannot continue");
3344 TRACE(DBG,
"rootvers: "<< rootvers);
3345 if (rootvers < 14 && rootvers > -1)
3353 TRACE(XERR,
"unable to get instance of proofserv proxy");
3356 int psid = xps->
ID();
3375 TRACE(XERR,
"problems setting basic environment - exit");
3380 TRACE(DBG,
"creating rc and env files");
3381 XrdOucString rcfile, envfile;
3384 TRACE(XERR,
"problems creating RC file "<<rcfile.c_str());
3390 TRACE(XERR,
"problems creating environment file "<<envfile.c_str());
3396 TRACE(REQ,
"creating symlink");
3397 XrdOucString syml = udir;
3399 syml +=
"/last-worker-session";
3401 syml +=
"/last-master-session";
3403 TRACE(XERR,
"problems creating symlink to "
3404 " last session (errno: "<<errno<<
")");
3418 const char *envfn,
const char *rcfn)
3420 XPDLOC(SMGR,
"ProofServMgr::CreateProofServEnvFile")
3423 if (!p || !input || (!envfn ||
3424 (envfn && strlen(envfn) <= 0)) || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3425 TRACE(XERR,
"invalid inputs!");
3435 TRACE(XERR,
"unable to get instance of proofserv proxy");
3439 FILE *fenv = fopen(envfn,
"w");
3441 TRACE(XERR,
"unable to open env file: "<<envfn);
3444 TRACE(REQ,
"environment file: "<< envfn);
3452 XrdOucString secenvs(getenv(
"XrdSecENVS"));
3453 if (secenvs.length() > 0) {
3457 while ((from = secenvs.tokenize(env, from,
',')) != -1) {
3458 if (env.length() > 0) {
3460 ev =
new char[env.length()+1];
3461 strncpy(ev, env.c_str(), env.length());
3462 ev[env.length()] = 0;
3463 fprintf(fenv,
"%s\n", ev);
3471 XrdSecCredentials *creds = p->
AuthProt()->getCredentials();
3473 int lev = strlen(
"XrdSecCREDS=") + creds->size;
3474 ev =
new char[lev+1];
3475 strncpy(ev,
"XrdSecCREDS=", lev);
3476 memcpy(ev+strlen(
"XrdSecCREDS="), creds->buffer, creds->size);
3479 TRACE(DBG,
"XrdSecCREDS set");
3483 credsdir +=
"/.creds";
3487 TRACE(DBG,
"problems in saving authentication creds under "<<credsdir);
3490 TRACE(XERR,
"unable to create creds dir: "<<credsdir);
3502 fprintf(fenv,
"ROOTSYS=%s\n", xps->
ROOT()->
Dir());
3505 fprintf(fenv,
"ROOTCONFDIR=%s\n", xps->
ROOT()->
Dir());
3508 fprintf(fenv,
"TMPDIR=%s\n",
fMgr->
TMPdir());
3512 len = strlen(
"ROOTRCFILE=") + strlen(rcfn) + 2;
3514 snprintf(ev, len,
"ROOTRCFILE=%s", rcfn);
3515 fprintf(fenv,
"%s\n", ev);
3521 len = strlen(
"ROOTVERSIONTAG=") + strlen(p->
Client()->
ROOT()->
Tag()) + 2;
3524 fprintf(fenv,
"%s\n", ev);
3530 len = strlen(
"ROOTPROOFLOGFILE=") + in->
fLogFile.length() + 2;
3533 fprintf(fenv,
"%s\n", ev);
3540 XrdOucString locdatasrv;
3544 TRACE(HDBG, nrk <<
" placeholders resolved for LOCALDATASERVER");
3545 len = strlen(
"LOCALDATASERVER=") + locdatasrv.length() + 2;
3547 snprintf(ev, len,
"LOCALDATASERVER=%s", locdatasrv.c_str());
3548 fprintf(fenv,
"%s\n", ev);
3554 len = strlen(
"XRDCF=") + strlen(
CfgFile()) + 2;
3557 fprintf(fenv,
"%s\n", ev);
3566 XrdOucHash<XpdEnv> sessenvs;
3571 if (envmatch >= 0) {
3572 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3576 if (envmatch > envmtcex) {
3579 sessenvs.Rep(env->
fName.c_str(), env, 0, Hash_keepdata);
3584 sessenvs.Add(env->
fName.c_str(), env, 0, Hash_keepdata);
3586 TRACE(HDBG,
"Adding: "<<(*ienvs).fEnv);
3598 XrdOucString env, namelist;
3599 int from = 0, ieq = -1;
3600 while ((from = ue.tokenize(env, from,
',')) != -1) {
3601 if (env.length() > 0 && (ieq = env.find(
'=')) != -1) {
3604 ev =
new char[env.length()+1];
3605 strncpy(ev, env.c_str(), env.length());
3606 ev[env.length()] = 0;
3607 if (env.find(
"WRAPPERCMD") == STR_NPOS || !xps->
IsPLite())
3608 fprintf(fenv,
"%s\n", ev);
3611 if (env.find(
"WRAPPERCMD") == STR_NPOS || !xps->
IsPLite()) {
3613 if (namelist.length() > 0)
3620 len = strlen(
"PROOF_ALLVARS=") + namelist.length() + 2;
3622 snprintf(ev, len,
"PROOF_ALLVARS=%s", namelist.c_str());
3623 fprintf(fenv,
"%s\n", ev);
3640 void *input,
const char *rcfn)
3642 XPDLOC(SMGR,
"ProofServMgr::CreateProofServRootRc")
3645 if (!p || !input || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3646 TRACE(XERR,
"invalid inputs!");
3656 TRACE(XERR,
"unable to get instance of proofserv proxy");
3659 int psid = xps->
ID();
3661 FILE *frc = fopen(rcfn,
"w");
3663 TRACE(XERR,
"unable to open rootrc file: "<<rcfn);
3669 TRACE(XERR,
"problems creating symlink to 'session.rootrc' (errno: "<<errno<<
")");
3672 TRACE(REQ,
"session rootrc file: "<< rcfn);
3675 fprintf(frc,
"# XrdProofdProtocol listening port\n");
3676 fprintf(frc,
"ProofServ.XpdPort: %d\n",
fMgr->
Port());
3680 fprintf(frc,
"# Prefix to be prepended to local paths\n");
3687 if (!purl.endswith(
"/"))
3689 fprintf(frc,
"# URL for the data pool entry-point\n");
3690 fprintf(frc,
"ProofServ.PoolUrl: %s\n", purl.c_str());
3695 fprintf(frc,
"# The session working dir\n");
3696 fprintf(frc,
"ProofServ.SessionDir: %s\n", in->
fWrkDir.c_str());
3700 fprintf(frc,
"# Proof Log/Debug level\n");
3701 fprintf(frc,
"Proof.DebugLevel: %d\n", in->
fLogLevel);
3704 fprintf(frc,
"# Ordinal number\n");
3705 fprintf(frc,
"ProofServ.Ordinal: %s\n", xps->
Ordinal());
3709 fprintf(frc,
"# ROOT Version tag\n");
3710 fprintf(frc,
"ProofServ.RootVersionTag: %s\n", p->
Client()->
ROOT()->
Tag());
3714 fprintf(frc,
"# Proof group\n");
3715 fprintf(frc,
"ProofServ.ProofGroup: %s\n", p->
Client()->
Group());
3720 fprintf(frc,
"# File with group information\n");
3726 fprintf(frc,
"# Users sandbox\n");
3727 fprintf(frc,
"ProofServ.Sandbox: %s\n", udir.c_str());
3731 fprintf(frc,
"# Server image\n");
3732 fprintf(frc,
"ProofServ.Image: %s\n",
fMgr->
Image());
3737 fprintf(frc,
"# Session tag\n");
3738 fprintf(frc,
"ProofServ.SessionTag: %s\n", in->
fSessionTag.c_str());
3739 fprintf(frc,
"# Top Session tag\n");
3740 fprintf(frc,
"ProofServ.TopSessionTag: %s\n", in->
fTopSessionTag.c_str());
3744 fprintf(frc,
"# Session admin path\n");
3746 if (proofvrs < 0 || proofvrs < 27) {
3748 fprintf(frc,
"ProofServ.AdminPath: %s\n", xps->
AdminPath());
3752 fprintf(frc,
"ProofServ.AdminPath: %s.status\n", xps->
AdminPath());
3758 fprintf(frc,
"# Whether user specific config files are enabled\n");
3759 fprintf(frc,
"ProofServ.UseUserCfg: 1\n");
3762 fprintf(frc,
"# Open socket\n");
3763 fprintf(frc,
"ProofServ.OpenSock: %s\n", xps->
UNIXSockPath());
3765 fprintf(frc,
"# Entity\n");
3767 fprintf(frc,
"ProofServ.Entity: %s:%s@%s\n",
3770 fprintf(frc,
"ProofServ.Entity: %s@%s\n", p->
Client()->
User(), p->
Link()->Host());
3774 fprintf(frc,
"# Session ID\n");
3775 fprintf(frc,
"ProofServ.SessionID: %d\n", psid);
3778 fprintf(frc,
"# Client ID\n");
3779 fprintf(frc,
"ProofServ.ClientID: %d\n", p->
CID());
3782 fprintf(frc,
"# Client Protocol\n");
3783 fprintf(frc,
"ProofServ.ClientVersion: %d\n", p->
ProofProtocol());
3786 if (in->
fCfg.length() > 0) {
3787 if (in->
fCfg ==
"masteronly") {
3788 fprintf(frc,
"# MasterOnly option\n");
3790 fprintf(frc,
"Proof.MasterOnly: 1\n");
3792 fprintf(frc,
"# Config file\n");
3794 fprintf(frc,
"ProofServ.ProofConfFile: %s\n", in->
fCfg.c_str());
3797 fprintf(frc,
"# Config file\n");
3799 fprintf(frc,
"ProofServ.ProofConfFile: sm:\n");
3801 fprintf(frc,
"ProofServ.ProofConfFile: lite:\n");
3802 fprintf(frc,
"# Number of ProofLite workers\n");
3803 fprintf(frc,
"ProofLite.Workers: %d\n", xps->
PLiteNWrks());
3804 fprintf(frc,
"# Users sandbox\n");
3805 fprintf(frc,
"ProofLite.Sandbox: %s\n", udir.c_str());
3806 fprintf(frc,
"# No subpaths\n");
3807 fprintf(frc,
"ProofLite.SubPath: 0\n");
3809 fprintf(frc,
"ProofServ.ProofConfFile: %s\n",
fProofPlugin.c_str());
3815 fprintf(frc,
"# Default settings for XrdClient\n");
3816 fprintf(frc,
"XNet.FirstConnectMaxCnt 3\n");
3817 fprintf(frc,
"XNet.ConnectTimeout 5\n");
3822 fprintf(frc,
"# Force remote reading also for local files to avoid a wrong TTreeCache initialization\n");
3823 fprintf(frc,
"Path.ForceRemote 1\n");
3829 fprintf(frc,
"# Additional rootrcs (xpd.putrc directives)\n");
3831 XrdOucHash<XpdEnv> sessrcs;
3837 XpdEnv *rcenv = sessrcs.Find((*ircs).fName.c_str());
3841 if (rcmatch > rcmtcex) {
3844 sessrcs.Rep(rcenv->
fName.c_str(), rcenv, 0, Hash_keepdata);
3849 sessrcs.Add(rcenv->
fName.c_str(), rcenv, 0, Hash_keepdata);
3851 TRACE(HDBG,
"Adding: "<<(*ircs).fEnv);
3859 fprintf(frc,
"# Dataset sources\n");
3860 XrdOucString rc(
"Proof.DataSetManager: ");
3861 std::list<XrdProofdDSInfo *>::iterator ii;
3870 rc += (*ii)->fObscure;
3872 fprintf(frc,
"%s\n", rc.c_str());
3877 fprintf(frc,
"# Dataset staging requests repository\n");
3883 fprintf(frc,
"# Data directory\n");
3891 fprintf(frc,
"%s\n", rc.c_str());
3911 XPDLOC(SMGR,
"ProofServMgr::CleanupLostProofServ")
3914 TRACE(REQ,
"disabled ...");
3918 TRACE(REQ,
"checking for orphalin proofserv processes ...");
3922 std::map<int,XrdOucString> procs;
3924 TRACE(DBG,
" no proofservs around: nothing to do");
3935 XrdOucRash<int, int> controlled, xrdproc;
3938 XrdOucHash<XrdOucString> sessionspaths;
3942 XrdOucString cmd, apath, pidpath, sessiondir, emsg, rest, after;
3943 std::map<int,XrdOucString>::iterator ip;
3944 for (ip = procs.begin(); ip != procs.end(); ++ip) {
3947 if ((ia = cmd.find(
"xpdpath:")) != STR_NPOS) {
3948 cmd.tokenize(apath, ia,
' ');
3949 apath.replace(
"xpdpath:",
"");
3950 if (apath.length() <= 0) {
3951 TRACE(ALL,
"admin path not found; initial cmd line: "<<cmd);
3955 XPDFORM(pidpath,
"%s/xrootd.pid", apath.c_str());
3956 TRACE(ALL,
"pidpath: "<<pidpath);
3958 int *alive = xrdproc.Find(xpid);
3961 xrdproc.Add(xpid,
a);
3969 const char *subdir[2] = {
"activesessions",
"terminatedsessions"};
3970 for (
int i = 0; i < 2; i++) {
3971 XPDFORM(sessiondir,
"%s/%s", apath.c_str(), subdir[i]);
3972 if (!sessionspaths.Find(sessiondir.c_str())) {
3973 DIR *sdir = opendir(sessiondir.c_str());
3975 XPDFORM(emsg,
"cannot open '%s' - errno: %d", apath.c_str(), errno);
3976 TRACE(XERR, emsg.c_str());
3979 struct dirent *sent = 0;
3980 while ((sent = readdir(sdir))) {
3981 if (!strncmp(sent->d_name,
".", 1) || !strncmp(sent->d_name,
"..", 2))
3986 controlled.Add(ppid, ppid);
3989 sessionspaths.Add(sessiondir.c_str(), 0, 0, Hash_data_is_key);
3991 ok = (controlled.Find(pid)) ? 1 : ok;
3998 TRACE(ALL,
"process: "<<pid<<
" lost its controller: killing");
4021 XPDLOC(SMGR,
"ProofServMgr::CleanupProofServ")
4023 TRACE(REQ,
"all: "<<all<<
", usr: " << (usr ? usr :
"undef"));
4027 const char *pn =
"proofserv";
4034 TRACE(DBG,
"usr must be defined for all = FALSE");
4038 TRACE(DBG,
"problems getting info for user " << usr);
4046 DIR *dir = opendir(
"/proc");
4048 XrdOucString emsg(
"cannot open /proc - errno: ");
4050 TRACE(DBG, emsg.c_str());
4054 struct dirent *ent = 0;
4055 while ((ent = readdir(dir))) {
4056 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
4057 if (
DIGIT(ent->d_name[0])) {
4058 XrdOucString fn(
"/proc/", 256);
4062 FILE *ffn = fopen(fn.c_str(),
"r");
4064 XrdOucString emsg(
"cannot open file ");
4065 emsg += fn; emsg +=
" - errno: "; emsg += errno;
4070 bool xname = 1, xpid = 1, xppid = 1;
4071 bool xuid = (all) ? 0 : 1;
4074 char line[2048] = { 0 };
4075 while (fgets(
line,
sizeof(
line), ffn) &&
4076 (xname || xpid || xppid || xuid)) {
4078 if (xname && strstr(
line,
"Name:")) {
4079 if (!strstr(
line, pn))
4083 if (xpid && strstr(
line,
"Pid:")) {
4087 if (xppid && strstr(
line,
"PPid:")) {
4095 if (xuid && strstr(
line,
"Uid:")) {
4104 if (!xname && !xpid && !xppid && !xuid) {
4112 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4127 DIR *dir = opendir(
"/proc");
4129 XrdOucString emsg(
"cannot open /proc - errno: ");
4135 struct dirent *ent = 0;
4136 while ((ent = readdir(dir))) {
4137 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
4138 if (
DIGIT(ent->d_name[0])) {
4139 XrdOucString fn(
"/proc/", 256);
4143 int ffd = open(fn.c_str(), O_RDONLY);
4145 XrdOucString emsg(
"cannot open file ");
4146 emsg += fn; emsg +=
" - errno: "; emsg += errno;
4152 bool xuid = (all) ? 0 : 1;
4156 if (read(ffd, &psi,
sizeof(psinfo_t)) !=
sizeof(psinfo_t)) {
4157 XrdOucString emsg(
"cannot read ");
4158 emsg += fn; emsg +=
": errno: "; emsg += errno;
4168 if (!strstr(psi.pr_fname, pn))
4174 if (refuid == psi.pr_uid)
4178 int ppid = psi.pr_ppid;
4186 if (!xname && !xppid && !xuid) {
4193 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4205#elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
4211 if ((ern = XrdProofdAux::GetMacProcList(&pl, np)) != 0) {
4212 XrdOucString emsg(
"cannot get the process list: errno: ");
4221 if (strstr(pl[ii].kp_proc.p_comm, pn)) {
4222 if (all || (
int)(pl[ii].kp_eproc.e_ucred.cr_uid) == refuid) {
4224 int ppid = pl[ii].kp_eproc.e_ppid;
4226 if (ppid != getpid()) {
4229 if (strstr(pl[jj].kp_proc.p_comm,
"xrootd") &&
4230 pl[jj].kp_proc.p_pid == ppid) {
4243 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4260 XrdOucString cmd =
"ps ";
4264 const char *cusr = (usr && strlen(usr) && fSuperUser) ? usr : fPClient->ID();
4266 const char *cusr = (usr && strlen(usr)) ? usr : 0;
4280 cmd +=
" | grep proofserv 2>/dev/null";
4284 snprintf(cpid, 10,
"%d", getpid());
4287 XrdOucString pids =
":";
4288 FILE *fp = popen(cmd.c_str(),
"r");
4290 char line[2048] = { 0 };
4291 while (fgets(
line,
sizeof(
line), fp)) {
4293 char *px = strstr(
line,
"xpd");
4297 char *pi = strstr(px+3, cpid);
4302 TRACE(HDBG,
"found alternative parent ID: "<< ppid);
4310 from += strlen(cusr);
4318 if (!srv || (srv && !strcmp(usr, srv->
Client())))
4342 const char *ord,
const char *stag)
4344 XPDLOC(SMGR,
"ProofServMgr::SetUserOwnerships")
4346 TRACE(REQ,
"enter");
4353 std::list<XrdProofdDSInfo *>::iterator ii;
4355 TRACE(ALL,
"Checking dataset source: url:"<<(*ii)->fUrl<<
", local:"
4356 <<(*ii)->fLocal<<
", rw:"<<(*ii)->fRW);
4357 if ((*ii)->fLocal && (*ii)->fRW) {
4366 TRACE(XERR,
"problems setting permissions 0755 on: "<<
d);
4369 TRACE(XERR,
"problems asserting: "<<
d);
4372 TRACE(XERR,
"problems setting permissions 0777 on: "<<
d);
4375 TRACE(XERR,
"problems asserting: "<<
d);
4387 XrdOucString dgr, dus[3];
4391 unsigned int mode = 0755;
4395 XPDFORM(dus[1],
"%s/%s", dus[0].c_str(), ord);
4396 XPDFORM(dus[2],
"%s/%s", dus[1].c_str(), stag);
4397 for (
int i = 0; i < 3; i++) {
4400 std::ios_base::fmtflags oflags = std::cerr.flags();
4401 TRACE(XERR,
"problems setting permissions "<< oct << mode<<
" on: "<<dus[i]);
4402 std::cerr.flags(oflags);
4405 TRACE(XERR,
"problems asserting: "<<dus[i]);
4410 TRACE(XERR,
"problems setting permissions 0777 on: "<<dgr);
4413 TRACE(XERR,
"problems asserting: "<<dgr);
4422 TRACE(XERR,
"can't change ownership of "<<creds);
4440 XPDLOC(SMGR,
"ProofServMgr::SetUserEnvironment")
4442 TRACE(REQ,
"enter");
4453 char *
h =
new char[len];
4460 char *u =
new char[len];
4463 TRACE(DBG,
"set "<<u);
4467 TRACE(DBG,
"setting ACLs");
4472 TRACE(XERR,
"could not get privileges");
4500 XrdOucString key; key += pid;
4509 XPDLOC(SMGR,
"BroadcastPriority")
4511 XpdBroadcastPriority_t *bp = (XpdBroadcastPriority_t *)s;
4513 int nb = *(bp->fNBroadcast);
4520 ? bp->fGroupMgr->GetGroup(ps->
Group()) : 0;
4522 if (
g &&
g->Active() > 0) {
4523 TRACE(DBG,
"priority: "<<
g->Priority()<<
" active: "<<
g->Active());
4524 int prio = (
int) (
g->Priority() * 100);
4532 emsg =
"input entry undefined";
4536 TRACE(XERR,
"protocol error: "<<emsg);
4545 XPDLOC(SMGR,
"ProofServMgr::BroadcastClusterInfo")
4547 TRACE(REQ,
"enter");
4549 int tot = 0, act = 0;
4550 std::list<XrdProofdProofServ *>::iterator si =
fActiveSessions.begin();
4559 XPDPRT(
"tot: "<<tot<<
", act: "<<act);
4564 (*si)->SrvType() !=
kXPD_Worker) (*si)->SendClusterInfo(tot, act);
4568 TRACE(DBG,
"No master or submaster controlled by this manager");
4578 XPDLOC(SMGR,
"ProofServMgr::BroadcastPriorities")
4580 TRACE(REQ,
"enter");
4612 XrdSysMutexHelper mhp(
fMutex);
4627 XrdSysMutexHelper mhp(
fMutex);
4631 std::map<XrdProofdProtocol*,int>::iterator iter =
fDestroyTimes.begin();
4633 int rect = now - iter->second;
4635 if (p == iter->first) alive =
false;
4650 XPDLOC(SMGR,
"FreeClientID")
4652 int pid = *((
int *)s);
4661 TRACE(XERR,
"protocol error: undefined session!");
4671 XrdSysMutexHelper mhp(
fMutex);
4681 XPDLOC(SMGR,
"CountTopMasters")
4683 int *ntm = (
int *)s;
4691 emsg =
"input entry undefined";
4695 TRACE(XERR,
"protocol error: "<<emsg);
4704 XPDLOC(SMGR,
"ProofServMgr::CurrentSessions")
4706 TRACE(REQ,
"enter");
4708 XrdSysMutexHelper mhp(
fMutex);
4730 if (!isWorker && s.find(
"<logfilemst>") != STR_NPOS) {
4732 if (lfr.endswith(
".log")) lfr.erase(lfr.rfind(
".log"));
4733 s.replace(
"<logfilemst>", lfr);
4734 }
else if (isWorker && s.find(
"<logfilewrk>") != STR_NPOS) {
4736 if (lfr.endswith(
".log")) lfr.erase(lfr.rfind(
".log"));
4737 s.replace(
"<logfilewrk>", lfr);
4741 if (getenv(
"USER") && s.find(
"<user>") != STR_NPOS) {
4742 XrdOucString usr(getenv(
"USER"));
4743 s.replace(
"<user>", usr);
4747 if (getenv(
"ROOTSYS") && s.find(
"<rootsys>") != STR_NPOS) {
4748 XrdOucString rootsys(getenv(
"ROOTSYS"));
4749 s.replace(
"<rootsys>", rootsys);
4770 fID = s ? s->
ID() : -1;
4790 XPDLOC(SMGR,
"SessionInfo::FillProofServ")
4810 "' not availabe anymore: setting the default");
4824 XPDLOC(SMGR,
"SessionInfo::SaveToFile")
4831 TRACE(HDBG,
"session saved to file: "<<
file);
4834 FILE *fpid = fopen(
file,
"w");
4836 fprintf(fpid,
"%s %s\n",
fUser.c_str(),
fGroup.c_str());
4837 fprintf(fpid,
"%s\n",
fUnixPath.c_str());
4840 fprintf(fpid,
"%s\n",
fLogFile.c_str());
4843 fprintf(fpid,
"\n%s",
fUserEnvs.c_str());
4848 if (chmod(
file, 0666) != 0) {
4849 TRACE(XERR,
"could not change mode to 0666 on file "<<
4850 file<<
"; error: "<<errno);
4856 TRACE(XERR,
"session pid file cannot be (re-)created: "<<
4857 file<<
"; error: "<<errno);
4890 XPDLOC(SMGR,
"SessionInfo::ReadFromFile")
4901 FILE *fpid = fopen(
file,
"r");
4904 XrdOucString sline, t;
4906 if (fgets(
line,
sizeof(
line), fpid)) {
4909 if ((from = sline.tokenize(
fUser, from,
' ')) == -1)
4910 TRACE(XERR,
"warning: fUser: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4911 if ((from = sline.tokenize(
fGroup, from,
' ')) == -1)
4912 TRACE(XERR,
"warning: fGroup: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4914 if (fgets(
line,
sizeof(
line), fpid)) {
4918 if (fgets(
line,
sizeof(
line), fpid)) {
4922 if ((from = sline.tokenize(t, from,
' ')) == -1)
4923 TRACE(XERR,
"warning: fPid: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4925 if ((from = sline.tokenize(t, from,
' ')) == -1)
4926 TRACE(XERR,
"warning: fID: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4928 if ((from = sline.tokenize(t, from,
' ')) == -1)
4929 TRACE(XERR,
"warning: fSrvType: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4932 if (fgets(
line,
sizeof(
line), fpid)) {
4936 if ((from = sline.tokenize(
fOrdinal, from,
' ')) == -1)
4937 TRACE(XERR,
"warning: fOrdinal: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4938 if ((from = sline.tokenize(
fTag, from,
' ')) == -1)
4939 TRACE(XERR,
"warning: fTag: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4940 if ((from = sline.tokenize(
fAlias, from,
' ')) == -1)
4943 if (fgets(
line,
sizeof(
line), fpid)) {
4947 if (fgets(
line,
sizeof(
line), fpid)) {
4951 if ((from = sline.tokenize(t, from,
' ')) == -1)
4952 TRACE(XERR,
"warning: fSrvProtVers: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4954 if ((from = sline.tokenize(
fROOTTag, from,
' ')) == -1)
4955 TRACE(XERR,
"warning: fROOTTag: corrupted line? "<<
line<<
" (file: "<<
file<<
")");
4959 off_t lnow = lseek(fileno(fpid), (off_t) 0, SEEK_CUR);
4960 off_t ltot = lseek(fileno(fpid), (off_t) 0, SEEK_END);
4961 int left = (
int)(ltot - lnow);
4964 int wanted = (left > 4095) ? 4095 : left;
4965 while ((len = read(fileno(fpid),
line, wanted)) < 0 &&
4968 if (len < 0 || len < wanted) {
4976 }
while (len > 0 && left > 0);
4983 if (!stat(
file, &st))
4986 TRACE(XERR,
"session file cannot be open: "<<
file<<
"; error: "<<errno);
4991 XrdOucString fs(
file);
4993 fpid = fopen(fs.c_str(),
"r");
4996 if (fgets(
line,
sizeof(
line), fpid)) {
5003 TRACE(DBG,
"no session status file for: "<< fs<<
"; session was probably terminated");
5018 XPDLOC(SMGR,
"XpdEnv::Matches")
5022 if (
fUsers.length() > 0) {
5023 XrdOucString u(usr);
5024 if ((nmtc = u.matches(
fUsers.c_str())) == 0)
return -1;
5032 XrdOucString
g(grp);
5033 if ((nmtcg =
g.matches(
fGroups.c_str())) == 0)
return -1;
5035 nmtcg = strlen(grp);
5039 TRACE(HDBG,
fEnv <<
", u:"<<usr<<
", g:"<<grp<<
" --> nmtc: "<<nmtc);
5056 int maj = -1, min = -1, ptc = -1, xv = ver;
5061 ptc = xv - min * 256;
5066 ptc = xv - min * 100;
5069 int vc = (maj << 16) + (min << 8) + ptc;
5080 XrdOucString vmi(
"-1"), vmx(
"-1");
5083 int min = ((
fVerMin - maj * 65536) >> 8);
5084 int ptc =
fVerMin - maj * 65536 - min * 256;
5085 XPDFORM(vmi,
"%d%d%d", maj, min, ptc);
5089 int min = ((
fVerMax - maj * 65536) >> 8);
5090 int ptc =
fVerMax - maj * 65536 - min * 256;
5091 XPDFORM(vmx,
"%d%d%d", maj, min, ptc);
5093 XrdOucString u(
"allusers"),
g(
"allgroups");
5098 "} svn:["<<
fSvnMin<<
","<<
fSvnMax<<
"] vers:["<<vmi<<
","<<vmx<<
"]");
#define TRACE(Flag, Args)
R__EXTERN C unsigned int sleep(unsigned int seconds)
#define kXPD_ClientMaster
#define kXPD_MasterMaster
#define kXPD_MasterWorker
int DoDirectiveString(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for a string.
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
#define XpdBadPGuard(g, u)
static XpdManagerCron_t fManagerCron
static int WriteSessRCs(const char *, XpdEnv *erc, void *f)
Run through entries to broadcast the relevant priority.
static int FreeClientID(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to reset the disconnecting client slots.
static int WriteSessEnvs(const char *, XpdEnv *env, void *s)
Run through entries to broadcast the relevant priority.
static int CountTopMasters(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to count top-masters.
static int BroadcastPriority(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to broadcast the relevant priority.
void * XrdProofdProofServRecover(void *p)
Waiting for session to recover after an abrupt shutdown.
void * XrdProofdProofServCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
static XpdManagerCron_t fManagerCron
#define XPD_SETRESP(p, x)
#define TRACEP(p, act, x)
XrdProofdClient * fClient
std::list< XrdProofdProofServ * > fProofServs
int Matches(const char *usr, const char *grp, int ver=-1)
Check if this env applies to 'usr', 'grp', 'ver'.
void Print(const char *what)
Print the content of this env.
void Reset(const char *n, const char *env, const char *usr=0, const char *grp=0, int smi=-1, int smx=-1, int vmi=-1, int vmx=-1)
static int ToVersCode(int ver, bool hex=0)
Transform version number ver (format patch + 100*minor + 10000*maj, e.g.
int Get(int &i)
Get next token and interpret it as an int.
void SetP(XrdProofdProtocol *p)
void SetSid(unsigned short sid)
static int Attach(XrdLink *lp)
const char * GetCfgFile() const
virtual int MaxSessions() const
void FillProofServ(XrdProofdProofServ &s, XrdROOTMgr *rmgr)
Fill 's' fields using the stored info.
int SaveToFile(const char *file)
Save content to 'file'.
void Reset()
Reset the content.
XrdProofSessionInfo(XrdProofdClient *c, XrdProofdProofServ *s)
Construct from 'c' and 's'.
int ReadFromFile(const char *file)
Read content from 'file'.
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int AssertDir(const char *path, XrdProofUI ui, bool changeown)
Make sure that 'path' exists and is owned by the entity described by 'ui'.
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table.
static long int GetLong(char *str)
Extract first integer from string at 'str', if any.
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
static int ChangeToDir(const char *dir, XrdProofUI ui, bool changeown)
Change current directory to 'dir'.
static int ChangeMod(const char *path, unsigned int mode)
Change the permission mode of 'path' to 'mode'.
static int GetProcesses(const char *pn, std::map< int, XrdOucString > *plist)
Get from the process table the list of PIDs for processes named "proofserv". For {linux,...
static int GetIDFromPath(const char *path, XrdOucString &emsg)
Extract an integer from a file.
static void LogEmsgToFile(const char *flog, const char *emsg, const char *pfx=0)
Logs error message 'emsg' to file 'flog' using standard technology.
static int SymLink(const char *path, const char *link)
Create a symlink 'link' to 'path' Return 0 in case of success, -1 in case of error.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static int ParsePidPath(const char *path, XrdOucString &before, XrdOucString &after)
Parse a path in the form of "<before>[.<pid>][.<after>]", filling 'rest' and returning 'pid'.
XrdProofdClient * GetClient(const char *usr, const char *grp=0, bool create=1)
Handle request for localizing a client instance for {usr, grp} from the list.
XrdSysRecMutex * Mutex() const
void TerminateSessions(int srvtype, XrdProofdProofServ *ref, const char *msg, XrdProofdPipe *pipe, bool changeown)
Terminate client sessions; IDs of signalled processes are added to sigpid.
XrdProofdProofServ * GetFreeServObj()
Get next free server ID.
XrdProofdSandbox * Sandbox() const
const char * Group() const
const char * User() const
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
bool ReadFile(bool update=true)
Return true if the file has never been read or did change since last reading, false otherwise.
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
const char * CfgFile() const
XrdProofdPriorityMgr * PriorityMgr() const
const char * PoolURL() const
XrdProofSched * ProofSched() const
const char * SockPathDir() const
const char * DataDirUrlOpts() const
XrdROOTMgr * ROOTMgr() const
std::list< XrdProofdDSInfo * > * DataSetSrcs()
const char * DataDirOpts() const
XrdProofdNetMgr * NetMgr() const
XrdProofGroupMgr * GroupsMgr() const
XrdScheduler * Sched() const
XrdProofdClientMgr * ClientMgr() const
const char * LocalROOT() const
const char * Host() const
const char * StageReqRepo() const
const char * BareLibPath() const
const char * Image() const
const char * DataDir() const
const char * NameSpace() const
const char * TMPdir() const
const char * EffectiveUser() const
const char * AdminPath() const
int ResolveKeywords(XrdOucString &s, XrdProofdClient *pcl)
Resolve special keywords in 's' for client 'pcl'.
bool WorkerUsrCfg() const
int Recv(XpdMsg &msg)
Recv message from the pipe.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
void Close()
If open, close and invalidate the pipe descriptors.
int Post(int type, const char *msg)
Post message on the pipe.
int SetProcessPriority(int pid, const char *usr, int &dp)
Change priority of process pid belonging to user, if needed.
XrdProofdProofServ * GetActiveSession(int pid)
Return active session with process ID pid, if any.
int Attach(XrdProofdProtocol *p)
Handle a request to attach to an existing session.
void UpdateCounter(int t, int n)
int Create(XrdProofdProtocol *p)
Handle a request to create a new session.
void BroadcastClusterInfo()
Broadcast cluster info to the active sessions.
XrdProofdProofServ * PrepareProofServ(XrdProofdProtocol *p, XrdProofdResponse *r, unsigned short &sid)
Allocate and prepare the XrdProofdProofServ object describing this session.
int SetUserEnvironment(XrdProofdProtocol *p)
Set user environment: set effective user and group ID of the process to the ones of the owner of this...
XrdOucString fParentExecs
int Destroy(XrdProofdProtocol *p)
Handle a request to shutdown an existing session.
int CurrentSessions(bool recalculate=0)
Return the number of current sessions (top masters)
int DoDirectiveProofServMgr(char *, XrdOucStream *, bool)
Process 'proofservmgr' directive eg: xpd.proofservmgr checkfq:120 termto:100 verifyto:5 recoverto:20.
void ExtractEnv(char *, XrdOucStream *, XrdOucString &users, XrdOucString &groups, XrdOucString &rcval, XrdOucString &rcnam, int &smi, int &smx, int &vmi, int &vmx, bool &hex)
Extract env information from the stream 'cfg'.
int fCounters[PSMMAXCNTS]
int CheckTerminatedSessions()
Go through the terminated sessions admin path and make sure the sessions are gone.
int Detach(XrdProofdProtocol *p)
Handle a request to detach from an existing session.
int PrepareSessionRecovering()
Go through the active sessions admin path and prepare reconnection of those still alive.
int CleanupLostProofServ()
Cleanup (kill) all 'proofserv' processes which lost control from their creator or controller daemon.
int BroadcastPriorities()
Broadcast priorities to the active sessions.
XrdProofdProofServMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
int DoDirectivePutRc(char *, XrdOucStream *, bool)
Process 'putrc' directives.
int CleanupProofServ(bool all=0, const char *usr=0)
Cleanup (kill) all 'proofserv' processes from the process table.
void FillEnvList(std::list< XpdEnv > *el, const char *nam, const char *val, const char *usrs=0, const char *grps=0, int smi=-1, int smx=-1, int vmi=-1, int vmx=-1, bool hex=0)
Fill env entry(ies) in the relevant list.
int AcceptPeer(XrdProofdProofServ *xps, int to, XrdOucString &e)
Accept a callback from a starting-up server and setup the related protocol object.
void ParseCreateBuffer(XrdProofdProtocol *p, XrdProofdProofServ *xps, XrdOucString &tag, XrdOucString &ord, XrdOucString &cffile, XrdOucString &uenvs, int &intwait)
Extract relevant quantities from the buffer received during a create request.
bool IsClientRecovering(const char *usr, const char *grp, int &deadline)
Returns true (and the recovering deadline) if the client has sessions in recovering state; returns fal...
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
void FormFileNameInSessionDir(XrdProofdProtocol *p, XrdProofdProofServ *xps, const char *sessiondir, const char *extension, XrdOucString &outfn)
void DisconnectFromProofServ(int pid)
Change reconnecting status.
XrdOucHash< XrdProofdProofServ > fSessions
int SetProofServEnv(XrdProofdProtocol *p, void *in)
Set environment for proofserv.
int CreateSockPath(XrdProofdProofServ *xps, XrdProofdProtocol *p, unsigned int seq, XrdOucString &emsg)
Create the socket path for the starting session. Return 0 on success, -1 on error (error message in 'e...
bool IsReconnecting()
Return true if in reconnection state, i.e.
XrdSysSemWait fProcessSem
XrdOucString fTermAdminPath
XrdSysSemWait * ProcessSem()
int DoDirectiveShutdown(char *, XrdOucStream *, bool)
Process 'shutdown' directive.
XrdSecCredsSaver_t fCredsSaver
int RmSession(const char *fpid)
Remove session file from the terminated sessions area.
int SetProofServEnvOld(XrdProofdProtocol *p, void *in)
Set environment for proofserv; old version preparing the environment for proofserv protocol version <...
int CreateAdminPath(XrdProofdProofServ *xps, XrdProofdProtocol *p, int pid, XrdOucString &emsg)
Create the admin path for the starting session. Return 0 on success, -1 on error (error message in 'em...
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
int CreateProofServEnvFile(XrdProofdProtocol *p, void *input, const char *envfn, const char *rcfn)
Create in 'rcfn' the rootrc file for the proofserv being created; return 0 on success,...
std::list< XrdProofdProofServ * > fActiveSessions
XrdSysRecMutex fRecoverMutex
int DoDirectivePutEnv(char *, XrdOucStream *, bool)
Process 'putenv' directives.
int RecoverActiveSessions()
Accept connections from sessions still alive.
int AddSession(XrdProofdProtocol *p, XrdProofdProofServ *s)
Add new active session.
std::map< XrdProofdProtocol *, int > fDestroyTimes
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
bool IsSessionSocket(const char *fpid)
Checks if fpid is the path of a session UNIX socket. Returns TRUE if yes; cleans the socket if the ses...
int SetupProtocol(XrdNetPeer &peerpsrv, XrdProofdProofServ *xps, XrdOucString &e)
Setup the protocol object serving the peer described by 'peerpsrv'.
void SetNextSessionsCheck(int t)
XrdOucString fActiAdminPath
int SetUserOwnerships(XrdProofdProtocol *p, const char *ord, const char *stag)
Set user ownerships on some critical files or directories.
void GetTagDirs(int opt, XrdProofdProtocol *p, XrdProofdProofServ *xps, XrdOucString &sesstag, XrdOucString &topsesstag, XrdOucString &sessiondir, XrdOucString &sesswrkdir)
Determine the unique tag and relevant dirs for this session.
std::list< XpdClientSessions * > * fRecoverClients
void RegisterDirectives()
Register directives for configuration.
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
unsigned int fSeqSessionN
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
int Recover(XpdClientSessions *cl)
Handle a request to recover a session after stop&restart for a specific client.
std::list< XpdEnv > fProofServRCs
int TouchSession(const char *fpid, const char *path=0)
Update the access time for the session pid file to the current time.
void SendErrLog(const char *errlog, XrdProofdResponse *r)
Send content of errlog upstream asynchronously.
XrdOucString fProofPlugin
int Process(XrdProofdProtocol *p)
Process manager request.
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
void SetReconnectTime(bool on=1)
Change reconnecting status.
int CreateProofServRootRc(XrdProofdProtocol *p, void *input, const char *rcfn)
Create in 'rcfn' the rootrc file for the proofserv being created; return 0 on success,...
int CheckFrequency() const
void ResolveKeywords(XrdOucString &s, ProofServEnv_t *in)
Resolve some keywords in 's': <logfileroot>, <user>, <rootsys>.
XrdSysRecMutex fEnvsMutex
int VerifySession(const char *fpid, int to=-1, const char *path=0)
Check if the session is alive, i.e.
int ResolveSession(const char *fpid)
Handle a request to recover a session after stop&restart.
std::list< XpdEnv > fProofServEnvs
int CleanClientSessions(const char *usr, int srvtype)
Go through the sessions admin path and clean all sessions belonging to 'usr'.
XrdSrvBuffer * StartMsg() const
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
const char * Client() const
void SetPLiteNWrks(int n)
void SetOrdinal(const char *o)
void SetGroup(const char *g)
void DeleteUNIXSock()
Delete the current UNIX socket.
void SetClient(const char *c)
void SetValid(bool valid=1)
void SetTag(const char *t)
const char * AdminPath() const
void SetProtocol(XrdProofdProtocol *p)
const char * Ordinal() const
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
const char * UserEnvs() const
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
const char * Alias() const
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
const char * UNIXSockPath() const
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
void SetFileout(const char *f)
void SetUserEnvs(const char *t)
const char * Group() const
void SetAlias(const char *a)
int GetNClients(bool check)
Get the number of connected clients.
void Reset()
Reset this instance.
void SetUNIXSockPath(const char *s)
XrdNet * UNIXSock() const
XrdClientID * Parent() const
void SetParent(XrdClientID *cid)
const char * Fileout() const
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session's nodes and communicate it to the master togeth...
XrdProofdProtocol * Protocol() const
short int ProofProtocol() const
static int EUidAtStartup()
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdProofdClient * Client() const
XrdSecProtocol * AuthProt() const
XPClientRequest * Request() const
void SetAdminPath(const char *p)
int AddSession(const char *tag)
Record entry for new proofserv session tagged 'tag' in the active sessions file (<SandBox>/....
XrdROOT * GetVersion(const char *tag)
Return pointer to the ROOT version corresponding to 'tag' or 0 if not found.
XrdROOT * DefaultVersion() const
static int GetVersionCode(const char *release)
Translate 'release' into a version code integer following the rules in $ROOTSYS/include/RVersion....
const char * Export() const
const char * PrgmSrv() const
kXR_int16 SrvProtVers() const
static int ChangePerm(uid_t uid, gid_t gid)
XrdOucString fTopSessionTag
XrdProofdProofServMgr * fSessionMgr
XrdProofdClientMgr * fClientMgr
XrdProofSched * fProofSched
struct ClientRequestHdr header
struct XPClientProofRequest proof