13 #include "XrdNet/XrdNet.hh" 71 std::vector<XrdClientID *>::iterator i;
94 XPDLOC(PMGR,
"DecreaseWorkerCounters")
114 XPDLOC(PMGR,
"DumpWorkerCounters")
143 if (!o || !w)
return;
147 fWorkers.Add(o, w, 0, Hash_keepdata);
155 XPDLOC(SMGR,
"ProofServ::RemoveWorker")
159 TRACE(DBG,
"removing: "<<o);
175 XPDLOC(SMGR,
"ProofServ::Reset")
182 FILE *fpid = fopen(fn.c_str(),
"r");
185 if (fgets(line,
sizeof(line), fpid)) {
186 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] = 0;
189 TRACE(XERR,
"problems reading from file "<<fn);
193 TRACE(DBG,
"file: "<<fn<<
", st:"<<st);
279 XPDLOC(SMGR,
"ProofServ::GetClientID")
284 TRACE(XERR,
"negative ID: protocol error!");
308 if (cid >= (
int)
fClients.capacity())
313 for (; ic <= cid; ic++)
333 XPDLOC(SMGR,
"ProofServ::FreeClientID")
335 TRACE(DBG,
"svrPID: "<<
fSrvPID<<
", pid: "<<pid<<
", session status: "<<
339 TRACE(XERR,
"undefined pid!");
347 std::vector<XrdClientID *>::iterator i;
349 if ((*i) && (*i)->P()) {
350 if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
367 if (
TRACING(REQ) && (rc == 0)) {
369 TRACE(REQ, spid<<
": slot for client pid: "<<pid<<
" has been reset");
387 std::vector<XrdClientID *>::iterator i;
389 if ((*i) && (*i)->P() && (*i)->P()->Link())
fNClients++;
408 return ((disct > 0) ? disct : -1);
422 return ((idlet > 0) ? idlet : -1);
454 XPDLOC(SMGR,
"ProofServ::Broadcast")
461 if (msg && (len = strlen(msg)) > 0) {
463 int ic = 0, ncz = 0, sid = -1;
465 for (ic = 0; ic < ncz; ic++) {
476 XPDFORM(m,
"response instance for sid: %d not found", sid);
485 XPDFORM(m,
"type: %d, message: '%s' notified to %d clients", type, msg, nc);
500 XPDLOC(SMGR,
"ProofServ::TerminateProofServ")
510 TRACE(XERR,
"ord: problems signalling process: "<<
fSrvPID);
531 XPDLOC(SMGR,
"ProofServ::VerifyProofServ")
539 int len =
sizeof(kXR_int32);
540 char *buf =
new char[len];
542 kXR_int32 ifw = (
forward) ? (kXR_int32)1 : (kXR_int32)0;
543 ifw =
static_cast<kXR_int32
>(htonl(ifw));
544 memcpy(buf, &ifw,
sizeof(kXR_int32));
549 msg =
"could not propagate ping to proofsrv";
570 XPDLOC(SMGR,
"ProofServ::BroadcastPriority")
575 int len =
sizeof(kXR_int32);
576 char *buf =
new char[len];
577 kXR_int32 itmp = priority;
578 itmp =
static_cast<kXR_int32
>(htonl(itmp));
579 memcpy(buf, &itmp,
sizeof(kXR_int32));
583 TRACE(XERR,
"problems telling proofserv");
588 TRACE(DBG,
"priority "<<priority<<
" sent over");
598 XPDLOC(SMGR,
"ProofServ::SendData")
600 TRACE(HDBG,
"length: "<<len<<
" bytes (cid: "<<cid<<
")");
608 if (cid < 0 || cid > (
int)(
fClients.size() - 1) || !(csid =
fClients.at(cid))) {
609 XPDFORM(msg,
"client ID not found (cid: %d, size: %d)", cid,
fClients.size());
612 if (!rs && !(csid->
R())) {
613 XPDFORM(msg,
"client not connected: csid: %p, cid: %d, fSid: %d",
614 csid, cid, csid->
Sid());
642 XPDLOC(SMGR,
"ProofServ::SendDataN")
644 TRACE(HDBG,
"length: "<<len<<
" bytes");
653 for (ic = 0; ic < (int)
fClients.size(); ic++) {
654 if ((csid =
fClients.at(ic)) && csid->
P()) {
656 if (!resp || resp->
Send(kXR_attn,
kXPD_msg, buff, len) != 0)
670 XPDLOC(SMGR,
"ProofServ::ExportBuf")
674 XrdOucString tag, alias;
681 XPDFORM(buf,
" | %d %s %s %d %d",
id, tag.c_str(), alias.c_str(), status, nc);
682 TRACE(HDBG,
"buf: "<< buf);
693 XPDLOC(SMGR,
"ProofServ::CreateUNIXSock")
712 TRACE(XERR,
"unable to open / create admin path "<<
fAdminPath <<
"; errno = "<<errno);
719 if (unlink(
fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
720 XPDPRT(
"WARNING: path exists: unable to delete it:" 742 TRACE(XERR,
"unable to open / create path for UNIX socket; tried path "<<
fUNIXSockPath);
765 XPDLOC(SMGR,
"ProofServ::SetAdminPath")
772 if (!assert)
return 0;
775 FILE *fpid = fopen(a,
"a");
779 TRACE(XERR,
"unable to open / create admin path "<<
fAdminPath <<
"; errno = "<<errno);
786 if ((fpid = fopen(fn.c_str(),
"a"))) {
790 TRACE(XERR,
"unable to open / create status path "<< fn <<
"; errno = "<<errno);
798 TRACE(XERR,
"unable to get info for user "<<
fClient<<
"; errno = "<<errno);
802 TRACE(XERR,
"unable to give ownership of the status file "<< fn <<
" to user; errno = "<<errno);
818 XPDLOC(SMGR,
"ProofServ::Resume")
828 msg =
"could not propagate resume to proofsrv";
846 XPDLOC(PMGR,
"ExportWorkerDescription")
848 XrdOucString *wrks = (XrdOucString *)s;
851 if (w->
fType ==
'M') {
852 if (wrks->length() > 0) wrks->insert(
'&',0);
853 wrks->insert(w->
Export(), 0);
856 if (wrks->length() > 0)
885 XPDLOC(PMGR,
"DumpQueries")
889 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
891 ", # of queries: "<<
fQueries.size());
892 std::list<XrdProofQuery *>::iterator ii;
896 TRACE(ALL,
" +++ #"<<i<<
" tag:"<< (*ii)->GetTag()<<
" dset: "<<
897 (*ii)->GetDSName()<<
" size:"<<(*ii)->GetDSSize());
899 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
908 if (!tag || strlen(tag) <= 0)
return q;
914 std::list<XrdProofQuery *>::iterator ii;
917 if (!strcmp(tag, q->
GetTag()))
break;
930 if (!tag || strlen(tag) <= 0)
return;
936 std::list<XrdProofQuery *>::iterator ii;
939 if (!strcmp(tag, q->
GetTag()))
break;
957 int *actw = (
int *)s;
975 XPDLOC(PMGR,
"SendClusterInfo")
983 int neffs = (actw*1000)/
fWorkers.Num();
984 TRACE(DBG,
"# sessions: "<<nsess<<
", # active: "<<nacti<<
", # effective: "<<neffs/1000.);
989 int len = 3*
sizeof(kXR_int32);
990 char *buf =
new char[len];
992 kXR_int32 itmp = nsess;
993 itmp =
static_cast<kXR_int32
>(htonl(itmp));
994 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
995 off +=
sizeof(kXR_int32);
997 itmp =
static_cast<kXR_int32
>(htonl(itmp));
998 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
999 off +=
sizeof(kXR_int32);
1001 itmp =
static_cast<kXR_int32
>(htonl(itmp));
1002 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
1006 TRACE(XERR,
"problems sending proofserv");
1017 int shutopt,
int shutdel,
bool changeown,
int &nc)
1019 XPDLOC(PMGR,
"SendClusterInfo")
1029 if (!skipcheck || oldvers) {
1032 std::vector<XrdClientID *>::iterator i;
1034 if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
1037 if (nc <= 0 && (!isrec || oldvers)) {
1038 int idlet = -1, disct = -1, now = time(0);
1041 if (idlet <= 0) idlet = -1;
1044 if (disct <= 0) disct = -1;
1046 (shutopt == 1 && (idlet >= shutdel)) ||
1047 (shutopt == 2 && (disct >= shutdel))) {
1063 if (emsg.length() > 0) {
1064 TRACE(XERR,emsg.c_str());
void ExportBuf(XrdOucString &buf)
Fill buf with relevant info about this session.
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
const char * Export(const char *ord=0)
Export current content in a form understood by parsing algorithms inside the PROOF session...
XrdOucString fUNIXSockPath
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
void SetParent(XrdClientID *cid)
int Resume()
Send a resume message to the this session.
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
~XrdProofdProofServ()
Destructor.
void RemoveProofServ(XrdProofdProofServ *xps)
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
Decrease active session counters on worker w.
void AddWorker(const char *o, XrdProofWorker *w)
Add a worker assigned to this session with label 'o'.
void SetIdle()
Set status to idle and update the related time stamp.
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
#define TRACE(Flag, Args)
#define XPD_CLNT_VERSION_OK(p, v)
XrdOucHash< XrdProofWorker > fWorkers
void Reset()
Reset this instance.
void Broadcast(const char *msg, int type=kXPD_srvmsg)
Broadcast message 'msg' at 'type' to the attached clients.
std::list< XrdProofQuery * > fQueries
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
XrdProofdResponse * fResponse
void ClearWorkers()
Decrease worker counters and clean-up the list.
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
XrdProofdProtocol * P() const
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
void SetProtocol(XrdProofdProtocol *p)
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
void DeleteUNIXSock()
Delete the current UNIX socket.
void ExportWorkers(XrdOucString &wrks)
Export the assigned workers in the format understood by proofserv.
static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
Decrease active session counters on worker w.
int GetNClients(bool check)
Get the number of connected clients.
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
int changeown(const std::string &path, uid_t u, gid_t g)
Change the ownership of 'path' to the entity described by {u,g}.
#define XrdSysMutexHelper
bool SkipCheck()
Return the value of fSkipCheck and reset it to false.
int TerminateProofServ(bool changeown)
Terminate the associated process.
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
void SetConnection(XrdProofdResponse *r)
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
int SendData(int cid, void *buff, int len)
Send data to client cid.
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
void forward(const LAYERDATA &prevLayerData, LAYERDATA &currLayerData)
apply the weights (and functions) in forward direction of the DNN
XrdProofdProtocol * fProtocol
int DisconnectTime()
Return the time (in secs) all clients have been disconnected.
static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
Decrease active session counters on worker w.
int IdleTime()
Return the time (in secs) the session has been idle.
unsigned short Sid() const
void SetRunning()
Set status to running and reset the related time stamp.
XrdProofdProofServ()
Constructor.
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
std::vector< XrdClientID * > fClients
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int Send(void)
Auxilliary Send method.
XrdProofdResponse * R() const
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
void SendClusterInfo(int nsess, int nacti)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
Decrease active session counters on worker w.