13#include "XrdNet/XrdNet.hh"
29 fMutex =
new XrdSysRecMutex;
71 std::vector<XrdClientID *>::iterator i;
94 XPDLOC(PMGR,
"DecreaseWorkerCounters")
114 XPDLOC(PMGR,
"DumpWorkerCounters")
131 XrdSysMutexHelper mhp(
fMutex);
143 if (!o || !w)
return;
145 XrdSysMutexHelper mhp(
fMutex);
147 fWorkers.Add(o, w, 0, Hash_keepdata);
155 XPDLOC(SMGR,
"ProofServ::RemoveWorker")
159 TRACE(DBG,
"removing: "<<o);
161 XrdSysMutexHelper mhp(
fMutex);
175 XPDLOC(SMGR,
"ProofServ::Reset")
182 FILE *fpid = fopen(fn.c_str(),
"r");
185 if (fgets(
line,
sizeof(
line), fpid)) {
189 TRACE(XERR,
"problems reading from file "<<fn);
193 TRACE(DBG,
"file: "<<fn<<
", st:"<<st);
194 XrdSysMutexHelper mhp(
fMutex);
214 XrdSysMutexHelper mhp(
fMutex);
267 XrdSysMutexHelper mhp(
fMutex);
279 XPDLOC(SMGR,
"ProofServ::GetClientID")
284 TRACE(XERR,
"negative ID: protocol error!");
289 { XrdSysMutexHelper mhp(
fMutex);
308 if (cid >= (
int)
fClients.capacity())
313 for (; ic <= cid; ic++)
333 XPDLOC(SMGR,
"ProofServ::FreeClientID")
335 TRACE(DBG,
"svrPID: "<<
fSrvPID<<
", pid: "<<pid<<
", session status: "<<
339 TRACE(XERR,
"undefined pid!");
344 { XrdSysMutexHelper mhp(
fMutex);
347 std::vector<XrdClientID *>::iterator i;
349 if ((*i) && (*i)->P()) {
350 if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
367 if (
TRACING(REQ) && (rc == 0)) {
369 TRACE(REQ, spid<<
": slot for client pid: "<<pid<<
" has been reset");
382 XrdSysMutexHelper mhp(
fMutex);
387 std::vector<XrdClientID *>::iterator i;
389 if ((*i) && (*i)->P() && (*i)->P()->Link())
fNClients++;
403 XrdSysMutexHelper mhp(
fMutex);
408 return ((disct > 0) ? disct : -1);
417 XrdSysMutexHelper mhp(
fMutex);
422 return ((idlet > 0) ? idlet : -1);
431 XrdSysMutexHelper mhp(
fMutex);
443 XrdSysMutexHelper mhp(
fMutex);
454 XPDLOC(SMGR,
"ProofServ::Broadcast")
461 if (msg && (len = strlen(msg)) > 0) {
463 int ic = 0, ncz = 0, sid = -1;
465 for (ic = 0; ic < ncz; ic++) {
466 { XrdSysMutexHelper mhp(
fMutex);
476 XPDFORM(
m,
"response instance for sid: %d not found", sid);
485 XPDFORM(
m,
"type: %d, message: '%s' notified to %d clients",
type, msg, nc);
500 XPDLOC(SMGR,
"ProofServ::TerminateProofServ")
510 TRACE(XERR,
"ord: problems signalling process: "<<
fSrvPID);
512 XrdSysMutexHelper mhp(
fMutex);
531 XPDLOC(SMGR,
"ProofServ::VerifyProofServ")
539 int len =
sizeof(kXR_int32);
540 char *buf =
new char[len];
542 kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
543 ifw =
static_cast<kXR_int32
>(htonl(ifw));
544 memcpy(buf, &ifw,
sizeof(kXR_int32));
546 { XrdSysMutexHelper mhp(
fMutex);
549 msg =
"could not propagate ping to proofsrv";
570 XPDLOC(SMGR,
"ProofServ::BroadcastPriority")
572 XrdSysMutexHelper mhp(
fMutex);
575 int len =
sizeof(kXR_int32);
576 char *buf =
new char[len];
577 kXR_int32 itmp = priority;
578 itmp =
static_cast<kXR_int32
>(htonl(itmp));
579 memcpy(buf, &itmp,
sizeof(kXR_int32));
583 TRACE(XERR,
"problems telling proofserv");
588 TRACE(DBG,
"priority "<<priority<<
" sent over");
598 XPDLOC(SMGR,
"ProofServ::SendData")
600 TRACE(HDBG,
"length: "<<len<<
" bytes (cid: "<<cid<<
")");
607 { XrdSysMutexHelper mhp(
fMutex);
608 if (cid < 0 || cid > (
int)(
fClients.size() - 1) || !(csid =
fClients.at(cid))) {
609 XPDFORM(msg,
"client ID not found (cid: %d, size: %d)", cid,
fClients.size());
612 if (!rs && !(csid->
R())) {
613 XPDFORM(msg,
"client not connected: csid: %p, cid: %d, fSid: %d",
614 csid, cid, csid->
Sid());
642 XPDLOC(SMGR,
"ProofServ::SendDataN")
644 TRACE(HDBG,
"length: "<<len<<
" bytes");
648 XrdSysMutexHelper mhp(
fMutex);
654 if ((csid =
fClients.at(ic)) && csid->
P()) {
656 if (!resp || resp->
Send(kXR_attn,
kXPD_msg, buff, len) != 0)
670 XPDLOC(SMGR,
"ProofServ::ExportBuf")
674 XrdOucString tag, alias;
675 { XrdSysMutexHelper mhp(
fMutex);
681 XPDFORM(buf,
" | %d %s %s %d %d",
id, tag.c_str(), alias.c_str(), status, nc);
682 TRACE(HDBG,
"buf: "<< buf);
693 XPDLOC(SMGR,
"ProofServ::CreateUNIXSock")
712 TRACE(XERR,
"unable to open / create admin path "<<
fAdminPath <<
"; errno = "<<errno);
719 if (unlink(
fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
720 XPDPRT(
"WARNING: path exists: unable to delete it:"
728 if ((fd = open(
fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
742 TRACE(XERR,
"unable to open / create path for UNIX socket; tried path "<<
fUNIXSockPath);
765 XPDLOC(SMGR,
"ProofServ::SetAdminPath")
767 XrdSysMutexHelper mhp(
fMutex);
772 if (!assert)
return 0;
775 FILE *fpid = fopen(
a,
"a");
779 TRACE(XERR,
"unable to open / create admin path "<<
fAdminPath <<
"; errno = "<<errno);
786 if ((fpid = fopen(fn.c_str(),
"a"))) {
790 TRACE(XERR,
"unable to open / create status path "<< fn <<
"; errno = "<<errno);
798 TRACE(XERR,
"unable to get info for user "<<
fClient<<
"; errno = "<<errno);
802 TRACE(XERR,
"unable to give ownership of the status file "<< fn <<
" to user; errno = "<<errno);
818 XPDLOC(SMGR,
"ProofServ::Resume")
825 { XrdSysMutexHelper mhp(
fMutex);
828 msg =
"could not propagate resume to proofsrv";
846 XPDLOC(PMGR,
"ExportWorkerDescription")
848 XrdOucString *wrks = (XrdOucString *)s;
851 if (w->
fType ==
'M') {
852 if (wrks->length() > 0) wrks->insert(
'&',0);
853 wrks->insert(w->
Export(), 0);
856 if (wrks->length() > 0)
875 XrdSysMutexHelper mhp(
fMutex);
885 XPDLOC(PMGR,
"DumpQueries")
887 XrdSysMutexHelper mhp(
fMutex);
889 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
891 ", # of queries: "<<
fQueries.size());
892 std::list<XrdProofQuery *>::iterator ii;
896 TRACE(ALL,
" +++ #"<<i<<
" tag:"<< (*ii)->GetTag()<<
" dset: "<<
897 (*ii)->GetDSName()<<
" size:"<<(*ii)->GetDSSize());
899 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
908 if (!tag || strlen(tag) <= 0)
return q;
910 XrdSysMutexHelper mhp(
fMutex);
914 std::list<XrdProofQuery *>::iterator ii;
917 if (!strcmp(tag,
q->GetTag()))
break;
930 if (!tag || strlen(tag) <= 0)
return;
932 XrdSysMutexHelper mhp(
fMutex);
936 std::list<XrdProofQuery *>::iterator ii;
939 if (!strcmp(tag,
q->GetTag()))
break;
957 int *actw = (
int *)s;
975 XPDLOC(PMGR,
"SendClusterInfo")
983 int neffs = (actw*1000)/
fWorkers.Num();
984 TRACE(DBG,
"# sessions: "<<nsess<<
", # active: "<<nacti<<
", # effective: "<<neffs/1000.);
986 XrdSysMutexHelper mhp(
fMutex);
989 int len = 3*
sizeof(kXR_int32);
990 char *buf =
new char[len];
992 kXR_int32 itmp = nsess;
993 itmp =
static_cast<kXR_int32
>(htonl(itmp));
994 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
995 off +=
sizeof(kXR_int32);
997 itmp =
static_cast<kXR_int32
>(htonl(itmp));
998 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
999 off +=
sizeof(kXR_int32);
1001 itmp =
static_cast<kXR_int32
>(htonl(itmp));
1002 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
1006 TRACE(XERR,
"problems sending proofserv");
1017 int shutopt,
int shutdel,
bool changeown,
int &nc)
1019 XPDLOC(PMGR,
"SendClusterInfo")
1024 { XrdSysMutexHelper mhp(
fMutex);
1029 if (!skipcheck || oldvers) {
1032 std::vector<XrdClientID *>::iterator i;
1034 if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
1037 if (nc <= 0 && (!isrec || oldvers)) {
1038 int idlet = -1, disct = -1, now = time(0);
1041 if (idlet <= 0) idlet = -1;
1044 if (disct <= 0) disct = -1;
1046 (shutopt == 1 && (idlet >= shutdel)) ||
1047 (shutopt == 2 && (disct >= shutdel))) {
1063 if (emsg.length() > 0) {
1064 TRACE(XERR,emsg.c_str());
#define TRACE(Flag, Args)
static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
Decrease active session counters on worker w.
static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
Add the number of active sessions on worker w to the integer counter passed via s.
static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
Dump the session counters of worker w (labelled by key k) to the trace log.
static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
Add the exported description of worker w to the '&'-separated string passed via s; master entries are inserted at the front.
#define XPD_CLNT_VERSION_OK(p, v)
XrdProofdResponse * R() const
unsigned short Sid() const
XrdProofdProtocol * P() const
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
void RemoveProofServ(XrdProofdProofServ *xps)
const char * Export(const char *ord=0)
Export current content in a form understood by parsing algorithms inside the PROOF session,...
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
int Resume()
Send a resume message to this session.
XrdOucHash< XrdProofWorker > fWorkers
void SetIdle()
Set status to idle and update the related time stamp.
void ClearWorkers()
Decrease worker counters and clean-up the list.
void AddWorker(const char *o, XrdProofWorker *w)
Add a worker assigned to this session with label 'o'.
void RemoveQuery(const char *tag)
Remove the query with the given tag from the list of queries.
void DeleteUNIXSock()
Delete the current UNIX socket.
void SetConnection(XrdProofdResponse *r)
int DisconnectTime()
Return the time (in secs) all clients have been disconnected.
std::list< XrdProofQuery * > fQueries
void ExportBuf(XrdOucString &buf)
Fill buf with relevant info about this session.
void SetProtocol(XrdProofdProtocol *p)
XrdProofdProofServ()
Constructor.
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
int SendData(int cid, void *buff, int len)
Send data to client cid.
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int TerminateProofServ(bool changeown)
Terminate the associated process.
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
XrdProofQuery * GetQuery(const char *tag)
Get the query with the given tag from the list of queries.
XrdOucString fUNIXSockPath
void SendClusterInfo(int nsess, int nacti)
Calculate the effective number of users on this session's nodes and communicate it to the master together with the total number of sessions (nsess) and the number of active sessions (nacti).
XrdProofdProtocol * fProtocol
void ExportWorkers(XrdOucString &wrks)
Export the assigned workers in the format understood by proofserv.
int GetNClients(bool check)
Get the number of connected clients.
void DumpQueries()
Dump the list of queries registered for this session (tag, dataset name and dataset size) to the trace log.
void Reset()
Reset this instance.
~XrdProofdProofServ()
Destructor.
void Broadcast(const char *msg, int type=kXPD_srvmsg)
Broadcast message 'msg' of type 'type' to the attached clients.
std::vector< XrdClientID * > fClients
int IdleTime()
Return the time (in secs) the session has been idle.
XrdProofdResponse * fResponse
bool SkipCheck()
Return the value of fSkipCheck and reset it to false.
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
void SetRunning()
Set status to running and reset the related time stamp.
void SetParent(XrdClientID *cid)
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
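Check the status of this session: count the connected clients (returned in nc) and, when none are attached, compare the idle and disconnection times against shutdel according to shutopt to decide whether the session should be shut down.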
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
int Send(void)
Auxiliary Send method.
int changeown(const std::string &path, uid_t u, gid_t g)
Change the ownership of 'path' to the entity described by {u,g}.