ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TXSocket.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TXSocket
13 \ingroup proofx
14 
15 High level handler of connections to XProofD.
16 See TSocket for details.
17 
18 */
19 
20 #include "MessageTypes.h"
21 #include "TEnv.h"
22 #include "TError.h"
23 #include "TException.h"
24 #include "TMonitor.h"
25 #include "TObjString.h"
26 #include "TProof.h"
27 #include "TSlave.h"
28 #include "TRegexp.h"
29 #include "TROOT.h"
30 #include "TUrl.h"
31 #include "TXHandler.h"
32 #include "TXSocket.h"
33 #include "XProofProtocol.h"
34 
35 #include "XrdProofConn.h"
36 
38 #include "XrdClient/XrdClientConst.hh"
39 #include "XrdClient/XrdClientEnv.hh"
42 
43 #ifndef WIN32
44 #include <sys/socket.h>
45 #else
46 #include <Winsock2.h>
47 #endif
48 
49 
50 #include "XpdSysError.h"
51 #include "XpdSysLogger.h"
52 
53 // ---- Tracing utils ----------------------------------------------------------
54 #include "XrdProofdTrace.h"
// Global trace object. It starts out null; the TXSocket constructor allocates
// it the first time it finds it unset.
55 XrdOucTrace *XrdProofdTrace = 0;
// Error destination passed to the trace object when it is created; the
// TXSocket constructor attaches a logger to it.
57 static XrdSysError eDest(0, "Proofx");
58 
59 #ifdef WIN32
62 #endif
63 
64 //______________________________________________________________________________
65 
66 //---- error handling ----------------------------------------------------------
67 
68 ////////////////////////////////////////////////////////////////////////////////
69 /// Interface to ErrorHandler (protected).
70 
71 void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
72 {
73  ::ErrorHandler(level, Form("TXSocket::%s", location), fmt, va);
74 }
75 
76 //----- Ping handler -----------------------------------------------------------
77 ////////////////////////////////////////////////////////////////////////////////
78 
79 class TXSocketPingHandler : public TFileHandler {
80  TXSocket *fSocket;
81 public:
82  TXSocketPingHandler(TXSocket *s, Int_t fd)
83  : TFileHandler(fd, 1) { fSocket = s; }
84  Bool_t Notify();
85  Bool_t ReadNotify() { return Notify(); }
86 };
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 /// Ping the socket
90 
92 {
 // Ask the associated socket to ping, passing "ping handler" as argument.
 // The result of Ping() is not inspected.
93  fSocket->Ping("ping handler");
94 
 // kTRUE is returned unconditionally.
95  return kTRUE;
96 }
97 
98 // Env variables init flag: while it is kFALSE the TXSocket constructor calls InitEnvs()
99 Bool_t TXSocket::fgInitDone = kFALSE;
100 
101 // Static variables for input notification (shared by all TXSocket instances)
102 TXSockPipe TXSocket::fgPipe; // Pipe for input monitoring
103 TString TXSocket::fgLoc = "undef"; // Location string; "undef" until SetLocation() is called
104 
105 // Static buffer manager
106 std::mutex TXSocket::fgSMtx; // To protect spare list
107 std::list<TXSockBuf *> TXSocket::fgSQue; // list of spare buffers
108 Long64_t TXSockBuf::fgBuffMem = 0; // Total allocated memory
109 Long64_t TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 MB = 10 * 1024 * 1024 bytes]
110 
111 ////////////////////////////////////////////////////////////////////////////////
112 /// Constructor
113 /// Open the connection to a remote XrdProofd instance and start a PROOF
114 /// session.
115 /// The mode 'm' indicates the role of this connection:
116 /// 'a' Administrator; used by an XPD to contact the head XPD
117 /// 'i' Internal; used by a TXProofServ to call back its creator
118 /// (see XrdProofUnixConn)
119 /// 'C' PROOF manager: open connection only (do not start a session)
120 /// 'M' Client creating a top master
121 /// 'A' Client attaching to top master
122 /// 'm' Top master creating a submaster
123 /// 's' Master creating a slave
124 /// The buffer 'logbuf' is a null terminated string to be sent over at
125 /// login.
126 
// Construction proceeds in this order: tracing and environment setup, reset of
// the queue/interrupt/bookkeeping members, validation of the global pipe,
// creation of the XrdProofConn and, for the session-bearing modes, a call to
// Create(). On any failure the constructor returns early, leaving the object
// in whatever partially initialised state had been reached.
127 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
128  const char *logbuf, Int_t loglevel, TXHandler *handler)
129  : TSocket(), fMode(m), fLogLevel(loglevel),
130  fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131  fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
132 {
133  fUrl = url;
134  // Enable tracing in the XrdProof client, if not done already
135  eDest.logger(&eLogger);
136  if (!XrdProofdTrace)
137  XrdProofdTrace = new XrdOucTrace(&eDest);
138 
139  // Init envs the first time
140  if (!fgInitDone)
141  InitEnvs();
142 
143  // Async queue related stuff
144  fAQue.clear();
145 
146  // Interrupts queue related stuff (-1 is the "no interrupt pending" value
147  // that GetInterrupt() checks for)
147  fILev = -1;
148  fIForward = kFALSE;
149 
150  // Init some variables
151  fByteLeft = 0;
152  fByteCur = 0;
153  fBufCur = 0;
154  fServType = kPROOFD; // for consistency
155  fTcpWindowSize = -1;
156  fRemoteProtocol = -1;
157  // By default forward directly to end-point; internal mode ('i') adds the
 // kXPD_internal flag on top of kXPD_async
158  fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
 // Manager connections ('C') never carry a session id
159  fSessionID = (fMode == 'C') ? -1 : psid;
160  fSocket = -1;
161 
162  // This is used by external code to create a link between this object
163  // and another one
164  fReference = 0;
165 
166  // The global pipe: without it nothing else is set up (fConn stays null)
167  if (!fgPipe.IsValid()) {
168  Error("TXSocket", "internal pipe is invalid");
169  return;
170  }
171 
172  // Some initial values
 // NOTE(review): 'u' is built from 'url' before the 'if (url)' null check
 // further down - confirm TUrl accepts a null pointer.
173  TUrl u(url);
175  u.SetProtocol("proof", kTRUE);
 // Fall back to port 1093 when the URL does not specify a positive port
176  fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
177 
178  // Set the asynchronous handler
179  fHandler = handler;
180 
181  if (url) {
182 
183  // Create connection (for managers the type of the connection is the same
184  // as for top masters): modes 'A' and 'C' are opened as 'M'
185  char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
186  fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
 // NOTE(review): the '!fConn' alternative would reach fConn->GetLastErr()
 // below with a null pointer - confirm 'new' cannot yield null here.
187  if (!fConn || !(fConn->IsValid())) {
189  if (gDebug > 0)
190  Error("TXSocket", "fatal error occurred while opening a connection"
191  " to server [%s]: %s", url, fConn->GetLastErr());
192  return;
193  }
194 
195  // Fill some info
196  fUser = fConn->fUser.c_str();
197  fHost = fConn->fHost.c_str();
198  fPort = fConn->fPort;
199 
200  // Create new proofserv if not client manager or administrator or internal mode
 // NOTE(review): mode 'L' is accepted here although the list of modes in the
 // constructor documentation does not mention it.
201  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A'|| fMode == 'L') {
202  // We attach or create
203  if (!Create()) {
204  // Failure: report the last connection error ("-" when none is recorded)
205  Error("TXSocket", "create or attach failed (%s)",
206  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
207  Close();
208  return;
209  }
210  }
211 
212  // Fill some other info available if Create is successful
 // (as listed, the 'C' branch below has no visible statements)
213  if (fMode == 'C') {
216  }
217 
218  // Also in the base class: URL and address are refreshed from the
 // connection, and the port set above is re-applied to the new address
219  fUrl = fConn->fUrl.GetUrl().c_str();
220  fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
221  fAddress.fPort = fPort;
222 
223  // This is needed for the reader thread to signal an interrupt
224  fPid = gSystem->GetPid();
225  }
226 }
227 
228 ////////////////////////////////////////////////////////////////////////////////
229 /// Destructor
230 
232 {
233  // Disconnect from remote server (the connection manager is
234  // responsible for the underlying physical connection, so we do not
235  // force its closing). Close() is invoked without arguments.
236  Close();
237 }
238 
239 ////////////////////////////////////////////////////////////////////////////////
240 /// Set location string
241 
242 void TXSocket::SetLocation(const char *loc)
243 {
244  if (loc) {
245  fgLoc = loc;
246  fgPipe.SetLoc(loc);
247  } else {
248  fgLoc = "";
249  fgPipe.SetLoc("");
250  }
251 }
252 
253 ////////////////////////////////////////////////////////////////////////////////
254 /// Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
255 
257 {
 // A negative id also detaches the asynchronous handler from the
 // connection, provided a connection exists.
258  if (id < 0 && fConn)
259  fConn->SetAsync(0);
 // The id is stored in all cases, including negative values.
260  fSessionID = id;
261 }
262 
263 ////////////////////////////////////////////////////////////////////////////////
264 /// Disconnect a session. Use opt= "S" or "s" to
265 /// shutdown remote session.
266 /// Default is opt = "".
267 
269 {
270  // Make sure we are connected
271  if (!IsValid()) {
272  if (gDebug > 0)
273  Info("DisconnectSession","not connected: nothing to do");
274  return;
275  }
276 
 // Option flags are case-insensitive: 'S' selects destroy instead of
 // detach, 'A' lets the request go out even when 'id' is negative.
277  Bool_t shutdown = opt && (strchr(opt,'S') || strchr(opt,'s'));
278  Bool_t all = opt && (strchr(opt,'A') || strchr(opt,'a'));
279 
 // Nothing is sent when the id is negative and 'A' was not given
280  if (id > -1 || all) {
281  // Prepare request
282  XPClientRequest Request;
283  memset(&Request, 0, sizeof(Request) );
284  fConn->SetSID(Request.header.streamid);
285  if (shutdown)
286  Request.proof.requestid = kXP_destroy;
287  else
288  Request.proof.requestid = kXP_detach;
289  Request.proof.sid = id;
290 
291  // Send request (no payload)
292  XrdClientMessage *xrsp =
293  fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
294 
295  // Print error msg, if any (only when no response object came back)
296  if (!xrsp && fConn->GetLastErr())
297  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
298 
299  // Cleanup: the response is deleted here
300  SafeDelete(xrsp);
301  }
302 }
303 
304 ////////////////////////////////////////////////////////////////////////////////
305 /// Close connection. Available options are (case insensitive)
306 /// 'P' force closing of the underlying physical connection
307 /// 'S' shutdown remote session, if any
308 /// A session ID can be given using #...# signature, e.g. "#1#".
309 /// Default is opt = "".
310 
312 {
 // Take the async-processing semaphore first. The timeout is read in seconds
 // from XProof.AsynProcSemTimeout (default 60) and passed on multiplied by
 // 1000. A failed wait only produces a warning; closing goes ahead anyway.
313  Int_t to = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
314  if (fAsynProc.Wait(to*1000) != 0)
315  Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
316 
317  // Remove any reference in the global pipe and ready-sock queue
318  TXSocket::fgPipe.Flush(this);
319 
320  // Make sure we have a connection
321  if (!fConn) {
322  if (gDebug > 0)
323  Info("Close","no connection: nothing to do");
 // Release the semaphore before leaving
324  fAsynProc.Post();
325  return;
326  }
327 
328  // Disconnect the asynchronous requests handler
329  fConn->SetAsync(0);
330 
331  // If we are connected we disconnect
332  if (IsValid()) {
333 
334  // Parse options: a session id may be embedded as "#<digits>#". The text
 // up to and including the first '#' is dropped, then everything from the
 // second '#' on; what is left replaces the id only if it is all digits.
335  TString o(opt);
336  Int_t sessID = fSessionID;
337  if (o.Index("#") != kNPOS) {
338  o.Remove(0,o.Index("#")+1);
339  if (o.Index("#") != kNPOS) {
340  o.Remove(o.Index("#"));
341  sessID = o.IsDigit() ? o.Atoi() : sessID;
342  }
343  }
344 
345  if (sessID > -1) {
346  // Warn the remote session, if any (after destroy the session is gone);
 // the original, unparsed 'opt' is what gets forwarded
347  DisconnectSession(sessID, opt);
348  } else {
349  // We are the manager: close underlying connection
350  fConn->Close(opt);
351  }
352  }
353 
354  // Delete the connection module
355  SafeDelete(fConn);
356 
357  // Post semaphore
358  fAsynProc.Post();
359 }
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 /// We are here if an unsolicited response comes from a logical conn
363 /// The response comes in the form of an XrdClientMessage *, that must NOT be
364 /// destroyed after processing. It is destroyed by the first sender.
365 /// Remember that we are in a separate thread, since unsolicited
366 /// responses are asynchronous by nature.
///
/// kUNSOL_CONTINUE is returned for empty messages, for error/timeout
/// notifications and for messages whose stream id does not match this
/// connection. Otherwise the first 4 bytes of the payload are taken as the
/// action code, the remainder is decoded per action, and 'rc' (initialised to
/// kUNSOL_KEEP) is returned.
367 
368 UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
370 {
371  UnsolRespProcResult rc = kUNSOL_KEEP;
372 
373  // If we are closing we will not do anything
 // NOTE(review): the declaration of 'semg' is not visible in this listing.
375  if (!semg.IsValid()) {
376  Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
377  return kUNSOL_CONTINUE;
378  }
379 
380  if (!m) {
381  if (gDebug > 2)
382  Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
383  // Some one is perhaps interested in empty messages
384  return kUNSOL_CONTINUE;
385  } else {
386  if (gDebug > 2)
387  Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
388  this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
389  }
390 
391  // Error notification: the message itself is never consumed here, it is
 // always passed on to other handlers
392  if (m->IsError()) {
394  if (gDebug > 0)
395  Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
396  XHandleErr_t herr = {1, 0};
 // A non-zero return from HandleError() is treated as "recovery failed"
397  if (!fHandler || fHandler->HandleError((const void *)&herr)) {
398  if (gDebug > 0)
399  Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
400  // Avoid to contact the server any more
401  fSessionID = -1;
402  } else {
403  // Connection still usable: update usage timestamp
404  Touch();
405  }
406  } else {
407  // Time out
408  if (gDebug > 2)
409  Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
410  }
411  // Propagate the message to other possible handlers
412  return kUNSOL_CONTINUE;
413  }
414 
415  // From now on make sure is for us (but only if not during setup, i.e. fConn == 0; otherwise
416  // we may miss some important server message)
417  if (fConn && !m->MatchStreamid(fConn->fStreamid)) {
418  if (gDebug > 1)
419  Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
420  return kUNSOL_CONTINUE;
421  }
422 
423  // Local processing ... the payload must at least hold the 4-byte action code
424  Int_t len = 0;
425  if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
426  Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
428  return rc;
429  }
430 
431  // Activity on the line: update usage timestamp
432  Touch();
433 
434  // The first 4 bytes contain the action code
 // NOTE(review): 'acod' is copied as-is; no net2host() is applied to it,
 // unlike the fields decoded below - confirm this is intended.
