Logo ROOT   6.10/09
Reference Guide
TXSocket.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TXSocket
13 \ingroup proofx
14 
15 High level handler of connections to XProofD.
16 See TSocket for details.
17 
18 */
19 
20 #include "MessageTypes.h"
21 #include "TEnv.h"
22 #include "TError.h"
23 #include "TException.h"
24 #include "TMonitor.h"
25 #include "TObjString.h"
26 #include "TProof.h"
27 #include "TSlave.h"
28 #include "TRegexp.h"
29 #include "TROOT.h"
30 #include "TUrl.h"
31 #include "TXHandler.h"
32 #include "TXSocket.h"
33 #include "XProofProtocol.h"
34 
35 #include "XrdProofConn.h"
36 
42 
43 #ifndef WIN32
44 #include <sys/socket.h>
45 #else
46 #include <Winsock2.h>
47 #endif
48 
49 
50 #include "XpdSysError.h"
51 #include "XpdSysLogger.h"
52 
53 // ---- Tracing utils ----------------------------------------------------------
54 #include "XrdProofdTrace.h"
55 XrdOucTrace *XrdProofdTrace = 0;
57 static XrdSysError eDest(0, "Proofx");
58 
59 #ifdef WIN32
62 #endif
63 
64 //______________________________________________________________________________
65 
66 //---- error handling ----------------------------------------------------------
67 
68 ////////////////////////////////////////////////////////////////////////////////
69 /// Interface to ErrorHandler (protected).
70 
71 void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
72 {
73  ::ErrorHandler(level, Form("TXSocket::%s", location), fmt, va);
74 }
75 
76 //----- Ping handler -----------------------------------------------------------
77 ////////////////////////////////////////////////////////////////////////////////
78 
79 class TXSocketPingHandler : public TFileHandler {
81 public:
82  TXSocketPingHandler(TXSocket *s, Int_t fd)
83  : TFileHandler(fd, 1) { fSocket = s; }
84  Bool_t Notify();
85  Bool_t ReadNotify() { return Notify(); }
86 };
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 /// Ping the socket
90 
91 Bool_t TXSocketPingHandler::Notify()
92 {
93  fSocket->Ping("ping handler");
94 
95  return kTRUE;
96 }
97 
98 // Env variables init flag
100 
101 // Static variables for input notification
102 TXSockPipe TXSocket::fgPipe; // Pipe for input monitoring
103 TString TXSocket::fgLoc = "undef"; // Location string
104 
105 // Static buffer manager
106 std::mutex TXSocket::fgSMtx; // To protect spare list
107 std::list<TXSockBuf *> TXSocket::fgSQue; // list of spare buffers
108 Long64_t TXSockBuf::fgBuffMem = 0; // Total allocated memory
109 Long64_t TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 MB]
110 
111 ////////////////////////////////////////////////////////////////////////////////
112 /// Constructor
113 /// Open the connection to a remote XrdProofd instance and start a PROOF
114 /// session.
115 /// The mode 'm' indicates the role of this connection:
116 /// 'a' Administrator; used by an XPD to contact the head XPD
117 /// 'i' Internal; used by a TXProofServ to call back its creator
118 /// (see XrdProofUnixConn)
119 /// 'C' PROOF manager: open connection only (do not start a session)
120 /// 'M' Client creating a top master
121 /// 'A' Client attaching to top master
122 /// 'm' Top master creating a submaster
123 /// 's' Master creating a slave
124 /// The buffer 'logbuf' is a null terminated string to be sent over at
125 /// login.
126 
127 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
128  const char *logbuf, Int_t loglevel, TXHandler *handler)
129  : TSocket(), fMode(m), fLogLevel(loglevel),
130  fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
132 {
133  fUrl = url;
134  // Enable tracing in the XrdProof client. if not done already
135  eDest.logger(&eLogger);
136  if (!XrdProofdTrace)
137  XrdProofdTrace = new XrdOucTrace(&eDest);
138 
139  // Init envs the first time
140  if (!fgInitDone)
141  InitEnvs();
142 
143  // Async queue related stuff
144  fAQue.clear();
145 
146  // Interrupts queue related stuff
147  fILev = -1;
148  fIForward = kFALSE;
149 
150  // Init some variables
151  fByteLeft = 0;
152  fByteCur = 0;
153  fBufCur = 0;
154  fServType = kPROOFD; // for consistency
155  fTcpWindowSize = -1;
156  fRemoteProtocol = -1;
157  // By default forward directly to end-point
158  fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
159  fSessionID = (fMode == 'C') ? -1 : psid;
160  fSocket = -1;
161 
162  // This is used by external code to create a link between this object
163  // and another one
164  fReference = 0;
165 
166  // The global pipe
167  if (!fgPipe.IsValid()) {
168  Error("TXSocket", "internal pipe is invalid");
169  return;
170  }
171 
172  // Some initial values
173  TUrl u(url);
175  u.SetProtocol("proof", kTRUE);
176  fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
177 
178  // Set the asynchronous handler
179  fHandler = handler;
180 
181  if (url) {
182 
183  // Create connection (for managers the type of the connection is the same
184  // as for top masters)
185  char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
186  fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
187  if (!fConn || !(fConn->IsValid())) {
189  if (gDebug > 0)
190  Error("TXSocket", "fatal error occurred while opening a connection"
191  " to server [%s]: %s", url, fConn->GetLastErr());
192  return;
193  }
194 
195  // Fill some info
196  fUser = fConn->fUser.c_str();
197  fHost = fConn->fHost.c_str();
198  fPort = fConn->fPort;
199 
200  // Create new proofserv if not client manager or administrator or internal mode
201  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A'|| fMode == 'L') {
202  // We attach or create
203  if (!Create()) {
204  // Failure
205  Error("TXSocket", "create or attach failed (%s)",
206  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
207  Close();
208  return;
209  }
210  }
211 
212  // Fill some other info available if Create is successful
213  if (fMode == 'C') {
216  }
217 
218  // Also in the base class
219  fUrl = fConn->fUrl.GetUrl().c_str();
221  fAddress.fPort = fPort;
222 
223  // This is needed for the reader thread to signal an interrupt
224  fPid = gSystem->GetPid();
225  }
226 }
227 
228 ////////////////////////////////////////////////////////////////////////////////
229 /// Destructor
230 
232 {
233  // Disconnect from remote server (the connection manager is
234  // responsible of the underlying physical connection, so we do not
235  // force its closing)
236  Close();
237 }
238 
239 ////////////////////////////////////////////////////////////////////////////////
240 /// Set location string
241 
242 void TXSocket::SetLocation(const char *loc)
243 {
244  if (loc) {
245  fgLoc = loc;
246  fgPipe.SetLoc(loc);
247  } else {
248  fgLoc = "";
249  fgPipe.SetLoc("");
250  }
251 }
252 
253 ////////////////////////////////////////////////////////////////////////////////
254 /// Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
255 
257 {
258  if (id < 0 && fConn)
259  fConn->SetAsync(0);
260  fSessionID = id;
261 }
262 
263 ////////////////////////////////////////////////////////////////////////////////
264 /// Disconnect a session. Use opt= "S" or "s" to
265 /// shutdown remote session.
266 /// Default is opt = "".
267 
269 {
270  // Make sure we are connected
271  if (!IsValid()) {
272  if (gDebug > 0)
273  Info("DisconnectSession","not connected: nothing to do");
274  return;
275  }
276 
277  Bool_t shutdown = opt && (strchr(opt,'S') || strchr(opt,'s'));
278  Bool_t all = opt && (strchr(opt,'A') || strchr(opt,'a'));
279 
280  if (id > -1 || all) {
281  // Prepare request
282  XPClientRequest Request;
283  memset(&Request, 0, sizeof(Request) );
284  fConn->SetSID(Request.header.streamid);
285  if (shutdown)
286  Request.proof.requestid = kXP_destroy;
287  else
288  Request.proof.requestid = kXP_detach;
289  Request.proof.sid = id;
290 
291  // Send request
292  XrdClientMessage *xrsp =
293  fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
294 
295  // Print error msg, if any
296  if (!xrsp && fConn->GetLastErr())
297  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
298 
299  // Cleanup
300  SafeDelete(xrsp);
301  }
302 }
303 
304 ////////////////////////////////////////////////////////////////////////////////
305 /// Close connection. Available options are (case insensitive)
306 /// 'P' force closing of the underlying physical connection
307 /// 'S' shutdown remote session, if any
308 /// A session ID can be given using #...# signature, e.g. "#1#".
309 /// Default is opt = "".
310 
312 {
313  Int_t to = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
314  if (fAsynProc.Wait(to*1000) != 0)
315  Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
316 
317  // Remove any reference in the global pipe and ready-sock queue
318  TXSocket::fgPipe.Flush(this);
319 
320  // Make sure we have a connection
321  if (!fConn) {
322  if (gDebug > 0)
323  Info("Close","no connection: nothing to do");
324  fAsynProc.Post();
325  return;
326  }
327 
328  // Disconnect the asynchronous requests handler
329  fConn->SetAsync(0);
330 
331  // If we are connected we disconnect
332  if (IsValid()) {
333 
334  // Parse options
335  TString o(opt);
336  Int_t sessID = fSessionID;
337  if (o.Index("#") != kNPOS) {
338  o.Remove(0,o.Index("#")+1);
339  if (o.Index("#") != kNPOS) {
340  o.Remove(o.Index("#"));
341  sessID = o.IsDigit() ? o.Atoi() : sessID;
342  }
343  }
344 
345  if (sessID > -1) {
346  // Warn the remote session, if any (after destroy the session is gone)
347  DisconnectSession(sessID, opt);
348  } else {
349  // We are the manager: close underlying connection
350  fConn->Close(opt);
351  }
352  }
353 
354  // Delete the connection module
355  SafeDelete(fConn);
356 
357  // Post semaphore
358  fAsynProc.Post();
359 }
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 /// We are here if an unsolicited response comes from a logical conn
363 /// The response comes in the form of an XrdClientMessage *, that must NOT be
364 /// destroyed after processing. It is destroyed by the first sender.
365 /// Remember that we are in a separate thread, since unsolicited
366 /// responses are asynchronous by nature.
367 
370 {
372 
373  // If we are closing we will not do anything
375  if (!semg.IsValid()) {
376  Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
377  return kUNSOL_CONTINUE;
378  }
379 
380  if (!m) {
381  if (gDebug > 2)
382  Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
383  // Some one is perhaps interested in empty messages
384  return kUNSOL_CONTINUE;
385  } else {
386  if (gDebug > 2)
387  Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
388  this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
389  }
390 
391  // Error notification
392  if (m->IsError()) {
394  if (gDebug > 0)
395  Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
396  XHandleErr_t herr = {1, 0};
397  if (!fHandler || fHandler->HandleError((const void *)&herr)) {
398  if (gDebug > 0)
399  Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
400  // Avoid to contact the server any more
401  fSessionID = -1;
402  } else {
403  // Connection still usable: update usage timestamp
404  Touch();
405  }
406  } else {
407  // Time out
408  if (gDebug > 2)
409  Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
410  }
411  // Propagate the message to other possible handlers
412  return kUNSOL_CONTINUE;
413  }
414 
415  // From now on make sure is for us (but only if not during setup, i.e. fConn == 0; otherwise
416  // we may miss some important server message)
417  if (fConn && !m->MatchStreamid(fConn->fStreamid)) {
418  if (gDebug > 1)
419  Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
420  return kUNSOL_CONTINUE;
421  }
422 
423  // Local processing ...
424  Int_t len = 0;
425  if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
426  Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
428  return rc;
429  }
430 
431  // Activity on the line: update usage timestamp
432  Touch();
433 
434  // The first 4 bytes contain the action code
435  kXR_int32 acod = 0;
436  memcpy(&acod, m->GetData(), sizeof(kXR_int32));
437  if (acod > 10000)
438  Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
439  this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
440  //
441  // Update pointer to data
442  void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
443  len -= sizeof(kXR_int32);
444  if (gDebug > 1)
445  Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
446  this, acod, len, m->HeaderSID());
447 
448  if (gDebug > 3)
450 
451  // Case by case
452  kXR_int32 ilev = -1;
453  const char *lab = 0;
454 
455  switch (acod) {
456  case kXPD_ping:
457  //
458  // Special interrupt
459  ilev = TProof::kPing;
460  lab = "kXPD_ping";
461  case kXPD_interrupt:
462  //
463  // Interrupt
464  lab = !lab ? "kXPD_interrupt" : lab;
465  { std::lock_guard<std::recursive_mutex> lock(fIMtx);
466  if (acod == kXPD_interrupt) {
467  memcpy(&ilev, pdata, sizeof(kXR_int32));
468  ilev = net2host(ilev);
469  // Update pointer to data
470  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
471  len -= sizeof(kXR_int32);
472  }
473  // The next 4 bytes contain the forwarding option
474  kXR_int32 ifw = 0;
475  if (len > 0) {
476  memcpy(&ifw, pdata, sizeof(kXR_int32));
477  ifw = net2host(ifw);
478  if (gDebug > 1)
479  Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
480  }
481  //
482  // Save the interrupt
483  fILev = ilev;
484  fIForward = (ifw == 1) ? kTRUE : kFALSE;
485 
486  // Handle this input in this thread to avoid queuing on the
487  // main thread
488  XHandleIn_t hin = {acod, 0, 0, 0};
489  if (fHandler)
490  fHandler->HandleInput((const void *)&hin);
491  else
492  Error("ProcessUnsolicitedMsg","handler undefined");
493  }
494  break;
495  case kXPD_timer:
496  //
497  // Set shutdown timer
498  {
499  kXR_int32 opt = 1;
500  kXR_int32 delay = 0;
501  // The next 4 bytes contain the shutdown option
502  if (len > 0) {
503  memcpy(&opt, pdata, sizeof(kXR_int32));
504  opt = net2host(opt);
505  if (gDebug > 1)
506  Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
507  // Update pointer to data
508  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
509  len -= sizeof(kXR_int32);
510  }
511  // The next 4 bytes contain the delay
512  if (len > 0) {
513  memcpy(&delay, pdata, sizeof(kXR_int32));
514  delay = net2host(delay);
515  if (gDebug > 1)
516  Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
517  // Update pointer to data
518  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
519  len -= sizeof(kXR_int32);
520  }
521 
522  // Handle this input in this thread to avoid queuing on the
523  // main thread
524  XHandleIn_t hin = {acod, opt, delay, 0};
525  if (fHandler)
526  fHandler->HandleInput((const void *)&hin);
527  else
528  Error("ProcessUnsolicitedMsg","handler undefined");
529  }
530  break;
531  case kXPD_inflate:
532  //
533  // Set inflate factor
534  {
535  kXR_int32 inflate = 1000;
536  if (len > 0) {
537  memcpy(&inflate, pdata, sizeof(kXR_int32));
538  inflate = net2host(inflate);
539  if (gDebug > 1)
540  Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
541  // Update pointer to data
542  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
543  len -= sizeof(kXR_int32);
544  }
545  // Handle this input in this thread to avoid queuing on the
546  // main thread
547  XHandleIn_t hin = {acod, inflate, 0, 0};
548  if (fHandler)
549  fHandler->HandleInput((const void *)&hin);
550  else
551  Error("ProcessUnsolicitedMsg","handler undefined");
552  }
553  break;
554  case kXPD_priority:
555  //
556  // Broadcast group priority
557  {
558  kXR_int32 priority = -1;
559  if (len > 0) {
560  memcpy(&priority, pdata, sizeof(kXR_int32));
561  priority = net2host(priority);
562  if (gDebug > 1)
563  Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
564  // Update pointer to data
565  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
566  len -= sizeof(kXR_int32);
567  }
568  // Handle this input in this thread to avoid queuing on the
569  // main thread
570  XHandleIn_t hin = {acod, priority, 0, 0};
571  if (fHandler)
572  fHandler->HandleInput((const void *)&hin);
573  else
574  Error("ProcessUnsolicitedMsg","handler undefined");
575  }
576  break;
577  case kXPD_flush:
578  //
579  // Flush request
580  {
581  // Handle this input in this thread to avoid queuing on the
582  // main thread
583  XHandleIn_t hin = {acod, 0, 0, 0};
584  if (fHandler)
585  fHandler->HandleInput((const void *)&hin);
586  else
587  Error("ProcessUnsolicitedMsg","handler undefined");
588  }
589  break;
590  case kXPD_urgent:
591  //
592  // Set shutdown timer
593  {
594  // The next 4 bytes contain the urgent msg type
595  kXR_int32 type = -1;
596  if (len > 0) {
597  memcpy(&type, pdata, sizeof(kXR_int32));
598  type = net2host(type);
599  if (gDebug > 1)
600  Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
601  // Update pointer to data
602  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
603  len -= sizeof(kXR_int32);
604  }
605  // The next 4 bytes contain the first info container
606  kXR_int32 int1 = -1;
607  if (len > 0) {
608  memcpy(&int1, pdata, sizeof(kXR_int32));
609  int1 = net2host(int1);
610  if (gDebug > 1)
611  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
612  // Update pointer to data
613  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
614  len -= sizeof(kXR_int32);
615  }
616  // The next 4 bytes contain the second info container
617  kXR_int32 int2 = -1;
618  if (len > 0) {
619  memcpy(&int2, pdata, sizeof(kXR_int32));
620  int2 = net2host(int2);
621  if (gDebug > 1)
622  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
623  // Update pointer to data
624  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
625  len -= sizeof(kXR_int32);
626  }
627 
628  // Handle this input in this thread to avoid queuing on the
629  // main thread
630  XHandleIn_t hin = {acod, type, int1, int2};
631  if (fHandler)
632  fHandler->HandleInput((const void *)&hin);
633  else
634  Error("ProcessUnsolicitedMsg","handler undefined");
635  }
636  break;
637  case kXPD_msg:
638  //
639  // Data message
640  { std::lock_guard<std::recursive_mutex> lock(fAMtx);
641 
642  // Get a spare buffer
643  TXSockBuf *b = PopUpSpare(len);
644  if (!b) {
645  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
646  return rc;
647  }
648  memcpy(b->fBuf, pdata, len);
649  b->fLen = len;
650 
651  // Update counters
652  fBytesRecv += len;
653 
654  // Produce the message
655  fAQue.push_back(b);
656 
657  // Post the global pipe
658  fgPipe.Post(this);
659 
660  // Signal it and release the mutex
661  if (gDebug > 2)
662  Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
663  this, GetTitle(), &fASem, len);
664  fASem.Post();
665  }
666 
667  break;
668  case kXPD_feedback:
669  Info("ProcessUnsolicitedMsg",
670  "kXPD_feedback treatment not yet implemented");
671  break;
672  case kXPD_srvmsg:
673  //
674  // Service message
675  {
676  // The next 4 bytes may contain a flag to control the way the message is displayed
677  kXR_int32 opt = 0;
678  memcpy(&opt, pdata, sizeof(kXR_int32));
679  opt = net2host(opt);
680  if (opt >= 0 && opt <= 4) {
681  // Update pointer to data
682  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
683  len -= sizeof(kXR_int32);
684  } else {
685  opt = 1;
686  }
687 
688  if (opt == 0) {
689  // One line
690  Printf("| %.*s", len, (char *)pdata);
691  } else if (opt == 2) {
692  // Raw displaying
693  Printf("%.*s", len, (char *)pdata);
694  } else if (opt == 3) {
695  // Incremental displaying
696  fprintf(stderr, "%.*s", len, (char *)pdata);
697  } else if (opt == 4) {
698  // Rewind
699  fprintf(stderr, "%.*s\r", len, (char *)pdata);
700  } else {
701  // A small header
702  Printf(" ");
703  Printf("| Message from server:");
704  Printf("| %.*s", len, (char *)pdata);
705  }
706  }
707  break;
708  case kXPD_errmsg:
709  //
710  // Error condition with message
711  Printf("\n\n");
712  Printf("| Error condition occured: message from server:");
713  Printf("| %.*s", len, (char *)pdata);
714  Printf("\n");
715  // Handle error
716  if (fHandler)
718  else
719  Error("ProcessUnsolicitedMsg","handler undefined");
720  break;
721  case kXPD_msgsid:
722  //
723  // Data message
724  { std::lock_guard<std::recursive_mutex> lock(fAMtx);
725 
726  // The next 4 bytes contain the sessiond id
727  kXR_int32 cid = 0;
728  memcpy(&cid, pdata, sizeof(kXR_int32));
729  cid = net2host(cid);
730 
731  if (gDebug > 1)
732  Info("ProcessUnsolicitedMsg","found cid: %d", cid);
733 
734  // Update pointer to data
735  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
736  len -= sizeof(kXR_int32);
737 
738  // Get a spare buffer
739  TXSockBuf *b = PopUpSpare(len);
740  if (!b) {
741  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
742  return rc;
743  }
744  memcpy(b->fBuf, pdata, len);
745  b->fLen = len;
746 
747  // Set the sid
748  b->fCid = cid;
749 
750  // Update counters
751  fBytesRecv += len;
752 
753  // Produce the message
754  fAQue.push_back(b);
755 
756  // Post the global pipe
757  fgPipe.Post(this);
758 
759  // Signal it and release the mutex
760  if (gDebug > 2)
761  Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
762  this, cid, &fASem, len);
763  fASem.Post();
764  }
765 
766  break;
767  case kXPD_wrkmortem:
768  //
769  // A worker died
770  { TString what = TString::Format("%.*s", len, (char *)pdata);
771  if (what.BeginsWith("idle-timeout")) {
772  // Notify the idle timeout
774  } else {
775  Printf(" ");
776  Printf("| %s", what.Data());
777  // Handle error
778  if (fHandler)
780  else
781  Error("ProcessUnsolicitedMsg","handler undefined");
782  }
783  }
784  break;
785 
786  case kXPD_touch:
787  //
788  // Request for remote touch: post a message to do that
790  break;
791  case kXPD_resume:
792  //
793  // process the next query (in the TXProofServ)
795  break;
796  case kXPD_clusterinfo:
797  //
798  // Broadcast cluster information
799  {
800  kXR_int32 nsess = -1, nacti = -1, neffs = -1;
801  if (len > 0) {
802  // Total sessions
803  memcpy(&nsess, pdata, sizeof(kXR_int32));
804  nsess = net2host(nsess);
805  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
806  len -= sizeof(kXR_int32);
807  // Active sessions
808  memcpy(&nacti, pdata, sizeof(kXR_int32));
809  nacti = net2host(nacti);
810  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
811  len -= sizeof(kXR_int32);
812  // Effective sessions
813  memcpy(&neffs, pdata, sizeof(kXR_int32));
814  neffs = net2host(neffs);
815  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
816  len -= sizeof(kXR_int32);
817  }
818  if (gDebug > 1)
819  Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
820  " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
821  // Handle this input in this thread to avoid queuing on the
822  // main thread
823  XHandleIn_t hin = {acod, nsess, nacti, neffs};
824  if (fHandler)
825  fHandler->HandleInput((const void *)&hin);
826  else
827  Error("ProcessUnsolicitedMsg","handler undefined");
828  }
829  break;
830  default:
831  Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
832  this, acod, GetTitle());
834  break;
835  }
836 
837  // We are done
838  return rc;
839 }
840 
841 ////////////////////////////////////////////////////////////////////////////////
842 /// Post a message of type 'type' into the read messages queue.
843 /// If 'msg' is defined it is also added as TString.
844 /// This is used, for example, with kPROOF_FATAL to force the main thread
845 /// to mark this socket as bad, avoiding race condition when a worker
846 /// dies while in processing state.
847 
848 void TXSocket::PostMsg(Int_t type, const char *msg)
849 {
850  // Create the message
851  TMessage m(type);
852 
853  // Add the string if any
854  if (msg && strlen(msg) > 0)
855  m << TString(msg);
856 
857  // Write length in first word of buffer
858  m.SetLength();
859 
860  // Get pointer to the message buffer
861  char *mbuf = m.Buffer();
862  Int_t mlen = m.Length();
863  if (m.CompBuffer()) {
864  mbuf = m.CompBuffer();
865  mlen = m.CompLength();
866  }
867 
868  //
869  // Data message
870  std::lock_guard<std::recursive_mutex> lock(fAMtx);
871 
872  // Get a spare buffer
873  TXSockBuf *b = PopUpSpare(mlen);
874  if (!b) {
875  Error("PostMsg", "could allocate spare buffer");
876  return;
877  }
878 
879  // Fill the pipe buffer
880  memcpy(b->fBuf, mbuf, mlen);
881  b->fLen = mlen;
882 
883  // Update counters
884  fBytesRecv += mlen;
885 
886  // Produce the message
887  fAQue.push_back(b);
888 
889  // Post the global pipe
890  fgPipe.Post(this);
891 
892  // Signal it and release the mutex
893  if (gDebug > 0)
894  Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
895  this, type, &fASem, mlen);
896  fASem.Post();
897 
898  // Done
899  return;
900 }
901 
902 ////////////////////////////////////////////////////////////////////////////////
903 /// Wake up all threads waiting at the semaphore (used by TXSlave)
904 
906 {
907  std::lock_guard<std::recursive_mutex> lock(fAMtx);
908 
909  // Post semaphore to wake up anybody waiting; send as many posts as needed
910  while (fASem.TryWait() != 1)
911  fASem.Post();
912 
913  return;
914 }
915 
916 ////////////////////////////////////////////////////////////////////////////////
917 /// Getter for logical connection ID
918 
920 {
921  return (fConn ? fConn->GetLogConnID() : -1);
922 }
923 
924 ////////////////////////////////////////////////////////////////////////////////
925 /// Getter for last error
926 
928 {
929  return (fConn ? fConn->GetOpenError() : -1);
930 }
931 
932 ////////////////////////////////////////////////////////////////////////////////
933 /// Getter for server type
934 
936 {
937  return (fConn ? fConn->GetServType() : -1);
938 }
939 
940 ////////////////////////////////////////////////////////////////////////////////
941 /// Getter for session ID
942 
944 {
945  return (fConn ? fConn->GetSessionID() : -1);
946 }
947 
948 ////////////////////////////////////////////////////////////////////////////////
949 /// Getter for validity status
950 
952 {
953  return (fConn ? (fConn->IsValid()) : kFALSE);
954 }
955 
956 ////////////////////////////////////////////////////////////////////////////////
957 /// Return kTRUE if the remote server is a 'proofd'
958 
960 {
962  return kTRUE;
963 
964  // Failure
965  return kFALSE;
966 }
967 
968 ////////////////////////////////////////////////////////////////////////////////
969 /// Get latest interrupt level and reset it; if the interrupt has to be
970 /// propagated to lower stages forward will be kTRUE after the call
971 
973 {
974  if (gDebug > 2)
975  Info("GetInterrupt","%p: waiting to lock mutex", this);
976 
977  std::lock_guard<std::recursive_mutex> lock(fIMtx);
978 
979  // Reset values
980  Int_t ilev = -1;
981  forward = kFALSE;
982 
983  // Check if filled
984  if (fILev == -1)
985  Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
986 
987  // Fill output
988  ilev = fILev;
989  forward = fIForward;
990 
991  // Reset values (we process it only once)
992  fILev = -1;
993  fIForward = kFALSE;
994 
995  // Return what we got
996  return ilev;
997 }
998 
999 ////////////////////////////////////////////////////////////////////////////////
1000 /// Flush the asynchronous queue.
1001 /// Typically called when a kHardInterrupt is received.
1002 /// Returns number of bytes in flushed buffers.
1003 
1005 {
1006  Int_t nf = 0;
1007  list<TXSockBuf *> splist;
1008  list<TXSockBuf *>::iterator i;
1009 
1010  { std::lock_guard<std::recursive_mutex> lock(fAMtx);
1011 
1012  // Must have something to flush
1013  if (fAQue.size() > 0) {
1014 
1015  // Save size for later semaphore cleanup
1016  Int_t sz = fAQue.size();
1017  // get the highest interrupt level
1018  for (i = fAQue.begin(); i != fAQue.end();) {
1019  if (*i) {
1020  splist.push_back(*i);
1021  nf += (*i)->fLen;
1022  i = fAQue.erase(i);
1023  }
1024  }
1025 
1026  // Reset the asynchronous queue
1027  while (sz--) {
1028  if (fASem.TryWait() == 1)
1029  Printf("Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
1030  }
1031  fAQue.clear();
1032  }
1033  }
1034 
1035  // Move spares to the spare queue
1036  { std::lock_guard<std::mutex> lock(fgSMtx);
1037  if (splist.size() > 0) {
1038  for (i = splist.begin(); i != splist.end();) {
1039  fgSQue.push_back(*i);
1040  i = splist.erase(i);
1041  }
1042  }
1043  }
1044 
1045  // We are done
1046  return nf;
1047 }
1048 
1049 ////////////////////////////////////////////////////////////////////////////////
1050 /// This method sends a request for creation of (or attachment to) a remote
1051 /// server application.
1052 
1054 {
1055  // Make sure we are connected
1056  if (!IsValid()) {
1057  if (gDebug > 0)
1058  Info("Create","not connected: nothing to do");
1059  return kFALSE;
1060  }
1061 
1062  Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
1063 
1064  while (retriesleft--) {
1065 
1066  XPClientRequest reqhdr;
1067 
1068  // We fill the header struct containing the request for login
1069  memset( &reqhdr, 0, sizeof(reqhdr));
1070  fConn->SetSID(reqhdr.header.streamid);
1071 
1072  // This will be a kXP_attach or kXP_create request
1073  if (fMode == 'A' || attach) {
1074  reqhdr.header.requestid = kXP_attach;
1075  reqhdr.proof.sid = fSessionID;
1076  } else {
1077  reqhdr.header.requestid = kXP_create;
1078  }
1079 
1080  // Send log level
1081  reqhdr.proof.int1 = fLogLevel;
1082 
1083  // Send also the chosen alias
1084  const void *buf = (const void *)(fBuffer.Data());
1085  reqhdr.header.dlen = fBuffer.Length();
1086  if (gDebug >= 2)
1087  Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
1088 
1089  // We call SendReq, the function devoted to sending commands.
1090  if (gDebug > 1)
1091  Info("Create", "creating session of server %s", fUrl.Data());
1092 
1093  // server response header
1094  char *answData = 0;
1095  XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
1096  &answData, "TXSocket::Create", 0);
1097  struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
1098 
1099  // If any, the URL the data pool entry point will be stored here
1100  fBuffer = "";
1101  if (xrsp) {
1102 
1103  //
1104  // Pointer to data
1105  void *pdata = (void *)(xrsp->GetData());
1106  Int_t len = xrsp->DataLen();
1107 
1108  if (len >= (Int_t)sizeof(kXR_int32)) {
1109  // The first 4 bytes contain the session ID
1110  kXR_int32 psid = 0;
1111  memcpy(&psid, pdata, sizeof(kXR_int32));
1112  fSessionID = net2host(psid);
1113  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1114  len -= sizeof(kXR_int32);
1115  } else {
1116  Error("Create","session ID is undefined!");
1117  fSessionID = -1;
1118  if (srvresp) free(srvresp);
1119  return kFALSE;
1120  }
1121 
1122  if (len >= (Int_t)sizeof(kXR_int16)) {
1123  // The second 2 bytes contain the remote PROOF protocol version
1124  kXR_int16 dver = 0;
1125  memcpy(&dver, pdata, sizeof(kXR_int16));
1126  fRemoteProtocol = net2host(dver);
1127  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1128  len -= sizeof(kXR_int16);
1129  } else {
1130  Warning("Create","protocol version of the remote PROOF undefined!");
1131  }
1132 
1133  if (fRemoteProtocol == 0) {
1134  // We are dealing with an older server: the PROOF protocol is on 4 bytes
1135  len += sizeof(kXR_int16);
1136  kXR_int32 dver = 0;
1137  memcpy(&dver, pdata, sizeof(kXR_int32));
1138  fRemoteProtocol = net2host(dver);
1139  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1140  len -= sizeof(kXR_int32);
1141  } else {
1142  if (len >= (Int_t)sizeof(kXR_int16)) {
1143  // The third 2 bytes contain the remote XrdProofdProtocol version
1144  kXR_int16 dver = 0;
1145  memcpy(&dver, pdata, sizeof(kXR_int16));
1146  fXrdProofdVersion = net2host(dver);
1147  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1148  len -= sizeof(kXR_int16);
1149  } else {
1150  Warning("Create","version of the remote XrdProofdProtocol undefined!");
1151  }
1152  }
1153 
1154  if (len > 0) {
1155  // From top masters, the url of the data pool
1156  char *url = new char[len+1];
1157  memcpy(url, pdata, len);
1158  url[len] = 0;
1159  fBuffer = url;
1160  delete[] url;
1161  }
1162 
1163  // Cleanup
1164  SafeDelete(xrsp);
1165  if (srvresp) free(srvresp);
1166 
1167  // Notify
1168  return kTRUE;
1169  } else {
1170  // Extract log file path, if any
1171  Ssiz_t ilog = kNPOS;
1172  if (retriesleft <= 0 && fConn->GetLastErr()) {
1173  fBuffer = fConn->GetLastErr();
1174  if ((ilog = fBuffer.Index("|log:")) != kNPOS) fBuffer.Remove(0, ilog);
1175  }
1176  // If not free resources now, just give up
1177  if (fConn->GetOpenError() == kXP_TooManySess) {
1178  // Avoid to contact the server any more
1179  fSessionID = -1;
1180  if (srvresp) free(srvresp);
1181  return kFALSE;
1182  } else {
1183  // Print error msg, if any
1184  if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr()) {
1185  TString emsg(fConn->GetLastErr());
1186  if ((ilog = emsg.Index("|log:")) != kNPOS) emsg.Remove(ilog);
1187  Printf("%s: %s", fHost.Data(), emsg.Data());
1188  }
1189  }
1190  }
1191 
1192  if (gDebug > 0)
1193  Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
1194  if (retriesleft <= 0)
1195  Error("Create", "%d creation/attachment attempts failed: no attempts left",
1196  gEnv->GetValue("XProof.CreationRetries", 4));
1197 
1198  if (srvresp) free(srvresp);
1199  } // Creation retries
1200 
1201  // The session is invalid: reset the sessionID to invalid state (it was our protocol
1202  // number during creation
1203  fSessionID = -1;
1204 
1205  // Notify failure
1206  Error("Create:",
1207  "problems creating or attaching to a remote server (%s)",
1208  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
1209  return kFALSE;
1210 }
1211 
1212 ////////////////////////////////////////////////////////////////////////////////
1213 /// Send a raw buffer of specified length.
1214 /// Use opt = kDontBlock to ask xproofd to push the message into the proofsrv.
1215 /// (by default is appended to a queue waiting for a request from proofsrv).
1216 /// Returns the number of bytes sent or -1 in case of error.
1217 
1218 Int_t TXSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
1219 {
1221 
1222  // Options and request ID
1223  fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
1224  : (~kXPD_async & fSendOpt) ;
1225 
1226  // Prepare request
1227  XPClientRequest Request;
1228  memset( &Request, 0, sizeof(Request) );
1229  fConn->SetSID(Request.header.streamid);
1230  Request.sendrcv.requestid = kXP_sendmsg;
1231  Request.sendrcv.sid = fSessionID;
1232  Request.sendrcv.opt = fSendOpt;
1233  Request.sendrcv.cid = GetClientID();
1234  Request.sendrcv.dlen = length;
1235  if (gDebug >= 2)
1236  Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
1237 
1238  // Send request
1239  XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
1240 
1241  if (xrsp) {
1242  // Prepare return info
1243  Int_t nsent = length;
1244 
1245  // Update counters
1246  fBytesSent += length;
1247 
1248  // Cleanup
1249  SafeDelete(xrsp);
1250 
1251  // Success: update usage timestamp
1252  Touch();
1253 
1254  // ok
1255  return nsent;
1256  } else {
1257  // Print error message, if any
1258  if (fConn->GetLastErr())
1259  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1260  else
1261  Printf("%s: error occured but no message from server", fHost.Data());
1262  }
1263 
1264  // Failure notification (avoid using the handler: we may be exiting)
1265  Error("SendRaw", "%s: problems sending %d bytes to server",
1266  fHost.Data(), length);
1267  return -1;
1268 }
1269 
1270 ////////////////////////////////////////////////////////////////////////////////
1271 /// Ping functionality: contact the server to check its vitality.
1272 /// If external, the server waits for a reply from the server
1273 /// Returns kTRUE if OK or kFALSE in case of error.
1274 
1275 Bool_t TXSocket::Ping(const char *ord)
1276 {
1278 
1279  if (gDebug > 0)
1280  Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
1281 
1282  // Make sure we are connected
1283  if (!IsValid()) {
1284  Error("Ping","not connected: nothing to do");
1285  return kFALSE;
1286  }
1287 
1288  // Options
1289  kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
1290 
1291  // Prepare request
1292  XPClientRequest Request;
1293  memset( &Request, 0, sizeof(Request) );
1294  fConn->SetSID(Request.header.streamid);
1295  Request.sendrcv.requestid = kXP_ping;
1296  Request.sendrcv.sid = fSessionID;
1297  Request.sendrcv.opt = options;
1298  Request.sendrcv.dlen = 0;
1299 
1300  // Send request
1301  Bool_t res = kFALSE;
1302  if (fMode != 'i') {
1303  char *pans = 0;
1304  XrdClientMessage *xrsp =
1305  fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
1306  kXR_int32 *pres = (kXR_int32 *) pans;
1307 
1308  // Get the result
1309  if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
1310  *pres = net2host(*pres);
1311  res = (*pres == 1) ? kTRUE : kFALSE;
1312  // Success: update usage timestamp
1313  Touch();
1314  } else {
1315  // Print error msg, if any
1316  if (fConn->GetLastErr())
1317  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1318  }
1319 
1320  // Cleanup
1321  SafeDelete(xrsp);
1322  if (pans) free(pans);
1323 
1324  } else {
1325  if (XPD::clientMarshall(&Request) == 0) {
1326  XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
1327  res = (e == kOK) ? kTRUE : kFALSE;
1328  } else {
1329  Error("Ping", "%p: int: problems marshalling request", this);
1330  }
1331  }
1332 
1333  // Failure notification (avoid using the handler: we may be exiting)
1334  if (!res) {
1335  Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
1336  } else if (gDebug > 0) {
1337  Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
1338  }
1339 
1340  return res;
1341 }
1342 
1343 ////////////////////////////////////////////////////////////////////////////////
1344 /// Remote touch functionality: contact the server to prove our vitality.
1345 /// No reply from server is expected.
1346 
1348 {
1350 
1351  if (gDebug > 0)
1352  Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
1353 
1354  // Make sure we are connected
1355  if (!IsValid()) {
1356  Error("RemoteTouch","not connected: nothing to do");
1357  return;
1358  }
1359 
1360  // Prepare request
1361  XPClientRequest Request;
1362  memset( &Request, 0, sizeof(Request) );
1363  fConn->SetSID(Request.header.streamid);
1364  Request.sendrcv.requestid = kXP_touch;
1365  Request.sendrcv.sid = fSessionID;
1366  Request.sendrcv.opt = 0;
1367  Request.sendrcv.dlen = 0;
1368 
1369  // We need the right order
1370  if (XPD::clientMarshall(&Request) != 0) {
1371  Error("Touch", "%p: problems marshalling request ", this);
1372  return;
1373  }
1374  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1375  Error("Touch", "%p: problems sending touch request to server", this);
1376 
1377  // Done
1378  return;
1379 }
1380 
1381 ////////////////////////////////////////////////////////////////////////////////
1382 /// Interrupt the remote protocol instance. Used to propagate Ctrl-C.
1383 /// No reply from server is expected.
1384 
1386 {
1388 
1389  if (gDebug > 0)
1390  Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
1391 
1392  // Make sure we are connected
1393  if (!IsValid()) {
1394  Error("CtrlC","not connected: nothing to do");
1395  return;
1396  }
1397 
1398  // Prepare request
1399  XPClientRequest Request;
1400  memset( &Request, 0, sizeof(Request) );
1401  fConn->SetSID(Request.header.streamid);
1402  Request.proof.requestid = kXP_ctrlc;
1403  Request.proof.sid = 0;
1404  Request.proof.dlen = 0;
1405 
1406  // We need the right order
1407  if (XPD::clientMarshall(&Request) != 0) {
1408  Error("CtrlC", "%p: problems marshalling request ", this);
1409  return;
1410  }
1411  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1412  Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
1413 
1414  // Done
1415  return;
1416 }
1417 
1418 ////////////////////////////////////////////////////////////////////////////////
1419 /// Wait and pick-up next buffer from the asynchronous queue
1420 
1422 {
1423  fBufCur = 0;
1424  fByteLeft = 0;
1425  fByteCur = 0;
1426  if (gDebug > 2)
1427  Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
1428 
1429  // User can choose whether to wait forever or for a fixed amount of time
1430  if (!fDontTimeout) {
1431  static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
1432  static Int_t dt = 2000;
1433  Int_t to = timeout;
1435  while (to && !IsInterrupt()) {
1436  SetAWait(kTRUE);
1437  if (fASem.Wait(dt) != 0) {
1438  to -= dt;
1439  if (to <= 0) {
1440  Error("PickUpReady","error waiting at semaphore");
1441  return -1;
1442  } else {
1443  if (gDebug > 0)
1444  Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
1445  this, GetTitle(), to/1000);
1446  }
1447  } else
1448  break;
1449  SetAWait(kFALSE);
1450  }
1451  // We wait forever
1452  if (IsInterrupt()) {
1453  if (gDebug > 2)
1454  Info("PickUpReady","interrupted");
1456  SetAWait(kFALSE);
1457  return -1;
1458  }
1459  } else {
1460  // We wait forever
1461  SetAWait(kTRUE);
1462  if (fASem.Wait() != 0) {
1463  Error("PickUpReady","error waiting at semaphore");
1464  SetAWait(kFALSE);
1465  return -1;
1466  }
1467  SetAWait(kFALSE);
1468  }
1469  if (gDebug > 2)
1470  Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
1471 
1472  std::lock_guard<std::recursive_mutex> lock(fAMtx);
1473 
1474  // Get message, if any
1475  if (fAQue.size() <= 0) {
1476  Error("PickUpReady","queue is empty - protocol error ?");
1477  return -1;
1478  }
1479  if (!(fBufCur = fAQue.front())) {
1480  Error("PickUpReady","got invalid buffer - protocol error ?");
1481  return -1;
1482  }
1483  // Remove message from the queue
1484  fAQue.pop_front();
1485 
1486  // Set number of available bytes
1487  fByteLeft = fBufCur->fLen;
1488 
1489  if (gDebug > 2)
1490  Info("PickUpReady", "%p: %s: got message (%d bytes)",
1491  this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
1492 
1493  // Update counters
1494  fBytesRecv += fBufCur->fLen;
1495 
1496  // Set session ID
1497  if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
1499 
1500  // Clean entry in the underlying pipe
1501  fgPipe.Clean(this);
1502 
1503  // We are done
1504  return 0;
1505 }
1506 
1507 ////////////////////////////////////////////////////////////////////////////////
1508 /// Pop a buffer of at least 'size' bytes from the spare list.
1509 /// If no spare buffer is large enough, the first one is resized; if the
1510 /// spare list is empty, a new buffer is created.
1511 
1513 {
1514  TXSockBuf *buf = 0;
1515  static Int_t nBuf = 0;
1516 
1517  std::lock_guard<std::mutex> lock(fgSMtx);
1518 
1519  Int_t maxsz = 0;
1520  if (fgSQue.size() > 0) {
1521  list<TXSockBuf *>::iterator i;
1522  for (i = fgSQue.begin(); i != fgSQue.end(); i++) {
1523  maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1524  if ((*i) && (*i)->fSiz >= size) {
1525  buf = *i;
1526  if (gDebug > 2)
1527  Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1528  size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
1529  // Drop from this list
1530  fgSQue.erase(i);
1531  return buf;
1532  }
1533  }
1534  // All buffers are too small: enlarge the first one
1535  buf = fgSQue.front();
1536  buf->Resize(size);
1537  if (gDebug > 2)
1538  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1539  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1540  // Drop from this list
1541  fgSQue.pop_front();
1542  return buf;
1543  }
1544 
1545  // Create a new buffer
1546  buf = new TXSockBuf((char *)malloc(size), size);
1547  nBuf++;
1548 
1549  if (gDebug > 2)
1550  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1551  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1552 
1553  // We are done
1554  return buf;
1555 }
1556 
1557 ////////////////////////////////////////////////////////////////////////////////
1558 /// Release read buffer giving back to the spare list
1559 
1561 {
1562  std::lock_guard<std::mutex> lock(fgSMtx);
1563 
1564  if (gDebug > 2)
1565  Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
1567 
1569  fgSQue.push_back(fBufCur);
1570  } else {
1571  delete fBufCur;
1572  }
1573  fBufCur = 0;
1574  fByteCur = 0;
1575  fByteLeft = 0;
1576 }
1577 
1578 ////////////////////////////////////////////////////////////////////////////////
1579 /// Receive a raw buffer of specified length bytes.
1580 
1582 {
1583  // Inputs must make sense
1584  if (!buffer || (length <= 0))
1585  return -1;
1586 
1587  // Wait and pick-up a read buffer if we do not have one
1588  if (!fBufCur && (PickUpReady() != 0))
1589  return -1;
1590 
1591  // Use it
1592  if (fByteLeft >= length) {
1593  memcpy(buffer, fBufCur->fBuf + fByteCur, length);
1594  fByteCur += length;
1595  if ((fByteLeft -= length) <= 0)
1596  // All used: give back
1597  PushBackSpare();
1598  // Success: update usage timestamp
1599  Touch();
1600  return length;
1601  } else {
1602  // Copy the first part
1603  memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
1604  Int_t at = fByteLeft;
1605  Int_t tobecopied = length - fByteLeft;
1606  PushBackSpare();
1607  while (tobecopied > 0) {
1608  // Pick-up next buffer (it may wait inside)
1609  if (PickUpReady() != 0)
1610  return -1;
1611  // Copy the fresh meat
1612  Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1613  memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
1614  fByteCur = ncpy;
1615  if ((fByteLeft -= ncpy) <= 0)
1616  // All used: give back
1617  PushBackSpare();
1618  // Recalculate
1619  tobecopied -= ncpy;
1620  at += ncpy;
1621  }
1622  }
1623 
1624  // Update counters
1625  fBytesRecv += length;
1626  fgBytesRecv += length;
1627 
1628  // Success: update usage timestamp
1629  Touch();
1630 
1631  return length;
1632 }
1633 
1634 ////////////////////////////////////////////////////////////////////////////////
1635 /// Send urgent message (interrupt) to remote server
1636 /// Returns 0 or -1 in case of error.
1637 
1639 {
1641 
1642  // Prepare request
1643  XPClientRequest Request;
1644  memset(&Request, 0, sizeof(Request) );
1645  fConn->SetSID(Request.header.streamid);
1646  if (type == (Int_t) TProof::kShutdownInterrupt)
1647  Request.interrupt.requestid = kXP_destroy;
1648  else
1649  Request.interrupt.requestid = kXP_interrupt;
1650  Request.interrupt.sid = fSessionID;
1651  Request.interrupt.type = type; // type of interrupt (see TProof::EUrgent)
1652  Request.interrupt.dlen = 0;
1653 
1654  // Send request
1655  XrdClientMessage *xrsp =
1656  fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
1657  if (xrsp) {
1658  // Success: update usage timestamp
1659  Touch();
1660  // Cleanup
1661  SafeDelete(xrsp);
1662  // ok
1663  return 0;
1664  } else {
1665  // Print error msg, if any
1666  if (fConn->GetLastErr())
1667  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1668  }
1669 
1670  // Failure notification (avoid using the handler: we may be exiting)
1671  Error("SendInterrupt", "problems sending interrupt to server");
1672  return -1;
1673 }
1674 
1675 ////////////////////////////////////////////////////////////////////////////////
1676 
1678 {
1679  std::lock_guard<std::recursive_mutex> lock(fAMtx);
1680  fRDInterrupt = i;
1681  if (i && fConn) fConn->SetInterrupt();
1682  if (i && fAWait) fASem.Post();
1683 }
1684 
1685 ////////////////////////////////////////////////////////////////////////////////
1686 /// Send a TMessage object. Returns the number of bytes in the TMessage
1687 /// that were sent and -1 in case of error.
1688 
1690 {
1692 
1693  if (mess.IsReading()) {
1694  Error("Send", "cannot send a message used for reading");
1695  return -1;
1696  }
1697 
1698  // send streamer infos in case schema evolution is enabled in the TMessage
1699  SendStreamerInfos(mess);
1700 
1701  // send the process id's so TRefs work
1702  SendProcessIDs(mess);
1703 
1704  mess.SetLength(); //write length in first word of buffer
1705 
1706  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
1707  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
1708 
1709  if (mess.GetCompressionLevel() > 0)
1710  const_cast<TMessage&>(mess).Compress();
1711 
1712  char *mbuf = mess.Buffer();
1713  Int_t mlen = mess.Length();
1714  if (mess.CompBuffer()) {
1715  mbuf = mess.CompBuffer();
1716  mlen = mess.CompLength();
1717  }
1718 
1719  // Parse message type to choose sending options
1720  kXR_int32 fSendOptDefault = fSendOpt;
1721  switch (mess.What()) {
1722  case kPROOF_PROCESS:
1724  break;
1725  case kPROOF_PROGRESS:
1726  case kPROOF_FEEDBACK:
1728  break;
1729  case kPROOF_QUERYSUBMITTED:
1732  break;
1733  case kPROOF_STARTPROCESS:
1736  break;
1737  case kPROOF_STOPPROCESS:
1739  break;
1740  case kPROOF_SETIDLE:
1743  break;
1744  case kPROOF_LOGFILE:
1745  case kPROOF_LOGDONE:
1746  if (GetClientIDSize() <= 1)
1747  fSendOpt |= kXPD_logmsg;
1748  break;
1749  default:
1750  break;
1751  }
1752 
1753  if (gDebug > 2)
1754  Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
1755 
1756  Int_t nsent = SendRaw(mbuf, mlen);
1757  fSendOpt = fSendOptDefault;
1758 
1759  if (nsent <= 0)
1760  return nsent;
1761 
1762  fBytesSent += nsent;
1763  fgBytesSent += nsent;
1764 
1765  return nsent - sizeof(UInt_t); //length - length header
1766 }
1767 
1768 ////////////////////////////////////////////////////////////////////////////////
1769 /// Receive a TMessage object. The user must delete the TMessage object.
1770 /// Returns length of message in bytes (can be 0 if other side of connection
1771 /// is closed) or -1 in case of error or -5 if pipe broken (connection invalid).
1772 /// In those cases mess == 0.
1773 
1775 {
1777 
1778  if (!IsValid()) {
1779  mess = 0;
1780  return -5;
1781  }
1782 
1783 oncemore:
1784  Int_t n;
1785  UInt_t len;
1786  if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
1787  mess = 0;
1788  return n;
1789  }
1790  len = net2host(len); //from network to host byte order
1791 
1792  char *buf = new char[len+sizeof(UInt_t)];
1793  if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
1794  delete [] buf;
1795  mess = 0;
1796  return n;
1797  }
1798 
1799  fBytesRecv += n + sizeof(UInt_t);
1800  fgBytesRecv += n + sizeof(UInt_t);
1801 
1802  mess = new TMessage(buf, len+sizeof(UInt_t));
1803 
1804  // receive any streamer infos
1805  if (RecvStreamerInfos(mess))
1806  goto oncemore;
1807 
1808  // receive any process ids
1809  if (RecvProcessIDs(mess))
1810  goto oncemore;
1811 
1812  if (mess->What() & kMESS_ACK) {
1813  // Acknowledgement embedded: ignore ...
1814  mess->SetWhat(mess->What() & ~kMESS_ACK);
1815  }
1816 
1817  return n;
1818 }
1819 
1820 ////////////////////////////////////////////////////////////////////////////////
1821 /// Send message to intermediate coordinator.
1822 /// If any output is due, this is returned as an obj string to be
1823 /// deleted by the caller
1824 
1825 TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
1826  Long64_t l64, Int_t int3, const char *)
1827 {
1828  TObjString *sout = 0;
1829 
1830  // We fill the header struct containing the request
1831  XPClientRequest reqhdr;
1832  const void *buf = 0;
1833  char *bout = 0;
1834  char **vout = 0;
1835  memset(&reqhdr, 0, sizeof(reqhdr));
1836  fConn->SetSID(reqhdr.header.streamid);
1837  reqhdr.header.requestid = kXP_admin;
1838  reqhdr.proof.int1 = kind;
1839  reqhdr.proof.int2 = int2;
1840  switch (kind) {
1841  case kQueryMssUrl:
1842  case kQueryROOTVersions:
1843  case kQuerySessions:
1844  case kQueryWorkers:
1845  reqhdr.proof.sid = 0;
1846  reqhdr.header.dlen = 0;
1847  vout = (char **)&bout;
1848  break;
1849  case kCleanupSessions:
1850  reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
1851  : (kXR_int32) kXPD_TopMaster;
1852  reqhdr.proof.int3 = int2;
1853  reqhdr.proof.sid = fSessionID;
1854  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1855  buf = (msg) ? (const void *)msg : buf;
1856  break;
1857  case kCpFile:
1858  case kGetFile:
1859  case kPutFile:
1860  case kExec:
1861  reqhdr.proof.sid = fSessionID;
1862  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1863  buf = (msg) ? (const void *)msg : buf;
1864  vout = (char **)&bout;
1865  break;
1866  case kQueryLogPaths:
1867  vout = (char **)&bout;
1868  reqhdr.proof.int3 = int3;
1869  case kReleaseWorker:
1870  case kSendMsgToUser:
1871  case kGroupProperties:
1872  case kSessionTag:
1873  case kSessionAlias:
1874  reqhdr.proof.sid = fSessionID;
1875  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1876  buf = (msg) ? (const void *)msg : buf;
1877  break;
1878  case kROOTVersion:
1879  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1880  buf = (msg) ? (const void *)msg : buf;
1881  break;
1882  case kGetWorkers:
1883  reqhdr.proof.sid = fSessionID;
1884  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1885  if (msg)
1886  buf = (const void *)msg;
1887  vout = (char **)&bout;
1888  break;
1889  case kReadBuffer:
1890  reqhdr.header.requestid = kXP_readbuf;
1891  reqhdr.readbuf.ofs = l64;
1892  reqhdr.readbuf.len = int2;
1893  if (int3 > 0 && fXrdProofdVersion < 1003) {
1894  Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
1895  " grep functionality not supported", fXrdProofdVersion);
1896  return sout;
1897  }
1898  reqhdr.readbuf.int1 = int3;
1899  if (!msg || strlen(msg) <= 0) {
1900  Info("SendCoordinator", "kReadBuffer: file path undefined");
1901  return sout;
1902  }
1903  reqhdr.header.dlen = strlen(msg);
1904  buf = (const void *)msg;
1905  vout = (char **)&bout;
1906  break;
1907  default:
1908  Info("SendCoordinator", "unknown message kind: %d", kind);
1909  return sout;
1910  }
1911 
1912  // server response header
1913  Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
1914  XrdClientMessage *xrsp =
1915  fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
1916 
1917  // If positive answer
1918  if (xrsp) {
1919  // Check if we need to create an output string
1920  if (bout && (xrsp->DataLen() > 0))
1921  sout = new TObjString(TString(bout,xrsp->DataLen()));
1922  if (bout)
1923  free(bout);
1924  // Success: update usage timestamp
1925  Touch();
1926  SafeDelete(xrsp);
1927  } else {
1928  // Print error msg, if any
1929  if (fConn->GetLastErr())
1930  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1931  }
1932 
1933  // Failure notification (avoid using the handler: we may be exiting)
1934  return sout;
1935 }
1936 
1937 ////////////////////////////////////////////////////////////////////////////////
1938 /// Send urgent message to counterpart; 'type' specifies the type of
1939 /// the message (see TXSocket::EUrgentMsgType), and 'int1', 'int2'
1940 /// two containers for additional information.
1941 
1943 {
1945 
1946  // Prepare request
1947  XPClientRequest Request;
1948  memset(&Request, 0, sizeof(Request) );
1949  fConn->SetSID(Request.header.streamid);
1950  Request.proof.requestid = kXP_urgent;
1951  Request.proof.sid = fSessionID;
1952  Request.proof.int1 = type; // type of urgent msg (see TXSocket::EUrgentMsgType)
1953  Request.proof.int2 = int1; // 4-byte container info 1
1954  Request.proof.int3 = int2; // 4-byte container info 2
1955  Request.proof.dlen = 0;
1956 
1957  // Send request
1958  XrdClientMessage *xrsp =
1959  fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
1960  if (xrsp) {
1961  // Success: update usage timestamp
1962  Touch();
1963  // Cleanup
1964  SafeDelete(xrsp);
1965  } else {
1966  // Print error msg, if any
1967  if (fConn->GetLastErr())
1968  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1969  }
1970 
1971  // Done
1972  return;
1973 }
1974 
1975 ////////////////////////////////////////////////////////////////////////////////
1976 
1978  return (fConn ? fConn->GetLowSocket() : -1);
1979 }
1980 
1981 ////////////////////////////////////////////////////////////////////////////////
1982 /// Init environment variables for XrdClient
1983 
1985 {
1986  // Set debug level
1987  Int_t deb = gEnv->GetValue("XProof.Debug", -1);
1988  EnvPutInt(NAME_DEBUG, deb);
1989  if (deb > 0) {
1990  XrdProofdTrace->What |= TRACE_REQ;
1991  if (deb > 1) {
1992  XrdProofdTrace->What |= TRACE_DBG;
1993  if (deb > 2)
1994  XrdProofdTrace->What |= TRACE_ALL;
1995  }
1996  }
1997  const char *cenv = 0;
1998 
1999  // List of domains where connection is allowed
2000  TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
2001  if (allowCO.Length() > 0)
2002  EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
2003 
2004  // List of domains where connection is denied
2005  TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
2006  if (denyCO.Length() > 0)
2007  EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
2008 
2009  // Max number of retries on first connect and related timeout
2011  Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
2012  EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
2013  Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
2014  EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
2015 
2016  // Reconnect Wait
2017  Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
2019  if (recoTO == DFLT_RECONNECTWAIT) {
2020  // Check also the old variable name
2021  recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
2023  }
2024  EnvPutInt(NAME_RECONNECTWAIT, recoTO);
2025 
2026  // Request Timeout
2027  Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
2028  EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
2029 
2030  // No automatic proofd backward-compatibility
2032 
2033  // Dynamic forwarding (SOCKS4)
2034  TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
2035  Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
2036  if (socks4Port > 0) {
2037  if (socks4Host.IsNull())
2038  // Default
2039  socks4Host = "127.0.0.1";
2040  EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
2041  EnvPutInt(NAME_SOCKS4PORT, socks4Port);
2042  }
2043 
2044  // For password-based authentication
2045  TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
2046  if (autolog.Length() > 0 &&
2047  (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2048  gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
2049 
2050  // For password-based authentication
2051  TString netrc;
2052  netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
2053  gSystem->Setenv("XrdSecNETRC", netrc.Data());
2054 
2055  TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
2056  if (alogfile.Length() > 0)
2057  gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
2058 
2059  TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
2060  if (verisrv.Length() > 0 &&
2061  (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2062  gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
2063 
2064  TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
2065  if (srvpuk.Length() > 0)
2066  gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
2067 
2068  // For GSI authentication
2069  TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
2070  if (cadir.Length() > 0)
2071  gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
2072 
2073  TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
2074  if (crldir.Length() > 0)
2075  gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
2076 
2077  TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
2078  if (crlext.Length() > 0)
2079  gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
2080 
2081  TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
2082  if (ucert.Length() > 0)
2083  gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
2084 
2085  TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
2086  if (ukey.Length() > 0)
2087  gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
2088 
2089  TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
2090  if (upxy.Length() > 0)
2091  gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
2092 
2093  TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
2094  if (valid.Length() > 0)
2095  gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
2096 
2097  TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
2098  if (deplen.Length() > 0 &&
2099  (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2100  gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
2101 
2102  TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
2103  if (pxybits.Length() > 0)
2104  gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
2105 
2106  TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
2107  if (crlcheck.Length() > 0 &&
2108  (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2109  gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
2110 
2111  TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
2112  if (delegpxy.Length() > 0 &&
2113  (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2114  gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
2115 
2116  TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
2117  if (signpxy.Length() > 0 &&
2118  (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2119  gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
2120 
2121  // Print the tag, if required (only once)
2122  if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
2123  ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
2124  gROOT->GetVersion());
2125 
2126  // Only once
2127  fgInitDone = kTRUE;
2128 }
2129 
2130 ////////////////////////////////////////////////////////////////////////////////
2131 /// Try reconnection after failure
2132 
2134 {
2135  if (gDebug > 0) {
2136  Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2137  this, fConn, (fConn ? fConn->IsValid() : 0),
2138  fUrl.Data(), (fConn ? fConn->GetLogConnID() : -1));
2139  }
2140 
2141  Int_t tryreconnect = gEnv->GetValue("TXSocket.Reconnect", 0);
2142  if (tryreconnect == 0 || fXrdProofdVersion < 1005) {
2143  if (tryreconnect == 0)
2144  Info("Reconnect","%p: reconnection attempts explicitly disabled!", this);
2145  else
2146  Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
2147  this, fXrdProofdVersion);
2148  return -1;
2149  }
2150 
2151  if (fConn) {
2152  if (gDebug > 0)
2153  Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
2154  fConn->ReConnect();
2155  if (fConn->IsValid()) {
2156  // Create new proofserv if not client manager or administrator or internal mode
2157  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
2158  // We attach or create
2159  if (!Create(kTRUE)) {
2160  // Failure
2161  Error("TXSocket", "create or attach failed (%s)",
2162  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
2163  Close();
2164  return -1;
2165  }
2166  }
2167  }
2168  }
2169 
2170  if (gDebug > 0) {
2171  if (fConn) {
2172  Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
2173  (fConn->IsValid() ? "succeeded!" : "failed"),
2174  fConn->GetLogConnID() );
2175  } else {
2176  Info("Reconnect", "%p (c:0x0): attempt failed", this);
2177  }
2178  }
2179 
2180  // Done
2181  return ((fConn && fConn->IsValid()) ? 0 : -1);
2182 }
2183 
2184 ////////////////////////////////////////////////////////////////////////////////
2185 ///constructor
2186 
2188 {
2189  fBuf = fMem = bp;
2190  fSiz = fLen = sz;
2191  fOwn = own;
2192  fCid = -1;
2193  fgBuffMem += sz;
2194 }
2195 
2196 ////////////////////////////////////////////////////////////////////////////////
2197 ///destructor
2198 
2200 {
2201  if (fOwn && fMem) {
2202  free(fMem);
2203  fgBuffMem -= fSiz;
2204  }
2205 }
2206 
2207 ////////////////////////////////////////////////////////////////////////////////
2208 ///resize socket buffer
2209 
2211 {
2212  if (sz > fSiz) {
2213  if ((fMem = (Char_t *)realloc(fMem, sz))) {
2214  fgBuffMem += (sz - fSiz);
2215  fBuf = fMem;
2216  fSiz = sz;
2217  fLen = 0;
2218  }
2219  }
2220 }
2221 
2222 //_____________________________________________________________________________
2223 //
2224 // TXSockBuf static methods
2225 //
2226 
2227 ////////////////////////////////////////////////////////////////////////////////
2228 /// Return the currently allocated memory
2229 
2231 {
2232  return fgBuffMem;
2233 }
2234 
2235 ////////////////////////////////////////////////////////////////////////////////
2236 /// Return the max allocated memory allowed
2237 
2239 {
2240  return fgMemMax;
2241 }
2242 
2243 ////////////////////////////////////////////////////////////////////////////////
2244 /// Return the max allocated memory allowed
2245 
2247 {
2248  fgMemMax = memmax > 0 ? memmax : fgMemMax;
2249 }
2250 
2251 //_____________________________________________________________________________
2252 //
2253 // TXSockPipe
2254 //
2255 
2256 ////////////////////////////////////////////////////////////////////////////////
2257 /// Constructor
2258 
2259 TXSockPipe::TXSockPipe(const char *loc) : fLoc(loc)
2260 {
2261  // Create the pipe
2262  if (pipe(fPipe) != 0) {
2263  Printf("TXSockPipe: problem initializing pipe for socket inputs");
2264  fPipe[0] = -1;
2265  fPipe[1] = -1;
2266  return;
2267  }
2268 }
2269 
2270 ////////////////////////////////////////////////////////////////////////////////
2271 /// Destructor
2272 
2274 {
2275  if (fPipe[0] >= 0) close(fPipe[0]);
2276  if (fPipe[1] >= 0) close(fPipe[1]);
2277 }
2278 
2279 
2280 ////////////////////////////////////////////////////////////////////////////////
2281 /// Write a byte to the global pipe to signal new availibility of
2282 /// new messages
2283 
2285 {
2286  if (!IsValid() || !s) return -1;
2287 
2288  // This must be an atomic action
2289  Int_t sz = 0;
2290  { std::lock_guard<std::recursive_mutex> lock(fMutex);
2291  // Add this one
2292  fReadySock.Add(s);
2293 
2294  // Only one char
2295  Char_t c = 1;
2296  if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
2297  Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
2298  return -1;
2299  }
2300  if (gDebug > 2) sz = fReadySock.GetSize();
2301  }
2302 
2303  if (gDebug > 2)
2304  Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2305  fLoc.Data(), s, sz, fPipe[1]);
2306  // We are done
2307  return 0;
2308 }
2309 
2310 ////////////////////////////////////////////////////////////////////////////////
2311 /// Read a byte to the global pipe to synchronize message pickup
2312 
2314 {
2315  // Pipe must have been created
2316  if (!IsValid() || !s) return -1;
2317 
2318  // Only one char
2319  Int_t sz = 0;
2320  Char_t c = 0;
2321  { std::lock_guard<std::recursive_mutex> lock(fMutex);
2322  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
2323  Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
2324  return -1;
2325  }
2326  // Remove this one
2327  fReadySock.Remove(s);
2328 
2329  if (gDebug > 2) sz = fReadySock.GetSize();
2330  }
2331 
2332  if (gDebug > 2)
2333  Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2334  fLoc.Data(), s, sz, fPipe[0]);
2335 
2336  // We are done
2337  return 0;
2338 }
2339 
2340 ////////////////////////////////////////////////////////////////////////////////
2341 /// Remove any reference to socket 's' from the global pipe and
2342 /// ready-socket queue
2343 
2345 {
2346  // Pipe must have been created
2347  if (!IsValid() || !s) return -1;
2348 
2349  TObject *o = 0;
2350  // This must be an atomic action
2351  { std::lock_guard<std::recursive_mutex> lock(fMutex);
2352  o = fReadySock.FindObject(s);
2353 
2354  while (o) {
2355  // Remove from the list
2356  fReadySock.Remove(s);
2357  o = fReadySock.FindObject(s);
2358  // Remove one notification from the pipe
2359  Char_t c = 0;
2360  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
2361  Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
2362  }
2363  }
2364  // Flush also the socket
2365  ((TXSocket *)s)->Flush();
2366 
2367  // Notify
2368  if (gDebug > 0)
2369  Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
2370 
2371  // We are done
2372  return 0;
2373 }
2374 
2375 ////////////////////////////////////////////////////////////////////////////////
2376 /// Dump content of the ready socket list
2377 
2379 {
2380  std::lock_guard<std::recursive_mutex> lock(fMutex);
2381 
2382  TString buf = Form("%d |", fReadySock.GetSize());
2383  TIter nxs(&fReadySock);
2384  TObject *o = 0;
2385  while ((o = nxs()))
2386  buf += Form(" %p",o);
2387  Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
2388 }
2389 
2390 ////////////////////////////////////////////////////////////////////////////////
2391 /// Return last ready socket
2392 
2394 {
2395  std::lock_guard<std::recursive_mutex> lock(fMutex);
2396 
2397  return (TXSocket *) fReadySock.Last();
2398 }
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Int_t GetOpenError() const
Getter for last error.
Definition: TXSocket.cxx:927
Bool_t IsReading() const
Definition: TBuffer.h:81
double read(const std::string &file_name)
reading
Int_t fSocket
Definition: TSocket.h:84
Int_t fCid
Definition: TXSocket.h:246
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define kXPD_querynum
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:242
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:714
Int_t fTcpWindowSize
Definition: TSocket.h:85
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition: TSocket.cxx:932
virtual void SetClientID(Int_t)
Definition: TXSocket.h:166
Int_t GetCompressionLevel() const
Definition: TSocket.h:195
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:847
TSemaphore fAsynProc
Definition: TXSocket.h:93
char * CompBuffer() const
Definition: TMessage.h:88
long long Long64_t
Definition: RtypesCore.h:69
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Definition: TXSocket.cxx:1218
kXR_unt16 fStreamid
Definition: XrdProofConn.h:63
#define kXPD_TopMaster
TXSocket(const char *url, Char_t mode='M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Definition: TXSocket.cxx:127
TSemaphore fASem
Definition: TXSocket.h:85
void SetLoc(const char *loc="")
Definition: TXSocket.h:282
int GetServType() const
Definition: XrdProofConn.h:133
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:520
Int_t fPipe[2]
Definition: TXSocket.h:286
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
Collectable string class.
Definition: TObjString.h:28
virtual void Close(Option_t *opt="")
Close connection.
Definition: TXSocket.cxx:311
int GetLogConnID() const
Definition: XrdProofConn.h:130
const char Option_t
Definition: RtypesCore.h:62
Int_t fLogLevel
Definition: TXSocket.h:76
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition: TSocket.cxx:979
std::recursive_mutex fMutex
Definition: TXSocket.h:285
static Long64_t BuffMem()
Return the currently allocated memory.
Definition: TXSocket.cxx:2230
const Ssiz_t kNPOS
Definition: RtypesCore.h:115
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Definition: TUrl.h:35
XrdProofConn * fConn
Definition: TXSocket.h:82
char fMode
Definition: TXSocket.h:69
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
Definition: TSemaphore.cxx:87
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition: TXSocket.cxx:368
struct XPClientInterruptRequest interrupt
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:585
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
~TXSockBuf()
destructor
Definition: TXSocket.cxx:2199
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
Definition: TSystem.cxx:884
std::recursive_mutex fIMtx
Definition: TXSocket.h:96
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
#define gROOT
Definition: TROOT.h:375
#define NAME_KEEPSOCKOPENIFNOTXRD
XrdOucTrace * XrdProofdTrace
Definition: TXSocket.cxx:55
#define NAME_SOCKS4HOST
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
Definition: TXHandler.cxx:37
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition: TSocket.cxx:650
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TXSocket()
Destructor.
Definition: TXSocket.cxx:231
void SetInterrupt(Bool_t i=kTRUE)
Definition: TXSocket.cxx:1677
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
TInetAddress fAddress
Definition: TSocket.h:74
#define kXPD_async
#define NAME_DEBUG
#define NAME_FIRSTCONNECTMAXCNT
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Definition: TXSocket.cxx:256
static std::mutex fgSMtx
Definition: TXSocket.h:116
#define malloc
Definition: civetweb.c:818
TXSocket * GetLastReady()
Return last ready socket.
Definition: TXSocket.cxx:2393
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Definition: TXSocket.cxx:1347
Int_t GetServType() const
Getter for server type.
Definition: TXSocket.cxx:935
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal availability of new messages.
Definition: TXSocket.cxx:2284
ESendRecvOptions
Definition: TSocket.h:49
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:501
static Long64_t fgBuffMem
Definition: TXSocket.h:259
Int_t Length() const
Definition: TBuffer.h:94
int GetOpenError() const
Definition: XrdProofConn.h:132
void SetInterrupt()
Interrupt the underlying socket.
Int_t Clean(TSocket *s)
Read a byte to the global pipe to synchronize message pickup.
Definition: TXSocket.cxx:2313
static Long64_t fgMemMax
Definition: TXSocket.h:260
struct ClientRequestHdr header
const char * GetHost() const
Definition: TUrl.h:70
#define SafeDelete(p)
Definition: RConfig.h:499
Bool_t fAWait
Definition: TXSocket.h:87
UShort_t net2host(UShort_t x)
Definition: Bytes.h:577
Int_t fPort
Definition: TXSocket.h:74
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TXSocket.cxx:1581
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2345
static void InitEnvs()
Init environment variables for XrdClient.
Definition: TXSocket.cxx:1984
#define kXPD_setidle
#define kXPD_process
TString fUrl
Definition: TSocket.h:86
virtual Bool_t Notify()
This method must be overridden to handle object notification.
Definition: TObject.cxx:499
const char *const kPROOF_WorkerIdleTO
Definition: TProof.h:139
char * Buffer() const
Definition: TBuffer.h:91
static std::list< TXSockBuf * > fgSQue
Definition: TXSocket.h:117
Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TXSocket.cxx:1689
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition: TXSocket.cxx:972
Int_t CompLength() const
Definition: TMessage.h:89
Bool_t fIForward
Definition: TXSocket.h:98
TXHandler * fHandler
Definition: TXSocket.h:80
const char * GetLastErr()
Definition: XrdProofConn.h:136
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1634
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
#define DFLT_RECONNECTWAIT
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Definition: TXSocket.cxx:1512
#define NAME_RECONNECTWAIT
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition: TXSocket.cxx:71
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition: TXSocket.cxx:959
#define kXPD_internal
#define realloc
Definition: civetweb.c:820
TString fUser
Definition: TXSocket.h:72
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition: TSystem.cxx:2267
int fRemoteProtocol
Definition: XrdProofConn.h:64
struct XPClientSendRcvRequest sendrcv
kXR_unt16 HeaderSID()
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Definition: TXSocket.cxx:1053
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
Definition: TSemaphore.cxx:35
Int_t fByteCur
Definition: TXSocket.h:90
Int_t Flush()
Flush the asynchronous queue.
Definition: TXSocket.cxx:1004
Bool_t IsInterrupt()
Definition: TXSocket.h:203
TString fHost
Definition: TXSocket.h:73
virtual Int_t GetClientID() const
Definition: TXSocket.h:155
#define NAME_CONNECTDOMAINALLOW_RE
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
Definition: TXSocket.cxx:2187
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition: TSystem.cxx:1618
virtual Bool_t HandleInput(const void *in=0)
Handler of asynchronous input events.
Definition: TXHandler.cxx:28
UnsolRespProcResult
void SetAWait(Bool_t w=kTRUE)
Definition: TXSocket.h:207
R__EXTERN TSystem * gSystem
Definition: TSystem.h:539
Handler of asynchronous events for XProofD sockets.
Definition: TXHandler.h:28
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:482
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:679
Int_t fXrdProofdVersion
Definition: TXSocket.h:108
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition: TXSocket.cxx:848
High level handler of connections to XProofD.
Definition: TXSocket.h:59
#define XrdSysError
Definition: XpdSysError.h:8
Int_t GetLowSocket() const
Definition: TXSocket.cxx:1977
#define EnvPutString(name, val)
Definition: XrdClientEnv.hh:46
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Definition: TXSocket.cxx:1275
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
UInt_t fBytesSent
Definition: TSocket.h:76
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:873
char * Form(const char *fmt,...)
TString fBuffer
Definition: TXSocket.h:78
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
std::recursive_mutex fAMtx
Definition: TXSocket.h:86
#define TRACE_REQ
std::list< TXSockBuf * > fAQue
Definition: TXSocket.h:88
#define TRACE_ALL
Int_t Post()
Increment the value of the semaphore.
Definition: TSemaphore.cxx:103
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
Definition: TXSocket.cxx:2344
static TString fgLoc
Definition: TXSocket.h:112
Char_t * fBuf
Definition: TXSocket.h:244
void Touch()
Definition: TSocket.h:171
TString fLoc
Definition: TXSocket.h:287
XrdOucString fUser
Definition: XrdProofConn.h:69
TList fReadySock
Definition: TXSocket.h:288
Int_t fSiz
Definition: TXSocket.h:242
#define kXPD_AnyServer
#define kXPD_logmsg
Bool_t IsValid() const
Definition: TXSocket.h:272
#define Printf
Definition: TGeoToOCC.h:18
XrdClientUrlInfo fUrl
Definition: XrdProofConn.h:92
Int_t fLen
Definition: TXSocket.h:243
virtual ~TXSockPipe()
Destructor.
Definition: TXSocket.cxx:2273
virtual Int_t Reconnect()
Try reconnection after failure.
Definition: TXSocket.cxx:2133
XrdOucString GetUrl()
const Bool_t kFALSE
Definition: RtypesCore.h:92
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition: TXSocket.cxx:268
UInt_t What() const
Definition: TMessage.h:74
static XrdSysError eDest(0, "Proofx")
#define TRACE_DBG
int Ssiz_t
Definition: RtypesCore.h:63
#define NAME_CONNECTDOMAINDENY_RE
TXSockBuf * fBufCur
Definition: TXSocket.h:91
int GetLowSocket()
Return the socket descriptor of the underlying connection.
Short_t fSessionID
Definition: TXSocket.h:71
XrdOucString fHost
Definition: XrdProofConn.h:70
#define NAME_CONNECTTIMEOUT
#define NAME_SOCKS4PORT
Bool_t IsValid() const
Definition: TXSocket.h:300
Int_t fCompress
Definition: TSocket.h:77
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:170
unsigned long long ULong64_t
Definition: RtypesCore.h:70
#define free
Definition: civetweb.c:821
#define kXPD_fb_prog
static XrdSysLogger eLogger
Definition: TXSocket.cxx:56
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
Definition: XrdProofConn.h:83
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
void forward(const LAYERDATA &prevLayerData, LAYERDATA &currLayerData)
apply the weights (and functions) in forward direction of the DNN
Definition: NeuralNet.icc:544
Bool_t fDontTimeout
Definition: TXSocket.h:104
TXSockPipe(const char *loc="")
Constructor.
Definition: TXSocket.cxx:2259
XrdOucString fLastErrMsg
Definition: XrdProofConn.h:72
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TXSocket.cxx:1774
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
Int_t GetCompressionLevel() const
Definition: TMessage.h:106
kXR_int32 fSendOpt
Definition: TXSocket.h:70
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
Int_t GetPort() const
Definition: TUrl.h:81
Bool_t fRDInterrupt
Definition: TXSocket.h:105
EServiceType fServType
Definition: TSocket.h:83
void DumpReadySock()
Dump content of the ready socket list.
Definition: TXSocket.cxx:2378
TObject * fReference
Definition: TXSocket.h:79
Mother of all ROOT objects.
Definition: TObject.h:37
static ULong64_t fgBytesSent
Definition: TSocket.h:94
char Char_t
Definition: RtypesCore.h:29
#define NAME_REQUESTTIMEOUT
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:188
void PushBackSpare()
Release read buffer giving back to the spare list.
Definition: TXSocket.cxx:1560
Bool_t IsValid() const
Getter for validity status.
Definition: TXSocket.cxx:951
kXR_int32 fILev
Definition: TXSocket.h:97
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition: TSocket.cxx:685
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
Definition: TError.cxx:202
virtual void Add(TObject *obj)
Definition: TList.h:77
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Definition: TXSocket.cxx:1421
static Bool_t fgInitDone
Definition: TXSocket.h:113
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
Definition: TXSocket.cxx:1825
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
Definition: TXSocket.cxx:1942
you should not use this method at all Int_t Int_t Double_t Double_t Double_t Int_t Double_t Double_t Double_t Double_t b
Definition: TRolke.cxx:630
static TXSockPipe fgPipe
Definition: TXSocket.h:111
static ULong64_t fgBytesRecv
Definition: TSocket.h:93
R__EXTERN Int_t gDebug
Definition: Rtypes.h:83
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
short GetSessionID() const
Definition: XrdProofConn.h:134
Int_t fRemoteProtocol
Definition: TSocket.h:79
bool IsValid() const
Test validity of this connection.
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:283
virtual Int_t GetClientIDSize() const
Definition: TXSocket.h:156
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Definition: TXSocket.cxx:2246
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
Definition: TXSocket.cxx:1638
Int_t fPid
Definition: TXSocket.h:101
bool MatchStreamid(short sid)
virtual Int_t GetSize() const
Definition: TCollection.h:89
Int_t fByteLeft
Definition: TXSocket.h:89
void PostSemAll()
Wake up all threads waiting at the semaphore (used by TXSlave)
Definition: TXSocket.cxx:905
const Bool_t kTRUE
Definition: RtypesCore.h:91
const Int_t n
Definition: legend1.C:16
void Resize(Int_t sz)
resize socket buffer
Definition: TXSocket.cxx:2210
void CtrlC()
Interrupt the remote protocol instance.
Definition: TXSocket.cxx:1385
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:859
Int_t GetSessionID() const
Getter for session ID.
Definition: TXSocket.cxx:943
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
Int_t GetLogConnID() const
Getter for logical connection ID.
Definition: TXSocket.cxx:919
UInt_t fBytesRecv
Definition: TSocket.h:75
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2238