29# include "XrdNet/XrdNetAddr.hh"
31#include "Xrd/XrdBuffer.hh"
36#include "XrdOuc/XrdOucStream.hh"
37#include "XrdSys/XrdSysPlatform.hh"
107 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
132 std::list<XrdProofWorker *>::iterator
w =
fWorkers.begin();
138 XrdOucString mm(
"master ", 128);
146 XPDERR(
"problems parsing file ");
151 msg = (
rcf) ?
"re-configuring" :
"configuring";
164 TRACE(
ALL,
"PROOF config file will " <<
169 XPDERR(
"unable to find valid information in PROOF config file " <<
216 if (
d->fName ==
"resource") {
218 }
else if (
d->fName ==
"adminreqto") {
220 }
else if (
d->fName ==
"worker") {
224 TRACE(
XERR,
"unknown directive: " <<
d->fName);
234 list<XrdProofWorker *>::const_iterator iter,
iter2;
235 list<XrdProofWorker *>::iterator
iter3;
250 for (iter =
fNodes.begin(); iter !=
fNodes.end(); ++iter) {
258 info[*iter].available = 0;
260 if ((*iter)->Matches(*
iter2)) {
261 info[*iter].available++;
264 info[*iter].added = 0;
266 if (
info[*iter].available > 1 &&
info[*iter].available < min)
267 min =
info[*iter].available;
274 for (iter =
fNodes.begin(); iter !=
fNodes.end(); ++iter) {
275 if (
info[*iter].available > 1) {
276 info[*iter].per_iteration = (
unsigned int)floor((
double)
info[*iter].available / (
double)min);
278 info[*iter].per_iteration = 1;
291 for (map<XrdProofWorker *, BalancerInfo>::iterator i =
info.
begin(); i !=
info.
end(); ++i) {
292 if (i->second.added < i->second.available) {
295 (i->second.available - i->second.added));
297 for (
unsigned int j = 0;
j <
to_add;
j++) {
300 i->second.added +=
to_add;
323 if ((*iter2)->Matches(*
iter3)) {
325 (*iter2)->MergeProofServs(*(*
iter3));
374 if (!
strcmp(
"static", val)) {
378 while ((val = cfg->GetWord()) && val[0]) {
380 if (s.beginswith(
"ucfg:")) {
382 }
else if (s.beginswith(
"reload:")) {
384 }
else if (s.beginswith(
"dfltfallback:")) {
385 fDfltFallback = (s.endswith(
"1") || s.endswith(
"yes")) ? 1 : 0;
386 }
else if (s.beginswith(
"wmx:")) {
387 }
else if (s.beginswith(
"selopt:")) {
427 XrdOucString
wrd(cfg->GetWord());
428 if (
wrd.length() > 0) {
431 char rest[2048] = {0};
432 cfg->GetRest((
char *)&rest[0], 2048);
435 if (
wrd ==
"master" ||
wrd ==
"node") {
438 if (
pw->fHost.beginswith(
"localhost") ||
448 int ir =
line.find(
"repeat=");
451 r.erase(
r.find(
' '));
459 if (
mline.IsValid()) {
460 TRACE(
DBG,
"found multi-line with: " <<
mline.N() <<
" tokens");
461 for (
int i = 0; i <
mline.N(); i++) {
493 std::list<XrdProofWorker *>::iterator
iw =
fNodes.begin();
496 if ((
w = *
iw) &&
w->fType !=
'M') {
498 bool us = (((
w->fHost.find(
"localhost") !=
STR_NPOS ||
500 (
w->fPort == -1 ||
w->fPort ==
fMgr->
Port())) ? 1 : 0;
505 XrdOucString
u = (
w->fUser.length() > 0) ?
w->fUser :
usr;
509 if (
w->fPort != -1) {
526 TRACE(
XERR,
"problems marshalling request");
530 TRACE(
XERR,
"problems sending ctrl-c request to server " <<
u);
537 TRACE(
DBG,
"broadcast request for ourselves: ignore");
557 unsigned int nok = 0;
561 std::list<XrdProofWorker *>::iterator
iw =
fNodes.begin();
565 if ((
w = *
iw) &&
w->fType !=
'M') {
567 bool us = (((
w->fHost.find(
"localhost") !=
STR_NPOS ||
569 (
w->fPort == -1 ||
w->fPort ==
fMgr->
Port())) ? 1 : 0;
574 XrdOucString
u = (
w->fUser.length() > 0) ?
w->fUser :
usr;
578 if (
w->fPort != -1) {
588 TRACE(
XERR,
"problems sending request to " <<
u);
595 TRACE(
DBG,
"broadcast request for ourselves: ignore");
614 XrdOucString buf =
" Manager connection from ";
662 notifymsg +=
"change-of-ROOT version request to ";
667 buf = (
msg) ? (
const void *)
msg : buf;
677 buf = (
msg) ? (
const void *)
msg : buf;
687 buf = (
msg) ? (
const void *)
msg : buf;
723 XrdOucString
cmsg =
"failure attempting connection to ";
743 if (host &&
strlen(host) > 0) {
745 if (
uu.Port <= 0)
uu.Port = 1093;
748 char *
fqn = XrdSysDNS::getHostName(
uu.Host.c_str());
751 aNA.Set(
uu.Host.c_str());
752 char *
fqn = (
char *)
aNA.Name();
782 kXR_int64 ofs =
ntohll(
p->Request()->readbuf.ofs);
783 int len =
ntohl(
p->Request()->readbuf.len);
789 int dlen =
p->Request()->header.dlen;
793 if (dlen > 0 &&
p->Argp()->buff) {
794 file =
new char[dlen+1];
795 memcpy(file,
p->Argp()->buff, dlen);
799 if (
ui.Host.length() > 0) {
803 memcpy(file,
ui.File.c_str(),
ui.File.length());
804 file[
ui.File.length()] = 0;
812 pattern =
new char[
len + 1];
816 pattern[i++] = file[
j++];
820 TRACEP(
p,
DBG,
"grep operation " <<
grep <<
", pattern:" << pattern);
823 emsg =
"file name not found";
830 ", pattern: " << pattern);
832 TRACEP(
p,
REQ,
"file: " << file <<
", ofs: " << ofs <<
", len: " <<
len);
850 if (
u.User.length() <= 0)
860 XPDFORM(
emsg,
"nothing found by 'grep' in %s, pattern: %s",
filen, pattern);
867 (
local) ?
"local file " :
"remote file ", file);
875 emsg =
"nothing found in ";
884 response->Send(buf,
lout);
906 if (file.length() <= 0 || file.find(
'*') ==
STR_NPOS)
return 0;
910 int isl = file.rfind(
'/');
912 fn.assign(file,
isl + 1, -1);
913 dn.assign(file, 0,
isl);
930 if (!
strncmp(ent->d_name,
".", 1) || !
strncmp(ent->d_name,
"..", 2))
934 if (
sent.matches(
fn.c_str()) > 0)
break;
940 if (
sent.length() > 0) {
961 TRACE(
REQ,
"file: " << path <<
", ofs: " << ofs <<
", len: " <<
len);
964 if (!path ||
strlen(path) <= 0) {
970 XrdOucString
spath(path);
972 TRACE(
XERR,
"path cannot be resolved! (" << path <<
")");
975 const char *file =
spath.c_str();
980 emsg =
"could not open ";
989 emsg =
"could not get size of file with stat: errno: ";
999 kXR_int64 start = ofs;
1003 kXR_int64 end =
fst +
len;
1013 emsg =
"could not allocate enough memory on the heap: errno: ";
1028 while ((
nr = read(fd, buf + pos, left)) < 0 &&
errno ==
EINTR)
1039 }
while (
nr > 0 && left > 0);
1060 const char *
pat,
int &
len,
int opt)
1065 TRACE(
REQ,
"file: " << path <<
", pat: " <<
pat <<
", len: " <<
len);
1068 if (!path ||
strlen(path) <= 0) {
1074 XrdOucString
spath(path);
1076 TRACE(
XERR,
"path cannot be resolved! (" << path <<
")");
1079 const char *file =
spath.c_str();
1083 if (stat(file, &
st) != 0) {
1084 emsg =
"could not get size of file with stat: errno: ";
1096 cmd =
new char[
lcmd];
1099 }
else if (opt == 2) {
1101 }
else if (opt == 3) {
1108 cmd =
new char[
lcmd];
1116 emsg =
"could not run '";
1129 int bufsiz = 0, left = 0, lines = 0;
1136 if (!buf || (
llen > left)) {
1144 emsg =
"could not allocate enough memory on the heap: errno: ";
1181 kXR_int64 ofs,
int &
len,
int grep)
1186 ", file: " << (file ? file :
"undef") <<
", ofs: " << ofs <<
1187 ", len: " <<
len <<
", grep: " <<
grep);
1190 if (!file ||
strlen(file) <= 0) {
1197 u.TakeUrl(XrdOucString(file));
1205 if (conn && conn->
IsValid()) {
1211 reqhdr.readbuf.ofs = ofs;
1215 const void *
btmp = (
const void *) file;
1222 if (
xrsp && buf && (
xrsp->DataLen() > 0)) {
1251 ", msg: " << (
msg ?
msg :
"undef") <<
", isess: " <<
isess);
1263 if (conn && conn->
IsValid()) {
1273 const void *
btmp = (
const void *)
msg;
1280 if (
xrsp && buf && (
xrsp->DataLen() > 0)) {
1282 buf = (
char *)
realloc((
void *)buf,
len + 1);
1310 char *buf = 0, *
pbuf = buf;
1313 std::list<XrdProofWorker *>::iterator
iw =
fNodes.begin();
1318 bool us = (((
w->fHost.find(
"localhost") !=
STR_NPOS ||
1320 (
w->fPort == -1 ||
w->fPort ==
fMgr->
Port())) ? 1 : 0;
1326 if (
w->fPort != -1) {
1342 TRACE(
DBG,
"request for ourselves: ignore");
1359 XPDLOC(
NMGR,
"NetMgr::CreateDefaultPROOFcfg")
1371 XrdOucString mm(
"master ", 128);
1378 mm =
"worker localhost port=";
1388 std::list<XrdProofWorker *>::iterator
w =
fDfltWorkers.begin();
1420 TRACE(
XERR,
"unable to read the configuration file");
1421 return (std::list<XrdProofWorker *> *)0;
1437 const char *
xpdloc =
"NetMgr::Dump";
1441 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1442 XPDPRT(
"+ Active workers status");
1446 std::list<XrdProofWorker *>::iterator
iw;
1448 XPDPRT(
"+ wrk: " << (*iw)->fHost <<
":" << (*iw)->fPort <<
" type:" << (*iw)->fType <<
1449 " active sessions:" << (*iw)->Active());
1452 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1473 TRACE(
XERR,
"unable to read the configuration file");
1474 return (std::list<XrdProofWorker *> *)0;
1478 TRACE(
DBG,
"returning list with " <<
fNodes.size() <<
" entries");
1514 TRACE(
DBG,
"time of last modification: " <<
st.st_mtime);
1528 TRACE(
XERR,
"continuing with existing list of workers.");
1542 XrdOucString mm(
"master ", 128);
1547 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
1561 while (
lin[
p++] ==
' ') {
1565 if (
lin[
p] ==
'\0' ||
lin[
p] ==
'\n')
1581 const char *
pfx[2] = {
"master",
"node" };
1585 if (
pw->fHost.beginswith(
"localhost") ||
1595 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
1600 if (!((*w)->fActive)) {
1601 if ((*w)->fHost ==
pw->fHost && (*w)->fPort ==
pw->fPort) {
1622 std::list<XrdProofWorker *>::iterator
w =
fRegWorkers.begin();
1624 if ((*w)->fActive) {
1639 return ((
nw == 0) ? -1 : 0);
1659 std::list<XrdProofWorker *>::iterator
w =
fWorkers.begin();
1661 for (;
w !=
fWorkers.end(); ++
w)
if ((*w)->fActive) {
1663 std::list<XrdProofWorker *>::iterator
n;
1665 if ((*n)->Matches(*
w)) {
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define TRACE(Flag, Args)
static unsigned int total
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
#define NAME_REQUESTTIMEOUT
#define EnvPutInt(name, val)
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
#define XPD_SETRESP(p, x)
#define TRACEP(p, act, x)
#define XrdSysMutexHelper
const_iterator begin() const
const_iterator end() const
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive. If none (valid) is found,...
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdProofdNetMgr * NetMgr() const
const char * Host() const
const char * EffectiveUser() const
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
std::list< XrdProofWorker * > fDfltWorkers
void BalanceNodesOrder()
Indices (this will be used twice).
std::list< XrdProofWorker * > fNodes
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdOucError *e)
Constructor.
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrl-c interrupt. Return 0 on success, -1 on error.
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups. Returns 0 in case of error.
void RegisterDirectives()
Register config directives.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
std::list< XrdProofWorker * > fWorkers
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
std::list< XrdProofWorker * > fRegWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual ~XrdProofdNetMgr()
Destructor.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-byte buffer,...