435  kXR_int32 acod = 0;
436  memcpy(&acod, m->GetData(), sizeof(kXR_int32));
 // Codes above 10000 are reported regardless of the debug level
437  if (acod > 10000)
438  Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
439  this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
440  //
441  // Update pointer to data: from here on 'pdata'/'len' describe what
 // follows the action code
442  void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
443  len -= sizeof(kXR_int32);
444  if (gDebug > 1)
445  Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
446  this, acod, len, m->HeaderSID());
447 
 // NOTE(review): the statement guarded by this 'if' is not visible in
 // this listing.
448  if (gDebug > 3)
450 
451  // Case by case
452  kXR_int32 ilev = -1;
453  const char *lab = 0;
454 
455  switch (acod) {
456  case kXPD_ping:
457  //
458  // Special interrupt: the level is fixed to TProof::kPing and control
 // deliberately falls through into the kXPD_interrupt handling
459  ilev = TProof::kPing;
460  lab = "kXPD_ping";
461  case kXPD_interrupt:
462  //
463  // Interrupt
464  lab = !lab ? "kXPD_interrupt" : lab;
465  { std::lock_guard<std::recursive_mutex> lock(fIMtx);
 // Only a real interrupt carries its level in the payload
466  if (acod == kXPD_interrupt) {
467  memcpy(&ilev, pdata, sizeof(kXR_int32));
468  ilev = net2host(ilev);
469  // Update pointer to data
470  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
471  len -= sizeof(kXR_int32);
472  }
473  // The next 4 bytes contain the forwarding option
 // NOTE(review): here and in the cases below the guard is 'len > 0' while
 // sizeof(kXR_int32) bytes are read - confirm shorter payloads cannot occur.
474  kXR_int32 ifw = 0;
475  if (len > 0) {
476  memcpy(&ifw, pdata, sizeof(kXR_int32));
477  ifw = net2host(ifw);
478  if (gDebug > 1)
479  Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
480  }
481  //
482  // Save the interrupt (picked up later by GetInterrupt())
483  fILev = ilev;
484  fIForward = (ifw == 1) ? kTRUE : kFALSE;
485 
486  // Handle this input in this thread to avoid queuing on the
487  // main thread
488  XHandleIn_t hin = {acod, 0, 0, 0};
489  if (fHandler)
490  fHandler->HandleInput((const void *)&hin);
491  else
492  Error("ProcessUnsolicitedMsg","handler undefined");
493  }
494  break;
495  case kXPD_timer:
496  //
497  // Set shutdown timer
498  {
 // Defaults used when the payload does not carry the field
499  kXR_int32 opt = 1;
500  kXR_int32 delay = 0;
501  // The next 4 bytes contain the shutdown option
502  if (len > 0) {
503  memcpy(&opt, pdata, sizeof(kXR_int32));
504  opt = net2host(opt);
505  if (gDebug > 1)
506  Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
507  // Update pointer to data
508  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
509  len -= sizeof(kXR_int32);
510  }
511  // The next 4 bytes contain the delay
512  if (len > 0) {
513  memcpy(&delay, pdata, sizeof(kXR_int32));
514  delay = net2host(delay);
515  if (gDebug > 1)
516  Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
517  // Update pointer to data
518  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
519  len -= sizeof(kXR_int32);
520  }
521 
522  // Handle this input in this thread to avoid queuing on the
523  // main thread
524  XHandleIn_t hin = {acod, opt, delay, 0};
525  if (fHandler)
526  fHandler->HandleInput((const void *)&hin);
527  else
528  Error("ProcessUnsolicitedMsg","handler undefined");
529  }
530  break;
531  case kXPD_inflate:
532  //
533  // Set inflate factor (1000 when the payload does not carry one)
534  {
535  kXR_int32 inflate = 1000;
536  if (len > 0) {
537  memcpy(&inflate, pdata, sizeof(kXR_int32));
538  inflate = net2host(inflate);
539  if (gDebug > 1)
540  Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
541  // Update pointer to data
542  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
543  len -= sizeof(kXR_int32);
544  }
545  // Handle this input in this thread to avoid queuing on the
546  // main thread
547  XHandleIn_t hin = {acod, inflate, 0, 0};
548  if (fHandler)
549  fHandler->HandleInput((const void *)&hin);
550  else
551  Error("ProcessUnsolicitedMsg","handler undefined");
552  }
553  break;
554  case kXPD_priority:
555  //
556  // Broadcast group priority (-1 when the payload does not carry one)
557  {
558  kXR_int32 priority = -1;
559  if (len > 0) {
560  memcpy(&priority, pdata, sizeof(kXR_int32));
561  priority = net2host(priority);
562  if (gDebug > 1)
563  Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
564  // Update pointer to data
565  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
566  len -= sizeof(kXR_int32);
567  }
568  // Handle this input in this thread to avoid queuing on the
569  // main thread
570  XHandleIn_t hin = {acod, priority, 0, 0};
571  if (fHandler)
572  fHandler->HandleInput((const void *)&hin);
573  else
574  Error("ProcessUnsolicitedMsg","handler undefined");
575  }
576  break;
577  case kXPD_flush:
578  //
579  // Flush request (no payload is decoded)
580  {
581  // Handle this input in this thread to avoid queuing on the
582  // main thread
583  XHandleIn_t hin = {acod, 0, 0, 0};
584  if (fHandler)
585  fHandler->HandleInput((const void *)&hin);
586  else
587  Error("ProcessUnsolicitedMsg","handler undefined");
588  }
589  break;
590  case kXPD_urgent:
591  //
592  // Urgent message: a type followed by up to two integers, each left
 // at -1 when absent from the payload
593  {
594  // The next 4 bytes contain the urgent msg type
595  kXR_int32 type = -1;
596  if (len > 0) {
597  memcpy(&type, pdata, sizeof(kXR_int32));
598  type = net2host(type);
599  if (gDebug > 1)
600  Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
601  // Update pointer to data
602  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
603  len -= sizeof(kXR_int32);
604  }
605  // The next 4 bytes contain the first info container
606  kXR_int32 int1 = -1;
607  if (len > 0) {
608  memcpy(&int1, pdata, sizeof(kXR_int32));
609  int1 = net2host(int1);
610  if (gDebug > 1)
611  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
612  // Update pointer to data
613  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
614  len -= sizeof(kXR_int32);
615  }
616  // The next 4 bytes contain the second info container
617  kXR_int32 int2 = -1;
618  if (len > 0) {
619  memcpy(&int2, pdata, sizeof(kXR_int32));
620  int2 = net2host(int2);
621  if (gDebug > 1)
622  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
623  // Update pointer to data
624  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
625  len -= sizeof(kXR_int32);
626  }
627 
628  // Handle this input in this thread to avoid queuing on the
629  // main thread
630  XHandleIn_t hin = {acod, type, int1, int2};
631  if (fHandler)
632  fHandler->HandleInput((const void *)&hin);
633  else
634  Error("ProcessUnsolicitedMsg","handler undefined");
635  }
636  break;
637  case kXPD_msg:
638  //
639  // Data message: the payload is copied into a spare buffer and queued
640  { std::lock_guard<std::recursive_mutex> lock(fAMtx);
641 
642  // Get a spare buffer
643  TXSockBuf *b = PopUpSpare(len);
644  if (!b) {
 // NOTE(review): this is the failure branch, yet the text reads
 // "could allocate" - the message looks inverted.
645  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
646  return rc;
647  }
648  memcpy(b->fBuf, pdata, len);
649  b->fLen = len;
650 
651  // Update counters
652  fBytesRecv += len;
653 
654  // Produce the message
655  fAQue.push_back(b);
656 
657  // Post the global pipe
658  fgPipe.Post(this);
659 
660  // Signal it and release the mutex
661  if (gDebug > 2)
662  Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
663  this, GetTitle(), &fASem, len);
664  fASem.Post();
665  }
666 
667  break;
668  case kXPD_feedback:
669  Info("ProcessUnsolicitedMsg",
670  "kXPD_feedback treatment not yet implemented");
671  break;
672  case kXPD_srvmsg:
673  //
674  // Service message
675  {
676  // The next 4 bytes may contain a flag to control the way the message is displayed
 // NOTE(review): these 4 bytes are read without any check on 'len'.
 // A value outside [0,4] is taken to mean "no flag present": the bytes
 // are then left in the text and opt falls back to 1.
677  kXR_int32 opt = 0;
678  memcpy(&opt, pdata, sizeof(kXR_int32));
679  opt = net2host(opt);
680  if (opt >= 0 && opt <= 4) {
681  // Update pointer to data
682  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
683  len -= sizeof(kXR_int32);
684  } else {
685  opt = 1;
686  }
687 
688  if (opt == 0) {
689  // One line
690  Printf("| %.*s", len, (char *)pdata);
691  } else if (opt == 2) {
692  // Raw displaying
693  Printf("%.*s", len, (char *)pdata);
694  } else if (opt == 3) {
695  // Incremental displaying (written to stderr, no newline added)
696  fprintf(stderr, "%.*s", len, (char *)pdata);
697  } else if (opt == 4) {
698  // Rewind (written to stderr, followed by a carriage return)
699  fprintf(stderr, "%.*s\r", len, (char *)pdata);
700  } else {
701  // A small header
702  Printf(" ");
703  Printf("| Message from server:");
704  Printf("| %.*s", len, (char *)pdata);
705  }
706  }
707  break;
708  case kXPD_errmsg:
709  //
710  // Error condition with message
711  Printf("\n\n");
712  Printf("| Error condition occured: message from server:");
713  Printf("| %.*s", len, (char *)pdata);
714  Printf("\n");
715  // Handle error
 // NOTE(review): the statement for the 'fHandler' branch is not visible
 // in this listing.
