ROOT  6.06/09
Reference Guide
XrdProofdProofServ.h
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: G. Ganis June 2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_XrdProofdProofServ
13 #define ROOT_XrdProofdProofServ
14 
15 #include <string.h>
16 #include <unistd.h>
17 #include <sys/uio.h>
18 #if !defined(__FreeBSD__) && !defined(__OpenBSD__) && !defined(__APPLE__)
19 #include <sched.h>
20 #endif
21 
22 // #include <list>
23 // #include <map>
24 #include <vector>
25 
26 #include "XpdSysPthread.h"
27 #include "XpdSysSemWait.h"
28 
29 #include "Xrd/XrdLink.hh"
30 #include "XrdOuc/XrdOucHash.hh"
31 
32 #include "XProofProtocol.h"
33 #include "XrdProofdClient.h"
34 #include "XrdProofWorker.h"
35 
36 //////////////////////////////////////////////////////////////////////////
37 // //
38 // XrdSrvBuffer //
39 // //
40 // Authors: G. Ganis, CERN, 2005 //
41 // //
42 // The following structure is used to store buffers to be sent or //
43 // received from clients //
44 // //
45 //////////////////////////////////////////////////////////////////////////
46 class XrdSrvBuffer {
47 public:
48  int fSize;
49  char *fBuff;
50 
51  XrdSrvBuffer(char *bp=0, int sz=0, bool dup=0) {
52  fBuff = 0;
53  fSize = 0;
54  if (dup && bp && sz > 0) {
55  fMembuf = (char *)malloc(sz);
56  if (fMembuf) {
57  memcpy(fMembuf, bp, sz);
58  fBuff = fMembuf;
59  fSize = sz;
60  }
61  } else {
62  fBuff = fMembuf = bp;
63  fSize = sz;
64  }}
66 
67 private:
68  char *fMembuf;
69 };
70 
71 
72 //////////////////////////////////////////////////////////////////////////
73 // //
74 // XrdProofQuery //
75 // //
76 // Helper class describing a query. Used for scheduling. //
77 // //
78 //////////////////////////////////////////////////////////////////////////
80 {
81  XrdOucString fTag;
82  XrdOucString fDSName;
83  long fDSSize;
84 public:
85  XrdProofQuery(const char *t, const char *n = "", long s = 0) : fTag(t), fDSName(n), fDSSize(s) { }
86 
87  const char *GetTag() { return fTag.c_str(); }
88  const char *GetDSName() { return fDSName.c_str(); }
89  long GetDSSize() { return fDSSize; }
90 };
91 
92 
93 class XrdROOT;
94 
95 //////////////////////////////////////////////////////////////////////////
96 // //
97 // XrdProofdProofServ //
98 // //
99 // Authors: G. Ganis, CERN, 2005 //
100 // //
101 // This class represent an instance of TProofServ //
102 // //
103 //////////////////////////////////////////////////////////////////////////
104 #define kXPROOFSRVTAGMAX 64
105 #define kXPROOFSRVALIASMAX 256
106 
107 class XrdProofGroup;
108 class XrdSysSemWait;
109 
111 {
112 
113 public:
116 
117  void AddWorker(const char *o, XrdProofWorker *w);
118  inline const char *AdminPath() const { XrdSysMutexHelper mhp(fMutex); return fAdminPath.c_str(); }
119  inline const char *Alias() const { XrdSysMutexHelper mhp(fMutex); return fAlias.c_str(); }
120  void Broadcast(const char *msg, int type = kXPD_srvmsg);
121  int BroadcastPriority(int priority);
122  inline const char *Client() const { XrdSysMutexHelper mhp(fMutex); return fClient.c_str(); }
123  int CheckSession(bool oldvers, bool isrec,
124  int shutopt, int shutdel, bool changeown, int &nc);
125  XrdProofQuery *CurrentQuery() { XrdSysMutexHelper mhp(fMutex); return (fQueries.empty()? 0 : fQueries.front()); }
127  { XrdSysMutexHelper mhp(fMutex); if (fStartMsg) delete fStartMsg; fStartMsg = 0;}
128  int DisconnectTime();
129  void DumpQueries();
131  if (q) { fQueries.push_back(q); }; return fQueries.size(); }
132  void ExportBuf(XrdOucString &buf);
133  void ExportWorkers(XrdOucString &wrks);
134  inline const char *Fileout() const { XrdSysMutexHelper mhp(fMutex); return fFileout.c_str(); }
135  int FreeClientID(int pid);
136  XrdClientID *GetClientID(int cid);
137  int GetNClients(bool check);
138  XrdProofQuery *GetQuery(const char *tag);
139  inline const char *Group() const { XrdSysMutexHelper mhp(fMutex); return fGroup.c_str(); }
140  int IdleTime();
141  inline short int ID() const { XrdSysMutexHelper mhp(fMutex); return fID; }
142  inline bool IsPLite() const { XrdSysMutexHelper mhp(fMutex); return (fPLiteNWrks > 1); }
143  inline bool IsShutdown() const { XrdSysMutexHelper mhp(fMutex); return fIsShutdown; }
144  inline bool IsValid() const { XrdSysMutexHelper mhp(fMutex); return fIsValid; }
145  inline bool Match(short int id) const { XrdSysMutexHelper mhp(fMutex); return (id == fID); }
146  inline const char *Ordinal() const { XrdSysMutexHelper mhp(fMutex); return fOrdinal.c_str(); }
147  inline XrdClientID *Parent() const { XrdSysMutexHelper mhp(fMutex); return fParent; }
148  inline void PingSem() const { XrdSysMutexHelper mhp(fMutex); if (fPingSem) fPingSem->Post(); }
149  inline int PLiteNWrks() const { XrdSysMutexHelper mhp(fMutex); return fPLiteNWrks; }
150  inline XrdProofdProtocol *Protocol() const { XrdSysMutexHelper mhp(fMutex); return fProtocol; }
151  inline std::list<XrdProofQuery *> *Queries() const
152  { return (std::list<XrdProofQuery *> *)&fQueries; }
153  void RemoveQuery(const char *tag);
154  void RemoveWorker(const char *o);
155  void Reset();
156  int Reset(const char *msg, int type);
157  int Resume();
158 
159  inline XrdROOT *ROOT() const { XrdSysMutexHelper mhp(fMutex); return fROOT; }
160  inline XrdProofdResponse *Response() const { XrdSysMutexHelper mhp(fMutex); return fResponse; }
161  int SendData(int cid, void *buff, int len);
162  int SendDataN(void *buff, int len);
163  void SendClusterInfo(int nsess, int nacti);
164  int SetAdminPath(const char *a, bool assert, bool setown);
165  void SetAlias(const char *a) { XrdSysMutexHelper mhp(fMutex); fAlias = a; }
166  void SetClient(const char *c) { XrdSysMutexHelper mhp(fMutex); fClient = c; }
168 
169  void SetFileout(const char *f) { XrdSysMutexHelper mhp(fMutex); fFileout = f; }
170  inline void SetGroup(const char *g) { XrdSysMutexHelper mhp(fMutex); fGroup = g; }
171  void SetIdle();
172  inline void SetID(short int id) { XrdSysMutexHelper mhp(fMutex); fID = id;}
173  void SetOrdinal(const char *o) { XrdSysMutexHelper mhp(fMutex); fOrdinal = o; }
174  inline void SetParent(XrdClientID *cid) { XrdSysMutexHelper mhp(fMutex); fParent = cid; }
175  inline void SetPLiteNWrks(int n) { XrdSysMutexHelper mhp(fMutex); fPLiteNWrks = n; }
177  inline void SetProtVer(int pv) { XrdSysMutexHelper mhp(fMutex); fProtVer = pv; }
178  inline void SetROOT(XrdROOT *r) { XrdSysMutexHelper mhp(fMutex); fROOT = r; }
179  void SetRunning();
180  inline void SetShutdown() { XrdSysMutexHelper mhp(fMutex); fIsShutdown = true; }
181  inline void SetSkipCheck() { XrdSysMutexHelper mhp(fMutex); fSkipCheck = true; }
182  void SetSrvPID(int pid) { XrdSysMutexHelper mhp(fMutex); fSrvPID = pid; }
183  inline void SetSrvType(int id) { XrdSysMutexHelper mhp(fMutex); fSrvType = id; }
184  inline void SetStartMsg(XrdSrvBuffer *sm) { XrdSysMutexHelper mhp(fMutex); delete fStartMsg; fStartMsg = sm; }
185  inline void SetStatus(int st) { XrdSysMutexHelper mhp(fMutex); fStatus = st; }
186  void SetTag(const char *t) { XrdSysMutexHelper mhp(fMutex); fTag = t; }
187  void SetUNIXSockPath(const char *s) { XrdSysMutexHelper mhp(fMutex); fUNIXSockPath = s; };
188  void SetUserEnvs(const char *t) { XrdSysMutexHelper mhp(fMutex); fUserEnvs = t; }
189  inline void SetValid(bool valid = 1) { XrdSysMutexHelper mhp(fMutex); fIsValid = valid; }
190  bool SkipCheck();
191  inline int SrvPID() const { XrdSysMutexHelper mhp(fMutex); return fSrvPID; }
192  inline int SrvType() const { XrdSysMutexHelper mhp(fMutex); return fSrvType; }
193  inline XrdSrvBuffer *StartMsg() const { XrdSysMutexHelper mhp(fMutex); return fStartMsg; }
194  inline int Status() const { XrdSysMutexHelper mhp(fMutex); return fStatus;}
195  inline const char *Tag() const { XrdSysMutexHelper mhp(fMutex); return fTag.c_str(); }
196  int TerminateProofServ(bool changeown);
197  inline const char *UserEnvs() const { XrdSysMutexHelper mhp(fMutex); return fUserEnvs.c_str(); }
198  int VerifyProofServ(bool fw);
199  inline XrdOucHash<XrdProofWorker> *Workers() const
200  { XrdSysMutexHelper mhp(fMutex); return (XrdOucHash<XrdProofWorker> *)&fWorkers; }
201 
202  // UNIX socket related methods
203  int CreateUNIXSock(XrdSysError *edest);
204  void DeleteUNIXSock();
205  XrdNet *UNIXSock() const { return fUNIXSock; }
206  const char *UNIXSockPath() const { return fUNIXSockPath.c_str(); }
207 
208  private:
209 
211  XrdProofdProtocol *fProtocol; // Protocol instance attached to this session
212  XrdProofdResponse *fResponse; // Response instance attached to this session
213 
214  XrdClientID *fParent; // Parent creating this session
215  int fNClients; // Number of attached clients
216  std::vector<XrdClientID *> fClients; // Attached clients stream ids
217  XrdOucHash<XrdProofWorker> fWorkers; // Workers assigned to the session
218 
219  XrdSysSemWait *fPingSem; // To sychronize ping requests
220 
221  XrdSrvBuffer *fStartMsg; // Msg with start processing info
222 
223  time_t fDisconnectTime; // Time at which all clients disconnected
224  time_t fSetIdleTime; // Time at which the session went idle
225 
226  int fStatus;
227  int fSrvPID; // Srv process ID
228  int fSrvType;
229  short int fID;
230  char fProtVer;
231  XrdOucString fFileout;
232 
233  int fPLiteNWrks; // # of wrks when PLite master
234 
235  XrdNet *fUNIXSock; // UNIX server socket for internal connections
236  XrdOucString fUNIXSockPath; // UNIX server socket path
237 
238  bool fIsShutdown; // Whether asked to shutdown
239  bool fIsValid; // Validity flag
240  bool fSkipCheck; // Skip next validity check
241 
242  XrdOucString fAlias; // Session alias
243  XrdOucString fClient; // Client name
244  XrdOucString fTag; // Session unique tag
245  XrdOucString fOrdinal; // Session ordinal number
246  XrdOucString fUserEnvs; // List of envs received from the user
247  XrdOucString fAdminPath; // Admin file in the form "<active-sessions>/<usr>.<grp>.<pid>"
248 
249  XrdROOT *fROOT; // ROOT version run by this session
250 
251  XrdOucString fGroup; // Group, if any, to which the owner belongs
252 
253  void ClearWorkers();
254 
256  { XrdSysMutexHelper mhp(fMutex); fPingSem = new XrdSysSemWait(0);}
258  { XrdSysMutexHelper mhp(fMutex); if (fPingSem) delete fPingSem; fPingSem = 0;}
259  std::list<XrdProofQuery *> fQueries; // the enqueued queries of this session
260 };
261 #endif
XrdProofdResponse * Response() const
void ExportBuf(XrdOucString &buf)
Fill buf with relevant info about this session.
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
XrdOucString fUNIXSockPath
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
void SetParent(XrdClientID *cid)
int Resume()
Send a resume message to the this session.
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
XrdROOT * ROOT() const
~XrdProofdProofServ()
Destructor.
int Enqueue(XrdProofQuery *q)
void SetAlias(const char *a)
#define assert(cond)
Definition: unittest.h:542
short int ID() const
void AddWorker(const char *o, XrdProofWorker *w)
Add a worker assigned to this session with label 'o'.
void SetIdle()
Set status to idle and update the related time stamp.
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
bool Match(short int id) const
XrdOucString fDSName
XrdSysSemWait * fPingSem
XrdOucHash< XrdProofWorker > fWorkers
void Reset()
Reset this instance.
void Broadcast(const char *msg, int type=kXPD_srvmsg)
Broadcast message 'msg' at 'type' to the attached clients.
TArc * a
Definition: textangle.C:12
std::list< XrdProofQuery * > fQueries
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
XrdSysRecMutex * fMutex
XrdClientID * Parent() const
void SetOrdinal(const char *o)
XrdProofdResponse * fResponse
void ClearWorkers()
Decrease worker counters and clean-up the list.
XrdSrvBuffer(char *bp=0, int sz=0, bool dup=0)
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
const char * GetTag()
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
const char * UNIXSockPath() const
std::list< XrdProofQuery * > * Queries() const
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
Definition: memory.h:94
void SetProtocol(XrdProofdProtocol *p)
XrdProofQuery(const char *t, const char *n="", long s=0)
XrdOucString fTag
const char * Client() const
void DeleteUNIXSock()
Delete the current UNIX socket.
XFontStruct * id
Definition: TGX11.cxx:108
void SetUNIXSockPath(const char *s)
void ExportWorkers(XrdOucString &wrks)
Export the assigned workers in the format understood by proofserv.
#define XrdSysSemWait
Definition: XpdSysSemWait.h:8
void SetGroup(const char *g)
XrdProofdProtocol * Protocol() const
void SetROOT(XrdROOT *r)
void SetFileout(const char *f)
XrdSrvBuffer * StartMsg() const
const char * Ordinal() const
int GetNClients(bool check)
Get the number of connected clients.
ROOT::R::TRInterface & r
Definition: Object.C:4
int changeown(const std::string &path, uid_t u, gid_t g)
Change the ownership of 'path' to the entity described by {u,g}.
Definition: proofexecv.cxx:802
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
bool SkipCheck()
Return the value of fSkipCheck and reset it to false.
#define XrdSysError
Definition: XpdSysError.h:8
const char * AdminPath() const
int TerminateProofServ(bool changeown)
Terminate the associated process.
bool IsShutdown() const
void SetID(short int id)
void SetTag(const char *t)
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
const char * UserEnvs() const
void SetConnection(XrdProofdResponse *r)
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
XrdNet * UNIXSock() const
XrdSrvBuffer * fStartMsg
double f(double x)
int type
Definition: TGX11.cxx:120
int SendData(int cid, void *buff, int len)
Send data to client cid.
const char * Alias() const
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
void SetValid(bool valid=1)
void SetUserEnvs(const char *t)
XrdOucHash< XrdProofWorker > * Workers() const
XrdProofdProtocol * fProtocol
int DisconnectTime()
Return the time (in secs) all clients have been disconnected.
const char * Tag() const
int IdleTime()
Return the time (in secs) the session has been idle.
const char * Fileout() const
void SetRunning()
Set status to running and reset the related time stamp.
XrdProofdProofServ()
Constructor.
std::vector< XrdClientID * > fClients
float * q
Definition: THbookFile.cxx:87
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
const Int_t n
Definition: legend1.C:16
void SetStartMsg(XrdSrvBuffer *sm)
void SetClient(const char *c)
void SendClusterInfo(int nsess, int nacti)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
const char * Group() const
const char * GetDSName()
XrdProofQuery * CurrentQuery()