Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TXSocket.h
Go to the documentation of this file.
1// @(#)root/proofx:$Id$
2// Author: G. Ganis Oct 2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TXSocket
13#define ROOT_TXSocket
14
15//////////////////////////////////////////////////////////////////////////
16// //
17// TXSocket //
18// //
19// High level handler of connections to xproofd. //
20// //
21//////////////////////////////////////////////////////////////////////////
22
23#define DFLT_CONNECTMAXTRY 10
24
25#include "TSemaphore.h"
26#include "TString.h"
27#include "TList.h"
28#include "TMessage.h"
29#include "TUrl.h"
30#include "TSocket.h"
31#ifndef __XPTYPES_H
32#include "XProtocol/XPtypes.hh"
33#endif
35
36#include <list>
37#include <mutex>
38
39class TObjString;
40class TXSockBuf;
41class TXSockPipe;
42class TXHandler;
43class TXSocketHandler;
45class XrdProofConn;
46
47// To transmit info to Handlers
48typedef struct {
54typedef struct {
56 const char *fMsg;
58
60
61friend class TXProofMgr;
62friend class TXProofServ;
63friend class TXSlave;
64friend class TXSocketHandler;
65friend class TXSockPipe;
66friend class TXUnixSocket;
67
68private:
69 char fMode; // 'e' (def) or 'i' (internal - proofsrv)
70 kXR_int32 fSendOpt; // Options for sending messages
71 Short_t fSessionID; // proofsrv: remote ID of connected session
72 TString fUser; // Username used for login
73 TString fHost; // Remote host
74 Int_t fPort; // Remote port
75
76 Int_t fLogLevel; // Log level to be transmitted to servers
77
78 TString fBuffer; // Container for exchanging information
79 TObject *fReference; // Generic object reference of this socket
80 TXHandler *fHandler; // Handler of asynchronous events (input, error)
81
82 XrdProofConn *fConn; // instance of the underlying connection module
83
84 // Asynchronous messages
85 TSemaphore fASem; // Control access to conn async msg queue
86 std::recursive_mutex fAMtx; // To protect async msg queue
87 Bool_t fAWait; // kTRUE if waiting at the async msg queue
88 std::list<TXSockBuf *> fAQue; // list of asynchronous messages
89 Int_t fByteLeft; // bytes left in the first buffer
90 Int_t fByteCur; // current position in the first buffer
91 TXSockBuf *fBufCur; // current read buffer
92
93 TSemaphore fAsynProc; // Control actions while processing async messages
94
95 // Interrupts
96 std::recursive_mutex fIMtx; // To protect interrupt queue
97 kXR_int32 fILev; // Highest received interrupt
98 Bool_t fIForward; // Whether the interrupt should be propagated
99
100 // Process ID of the instantiating process (to signal interrupts)
102
103 // Whether to timeout or not
104 Bool_t fDontTimeout; // If true wait forever for incoming messages
105 Bool_t fRDInterrupt; // To interrupt waiting for messages
106
107 // Version of the remote XrdProofdProtocol
109
110 // Static area for input handling
111 static TXSockPipe fgPipe; // Pipe for input monitoring
112 static TString fgLoc; // Location string
113 static Bool_t fgInitDone; // Avoid initializing more than once
114
115 // List of spare buffers
116 static std::mutex fgSMtx; // To protect spare list
117 static std::list<TXSockBuf *> fgSQue; // list of spare buffers
118
119 // Manage asynchronous messages
122 void PushBackSpare();
123
124 // Post a message into the queue for asynchronous processing
125 void PostMsg(Int_t type, const char *msg = 0);
126
127 // Wake up all threads waiting at the semaphore (used by TXSlave)
128 void PostSemAll();
129
130 // Auxiliary
131 Int_t GetLowSocket() const;
132
133 static void SetLocation(const char *loc = ""); // Set location string
134
135 static void InitEnvs(); // Initialize environment variables
136
137public:
138 // Should be the same as in proofd/src/XrdProofdProtocol::Urgent
140
141 TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
142 const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
143 virtual ~TXSocket();
144
145 virtual void Close(Option_t *opt = "");
146 Bool_t Create(Bool_t attach = kFALSE);
147 void DisconnectSession(Int_t id, Option_t *opt = "");
148
149 void DoError(int level,
150 const char *location, const char *fmt, va_list va) const;
151
153 XrdClientMessage *msg);
154
155 virtual Int_t GetClientID() const { return -1; }
156 virtual Int_t GetClientIDSize() const { return 1; }
157 Int_t GetLogConnID() const;
158 Int_t GetOpenError() const;
159 Int_t GetServType() const;
160 Int_t GetSessionID() const;
162
163 Bool_t IsValid() const;
165 virtual void RemoveClientID() { }
166 virtual void SetClientID(Int_t) { }
168 void SetSessionID(Int_t id);
169
170 // Send interfaces
171 Int_t Send(const TMessage &mess);
172 Int_t Send(Int_t kind) { return TSocket::Send(kind); }
173 Int_t Send(Int_t status, Int_t kind)
174 { return TSocket::Send(status, kind); }
175 Int_t Send(const char *mess, Int_t kind = kMESS_STRING)
176 { return TSocket::Send(mess, kind); }
177 Int_t SendRaw(const void *buf, Int_t len,
179
180 TObjString *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
181 Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);
182
183 // Recv interfaces
184 Int_t Recv(TMessage *&mess);
185 Int_t Recv(Int_t &status, Int_t &kind)
186 { return TSocket::Recv(status, kind); }
187 Int_t Recv(char *mess, Int_t max)
188 { return TSocket::Recv(mess, max); }
189 Int_t Recv(char *mess, Int_t max, Int_t &kind)
190 { return TSocket::Recv(mess, max, kind); }
191 Int_t RecvRaw(void *buf, Int_t len,
193
194 // Interrupts
196 Int_t GetInterrupt(Bool_t &forward);
197
198 // Urgent message
199 void SendUrgent(Int_t type, Int_t int1, Int_t int2);
200
201 // Interrupt the low level socket
202 void SetInterrupt(Bool_t i = kTRUE);
203 inline Bool_t IsInterrupt() { std::lock_guard<std::recursive_mutex> lock(fAMtx);
204 return fRDInterrupt; }
205
206 // Set / Check async msg queue waiting status
207 inline void SetAWait(Bool_t w = kTRUE) {
208 std::lock_guard<std::recursive_mutex> lock(fAMtx);
209 fAWait = w; }
210 inline Bool_t IsAWait() { std::lock_guard<std::recursive_mutex> lock(fAMtx);
211 return fAWait; }
212 // Flush the asynchronous queue
213 Int_t Flush();
214
215 // Ping the counterpart
216 Bool_t Ping(const char *ord = 0);
217
218 // Request remote touch of the admin file associated with this connection
219 void RemoteTouch();
220 // Propagate a Ctrl-C
221 void CtrlC();
222
223 // Standard options cannot be set
225
226 // Disable / Enable read timeout
229
230 // Try reconnection after error
231 virtual Int_t Reconnect();
232
233 ClassDef(TXSocket, 0) //A high level connection class for PROOF
234};
235
236
237//
238// The following structure is used to store buffers received asynchronously
239//
241public:
247
248 TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
249 ~TXSockBuf();
250
251 void Resize(Int_t sz);
252
253 static Long64_t BuffMem();
254 static Long64_t GetMemMax();
255 static void SetMemMax(Long64_t memmax);
256
257private:
259 static Long64_t fgBuffMem; // Total allocated memory
260 static Long64_t fgMemMax; // Max allocated memory allowed
261};
262
263//
264// The following class describes internal pipes
265//
267public:
268
269 TXSockPipe(const char *loc = "");
270 virtual ~TXSockPipe();
271
272 Bool_t IsValid() const { return ((fPipe[0] >= 0 && fPipe[1] >= 0) ? kTRUE : kFALSE); }
273
275
276 Int_t GetRead() const { return fPipe[0]; }
277 Int_t Post(TSocket *s); // Notify socket ready via global pipe
278 Int_t Clean(TSocket *s); // Clean previous pipe notification
279 Int_t Flush(TSocket *s); // Remove any instance of 's' from the pipe
280 void DumpReadySock();
281
282 void SetLoc(const char *loc = "") { fLoc = loc; }
283
284private:
285 std::recursive_mutex fMutex; // Protect access to the sockets-ready list
286 Int_t fPipe[2]; // Pipe for input monitoring
287 TString fLoc; // Location string
288 TList fReadySock; // List of sockets ready to be read
289};
290
291//
292// Guard for a semaphore
293//
295public:
296
298 virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }
299
300 Bool_t IsValid() const { return fValid; }
301
302private:
305};
306
307#endif
@ kMESS_STRING
int Int_t
Definition RtypesCore.h:45
char Char_t
Definition RtypesCore.h:37
const Bool_t kFALSE
Definition RtypesCore.h:101
bool Bool_t
Definition RtypesCore.h:63
short Short_t
Definition RtypesCore.h:39
long long Long64_t
Definition RtypesCore.h:80
const Bool_t kTRUE
Definition RtypesCore.h:100
const char Option_t
Definition RtypesCore.h:66
#define ClassDef(name, id)
Definition Rtypes.h:325
int type
Definition TGX11.cxx:121
ESockOptions
Definition TSystem.h:215
ESendRecvOptions
Definition TSystem.h:228
@ kDontBlock
Definition TSystem.h:232
@ kDefault
Definition TSystem.h:229
UnsolRespProcResult
A doubly linked list.
Definition TList.h:38
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:41
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
Int_t Post()
Increment the value of the semaphore.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:136
Handler of asynchronous events for XProofD sockets.
Definition TXHandler.h:28
Implementation of the functionality provided by TProofMgr in the case of a xproofd-based session.
Definition TXProofMgr.h:40
This class implements the XProofD version of TProofServ, with respect to which it differs only for th...
Definition TXProofServ.h:30
Bool_t IsValid() const
Definition TXSocket.h:300
TSemaphore * fSem
Definition TXSocket.h:303
TXSemaphoreGuard(TSemaphore *sem)
Definition TXSocket.h:297
virtual ~TXSemaphoreGuard()
Definition TXSocket.h:298
This is the version of TSlave for workers servers based on XProofD.
Definition TXSlave.h:32
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Bool_t fOwn
Definition TXSocket.h:245
Int_t fSiz
Definition TXSocket.h:242
Int_t fLen
Definition TXSocket.h:243
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Char_t * fMem
Definition TXSocket.h:258
static Long64_t BuffMem()
Return the currently allocated memory.
static Long64_t fgMemMax
Definition TXSocket.h:260
~TXSockBuf()
destructor
static Long64_t fgBuffMem
Definition TXSocket.h:259
Char_t * fBuf
Definition TXSocket.h:244
void Resize(Int_t sz)
resize socket buffer
Int_t fCid
Definition TXSocket.h:246
void SetLoc(const char *loc="")
Definition TXSocket.h:282
TList fReadySock
Definition TXSocket.h:288
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
std::recursive_mutex fMutex
Definition TXSocket.h:285
Int_t Clean(TSocket *s)
Read a byte from the global pipe to synchronize message pickup.
virtual ~TXSockPipe()
Destructor.
Int_t GetRead() const
Definition TXSocket.h:276
void DumpReadySock()
Dump content of the ready socket list.
Bool_t IsValid() const
Definition TXSocket.h:272
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
TXSocket * GetLastReady()
Return last ready socket.
TString fLoc
Definition TXSocket.h:287
Int_t fPipe[2]
Definition TXSocket.h:286
Input handler for XProofD sockets.
High level handler of connections to XProofD.
Definition TXSocket.h:59
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Definition TXSocket.cxx:256
static std::list< TXSockBuf * > fgSQue
Definition TXSocket.h:117
TXSockBuf * fBufCur
Definition TXSocket.h:91
Int_t GetSessionID() const
Getter for session ID.
Definition TXSocket.cxx:943
virtual Int_t GetClientID() const
Definition TXSocket.h:155
TString fUser
Definition TXSocket.h:72
Int_t SetOption(ESockOptions, Int_t)
Set socket options.
Definition TXSocket.h:224
void PostSemAll()
Wake up all threads waiting at the semaphore (used by TXSlave)
Definition TXSocket.cxx:905
Int_t GetLogConnID() const
Getter for logical connection ID.
Definition TXSocket.cxx:919
Int_t Recv(char *mess, Int_t max)
Receive a character string message of maximum max length.
Definition TXSocket.h:187
Bool_t fRDInterrupt
Definition TXSocket.h:105
Int_t GetOpenError() const
Getter for last error.
Definition TXSocket.cxx:927
void EnableTimeout()
Definition TXSocket.h:228
std::list< TXSockBuf * > fAQue
Definition TXSocket.h:88
Int_t GetServType() const
Getter for server type.
Definition TXSocket.cxx:935
Int_t Send(Int_t status, Int_t kind)
Send a status and a single message opcode.
Definition TXSocket.h:173
static void SetLocation(const char *loc="")
Set location string.
Definition TXSocket.cxx:242
Int_t fPort
Definition TXSocket.h:74
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server. Returns 0, or -1 in case of error.
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition TXSocket.cxx:71
Int_t fByteLeft
Definition TXSocket.h:89
std::recursive_mutex fAMtx
Definition TXSocket.h:86
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Int_t Send(const char *mess, Int_t kind=kMESS_STRING)
Send a character string buffer.
Definition TXSocket.h:175
@ kStopProcess
Definition TXSocket.h:139
virtual Int_t GetClientIDSize() const
Definition TXSocket.h:156
virtual ~TXSocket()
Destructor.
Definition TXSocket.cxx:231
void PushBackSpare()
Release read buffer giving back to the spare list.
Int_t Recv(char *mess, Int_t max, Int_t &kind)
Receive a character string message of maximum max length.
Definition TXSocket.h:189
Int_t fPid
Definition TXSocket.h:101
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition TXSocket.cxx:368
TSemaphore fAsynProc
Definition TXSocket.h:93
kXR_int32 fSendOpt
Definition TXSocket.h:70
static std::mutex fgSMtx
Definition TXSocket.h:116
virtual void RemoveClientID()
Definition TXSocket.h:165
XrdProofConn * fConn
Definition TXSocket.h:82
static void InitEnvs()
Init environment variables for XrdClient.
Int_t Send(Int_t kind)
Send a single message opcode.
Definition TXSocket.h:172
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Bool_t fIForward
Definition TXSocket.h:98
Bool_t IsInterrupt()
Definition TXSocket.h:203
Int_t fXrdProofdVersion
Definition TXSocket.h:108
std::recursive_mutex fIMtx
Definition TXSocket.h:96
TObject * fReference
Definition TXSocket.h:79
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Int_t GetLowSocket() const
static Bool_t fgInitDone
Definition TXSocket.h:113
Int_t fLogLevel
Definition TXSocket.h:76
virtual void Close(Option_t *opt="")
Close connection.
Definition TXSocket.cxx:311
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Int_t Recv(Int_t &status, Int_t &kind)
Receives a status and a message type.
Definition TXSocket.h:185
Bool_t IsAWait()
Definition TXSocket.h:210
void SetInterrupt(Bool_t i=kTRUE)
static TString fgLoc
Definition TXSocket.h:112
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
virtual Int_t Reconnect()
Try reconnection after failure.
void SetSendOpt(ESendRecvOptions o)
Definition TXSocket.h:167
TSemaphore fASem
Definition TXSocket.h:85
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
TString fBuffer
Definition TXSocket.h:78
TXHandler * fHandler
Definition TXSocket.h:80
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Bool_t fDontTimeout
Definition TXSocket.h:104
Int_t Flush()
Flush the asynchronous queue.
Int_t fByteCur
Definition TXSocket.h:90
static TXSockPipe fgPipe
Definition TXSocket.h:111
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition TXSocket.cxx:268
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition TXSocket.cxx:972
Short_t fSessionID
Definition TXSocket.h:71
virtual void SetClientID(Int_t)
Definition TXSocket.h:166
Int_t GetXrdProofdVersion() const
Definition TXSocket.h:161
char fMode
Definition TXSocket.h:69
Int_t Send(const TMessage &mess)
Send a TMessage object.
kXR_int32 fILev
Definition TXSocket.h:97
void CtrlC()
Interrupt the remote protocol instance.
TString fHost
Definition TXSocket.h:73
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition TXSocket.cxx:959
void DisableTimeout()
Definition TXSocket.h:227
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition TXSocket.cxx:848
void SetAWait(Bool_t w=kTRUE)
Definition TXSocket.h:207
Bool_t IsValid() const
Getter for validity status.
Definition TXSocket.cxx:951
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Bool_t fAWait
Definition TXSocket.h:87
Implementation of TXSocket using PF_UNIX sockets.
Int_t fOpt
Definition TXSocket.h:55
const char * fMsg
Definition TXSocket.h:56
Int_t fInt2
Definition TXSocket.h:50
Int_t fInt4
Definition TXSocket.h:52
Int_t fInt1
Definition TXSocket.h:49
Int_t fInt3
Definition TXSocket.h:51