716  if (fHandler)
718  else
719  Error("ProcessUnsolicitedMsg","handler undefined");
720  break;
721  case kXPD_msgsid:
722  //
723  // Data message tagged with a session id
724  { std::lock_guard<std::recursive_mutex> lock(fAMtx);
725 
726  // The next 4 bytes contain the session id
 // NOTE(review): read without any check on 'len'.
727  kXR_int32 cid = 0;
728  memcpy(&cid, pdata, sizeof(kXR_int32));
729  cid = net2host(cid);
730 
731  if (gDebug > 1)
732  Info("ProcessUnsolicitedMsg","found cid: %d", cid);
733 
734  // Update pointer to data
735  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
736  len -= sizeof(kXR_int32);
737 
738  // Get a spare buffer
739  TXSockBuf *b = PopUpSpare(len);
740  if (!b) {
 // NOTE(review): failure branch, but the text reads "could allocate".
741  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
742  return rc;
743  }
744  memcpy(b->fBuf, pdata, len);
745  b->fLen = len;
746 
747  // Set the sid
748  b->fCid = cid;
749 
750  // Update counters
751  fBytesRecv += len;
752 
753  // Produce the message
754  fAQue.push_back(b);
755 
756  // Post the global pipe
757  fgPipe.Post(this);
758 
759  // Signal it and release the mutex
760  if (gDebug > 2)
761  Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
762  this, cid, &fASem, len);
763  fASem.Post();
764  }
765 
766  break;
767  case kXPD_wrkmortem:
768  //
769  // A worker died: the remaining payload is taken as text
770  { TString what = TString::Format("%.*s", len, (char *)pdata);
771  if (what.BeginsWith("idle-timeout")) {
772  // Notify the idle timeout
 // (the statement doing so is not visible in this listing)
774  } else {
775  Printf(" ");
776  Printf("| %s", what.Data());
777  // Handle error
 // NOTE(review): the 'fHandler' branch statement is not visible here.
778  if (fHandler)
780  else
781  Error("ProcessUnsolicitedMsg","handler undefined");
782  }
783  }
784  break;
785 
786  case kXPD_touch:
787  //
788  // Request for remote touch: post a message to do that
 // (the statement doing so is not visible in this listing)
790  break;
791  case kXPD_resume:
792  //
793  // process the next query (in the TXProofServ)
 // (the statement doing so is not visible in this listing)
