ROOT  6.06/09
Reference Guide
XrdProofdProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 #include <sys/stat.h>
12 
13 #include "XrdNet/XrdNet.hh"
14 
15 #include "XrdProofdAux.h"
16 #include "XrdProofdProofServ.h"
17 #include "XrdProofWorker.h"
18 #include "XrdProofSched.h"
19 #include "XrdProofdManager.h"
20 
21 // Tracing utils
22 #include "XrdProofdTrace.h"
23 
24 ////////////////////////////////////////////////////////////////////////////////
25 /// Constructor
26 
28 {
29  fMutex = new XrdSysRecMutex;
30  fResponse = 0;
31  fProtocol = 0;
32  fParent = 0;
33  fPingSem = 0;
34  fStartMsg = 0;
36  fSrvPID = -1;
38  fPLiteNWrks = -1;
39  fID = -1;
40  fIsShutdown = false;
41  fIsValid = true; // It is created for a valid server ...
42  fSkipCheck = false;
43  fProtVer = -1;
44  fNClients = 0;
45  fClients.reserve(10);
46  fDisconnectTime = -1;
47  fSetIdleTime = time(0);
48  fROOT = 0;
49  // Strings
50  fAdminPath = "";
51  fAlias = "";
52  fClient = "";
53  fFileout = "";
54  fGroup = "";
55  fOrdinal = "";
56  fTag = "";
57  fUserEnvs = "";
58  fUNIXSock = 0;
59  fUNIXSockPath = "";
60  fQueries.clear();
61 }
62 
63 ////////////////////////////////////////////////////////////////////////////////
64 /// Destructor
65 
67 {
70 
71  std::vector<XrdClientID *>::iterator i;
72  for (i = fClients.begin(); i != fClients.end(); i++)
73  if (*i)
74  delete (*i);
75  fClients.clear();
76 
77  // Cleanup worker info
78  ClearWorkers();
79 
80  // Cleanup queries info
81  fQueries.clear();
82 
83  // Remove the associated UNIX socket path
84  unlink(fUNIXSockPath.c_str());
85 
86  SafeDel(fMutex);
87 }
88 
89 ////////////////////////////////////////////////////////////////////////////////
90 /// Decrease active session counters on worker w
91 
92 static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
93 {
94  XPDLOC(PMGR, "DecreaseWorkerCounters")
95 
97 
98  if (w && xps) {
99  w->RemoveProofServ(xps);
100  TRACE(REQ, w->fHost.c_str() <<" done");
101  // Check next
102  return 0;
103  }
104 
105  // Not enough info: stop
106  return 1;
107 }
108 
109 ////////////////////////////////////////////////////////////////////////////////
110 /// Decrease active session counters on worker w
111 
112 static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
113 {
114  XPDLOC(PMGR, "DumpWorkerCounters")
115 
116  if (w) {
117  TRACE(ALL, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
118  // Check next
119  return 0;
120  }
121 
122  // Not enough info: stop
123  return 1;
124 }
125 
126 ////////////////////////////////////////////////////////////////////////////////
127 /// Decrease worker counters and clean-up the list
128 
130 {
132 
133  // Decrease workers' counters and remove this from workers
134  fWorkers.Apply(DecreaseWorkerCounters, this);
135  fWorkers.Purge();
136 }
137 
138 ////////////////////////////////////////////////////////////////////////////////
139 /// Add a worker assigned to this session with label 'o'
140 
142 {
143  if (!o || !w) return;
144 
146 
147  fWorkers.Add(o, w, 0, Hash_keepdata);
148 }
149 
150 ////////////////////////////////////////////////////////////////////////////////
151 /// Release worker assigned to this session with label 'o'
152 
154 {
155  XPDLOC(SMGR, "ProofServ::RemoveWorker")
156 
157  if (!o) return;
158 
159  TRACE(DBG,"removing: "<<o);
160 
162 
163  XrdProofWorker *w = fWorkers.Find(o);
164  if (w) w->RemoveProofServ(this);
165  fWorkers.Del(o);
166  if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
167 }
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Reset this instance, broadcasting a message to the clients.
171 /// return 1 if top master, 0 otherwise
172 
173 int XrdProofdProofServ::Reset(const char *msg, int type)
174 {
175  XPDLOC(SMGR, "ProofServ::Reset")
176 
177  int rc = 0;
178  // Read the status file
179  int st = -1;
180  XrdOucString fn;
181  XPDFORM(fn, "%s.status", fAdminPath.c_str());
182  FILE *fpid = fopen(fn.c_str(), "r");
183  if (fpid) {
184  char line[64];
185  if (fgets(line, sizeof(line), fpid)) {
186  if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = 0;
187  st = atoi(line);
188  } else {
189  TRACE(XERR,"problems reading from file "<<fn);
190  }
191  fclose(fpid);
192  }
193  TRACE(DBG,"file: "<<fn<<", st:"<<st);
195  // Broadcast msg
196  if (st == 4) {
197  Broadcast("idle-timeout", type);
198  } else {
199  Broadcast(msg, type);
200  }
201  // What kind of server is this?
202  if (fSrvType == kXPD_TopMaster) rc = 1;
203  // Reset instance
204  Reset();
205  // Done
206  return rc;
207 }
208 
209 ////////////////////////////////////////////////////////////////////////////////
210 /// Reset this instance
211 
213 {
215 
216  fResponse = 0;
217  fProtocol = 0;
218  fParent = 0;
220  SafeDel(fPingSem);
221  fSrvPID = -1;
222  fID = -1;
223  fIsShutdown = false;
224  fIsValid = false;
225  fSkipCheck = false;
226  fProtVer = -1;
227  fNClients = 0;
228  fClients.clear();
229  fDisconnectTime = -1;
230  fSetIdleTime = -1;
231  fROOT = 0;
232  // Cleanup worker info
233  ClearWorkers();
234  // ClearWorkers depends on the fSrvType and fStatus
236  fPLiteNWrks = -1;
237  fStatus = kXPD_idle;
238  // Cleanup queries info
239  fQueries.clear();
240  // Strings
241  fAdminPath = "";
242  fAlias = "";
243  fClient = "";
244  fFileout = "";
245  fGroup = "";
246  fOrdinal = "";
247  fTag = "";
248  fUserEnvs = "";
249  DeleteUNIXSock();
250 }
251 
252 ////////////////////////////////////////////////////////////////////////////////
253 /// Delete the current UNIX socket
254 
256 {
258  unlink(fUNIXSockPath.c_str());
259  fUNIXSockPath = "";
260 }
261 
262 ////////////////////////////////////////////////////////////////////////////////
263 /// Return the value of fSkipCheck and reset it to false
264 
266 {
268 
269  bool rc = fSkipCheck;
270  fSkipCheck = false;
271  return rc;
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Get instance corresponding to cid
276 
278 {
279  XPDLOC(SMGR, "ProofServ::GetClientID")
280 
281  XrdClientID *csid = 0;
282 
283  if (cid < 0) {
284  TRACE(XERR, "negative ID: protocol error!");
285  return csid;
286  }
287 
288  XrdOucString msg;
289  { XrdSysMutexHelper mhp(fMutex);
290 
291  // Count new attached client
292  fNClients++;
293 
294  // If in the allocate range reset the corresponding instance and
295  // return it
296  if (cid < (int)fClients.size()) {
297  csid = fClients.at(cid);
298  csid->Reset();
299 
300  // Notification message
301  if (TRACING(DBG)) {
302  XPDFORM(msg, "cid: %d, size: %d", cid, fClients.size());
303  }
304  }
305 
306  if (!csid) {
307  // If not, allocate a new one; we need to resize (double it)
308  if (cid >= (int)fClients.capacity())
309  fClients.reserve(2*fClients.capacity());
310 
311  // Allocate new elements (for fast access we need all of them)
312  int ic = (int)fClients.size();
313  for (; ic <= cid; ic++)
314  fClients.push_back((csid = new XrdClientID()));
315 
316  // Notification message
317  if (TRACING(DBG)) {
318  XPDFORM(msg, "cid: %d, new size: %d", cid, fClients.size());
319  }
320  }
321  }
322  TRACE(DBG, msg);
323 
324  // We are done
325  return csid;
326 }
327 
328 ////////////////////////////////////////////////////////////////////////////////
329 /// Free instance corresponding to protocol connecting process 'pid'
330 
332 {
333  XPDLOC(SMGR, "ProofServ::FreeClientID")
334 
335  TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
336  fStatus<<", # clients: "<< fNClients);
337  int rc = -1;
338  if (pid <= 0) {
339  TRACE(XERR, "undefined pid!");
340  return rc;
341  }
342  if (!IsValid()) return rc;
343 
344  { XrdSysMutexHelper mhp(fMutex);
345 
346  // Remove this from the list of clients
347  std::vector<XrdClientID *>::iterator i;
348  for (i = fClients.begin(); i != fClients.end(); ++i) {
349  if ((*i) && (*i)->P()) {
350  if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
351  if (fProtocol == (*i)->P()) {
352  SetProtocol(0);
353  SetConnection(0);
354  }
355  (*i)->Reset();
356  if (fParent == (*i)) SetParent(0);
357  fNClients--;
358  // Record time of last disconnection
359  if (fNClients <= 0)
360  fDisconnectTime = time(0);
361  rc = 0;
362  break;
363  }
364  }
365  }
366  }
367  if (TRACING(REQ) && (rc == 0)) {
368  int spid = SrvPID();
369  TRACE(REQ, spid<<": slot for client pid: "<<pid<<" has been reset");
370  }
371 
372  // Out of range
373  return rc;
374 }
375 
376 ////////////////////////////////////////////////////////////////////////////////
377 /// Get the number of connected clients. If check is true check that
378 /// they are still valid ones and free the slots for the invalid ones
379 
381 {
383 
384  if (check) {
385  fNClients = 0;
386  // Remove this from the list of clients
387  std::vector<XrdClientID *>::iterator i;
388  for (i = fClients.begin(); i != fClients.end(); ++i) {
389  if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
390  }
391  }
392 
393  // Done
394  return fNClients;
395 }
396 
397 ////////////////////////////////////////////////////////////////////////////////
398 /// Return the time (in secs) all clients have been disconnected.
399 /// Return -1 if the session is running
400 
402 {
404 
405  int disct = -1;
406  if (fDisconnectTime > 0)
407  disct = time(0) - fDisconnectTime;
408  return ((disct > 0) ? disct : -1);
409 }
410 
411 ////////////////////////////////////////////////////////////////////////////////
412 /// Return the time (in secs) the session has been idle.
413 /// Return -1 if the session is running
414 
416 {
418 
419  int idlet = -1;
420  if (fStatus == kXPD_idle)
421  idlet = time(0) - fSetIdleTime;
422  return ((idlet > 0) ? idlet : -1);
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Set status to idle and update the related time stamp
427 ///
428 
430 {
432 
433  fStatus = kXPD_idle;
434  fSetIdleTime = time(0);
435 }
436 
437 ////////////////////////////////////////////////////////////////////////////////
438 /// Set status to running and reset the related time stamp
439 ///
440 
442 {
444 
446  fSetIdleTime = -1;
447 }
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Broadcast message 'msg' at 'type' to the attached clients
451 
452 void XrdProofdProofServ::Broadcast(const char *msg, int type)
453 {
454  XPDLOC(SMGR, "ProofServ::Broadcast")
455 
456  // Backward-compatibility check
457  int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
458 
459  XrdOucString m;
460  int len = 0, nc = 0;
461  if (msg && (len = strlen(msg)) > 0) {
462  XrdProofdProtocol *p = 0;
463  int ic = 0, ncz = 0, sid = -1;
464  { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
465  for (ic = 0; ic < ncz; ic++) {
466  { XrdSysMutexHelper mhp(fMutex);
467  p = fClients.at(ic)->P();
468  sid = fClients.at(ic)->Sid(); }
469  // Send message
470  if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
471  XrdProofdResponse *response = p->Response(sid);
472  if (response) {
473  response->Send(kXR_attn, (XProofActionCode)type, (void *)msg, len);
474  nc++;
475  } else {
476  XPDFORM(m, "response instance for sid: %d not found", sid);
477  }
478  }
479  if (m.length() > 0)
480  TRACE(XERR, m);
481  m = "";
482  }
483  }
484  if (TRACING(DBG)) {
485  XPDFORM(m, "type: %d, message: '%s' notified to %d clients", type, msg, nc);
486  XPDPRT(m);
487  }
488 }
489 
490 ////////////////////////////////////////////////////////////////////////////////
491 /// Terminate the associated process.
492 /// A shutdown interrupt message is forwarded.
493 /// If add is TRUE (default) the pid is added to the list of processes
494 /// requested to terminate.
495 /// Return the pid of tyhe terminated process on success, -1 if not allowed
496 /// or other errors occured.
497 
499 {
500  XPDLOC(SMGR, "ProofServ::TerminateProofServ")
501 
502  int pid = fSrvPID;
503  TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
504 
505  // Send a terminate signal to the proofserv
506  if (pid > -1) {
507  XrdProofUI ui;
508  XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
509  if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
510  TRACE(XERR, "ord: problems signalling process: "<<fSrvPID);
511  }
513  fIsShutdown = true;
514  }
515 
516  // Failed
517  return -1;
518 }
519 
520 ////////////////////////////////////////////////////////////////////////////////
521 /// Check if the associated proofserv process is alive. This is done
522 /// asynchronously by asking the process to callback and proof its vitality.
523 /// We do not block here: the caller may setup a waiting structure if
524 /// required.
525 /// If forward is true, the process will forward the request to the following
526 /// tiers.
527 /// Return 0 if the request was send successfully, -1 in case of error.
528 
530 {
531  XPDLOC(SMGR, "ProofServ::VerifyProofServ")
532 
533  TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
534 
535  int rc = 0;
536  XrdOucString msg;
537 
538  // Prepare buffer
539  int len = sizeof(kXR_int32);
540  char *buf = new char[len];
541  // Option
542  kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
543  ifw = static_cast<kXR_int32>(htonl(ifw));
544  memcpy(buf, &ifw, sizeof(kXR_int32));
545 
546  { XrdSysMutexHelper mhp(fMutex);
547  // Propagate the ping request
548  if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
549  msg = "could not propagate ping to proofsrv";
550  rc = -1;
551  }
552  }
553  // Cleanup
554  delete[] buf;
555 
556  // Notify errors, if any
557  if (rc != 0)
558  TRACE(XERR, msg);
559 
560  // Done
561  return rc;
562 }
563 
564 ////////////////////////////////////////////////////////////////////////////////
565 /// Broadcast a new group priority value to the worker servers.
566 /// Called by masters.
567 
569 {
570  XPDLOC(SMGR, "ProofServ::BroadcastPriority")
571 
573 
574  // Prepare buffer
575  int len = sizeof(kXR_int32);
576  char *buf = new char[len];
577  kXR_int32 itmp = priority;
578  itmp = static_cast<kXR_int32>(htonl(itmp));
579  memcpy(buf, &itmp, sizeof(kXR_int32));
580  // Send over
581  if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
582  // Failure
583  TRACE(XERR,"problems telling proofserv");
584  SafeDelArray(buf);
585  return -1;
586  }
587  SafeDelArray(buf);
588  TRACE(DBG, "priority "<<priority<<" sent over");
589  // Done
590  return 0;
591 }
592 
593 ////////////////////////////////////////////////////////////////////////////////
594 /// Send data to client cid.
595 
596 int XrdProofdProofServ::SendData(int cid, void *buff, int len)
597 {
598  XPDLOC(SMGR, "ProofServ::SendData")
599 
600  TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
601 
602  int rs = 0;
603  XrdOucString msg;
604 
605  // Get corresponding instance
606  XrdClientID *csid = 0;
607  { XrdSysMutexHelper mhp(fMutex);
608  if (cid < 0 || cid > (int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
609  XPDFORM(msg, "client ID not found (cid: %d, size: %d)", cid, fClients.size());
610  rs = -1;
611  }
612  if (!rs && !(csid->R())) {
613  XPDFORM(msg, "client not connected: csid: %p, cid: %d, fSid: %d",
614  csid, cid, csid->Sid());
615  rs = -1;
616  }
617  }
618 
619  //
620  // The message is strictly for the client requiring it
621  if (!rs) {
622  rs = -1;
623  XrdProofdResponse *response = csid->R() ? csid->R() : 0;
624  if (response)
625  if (!response->Send(kXR_attn, kXPD_msg, buff, len))
626  rs = 0;
627  } else {
628  // Notify error
629  TRACE(XERR, msg);
630  }
631 
632  // Done
633  return rs;
634 }
635 
636 ////////////////////////////////////////////////////////////////////////////////
637 /// Send data over the open client links of this session.
638 /// Used when all the connected clients are eligible to receive the message.
639 
640 int XrdProofdProofServ::SendDataN(void *buff, int len)
641 {
642  XPDLOC(SMGR, "ProofServ::SendDataN")
643 
644  TRACE(HDBG, "length: "<<len<<" bytes");
645 
646  XrdOucString msg;
647 
649 
650  // Send to connected clients
651  XrdClientID *csid = 0;
652  int ic = 0;
653  for (ic = 0; ic < (int) fClients.size(); ic++) {
654  if ((csid = fClients.at(ic)) && csid->P()) {
655  XrdProofdResponse *resp = csid->R();
656  if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
657  return -1;
658  }
659  }
660 
661  // Done
662  return 0;
663 }
664 
665 ////////////////////////////////////////////////////////////////////////////////
666 /// Fill buf with relevant info about this session
667 
668 void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
669 {
670  XPDLOC(SMGR, "ProofServ::ExportBuf")
671 
672  buf = "";
673  int id, status, nc;
674  XrdOucString tag, alias;
675  { XrdSysMutexHelper mhp(fMutex);
676  id = fID;
677  status = fStatus;
678  nc = fNClients;
679  tag = fTag;
680  alias = fAlias; }
681  XPDFORM(buf, " | %d %s %s %d %d", id, tag.c_str(), alias.c_str(), status, nc);
682  TRACE(HDBG, "buf: "<< buf);
683 
684  // Done
685  return;
686 }
687 
688 ////////////////////////////////////////////////////////////////////////////////
689 /// Create UNIX socket for internal connections
690 
692 {
693  XPDLOC(SMGR, "ProofServ::CreateUNIXSock")
694 
695  TRACE(DBG, "enter");
696 
697  // Make sure we do not have already a socket
698  if (fUNIXSock) {
699  TRACE(DBG,"UNIX socket exists already! ("<<fUNIXSockPath<<")");
700  return 0;
701  }
702 
703  // Create socket
704  fUNIXSock = new XrdNet(edest);
705 
706  // Make sure the admin path exists
707  if (fAdminPath.length() > 0) {
708  FILE *fadm = fopen(fAdminPath.c_str(), "a");
709  if (fadm) {
710  fclose(fadm);
711  } else {
712  TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
713  return -1;
714  }
715  }
716 
717  // Check the path
718  bool ok = 0;
719  if (unlink(fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
720  XPDPRT("WARNING: path exists: unable to delete it:"
721  " try to use it anyway " <<fUNIXSockPath);
722  ok = 1;
723  }
724 
725  // Create the path
726  int fd = 0;
727  if (!ok) {
728  if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
729  TRACE(XERR, "unable to create path: " <<fUNIXSockPath);
730  return -1;
731  }
732  close(fd);
733  }
734  if (fd > -1) {
735  // Change ownership
736  if (fUNIXSock->Bind((char *)fUNIXSockPath.c_str())) {
737  TRACE(XERR, " problems binding to UNIX socket; path: " <<fUNIXSockPath);
738  return -1;
739  } else
740  TRACE(DBG, "path for UNIX for socket is " <<fUNIXSockPath);
741  } else {
742  TRACE(XERR, "unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
743  return -1;
744  }
745 
746  // Change ownership if running as super-user
747  if (!geteuid()) {
748  XrdProofUI ui;
749  XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
750  if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
751  TRACE(XERR, "unable to change ownership of the UNIX socket"<<fUNIXSockPath);
752  return -1;
753  }
754  }
755 
756  // We are done
757  return 0;
758 }
759 
760 ////////////////////////////////////////////////////////////////////////////////
761 /// Set the admin path and make sure the file exists
762 
763 int XrdProofdProofServ::SetAdminPath(const char *a, bool assert, bool setown)
764 {
765  XPDLOC(SMGR, "ProofServ::SetAdminPath")
766 
768 
769  fAdminPath = a;
770 
771  // If we are not asked to assert the file we are done
772  if (!assert) return 0;
773 
774  // Check if the session file exists
775  FILE *fpid = fopen(a, "a");
776  if (fpid) {
777  fclose(fpid);
778  } else {
779  TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
780  return -1;
781  }
782 
783  // Check if the status file exists
784  XrdOucString fn;
785  XPDFORM(fn, "%s.status", a);
786  if ((fpid = fopen(fn.c_str(), "a"))) {
787  fprintf(fpid, "%d", fStatus);
788  fclose(fpid);
789  } else {
790  TRACE(XERR, "unable to open / create status path "<< fn << "; errno = "<<errno);
791  return -1;
792  }
793 
794  if (setown) {
795  // Set the ownership of the status file to the user
796  XrdProofUI ui;
797  if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
798  TRACE(XERR, "unable to get info for user "<<fClient<<"; errno = "<<errno);
799  return -1;
800  }
801  if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
802  TRACE(XERR, "unable to give ownership of the status file "<< fn << " to user; errno = "<<errno);
803  return -1;
804  }
805  }
806 
807  // Done
808  return 0;
809 }
810 
811 ////////////////////////////////////////////////////////////////////////////////
812 /// Send a resume message to the this session. It is assumed that the session
813 /// has at least one async query to process and will immediately send
814 /// a getworkers request (the workers are already assigned).
815 
817 {
818  XPDLOC(SMGR, "ProofServ::Resume")
819 
820  TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
821 
822  int rc = 0;
823  XrdOucString msg;
824 
825  { XrdSysMutexHelper mhp(fMutex);
826  //
827  if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
828  msg = "could not propagate resume to proofsrv";
829  rc = -1;
830  }
831  }
832 
833  // Notify errors, if any
834  if (rc != 0)
835  TRACE(XERR, msg);
836 
837  // Done
838  return rc;
839 }
840 
841 ////////////////////////////////////////////////////////////////////////////////
842 /// Decrease active session counters on worker w
843 
844 static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
845 {
846  XPDLOC(PMGR, "ExportWorkerDescription")
847 
848  XrdOucString *wrks = (XrdOucString *)s;
849  if (w && wrks) {
850  // Master at the beginning
851  if (w->fType == 'M') {
852  if (wrks->length() > 0) wrks->insert('&',0);
853  wrks->insert(w->Export(), 0);
854  } else {
855  // Add separator if not the first
856  if (wrks->length() > 0)
857  (*wrks) += '&';
858  // Add export version of the info
859  (*wrks) += w->Export(k);
860  }
861  TRACE(HDBG, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
862  // Check next
863  return 0;
864  }
865 
866  // Not enough info: stop
867  return 1;
868 }
869 
870 ////////////////////////////////////////////////////////////////////////////////
871 /// Export the assigned workers in the format understood by proofserv
872 
873 void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
874 {
876  wrks = "";
877  fWorkers.Apply(ExportWorkerDescription, (void *)&wrks);
878 }
879 
880 ////////////////////////////////////////////////////////////////////////////////
881 /// Export the assigned workers in the format understood by proofserv
882 
884 {
885  XPDLOC(PMGR, "DumpQueries")
886 
888 
889  TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
890  TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
891  ", # of queries: "<< fQueries.size());
892  std::list<XrdProofQuery *>::iterator ii;
893  int i = 0;
894  for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
895  i++;
896  TRACE(ALL," +++ #"<<i<<" tag:"<< (*ii)->GetTag()<<" dset: "<<
897  (*ii)->GetDSName()<<" size:"<<(*ii)->GetDSSize());
898  }
899  TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
900 }
901 
902 ////////////////////////////////////////////////////////////////////////////////
903 /// Get query with tag form the list of queries
904 
906 {
907  XrdProofQuery *q = 0;
908  if (!tag || strlen(tag) <= 0) return q;
909 
911 
912  if (fQueries.size() <= 0) return q;
913 
914  std::list<XrdProofQuery *>::iterator ii;
915  for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
916  q = *ii;
917  if (!strcmp(tag, q->GetTag())) break;
918  q = 0;
919  }
920  // Done
921  return q;
922 }
923 
924 ////////////////////////////////////////////////////////////////////////////////
925 /// remove query with tag form the list of queries
926 
927 void XrdProofdProofServ::RemoveQuery(const char *tag)
928 {
929  XrdProofQuery *q = 0;
930  if (!tag || strlen(tag) <= 0) return;
931 
933 
934  if (fQueries.size() <= 0) return;
935 
936  std::list<XrdProofQuery *>::iterator ii;
937  for (ii = fQueries.begin(); ii != fQueries.end(); ii++) {
938  q = *ii;
939  if (!strcmp(tag, q->GetTag())) break;
940  q = 0;
941  }
942  // remove it
943  if (q) {
944  fQueries.remove(q);
945  delete q;
946  }
947 
948  // Done
949  return;
950 }
951 
952 ////////////////////////////////////////////////////////////////////////////////
953 /// Decrease active session counters on worker w
954 
955 static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
956 {
957  int *actw = (int *)s;
958  if (w && actw) {
959  *actw += w->GetNActiveSessions();
960  // Check next
961  return 0;
962  }
963 
964  // Not enough info: stop
965  return 1;
966 }
967 
968 ////////////////////////////////////////////////////////////////////////////////
969 /// Calculate the effective number of users on this session nodes
970 /// and communicate it to the master together with the total number
971 /// of sessions and the number of active sessions. for monitoring issues.
972 
973 void XrdProofdProofServ::SendClusterInfo(int nsess, int nacti)
974 {
975  XPDLOC(PMGR, "SendClusterInfo")
976 
977  // Only if we are active
978  if (fWorkers.Num() <= 0) return;
979 
980  int actw = 0;
981  fWorkers.Apply(CountEffectiveSessions, (void *)&actw);
982  // The number of effective sessions * 1000
983  int neffs = (actw*1000)/fWorkers.Num();
984  TRACE(DBG, "# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.);
985 
987 
988  // Prepare buffer
989  int len = 3*sizeof(kXR_int32);
990  char *buf = new char[len];
991  kXR_int32 off = 0;
992  kXR_int32 itmp = nsess;
993  itmp = static_cast<kXR_int32>(htonl(itmp));
994  memcpy(buf + off, &itmp, sizeof(kXR_int32));
995  off += sizeof(kXR_int32);
996  itmp = nacti;
997  itmp = static_cast<kXR_int32>(htonl(itmp));
998  memcpy(buf + off, &itmp, sizeof(kXR_int32));
999  off += sizeof(kXR_int32);
1000  itmp = neffs;
1001  itmp = static_cast<kXR_int32>(htonl(itmp));
1002  memcpy(buf + off, &itmp, sizeof(kXR_int32));
1003  // Send over
1004  if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
1005  // Failure
1006  TRACE(XERR,"problems sending proofserv");
1007  }
1008  SafeDelArray(buf);
1009 }
1010 
1011 ////////////////////////////////////////////////////////////////////////////////
1012 /// Calculate the effective number of users on this session nodes
1013 /// and communicate it to the master together with the total number
1014 /// of sessions and the number of active sessions. for monitoring issues.
1015 
1016 int XrdProofdProofServ::CheckSession(bool oldvers, bool isrec,
1017  int shutopt, int shutdel, bool changeown, int &nc)
1018 {
1019  XPDLOC(PMGR, "SendClusterInfo")
1020 
1021  XrdOucString emsg;
1022  bool rmsession = 0;
1023  nc = -1;
1024  { XrdSysMutexHelper mhp(fMutex);
1025 
1026  bool skipcheck = fSkipCheck;
1027  fSkipCheck = false;
1028 
1029  if (!skipcheck || oldvers) {
1030  nc = 0;
1031  // Remove this from the list of clients
1032  std::vector<XrdClientID *>::iterator i;
1033  for (i = fClients.begin(); i != fClients.end(); ++i) {
1034  if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
1035  }
1036  // Check if we need to shutdown it
1037  if (nc <= 0 && (!isrec || oldvers)) {
1038  int idlet = -1, disct = -1, now = time(0);
1039  if (fStatus == kXPD_idle)
1040  idlet = now - fSetIdleTime;
1041  if (idlet <= 0) idlet = -1;
1042  if (fDisconnectTime > 0)
1043  disct = now - fDisconnectTime;
1044  if (disct <= 0) disct = -1;
1045  if ((fSrvType != kXPD_TopMaster) ||
1046  (shutopt == 1 && (idlet >= shutdel)) ||
1047  (shutopt == 2 && (disct >= shutdel))) {
1048  // Send a terminate signal to the proofserv
1049  if (fSrvPID > -1) {
1050  XrdProofUI ui;
1051  XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
1052  if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
1053  XPDFORM(emsg, "ord: problems signalling process: %d", fSrvPID);
1054  }
1055  fIsShutdown = true;
1056  }
1057  rmsession = 1;
1058  }
1059  }
1060  }
1061  }
1062  // Notify error, if any
1063  if (emsg.length() > 0) {
1064  TRACE(XERR,emsg.c_str());
1065  }
1066  // Done
1067  return rmsession;
1068 }
void ExportBuf(XrdOucString &buf)
Fill buf with relevant info about this session.
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
const char * Export(const char *ord=0)
Export current content in a form understood by parsing algorithms inside the PROOF session...
XrdOucString fUNIXSockPath
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
void SetParent(XrdClientID *cid)
int Resume()
Send a resume message to the this session.
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
#define SafeDel(x)
Definition: XrdProofdAux.h:335
~XrdProofdProofServ()
Destructor.
void RemoveProofServ(XrdProofdProofServ *xps)
#define kXPD_TopMaster
#define TRACING(x)
TLine * line
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
XrdProofdProtocol * P() const
static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
Decrease active session counters on worker w.
#define assert(cond)
Definition: unittest.h:542
void AddWorker(const char *o, XrdProofWorker *w)
Add a worker assigned to this session with label 'o'.
void SetIdle()
Set status to idle and update the related time stamp.
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
#define TRACE(Flag, Args)
Definition: TGHtml.h:124
#define XPD_CLNT_VERSION_OK(p, v)
XrdSysSemWait * fPingSem
XrdOucHash< XrdProofWorker > fWorkers
void Reset()
Reset this instance.
void Broadcast(const char *msg, int type=kXPD_srvmsg)
Broadcast message 'msg' at 'type' to the attached clients.
TAlienJobStatus * status
Definition: TAlienJob.cxx:51
TArc * a
Definition: textangle.C:12
std::list< XrdProofQuery * > fQueries
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
XrdSysRecMutex * fMutex
XrdProofdResponse * fResponse
void ClearWorkers()
Decrease worker counters and clean-up the list.
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
const char * GetTag()
#define XPDPRT(x)
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
Double_t x[n]
Definition: legend1.C:17
void SetProtocol(XrdProofdProtocol *p)
#define SafeDelArray(x)
Definition: XrdProofdAux.h:338
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
void DeleteUNIXSock()
Delete the current UNIX socket.
XFontStruct * id
Definition: TGX11.cxx:108
void ExportWorkers(XrdOucString &wrks)
Export the assigned workers in the format understood by proofserv.
#define XPDLOC(d, x)
static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
Decrease active session counters on worker w.
int GetNClients(bool check)
Get the number of connected clients.
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
int changeown(const std::string &path, uid_t u, gid_t g)
Change the ownership of 'path' to the entity described by {u,g}.
Definition: proofexecv.cxx:802
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
bool SkipCheck()
Return the value of fSkipCheck and reset it to false.
#define XrdSysError
Definition: XpdSysError.h:8
TMarker * m
Definition: textangle.C:8
int TerminateProofServ(bool changeown)
Terminate the associated process.
unsigned short Sid() const
#define kXPD_AnyServer
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
void SetConnection(XrdProofdResponse *r)
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
XrdSrvBuffer * fStartMsg
#define XPDFORM
Definition: XrdProofdAux.h:381
XProofActionCode
int type
Definition: TGX11.cxx:120
int SendData(int cid, void *buff, int len)
Send data to client cid.
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
XrdProofdProtocol * fProtocol
int DisconnectTime()
Return the time (in secs) all clients have been disconnected.
static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
Decrease active session counters on worker w.
int IdleTime()
Return the time (in secs) the session has been idle.
void SetRunning()
Set status to running and reset the related time stamp.
XrdProofdProofServ()
Constructor.
XrdProofdResponse * R() const
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
int Active() const
std::vector< XrdClientID * > fClients
float * q
Definition: THbookFile.cxx:87
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int Send(void)
Auxilliary Send method.
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
void SendClusterInfo(int nsess, int nacti)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
Decrease active session counters on worker w.
XrdOucString fHost