Logo ROOT   6.10/09
Reference Guide
XrdClientConn.hh
Go to the documentation of this file.
1 #ifndef XRD_CONN_H
2 #define XRD_CONN_H
3 /******************************************************************************/
4 /* */
5 /* X r d C l i e n t C o n n . h h */
6 /* */
7 /* Author: Fabrizio Furano (INFN Padova, 2004) */
8 /* Adapted from TXNetFile (root.cern.ch) originally done by */
9 /* Alvise Dorigo, Fabrizio Furano */
10 /* INFN Padova, 2003 */
11 /* */
12 /* This file is part of the XRootD software suite. */
13 /* */
14 /* XRootD is free software: you can redistribute it and/or modify it under */
15 /* the terms of the GNU Lesser General Public License as published by the */
16 /* Free Software Foundation, either version 3 of the License, or (at your */
17 /* option) any later version. */
18 /* */
19 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
20 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
21 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
22 /* License for more details. */
23 /* */
24 /* You should have received a copy of the GNU Lesser General Public License */
25 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
26 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
27 /* */
28 /* The copyright holder's institutional names and contributor's names may not */
29 /* be used to endorse or promote products derived from this software without */
30 /* specific prior written permission of the institution or contributor. */
31 /******************************************************************************/
32 
33 //////////////////////////////////////////////////////////////////////////
34 // //
35 // High level handler of connections to xrootd. //
36 // //
37 //////////////////////////////////////////////////////////////////////////
38 
40 
41 #include "time.h"
46 #include "XrdOuc/XrdOucHash.hh"
47 #include "XrdSys/XrdSysPthread.hh"
48 
49 #define ConnectionManager XrdClientConn::GetConnectionMgr() // Shorthand: expands to a call of the static accessor XrdClientConn::GetConnectionMgr()
50 
51 class XrdClientAbs;
52 class XrdSecProtocol;
53 
55 
56 public:
57 
64  };
69  };
70 
71  // To keep info about an open session
72  struct SessionIDInfo { // Plain value type holding the identifier of one open session
73  char id[16]; // 16 raw bytes; NOTE(review): presumably not NUL-terminated - confirm before treating it as a C string
74  };
75 
78  XErrorCode fOpenError;
79 
80  XrdOucString fRedirOpaque; // Opaque info returned by the server when
81  // redirecting. To be used in the next opens
82 
83  XrdClientConn();
84  virtual ~XrdClientConn();
85 
86  inline bool CacheWillFit(long long bytes) { // Asks the read cache whether 'bytes' bytes would fit
87  // Delegate to the cache when there is one; with no cache the answer is FALSE
88  if (fMainReadCache) return fMainReadCache->WillFit(bytes);
89  return FALSE;
90  }
91 
92  bool CheckHostDomain(XrdOucString hostToCheck);
93  short Connect(XrdClientUrlInfo Host2Conn,
94  XrdClientAbsUnsolMsgHandler *unsolhandler);
95  void Disconnect(bool ForcePhysicalDisc);
96  virtual bool GetAccessToSrv();
97  XReqErrorType GoBackToRedirector();
98 
99  XrdOucString GetClientHostDomain() { return fgClientHostDomain; } // Returns, by value, the static fgClientHostDomain string (the saved client domain name)
100 
101 
102  static XrdClientPhyConnection *GetPhyConn(int LogConnID);
103 
104 
105  // --------- Cache related stuff
106 
107  long GetDataFromCache(const void *buffer,
108  long long begin_offs,
109  long long end_offs,
110  bool PerfCalc,
111  XrdClientIntvList &missingblks,
112  long &outstandingblks );
113 
115  long long begin_offs,
116  long long end_offs);
117 
118  bool SubmitRawDataToCache(const void *buffer,
119  long long begin_offs,
120  long long end_offs);
121 
122  void SubmitPlaceholderToCache(long long begin_offs,
123  long long end_offs) { // Forwards the offset pair to the read cache's PutPlaceholder
124  if (!fMainReadCache) return; // no read cache: nothing to do
125  fMainReadCache->PutPlaceholder(begin_offs, end_offs);
126  }
127 
128 
129  void RemoveAllDataFromCache(bool keepwriteblocks=true) { // Forwards to the read cache's one-argument RemoveItems
130  if (!fMainReadCache) return; // no read cache: nothing to remove
131  fMainReadCache->RemoveItems(keepwriteblocks);
132  }
133 
134  void RemoveDataFromCache(long long begin_offs,
135  long long end_offs, bool remove_overlapped = false) { // Forwards the range and the overlap flag to the read cache's RemoveItems
136  if (!fMainReadCache) return; // no read cache: nothing to remove
137  fMainReadCache->RemoveItems(begin_offs, end_offs, remove_overlapped);
138  }
139 
141  if (fMainReadCache)
143  }
144 
145  void PrintCache() {
146  if (fMainReadCache)
148  }
149 
150 
152  // The actual cache size
153  int &size,
154 
155  // The number of bytes submitted since the beginning
156  long long &bytessubmitted,
157 
158  // The number of bytes found in the cache (estimate)
159  long long &byteshit,
160 
161  // The number of reads which did not find their data
162  // (estimate)
163  long long &misscount,
164 
165  // miss/totalreads ratio (estimate)
166  float &missrate,
167 
168  // number of read requests towards the cache
169  long long &readreqcnt,
170 
171  // ratio between bytes found / bytes submitted
172  float &bytesusefulness
173  ) {
174  if (!fMainReadCache) return false;
175 
176  fMainReadCache->GetInfo(size,
177  bytessubmitted,
178  byteshit,
179  misscount,
180  missrate,
181  readreqcnt,
182  bytesusefulness);
183  return true;
184  }
185 
186 
187  void SetCacheSize(int CacheSize) {
188  if (!fMainReadCache && CacheSize)
190 
191  if (fMainReadCache)
192  fMainReadCache->SetSize(CacheSize);
193  }
194 
195  void SetCacheRmPolicy(int RmPolicy) {
196  if (fMainReadCache)
198  }
199 
200  void UnPinCacheBlk(long long begin_offs, long long end_offs) { // Unpins the block begin_offs..end_offs in the read cache, then broadcasts on fWriteWaitAck
201  if (fMainReadCache) fMainReadCache->UnPinCacheBlk(begin_offs, end_offs); // the cache pointer may be unset: guard it like the other cache helpers do
202  // Also use this to signal the possibility to proceed for a hard checkpoint
203  fWriteWaitAck->Broadcast();
204  }
205 
206 
207  // -------------------
208 
209 
210  int GetLogConnID() const { return fLogConnID; } // Returns fLogConnID, the logical connection ID in use
211 
213 
214  kXR_unt16 GetStreamID() const { return fPrimaryStreamid; } // Returns fPrimaryStreamid, the stream id used for normal communication
215 
216  inline XrdClientUrlInfo *GetLBSUrl() { return fLBSUrl; } // Returns the stored pointer to the saved load balancer url (not a copy)
217  inline XrdClientUrlInfo *GetMetaUrl() { return fMetaUrl; } // Returns the stored pointer to the meta manager url (not a copy)
218  inline XrdClientUrlInfo GetCurrentUrl() { return fUrl; } // Returns a copy of the current url
219  inline XrdClientUrlInfo GetRedirUrl() { return fREQUrl; } // Returns a copy of the url used for explicitly requested redirections
220 
221  XErrorCode GetOpenError() const { return fOpenError; } // Returns the current value of fOpenError (see SetOpenError)
222  virtual XReqErrorType GoToAnotherServer(XrdClientUrlInfo &newdest);
223  virtual XReqErrorType GoToMetaManager();
224  bool IsConnected() const { return fConnected; } // Returns the fConnected flag as last stored (see SetConnected); performs no check of its own
225  bool IsPhyConnConnected();
226 
227  struct ServerResponseHeader
229 
230  struct ServerResponseBody_Error
232 
234  memset(&LastServerError, 0, sizeof(LastServerError));
235  LastServerError.errnum = kXR_noErrorYet;
236  }
237 
239 
240  virtual bool SendGenCommand(ClientRequest *req,
241  const void *reqMoreData,
242  void **answMoreDataAllocated,
243  void *answMoreData, bool HasToAlloc,
244  char *CmdName, int substreamid = 0);
245 
246  int GetOpenSockFD() const { return fOpenSockFD; } // Returns fOpenSockFD, the descriptor of the underlying socket
247 
248  void SetClientHostDomain(const char *src) { fgClientHostDomain = src; } // Assigns src to the static fgClientHostDomain, so the value is shared by all instances
249  void SetConnected(bool conn) { fConnected = conn; } // Stores the flag later reported by IsConnected()
250 
251  void SetOpenError(XErrorCode err) { fOpenError = err; } // Stores err in fOpenError, the value returned by GetOpenError()
252 
253  // Gets a parallel stream id to use to set the return path for a request
254  int GetParallelStreamToUse(int reqsperstream);
255  int GetParallelStreamCount(); // Returns the total number of connected streams
256 
257  void SetRedirHandler(XrdClientAbs *rh) { fRedirHandler = rh; } // Stores the pointer only (no copy); fRedirHandler is the object handling redirections at a higher level
258 
259  void SetRequestedDestHost(char *newh, kXR_int32 port) {
260  fREQUrl = fUrl;
261  fREQUrl.Host = newh;
262  fREQUrl.Port = port;
264  }
265 
266  // Puts this instance in pause state for wsec seconds.
267  // A value <= 0 immediately revokes the pause state
268  void SetREQPauseState(kXR_int32 wsec) {
269  // Take the lock of the fREQWait condition variable
270  fREQWait->Lock();
271 
272  if (wsec <= 0) {
273  // Pause revoked: clear the deadline and broadcast on the condition variable
274  fREQWaitTimeLimit = 0;
275  fREQWait->Broadcast();
276  } else {
277  // Pause requested: the deadline is wsec seconds from now
278  fREQWaitTimeLimit = time(0) + wsec;
279  }
280 
281  // Release the lock
282  fREQWait->UnLock();
283  }
282 
283  // Puts this instance in connect-pause state for wsec seconds.
284  // Any future connection attempt will not happen before wsec
285  // and the first one will be towards the given host
286  void SetREQDelayedConnectState(kXR_int32 wsec) { // wsec > 0 sets the connect deadline to now + wsec; wsec <= 0 clears it and broadcasts
287  // Lock mutex
288  fREQConnectWait->Lock();
289 
290  if (wsec > 0)
291  fREQConnectWaitTimeLimit = time(0) + wsec;
292  else {
293  fREQConnectWaitTimeLimit = 0; // clear the deadline, as SetREQPauseState does for its own limit
294  fREQConnectWait->Broadcast();
295  }
296 
297  // UnLock mutex
298  fREQConnectWait->UnLock();
299  }
300 
301  void SetSID(kXR_char *sid);
302  inline void SetUrl(XrdClientUrlInfo thisUrl) { fUrl = thisUrl; } // Replaces the current url (fUrl) with a copy of thisUrl
303 
304  // Sends the request to the server, through logconn with ID LogConnID
305  // The request is sent with a streamid 'child' of the current one, then marked as pending
306  // Its answer will be caught asynchronously
307  XReqErrorType WriteToServer_Async(ClientRequest *req,
308  const void* reqMoreData,
309  int substreamid = 0);
310 
312  { return fgConnectionMgr;} //Instance of the conn manager
313 
314  static void DelSessionIDRepo() { // Purges the shared session ID repository while holding its mutex
315  fSessionIDRMutex.Lock(); fSessionIDRepo.Purge();
316  fSessionIDRMutex.UnLock();
317  }
318 
319  void GetSessionID(SessionIDInfo &sess) {sess = mySessionID;} // Copies the login session ID (mySessionID) into the caller-supplied sess
320 
321  long GetServerProtocol() { return fServerProto; } // Returns fServerProto, the stored server protocol value
322 
323  short GetMaxRedirCnt() const { return fMaxGlobalRedirCnt; } // Returns the stored fMaxGlobalRedirCnt value
324  void SetMaxRedirCnt(short mx) {fMaxGlobalRedirCnt = mx; } // Stores mx in fMaxGlobalRedirCnt; no range check is done here
325  short GetRedirCnt() const { return fGlobalRedirCnt; } // Returns fGlobalRedirCnt, the number of redirections
326 
327  bool DoWriteSoftCheckPoint();
328  bool DoWriteHardCheckPoint();
329  void UnPinCacheBlk();
330 
331 
332  // To give a max number of seconds for an operation to complete, no matter what happens inside
333  // e.g. redirections, sleeps, failed connection attempts etc.
334  void SetOpTimeLimit(int delta_secs);
335  bool IsOpTimeLimitElapsed(time_t timenow);
336 
337 
338 protected:
339  void SetLogConnID(int cid) { fLogConnID = cid; } // Stores the logical connection ID later returned by GetLogConnID()
340  void SetStreamID(kXR_unt16 sid) { fPrimaryStreamid = sid; } // Stores the primary stream id later returned by GetStreamID()
341 
342 
343 
344  // The handler which first tried to connect somewhere
346 
347  XrdClientUrlInfo fUrl; // The current URL
348  XrdClientUrlInfo *fLBSUrl; // Needed to save the load balancer url
349  XrdClientUrlInfo fREQUrl; // For explicitly requested redirs
350 
351  short fGlobalRedirCnt; // Number of redirections
352 
353 private:
354 
355  static XrdOucString fgClientHostDomain; // Save the client's domain name
357  bool fGettingAccessToSrv; // To avoid recursion in desperate situations
358  time_t fGlobalRedirLastUpdateTimestamp; // Timestamp of last redirection
359 
360  int fLogConnID; // Logical connection ID used
361  kXR_unt16 fPrimaryStreamid; // Streamid used for normal communication
362  // NB it's a copy of the one contained in
363  // the logconn
364 
367 
368  // The time limit for a transaction
369  time_t fOpTimeLimit;
370 
371  XrdClientAbs *fRedirHandler; // Pointer to a class inheriting from
372  // XrdClientAbs providing methods
373  // to handle a redir at higher level
374 
375  XrdOucString fRedirInternalToken; // Token returned by the server when
376  // redirecting. To be used in the next logins
377 
378  XrdSysCondVar *fREQWaitResp; // For explicitly requested delayed async responses
379  ServerResponseBody_Attn_asynresp *
380  fREQWaitRespData; // For explicitly requested delayed async responses
381 
382  time_t fREQWaitTimeLimit; // For explicitly requested pause state
383  XrdSysCondVar *fREQWait; // For explicitly requested pause state
384  time_t fREQConnectWaitTimeLimit; // For explicitly requested delayed reconnect
385  XrdSysCondVar *fREQConnectWait; // For explicitly requested delayed reconnect
386 
387  long fServerProto; // The server protocol
388  ERemoteServerType fServerType; // Server type as returned by doHandShake()
389  SessionIDInfo mySessionID; // Login session ID
390 
391 
392  static XrdSysMutex fSessionIDRMutex; // Mutex for the Repo
393  static XrdOucHash<SessionIDInfo>
394  fSessionIDRepo; // The repository of session IDs, shared.
395  // Association between
396  // <hostname>:<port>.<user> and a SessionIDInfo struct
397 
398  int fOpenSockFD; // Descriptor of the underlying socket
399  static XrdClientConnectionMgr *fgConnectionMgr; //Instance of the Connection Manager
400 
401  XrdSysCondVar *fWriteWaitAck;
402  XrdClientVector<ClientRequest> fWriteReqsToRetry; // To store the write reqs to retry in case of a disconnection
403 
404  bool CheckErrorStatus(XrdClientMessage *, short &, char *);
405  void CheckPort(int &port);
406  void CheckREQPauseState();
408  bool CheckResp(struct ServerResponseHeader *resp, const char *method);
409  XrdClientMessage *ClientServerCmd(ClientRequest *req,
410  const void *reqMoreData,
411  void **answMoreDataAllocated,
412  void *answMoreData,
413  bool HasToAlloc,
414  int substreamid = 0);
415  XrdSecProtocol *DoAuthentication(char *plist, int plsiz);
416 
418 
419  bool DoLogin();
420  bool DomainMatcher(XrdOucString dom, XrdOucString domlist);
421 
422  XrdOucString GetDomainToMatch(XrdOucString hostname);
423 
425  ClientRequest *);
426  bool MatchStreamid(struct ServerResponseHeader *ServerResponse);
427 
428  // Sends a close request, without waiting for an answer
429  // useful (?) to be sent just before closing a badly working stream
430  bool PanicClose();
431 
432  XrdOucString ParseDomainFromHostname(XrdOucString hostname);
433 
434  XrdClientMessage *ReadPartialAnswer(XReqErrorType &, size_t &,
435  ClientRequest *, bool, void**,
437 
438 // void ClearSessionID();
439 
440  XReqErrorType WriteToServer(ClientRequest *req,
441  const void* reqMoreData,
442  short LogConnID,
443  int substreamid = 0);
444 
445  bool WaitResp(int secsmax);
446 
447  XrdClientUrlInfo *fMetaUrl; // Meta manager url
448  bool fLBSIsMeta; // Is current redirector a meta manager?
449 
450 public:
451  XrdOucString fRedirCGI; // Same as fRedirOpaque but persistent
452 
453 };
454 #endif
XrdSecProtocol * DoAuthentication(char *plist, int plsiz)
XrdOucString fRedirInternalToken
virtual bool SendGenCommand(ClientRequest *req, const void *reqMoreData, void **answMoreDataAllocated, void *answMoreData, bool HasToAlloc, char *CmdName, int substreamid=0)
void Disconnect(bool ForcePhysicalDisc)
XrdClientUrlInfo * fMetaUrl
XrdOucString ParseDomainFromHostname(XrdOucString hostname)
XrdClientUrlInfo fREQUrl
int GetLogConnID() const
XrdOucString fRedirOpaque
void PutPlaceholder(long long begin_offs, long long end_offs)
static XrdClientPhyConnection * GetPhyConn(int LogConnID)
static XrdSysMutex fSessionIDRMutex
bool fGettingAccessToSrv
void UnPinCacheBlk(long long begin_offs, long long end_offs)
bool CheckErrorStatus(XrdClientMessage *, short &, char *)
bool WillFit(long long bc)
XrdSysCondVar * fREQWaitResp
bool DomainMatcher(XrdOucString dom, XrdOucString domlist)
void CheckREQPauseState()
XrdSysCondVar * fREQWait
bool CheckHostDomain(XrdOucString hostToCheck)
kXR_unt16 fPrimaryStreamid
void RemoveItems(bool leavepinned=true)
bool IsConnected() const
XrdClientMessage * ReadPartialAnswer(XReqErrorType &, size_t &, ClientRequest *, bool, void **, EThreeStateReadHandler &)
XrdClientUrlInfo * fLBSUrl
void CheckREQConnectWaitState()
bool GetCacheInfo(int &size, long long &bytessubmitted, long long &byteshit, long long &misscount, float &missrate, long long &readreqcnt, float &bytesusefulness)
XrdOucString fRedirCGI
void SetLogConnID(int cid)
virtual bool GetAccessToSrv()
void SetStreamID(kXR_unt16 sid)
bool PanicClose()
void SetSID(kXR_char *sid)
SessionIDInfo mySessionID
ERemoteServerType GetServerType() const
void SubmitPlaceholderToCache(long long begin_offs, long long end_offs)
static XrdClientConnectionMgr * GetConnectionMgr()
XrdClientUrlInfo fUrl
void SetBlkRemovalPolicy(int p)
long GetServerProtocol()
static void DelSessionIDRepo()
XReqErrorType GoBackToRedirector()
short Connect(XrdClientUrlInfo Host2Conn, XrdClientAbsUnsolMsgHandler *unsolhandler)
bool DoWriteHardCheckPoint()
ServerResponseBody_Attn_asynresp * fREQWaitRespData
void SetConnected(bool conn)
XrdClientVector< ClientRequest > fWriteReqsToRetry
ESrvErrorHandlerRetval HandleServerError(XReqErrorType &, XrdClientMessage *, ClientRequest *)
time_t fREQWaitTimeLimit
void SetUrl(XrdClientUrlInfo thisUrl)
XrdClientUrlInfo * GetLBSUrl()
void RemoveAllDataFromCache(bool keepwriteblocks=true)
UnsolRespProcResult
XrdOucString GetDomainToMatch(XrdOucString hostname)
void SetOpTimeLimit(int delta_secs)
#define XrdSysMutex
Definition: XrdSysToOuc.h:16
long GetDataFromCache(const void *buffer, long long begin_offs, long long end_offs, bool PerfCalc, XrdClientIntvList &missingblks, long &outstandingblks)
bool WaitResp(int secsmax)
XrdClientAbs * fRedirHandler
short GetRedirCnt() const
XErrorCode GetOpenError() const
void SetCacheSize(int CacheSize)
XrdClientUrlInfo GetRedirUrl()
bool IsOpTimeLimitElapsed(time_t timenow)
static XrdOucHash< SessionIDInfo > fSessionIDRepo
int GetParallelStreamCount()
kXR_unt16 GetStreamID() const
void SetOpenError(XErrorCode err)
void GetInfo(int &size, long long &bytessubmitted, long long &byteshit, long long &misscount, float &missrate, long long &readreqcnt, float &bytesusefulness)
XReqErrorType WriteToServer(ClientRequest *req, const void *reqMoreData, short LogConnID, int substreamid=0)
void SetREQPauseState(kXR_int32 wsec)
ERemoteServerType DoHandShake(short log)
void SetRequestedDestHost(char *newh, kXR_int32 port)
XErrorCode fOpenError
time_t fREQConnectWaitTimeLimit
XrdSysCondVar * fREQConnectWait
virtual XReqErrorType GoToAnotherServer(XrdClientUrlInfo &newdest)
virtual ~XrdClientConn()
XrdClientUrlInfo GetCurrentUrl()
void RemoveDataFromCache(long long begin_offs, long long end_offs, bool remove_overlapped=false)
bool SubmitRawDataToCache(const void *buffer, long long begin_offs, long long end_offs)
void UnPinCacheBlk(long long begin_offs, long long end_offs)
void GetSessionID(SessionIDInfo &sess)
void CheckPort(int &port)
bool CacheWillFit(long long bytes)
UnsolRespProcResult ProcessAsynResp(XrdClientMessage *unsolmsg)
XrdClientMessage * ClientServerCmd(ClientRequest *req, const void *reqMoreData, void **answMoreDataAllocated, void *answMoreData, bool HasToAlloc, int substreamid=0)
time_t fGlobalRedirLastUpdateTimestamp
void SetRedirHandler(XrdClientAbs *rh)
bool DoWriteSoftCheckPoint()
struct ServerResponseHeader LastServerResp
void SetCacheRmPolicy(int RmPolicy)
XrdClientAbsUnsolMsgHandler * fUnsolMsgHandler
void SetREQDelayedConnectState(kXR_int32 wsec)
short fMaxGlobalRedirCnt
bool SubmitDataToCache(XrdClientMessage *xmsg, long long begin_offs, long long end_offs)
void UnPinCacheBlk()
#define FALSE
bool IsPhyConnConnected()
void SetAddrFromHost()
ERemoteServerType fServerType
void SetMaxRedirCnt(short mx)
int GetOpenSockFD() const
bool CheckResp(struct ServerResponseHeader *resp, const char *method)
XrdSysCondVar * fWriteWaitAck
int GetParallelStreamToUse(int reqsperstream)
static XrdOucString fgClientHostDomain
static XrdClientConnectionMgr * fgConnectionMgr
void ClearLastServerError()
XrdClientReadCache * fMainReadCache
bool MatchStreamid(struct ServerResponseHeader *ServerResponse)
void SetClientHostDomain(const char *src)
void RemovePlaceholdersFromCache()
short GetMaxRedirCnt() const
XrdClientUrlInfo * GetMetaUrl()
double log(double)
virtual XReqErrorType GoToMetaManager()
XrdOucString GetClientHostDomain()
struct ServerResponseBody_Error LastServerError
XReqErrorType WriteToServer_Async(ClientRequest *req, const void *reqMoreData, int substreamid=0)