795  break;
796  case kXPD_clusterinfo:
797  //
798  // Broadcast cluster information
799  {
800  kXR_int32 nsess = -1, nacti = -1, neffs = -1;
 // NOTE(review): the guard is 'len > 0' but three 4-byte fields are read.
801  if (len > 0) {
802  // Total sessions
803  memcpy(&nsess, pdata, sizeof(kXR_int32));
804  nsess = net2host(nsess);
805  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
806  len -= sizeof(kXR_int32);
807  // Active sessions
808  memcpy(&nacti, pdata, sizeof(kXR_int32));
809  nacti = net2host(nacti);
810  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
811  len -= sizeof(kXR_int32);
812  // Effective sessions (printed below divided by 1000)
813  memcpy(&neffs, pdata, sizeof(kXR_int32));
814  neffs = net2host(neffs);
815  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
816  len -= sizeof(kXR_int32);
817  }
818  if (gDebug > 1)
819  Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
820  " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
821  // Handle this input in this thread to avoid queuing on the
822  // main thread
823  XHandleIn_t hin = {acod, nsess, nacti, neffs};
824  if (fHandler)
825  fHandler->HandleInput((const void *)&hin);
826  else
827  Error("ProcessUnsolicitedMsg","handler undefined");
828  }
829  break;
830  default:
831  Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
832  this, acod, GetTitle());
834  break;
835  }
836 
837  // We are done
838  return rc;
839 }
840 
841 ////////////////////////////////////////////////////////////////////////////////
842 /// Post a message of type 'type' into the read messages queue.
843 /// If 'msg' is defined it is also added as TString.
844 /// This is used, for example, with kPROOF_FATAL to force the main thread
845 /// to mark this socket as bad, avoiding race condition when a worker
846 /// dies while in processing state.
847 
848 void TXSocket::PostMsg(Int_t type, const char *msg)
849 {
850  // Create the message
851  TMessage m(type);
852 
853  // Add the string if any
854  if (msg && strlen(msg) > 0)
855  m << TString(msg);
856 
857  // Write length in first word of buffer
858  m.SetLength();
859 
860  // Get pointer to the message buffer
861  char *mbuf = m.Buffer();
862  Int_t mlen = m.Length();
863  if (m.CompBuffer()) {
864  mbuf = m.CompBuffer();
865  mlen = m.CompLength();
866  }
867 
868  //
869  // Data message
870  std::lock_guard<std::recursive_mutex> lock(fAMtx);
871 
872  // Get a spare buffer
873  TXSockBuf *b = PopUpSpare(mlen);
874  if (!b) {
875  Error("PostMsg", "could allocate spare buffer");
876  return;
877  }
878 
879  // Fill the pipe buffer
880  memcpy(b->fBuf, mbuf, mlen);
881  b->fLen = mlen;
882 
883  // Update counters
884  fBytesRecv += mlen;
885 
886  // Produce the message
887  fAQue.push_back(b);
888 
889  // Post the global pipe
890  fgPipe.Post(this);
891 
892  // Signal it and release the mutex
893  if (gDebug > 0)
894  Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
895  this, type, &fASem, mlen);
896  fASem.Post();
897 
898  // Done
899  return;
900 }
901 
902 ////////////////////////////////////////////////////////////////////////////////
903 /// Wake up all threads waiting at the semaphore (used by TXSlave)
904 
906 {
 // The async queue mutex is held for the whole call
907  std::lock_guard<std::recursive_mutex> lock(fAMtx);
908 
909  // Post semaphore to wake up anybody waiting; send as many posts as needed
 // NOTE(review): Flush() treats TryWait() == 1 as "counter already 0", so this
 // loop posts only after a successful TryWait() and stops as soon as the
 // counter is found at 0 - confirm that this matches the stated intent and
 // that it terminates when the counter starts above 0.
910  while (fASem.TryWait() != 1)
911  fASem.Post();
912 
913  return;
914 }
915 
916 
917 ////////////////////////////////////////////////////////////////////////////////
918 /// Return kTRUE if the remote server is a 'proofd'
919 
921 {
 // NOTE(review): as listed, this return is unconditional and the code after
 // it is unreachable; the condition guarding it is presumably on a line
 // that is not visible in this listing - verify against the full source.
923  return kTRUE;
924 
925  // Failure
926  return kFALSE;
927 }
928 
929 ////////////////////////////////////////////////////////////////////////////////
930 /// Get latest interrupt level and reset it; if the interrupt has to be
931 /// propagated to lower stages forward will be kTRUE after the call
932 
934 {
935  if (gDebug > 2)
936  Info("GetInterrupt","%p: waiting to lock mutex", this);
937 
 // Same mutex that ProcessUnsolicitedMsg holds while storing fILev/fIForward
938  std::lock_guard<std::recursive_mutex> lock(fIMtx);
939 
940  // Reset values
941  Int_t ilev = -1;
942  forward = kFALSE;
943 
944  // Check if filled: -1 means no interrupt has been stored. This is only
 // reported; the function still goes on and returns -1 in that case.
945  if (fILev == -1)
946  Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
947 
948  // Fill output
949  ilev = fILev;
950  forward = fIForward;
951 
952  // Reset values (we process it only once)
953  fILev = -1;
954  fIForward = kFALSE;
955 
956  // Return what we got
957  return ilev;
958 }
959 
960 ////////////////////////////////////////////////////////////////////////////////
961 /// Flush the asynchronous queue.
962 /// Typically called when a kHardInterrupt is received.
963 /// Returns number of bytes in flushed buffers.
964 
966 {
967  Int_t nf = 0;
968  list<TXSockBuf *> splist;
969  list<TXSockBuf *>::iterator i;
970 
971  { std::lock_guard<std::recursive_mutex> lock(fAMtx);
972 
973  // Must have something to flush
974  if (fAQue.size() > 0) {
975 
976  // Save size for later semaphore cleanup
977  Int_t sz = fAQue.size();
978  // get the highest interrupt level
979  for (i = fAQue.begin(); i != fAQue.end();) {
980  if (*i) {
981  splist.push_back(*i);
982  nf += (*i)->fLen;
983  i = fAQue.erase(i);
984  }
985  }
986 
987  // Reset the asynchronous queue
988  while (sz--) {
989  if (fASem.TryWait() == 1)
990  Printf("Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
991  }
992  fAQue.clear();
993  }
994  }
995 
996  // Move spares to the spare queue
997  { std::lock_guard<std::mutex> lock(fgSMtx);
998  if (splist.size() > 0) {
999  for (i = splist.begin(); i != splist.end();) {
1000  fgSQue.push_back(*i);
1001  i = splist.erase(i);
1002  }
1003  }
1004  }
1005 
1006  // We are done
1007  return nf;
1008 }
1009 
1010 ////////////////////////////////////////////////////////////////////////////////
1011 /// This method sends a request for creation of (or attachment to) a remote
1012 /// server application.
1013 
1015 {
1016  // Make sure we are connected
1017  if (!IsValid()) {
1018  if (gDebug > 0)
1019  Info("Create","not connected: nothing to do");
1020  return kFALSE;
1021  }
1022 
1023  Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
1024 
1025  while (retriesleft--) {
1026 
1027  XPClientRequest reqhdr;
1028 
1029  // We fill the header struct containing the request for login
1030  memset( &reqhdr, 0, sizeof(reqhdr));
1031  fConn->SetSID(reqhdr.header.streamid);
1032 
1033  // This will be a kXP_attach or kXP_create request
1034  if (fMode == 'A' || attach) {
1035  reqhdr.header.requestid = kXP_attach;
1036  reqhdr.proof.sid = fSessionID;
1037  } else {
1038  reqhdr.header.requestid = kXP_create;
1039  }
1040 
1041  // Send log level
1042  reqhdr.proof.int1 = fLogLevel;
1043 
1044  // Send also the chosen alias
1045  const void *buf = (const void *)(fBuffer.Data());
1046  reqhdr.header.dlen = fBuffer.Length();
1047  if (gDebug >= 2)
1048  Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
1049 
1050  // We call SendReq, the function devoted to sending commands.
1051  if (gDebug > 1)
1052  Info("Create", "creating session of server %s", fUrl.Data());
1053 
1054  // server response header
1055  char *answData = 0;
1056  XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
1057  &answData, "TXSocket::Create", 0);
1058  struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
1059 
1060  // If any, the URL the data pool entry point will be stored here
1061  fBuffer = "";
1062  if (xrsp) {
1063 
1064  //
1065  // Pointer to data
1066  void *pdata = (void *)(xrsp->GetData());
1067  Int_t len = xrsp->DataLen();
1068 
1069  if (len >= (Int_t)sizeof(kXR_int32)) {
1070  // The first 4 bytes contain the session ID
1071  kXR_int32 psid = 0;
1072  memcpy(&psid, pdata, sizeof(kXR_int32));
1073  fSessionID = net2host(psid);
1074  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1075  len -= sizeof(kXR_int32);
1076  } else {
1077  Error("Create","session ID is undefined!");
1078  fSessionID = -1;
1079  if (srvresp) free(srvresp);
1080  return kFALSE;
1081  }
1082 
1083  if (len >= (Int_t)sizeof(kXR_int16)) {
1084  // The second 2 bytes contain the remote PROOF protocol version
1085  kXR_int16 dver = 0;
1086  memcpy(&dver, pdata, sizeof(kXR_int16));
1087  fRemoteProtocol = net2host(dver);
1088  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1089  len -= sizeof(kXR_int16);
1090  } else {
1091  Warning("Create","protocol version of the remote PROOF undefined!");
1092  }
1093 
1094  if (fRemoteProtocol == 0) {
1095  // We are dealing with an older server: the PROOF protocol is on 4 bytes
1096  len += sizeof(kXR_int16);
1097  kXR_int32 dver = 0;
1098  memcpy(&dver, pdata, sizeof(kXR_int32));
1099  fRemoteProtocol = net2host(dver);
1100  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1101  len -= sizeof(kXR_int32);
1102  } else {
1103  if (len >= (Int_t)sizeof(kXR_int16)) {
1104  // The third 2 bytes contain the remote XrdProofdProtocol version
1105  kXR_int16 dver = 0;
1106  memcpy(&dver, pdata, sizeof(kXR_int16));
1107  fXrdProofdVersion = net2host(dver);
1108  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1109  len -= sizeof(kXR_int16);
1110  } else {
1111  Warning("Create","version of the remote XrdProofdProtocol undefined!");
1112  }
1113  }
1114 
1115  if (len > 0) {
1116  // From top masters, the url of the data pool
1117  char *url = new char[len+1];
1118  memcpy(url, pdata, len);
1119  url[len] = 0;
1120  fBuffer = url;
1121  delete[] url;
1122  }
1123 
1124  // Cleanup
1125  SafeDelete(xrsp);
1126  if (srvresp) free(srvresp);
1127 
1128  // Notify
1129  return kTRUE;
1130  } else {
1131  // Extract log file path, if any
1132  Ssiz_t ilog = kNPOS;
1133  if (retriesleft <= 0 && fConn->GetLastErr()) {
1134  fBuffer = fConn->GetLastErr();
1135  if ((ilog = fBuffer.Index("|log:")) != kNPOS) fBuffer.Remove(0, ilog);
1136  }
1137  // If not free resources now, just give up
1138  if (fConn->GetOpenError() == kXP_TooManySess) {
1139  // Avoid to contact the server any more
1140  fSessionID = -1;
1141  if (srvresp) free(srvresp);
1142  return kFALSE;
1143  } else {
1144  // Print error msg, if any
1145  if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr()) {
1146  TString emsg(fConn->GetLastErr());
1147  if ((ilog = emsg.Index("|log:")) != kNPOS) emsg.Remove(ilog);
1148  Printf("%s: %s", fHost.Data(), emsg.Data());
1149  }
1150  }
1151  }
1152 
1153  if (gDebug > 0)
1154  Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
1155  if (retriesleft <= 0)
1156  Error("Create", "%d creation/attachment attempts failed: no attempts left",
1157  gEnv->GetValue("XProof.CreationRetries", 4));
1158 
1159  if (srvresp) free(srvresp);
1160  } // Creation retries
1161 
1162  // The session is invalid: reset the sessionID to invalid state (it was our protocol
1163  // number during creation
1164  fSessionID = -1;
1165 
1166  // Notify failure
1167  Error("Create:",
1168  "problems creating or attaching to a remote server (%s)",
1169  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
1170  return kFALSE;
1171 }
1172 
1173 ////////////////////////////////////////////////////////////////////////////////
1174 /// Send a raw buffer of specified length.
1175 /// Use opt = kDontBlock to ask xproofd to push the message into the proofsrv.
1176 /// (by default it is appended to a queue waiting for a request from proofsrv).
1177 /// Returns the number of bytes sent or -1 in case of error.
1178 
1180 {
      // Send 'length' bytes starting at 'buffer' to the server with a
      // kXP_sendmsg request. Returns 'length' on success, -1 on failure.
1182 
1183  // Set (opt == kDontBlock) or clear the async flag in the send options.
      // The result is stored back in fSendOpt, so it outlives this call.
1184  fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
1185  : (~kXPD_async & fSendOpt) ;
1186 
1187  // Prepare request: zeroed header, stream id from the connection, then
      // session id, options, client id and payload length
1188  XPClientRequest Request;
1189  memset( &Request, 0, sizeof(Request) );
1190  fConn->SetSID(Request.header.streamid);
1191  Request.sendrcv.requestid = kXP_sendmsg;
1192  Request.sendrcv.sid = fSessionID;
1193  Request.sendrcv.opt = fSendOpt;
1194  Request.sendrcv.cid = GetClientID();
1195  Request.sendrcv.dlen = length;
1196  if (gDebug >= 2)
1197  Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
1198 
1199  // Send request; no answer buffer is requested (third argument is 0)
1200  XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
1201 
      // A non-null response is treated as success
1202  if (xrsp) {
1203  // Prepare return info
1204  Int_t nsent = length;
1205 
1206  // Update the per-socket counter of bytes sent
1207  fBytesSent += length;
1208 
1209  // Cleanup
1210  SafeDelete(xrsp);
1211 
1212  // Success: update usage timestamp
1213  Touch();
1214 
1215  // ok
1216  return nsent;
1217  } else {
1218  // Print error message, if any
1219  if (fConn->GetLastErr())
1220  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1221  else
1222  Printf("%s: error occured but no message from server", fHost.Data());
1223  }
1224 
1225  // Failure notification (avoid using the handler: we may be exiting)
1226  Error("SendRaw", "%s: problems sending %d bytes to server",
1227  fHost.Data(), length);
1228  return -1;
1229 }
1230 
1231 ////////////////////////////////////////////////////////////////////////////////
1232 /// Ping functionality: contact the server to check its vitality.
1233 /// In external mode the caller waits for the reply from the server; in internal mode ('i') the request is only written.
1234 /// Returns kTRUE if OK or kFALSE in case of error.
1235 
1237 {
1239 
1240  if (gDebug > 0)
1241  Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
1242 
1243  // Make sure we are connected
1244  if (!IsValid()) {
1245  Error("Ping","not connected: nothing to do");
1246  return kFALSE;
1247  }
1248 
1249  // Options
1250  kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
1251 
1252  // Prepare request
1253  XPClientRequest Request;
1254  memset( &Request, 0, sizeof(Request) );
1255  fConn->SetSID(Request.header.streamid);
1256  Request.sendrcv.requestid = kXP_ping;
1257  Request.sendrcv.sid = fSessionID;
1258  Request.sendrcv.opt = options;
1259  Request.sendrcv.dlen = 0;
1260 
1261  // Send request
1262  Bool_t res = kFALSE;
1263  if (fMode != 'i') {
1264  char *pans = 0;
1265  XrdClientMessage *xrsp =
1266  fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
1267  kXR_int32 *pres = (kXR_int32 *) pans;
1268 
1269  // Get the result
1270  if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
1271  *pres = net2host(*pres);
1272  res = (*pres == 1) ? kTRUE : kFALSE;
1273  // Success: update usage timestamp
1274  Touch();
1275  } else {
1276  // Print error msg, if any
1277  if (fConn->GetLastErr())
1278  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1279  }
1280 
1281  // Cleanup
1282  SafeDelete(xrsp);
1283  if (pans) free(pans);
1284 
1285  } else {
1286  if (XPD::clientMarshall(&Request) == 0) {
1287  XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
1288  res = (e == kOK) ? kTRUE : kFALSE;
1289  } else {
1290  Error("Ping", "%p: int: problems marshalling request", this);
1291  }
1292  }
1293 
1294  // Failure notification (avoid using the handler: we may be exiting)
1295  if (!res) {
1296  Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
1297  } else if (gDebug > 0) {
1298  Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
1299  }
1300 
1301  return res;
1302 }
1303 
1304 ////////////////////////////////////////////////////////////////////////////////
1305 /// Remote touch functionality: contact the server to prove our vitality.
1306 /// No reply from server is expected.
1307 
1309 {
1311 
1312  if (gDebug > 0)
1313  Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
1314 
1315  // Make sure we are connected
1316  if (!IsValid()) {
1317  Error("RemoteTouch","not connected: nothing to do");
1318  return;
1319  }
1320 
1321  // Prepare request
1322  XPClientRequest Request;
1323  memset( &Request, 0, sizeof(Request) );
1324  fConn->SetSID(Request.header.streamid);
1325  Request.sendrcv.requestid = kXP_touch;
1326  Request.sendrcv.sid = fSessionID;
1327  Request.sendrcv.opt = 0;
1328  Request.sendrcv.dlen = 0;
1329 
1330  // We need the right order
1331  if (XPD::clientMarshall(&Request) != 0) {
1332  Error("Touch", "%p: problems marshalling request ", this);
1333  return;
1334  }
1335  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1336  Error("Touch", "%p: problems sending touch request to server", this);
1337 
1338  // Done
1339  return;
1340 }
1341 
1342 ////////////////////////////////////////////////////////////////////////////////
1343 /// Interrupt the remote protocol instance. Used to propagate Ctrl-C.
1344 /// No reply from server is expected.
1345 
1347 {
      // Write a kXP_ctrlc request to the server. The request is marshalled
      // and written directly; no reply is read. Failures are only reported
      // through Error().
1349 
1350  if (gDebug > 0)
1351  Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
1352 
1353  // Make sure we are connected
1354  if (!IsValid()) {
1355  Error("CtrlC","not connected: nothing to do");
1356  return;
1357  }
1358 
1359  // Prepare request
      // NOTE(review): sid is set to 0 here while the other requests in this
      // file send fSessionID - presumably intentional, confirm
1360  XPClientRequest Request;
1361  memset( &Request, 0, sizeof(Request) );
1362  fConn->SetSID(Request.header.streamid);
1363  Request.proof.requestid = kXP_ctrlc;
1364  Request.proof.sid = 0;
1365  Request.proof.dlen = 0;
1366 
1367  // Marshal the request before writing it (a non-zero return is an error)
1368  if (XPD::clientMarshall(&Request) != 0) {
1369  Error("CtrlC", "%p: problems marshalling request ", this);
1370  return;
1371  }
1372  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1373  Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
1374 
1375  // Done
1376  return;
1377 }
1378 
1379 ////////////////////////////////////////////////////////////////////////////////
1380 /// Wait and pick-up next buffer from the asynchronous queue
1381 
1383 {
      // Wait on the semaphore fASem until a buffer is available, then take
      // the first entry of fAQue as current read buffer (fBufCur).
      // Returns 0 on success; -1 if the wait fails or times out, if an
      // interrupt is detected, or if the queue is empty or holds a null entry.
1384  fBufCur = 0;
1385  fByteLeft = 0;
1386  fByteCur = 0;
1387  if (gDebug > 2)
1388  Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
1389 
1390  // User can choose whether to wait forever or for a fixed amount of time
1391  if (!fDontTimeout) {
      // Total timeout in ms ("XProof.ReadTimeout" is multiplied by 1000);
      // function-local statics, so the resource is read on the first call only.
      // The wait is sliced in steps of dt ms so that interrupts can be noticed.
1392  static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
1393  static Int_t dt = 2000;
1394  Int_t to = timeout;
1396  while (to && !IsInterrupt()) {
1397  SetAWait(kTRUE);
1398  if (fASem.Wait(dt) != 0) {
      // This slice expired without a post: account for it
1399  to -= dt;
1400  if (to <= 0) {
1401  Error("PickUpReady","error waiting at semaphore");
1402  return -1;
1403  } else {
1404  if (gDebug > 0)
1405  Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
1406  this, GetTitle(), to/1000);
1407  }
1408  } else
      // NOTE(review): the loop is left here before SetAWait(kFALSE) below is
      // reached, so the flag looks still set on this path - confirm intended
1409  break;
1410  SetAWait(kFALSE);
1411  }
1412  // Out of the wait loop: give up if an interrupt was requested
1413  if (IsInterrupt()) {
1414  if (gDebug > 2)
1415  Info("PickUpReady","interrupted");
1417  SetAWait(kFALSE);
1418  return -1;
1419  }
1420  } else {
1421  // We wait forever
1422  SetAWait(kTRUE);
1423  if (fASem.Wait() != 0) {
1424  Error("PickUpReady","error waiting at semaphore");
1425  SetAWait(kFALSE);
1426  return -1;
1427  }
1428  SetAWait(kFALSE);
1429  }
1430  if (gDebug > 2)
1431  Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
1432 
      // The queue is accessed under fAMtx from here to the end of the function
