29 # include "XrdNet/XrdNetAddr.hh" 31 #include "Xrd/XrdBuffer.hh" 36 #include "XrdOuc/XrdOucStream.hh" 37 #include "XrdSys/XrdSysPlatform.hh" 109 std::list<XrdProofWorker *>::iterator w =
fRegWorkers.begin();
128 XPDLOC(NMGR,
"NetMgr::Config")
134 std::list<XrdProofWorker *>::iterator w =
fWorkers.begin();
140 XrdOucString
mm(
"master ", 128);
148 XPDERR(
"problems parsing file ");
153 msg = (rcf) ?
"re-configuring" :
"configuring";
157 TRACE(ALL,
"PROOF config file: " <<
166 TRACE(ALL,
"PROOF config file will " <<
171 XPDERR(
"unable to find valid information in PROOF config file " <<
176 TRACE(ALL,
"file " <<
fPROOFcfg.
fName <<
" cannot be parsed: use default configuration to start with");
199 XPDFORM(msg,
"%d worker nodes defined at start-up",
fWorkers.size() - 1);
210 char *val, XrdOucStream *cfg,
bool rcf)
212 XPDLOC(NMGR,
"NetMgr::DoDirective")
218 if (d->
fName ==
"resource") {
220 }
else if (d->
fName ==
"adminreqto") {
222 }
else if (d->
fName ==
"worker") {
224 }
else if (d->
fName ==
"bonjour") {
228 TRACE(XERR,
"unknown directive: " << d->
fName);
237 XPDLOC(NMGR,
"NetMgr::DoDirectiveBonjour");
240 TRACE(DBG,
"processing Bonjour directive");
246 TRACE(XERR,
"Bonjour support is disabled");
255 list<XrdProofWorker *>::const_iterator iter, iter2;
256 list<XrdProofWorker *>::iterator iter3;
258 map<XrdProofWorker *, BalancerInfo> info;
260 unsigned int min = UINT_MAX;
262 unsigned int total = 0, total_perit = 0;
264 unsigned int total_added = 0;
266 list<XrdProofWorker *> tempNodes;
271 for (iter =
fNodes.begin(); iter !=
fNodes.end(); ++iter) {
279 info[*iter].available = 0;
281 if ((*iter)->Matches(*iter2)) {
282 info[*iter].available++;
285 info[*iter].added = 0;
287 if (info[*iter].available > 1 && info[*iter].available < min)
288 min = info[*iter].available;
290 total += info[*iter].available;
295 for (iter =
fNodes.begin(); iter !=
fNodes.end(); ++iter) {
296 if (info[*iter].available > 1) {
297 info[*iter].per_iteration = (
unsigned int)
floor((
double)info[*iter].available / (double)min);
299 info[*iter].per_iteration = 1;
302 total_perit += info[*iter].per_iteration;
307 tempNodes.push_back(
fWorkers.front());
311 while (total_added < total) {
312 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
313 if (i->second.added < i->second.available) {
315 unsigned int to_add =
xrdmin(i->second.per_iteration,
316 (i->second.available - i->second.added));
318 for (
unsigned int j = 0; j < to_add; j++) {
319 tempNodes.push_back(i->first);
321 i->second.added += to_add;
322 total_added += to_add;
341 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
343 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
344 if ((*iter2)->Matches(*iter3)) {
346 (*iter2)->MergeProofServs(*(*iter3));
379 int to = strtol(val, 0, 10);
389 XPDLOC(NMGR,
"NetMgr::DoDirectiveResource")
395 if (!strcmp(
"static", val)) {
399 while ((val = cfg->GetWord()) && val[0]) {
401 if (s.beginswith(
"ucfg:")) {
403 }
else if (s.beginswith(
"reload:")) {
405 }
else if (s.beginswith(
"dfltfallback:")) {
406 fDfltFallback = (s.endswith(
"1") || s.endswith(
"yes")) ? 1 : 0;
407 }
else if (s.beginswith(
"wmx:")) {
408 }
else if (s.beginswith(
"selopt:")) {
418 if (errno == ENOENT) {
437 XPDLOC(NMGR,
"NetMgr::DoDirectiveWorker")
448 XrdOucString wrd(cfg->GetWord());
449 if (wrd.length() > 0) {
452 char rest[2048] = {0};
453 cfg->GetRest((
char *)&rest[0], 2048);
454 XPDFORM(line,
"%s %s", wrd.c_str(), rest);
456 if (wrd ==
"master" || wrd ==
"node") {
459 if (pw->
fHost.beginswith(
"localhost") ||
463 fw->
Reset(line.c_str());
469 int ir = line.find(
"repeat=");
470 if (ir != STR_NPOS) {
471 XrdOucString
r(line, ir + strlen(
"repeat="));
472 r.erase(r.find(
' '));
475 TRACE(DBG,
"found repeat = " << nr);
480 if (mline.IsValid()) {
481 TRACE(DBG,
"found multi-line with: " << mline.N() <<
" tokens");
482 for (
int i = 0; i < mline.N(); i++) {
483 TRACE(HDBG,
"found token: " << mline.Get(i));
487 TRACE(DBG,
"found line: " << line);
509 XPDLOC(NMGR,
"NetMgr::BroadcastCtrlC")
514 std::list<XrdProofWorker *>::iterator iw =
fNodes.begin();
516 while (iw !=
fNodes.end()) {
517 if ((w = *iw) && w->
fType !=
'M') {
519 bool us = (((w->
fHost.find(
"localhost") != STR_NPOS ||
520 XrdOucString(
fMgr->
Host()).find(w->
fHost.c_str()) != STR_NPOS)) &&
526 XrdOucString u = (w->
fUser.length() > 0) ? w->
fUser : usr;
530 if (w->
fPort != -1) {
534 TRACE(HDBG,
"sending request to: "<<u);
540 memset(&reqhdr, 0,
sizeof(reqhdr));
547 TRACE(XERR,
"problems marshalling request");
551 TRACE(XERR,
"problems sending ctrl-c request to server " << u);
558 TRACE(DBG,
"broadcast request for ourselves: ignore");
576 XPDLOC(NMGR,
"NetMgr::Broadcast")
578 unsigned int nok = 0;
579 TRACE(REQ,
"type: " << type);
582 std::list<XrdProofWorker *>::iterator iw =
fNodes.begin();
585 while (iw !=
fNodes.end()) {
586 if ((w = *iw) && w->
fType !=
'M') {
588 bool us = (((w->
fHost.find(
"localhost") != STR_NPOS ||
589 XrdOucString(
fMgr->
Host()).find(w->
fHost.c_str()) != STR_NPOS)) &&
595 XrdOucString u = (w->
fUser.length() > 0) ? w->
fUser : usr;
599 if (w->
fPort != -1) {
606 TRACE(HDBG,
"sending request to " << u);
608 if (!(xrsp =
Send(u.c_str(),
type, msg, srvtype,
r, notify, subtype))) {
609 TRACE(XERR,
"problems sending request to " << u);
616 TRACE(DBG,
"broadcast request for ourselves: ignore");
624 return (nok ==
fNodes.size()) ? 0 : -1;
635 XrdOucString buf =
" Manager connection from ";
655 const char *msg,
int srvtype,
659 XPDLOC(NMGR,
"NetMgr::Send")
662 TRACE(REQ,
"type: " << type);
664 if (!url || strlen(url) <= 0)
672 XrdOucString notifymsg(
"Send: ");
677 memset(&reqhdr, 0,
sizeof(reqhdr));
683 notifymsg +=
"change-of-ROOT version request to ";
685 notifymsg +=
" msg: ";
687 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
688 buf = (msg) ? (
const void *)msg : buf;
691 notifymsg +=
"cleanup request to ";
693 notifymsg +=
" for user: ";
697 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
698 buf = (msg) ? (
const void *)msg : buf;
701 notifymsg +=
"exec ";
702 notifymsg += subtype;
703 notifymsg +=
"request for ";
707 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
708 buf = (msg) ? (
const void *)msg : buf;
712 TRACE(XERR,
"invalid request type " << type);
718 r->
Send(kXR_attn,
kXPD_srvmsg, 0, (
char *) notifymsg.c_str(), notifymsg.length());
725 xrsp = conn->
SendReq(&reqhdr, buf, vout,
"NetMgr::Send");
732 XrdOucString cmsg = url;
735 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) cmsg.c_str(), cmsg.length());
742 TRACE(XERR,
"could not open connection to " << url);
744 XrdOucString cmsg =
"failure attempting connection to ";
761 XPDLOC(NMGR,
"NetMgr::IsLocal")
764 if (host && strlen(host) > 0) {
769 char *fqn = XrdSysDNS::getHostName(uu.
Host.c_str());
772 aNA.Set(uu.
Host.c_str());
773 char *fqn = (
char *) aNA.Name();
776 if (fqn && (strstr(fqn,
"localhost") || !strcmp(fqn,
"127.0.0.1") ||
794 XPDLOC(NMGR,
"NetMgr::ReadBuffer")
814 if (dlen > 0 && p->
Argp()->buff) {
815 file =
new char[dlen+1];
816 memcpy(file, p->
Argp()->buff, dlen);
820 if (ui.
Host.length() > 0) {
824 memcpy(file, ui.
File.c_str(), ui.
File.length());
825 file[ui.
File.length()] = 0;
826 blen = ui.
File.length();
827 TRACEP(p, DBG,
"file is LOCAL");
833 pattern =
new char[len + 1];
837 pattern[i++] = file[j++];
839 filen = strdup(file);
840 filen[blen - len] = 0;
841 TRACEP(p, DBG,
"grep operation " << grep <<
", pattern:" << pattern);
844 emsg =
"file name not found";
846 response->Send(kXR_InvalidRequest, emsg.c_str());
850 TRACEP(p, REQ,
"file: " << filen <<
", ofs: " << ofs <<
", len: " << len <<
851 ", pattern: " << pattern);
853 TRACEP(p, REQ,
"file: " << file <<
", ofs: " << ofs <<
", len: " << len);
871 if (u.
User.length() <= 0)
881 XPDFORM(emsg,
"nothing found by 'grep' in %s, pattern: %s", filen, pattern);
887 XPDFORM(emsg,
"could not read buffer from %s %s",
888 (local) ?
"local file " :
"remote file ", file);
890 response->Send(kXR_InvalidRequest, emsg.c_str());
896 emsg =
"nothing found in ";
897 emsg += (grep > 0) ? filen : file;
905 response->Send(buf, lout);
924 XPDLOC(NMGR,
"NetMgr::LocateLocalFile")
927 if (file.length() <= 0 || file.find(
'*') == STR_NPOS)
return 0;
931 int isl = file.rfind(
'/');
932 if (isl != STR_NPOS) {
933 fn.assign(file, isl + 1, -1);
934 dn.assign(file, 0, isl);
942 DIR *dirp = opendir(dn.c_str());
944 XPDFORM(emsg,
"cannot open '%s' - errno: %d", dn.c_str(), errno);
945 TRACE(XERR, emsg.c_str());
948 struct dirent *ent = 0;
950 while ((ent = readdir(dirp))) {
951 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
955 if (sent.matches(fn.c_str()) > 0)
break;
961 if (sent.length() > 0) {
962 XPDFORM(file,
"%s%s", dn.c_str(), sent.c_str());
979 XPDLOC(NMGR,
"NetMgr::ReadBufferLocal")
982 TRACE(REQ,
"file: " << path <<
", ofs: " << ofs <<
", len: " << len);
985 if (!path || strlen(path) <= 0) {
986 TRACE(XERR,
"path undefined!");
991 XrdOucString spath(path);
993 TRACE(XERR,
"path cannot be resolved! (" << path <<
")");
996 const char *
file = spath.c_str();
999 int fd =
open(file, O_RDONLY);
1001 emsg =
"could not open ";
1009 if (fstat(fd, &st) != 0) {
1010 emsg =
"could not get size of file with stat: errno: ";
1016 off_t ltot = st.st_size;
1020 kXR_int64 start = ofs;
1021 off_t fst = (start < 0) ? ltot + start : start;
1022 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1024 kXR_int64 end = fst + len;
1025 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1026 TRACE(DBG,
"file size: " << ltot <<
", read from: " << fst <<
" to " << lst);
1032 char *buf = (
char *)
malloc(len + 1);
1034 emsg =
"could not allocate enough memory on the heap: errno: ";
1043 lseek(fd, fst, SEEK_SET);
1049 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1052 TRACE(XERR,
"error reading from file: errno: " << errno);
1060 }
while (nr > 0 && left > 0);
1064 TRACE(HDBG,
"read " << nr <<
" bytes: " << buf);
1081 const char *pat,
int &len,
int opt)
1083 XPDLOC(NMGR,
"NetMgr::ReadBufferLocal")
1086 TRACE(REQ,
"file: " << path <<
", pat: " << pat <<
", len: " << len);
1089 if (!path || strlen(path) <= 0) {
1090 TRACE(XERR,
"file path undefined!");
1095 XrdOucString spath(path);
1097 TRACE(XERR,
"path cannot be resolved! (" << path <<
")");
1100 const char *
file = spath.c_str();
1104 if (stat(file, &st) != 0) {
1105 emsg =
"could not get size of file with stat: errno: ";
1110 off_t ltot = st.st_size;
1115 if (pat && strlen(pat) > 0) {
1116 lcmd = strlen(pat) + strlen(file) + 20;
1117 cmd =
new char[lcmd];
1119 snprintf(cmd, lcmd,
"grep %s %s", pat, file);
1120 }
else if (opt == 2) {
1121 snprintf(cmd, lcmd,
"grep -v %s %s", pat, file);
1122 }
else if (opt == 3) {
1123 snprintf(cmd, lcmd,
"cat %s | %s", file, pat);
1125 snprintf(cmd, lcmd,
"cat %s", file);
1128 lcmd = strlen(file) + 10;
1129 cmd =
new char[lcmd];
1130 snprintf(cmd, lcmd,
"cat %s", file);
1132 TRACE(DBG,
"cmd: " << cmd);
1135 FILE *fp = popen(cmd,
"r");
1137 emsg =
"could not run '";
1150 int bufsiz = 0, left = 0, lines = 0;
1151 while ((ltot > 0) && fgets(line,
sizeof(line), fp)) {
1153 int llen = strlen(line);
1157 if (!buf || (llen > left)) {
1158 int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1159 dsiz = (dsiz > llen) ? dsiz : llen;
1161 buf = (
char *)
realloc(buf, bufsiz + 1);
1165 emsg =
"could not allocate enough memory on the heap: errno: ";
1172 memcpy(buf + len, line, llen);
1176 fprintf(stderr,
"line: %s", line);
1202 kXR_int64 ofs,
int &len,
int grep)
1204 XPDLOC(NMGR,
"NetMgr::ReadBufferRemote")
1206 TRACE(REQ,
"url: " << (url ? url :
"undef") <<
1207 ", file: " << (file ? file :
"undef") <<
", ofs: " << ofs <<
1208 ", len: " << len <<
", grep: " << grep);
1211 if (!file || strlen(file) <= 0) {
1212 TRACE(XERR,
"file undefined!");
1216 if (!url || strlen(url) <= 0) {
1218 u.
TakeUrl(XrdOucString(file));
1226 if (conn && conn->
IsValid()) {
1229 memset(&reqhdr, 0,
sizeof(reqhdr));
1235 reqhdr.
header.dlen = strlen(file);
1236 const void *btmp = (
const void *) file;
1240 conn->
SendReq(&reqhdr, btmp, vout,
"NetMgr::ReadBufferRemote");
1243 if (xrsp && buf && (xrsp->
DataLen() > 0)) {
1246 if (xrsp && !(xrsp->
IsError()))
1269 XPDLOC(NMGR,
"NetMgr::ReadLogPaths")
1271 TRACE(REQ,
"url: " << (url ? url :
"undef") <<
1272 ", msg: " << (msg ? msg :
"undef") <<
", isess: " << isess);
1275 if (!url || strlen(url) <= 0) {
1276 TRACE(XERR,
"url undefined!");
1284 if (conn && conn->
IsValid()) {
1287 memset(&reqhdr, 0,
sizeof(reqhdr));
1293 reqhdr.
header.dlen = msg ? strlen(msg) : 0;
1294 const void *btmp = (
const void *) msg;
1298 conn->
SendReq(&reqhdr, btmp, vout,
"NetMgr::ReadLogPaths");
1301 if (xrsp && buf && (xrsp->
DataLen() > 0)) {
1303 buf = (
char *)
realloc((
void *)buf, len + 1);
1327 XPDLOC(NMGR,
"NetMgr::ReadLogPaths")
1329 TRACE(REQ,
"msg: " << (msg ? msg :
"undef") <<
", isess: " << isess);
1331 char *buf = 0, *pbuf = buf;
1334 std::list<XrdProofWorker *>::iterator iw =
fNodes.begin();
1336 while (iw !=
fNodes.end()) {
1339 bool us = (((w->
fHost.find(
"localhost") != STR_NPOS ||
1340 XrdOucString(
fMgr->
Host()).find(w->
fHost.c_str()) != STR_NPOS)) &&
1347 if (w->
fPort != -1) {
1354 len += strlen(bmst) + 1;
1355 buf = (
char *)
realloc((
void *)buf, len);
1356 pbuf = buf + len - strlen(bmst) - 1;
1357 memcpy(pbuf, bmst, strlen(bmst) + 1);
1363 TRACE(DBG,
"request for ourselves: ignore");
1380 XPDLOC(NMGR,
"NetMgr::CreateDefaultPROOFcfg")
1392 XrdOucString
mm(
"master ", 128);
1399 mm =
"worker localhost port=";
1403 TRACE(DBG,
"added line: " << mm);
1409 std::list<XrdProofWorker *>::iterator w =
fDfltWorkers.begin();
1429 XPDLOC(NMGR,
"NetMgr::GetActiveWorkers")
1441 TRACE(XERR,
"unable to read the configuration file");
1442 return (std::list<XrdProofWorker *> *)0;
1446 TRACE(DBG,
"returning list with " <<
fWorkers.size() <<
" entries");
1458 const char *xpdloc =
"NetMgr::Dump";
1462 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1463 XPDPRT(
"+ Active workers status");
1467 std::list<XrdProofWorker *>::iterator iw;
1469 XPDPRT(
"+ wrk: " << (*iw)->fHost <<
":" << (*iw)->fPort <<
" type:" << (*iw)->fType <<
1470 " active sessions:" << (*iw)->Active());
1473 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1482 XPDLOC(NMGR,
"NetMgr::GetNodes")
1494 TRACE(XERR,
"unable to read the configuration file");
1495 return (std::list<XrdProofWorker *> *)0;
1499 TRACE(DBG,
"returning list with " <<
fNodes.size() <<
" entries");
1511 XPDLOC(NMGR,
"NetMgr::ReadPROOFcfg")
1535 TRACE(DBG,
"time of last modification: " << st.st_mtime);
1549 TRACE(XERR,
"continuing with existing list of workers.");
1563 XrdOucString
mm(
"master ", 128);
1568 std::list<XrdProofWorker *>::iterator w =
fRegWorkers.begin();
1579 while (fgets(lin,
sizeof(lin), fin)) {
1582 while (lin[p++] ==
' ') {
1586 if (lin[p] ==
'\0' || lin[p] ==
'\n')
1594 if (lin[strlen(lin)-1] ==
'\n')
1595 lin[strlen(lin)-1] =
'\0';
1597 TRACE(DBG,
"found line: " << lin);
1602 const char *pfx[2] = {
"master",
"node" };
1603 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1604 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1606 if (pw->
fHost.beginswith(
"localhost") ||
1616 std::list<XrdProofWorker *>::iterator w =
fRegWorkers.begin();
1621 if (!((*w)->fActive)) {
1622 if ((*w)->fHost == pw->
fHost && (*w)->fPort == pw->
fPort) {
1643 std::list<XrdProofWorker *>::iterator w =
fRegWorkers.begin();
1645 if ((*w)->fActive) {
1660 return ((nw == 0) ? -1 : 0);
1671 XPDLOC(NMGR,
"NetMgr::FindUniqueNodes")
1680 std::list<XrdProofWorker *>::iterator w =
fWorkers.begin();
1682 for (; w !=
fWorkers.end(); ++w)
if ((*w)->fActive) {
1684 std::list<XrdProofWorker *>::iterator
n;
1686 if ((*n)->Matches(*w)) {
1695 TRACE(REQ,
"found " <<
fNodes.size() <<
" unique nodes");
int DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
void RegisterDirectives()
Register config directives.
void TakeUrl(XrdOucString url)
static constexpr double pi
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
static constexpr double us
struct XPClientReadbufRequest readbuf
XrdProofdNetMgr * NetMgr() const
std::list< XrdProofWorker * > fRegWorkers
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
#define TRACE(Flag, Args)
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
void Reset(const char *str)
Set content from a config file-like string.
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
static constexpr double mm
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
struct ClientRequestHdr header
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
std::list< XrdProofWorker * > fNodes
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
const char * User() const
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
const char * GetLastErr()
#define EnvPutInt(name, val)
virtual ~XrdProofdNetMgr()
Destructor.
bool Matches(const char *host)
Check compatibility of host with this instance.
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
#define XrdSysMutexHelper
struct XPClientProofRequest proof
const char * Host() const
#define TRACEP(p, act, x)
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
static unsigned int total
std::list< XrdProofWorker * > fWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found, return -1.
std::list< XrdProofWorker * > fDfltWorkers
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
static constexpr double s
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
const char * EffectiveUser() const
void BalanceNodesOrder()
Indices (this will be used twice).
#define XPD_SETRESP(p, x)
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
#define NAME_REQUESTTIMEOUT
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
bool IsValid() const
Test validity of this connection.
XrdProofdClient * Client() const
virtual int Config(bool rcf=0)
int Send(void)
Auxilliary Send method.
void Register(const char *dname, XrdProofdDirective *d)
XPClientRequest * Request() const
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.