29# include "XrdNet/XrdNetAddr.hh"
31#include "Xrd/XrdBuffer.hh"
36#include "XrdOuc/XrdOucStream.hh"
37#include "XrdSys/XrdSysPlatform.hh"
107 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
126 XPDLOC(NMGR,
"NetMgr::Config")
132 std::list<XrdProofWorker *>::iterator
w =
fWorkers.begin();
138 XrdOucString mm(
"master ", 128);
146 XPDERR(
"problems parsing file ");
151 msg = (rcf) ?
"re-configuring" :
"configuring";
155 TRACE(ALL,
"PROOF config file: " <<
164 TRACE(ALL,
"PROOF config file will " <<
169 XPDERR(
"unable to find valid information in PROOF config file " <<
174 TRACE(ALL,
"file " <<
fPROOFcfg.
fName <<
" cannot be parsed: use default configuration to start with");
197 XPDFORM(msg,
"%d worker nodes defined at start-up",
fWorkers.size() - 1);
208 char *val, XrdOucStream *cfg,
bool rcf)
210 XPDLOC(NMGR,
"NetMgr::DoDirective")
216 if (
d->fName ==
"resource") {
218 }
else if (
d->fName ==
"adminreqto") {
220 }
else if (
d->fName ==
"worker") {
224 TRACE(XERR,
"unknown directive: " <<
d->fName);
234 list<XrdProofWorker *>::const_iterator iter, iter2;
235 list<XrdProofWorker *>::iterator iter3;
237 map<XrdProofWorker *, BalancerInfo> info;
239 unsigned int min = UINT_MAX;
241 unsigned int total = 0, total_perit = 0;
243 unsigned int total_added = 0;
245 list<XrdProofWorker *> tempNodes;
250 for (iter =
fNodes.begin(); iter !=
fNodes.end(); ++iter) {
258 info[*iter].available = 0;
260 if ((*iter)->Matches(*iter2)) {
261 info[*iter].available++;
264 info[*iter].added = 0;
266 if (info[*iter].available > 1 && info[*iter].available < min)
267 min = info[*iter].available;
269 total += info[*iter].available;
274 for (iter =
fNodes.begin(); iter !=
fNodes.end(); ++iter) {
275 if (info[*iter].available > 1) {
276 info[*iter].per_iteration = (
unsigned int)floor((
double)info[*iter].available / (
double)min);
278 info[*iter].per_iteration = 1;
281 total_perit += info[*iter].per_iteration;
286 tempNodes.push_back(
fWorkers.front());
290 while (total_added <
total) {
291 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
292 if (i->second.added < i->second.available) {
294 unsigned int to_add =
xrdmin(i->second.per_iteration,
295 (i->second.available - i->second.added));
297 for (
unsigned int j = 0; j < to_add; j++) {
298 tempNodes.push_back(i->first);
300 i->second.added += to_add;
301 total_added += to_add;
320 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
322 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
323 if ((*iter2)->Matches(*iter3)) {
325 (*iter2)->MergeProofServs(*(*iter3));
358 int to = strtol(val, 0, 10);
368 XPDLOC(NMGR,
"NetMgr::DoDirectiveResource")
374 if (!strcmp(
"static", val)) {
378 while ((val = cfg->GetWord()) && val[0]) {
380 if (s.beginswith(
"ucfg:")) {
382 }
else if (s.beginswith(
"reload:")) {
384 }
else if (s.beginswith(
"dfltfallback:")) {
385 fDfltFallback = (s.endswith(
"1") || s.endswith(
"yes")) ? 1 : 0;
386 }
else if (s.beginswith(
"wmx:")) {
387 }
else if (s.beginswith(
"selopt:")) {
397 if (errno == ENOENT) {
416 XPDLOC(NMGR,
"NetMgr::DoDirectiveWorker")
427 XrdOucString wrd(cfg->GetWord());
428 if (wrd.length() > 0) {
431 char rest[2048] = {0};
432 cfg->GetRest((
char *)&rest[0], 2048);
435 if (wrd ==
"master" || wrd ==
"node") {
438 if (pw->
fHost.beginswith(
"localhost") ||
448 int ir =
line.find(
"repeat=");
449 if (ir != STR_NPOS) {
450 XrdOucString
r(
line, ir + strlen(
"repeat="));
451 r.erase(
r.find(
' '));
454 TRACE(DBG,
"found repeat = " << nr);
460 TRACE(DBG,
"found multi-line with: " << mline.
N() <<
" tokens");
461 for (
int i = 0; i < mline.
N(); i++) {
462 TRACE(HDBG,
"found token: " << mline.
Get(i));
488 XPDLOC(NMGR,
"NetMgr::BroadcastCtrlC")
493 std::list<XrdProofWorker *>::iterator iw =
fNodes.begin();
495 while (iw !=
fNodes.end()) {
496 if ((
w = *iw) &&
w->fType !=
'M') {
498 bool us = (((
w->fHost.find(
"localhost") != STR_NPOS ||
499 XrdOucString(
fMgr->
Host()).find(
w->fHost.c_str()) != STR_NPOS)) &&
500 (
w->fPort == -1 ||
w->fPort ==
fMgr->
Port())) ? 1 : 0;
505 XrdOucString u = (
w->fUser.length() > 0) ?
w->
fUser : usr;
509 if (
w->fPort != -1) {
513 TRACE(HDBG,
"sending request to: "<<u);
519 memset(&reqhdr, 0,
sizeof(reqhdr));
526 TRACE(XERR,
"problems marshalling request");
530 TRACE(XERR,
"problems sending ctrl-c request to server " << u);
537 TRACE(DBG,
"broadcast request for ourselves: ignore");
555 XPDLOC(NMGR,
"NetMgr::Broadcast")
557 unsigned int nok = 0;
561 std::list<XrdProofWorker *>::iterator iw =
fNodes.begin();
564 while (iw !=
fNodes.end()) {
565 if ((
w = *iw) &&
w->fType !=
'M') {
567 bool us = (((
w->fHost.find(
"localhost") != STR_NPOS ||
568 XrdOucString(
fMgr->
Host()).find(
w->fHost.c_str()) != STR_NPOS)) &&
569 (
w->fPort == -1 ||
w->fPort ==
fMgr->
Port())) ? 1 : 0;
574 XrdOucString u = (
w->fUser.length() > 0) ?
w->fUser : usr;
578 if (
w->fPort != -1) {
583 int srvtype = (
w->fType !=
'W') ? (kXR_int32)
kXPD_Master
585 TRACE(HDBG,
"sending request to " << u);
587 if (!(xrsp =
Send(u.c_str(),
type, msg, srvtype,
r, notify, subtype))) {
588 TRACE(XERR,
"problems sending request to " << u);
595 TRACE(DBG,
"broadcast request for ourselves: ignore");
603 return (nok ==
fNodes.size()) ? 0 : -1;
614 XrdOucString buf =
" Manager connection from ";
634 const char *msg,
int srvtype,
638 XPDLOC(NMGR,
"NetMgr::Send")
643 if (!url || strlen(url) <= 0)
651 XrdOucString notifymsg(
"Send: ");
656 memset(&reqhdr, 0,
sizeof(reqhdr));
662 notifymsg +=
"change-of-ROOT version request to ";
664 notifymsg +=
" msg: ";
666 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
667 buf = (msg) ? (
const void *)msg : buf;
670 notifymsg +=
"cleanup request to ";
672 notifymsg +=
" for user: ";
676 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
677 buf = (msg) ? (
const void *)msg : buf;
680 notifymsg +=
"exec ";
681 notifymsg += subtype;
682 notifymsg +=
"request for ";
686 reqhdr.
header.dlen = (msg) ? strlen(msg) : 0;
687 buf = (msg) ? (
const void *)msg : buf;
691 TRACE(XERR,
"invalid request type " <<
type);
697 r->Send(kXR_attn,
kXPD_srvmsg, 0, (
char *) notifymsg.c_str(), notifymsg.length());
704 xrsp = conn->
SendReq(&reqhdr, buf, vout,
"NetMgr::Send");
711 XrdOucString cmsg = url;
714 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) cmsg.c_str(), cmsg.length());
721 TRACE(XERR,
"could not open connection to " << url);
723 XrdOucString cmsg =
"failure attempting connection to ";
725 r->Send(kXR_attn,
kXPD_srvmsg, (
char *) cmsg.c_str(), cmsg.length());
740 XPDLOC(NMGR,
"NetMgr::IsLocal")
743 if (host && strlen(host) > 0) {
748 char *fqn = XrdSysDNS::getHostName(uu.
Host.c_str());
751 aNA.Set(uu.
Host.c_str());
752 char *fqn = (
char *) aNA.Name();
755 if (fqn && (strstr(fqn,
"localhost") || !strcmp(fqn,
"127.0.0.1") ||
773 XPDLOC(NMGR,
"NetMgr::ReadBuffer")
782 kXR_int64 ofs = ntohll(
p->Request()->readbuf.ofs);
783 int len = ntohl(
p->Request()->readbuf.len);
789 int dlen =
p->Request()->header.dlen;
790 int grep = ntohl(
p->Request()->readbuf.int1);
793 if (dlen > 0 &&
p->Argp()->buff) {
794 file =
new char[dlen+1];
795 memcpy(file,
p->Argp()->buff, dlen);
799 if (ui.
Host.length() > 0) {
803 memcpy(file, ui.
File.c_str(), ui.
File.length());
804 file[ui.
File.length()] = 0;
805 blen = ui.
File.length();
806 TRACEP(
p, DBG,
"file is LOCAL");
812 pattern =
new char[
len + 1];
816 pattern[i++] = file[j++];
818 filen = strdup(file);
819 filen[blen -
len] = 0;
820 TRACEP(
p, DBG,
"grep operation " << grep <<
", pattern:" << pattern);
823 emsg =
"file name not found";
825 response->Send(kXR_InvalidRequest, emsg.c_str());
829 TRACEP(
p, REQ,
"file: " << filen <<
", ofs: " << ofs <<
", len: " <<
len <<
830 ", pattern: " << pattern);
832 TRACEP(
p, REQ,
"file: " << file <<
", ofs: " << ofs <<
", len: " <<
len);
850 if (u.
User.length() <= 0)
860 XPDFORM(emsg,
"nothing found by 'grep' in %s, pattern: %s", filen, pattern);
866 XPDFORM(emsg,
"could not read buffer from %s %s",
867 (local) ?
"local file " :
"remote file ", file);
869 response->Send(kXR_InvalidRequest, emsg.c_str());
875 emsg =
"nothing found in ";
876 emsg += (grep > 0) ? filen : file;
884 response->Send(buf, lout);
903 XPDLOC(NMGR,
"NetMgr::LocateLocalFile")
906 if (file.length() <= 0 || file.find(
'*') == STR_NPOS)
return 0;
910 int isl = file.rfind(
'/');
911 if (isl != STR_NPOS) {
912 fn.assign(file, isl + 1, -1);
913 dn.assign(file, 0, isl);
921 DIR *dirp = opendir(dn.c_str());
923 XPDFORM(emsg,
"cannot open '%s' - errno: %d", dn.c_str(), errno);
924 TRACE(XERR, emsg.c_str());
927 struct dirent *ent = 0;
929 while ((ent = readdir(dirp))) {
930 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
934 if (sent.matches(fn.c_str()) > 0)
break;
940 if (sent.length() > 0) {
941 XPDFORM(file,
"%s%s", dn.c_str(), sent.c_str());
958 XPDLOC(NMGR,
"NetMgr::ReadBufferLocal")
961 TRACE(REQ,
"file: " << path <<
", ofs: " << ofs <<
", len: " <<
len);
964 if (!path || strlen(path) <= 0) {
965 TRACE(XERR,
"path undefined!");
970 XrdOucString spath(path);
972 TRACE(XERR,
"path cannot be resolved! (" << path <<
")");
975 const char *file = spath.c_str();
978 int fd = open(file, O_RDONLY);
980 emsg =
"could not open ";
988 if (fstat(fd, &st) != 0) {
989 emsg =
"could not get size of file with stat: errno: ";
995 off_t ltot = st.st_size;
999 kXR_int64 start = ofs;
1000 off_t fst = (start < 0) ? ltot + start : start;
1001 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1003 kXR_int64 end = fst +
len;
1004 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1005 TRACE(DBG,
"file size: " << ltot <<
", read from: " << fst <<
" to " << lst);
1013 emsg =
"could not allocate enough memory on the heap: errno: ";
1022 lseek(fd, fst, SEEK_SET);
1028 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1031 TRACE(XERR,
"error reading from file: errno: " << errno);
1039 }
while (nr > 0 && left > 0);
1043 TRACE(HDBG,
"read " << nr <<
" bytes: " << buf);
1060 const char *pat,
int &
len,
int opt)
1062 XPDLOC(NMGR,
"NetMgr::ReadBufferLocal")
1065 TRACE(REQ,
"file: " << path <<
", pat: " << pat <<
", len: " <<
len);
1068 if (!path || strlen(path) <= 0) {
1069 TRACE(XERR,
"file path undefined!");
1074 XrdOucString spath(path);
1076 TRACE(XERR,
"path cannot be resolved! (" << path <<
")");
1079 const char *file = spath.c_str();
1083 if (stat(file, &st) != 0) {
1084 emsg =
"could not get size of file with stat: errno: ";
1089 off_t ltot = st.st_size;
1094 if (pat && strlen(pat) > 0) {
1095 lcmd = strlen(pat) + strlen(file) + 20;
1096 cmd =
new char[lcmd];
1098 snprintf(cmd, lcmd,
"grep %s %s", pat, file);
1099 }
else if (opt == 2) {
1100 snprintf(cmd, lcmd,
"grep -v %s %s", pat, file);
1101 }
else if (opt == 3) {
1102 snprintf(cmd, lcmd,
"cat %s | %s", file, pat);
1104 snprintf(cmd, lcmd,
"cat %s", file);
1107 lcmd = strlen(file) + 10;
1108 cmd =
new char[lcmd];
1109 snprintf(cmd, lcmd,
"cat %s", file);
1111 TRACE(DBG,
"cmd: " << cmd);
1114 FILE *fp = popen(cmd,
"r");
1116 emsg =
"could not run '";
1129 int bufsiz = 0, left = 0, lines = 0;
1130 while ((ltot > 0) && fgets(
line,
sizeof(
line), fp)) {
1132 int llen = strlen(
line);
1136 if (!buf || (llen > left)) {
1137 int dsiz = 100 * ((
int)((
len + llen) / lines) + 1);
1138 dsiz = (dsiz > llen) ? dsiz : llen;
1140 buf = (
char *)
realloc(buf, bufsiz + 1);
1144 emsg =
"could not allocate enough memory on the heap: errno: ";
1151 memcpy(buf +
len,
line, llen);
1155 fprintf(stderr,
"line: %s",
line);
1181 kXR_int64 ofs,
int &
len,
int grep)
1183 XPDLOC(NMGR,
"NetMgr::ReadBufferRemote")
1185 TRACE(REQ,
"url: " << (url ? url :
"undef") <<
1186 ", file: " << (file ? file :
"undef") <<
", ofs: " << ofs <<
1187 ", len: " <<
len <<
", grep: " << grep);
1190 if (!file || strlen(file) <= 0) {
1191 TRACE(XERR,
"file undefined!");
1195 if (!url || strlen(url) <= 0) {
1197 u.
TakeUrl(XrdOucString(file));
1205 if (conn && conn->
IsValid()) {
1208 memset(&reqhdr, 0,
sizeof(reqhdr));
1214 reqhdr.
header.dlen = strlen(file);
1215 const void *btmp = (
const void *) file;
1219 conn->
SendReq(&reqhdr, btmp, vout,
"NetMgr::ReadBufferRemote");
1222 if (xrsp && buf && (xrsp->
DataLen() > 0)) {
1225 if (xrsp && !(xrsp->
IsError()))
1248 XPDLOC(NMGR,
"NetMgr::ReadLogPaths")
1250 TRACE(REQ,
"url: " << (url ? url :
"undef") <<
1251 ", msg: " << (msg ? msg :
"undef") <<
", isess: " << isess);
1254 if (!url || strlen(url) <= 0) {
1255 TRACE(XERR,
"url undefined!");
1263 if (conn && conn->
IsValid()) {
1266 memset(&reqhdr, 0,
sizeof(reqhdr));
1272 reqhdr.
header.dlen = msg ? strlen(msg) : 0;
1273 const void *btmp = (
const void *) msg;
1277 conn->
SendReq(&reqhdr, btmp, vout,
"NetMgr::ReadLogPaths");
1280 if (xrsp && buf && (xrsp->
DataLen() > 0)) {
1282 buf = (
char *)
realloc((
void *)buf,
len + 1);
1306 XPDLOC(NMGR,
"NetMgr::ReadLogPaths")
1308 TRACE(REQ,
"msg: " << (msg ? msg :
"undef") <<
", isess: " << isess);
1310 char *buf = 0, *pbuf = buf;
1313 std::list<XrdProofWorker *>::iterator iw =
fNodes.begin();
1315 while (iw !=
fNodes.end()) {
1318 bool us = (((
w->fHost.find(
"localhost") != STR_NPOS ||
1319 XrdOucString(
fMgr->
Host()).find(
w->fHost.c_str()) != STR_NPOS)) &&
1320 (
w->fPort == -1 ||
w->fPort ==
fMgr->
Port())) ? 1 : 0;
1326 if (
w->fPort != -1) {
1333 len += strlen(bmst) + 1;
1335 pbuf = buf +
len - strlen(bmst) - 1;
1336 memcpy(pbuf, bmst, strlen(bmst) + 1);
1342 TRACE(DBG,
"request for ourselves: ignore");
1359 XPDLOC(NMGR,
"NetMgr::CreateDefaultPROOFcfg")
1371 XrdOucString mm(
"master ", 128);
1378 mm =
"worker localhost port=";
1382 TRACE(DBG,
"added line: " << mm);
1388 std::list<XrdProofWorker *>::iterator
w =
fDfltWorkers.begin();
1408 XPDLOC(NMGR,
"NetMgr::GetActiveWorkers")
1420 TRACE(XERR,
"unable to read the configuration file");
1421 return (std::list<XrdProofWorker *> *)0;
1425 TRACE(DBG,
"returning list with " <<
fWorkers.size() <<
" entries");
1437 const char *xpdloc =
"NetMgr::Dump";
1441 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1442 XPDPRT(
"+ Active workers status");
1446 std::list<XrdProofWorker *>::iterator iw;
1448 XPDPRT(
"+ wrk: " << (*iw)->fHost <<
":" << (*iw)->fPort <<
" type:" << (*iw)->fType <<
1449 " active sessions:" << (*iw)->Active());
1452 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1461 XPDLOC(NMGR,
"NetMgr::GetNodes")
1473 TRACE(XERR,
"unable to read the configuration file");
1474 return (std::list<XrdProofWorker *> *)0;
1478 TRACE(DBG,
"returning list with " <<
fNodes.size() <<
" entries");
1490 XPDLOC(NMGR,
"NetMgr::ReadPROOFcfg")
1514 TRACE(DBG,
"time of last modification: " << st.st_mtime);
1528 TRACE(XERR,
"continuing with existing list of workers.");
1542 XrdOucString mm(
"master ", 128);
1547 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
1558 while (fgets(lin,
sizeof(lin), fin)) {
1561 while (lin[
p++] ==
' ') {
1565 if (lin[
p] ==
'\0' || lin[
p] ==
'\n')
1573 if (lin[strlen(lin)-1] ==
'\n')
1574 lin[strlen(lin)-1] =
'\0';
1576 TRACE(DBG,
"found line: " << lin);
1581 const char *pfx[2] = {
"master",
"node" };
1582 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1583 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1585 if (pw->
fHost.beginswith(
"localhost") ||
1595 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
1600 if (!((*w)->fActive)) {
1601 if ((*w)->fHost == pw->
fHost && (*w)->fPort == pw->
fPort) {
1622 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
1624 if ((*w)->fActive) {
1639 return ((nw == 0) ? -1 : 0);
1650 XPDLOC(NMGR,
"NetMgr::FindUniqueNodes")
1659 std::list<XrdProofWorker *>::iterator
w =
fWorkers.begin();
1661 for (;
w !=
fWorkers.end(); ++
w)
if ((*w)->fActive) {
1663 std::list<XrdProofWorker *>::iterator
n;
1665 if ((*n)->Matches(*
w)) {
1674 TRACE(REQ,
"found " <<
fNodes.size() <<
" unique nodes");
#define TRACE(Flag, Args)
static unsigned int total
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
#define NAME_REQUESTTIMEOUT
#define EnvPutInt(name, val)
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
#define XPD_SETRESP(p, x)
#define TRACEP(p, act, x)
#define XrdSysMutexHelper
void TakeUrl(XrdOucString url)
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
void Reset(const char *str)
Set content from a config file-like string.
bool Matches(const char *host)
Check compatibility of host with this instance.
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdProofdNetMgr * NetMgr() const
const char * Host() const
const char * EffectiveUser() const
XrdOucString Get(int i)
Return i-th combination (i : 0 -> fN-1)
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
std::list< XrdProofWorker * > fDfltWorkers
void BalanceNodesOrder()
Indices (this will be used twice).
std::list< XrdProofWorker * > fNodes
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdOucError *e)
Constructor.
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
void RegisterDirectives()
Register config directives.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
std::list< XrdProofWorker * > fWorkers
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
std::list< XrdProofWorker * > fRegWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual ~XrdProofdNetMgr()
Destructor.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientReadbufRequest readbuf