1433  std::lock_guard<std::recursive_mutex> lock(fAMtx);
1434 
1435  // Get message, if any
1436  if (fAQue.size() <= 0) {
1437  Error("PickUpReady","queue is empty - protocol error ?");
1438  return -1;
1439  }
1440  if (!(fBufCur = fAQue.front())) {
1441  Error("PickUpReady","got invalid buffer - protocol error ?");
1442  return -1;
1443  }
1444  // Remove message from the queue
1445  fAQue.pop_front();
1446 
1447  // Set number of available bytes
1448  fByteLeft = fBufCur->fLen;
1449 
1450  if (gDebug > 2)
1451  Info("PickUpReady", "%p: %s: got message (%d bytes)",
1452  this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
1453 
1454  // Update counters
1455  fBytesRecv += fBufCur->fLen;
1456 
1457  // Set session ID
      // NOTE(review): the statement guarded by this 'if' is not visible in
      // this listing; as shown, the condition would govern the
      // fgPipe.Clean() call below - restore it from the original file
1458  if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
1460 
1461  // Clean entry in the underlying pipe
1462  fgPipe.Clean(this);
1463 
1464  // We are done
1465  return 0;
1466 }
1467 
1468 ////////////////////////////////////////////////////////////////////////////////
1469 /// Pop-up a buffer of at least size bytes from the spare list
1470 /// If none is found either one is reallocated or a new one
1471 /// created
1472 
1474 {
1475  TXSockBuf *buf = 0;
1476  static Int_t nBuf = 0;
1477 
1478  std::lock_guard<std::mutex> lock(fgSMtx);
1479 
1480  Int_t maxsz = 0;
1481  if (fgSQue.size() > 0) {
1482  list<TXSockBuf *>::iterator i;
1483  for (i = fgSQue.begin(); i != fgSQue.end(); i++) {
1484  maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1485  if ((*i) && (*i)->fSiz >= size) {
1486  buf = *i;
1487  if (gDebug > 2)
1488  Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1489  size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
1490  // Drop from this list
1491  fgSQue.erase(i);
1492  return buf;
1493  }
1494  }
1495  // All buffers are too small: enlarge the first one
1496  buf = fgSQue.front();
1497  buf->Resize(size);
1498  if (gDebug > 2)
1499  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1500  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1501  // Drop from this list
1502  fgSQue.pop_front();
1503  return buf;
1504  }
1505 
1506  // Create a new buffer
1507  buf = new TXSockBuf((char *)malloc(size), size);
1508  nBuf++;
1509 
1510  if (gDebug > 2)
1511  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1512  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1513 
1514  // We are done
1515  return buf;
1516 }
1517 
1518 ////////////////////////////////////////////////////////////////////////////////
1519 /// Release read buffer giving back to the spare list
1520 
1522 {
      // Give the current read buffer (fBufCur) back: it is either appended
      // to the spare list fgSQue or deleted, then the read state is reset.
      // The whole operation runs under fgSMtx.
1523  std::lock_guard<std::mutex> lock(fgSMtx);
1524 
1525  if (gDebug > 2)
1526  Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
1528 
      // NOTE(review): the arguments of the Info() call above and the
      // condition opening the branch below are not visible in this listing;
      // restore them from the original file before relying on this block
1530  fgSQue.push_back(fBufCur);
1531  } else {
1532  delete fBufCur;
1533  }
      // No current buffer any longer: reset pointer, cursor and byte count
1534  fBufCur = 0;
1535  fByteCur = 0;
1536  fByteLeft = 0;
1537 }
1538 
1539 ////////////////////////////////////////////////////////////////////////////////
1540 /// Receive a raw buffer of specified length bytes.
1541 
1543 {
1544  // Inputs must make sense
1545  if (!buffer || (length <= 0))
1546  return -1;
1547 
1548  // Wait and pick-up a read buffer if we do not have one
1549  if (!fBufCur && (PickUpReady() != 0))
1550  return -1;
1551 
1552  // Use it
1553  if (fByteLeft >= length) {
1554  memcpy(buffer, fBufCur->fBuf + fByteCur, length);
1555  fByteCur += length;
1556  if ((fByteLeft -= length) <= 0)
1557  // All used: give back
1558  PushBackSpare();
1559  // Success: update usage timestamp
1560  Touch();
1561  return length;
1562  } else {
1563  // Copy the first part
1564  memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
1565  Int_t at = fByteLeft;
1566  Int_t tobecopied = length - fByteLeft;
1567  PushBackSpare();
1568  while (tobecopied > 0) {
1569  // Pick-up next buffer (it may wait inside)
1570  if (PickUpReady() != 0)
1571  return -1;
1572  // Copy the fresh meat
1573  Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1574  memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
1575  fByteCur = ncpy;
1576  if ((fByteLeft -= ncpy) <= 0)
1577  // All used: give back
1578  PushBackSpare();
1579  // Recalculate
1580  tobecopied -= ncpy;
1581  at += ncpy;
1582  }
1583  }
1584 
1585  // Update counters
1586  fBytesRecv += length;
1587  fgBytesRecv += length;
1588 
1589  // Success: update usage timestamp
1590  Touch();
1591 
1592  return length;
1593 }
1594 
1595 ////////////////////////////////////////////////////////////////////////////////
1596 /// Send urgent message (interrupt) to remote server
1597 /// Returns 0 or -1 in case of error.
1598 
1600 {
      // Send an interrupt of the given 'type' to the server and wait for the
      // response. Returns 0 if a response is received, -1 otherwise.
1602 
1603  // Prepare request
1604  XPClientRequest Request;
1605  memset(&Request, 0, sizeof(Request) );
1606  fConn->SetSID(Request.header.streamid);
      // A shutdown interrupt is sent as a kXP_destroy request, any other
      // type as a plain kXP_interrupt
1607  if (type == (Int_t) TProof::kShutdownInterrupt)
1608  Request.interrupt.requestid = kXP_destroy;
1609  else
1610  Request.interrupt.requestid = kXP_interrupt;
1611  Request.interrupt.sid = fSessionID;
1612  Request.interrupt.type = type; // type of interrupt (see TProof::EUrgent)
1613  Request.interrupt.dlen = 0;
1614 
1615  // Send request (no payload, no answer buffer requested)
1616  XrdClientMessage *xrsp =
1617  fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
1618  if (xrsp) {
1619  // Success: update usage timestamp
1620  Touch();
1621  // Cleanup
1622  SafeDelete(xrsp);
1623  // ok
1624  return 0;
1625  } else {
1626  // Print error msg, if any
1627  if (fConn->GetLastErr())
1628  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1629  }
1630 
1631  // Failure notification (avoid using the handler: we may be exiting)
1632  Error("SendInterrupt", "problems sending interrupt to server");
1633  return -1;
1634 }
1635 
1636 ////////////////////////////////////////////////////////////////////////////////
1637 /// Send a TMessage object. Returns the number of bytes in the TMessage
1638 /// that were sent and -1 in case of error.
1639 
1641 {
      // Send the message 'mess' through SendRaw(), after sending streamer
      // infos and process IDs and compressing the message if requested.
      // Returns the number of bytes sent minus the length word, the
      // non-positive SendRaw() result on failure, or -1 if 'mess' is a
      // message used for reading.
1643 
1644  if (mess.IsReading()) {
1645  Error("Send", "cannot send a message used for reading");
1646  return -1;
1647  }
1648 
1649  // send streamer infos in case schema evolution is enabled in the TMessage
1650  SendStreamerInfos(mess);
1651 
1652  // send the process id's so TRefs work
1653  SendProcessIDs(mess);
1654 
1655  mess.SetLength(); //write length in first word of buffer
1656 
      // Apply the socket compression settings if the message has none
1657  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
1658  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
1659 
1660  if (mess.GetCompressionLevel() > 0)
1661  const_cast<TMessage&>(mess).Compress();
1662 
      // Send the compressed buffer when there is one, the plain one otherwise
1663  char *mbuf = mess.Buffer();
1664  Int_t mlen = mess.Length();
1665  if (mess.CompBuffer()) {
1666  mbuf = mess.CompBuffer();
1667  mlen = mess.CompLength();
1668  }
1669 
1670  // Parse message type to choose sending options.
      // The current options are saved here and restored after SendRaw(), so
      // any change made in the switch applies to this message only.
      // NOTE(review): most case bodies are not visible in this listing (only
      // the 'break' statements are); restore them from the original file
1671  kXR_int32 fSendOptDefault = fSendOpt;
1672  switch (mess.What()) {
1673  case kPROOF_PROCESS:
1675  break;
1676  case kPROOF_PROGRESS:
1677  case kPROOF_FEEDBACK:
1679  break;
1680  case kPROOF_QUERYSUBMITTED:
1683  break;
1684  case kPROOF_STARTPROCESS:
1687  break;
1688  case kPROOF_STOPPROCESS:
1690  break;
1691  case kPROOF_SETIDLE:
1694  break;
1695  case kPROOF_LOGFILE:
1696  case kPROOF_LOGDONE:
      // Flag log messages as such when GetClientIDSize() is at most 1
1697  if (GetClientIDSize() <= 1)
1698  fSendOpt |= kXPD_logmsg;
1699  break;
1700  default:
1701  break;
1702  }
1703 
1704  if (gDebug > 2)
1705  Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
1706 
1707  Int_t nsent = SendRaw(mbuf, mlen);
1708  fSendOpt = fSendOptDefault;
1709 
1710  if (nsent <= 0)
1711  return nsent;
1712 
      // NOTE(review): SendRaw() already adds the sent bytes to fBytesSent, so
      // the per-socket counter looks incremented twice per message - confirm
1713  fBytesSent += nsent;
1714  fgBytesSent += nsent;
1715 
1716  return nsent - sizeof(UInt_t); //length - length header
1717 }
1718 
1719 ////////////////////////////////////////////////////////////////////////////////
1720 /// Receive a TMessage object. The user must delete the TMessage object.
1721 /// Returns length of message in bytes (can be 0 if other side of connection
1722 /// is closed) or -1 in case of error or -5 if pipe broken (connection invalid).
1723 /// In those cases mess == 0.
1724 
1726 {
      // Read one message: first the length word, then the body, and wrap
      // them in a new TMessage returned through 'mess'.
      // Returns the RecvRaw() result for the body; -5 if the socket is not
      // valid; the non-positive RecvRaw() result (with mess == 0) on failure.
1728 
1729  if (!IsValid()) {
1730  mess = 0;
1731  return -5;
1732  }
1733 
      // Re-entry point after a message consumed by RecvStreamerInfos() or
      // RecvProcessIDs()
1734 oncemore:
1735  Int_t n;
1736  UInt_t len;
1737  if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
1738  mess = 0;
1739  return n;
1740  }
1741  len = net2host(len); //from network to host byte order
1742 
      // Room is left at the start of the buffer for the length word; the
      // body is read right after it
