Logo ROOT   6.07/09
Reference Guide
TXSocket.h
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: G. Ganis Oct 2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TXSocket
13 #define ROOT_TXSocket
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TXSocket //
18 // //
19 // High level handler of connections to xproofd. //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #define DFLT_CONNECTMAXTRY 10
24 
25 #ifndef ROOT_TSemaphore
26 #include "TSemaphore.h"
27 #endif
28 #ifndef ROOT_TString
29 #include "TString.h"
30 #endif
31 #ifndef ROOT_TList
32 #include "TList.h"
33 #endif
34 #ifndef ROOT_TMessage
35 #include "TMessage.h"
36 #endif
37 #ifndef ROOT_TUrl
38 #include "TUrl.h"
39 #endif
40 #ifndef ROOT_TSocket
41 #include "TSocket.h"
42 #endif
43 #ifndef __XPTYPES_H
44 #include "XProtocol/XPtypes.hh"
45 #endif
46 #ifndef XRC_UNSOLMSG_H
48 #endif
49 
50 #include <list>
51 #include <mutex>
52 
53 class TObjString;
54 class TXSockBuf;
55 class TXSockPipe;
56 class TXHandler;
57 class TXSocketHandler;
58 class XrdClientMessage;
59 class XrdProofConn;
60 
61 // To transmit info to Handlers
62 typedef struct {
67 } XHandleIn_t;
68 typedef struct {
70  const char *fMsg;
71 } XHandleErr_t;
72 
74 
75 friend class TXProofMgr;
76 friend class TXProofServ;
77 friend class TXSlave;
78 friend class TXSocketHandler;
79 friend class TXSockPipe;
80 friend class TXUnixSocket;
81 
82 private:
83  char fMode; // 'e' (def) or 'i' (internal - proofsrv)
84  kXR_int32 fSendOpt; // Options for sending messages
85  Short_t fSessionID; // proofsrv: remote ID of connected session
86  TString fUser; // Username used for login
87  TString fHost; // Remote host
88  Int_t fPort; // Remote port
89 
90  Int_t fLogLevel; // Log level to be transmitted to servers
91 
92  TString fBuffer; // Container for exchanging information
93  TObject *fReference; // Generic object reference of this socket
94  TXHandler *fHandler; // Handler of asynchronous events (input, error)
95 
96  XrdProofConn *fConn; // instance of the underlying connection module
97 
98  // Asynchronous messages
99  TSemaphore fASem; // Control access to conn async msg queue
100  std::recursive_mutex fAMtx; // To protect async msg queue
101  Bool_t fAWait; // kTRUE if waiting at the async msg queue
102  std::list<TXSockBuf *> fAQue; // list of asynchronous messages
103  Int_t fByteLeft; // bytes left in the first buffer
104  Int_t fByteCur; // current position in the first buffer
105  TXSockBuf *fBufCur; // current read buffer
106 
107  TSemaphore fAsynProc; // Control actions while processing async messages
108 
109  // Interrupts
110  std::recursive_mutex fIMtx; // To protect interrupt queue
111  kXR_int32 fILev; // Highest received interrupt
112  Bool_t fIForward; // Whether the interrupt should be propagated
113 
114  // Process ID of the instantiating process (to signal interrupts)
116 
117  // Whether to timeout or not
118  Bool_t fDontTimeout; // If true wait forever for incoming messages
119  Bool_t fRDInterrupt; // To interrupt waiting for messages
120 
121  // Version of the remote XrdProofdProtocol
123 
124  // Static area for input handling
125  static TXSockPipe fgPipe; // Pipe for input monitoring
126  static TString fgLoc; // Location string
127  static Bool_t fgInitDone; // Avoid initializing more than once
128 
129  // List of spare buffers
130  static std::mutex fgSMtx; // To protect spare list
131  static std::list<TXSockBuf *> fgSQue; // list of spare buffers
132 
133  // Manage asynchronous message
134  Int_t PickUpReady();
136  void PushBackSpare();
137 
138  // Post a message into the queue for asynchronous processing
139  void PostMsg(Int_t type, const char *msg = 0);
140 
141  // Wake up all threads waiting at the semaphore (used by TXSlave)
142  void PostSemAll();
143 
144  // Auxiliary
145  Int_t GetLowSocket() const;
146 
147  static void SetLocation(const char *loc = ""); // Set location string
148 
149  static void InitEnvs(); // Initialize environment variables
150 
151 public:
152  // Should be the same as in proofd/src/XrdProofdProtocol::Urgent
153  enum EUrgentMsgType { kStopProcess = 2000 };
154 
155  TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
156  const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
157  virtual ~TXSocket();
158 
159  virtual void Close(Option_t *opt = "");
160  Bool_t Create(Bool_t attach = kFALSE);
161  void DisconnectSession(Int_t id, Option_t *opt = "");
162 
163  void DoError(int level,
164  const char *location, const char *fmt, va_list va) const;
165 
167  XrdClientMessage *msg);
168 
169  virtual Int_t GetClientID() const { return -1; }
170  virtual Int_t GetClientIDSize() const { return 1; }
171  Int_t GetLogConnID() const;
172  Int_t GetOpenError() const;
173  Int_t GetServType() const;
174  Int_t GetSessionID() const;
176 
177  Bool_t IsValid() const;
179  virtual void RemoveClientID() { }
180  virtual void SetClientID(Int_t) { }
181  void SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
182  void SetSessionID(Int_t id);
183 
184  // Send interfaces
185  Int_t Send(const TMessage &mess);
186  Int_t Send(Int_t kind) { return TSocket::Send(kind); }
187  Int_t Send(Int_t status, Int_t kind)
188  { return TSocket::Send(status, kind); }
189  Int_t Send(const char *mess, Int_t kind = kMESS_STRING)
190  { return TSocket::Send(mess, kind); }
191  Int_t SendRaw(const void *buf, Int_t len,
193 
194  TObjString *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
195  Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);
196 
197  // Recv interfaces
198  Int_t Recv(TMessage *&mess);
199  Int_t Recv(Int_t &status, Int_t &kind)
200  { return TSocket::Recv(status, kind); }
201  Int_t Recv(char *mess, Int_t max)
202  { return TSocket::Recv(mess, max); }
203  Int_t Recv(char *mess, Int_t max, Int_t &kind)
204  { return TSocket::Recv(mess, max, kind); }
205  Int_t RecvRaw(void *buf, Int_t len,
206  ESendRecvOptions opt = kDefault);
207 
208  // Interrupts
211 
212  // Urgent message
213  void SendUrgent(Int_t type, Int_t int1, Int_t int2);
214 
215  // Interrupt the low level socket
216  void SetInterrupt(Bool_t i = kTRUE);
217  inline Bool_t IsInterrupt() { std::lock_guard<std::recursive_mutex> lock(fAMtx);
218  return fRDInterrupt; }
219 
220  // Set / Check async msg queue waiting status
221  inline void SetAWait(Bool_t w = kTRUE) {
222  std::lock_guard<std::recursive_mutex> lock(fAMtx);
223  fAWait = w; }
224  inline Bool_t IsAWait() { std::lock_guard<std::recursive_mutex> lock(fAMtx);
225  return fAWait; }
226  // Flush the asynchronous queue
227  Int_t Flush();
228 
229  // Ping the counterpart
230  Bool_t Ping(const char *ord = 0);
231 
232  // Request remote touch of the admin file associated with this connection
233  void RemoteTouch();
234  // Propagate a Ctrl-C
235  void CtrlC();
236 
237  // Standard options cannot be set
239 
240  // Disable / Enable read timeout
241  void DisableTimeout() { fDontTimeout = kTRUE; }
242  void EnableTimeout() { fDontTimeout = kFALSE; }
243 
244  // Try reconnection after error
245  virtual Int_t Reconnect();
246 
247  ClassDef(TXSocket, 0) //A high level connection class for PROOF
248 };
249 
250 
251 //
252 // The following structure is used to store buffers received asynchronously
253 //
254 class TXSockBuf {
255 public:
261 
262  TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
263  ~TXSockBuf();
264 
265  void Resize(Int_t sz);
266 
267  static Long64_t BuffMem();
268  static Long64_t GetMemMax();
269  static void SetMemMax(Long64_t memmax);
270 
271 private:
273  static Long64_t fgBuffMem; // Total allocated memory
274  static Long64_t fgMemMax; // Max allocated memory allowed
275 };
276 
277 //
278 // The following class describes internal pipes
279 //
280 class TXSockPipe { // Internal pipe (two descriptors) plus the list of sockets ready to be read
281 public:
282 
283  TXSockPipe(const char *loc = ""); // NOTE(review): presumably 'loc' initializes the location string - confirm in TXSocket.cxx
284  virtual ~TXSockPipe();
285 
286  Bool_t IsValid() const { return ((fPipe[0] >= 0 && fPipe[1] >= 0) ? kTRUE : kFALSE); } // kTRUE only when both pipe descriptors are >= 0
287 
288  TXSocket *GetLastReady(); // NOTE(review): presumably returns the most recently posted ready socket - confirm in TXSocket.cxx
289 
290  Int_t GetRead() const { return fPipe[0]; } // Returns fPipe[0] (NOTE(review): presumably the read end of the pipe - confirm)
291  Int_t Post(TSocket *s); // Notify socket ready via global pipe
292  Int_t Clean(TSocket *s); // Clean previous pipe notification
293  Int_t Flush(TSocket *s); // Remove any instance of 's' from the pipe
294  void DumpReadySock(); // NOTE(review): presumably dumps the content of fReadySock for debugging - confirm
295 
296  void SetLoc(const char *loc = "") { fLoc = loc; } // Overwrite the location string with 'loc'
297 
298 private:
299  std::recursive_mutex fMutex; // Protect access to the sockets-ready list
300  Int_t fPipe[2]; // Pipe for input monitoring; both entries must be >= 0 for IsValid()
301  TString fLoc; // Location string
302  TList fReadySock; // List of sockets ready to be read
303 };
304 
305 //
306 // Guard for a semaphore
307 //
309 public:
310 
311  TXSemaphoreGuard(TSemaphore *sem) : fSem(sem), fValid(kTRUE) { if (!fSem || fSem->TryWait()) fValid = kFALSE; }
312  virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }
313 
314  Bool_t IsValid() const { return fValid; }
315 
316 private:
319 };
320 
321 #endif
void SetSendOpt(ESendRecvOptions o)
Definition: TXSocket.h:181
Int_t fCid
Definition: TXSocket.h:260
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:242
virtual void RemoveClientID()
Definition: TXSocket.h:179
virtual void SetClientID(Int_t)
Definition: TXSocket.h:180
Int_t Send(const char *mess, Int_t kind=kMESS_STRING)
Send a character string buffer.
Definition: TXSocket.h:189
TSemaphore fAsynProc
Definition: TXSocket.h:107
Int_t Recv(Int_t &status, Int_t &kind)
Receives a status and a message type.
Definition: TXSocket.h:199
long long Long64_t
Definition: RtypesCore.h:69
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Definition: TXSocket.cxx:1218
TSemaphore fASem
Definition: TXSocket.h:99
void SetLoc(const char *loc="")
Definition: TXSocket.h:296
Int_t Recv(char *mess, Int_t max)
Receive a character string message of maximum max length.
Definition: TXSocket.h:201
Int_t GetXrdProofdVersion() const
Definition: TXSocket.h:175
Bool_t IsValid() const
Definition: TXSocket.h:286
Collectable string class.
Definition: TObjString.h:32
Bool_t IsAWait()
Definition: TXSocket.h:224
const char Option_t
Definition: RtypesCore.h:62
ESockOptions
Definition: TSocket.h:52
Int_t fLogLevel
Definition: TXSocket.h:90
std::recursive_mutex fMutex
Definition: TXSocket.h:299
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual Int_t GetClientID() const
Definition: TXSocket.h:169
int GetLogConnID() const
Definition: XrdProofConn.h:140
XrdProofConn * fConn
Definition: TXSocket.h:96
char fMode
Definition: TXSocket.h:83
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
EUrgentMsgType
Definition: TXSocket.h:153
std::recursive_mutex fIMtx
Definition: TXSocket.h:110
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TXSocket()
Destructor.
Definition: TXSocket.cxx:231
Int_t Recv(char *mess, Int_t max, Int_t &kind)
Receive a character string message of maximum max length.
Definition: TXSocket.h:203
const Bool_t kFALSE
Definition: Rtypes.h:92
int GetServType() const
Definition: XrdProofConn.h:143
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, also disable the asynchronous handler.
Definition: TXSocket.cxx:256
This is the version of TSlave for worker servers based on XProofD.
Definition: TXSlave.h:36
static std::mutex fgSMtx
Definition: TXSocket.h:130
Int_t fInt3
Definition: TXSocket.h:65
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Definition: TXSocket.cxx:1347
Int_t fOpt
Definition: TXSocket.h:69
ESendRecvOptions
Definition: TSocket.h:65
Int_t fInt2
Definition: TXSocket.h:64
static Long64_t fgBuffMem
Definition: TXSocket.h:273
virtual ~TXSemaphoreGuard()
Definition: TXSocket.h:312
void SetInterrupt()
Interrupt the underlying socket.
Implementation of TXSocket using PF_UNIX sockets.
Definition: TXUnixSocket.h:31
static Long64_t fgMemMax
Definition: TXSocket.h:274
Bool_t fAWait
Definition: TXSocket.h:101
Int_t fPort
Definition: TXSocket.h:88
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TXSocket.cxx:1581
#define ClassDef(name, id)
Definition: Rtypes.h:254
static void InitEnvs()
Init environment variables for XrdClient.
Definition: TXSocket.cxx:1984
static std::list< TXSockBuf * > fgSQue
Definition: TXSocket.h:131
Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TXSocket.cxx:1689
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition: TXSocket.cxx:972
Bool_t fIForward
Definition: TXSocket.h:112
TXHandler * fHandler
Definition: TXSocket.h:94
virtual Int_t GetClientIDSize() const
Definition: TXSocket.h:170
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list. If none is found either one is reallocated...
Definition: TXSocket.cxx:1512
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition: TXSocket.cxx:959
TString fUser
Definition: TXSocket.h:86
int GetOpenError() const
Definition: XrdProofConn.h:142
Int_t fInt4
Definition: TXSocket.h:66
A doubly linked list.
Definition: TList.h:47
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
Definition: TXProofServ.h:34
Bool_t fOwn
Definition: TXSocket.h:259
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Definition: TXSocket.cxx:1053
Int_t fByteCur
Definition: TXSocket.h:104
Int_t Flush()
Flush the asynchronous queue.
Definition: TXSocket.cxx:1004
Bool_t IsInterrupt()
Definition: TXSocket.h:217
TString fHost
Definition: TXSocket.h:87
UnsolRespProcResult
void SetAWait(Bool_t w=kTRUE)
Definition: TXSocket.h:221
Handler of asynchronous events for XProofD sockets.
Definition: TXHandler.h:30
Input handler for XProofD sockets.
Int_t fXrdProofdVersion
Definition: TXSocket.h:122
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition: TXSocket.cxx:848
High level handler of connections to XProofD.
Definition: TXSocket.h:73
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Definition: TXSocket.cxx:1275
TString fBuffer
Definition: TXSocket.h:92
std::recursive_mutex fAMtx
Definition: TXSocket.h:100
short Short_t
Definition: RtypesCore.h:35
std::list< TXSockBuf * > fAQue
Definition: TXSocket.h:102
static TString fgLoc
Definition: TXSocket.h:126
Char_t * fBuf
Definition: TXSocket.h:258
Implementation of the functionality provided by TProofMgr in the case of a xproofd-based session...
Definition: TXProofMgr.h:46
TString fLoc
Definition: TXSocket.h:301
TList fReadySock
Definition: TXSocket.h:302
Int_t fSiz
Definition: TXSocket.h:256
TXSemaphoreGuard(TSemaphore *sem)
Definition: TXSocket.h:311
Int_t fLen
Definition: TXSocket.h:257
Int_t Send(Int_t status, Int_t kind)
Send a status and a single message opcode.
Definition: TXSocket.h:187
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition: TXSocket.cxx:268
TXSockBuf * fBufCur
Definition: TXSocket.h:105
int GetLowSocket()
Return the socket descriptor of the underlying connection.
Short_t fSessionID
Definition: TXSocket.h:85
Int_t Send(Int_t kind)
Send a single message opcode.
Definition: TXSocket.h:186
friend class TXSocket
Definition: XrdProofConn.h:60
Bool_t IsValid() const
Definition: TXSocket.h:314
bool IsValid() const
Test validity of this connection.
int type
Definition: TGX11.cxx:120
virtual void Close(const char *opt="")
Close connection.
void forward(const LAYERDATA &prevLayerData, LAYERDATA &currLayerData)
apply the weights (and functions) in forward direction of the DNN
Definition: NeuralNet.icc:539
Bool_t fDontTimeout
Definition: TXSocket.h:118
void DisableTimeout()
Definition: TXSocket.h:241
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TXSocket.cxx:1774
Char_t * fMem
Definition: TXSocket.h:272
Int_t fInt1
Definition: TXSocket.h:63
kXR_int32 fSendOpt
Definition: TXSocket.h:84
Bool_t fRDInterrupt
Definition: TXSocket.h:119
TObject * fReference
Definition: TXSocket.h:93
Mother of all ROOT objects.
Definition: TObject.h:44
Int_t SetOption(ESockOptions, Int_t)
Set socket options.
Definition: TXSocket.h:238
char Char_t
Definition: RtypesCore.h:29
void PushBackSpare()
Release read buffer giving back to the spare list.
Definition: TXSocket.cxx:1560
const char * fMsg
Definition: TXSocket.h:70
kXR_int32 fILev
Definition: TXSocket.h:111
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Definition: TXSocket.cxx:1421
static Bool_t fgInitDone
Definition: TXSocket.h:127
Int_t Reconnect()
Try reconnection after failure.
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
Definition: TXSocket.cxx:1825
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
Definition: TXSocket.cxx:1942
static TXSockPipe fgPipe
Definition: TXSocket.h:125
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server. Returns 0 or -1 in case of error.
Definition: TXSocket.cxx:1638
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition: TXSocket.cxx:71
Int_t fPid
Definition: TXSocket.h:115
short GetSessionID() const
Definition: XrdProofConn.h:144
TSemaphore * fSem
Definition: TXSocket.h:317
Int_t GetRead() const
Definition: TXSocket.h:290
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t fByteLeft
Definition: TXSocket.h:103
void PostSemAll()
Wake up all threads waiting at the semaphore (used by TXSlave)
Definition: TXSocket.cxx:905
void EnableTimeout()
Definition: TXSocket.h:242
void CtrlC()
Interrupt the remote protocol instance.
Definition: TXSocket.cxx:1385
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *m)
We are here if an unsolicited response comes from a logical conn. The response comes in the form of an...