Logo ROOT   6.18/05
Reference Guide
XrdProofdProofServ.h
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis June 2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_XrdProofdProofServ
13#define ROOT_XrdProofdProofServ
14
15#include <string.h>
16#include <unistd.h>
17#include <sys/uio.h>
18#if !defined(__FreeBSD__) && !defined(__OpenBSD__) && !defined(__APPLE__)
19#include <sched.h>
20#endif
21
22// #include <list>
23// #include <map>
24#include <vector>
25
26#include "XpdSysPthread.h"
27#include "XpdSysSemWait.h"
28
29#include "Xrd/XrdLink.hh"
30#include "XrdOuc/XrdOucHash.hh"
31
32#include "XProofProtocol.h"
33#include "XrdProofdClient.h"
34#include "XrdProofWorker.h"
35
36//////////////////////////////////////////////////////////////////////////
37// //
38// XrdSrvBuffer //
39// //
40// Authors: G. Ganis, CERN, 2005 //
41// //
42// The following structure is used to store buffers to be sent or //
43// received from clients //
44// //
45//////////////////////////////////////////////////////////////////////////
47public:
48 int fSize;
49 char *fBuff;
50
// Constructor: either duplicates or wraps a caller-supplied buffer.
//   bp  - source buffer (may be 0)
//   sz  - number of bytes in bp
//   dup - when true, and bp is non-null and sz > 0, the bytes are copied into
//         a freshly malloc'ed buffer; in every other case the bp pointer and
//         sz are stored as given (no copy).
// If the allocation for the copy fails, fBuff and fSize keep the 0 values
// assigned at the top of the constructor.
// NOTE(review): the non-copy branch also stores bp in fMembuf; whether the
// object then releases bp depends on the destructor, which is not present in
// this listing - confirm before passing memory that must not be freed.
51 XrdSrvBuffer(char *bp=0, int sz=0, bool dup=0) {
52 fBuff = 0;
53 fSize = 0;
54 if (dup && bp && sz > 0) {
// Copy requested and there is something to copy
55 fMembuf = (char *)malloc(sz);
56 if (fMembuf) {
57 memcpy(fMembuf, bp, sz);
58 fBuff = fMembuf;
59 fSize = sz;
60 }
61 } else {
// No copy: reference the caller's buffer directly
62 fBuff = fMembuf = bp;
63 fSize = sz;
64 }}
66
67private:
68 char *fMembuf;
69};
70
71
72//////////////////////////////////////////////////////////////////////////
73// //
74// XrdProofQuery //
75// //
76// Helper class describing a query. Used for scheduling. //
77// //
78//////////////////////////////////////////////////////////////////////////
80{
// Small value holder for one query: a tag, a name string and a size.
// (The class-head line preceding this body is not present in this listing.)
81 XrdOucString fTag; // Value passed as 't' to the constructor; returned by GetTag()
82 XrdOucString fDSName; // Value passed as 'n'; presumably the dataset name - TODO confirm
83 long fDSSize; // Value passed as 's'; presumably the dataset size - TODO confirm
84public:
// Constructor: stores the three arguments; 'n' defaults to "" and 's' to 0.
85 XrdProofQuery(const char *t, const char *n = "", long s = 0) : fTag(t), fDSName(n), fDSSize(s) { }
86
// The two string getters return the c_str() of the member strings, i.e. the
// returned pointers refer to storage held by this object.
87 const char *GetTag() { return fTag.c_str(); }
88 const char *GetDSName() { return fDSName.c_str(); }
89 long GetDSSize() { return fDSSize; }
90};
91
92
93class XrdROOT;
94
95//////////////////////////////////////////////////////////////////////////
96// //
97// XrdProofdProofServ //
98// //
99// Authors: G. Ganis, CERN, 2005 //
100// //
101// This class represents an instance of TProofServ //
102// //
103//////////////////////////////////////////////////////////////////////////
104#define kXPROOFSRVTAGMAX 64
105#define kXPROOFSRVALIASMAX 256
106
107class XrdProofGroup;
108class XrdSysSemWait;
109
111{
// Body of the session class. The class-head line (listing line 110) and the
// listing lines 114-115, 126, 130, 150, 160, 167, 176, 196, 210, 226, 228,
// 230 and 255-257 are not present in this extracted listing, so a few inline
// bodies below appear without their declaration line.
//
// Most inline accessors and setters create an XrdSysMutexHelper on fMutex
// before touching a member.
// NOTE(review): accessors that return c_str() hand out a pointer into the
// member string; if the helper releases fMutex when it goes out of scope, the
// pointer is no longer protected once the call has returned - confirm that
// callers copy the value promptly.
112
113public:
116
117 void AddWorker(const char *o, XrdProofWorker *w);
118 inline const char *AdminPath() const { XrdSysMutexHelper mhp(fMutex); return fAdminPath.c_str(); }
119 inline const char *Alias() const { XrdSysMutexHelper mhp(fMutex); return fAlias.c_str(); }
120 void Broadcast(const char *msg, int type = kXPD_srvmsg);
121 int BroadcastPriority(int priority);
122 inline const char *Client() const { XrdSysMutexHelper mhp(fMutex); return fClient.c_str(); }
123 int CheckSession(bool oldvers, bool isrec,
124 int shutopt, int shutdel, bool changeown, int &nc);
// Returns the first element of fQueries, or 0 when the list is empty.
125 XrdProofQuery *CurrentQuery() { XrdSysMutexHelper mhp(fMutex); return (fQueries.empty()? 0 : fQueries.front()); }
// (Declaration line missing from this listing.) Deletes fStartMsg when set
// and zeroes the pointer.
127 { XrdSysMutexHelper mhp(fMutex); if (fStartMsg) delete fStartMsg; fStartMsg = 0;}
128 int DisconnectTime();
129 void DumpQueries();
// (First line of this definition is missing from this listing.) Appends q to
// fQueries when q is non-null and returns the size of fQueries afterwards.
131 if (q) { fQueries.push_back(q); }; return fQueries.size(); }
132 void ExportBuf(XrdOucString &buf);
133 void ExportWorkers(XrdOucString &wrks);
134 inline const char *Fileout() const { XrdSysMutexHelper mhp(fMutex); return fFileout.c_str(); }
135 int FreeClientID(int pid);
136 XrdClientID *GetClientID(int cid);
137 int GetNClients(bool check);
138 XrdProofQuery *GetQuery(const char *tag);
139 inline const char *Group() const { XrdSysMutexHelper mhp(fMutex); return fGroup.c_str(); }
140 int IdleTime();
141 inline short int ID() const { XrdSysMutexHelper mhp(fMutex); return fID; }
// True when fPLiteNWrks is greater than 1.
142 inline bool IsPLite() const { XrdSysMutexHelper mhp(fMutex); return (fPLiteNWrks > 1); }
143 inline bool IsShutdown() const { XrdSysMutexHelper mhp(fMutex); return fIsShutdown; }
144 inline bool IsValid() const { XrdSysMutexHelper mhp(fMutex); return fIsValid; }
// True when 'id' equals the stored fID.
145 inline bool Match(short int id) const { XrdSysMutexHelper mhp(fMutex); return (id == fID); }
146 inline const char *Ordinal() const { XrdSysMutexHelper mhp(fMutex); return fOrdinal.c_str(); }
147 inline XrdClientID *Parent() const { XrdSysMutexHelper mhp(fMutex); return fParent; }
// Posts fPingSem when it is set; does nothing otherwise.
148 inline void PingSem() const { XrdSysMutexHelper mhp(fMutex); if (fPingSem) fPingSem->Post(); }
149 inline int PLiteNWrks() const { XrdSysMutexHelper mhp(fMutex); return fPLiteNWrks; }
// Returns the address of fQueries with constness cast away. Unlike the
// neighbouring accessors, no mutex helper is created here.
151 inline std::list<XrdProofQuery *> *Queries() const
152 { return (std::list<XrdProofQuery *> *)&fQueries; }
153 void RemoveQuery(const char *tag);
154 void RemoveWorker(const char *o);
155 void Reset();
156 int Reset(const char *msg, int type);
157 int Resume();
158
159 inline XrdROOT *ROOT() const { XrdSysMutexHelper mhp(fMutex); return fROOT; }
161 int SendData(int cid, void *buff, int len);
162 int SendDataN(void *buff, int len);
163 void SendClusterInfo(int nsess, int nacti);
164 int SetAdminPath(const char *a, bool assert, bool setown);
165 void SetAlias(const char *a) { XrdSysMutexHelper mhp(fMutex); fAlias = a; }
166 void SetClient(const char *c) { XrdSysMutexHelper mhp(fMutex); fClient = c; }
168
169 void SetFileout(const char *f) { XrdSysMutexHelper mhp(fMutex); fFileout = f; }
170 inline void SetGroup(const char *g) { XrdSysMutexHelper mhp(fMutex); fGroup = g; }
171 void SetIdle();
172 inline void SetID(short int id) { XrdSysMutexHelper mhp(fMutex); fID = id;}
173 void SetOrdinal(const char *o) { XrdSysMutexHelper mhp(fMutex); fOrdinal = o; }
174 inline void SetParent(XrdClientID *cid) { XrdSysMutexHelper mhp(fMutex); fParent = cid; }
175 inline void SetPLiteNWrks(int n) { XrdSysMutexHelper mhp(fMutex); fPLiteNWrks = n; }
177 inline void SetProtVer(int pv) { XrdSysMutexHelper mhp(fMutex); fProtVer = pv; }
178 inline void SetROOT(XrdROOT *r) { XrdSysMutexHelper mhp(fMutex); fROOT = r; }
179 void SetRunning();
180 inline void SetShutdown() { XrdSysMutexHelper mhp(fMutex); fIsShutdown = true; }
181 inline void SetSkipCheck() { XrdSysMutexHelper mhp(fMutex); fSkipCheck = true; }
182 void SetSrvPID(int pid) { XrdSysMutexHelper mhp(fMutex); fSrvPID = pid; }
183 inline void SetSrvType(int id) { XrdSysMutexHelper mhp(fMutex); fSrvType = id; }
// Deletes the message currently held in fStartMsg, then stores 'sm' as is.
184 inline void SetStartMsg(XrdSrvBuffer *sm) { XrdSysMutexHelper mhp(fMutex); delete fStartMsg; fStartMsg = sm; }
185 inline void SetStatus(int st) { XrdSysMutexHelper mhp(fMutex); fStatus = st; }
186 void SetTag(const char *t) { XrdSysMutexHelper mhp(fMutex); fTag = t; }
187 void SetUNIXSockPath(const char *s) { XrdSysMutexHelper mhp(fMutex); fUNIXSockPath = s; };
188 void SetUserEnvs(const char *t) { XrdSysMutexHelper mhp(fMutex); fUserEnvs = t; }
189 inline void SetValid(bool valid = 1) { XrdSysMutexHelper mhp(fMutex); fIsValid = valid; }
190 bool SkipCheck();
191 inline int SrvPID() const { XrdSysMutexHelper mhp(fMutex); return fSrvPID; }
192 inline int SrvType() const { XrdSysMutexHelper mhp(fMutex); return fSrvType; }
193 inline XrdSrvBuffer *StartMsg() const { XrdSysMutexHelper mhp(fMutex); return fStartMsg; }
194 inline int Status() const { XrdSysMutexHelper mhp(fMutex); return fStatus;}
195 inline const char *Tag() const { XrdSysMutexHelper mhp(fMutex); return fTag.c_str(); }
197 inline const char *UserEnvs() const { XrdSysMutexHelper mhp(fMutex); return fUserEnvs.c_str(); }
198 int VerifyProofServ(bool fw);
// Returns the address of fWorkers with constness cast away.
199 inline XrdOucHash<XrdProofWorker> *Workers() const
200 { XrdSysMutexHelper mhp(fMutex); return (XrdOucHash<XrdProofWorker> *)&fWorkers; }
201
202 // UNIX socket related methods
203 int CreateUNIXSock(XrdSysError *edest);
204 void DeleteUNIXSock();
// NOTE(review): the two accessors below read their members without creating
// a mutex helper, whereas SetUNIXSockPath() does create one - confirm that
// this asymmetry is intended.
205 XrdNet *UNIXSock() const { return fUNIXSock; }
206 const char *UNIXSockPath() const { return fUNIXSockPath.c_str(); }
207
208 private:
209
211 XrdProofdProtocol *fProtocol; // Protocol instance attached to this session
212 XrdProofdResponse *fResponse; // Response instance attached to this session
213
214 XrdClientID *fParent; // Parent creating this session
215 int fNClients; // Number of attached clients
216 std::vector<XrdClientID *> fClients; // Attached clients stream ids
217 XrdOucHash<XrdProofWorker> fWorkers; // Workers assigned to the session
218
219 XrdSysSemWait *fPingSem; // To synchronize ping requests
220
221 XrdSrvBuffer *fStartMsg; // Msg with start processing info
222
223 time_t fDisconnectTime; // Time at which all clients disconnected
224 time_t fSetIdleTime; // Time at which the session went idle
225
227 int fSrvPID; // Srv process ID
229 short int fID; // ID value read by ID() and Match(), written by SetID()
231 XrdOucString fFileout; // String read by Fileout(), written by SetFileout()
232
233 int fPLiteNWrks; // # of wrks when PLite master
234
235 XrdNet *fUNIXSock; // UNIX server socket for internal connections
236 XrdOucString fUNIXSockPath; // UNIX server socket path
237
238 bool fIsShutdown; // Whether asked to shutdown
239 bool fIsValid; // Validity flag
240 bool fSkipCheck; // Skip next validity check
241
242 XrdOucString fAlias; // Session alias
243 XrdOucString fClient; // Client name
244 XrdOucString fTag; // Session unique tag
245 XrdOucString fOrdinal; // Session ordinal number
246 XrdOucString fUserEnvs; // List of envs received from the user
247 XrdOucString fAdminPath; // Admin file in the form "<active-sessions>/<usr>.<grp>.<pid>"
248
249 XrdROOT *fROOT; // ROOT version run by this session
250
251 XrdOucString fGroup; // Group, if any, to which the owner belongs
252
253 void ClearWorkers();
254
// (Declaration line missing from this listing.) Deletes fPingSem when set and
// zeroes the pointer.
258 { XrdSysMutexHelper mhp(fMutex); if (fPingSem) delete fPingSem; fPingSem = 0;}
// NOTE(review): std::list is used here while `#include <list>` is commented
// out near the top of the file, so this relies on another header providing it.
259 std::list<XrdProofQuery *> fQueries; // the enqueued queries of this session
260};
261#endif
ROOT::R::TRInterface & r
Definition: Object.C:4
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
#define g(i)
Definition: RSha256.hxx:105
XFontStruct * id
Definition: TGX11.cxx:108
int type
Definition: TGX11.cxx:120
float * q
Definition: THbookFile.cxx:87
@ kXPD_srvmsg
#define XrdSysError
Definition: XpdSysError.h:8
#define XrdSysSemWait
Definition: XpdSysSemWait.h:8
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
#define free
Definition: civetweb.c:1539
#define malloc
Definition: civetweb.c:1536
XrdProofQuery(const char *t, const char *n="", long s=0)
XrdOucString fDSName
const char * GetTag()
const char * GetDSName()
XrdOucString fTag
XrdSrvBuffer * StartMsg() const
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
int Resume()
Send a resume message to this session.
XrdOucHash< XrdProofWorker > fWorkers
XrdSrvBuffer * fStartMsg
int Enqueue(XrdProofQuery *q)
const char * Client() const
void SetIdle()
Set status to idle and update the related time stamp.
void ClearWorkers()
Decrease worker counters and clean-up the list.
void AddWorker(const char *o, XrdProofWorker *w)
Add a worker assigned to this session with label 'o'.
void SetOrdinal(const char *o)
XrdSysRecMutex * fMutex
void RemoveQuery(const char *tag)
Remove the query with the given tag from the list of queries.
void SetGroup(const char *g)
void DeleteUNIXSock()
Delete the current UNIX socket.
void SetClient(const char *c)
void SetROOT(XrdROOT *r)
void SetValid(bool valid=1)
XrdProofdResponse * Response() const
void SetTag(const char *t)
const char * Tag() const
const char * AdminPath() const
void SetID(short int id)
void SetConnection(XrdProofdResponse *r)
int DisconnectTime()
Return the time (in secs) all clients have been disconnected.
std::list< XrdProofQuery * > fQueries
XrdOucHash< XrdProofWorker > * Workers() const
void ExportBuf(XrdOucString &buf)
Fill buf with relevant info about this session.
void SetProtocol(XrdProofdProtocol *p)
XrdProofdProofServ()
Constructor.
const char * Ordinal() const
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
const char * UserEnvs() const
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
const char * Alias() const
int SendData(int cid, void *buff, int len)
Send data to client cid.
XrdSysSemWait * fPingSem
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
std::list< XrdProofQuery * > * Queries() const
const char * UNIXSockPath() const
int TerminateProofServ(bool changeown)
Terminate the associated process.
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
void SetFileout(const char *f)
XrdProofQuery * GetQuery(const char *tag)
Get the query with the given tag from the list of queries.
void SetUserEnvs(const char *t)
XrdOucString fUNIXSockPath
const char * Group() const
void SendClusterInfo(int nsess, int nacti)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
XrdROOT * ROOT() const
void SetAlias(const char *a)
bool Match(short int id) const
XrdProofdProtocol * fProtocol
void ExportWorkers(XrdOucString &wrks)
Export the assigned workers in the format understood by proofserv.
int GetNClients(bool check)
Get the number of connected clients.
void DumpQueries()
Dump the queries enqueued in this session.
void Reset()
Reset this instance.
void SetUNIXSockPath(const char *s)
~XrdProofdProofServ()
Destructor.
void Broadcast(const char *msg, int type=kXPD_srvmsg)
Broadcast message 'msg' at 'type' to the attached clients.
std::vector< XrdClientID * > fClients
int IdleTime()
Return the time (in secs) the session has been idle.
XrdProofQuery * CurrentQuery()
XrdNet * UNIXSock() const
void SetStartMsg(XrdSrvBuffer *sm)
XrdProofdResponse * fResponse
bool SkipCheck()
Return the value of fSkipCheck and reset it to false.
short int ID() const
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
XrdClientID * Parent() const
void SetRunning()
Set status to running and reset the related time stamp.
void SetParent(XrdClientID *cid)
const char * Fileout() const
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
XrdProofdProtocol * Protocol() const
XrdSrvBuffer(char *bp=0, int sz=0, bool dup=0)
const Int_t n
Definition: legend1.C:16
static constexpr double s
int changeown(const std::string &path, uid_t u, gid_t g)
Change the ownership of 'path' to the entity described by {u,g}.
Definition: proofexecv.cxx:738
auto * a
Definition: textangle.C:12