1743  char *buf = new char[len+sizeof(UInt_t)];
1744  if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
1745  delete [] buf;
1746  mess = 0;
1747  return n;
1748  }
1749 
1750  fBytesRecv += n + sizeof(UInt_t);
1751  fgBytesRecv += n + sizeof(UInt_t);
1752 
      // NOTE(review): 'buf' is not deleted here; presumably the TMessage
      // takes ownership of it - confirm
1753  mess = new TMessage(buf, len+sizeof(UInt_t));
1754 
1755  // receive any streamer infos
      // NOTE(review): on a true return the message is presumably consumed
      // and disposed of by the callee before reading the next one - confirm
1756  if (RecvStreamerInfos(mess))
1757  goto oncemore;
1758 
1759  // receive any process ids
1760  if (RecvProcessIDs(mess))
1761  goto oncemore;
1762 
1763  if (mess->What() & kMESS_ACK) {
1764  // Acknowledgement embedded: strip the flag, no acknowledgement is sent
1765  mess->SetWhat(mess->What() & ~kMESS_ACK);
1766  }
1767 
1768  return n;
1769 }
1770 
1771 ////////////////////////////////////////////////////////////////////////////////
1772 /// Send message to intermediate coordinator.
1773 /// If any output is due, this is returned as an obj string to be
1774 /// deleted by the caller
1775 
1776 TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
1777  Long64_t l64, Int_t int3, const char *)
1778 {
1779  TObjString *sout = 0;
1780 
1781  // We fill the header struct containing the request
1782  XPClientRequest reqhdr;
1783  const void *buf = 0;
1784  char *bout = 0;
1785  char **vout = 0;
1786  memset(&reqhdr, 0, sizeof(reqhdr));
1787  fConn->SetSID(reqhdr.header.streamid);
1788  reqhdr.header.requestid = kXP_admin;
1789  reqhdr.proof.int1 = kind;
1790  reqhdr.proof.int2 = int2;
1791  switch (kind) {
1792  case kQueryMssUrl:
1793  case kQueryROOTVersions:
1794  case kQuerySessions:
1795  case kQueryWorkers:
1796  reqhdr.proof.sid = 0;
1797  reqhdr.header.dlen = 0;
1798  vout = (char **)&bout;
1799  break;
1800  case kCleanupSessions:
1801  reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
1802  : (kXR_int32) kXPD_TopMaster;
1803  reqhdr.proof.int3 = int2;
1804  reqhdr.proof.sid = fSessionID;
1805  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1806  buf = (msg) ? (const void *)msg : buf;
1807  break;
1808  case kCpFile:
1809  case kGetFile:
1810  case kPutFile:
1811  case kExec:
1812  reqhdr.proof.sid = fSessionID;
1813  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1814  buf = (msg) ? (const void *)msg : buf;
1815  vout = (char **)&bout;
1816  break;
1817  case kQueryLogPaths:
1818  vout = (char **)&bout;
1819  reqhdr.proof.int3 = int3;
1820  case kReleaseWorker:
1821  case kSendMsgToUser:
1822  case kGroupProperties:
1823  case kSessionTag:
1824  case kSessionAlias:
1825  reqhdr.proof.sid = fSessionID;
1826  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1827  buf = (msg) ? (const void *)msg : buf;
1828  break;
1829  case kROOTVersion:
1830  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1831  buf = (msg) ? (const void *)msg : buf;
1832  break;
1833  case kGetWorkers:
1834  reqhdr.proof.sid = fSessionID;
1835  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1836  if (msg)
1837  buf = (const void *)msg;
1838  vout = (char **)&bout;
1839  break;
1840  case kReadBuffer:
1841  reqhdr.header.requestid = kXP_readbuf;
1842  reqhdr.readbuf.ofs = l64;
1843  reqhdr.readbuf.len = int2;
1844  if (int3 > 0 && fXrdProofdVersion < 1003) {
1845  Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
1846  " grep functionality not supported", fXrdProofdVersion);
1847  return sout;
1848  }
1849  reqhdr.readbuf.int1 = int3;
1850  if (!msg || strlen(msg) <= 0) {
1851  Info("SendCoordinator", "kReadBuffer: file path undefined");
1852  return sout;
1853  }
1854  reqhdr.header.dlen = strlen(msg);
1855  buf = (const void *)msg;
1856  vout = (char **)&bout;
1857  break;
1858  default:
1859  Info("SendCoordinator", "unknown message kind: %d", kind);
1860  return sout;
1861  }
1862 
1863  // server response header
1864  Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
1865  XrdClientMessage *xrsp =
1866  fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
1867 
1868  // If positive answer
1869  if (xrsp) {
1870  // Check if we need to create an output string
1871  if (bout && (xrsp->DataLen() > 0))
1872  sout = new TObjString(TString(bout,xrsp->DataLen()));
1873  if (bout)
1874  free(bout);
1875  // Success: update usage timestamp
1876  Touch();
1877  SafeDelete(xrsp);
1878  } else {
1879  // Print error msg, if any
1880  if (fConn->GetLastErr())
1881  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1882  }
1883 
1884  // Failure notification (avoid using the handler: we may be exiting)
1885  return sout;
1886 }
1887 
1888 ////////////////////////////////////////////////////////////////////////////////
1889 /// Send urgent message to counterpart; 'type' specifies the type of
1890 /// the message (see TXSocket::EUrgentMsgType), and 'int1', 'int2'
1891 /// two containers for additional information.
1892 
1894 {
      // Send a kXP_urgent request carrying 'type', 'int1' and 'int2' and
      // wait for the response. Nothing is returned: a missing response is
      // only reported by printing the last connection error, if any.
1896 
1897  // Prepare request
1898  XPClientRequest Request;
1899  memset(&Request, 0, sizeof(Request) );
1900  fConn->SetSID(Request.header.streamid);
1901  Request.proof.requestid = kXP_urgent;
1902  Request.proof.sid = fSessionID;
      // Mind the shift: 'type' goes in int1, 'int1' in int2, 'int2' in int3
1903  Request.proof.int1 = type; // type of urgent msg (see TXSocket::EUrgentMsgType)
1904  Request.proof.int2 = int1; // 4-byte container info 1
1905  Request.proof.int3 = int2; // 4-byte container info 2
1906  Request.proof.dlen = 0;
1907 
1908  // Send request (no payload, no answer buffer requested)
1909  XrdClientMessage *xrsp =
1910  fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
1911  if (xrsp) {
1912  // Success: update usage timestamp
1913  Touch();
1914  // Cleanup
1915  SafeDelete(xrsp);
1916  } else {
1917  // Print error msg, if any
1918  if (fConn->GetLastErr())
1919  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1920  }
1921 
1922  // Done
1923  return;
1924 }
1925 
1926 ////////////////////////////////////////////////////////////////////////////////
1927 /// Init environment variables for XrdClient
1928 
1930 {
      // Translate the ROOT resources (gEnv) into settings for the client:
      // entries set with EnvPutInt/EnvPutString and process environment
      // variables (XrdSec*) set with gSystem->Setenv.
      // Two patterns are used for the environment variables:
      //  - resources with an empty default are exported only when defined;
      //  - resources with a non-empty default are exported only if the
      //    variable is not already set (or is empty) in the environment.
      // At the end fgInitDone is set to kTRUE.

1931  // Set debug level; each level adds to the trace mask of the previous one
1932  Int_t deb = gEnv->GetValue("XProof.Debug", -1);
1933  EnvPutInt(NAME_DEBUG, deb);
1934  if (deb > 0) {
1935  XrdProofdTrace->What |= TRACE_REQ;
1936  if (deb > 1) {
1937  XrdProofdTrace->What |= TRACE_DBG;
1938  if (deb > 2)
1939  XrdProofdTrace->What |= TRACE_ALL;
1940  }
1941  }
      // Scratch pointer for the Getenv() checks below
1942  const char *cenv = 0;
1943 
1944  // List of domains where connection is allowed
1945  TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
1946  if (allowCO.Length() > 0)
1947  EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
1948 
1949  // List of domains where connection is denied
1950  TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
1951  if (denyCO.Length() > 0)
1952  EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
1953 
1954  // Max number of retries on first connect and related timeout
1956  Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
1957  EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
1958  Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
1959  EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
1960 
1961  // Reconnect Wait
1962  Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
1963  DFLT_RECONNECTWAIT);
1964  if (recoTO == DFLT_RECONNECTWAIT) {
1965  // Check also the old variable name
1966  recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
1967  DFLT_RECONNECTWAIT);
1968  }
1969  EnvPutInt(NAME_RECONNECTWAIT, recoTO);
1970 
1971  // Request Timeout
1972  Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
1973  EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
1974 
1975  // No automatic proofd backward-compatibility
1976  EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
1977 
1978  // Dynamic forwarding (SOCKS4): enabled only if a positive port is given
1979  TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
1980  Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
1981  if (socks4Port > 0) {
1982  if (socks4Host.IsNull())
1983  // Default
1984  socks4Host = "127.0.0.1";
1985  EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
1986  EnvPutInt(NAME_SOCKS4PORT, socks4Port);
1987  }
1988 
1989  // For password-based authentication
1990  TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
1991  if (autolog.Length() > 0 &&
1992  (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
1993  gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
1994 
1995  // For password-based authentication: this one is always set, whatever
      // the current value in the environment
1996  TString netrc;
1997  netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
1998  gSystem->Setenv("XrdSecNETRC", netrc.Data());
1999 
2000  TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
2001  if (alogfile.Length() > 0)
2002  gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
2003 
2004  TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
2005  if (verisrv.Length() > 0 &&
2006  (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2007  gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
2008 
2009  TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
2010  if (srvpuk.Length() > 0)
2011  gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
2012 
2013  // For GSI authentication
2014  TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
2015  if (cadir.Length() > 0)
2016  gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
2017 
2018  TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
2019  if (crldir.Length() > 0)
2020  gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
2021 
2022  TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
2023  if (crlext.Length() > 0)
2024  gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
2025 
2026  TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
2027  if (ucert.Length() > 0)
2028  gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
2029 
2030  TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
2031  if (ukey.Length() > 0)
2032  gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
2033 
2034  TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
2035  if (upxy.Length() > 0)
2036  gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
2037 
2038  TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
2039  if (valid.Length() > 0)
2040  gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
2041 
2042  TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
2043  if (deplen.Length() > 0 &&
2044  (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2045  gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
2046 
2047  TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
2048  if (pxybits.Length() > 0)
2049  gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
2050 
2051  TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
2052  if (crlcheck.Length() > 0 &&
2053  (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2054  gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
2055 
2056  TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
2057  if (delegpxy.Length() > 0 &&
2058  (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2059  gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
2060 
2061  TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
2062  if (signpxy.Length() > 0 &&
2063  (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2064  gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
2065 
2066  // Print the tag, if required (only once)
2067  if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
2068  ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
2069  gROOT->GetVersion());
2070 
2071  // Only once: flag that the initialization has been done
2072  fgInitDone = kTRUE;
2073 }
2074 
2075 ////////////////////////////////////////////////////////////////////////////////
2076 /// Try reconnection after failure
2077 
2079 {
2080  if (gDebug > 0) {
2081  Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2082  this, fConn, (fConn ? fConn->IsValid() : 0),
2083  fUrl.Data(), fConn->GetLogConnID());
2084  }
2085 
2086  Int_t tryreconnect = gEnv->GetValue("TXSocket.Reconnect", 0);
2087  if (tryreconnect == 0 || fXrdProofdVersion < 1005) {
2088  if (tryreconnect == 0)
2089  Info("Reconnect","%p: reconnection attempts explicitly disabled!", this);
2090  else
2091  Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
2092  this, fXrdProofdVersion);
2093  return -1;
2094  }
2095 
2096  if (fConn) {
2097  if (gDebug > 0)
2098  Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
2099  fConn->ReConnect();
2100  if (fConn->IsValid()) {
2101  // Create new proofserv if not client manager or administrator or internal mode
2102  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
2103  // We attach or create
2104  if (!Create(kTRUE)) {
2105  // Failure
2106  Error("TXSocket", "create or attach failed (%s)",
2107  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
2108  Close();
2109  return -1;
2110  }
2111  }
2112  }
2113  }
2114 
2115  if (gDebug > 0) {
2116  if (fConn) {
2117  Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
2118  (fConn->IsValid() ? "succeeded!" : "failed"),
2119  fConn->GetLogConnID() );
2120  } else {
2121  Info("Reconnect", "%p (c:0x0): attempt failed", this);
2122  }
2123  }
2124 
2125  // Done
2126  return ((fConn && fConn->IsValid()) ? 0 : -1);
2127 }
2128 
2129 ////////////////////////////////////////////////////////////////////////////////
2130 /// Constructor: wrap the 'sz'-byte buffer 'bp'; 'own' is stored in fOwn, which
2131 /// the destructor tests before freeing. The client id fCid starts at -1.
2133 {
2134  fBuf = fMem = bp;
2135  fSiz = fLen = sz;
2136  fOwn = own;
2137  fCid = -1;
2138  fgBuffMem += sz; // NOTE(review): added even when !own, while the destructor only subtracts for owned buffers - confirm intended
2139 }
2140 
2141 ////////////////////////////////////////////////////////////////////////////////
2142 /// Destructor: free the memory block and subtract fSiz from fgBuffMem, but only
2143 /// when the buffer is owned (fOwn) and fMem is non-null; otherwise do nothing.
2145 {
2146  if (fOwn && fMem) {
2147  free(fMem);
2148  fgBuffMem -= fSiz;
2149  }
2150 }
2151 
2152 ////////////////////////////////////////////////////////////////////////////////
2153 /// Resize the socket buffer to 'sz' bytes; only growth is honoured (no-op if sz <= fSiz).
2154 /// On success fLen is reset to 0; if realloc fails the buffer is left untouched (fSiz stays < sz).
2156 {
2157  if (sz > fSiz) {
2158  if (Char_t *mem = (Char_t *)realloc(fMem, sz)) { // assign only on success: a failed realloc leaves the old block valid
2159  fgBuffMem += (sz - fSiz);
2160  fBuf = fMem = mem;
2161  fSiz = sz;
2162  fLen = 0;
2163  }
2164  }
2165 }
2166 
2167 //_____________________________________________________________________________
2168 //
2169 // TXSockBuf static methods
2170 //
2171 
2172 ////////////////////////////////////////////////////////////////////////////////
2173 /// Return the currently allocated memory, i.e. the value of the static counter fgBuffMem
2174 
2176 {
2177  return fgBuffMem;
2178 }
2179 
2180 ////////////////////////////////////////////////////////////////////////////////
2181 /// Return the max allocated memory allowed (current value of fgMemMax)
2182 
2184 {
2185  return fgMemMax;
2186 }
2187 
2188 ////////////////////////////////////////////////////////////////////////////////
2189 /// Set the max allocated memory allowed to 'memmax'; values <= 0 are ignored
2190 /// and leave the current limit (fgMemMax) unchanged.
2192 {
2193  fgMemMax = memmax > 0 ? memmax : fgMemMax;
2194 }
2195 
2196 //_____________________________________________________________________________
2197 //
2198 // TXSockPipe
2199 //
2200 
2201 ////////////////////////////////////////////////////////////////////////////////
2202 /// Constructor: store 'loc' in fLoc (printed as a prefix in this class's
2203 /// messages) and create the pipe; on failure both descriptors are set to -1.
2204 TXSockPipe::TXSockPipe(const char *loc) : fLoc(loc)
2205 {
2206  // Create the pipe
2207  if (pipe(fPipe) != 0) {
2208  Printf("TXSockPipe: problem initializing pipe for socket inputs");
2209  fPipe[0] = -1; // -1 marks the descriptor as unusable; the destructor skips it
2210  fPipe[1] = -1;
2211  return;
2212  }
2213 }
2214 
2215 ////////////////////////////////////////////////////////////////////////////////
2216 /// Destructor: close each pipe descriptor that is open (value >= 0)
2217 
2219 {
2220  if (fPipe[0] >= 0) close(fPipe[0]);
2221  if (fPipe[1] >= 0) close(fPipe[1]);
2222 }
2223 
2224 
2225 ////////////////////////////////////////////////////////////////////////////////
2226 /// Write a byte to the global pipe to signal availability of new
2227 /// messages for socket 's', which is appended to the ready-socket list.
2228 /// Returns 0 on success, -1 if the pipe or 's' is not valid or the write fails.
2230 {
2231  if (!IsValid() || !s) return -1;
2232 
2233  // This must be an atomic action: list update and pipe write both happen under fMutex
2234  Int_t sz = 0;
2235  { std::lock_guard<std::recursive_mutex> lock(fMutex);
2236  // Add this one
2237  fReadySock.Add(s);
2238 
2239  // Only one char
2240  Char_t c = 1;
2241  if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
2242  Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
2243  return -1; // NOTE(review): 's' stays in fReadySock although no byte reached the pipe - confirm intended
2244  }
2245  if (gDebug > 2) sz = fReadySock.GetSize(); // sampled under the lock for the debug message below
2246  }
2247 
2248  if (gDebug > 2)
2249  Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2250  fLoc.Data(), s, sz, fPipe[1]);
2251  // We are done
2252  return 0;
2253 }
2254 
2255 ////////////////////////////////////////////////////////////////////////////////
2256 /// Read a byte to the global pipe to synchronize message pickup
2257 
2259 {
2260  // Pipe must have been created
2261  if (!IsValid() || !s) return -1;
2262 
2263  // Only one char
2264  Int_t sz = 0;
2265  Char_t c = 0;
2266  { std::lock_guard<std::recursive_mutex> lock(fMutex);
2267  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
2268  Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
2269  return -1;
2270  }
2271  // Remove this one
2272  fReadySock.Remove(s);
2273 
2274  if (gDebug > 2) sz = fReadySock.GetSize();
2275  }
2276 
2277  if (gDebug > 2)
2278  Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2279  fLoc.Data(), s, sz, fPipe[0]);
2280 
2281  // We are done
2282  return 0;
2283 }
2284 
2285 ////////////////////////////////////////////////////////////////////////////////
2286 /// Remove any reference to socket 's' from the global pipe and
2287 /// ready-socket queue, then flush the socket itself.
2288 /// Returns -1 if the pipe or 's' is not valid, 0 otherwise.
2290 {
2291  // Pipe must have been created
2292  if (!IsValid() || !s) return -1;
2293 
2294  TObject *o = 0;
2295  // This must be an atomic action
2296  { std::lock_guard<std::recursive_mutex> lock(fMutex);
2297  o = fReadySock.FindObject(s);
2298 
2299  while (o) {
2300  // Remove from the list
2301  fReadySock.Remove(s);
2302  o = fReadySock.FindObject(s);
2303  // Remove one notification from the pipe
2304  Char_t c = 0;
2305  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
2306  Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data()); // failure is only reported; the loop goes on
2307  }
2308  }
2309  // Flush also the socket (done after fMutex has been released)
2310  ((TXSocket *)s)->Flush(); // NOTE(review): unchecked downcast - assumes every 's' passed here is a TXSocket
2311 
2312  // Notify
2313  if (gDebug > 0)
2314  Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
2315 
2316  // We are done
2317  return 0;
2318 }
2319 
2320 ////////////////////////////////////////////////////////////////////////////////
2321 /// Dump content of the ready socket list: with fMutex held, print the list
2322 /// size followed by the address of each entry.
2324 {
2325  std::lock_guard<std::recursive_mutex> lock(fMutex);
2326 
2327  TString buf = Form("%d |", fReadySock.GetSize());
2328  TIter nxs(&fReadySock);
2329  TObject *o = 0;
2330  while ((o = nxs()))
2331  buf += Form(" %p",o);
2332  Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
2333 }
2334 
2335 ////////////////////////////////////////////////////////////////////////////////
2336 /// Return last ready socket: the last entry of fReadySock cast to TXSocket*.
2337 /// fMutex is held only for the lookup itself, not while the caller uses the result.
2339 {
2340  std::lock_guard<std::recursive_mutex> lock(fMutex);
2341 
2342  return (TXSocket *) fReadySock.Last();
2343 }
const char * GetHost() const
Definition: TUrl.h:76
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
double read(const std::string &file_name)
reading
Int_t fSocket
Definition: TSocket.h:100
TXSocket(const char *url, Char_t mode= 'M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Definition: TXSocket.cxx:127
Int_t fCid
Definition: TXSocket.h:262
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define kXPD_querynum
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:242
Bool_t IsValid() const
Definition: TXSocket.h:176
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:711
Int_t fTcpWindowSize
Definition: TSocket.h:101
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition: TSocket.cxx:932
virtual void SetClientID(Int_t)
Definition: TXSocket.h:179
tuple buffer
Definition: tree.py:99
TSemaphore fAsynProc
Definition: TXSocket.h:106
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
long long Long64_t
Definition: RtypesCore.h:69
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Definition: TXSocket.cxx:1179
kXR_unt16 fStreamid
Definition: XrdProofConn.h:73
#define kXPD_TopMaster
TSemaphore fASem
Definition: TXSocket.h:98
void SetLoc(const char *loc="")
Definition: TXSocket.h:298
Bool_t IsReading() const
Definition: TBuffer.h:83
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Int_t fPipe[2]
Definition: TXSocket.h:302
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
Bool_t IsValid() const
Definition: TXSocket.h:288
Ssiz_t Length() const
Definition: TString.h:390
Collectable string class.
Definition: TObjString.h:32
virtual void Close(Option_t *opt="")
Close connection.
Definition: TXSocket.cxx:311
return c
const char Option_t
Definition: RtypesCore.h:62
Int_t fLogLevel
Definition: TXSocket.h:89
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition: TSocket.cxx:979
std::recursive_mutex fMutex
Definition: TXSocket.h:301
static Long64_t BuffMem()
Return the currently allocated memory.
Definition: TXSocket.cxx:2175
virtual Int_t GetClientID() const
Definition: TXSocket.h:168
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Definition: TUrl.h:41
int GetLogConnID() const
Definition: XrdProofConn.h:140
XrdProofConn * fConn
Definition: TXSocket.h:95
char fMode
Definition: TXSocket.h:82
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
Definition: TSemaphore.cxx:87
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition: TXSocket.cxx:368
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
struct XPClientInterruptRequest interrupt
char * CompBuffer() const
Definition: TMessage.h:94
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
~TXSockBuf()
destructor
Definition: TXSocket.cxx:2144
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
Definition: TSystem.cxx:873
std::recursive_mutex fIMtx
Definition: TXSocket.h:109
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
Bool_t Notify()
Definition: TTimer.cxx:65
#define gROOT
Definition: TROOT.h:344
XrdOucTrace * XrdProofdTrace
Definition: TXSocket.cxx:55
Basic string class.
Definition: TString.h:137
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
Definition: TXHandler.cxx:37
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition: TSocket.cxx:650
Int_t CompLength() const
Definition: TMessage.h:95
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TXSocket()
Destructor.
Definition: TXSocket.cxx:231
const Bool_t kFALSE
Definition: Rtypes.h:92
Int_t GetCompressionLevel() const
Definition: TSocket.h:211
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
int GetServType() const
Definition: XrdProofConn.h:143
void SetInterrupt(Bool_t i=kTRUE)
Definition: TXSocket.h:215
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
TInetAddress fAddress
Definition: TSocket.h:90
#define kXPD_async
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Definition: TXSocket.cxx:256
static std::mutex fgSMtx
Definition: TXSocket.h:129
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
TXSocket * GetLastReady()
Return last ready socket.
Definition: TXSocket.cxx:2338
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Definition: TXSocket.cxx:1308
TObject Int_t at
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal availability of new messages.
Definition: TXSocket.cxx:2229
ESendRecvOptions
Definition: TSocket.h:65
static Long64_t fgBuffMem
Definition: TXSocket.h:275
Int_t Clean(TSocket *s)
Read a byte from the global pipe to synchronize message pickup.
Definition: TXSocket.cxx:2258
const char * Data() const
Definition: TString.h:349
static Long64_t fgMemMax
Definition: TXSocket.h:276
struct ClientRequestHdr header
#define SafeDelete(p)
Definition: RConfig.h:436
UShort_t net2host(UShort_t x)
Definition: Bytes.h:579
Int_t fPort
Definition: TXSocket.h:87
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TXSocket.cxx:1542
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2321
static void InitEnvs()
Init environment variables for XrdClient.
Definition: TXSocket.cxx:1929
const char * ord
Definition: TXSlave.cxx:46
#define kXPD_setidle
#define kXPD_process
TString fUrl
Definition: TSocket.h:102
const char *const kPROOF_WorkerIdleTO
Definition: TProof.h:161
static std::list< TXSockBuf * > fgSQue
Definition: TXSocket.h:130
Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TXSocket.cxx:1640
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition: TXSocket.cxx:933
Bool_t fIForward
Definition: TXSocket.h:111
TXHandler * fHandler
Definition: TXSocket.h:93
const char * GetLastErr()
Definition: XrdProofConn.h:146
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1575
if(pyself &&pyself!=Py_None)
virtual Int_t GetClientIDSize() const
Definition: TXSocket.h:169
virtual Bool_t HandleInput(const void *in=0)
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1951
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Definition: TXSocket.cxx:1473
char * Buffer() const
Definition: TBuffer.h:93
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition: TXSocket.cxx:920
XFontStruct * id
Definition: TGX11.cxx:108
#define kXPD_internal
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
TString fUser
Definition: TXSocket.h:85
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition: TSystem.cxx:2193
int fRemoteProtocol
Definition: XrdProofConn.h:74
struct XPClientSendRcvRequest sendrcv
int GetOpenError() const
Definition: XrdProofConn.h:142
static const char * what
Definition: stlLoader.cc:6
kXR_unt16 HeaderSID()
Bool_t fOwn
Definition: TXSocket.h:261
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Definition: TXSocket.cxx:1014
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Definition: TSemaphore.cxx:35
Int_t fByteCur
Definition: TXSocket.h:103
Int_t Flush()
Flush the asynchronous queue.
Definition: TXSocket.cxx:965
Bool_t IsInterrupt()
Definition: TXSocket.h:220
Int_t GetPort() const
Definition: TUrl.h:87
TString fHost
Definition: TXSocket.h:86
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
Definition: TXSocket.cxx:2132
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition: TSystem.cxx:1559
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
void SetAWait(Bool_t w=kTRUE)
Definition: TXSocket.h:223
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
Handler of asynchronous events for XProofD sockets.
Definition: TXHandler.h:30
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:675
Int_t fXrdProofdVersion
Definition: TXSocket.h:121
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition: TXSocket.cxx:848
High level handler of connections to XProofD.
Definition: TXSocket.h:72
#define XrdSysError
Definition: XpdSysError.h:8
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2308
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Definition: TXSocket.cxx:1236
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
UInt_t fBytesSent
Definition: TSocket.h:92
char * Form(const char *fmt,...)
TString fBuffer
Definition: TXSocket.h:91
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
std::recursive_mutex fAMtx
Definition: TXSocket.h:99
#define TRACE_REQ
std::list< TXSockBuf * > fAQue
Definition: TXSocket.h:101
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
#define TRACE_ALL
Int_t Post()
Increment the value of the semaphore.
Definition: TSemaphore.cxx:103
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
Definition: TXSocket.cxx:2289
Bool_t IsNull() const
Definition: TString.h:387
static TString fgLoc
Definition: TXSocket.h:125
Char_t * fBuf
Definition: TXSocket.h:260
void Touch()
Definition: TSocket.h:187
TString fLoc
Definition: TXSocket.h:303
XrdOucString fUser
Definition: XrdProofConn.h:79
TList fReadySock
Definition: TXSocket.h:304
Int_t fSiz
Definition: TXSocket.h:258
#define kXPD_AnyServer
tuple free
Definition: fildir.py:30
#define kXPD_logmsg
Int_t GetCompressionLevel() const
Definition: TMessage.h:112
#define Printf
Definition: TGeoToOCC.h:18
XrdClientUrlInfo fUrl
Definition: XrdProofConn.h:102
Int_t fLen
Definition: TXSocket.h:259
virtual ~TXSockPipe()
Destructor.
Definition: TXSocket.cxx:2218
virtual Int_t Reconnect()
Try reconnection after failure.
Definition: TXSocket.cxx:2078
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition: TXSocket.cxx:268
#define TRACE_DBG
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:581
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
int Ssiz_t
Definition: RtypesCore.h:63
TXSockBuf * fBufCur
Definition: TXSocket.h:104
Short_t fSessionID
Definition: TXSocket.h:84
virtual Int_t GetSize() const
Definition: TCollection.h:95
XrdOucString fHost
Definition: XrdProofConn.h:80
Bool_t IsValid() const
Definition: TXSocket.h:316
bool IsValid() const
Test validity of this connection.
Int_t fCompress
Definition: TSocket.h:93
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
unsigned long long ULong64_t
Definition: RtypesCore.h:70
#define kXPD_fb_prog
static XrdSysLogger eLogger
Definition: TXSocket.cxx:56
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
Definition: XrdProofConn.h:93
UInt_t What() const
Definition: TMessage.h:80
Bool_t fDontTimeout
Definition: TXSocket.h:117
TXSockPipe(const char *loc="")
Constructor.
Definition: TXSocket.cxx:2204
XrdOucString fLastErrMsg
Definition: XrdProofConn.h:82
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TXSocket.cxx:1725
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
Char_t * fMem
Definition: TXSocket.h:274
kXR_int32 fSendOpt
Definition: TXSocket.h:83
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
EServiceType fServType
Definition: TSocket.h:99
void DumpReadySock()
Dump content of the ready socket list.
Definition: TXSocket.cxx:2323
TObject * fReference
Definition: TXSocket.h:92
Mother of all ROOT objects.
Definition: TObject.h:58
static ULong64_t fgBytesSent
Definition: TSocket.h:110
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition: TString.cxx:1793
char Char_t
Definition: RtypesCore.h:29
void PushBackSpare()
Release read buffer giving back to the spare list.
Definition: TXSocket.cxx:1521
kXR_int32 fILev
Definition: TXSocket.h:110
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition: TSocket.cxx:685
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
Definition: TError.cxx:202
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Int_t Length() const
Definition: TBuffer.h:96
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Definition: TXSocket.cxx:1382
static Bool_t fgInitDone
Definition: TXSocket.h:126
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
Definition: TXSocket.cxx:1776
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
Definition: TXSocket.cxx:1893
static TXSockPipe fgPipe
Definition: TXSocket.h:124
static ULong64_t fgBytesRecv
Definition: TSocket.h:109
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
Int_t fRemoteProtocol
Definition: TSocket.h:95
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:280
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Definition: TXSocket.cxx:2191
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
Definition: TXSocket.cxx:1599
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition: TXSocket.cxx:71
Int_t fPid
Definition: TXSocket.h:114
bool MatchStreamid(short sid)
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:582
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t fByteLeft
Definition: TXSocket.h:102
void PostSemAll()
Wake up all threads waiting at the semaphore (used by TXSlave)
Definition: TXSocket.cxx:905
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
tuple all
Definition: na49view.py:13
const Int_t n
Definition: legend1.C:16
void Resize(Int_t sz)
resize socket buffer
Definition: TXSocket.cxx:2155
void CtrlC()
Interrupt the remote protocol instance.
Definition: TXSocket.cxx:1346
UInt_t fBytesRecv
Definition: TSocket.h:91
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2183
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:188
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904
static XrdSysError eDest(0,"Proofx")