ROOT  6.06/09
Reference Guide
TXSocket.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TXSocket //
15 // //
16 // High level handler of connections to xproofd. //
17 // //
18 //////////////////////////////////////////////////////////////////////////
19 
20 #include "MessageTypes.h"
21 #include "TEnv.h"
22 #include "TError.h"
23 #include "TException.h"
24 #include "TMonitor.h"
25 #include "TObjString.h"
26 #include "TProof.h"
27 #include "TSlave.h"
28 #include "TRegexp.h"
29 #include "TROOT.h"
30 #include "TUrl.h"
31 #include "TXHandler.h"
32 #include "TXSocket.h"
33 #include "XProofProtocol.h"
34 
35 #include "XrdProofConn.h"
36 
38 #include "XrdClient/XrdClientConst.hh"
39 #include "XrdClient/XrdClientEnv.hh"
42 
43 #ifndef WIN32
44 #include <sys/socket.h>
45 #else
46 #include <Winsock2.h>
47 #endif
48 
49 
50 #include "XpdSysError.h"
51 #include "XpdSysLogger.h"
52 
53 // ---- Tracing utils ----------------------------------------------------------
54 #include "XrdProofdTrace.h"
55 XrdOucTrace *XrdProofdTrace = 0;
57 static XrdSysError eDest(0, "Proofx");
58 
59 #ifdef WIN32
62 #endif
63 
64 //______________________________________________________________________________
65 
66 //---- error handling ----------------------------------------------------------
67 
68 ////////////////////////////////////////////////////////////////////////////////
69 /// Interface to ErrorHandler (protected).
70 
71 void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
72 {
73  ::ErrorHandler(level, Form("TXSocket::%s", location), fmt, va);
74 }
75 
76 //----- Ping handler -----------------------------------------------------------
77 ////////////////////////////////////////////////////////////////////////////////
78 
79 class TXSocketPingHandler : public TFileHandler {
80  TXSocket *fSocket;
81 public:
82  TXSocketPingHandler(TXSocket *s, Int_t fd)
83  : TFileHandler(fd, 1) { fSocket = s; }
84  Bool_t Notify();
85  Bool_t ReadNotify() { return Notify(); }
86 };
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 /// Ping the socket
90 
92 {
93  fSocket->Ping("ping handler");
94 
95  return kTRUE;
96 }
97 
98 // Env variables init flag
99 Bool_t TXSocket::fgInitDone = kFALSE;
100 
101 // Static variables for input notification
102 TXSockPipe TXSocket::fgPipe; // Pipe for input monitoring
103 TString TXSocket::fgLoc = "undef"; // Location string
104 
105 // Static buffer manager
106 TMutex TXSocket::fgSMtx; // To protect spare list
107 std::list<TXSockBuf *> TXSocket::fgSQue; // list of spare buffers
108 Long64_t TXSockBuf::fgBuffMem = 0; // Total allocated memory
109 Long64_t TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 MB]
110 
111 ////////////////////////////////////////////////////////////////////////////////
112 /// Constructor
113 /// Open the connection to a remote XrdProofd instance and start a PROOF
114 /// session.
115 /// The mode 'm' indicates the role of this connection:
116 /// 'a' Administrator; used by an XPD to contact the head XPD
117 /// 'i' Internal; used by a TXProofServ to call back its creator
118 /// (see XrdProofUnixConn)
119 /// 'C' PROOF manager: open connection only (do not start a session)
120 /// 'M' Client creating a top master
121 /// 'A' Client attaching to top master
122 /// 'm' Top master creating a submaster
123 /// 's' Master creating a slave
124 /// The buffer 'logbuf' is a null terminated string to be sent over at
125 /// login.
///
/// Parameters as used by the code below:
///   url       server URL; when null only the members are initialised and no
///             connection is attempted
///   m         connection mode (see list above), stored in fMode
///   psid      session ID, stored in fSessionID (forced to -1 for mode 'C')
///             and forwarded to XrdProofConn
///   capver    forwarded unchanged to XrdProofConn
///   logbuf    stored in fBuffer and forwarded to XrdProofConn
///   loglevel  stored in fLogLevel
///   handler   stored in fHandler as asynchronous handler
///
/// Failures are reported via Error() followed by an early return; nothing is
/// thrown by the visible code, so members assigned after the failing step
/// keep whatever value they had.
/// NOTE(review): mode 'L' is also accepted by the create/attach test below
/// but is not described in the list above - confirm its meaning.
126 
127 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
128  const char *logbuf, Int_t loglevel, TXHandler *handler)
129  : TSocket(), fMode(m), fLogLevel(loglevel),
130  fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131  fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
132 {
133  fUrl = url;
134  // Enable tracing in the XrdProof client. if not done already
  // NOTE(review): 'eLogger' is not defined in the part of the file visible
  // here - presumably a file-scope logger; confirm.
135  eDest.logger(&eLogger);
136  if (!XrdProofdTrace)
137  XrdProofdTrace = new XrdOucTrace(&eDest);
138 
139  // Init envs the first time
140  if (!fgInitDone)
141  InitEnvs();
142 
143  // Async queue related stuff
  // NOTE(review): a plain 'new' expression does not yield a null pointer in
  // standard C++, so this failure branch (and the one for fIMtx below) is
  // not expected to be taken.
144  if (!(fAMtx = new TMutex(kTRUE))) {
145  Error("TXSocket", "problems initializing mutex for async queue");
146  return;
147  }
148  fAQue.clear();
149 
150  // Interrupts queue related stuff
151  if (!(fIMtx = new TMutex(kTRUE))) {
152  Error("TXSocket", "problems initializing mutex for interrupts");
153  return;
154  }
  // -1 is the "no interrupt pending" value for fILev
155  fILev = -1;
156  fIForward = kFALSE;
157 
158  // Init some variables
159  fByteLeft = 0;
160  fByteCur = 0;
161  fBufCur = 0;
162  fServType = kPROOFD; // for consistency
163  fTcpWindowSize = -1;
164  fRemoteProtocol = -1;
165  // By default forward directly to end-point
  // Internal ('i') connections additionally carry the kXPD_internal flag
166  fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
  // Manager connections ('C') never own a session
167  fSessionID = (fMode == 'C') ? -1 : psid;
168  fSocket = -1;
169 
170  // This is used by external code to create a link between this object
171  // and another one
172  fReference = 0;
173 
174  // The global pipe
  // Returning here leaves fHandler, fAddress and fPid unassigned
175  if (!fgPipe.IsValid()) {
176  Error("TXSocket", "internal pipe is invalid");
177  return;
178  }
179 
180  // Some initial values
  // NOTE(review): the TUrl is built from 'url' before 'url' is tested for
  // null further down - confirm TUrl accepts a null string.
181  TUrl u(url);
183  u.SetProtocol("proof", kTRUE);
  // Fall back to port 1093 when the URL carries no positive port
184  fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
185 
186  // Set the asynchronous handler
187  fHandler = handler;
188 
189  if (url) {
190 
191  // Create connection (for managers the type of the connection is the same
192  // as for top masters)
  // Modes 'A' and 'C' are both presented to XrdProofConn as 'M'
193  char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
194  fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
  // NOTE(review): the error message below dereferences fConn even on the
  // '!fConn' side of this test.
195  if (!fConn || !(fConn->IsValid())) {
197  if (gDebug > 0)
198  Error("TXSocket", "fatal error occurred while opening a connection"
199  " to server [%s]: %s", url, fConn->GetLastErr());
200  return;
201  }
202 
203  // Fill some info
204  fUser = fConn->fUser.c_str();
205  fHost = fConn->fHost.c_str();
206  fPort = fConn->fPort;
207 
208  // Create new proofserv if not client manager or administrator or internal mode
209  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A'|| fMode == 'L') {
210  // We attach or create
211  if (!Create()) {
212  // Failure
  // Report the connection's last error text, or "-" when it is empty,
  // then release the connection through Close()
213  Error("TXSocket", "create or attach failed (%s)",
214  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
215  Close();
216  return;
217  }
218  }
219 
220  // Fill some other info available if Create is successful
  // NOTE(review): this branch has no statements in the text at hand.
221  if (fMode == 'C') {
224  }
225 
226  // Also in the base class
  // Overwrite URL and address with what the connection actually resolved
227  fUrl = fConn->fUrl.GetUrl().c_str();
228  fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
229  fAddress.fPort = fPort;
230 
231  // This is needed for the reader thread to signal an interrupt
232  fPid = gSystem->GetPid();
233  }
234 }
235 
236 ////////////////////////////////////////////////////////////////////////////////
237 /// Destructor
238 
240 {
241  // Disconnect from remote server (the connection manager is
242  // responsible of the underlying physical connection, so we do not
243  // force its closing)
244  Close();
245 
246  // Delete mutexes
247  SafeDelete(fAMtx);
248  SafeDelete(fIMtx);
249 }
250 
251 ////////////////////////////////////////////////////////////////////////////////
252 /// Set location string
253 
254 void TXSocket::SetLocation(const char *loc)
255 {
256  if (loc) {
257  fgLoc = loc;
258  fgPipe.SetLoc(loc);
259  } else {
260  fgLoc = "";
261  fgPipe.SetLoc("");
262  }
263 }
264 
265 ////////////////////////////////////////////////////////////////////////////////
266 /// Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
267 
269 {
270  if (id < 0 && fConn)
271  fConn->SetAsync(0);
272  fSessionID = id;
273 }
274 
275 ////////////////////////////////////////////////////////////////////////////////
276 /// Disconnect a session. Use opt= "S" or "s" to
277 /// shutdown remote session.
278 /// Default is opt = "".
279 
281 {
282  // Make sure we are connected
283  if (!IsValid()) {
284  if (gDebug > 0)
285  Info("DisconnectSession","not connected: nothing to do");
286  return;
287  }
288 
289  Bool_t shutdown = opt && (strchr(opt,'S') || strchr(opt,'s'));
290  Bool_t all = opt && (strchr(opt,'A') || strchr(opt,'a'));
291 
292  if (id > -1 || all) {
293  // Prepare request
294  XPClientRequest Request;
295  memset(&Request, 0, sizeof(Request) );
296  fConn->SetSID(Request.header.streamid);
297  if (shutdown)
298  Request.proof.requestid = kXP_destroy;
299  else
300  Request.proof.requestid = kXP_detach;
301  Request.proof.sid = id;
302 
303  // Send request
304  XrdClientMessage *xrsp =
305  fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
306 
307  // Print error msg, if any
308  if (!xrsp && fConn->GetLastErr())
309  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
310 
311  // Cleanup
312  SafeDelete(xrsp);
313  }
314 }
315 
316 ////////////////////////////////////////////////////////////////////////////////
317 /// Close connection. Available options are (case insensitive)
318 /// 'P' force closing of the underlying physical connection
319 /// 'S' shutdown remote session, is any
320 /// A session ID can be given using #...# signature, e.g. "#1#".
321 /// Default is opt = "".
322 
324 {
325  Int_t to = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
326  if (fAsynProc.Wait(to*1000) != 0)
327  Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
328 
329  // Remove any reference in the global pipe and ready-sock queue
330  TXSocket::fgPipe.Flush(this);
331 
332  // Make sure we have a connection
333  if (!fConn) {
334  if (gDebug > 0)
335  Info("Close","no connection: nothing to do");
336  fAsynProc.Post();
337  return;
338  }
339 
340  // Disconnect the asynchronous requests handler
341  fConn->SetAsync(0);
342 
343  // If we are connected we disconnect
344  if (IsValid()) {
345 
346  // Parse options
347  TString o(opt);
348  Int_t sessID = fSessionID;
349  if (o.Index("#") != kNPOS) {
350  o.Remove(0,o.Index("#")+1);
351  if (o.Index("#") != kNPOS) {
352  o.Remove(o.Index("#"));
353  sessID = o.IsDigit() ? o.Atoi() : sessID;
354  }
355  }
356 
357  if (sessID > -1) {
358  // Warn the remote session, if any (after destroy the session is gone)
359  DisconnectSession(sessID, opt);
360  } else {
361  // We are the manager: close underlying connection
362  fConn->Close(opt);
363  }
364  }
365 
366  // Delete the connection module
367  SafeDelete(fConn);
368 
369  // Post semaphore
370  fAsynProc.Post();
371 }
372 
373 ////////////////////////////////////////////////////////////////////////////////
374 /// We are here if an unsolicited response comes from a logical conn
375 /// The response comes in the form of an XrdClientMessage *, that must NOT be
376 /// destroyed after processing. It is destroyed by the first sender.
377 /// Remember that we are in a separate thread, since unsolicited
378 /// responses are asynchronous by nature.
379 
380 UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
382 {
383  UnsolRespProcResult rc = kUNSOL_KEEP;
384 
385  // If we are closing we will not do anything
387  if (!semg.IsValid()) {
388  Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
389  return kUNSOL_CONTINUE;
390  }
391 
392  if (!m) {
393  if (gDebug > 2)
394  Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
395  // Some one is perhaps interested in empty messages
396  return kUNSOL_CONTINUE;
397  } else {
398  if (gDebug > 2)
399  Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
400  this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
401  }
402 
403  // Error notification
404  if (m->IsError()) {
406  if (gDebug > 0)
407  Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
408  XHandleErr_t herr = {1, 0};
409  if (!fHandler || fHandler->HandleError((const void *)&herr)) {
410  if (gDebug > 0)
411  Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
412  // Avoid to contact the server any more
413  fSessionID = -1;
414  } else {
415  // Connection still usable: update usage timestamp
416  Touch();
417  }
418  } else {
419  // Time out
420  if (gDebug > 2)
421  Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
422  }
423  // Propagate the message to other possible handlers
424  return kUNSOL_CONTINUE;
425  }
426 
427  // From now on make sure is for us (but only if not during setup, i.e. fConn == 0; otherwise
428  // we may miss some important server message)
429  if (fConn && !m->MatchStreamid(fConn->fStreamid)) {
430  if (gDebug > 1)
431  Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
432  return kUNSOL_CONTINUE;
433  }
434 
435  // Local processing ...
436  Int_t len = 0;
437  if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
438  Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
440  return rc;
441  }
442 
443  // Activity on the line: update usage timestamp
444  Touch();
445 
446  // The first 4 bytes contain the action code
447  kXR_int32 acod = 0;
448  memcpy(&acod, m->GetData(), sizeof(kXR_int32));
449  if (acod > 10000)
450  Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
451  this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
452  //
453  // Update pointer to data
454  void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
455  len -= sizeof(kXR_int32);
456  if (gDebug > 1)
457  Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
458  this, acod, len, m->HeaderSID());
459 
460  if (gDebug > 3)
462 
463  // Case by case
464  kXR_int32 ilev = -1;
465  const char *lab = 0;
466 
467  switch (acod) {
468  case kXPD_ping:
469  //
470  // Special interrupt
471  ilev = TProof::kPing;
472  lab = "kXPD_ping";
473  case kXPD_interrupt:
474  //
475  // Interrupt
476  lab = !lab ? "kXPD_interrupt" : lab;
477  { R__LOCKGUARD(fIMtx);
478  if (acod == kXPD_interrupt) {
479  memcpy(&ilev, pdata, sizeof(kXR_int32));
480  ilev = net2host(ilev);
481  // Update pointer to data
482  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
483  len -= sizeof(kXR_int32);
484  }
485  // The next 4 bytes contain the forwarding option
486  kXR_int32 ifw = 0;
487  if (len > 0) {
488  memcpy(&ifw, pdata, sizeof(kXR_int32));
489  ifw = net2host(ifw);
490  if (gDebug > 1)
491  Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
492  }
493  //
494  // Save the interrupt
495  fILev = ilev;
496  fIForward = (ifw == 1) ? kTRUE : kFALSE;
497 
498  // Handle this input in this thread to avoid queuing on the
499  // main thread
500  XHandleIn_t hin = {acod, 0, 0, 0};
501  if (fHandler)
502  fHandler->HandleInput((const void *)&hin);
503  else
504  Error("ProcessUnsolicitedMsg","handler undefined");
505  }
506  break;
507  case kXPD_timer:
508  //
509  // Set shutdown timer
510  {
511  kXR_int32 opt = 1;
512  kXR_int32 delay = 0;
513  // The next 4 bytes contain the shutdown option
514  if (len > 0) {
515  memcpy(&opt, pdata, sizeof(kXR_int32));
516  opt = net2host(opt);
517  if (gDebug > 1)
518  Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
519  // Update pointer to data
520  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
521  len -= sizeof(kXR_int32);
522  }
523  // The next 4 bytes contain the delay
524  if (len > 0) {
525  memcpy(&delay, pdata, sizeof(kXR_int32));
526  delay = net2host(delay);
527  if (gDebug > 1)
528  Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
529  // Update pointer to data
530  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
531  len -= sizeof(kXR_int32);
532  }
533 
534  // Handle this input in this thread to avoid queuing on the
535  // main thread
536  XHandleIn_t hin = {acod, opt, delay, 0};
537  if (fHandler)
538  fHandler->HandleInput((const void *)&hin);
539  else
540  Error("ProcessUnsolicitedMsg","handler undefined");
541  }
542  break;
543  case kXPD_inflate:
544  //
545  // Set inflate factor
546  {
547  kXR_int32 inflate = 1000;
548  if (len > 0) {
549  memcpy(&inflate, pdata, sizeof(kXR_int32));
550  inflate = net2host(inflate);
551  if (gDebug > 1)
552  Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
553  // Update pointer to data
554  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
555  len -= sizeof(kXR_int32);
556  }
557  // Handle this input in this thread to avoid queuing on the
558  // main thread
559  XHandleIn_t hin = {acod, inflate, 0, 0};
560  if (fHandler)
561  fHandler->HandleInput((const void *)&hin);
562  else
563  Error("ProcessUnsolicitedMsg","handler undefined");
564  }
565  break;
566  case kXPD_priority:
567  //
568  // Broadcast group priority
569  {
570  kXR_int32 priority = -1;
571  if (len > 0) {
572  memcpy(&priority, pdata, sizeof(kXR_int32));
573  priority = net2host(priority);
574  if (gDebug > 1)
575  Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
576  // Update pointer to data
577  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
578  len -= sizeof(kXR_int32);
579  }
580  // Handle this input in this thread to avoid queuing on the
581  // main thread
582  XHandleIn_t hin = {acod, priority, 0, 0};
583  if (fHandler)
584  fHandler->HandleInput((const void *)&hin);
585  else
586  Error("ProcessUnsolicitedMsg","handler undefined");
587  }
588  break;
589  case kXPD_flush:
590  //
591  // Flush request
592  {
593  // Handle this input in this thread to avoid queuing on the
594  // main thread
595  XHandleIn_t hin = {acod, 0, 0, 0};
596  if (fHandler)
597  fHandler->HandleInput((const void *)&hin);
598  else
599  Error("ProcessUnsolicitedMsg","handler undefined");
600  }
601  break;
602  case kXPD_urgent:
603  //
604  // Urgent message: a type followed by up to two info words
605  {
606  // The next 4 bytes contain the urgent msg type
607  kXR_int32 type = -1;
608  if (len > 0) {
609  memcpy(&type, pdata, sizeof(kXR_int32));
610  type = net2host(type);
611  if (gDebug > 1)
612  Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
613  // Update pointer to data
614  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
615  len -= sizeof(kXR_int32);
616  }
617  // The next 4 bytes contain the first info container
618  kXR_int32 int1 = -1;
619  if (len > 0) {
620  memcpy(&int1, pdata, sizeof(kXR_int32));
621  int1 = net2host(int1);
622  if (gDebug > 1)
623  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
624  // Update pointer to data
625  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
626  len -= sizeof(kXR_int32);
627  }
628  // The next 4 bytes contain the second info container
629  kXR_int32 int2 = -1;
630  if (len > 0) {
631  memcpy(&int2, pdata, sizeof(kXR_int32));
632  int2 = net2host(int2);
633  if (gDebug > 1)
634  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
635  // Update pointer to data
636  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
637  len -= sizeof(kXR_int32);
638  }
639 
640  // Handle this input in this thread to avoid queuing on the
641  // main thread
642  XHandleIn_t hin = {acod, type, int1, int2};
643  if (fHandler)
644  fHandler->HandleInput((const void *)&hin);
645  else
646  Error("ProcessUnsolicitedMsg","handler undefined");
647  }
648  break;
649  case kXPD_msg:
650  //
651  // Data message
652  { R__LOCKGUARD(fAMtx);
653 
654  // Get a spare buffer
655  TXSockBuf *b = PopUpSpare(len);
656  if (!b) {
657  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
658  return rc;
659  }
660  memcpy(b->fBuf, pdata, len);
661  b->fLen = len;
662 
663  // Update counters
664  fBytesRecv += len;
665 
666  // Produce the message
667  fAQue.push_back(b);
668 
669  // Post the global pipe
670  fgPipe.Post(this);
671 
672  // Signal it and release the mutex
673  if (gDebug > 2)
674  Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
675  this, GetTitle(), &fASem, len);
676  fASem.Post();
677  }
678 
679  break;
680  case kXPD_feedback:
681  Info("ProcessUnsolicitedMsg",
682  "kXPD_feedback treatment not yet implemented");
683  break;
684  case kXPD_srvmsg:
685  //
686  // Service message
687  {
688  // The next 4 bytes may contain a flag to control the way the message is displayed
689  kXR_int32 opt = 0;
690  memcpy(&opt, pdata, sizeof(kXR_int32));
691  opt = net2host(opt);
692  if (opt >= 0 && opt <= 4) {
693  // Update pointer to data
694  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
695  len -= sizeof(kXR_int32);
696  } else {
697  opt = 1;
698  }
699 
700  if (opt == 0) {
701  // One line
702  Printf("| %.*s", len, (char *)pdata);
703  } else if (opt == 2) {
704  // Raw displaying
705  Printf("%.*s", len, (char *)pdata);
706  } else if (opt == 3) {
707  // Incremental displaying
708  fprintf(stderr, "%.*s", len, (char *)pdata);
709  } else if (opt == 4) {
710  // Rewind
711  fprintf(stderr, "%.*s\r", len, (char *)pdata);
712  } else {
713  // A small header
714  Printf(" ");
715  Printf("| Message from server:");
716  Printf("| %.*s", len, (char *)pdata);
717  }
718  }
719  break;
720  case kXPD_errmsg:
721  //
722  // Error condition with message
723  Printf("\n\n");
724  Printf("| Error condition occured: message from server:");
725  Printf("| %.*s", len, (char *)pdata);
726  Printf("\n");
727  // Handle error
728  if (fHandler)
730  else
731  Error("ProcessUnsolicitedMsg","handler undefined");
732  break;
733  case kXPD_msgsid:
734  //
735  // Data message
736  { R__LOCKGUARD(fAMtx);
737 
738  // The next 4 bytes contain the session id
739  kXR_int32 cid = 0;
740  memcpy(&cid, pdata, sizeof(kXR_int32));
741  cid = net2host(cid);
742 
743  if (gDebug > 1)
744  Info("ProcessUnsolicitedMsg","found cid: %d", cid);
745 
746  // Update pointer to data
747  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
748  len -= sizeof(kXR_int32);
749 
750  // Get a spare buffer
751  TXSockBuf *b = PopUpSpare(len);
752  if (!b) {
753  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
754  return rc;
755  }
756  memcpy(b->fBuf, pdata, len);
757  b->fLen = len;
758 
759  // Set the sid
760  b->fCid = cid;
761 
762  // Update counters
763  fBytesRecv += len;
764 
765  // Produce the message
766  fAQue.push_back(b);
767 
768  // Post the global pipe
769  fgPipe.Post(this);
770 
771  // Signal it and release the mutex
772  if (gDebug > 2)
773  Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
774  this, cid, &fASem, len);
775  fASem.Post();
776  }
777 
778  break;
779  case kXPD_wrkmortem:
780  //
781  // A worker died
782  { TString what = TString::Format("%.*s", len, (char *)pdata);
783  if (what.BeginsWith("idle-timeout")) {
784  // Notify the idle timeout
786  } else {
787  Printf(" ");
788  Printf("| %s", what.Data());
789  // Handle error
790  if (fHandler)
792  else
793  Error("ProcessUnsolicitedMsg","handler undefined");
794  }
795  }
796  break;
797 
798  case kXPD_touch:
799  //
800  // Request for remote touch: post a message to do that
802  break;
803  case kXPD_resume:
804  //
805  // process the next query (in the TXProofServ)
807  break;
808  case kXPD_clusterinfo:
809  //
810  // Broadcast cluster information
811  {
812  kXR_int32 nsess = -1, nacti = -1, neffs = -1;
813  if (len > 0) {
814  // Total sessions
815  memcpy(&nsess, pdata, sizeof(kXR_int32));
816  nsess = net2host(nsess);
817  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
818  len -= sizeof(kXR_int32);
819  // Active sessions
820  memcpy(&nacti, pdata, sizeof(kXR_int32));
821  nacti = net2host(nacti);
822  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
823  len -= sizeof(kXR_int32);
824  // Effective sessions
825  memcpy(&neffs, pdata, sizeof(kXR_int32));
826  neffs = net2host(neffs);
827  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
828  len -= sizeof(kXR_int32);
829  }
830  if (gDebug > 1)
831  Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
832  " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
833  // Handle this input in this thread to avoid queuing on the
834  // main thread
835  XHandleIn_t hin = {acod, nsess, nacti, neffs};
836  if (fHandler)
837  fHandler->HandleInput((const void *)&hin);
838  else
839  Error("ProcessUnsolicitedMsg","handler undefined");
840  }
841  break;
842  default:
843  Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
844  this, acod, GetTitle());
846  break;
847  }
848 
849  // We are done
850  return rc;
851 }
852 
853 ////////////////////////////////////////////////////////////////////////////////
854 /// Post a message of type 'type' into the read messages queue.
855 /// If 'msg' is defined it is also added as TString.
856 /// This is used, for example, with kPROOF_FATAL to force the main thread
857 /// to mark this socket as bad, avoiding race condition when a worker
858 /// dies while in processing state.
859 
860 void TXSocket::PostMsg(Int_t type, const char *msg)
861 {
862  // Create the message
863  TMessage m(type);
864 
865  // Add the string if any
866  if (msg && strlen(msg) > 0)
867  m << TString(msg);
868 
869  // Write length in first word of buffer
870  m.SetLength();
871 
872  // Get pointer to the message buffer
873  char *mbuf = m.Buffer();
874  Int_t mlen = m.Length();
875  if (m.CompBuffer()) {
876  mbuf = m.CompBuffer();
877  mlen = m.CompLength();
878  }
879 
880  //
881  // Data message
883 
884  // Get a spare buffer
885  TXSockBuf *b = PopUpSpare(mlen);
886  if (!b) {
887  Error("PostMsg", "could allocate spare buffer");
888  return;
889  }
890 
891  // Fill the pipe buffer
892  memcpy(b->fBuf, mbuf, mlen);
893  b->fLen = mlen;
894 
895  // Update counters
896  fBytesRecv += mlen;
897 
898  // Produce the message
899  fAQue.push_back(b);
900 
901  // Post the global pipe
902  fgPipe.Post(this);
903 
904  // Signal it and release the mutex
905  if (gDebug > 0)
906  Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
907  this, type, &fASem, mlen);
908  fASem.Post();
909 
910  // Done
911  return;
912 }
913 
914 ////////////////////////////////////////////////////////////////////////////////
915 /// Return kTRUE if the remote server is a 'proofd'
916 
918 {
920  return kTRUE;
921 
922  // Failure
923  return kFALSE;
924 }
925 
926 ////////////////////////////////////////////////////////////////////////////////
927 /// Get latest interrupt level and reset it; if the interrupt has to be
928 /// propagated to lower stages forward will be kTRUE after the call
929 
931 {
932  if (gDebug > 2)
933  Info("GetInterrupt","%p: waiting to lock mutex %p", this, fIMtx);
934 
936 
937  // Reset values
938  Int_t ilev = -1;
939  forward = kFALSE;
940 
941  // Check if filled
942  if (fILev == -1)
943  Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
944 
945  // Fill output
946  ilev = fILev;
947  forward = fIForward;
948 
949  // Reset values (we process it only once)
950  fILev = -1;
951  fIForward = kFALSE;
952 
953  // Return what we got
954  return ilev;
955 }
956 
957 ////////////////////////////////////////////////////////////////////////////////
958 /// Flush the asynchronous queue.
959 /// Typically called when a kHardInterrupt is received.
960 /// Returns number of bytes in flushed buffers.
961 
963 {
964  Int_t nf = 0;
965  list<TXSockBuf *> splist;
966  list<TXSockBuf *>::iterator i;
967 
968  { R__LOCKGUARD(fAMtx);
969 
970  // Must have something to flush
971  if (fAQue.size() > 0) {
972 
973  // Save size for later semaphore cleanup
974  Int_t sz = fAQue.size();
975  // get the highest interrupt level
976  for (i = fAQue.begin(); i != fAQue.end();) {
977  if (*i) {
978  splist.push_back(*i);
979  nf += (*i)->fLen;
980  i = fAQue.erase(i);
981  }
982  }
983 
984  // Reset the asynchronous queue
985  while (sz--) {
986  if (fASem.TryWait() == 1)
987  Printf("Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
988  }
989  fAQue.clear();
990  }
991  }
992 
993  // Move spares to the spare queue
994  { R__LOCKGUARD(&fgSMtx);
995  if (splist.size() > 0) {
996  for (i = splist.begin(); i != splist.end();) {
997  fgSQue.push_back(*i);
998  i = splist.erase(i);
999  }
1000  }
1001  }
1002 
1003  // We are done
1004  return nf;
1005 }
1006 
1007 ////////////////////////////////////////////////////////////////////////////////
1008 /// This method sends a request for creation of (or attachment to) a remote
1009 /// server application.
1010 
1012 {
1013  // Make sure we are connected
1014  if (!IsValid()) {
1015  if (gDebug > 0)
1016  Info("Create","not connected: nothing to do");
1017  return kFALSE;
1018  }
1019 
1020  Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
1021 
1022  while (retriesleft--) {
1023 
1024  XPClientRequest reqhdr;
1025 
1026  // We fill the header struct containing the request for login
1027  memset( &reqhdr, 0, sizeof(reqhdr));
1028  fConn->SetSID(reqhdr.header.streamid);
1029 
1030  // This will be a kXP_attach or kXP_create request
1031  if (fMode == 'A' || attach) {
1032  reqhdr.header.requestid = kXP_attach;
1033  reqhdr.proof.sid = fSessionID;
1034  } else {
1035  reqhdr.header.requestid = kXP_create;
1036  }
1037 
1038  // Send log level
1039  reqhdr.proof.int1 = fLogLevel;
1040 
1041  // Send also the chosen alias
1042  const void *buf = (const void *)(fBuffer.Data());
1043  reqhdr.header.dlen = fBuffer.Length();
1044  if (gDebug >= 2)
1045  Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
1046 
1047  // We call SendReq, the function devoted to sending commands.
1048  if (gDebug > 1)
1049  Info("Create", "creating session of server %s", fUrl.Data());
1050 
1051  // server response header
1052  char *answData = 0;
1053  XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
1054  &answData, "TXSocket::Create", 0);
1055  struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
1056 
1057  // If any, the URL the data pool entry point will be stored here
1058  fBuffer = "";
1059  if (xrsp) {
1060 
1061  //
1062  // Pointer to data
1063  void *pdata = (void *)(xrsp->GetData());
1064  Int_t len = xrsp->DataLen();
1065 
1066  if (len >= (Int_t)sizeof(kXR_int32)) {
1067  // The first 4 bytes contain the session ID
1068  kXR_int32 psid = 0;
1069  memcpy(&psid, pdata, sizeof(kXR_int32));
1070  fSessionID = net2host(psid);
1071  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1072  len -= sizeof(kXR_int32);
1073  } else {
1074  Error("Create","session ID is undefined!");
1075  fSessionID = -1;
1076  if (srvresp) free(srvresp);
1077  return kFALSE;
1078  }
1079 
1080  if (len >= (Int_t)sizeof(kXR_int16)) {
1081  // The second 2 bytes contain the remote PROOF protocol version
1082  kXR_int16 dver = 0;
1083  memcpy(&dver, pdata, sizeof(kXR_int16));
1084  fRemoteProtocol = net2host(dver);
1085  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1086  len -= sizeof(kXR_int16);
1087  } else {
1088  Warning("Create","protocol version of the remote PROOF undefined!");
1089  }
1090 
1091  if (fRemoteProtocol == 0) {
1092  // We are dealing with an older server: the PROOF protocol is on 4 bytes
1093  len += sizeof(kXR_int16);
1094  kXR_int32 dver = 0;
1095  memcpy(&dver, pdata, sizeof(kXR_int32));
1096  fRemoteProtocol = net2host(dver);
1097  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1098  len -= sizeof(kXR_int32);
1099  } else {
1100  if (len >= (Int_t)sizeof(kXR_int16)) {
1101  // The third 2 bytes contain the remote XrdProofdProtocol version
1102  kXR_int16 dver = 0;
1103  memcpy(&dver, pdata, sizeof(kXR_int16));
1104  fXrdProofdVersion = net2host(dver);
1105  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1106  len -= sizeof(kXR_int16);
1107  } else {
1108  Warning("Create","version of the remote XrdProofdProtocol undefined!");
1109  }
1110  }
1111 
1112  if (len > 0) {
1113  // From top masters, the url of the data pool
1114  char *url = new char[len+1];
1115  memcpy(url, pdata, len);
1116  url[len] = 0;
1117  fBuffer = url;
1118  delete[] url;
1119  }
1120 
1121  // Cleanup
1122  SafeDelete(xrsp);
1123  if (srvresp) free(srvresp);
1124 
1125  // Notify
1126  return kTRUE;
1127  } else {
1128  // Extract log file path, if any
1129  Ssiz_t ilog = kNPOS;
1130  if (retriesleft <= 0 && fConn->GetLastErr()) {
1131  fBuffer = fConn->GetLastErr();
1132  if ((ilog = fBuffer.Index("|log:")) != kNPOS) fBuffer.Remove(0, ilog);
1133  }
1134  // If not free resources now, just give up
1135  if (fConn->GetOpenError() == kXP_TooManySess) {
1136  // Avoid to contact the server any more
1137  fSessionID = -1;
1138  if (srvresp) free(srvresp);
1139  return kFALSE;
1140  } else {
1141  // Print error msg, if any
1142  if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr()) {
1143  TString emsg(fConn->GetLastErr());
1144  if ((ilog = emsg.Index("|log:")) != kNPOS) emsg.Remove(ilog);
1145  Printf("%s: %s", fHost.Data(), emsg.Data());
1146  }
1147  }
1148  }
1149 
1150  if (gDebug > 0)
1151  Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
1152  if (retriesleft <= 0)
1153  Error("Create", "%d creation/attachment attempts failed: no attempts left",
1154  gEnv->GetValue("XProof.CreationRetries", 4));
1155 
1156  if (srvresp) free(srvresp);
1157  } // Creation retries
1158 
1159  // The session is invalid: reset the sessionID to invalid state (it was our protocol
1160  // number during creation
1161  fSessionID = -1;
1162 
1163  // Notify failure
1164  Error("Create:",
1165  "problems creating or attaching to a remote server (%s)",
1166  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
1167  return kFALSE;
1168 }
1169 
1170 ////////////////////////////////////////////////////////////////////////////////
1171 /// Send a raw buffer of specified length.
1172 /// Use opt = kDontBlock to ask xproofd to push the message into the proofsrv.
1173 /// (By default it is appended to a queue waiting for a request from proofsrv.)
1174 /// Returns the number of bytes sent or -1 in case of error.
1175 
1177 {
1179 
1180  // Options and request ID
      // The async bit is set or cleared on the fSendOpt member itself, so the
      // choice made here stays in effect until fSendOpt is changed again.
1181  fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
1182  : (~kXPD_async & fSendOpt) ;
1183 
1184  // Prepare request
      // The request is zeroed first; only the fields below are filled in.
1185  XPClientRequest Request;
1186  memset( &Request, 0, sizeof(Request) );
1187  fConn->SetSID(Request.header.streamid);
1188  Request.sendrcv.requestid = kXP_sendmsg;
1189  Request.sendrcv.sid = fSessionID;
1190  Request.sendrcv.opt = fSendOpt;
1191  Request.sendrcv.cid = GetClientID();
1192  Request.sendrcv.dlen = length;
1193  if (gDebug >= 2)
1194  Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
1195 
1196  // Send request
      // No answer buffer is requested (third argument is 0); a non-null
      // response object is the only success indicator used below.
1197  XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
1198 
1199  if (xrsp) {
1200  // Prepare return info
      // The full 'length' is reported as sent; no partial count is derived
      // from the response.
1201  Int_t nsent = length;
1202 
1203  // Update counters
      // NOTE(review): the TMessage send path in this file also adds the value
      // returned by SendRaw() to fBytesSent, so those bytes look counted
      // twice - confirm which increment is intended.
1204  fBytesSent += length;
1205 
1206  // Cleanup
1207  SafeDelete(xrsp);
1208 
1209  // Success: update usage timestamp
1210  Touch();
1211 
1212  // ok
1213  return nsent;
1214  } else {
1215  // Print error message, if any
1216  if (fConn->GetLastErr())
1217  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1218  else
1219  Printf("%s: error occured but no message from server", fHost.Data());
1220  }
1221 
1222  // Failure notification (avoid using the handler: we may be exiting)
      // Reached only when SendReq returned no response.
1223  Error("SendRaw", "%s: problems sending %d bytes to server",
1224  fHost.Data(), length);
1225  return -1;
1226 }
1227 
1228 ////////////////////////////////////////////////////////////////////////////////
1229 /// Ping functionality: contact the server to check its vitality.
1230 /// If external, the caller waits for a reply from the server.
1231 /// Returns kTRUE if OK or kFALSE in case of error.
1232 
1234 {
1236 
1237  if (gDebug > 0)
1238  Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
1239 
1240  // Make sure we are connected
1241  if (!IsValid()) {
1242  Error("Ping","not connected: nothing to do");
1243  return kFALSE;
1244  }
1245 
1246  // Options
      // Mode 'i' is flagged to the server as an internal connection.
1247  kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
1248 
1249  // Prepare request
1250  XPClientRequest Request;
1251  memset( &Request, 0, sizeof(Request) );
1252  fConn->SetSID(Request.header.streamid);
1253  Request.sendrcv.requestid = kXP_ping;
1254  Request.sendrcv.sid = fSessionID;
1255  Request.sendrcv.opt = options;
1256  Request.sendrcv.dlen = 0;
1257 
1258  // Send request
1259  Bool_t res = kFALSE;
1260  if (fMode != 'i') {
      // External mode: send through SendReq and wait for an answer buffer.
1261  char *pans = 0;
1262  XrdClientMessage *xrsp =
1263  fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
1264  kXR_int32 *pres = (kXR_int32 *) pans;
1265 
1266  // Get the result
      // The ping succeeds only if the answer, read as a 4-byte integer in
      // network order, equals 1.
      // NOTE(review): pres is dereferenced without checking that pans was
      // filled; if SendReq can return an ok status with no answer data this
      // is a null dereference - confirm SendReq's contract or add a guard.
1267  if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
1268  *pres = net2host(*pres);
1269  res = (*pres == 1) ? kTRUE : kFALSE;
1270  // Success: update usage timestamp
1271  Touch();
1272  } else {
1273  // Print error msg, if any
1274  if (fConn->GetLastErr())
1275  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1276  }
1277 
1278  // Cleanup
      // The answer buffer is released with free().
1279  SafeDelete(xrsp);
1280  if (pans) free(pans);
1281 
1282  } else {
      // Internal mode: marshal and write the request directly; no reply is
      // read, so success only means the write went through.
1283  if (XPD::clientMarshall(&Request) == 0) {
1284  XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
1285  res = (e == kOK) ? kTRUE : kFALSE;
1286  } else {
1287  Error("Ping", "%p: int: problems marshalling request", this);
1288  }
1289  }
1290 
1291  // Failure notification (avoid using the handler: we may be exiting)
1292  if (!res) {
1293  Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
1294  } else if (gDebug > 0) {
1295  Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
1296  }
1297 
1298  return res;
1299 }
1300 
1301 ////////////////////////////////////////////////////////////////////////////////
1302 /// Remote touch functionality: contact the server to prove our vitality.
1303 /// No reply from server is expected.
1304 
1306 {
1308 
1309  if (gDebug > 0)
1310  Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
1311 
1312  // Make sure we are connected
1313  if (!IsValid()) {
1314  Error("RemoteTouch","not connected: nothing to do");
1315  return;
1316  }
1317 
1318  // Prepare request
      // Zeroed request with only the touch id and our session id set.
1319  XPClientRequest Request;
1320  memset( &Request, 0, sizeof(Request) );
1321  fConn->SetSID(Request.header.streamid);
1322  Request.sendrcv.requestid = kXP_touch;
1323  Request.sendrcv.sid = fSessionID;
1324  Request.sendrcv.opt = 0;
1325  Request.sendrcv.dlen = 0;
1326 
1327  // We need the right order
      // The request is marshalled by hand because it is written with
      // LowWrite instead of going through SendReq.
      // NOTE(review): the two errors below are tagged "Touch" while the
      // messages above use "RemoteTouch" - confirm which location tag is
      // wanted.
1328  if (XPD::clientMarshall(&Request) != 0) {
1329  Error("Touch", "%p: problems marshalling request ", this);
1330  return;
1331  }
      // Write only; nothing is read back and failures are just logged.
1332  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1333  Error("Touch", "%p: problems sending touch request to server", this);
1334 
1335  // Done
1336  return;
1337 }
1338 
1339 ////////////////////////////////////////////////////////////////////////////////
1340 /// Interrupt the remote protocol instance. Used to propagate Ctrl-C.
1341 /// No reply from server is expected.
1342 
1344 {
1346 
1347  if (gDebug > 0)
1348  Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
1349 
1350  // Make sure we are connected
1351  if (!IsValid()) {
1352  Error("CtrlC","not connected: nothing to do");
1353  return;
1354  }
1355 
1356  // Prepare request
      // NOTE(review): sid is set to 0 here, whereas the ping, touch and
      // send requests in this file pass fSessionID - confirm the server
      // does not need the session id for ctrl-c.
1357  XPClientRequest Request;
1358  memset( &Request, 0, sizeof(Request) );
1359  fConn->SetSID(Request.header.streamid);
1360  Request.proof.requestid = kXP_ctrlc;
1361  Request.proof.sid = 0;
1362  Request.proof.dlen = 0;
1363 
1364  // We need the right order
      // Marshalled by hand because the request is written with LowWrite.
1365  if (XPD::clientMarshall(&Request) != 0) {
1366  Error("CtrlC", "%p: problems marshalling request ", this);
1367  return;
1368  }
      // Write only; nothing is read back and failures are just logged.
1369  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1370  Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
1371 
1372  // Done
1373  return;
1374 }
1375 
1376 ////////////////////////////////////////////////////////////////////////////////
1377 /// Wait and pick-up next buffer from the asynchronous queue
1378 
1380 {
      // Forget any current buffer before waiting for the next one.
      // Returns 0 when a buffer was picked up, -1 otherwise.
1381  fBufCur = 0;
1382  fByteLeft = 0;
1383  fByteCur = 0;
1384  if (gDebug > 2)
1385  Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
1386 
1387  // User can choose whether to wait forever or for a fixed amount of time
1388  if (!fDontTimeout) {
      // Both values are function-local statics: the timeout (in ms, from
      // XProof.ReadTimeout in seconds, default 300) is read from gEnv only
      // on the first call.
1389  static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
1390  static Int_t dt = 2000;
1391  Int_t to = timeout;
      // Wait in slices of dt ms so an interrupt is noticed between slices.
1393  while (to && !IsInterrupt()) {
1394  SetAWait(kTRUE);
1395  if (fASem.Wait(dt) != 0) {
1396  to -= dt;
1397  if (to <= 0) {
      // NOTE(review): this return leaves the flag set by SetAWait(kTRUE)
      // in place, unlike the other failure exits below - confirm.
1398  Error("PickUpReady","error waiting at semaphore");
1399  return -1;
1400  } else {
1401  if (gDebug > 0)
1402  Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
1403  this, GetTitle(), to/1000);
1404  }
1405  } else
1406  break;
1407  SetAWait(kFALSE);
1408  }
1409  // Check whether the wait ended because of an interrupt
1410  if (IsInterrupt()) {
1411  if (gDebug > 2)
1412  Info("PickUpReady","interrupted");
1414  SetAWait(kFALSE);
1415  return -1;
1416  }
1417  } else {
1418  // We wait forever
1419  SetAWait(kTRUE);
1420  if (fASem.Wait() != 0) {
1421  Error("PickUpReady","error waiting at semaphore");
1422  SetAWait(kFALSE);
1423  return -1;
1424  }
1425  SetAWait(kFALSE);
1426  }
1427  if (gDebug > 2)
1428  Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
1429 
1431 
1432  // Get message, if any
      // Being woken up with an empty queue or a null entry is treated as a
      // protocol error.
1433  if (fAQue.size() <= 0) {
1434  Error("PickUpReady","queue is empty - protocol error ?");
1435  return -1;
1436  }
1437  if (!(fBufCur = fAQue.front())) {
1438  Error("PickUpReady","got invalid buffer - protocol error ?");
1439  return -1;
1440  }
1441  // Remove message from the queue
1442  fAQue.pop_front();
1443 
1444  // Set number of available bytes
1445  fByteLeft = fBufCur->fLen;
1446 
1447  if (gDebug > 2)
1448  Info("PickUpReady", "%p: %s: got message (%d bytes)",
1449  this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
1450 
1451  // Update counters
1452  fBytesRecv += fBufCur->fLen;
1453 
1454  // Set session ID
      // NOTE(review): the statement guarded by this condition is not visible
      // in this listing; do not read the pipe clean-up below as its body.
1455  if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
1457 
1458  // Clean entry in the underlying pipe
1459  fgPipe.Clean(this);
1460 
1461  // We are done
1462  return 0;
1463 }
1464 
1465 ////////////////////////////////////////////////////////////////////////////////
1466 /// Pop-up a buffer of at least size bytes from the spare list
1467 /// If none is found either one is reallocated or a new one
1468 /// created
1469 
1471 {
1472  TXSockBuf *buf = 0;
      // Counts the buffers created by this function over the process
      // lifetime; only used in the debug messages.
1473  static Int_t nBuf = 0;
1474 
1475 
      // Taken before the spare queue is touched.
1476  R__LOCKGUARD(&fgSMtx);
1477 
1478 
1479  Int_t maxsz = 0;
1480  if (fgSQue.size() > 0) {
      // First fit: the first spare with capacity >= size is removed from
      // the queue and returned.
1481  list<TXSockBuf *>::iterator i;
1482  for (i = fgSQue.begin(); i != fgSQue.end(); i++) {
      // NOTE(review): (*i)->fSiz is read here before the '(*i) &&' null
      // test on the next line, so that test is either redundant or too late.
1483  maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1484  if ((*i) && (*i)->fSiz >= size) {
1485  buf = *i;
1486  if (gDebug > 2)
1487  Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1488  size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
1489  // Drop from this list
1490  fgSQue.erase(i);
1491  return buf;
1492  }
1493  }
1494  // All buffers are too small: enlarge the first one
      // NOTE(review): Resize() reports nothing back, so a failed enlargement
      // is not detected here - confirm callers can cope.
1495  buf = fgSQue.front();
1496  buf->Resize(size);
1497  if (gDebug > 2)
1498  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1499  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1500  // Drop from this list
1501  fgSQue.pop_front();
1502  return buf;
1503  }
1504 
1505  // Create a new buffer
      // Reached only when the spare queue is empty.
      // NOTE(review): the malloc result is passed on unchecked.
1506  buf = new TXSockBuf((char *)malloc(size), size);
1507  nBuf++;
1508 
1509  if (gDebug > 2)
1510  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1511  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1512 
1513  // We are done
1514  return buf;
1515 }
1516 
1517 ////////////////////////////////////////////////////////////////////////////////
1518 /// Release read buffer giving back to the spare list
1519 
1521 {
      // Taken before the spare queue is touched.
1522  R__LOCKGUARD(&fgSMtx);
1523 
1524  if (gDebug > 2)
1525  Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
1527 
      // The current buffer is either appended to the spare queue or deleted.
      // NOTE(review): the condition choosing between the two branches is not
      // visible in this listing.
1529  fgSQue.push_back(fBufCur);
1530  } else {
1531  delete fBufCur;
1532  }
      // Either way this socket no longer has a current read buffer.
1533  fBufCur = 0;
1534  fByteCur = 0;
1535  fByteLeft = 0;
1536 }
1537 
1538 ////////////////////////////////////////////////////////////////////////////////
1539 /// Receive a raw buffer of specified length bytes.
1540 
1542 {
      // Returns 'length' once that many bytes have been copied into
      // 'buffer', or -1 on bad arguments or when no buffer can be picked up.
1543  // Inputs must make sense
1544  if (!buffer || (length <= 0))
1545  return -1;
1546 
1547  // Wait and pick-up a read buffer if we do not have one
1548  if (!fBufCur && (PickUpReady() != 0))
1549  return -1;
1550 
1551  // Use it
1552  if (fByteLeft >= length) {
      // Fast path: the current buffer holds everything that was asked for.
      // This path returns before the counter updates at the end of the
      // function, so fBytesRecv / fgBytesRecv are not touched here.
1553  memcpy(buffer, fBufCur->fBuf + fByteCur, length);
1554  fByteCur += length;
1555  if ((fByteLeft -= length) <= 0)
1556  // All used: give back
1557  PushBackSpare();
1558  // Success: update usage timestamp
1559  Touch();
1560  return length;
1561  } else {
      // Slow path: drain the current buffer, then keep picking up new
      // buffers until 'length' bytes have been assembled.
1562  // Copy the first part
1563  memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
      // 'at' is the write offset into 'buffer'; 'tobecopied' is what is
      // still missing.
1564  Int_t at = fByteLeft;
1565  Int_t tobecopied = length - fByteLeft;
1566  PushBackSpare();
1567  while (tobecopied > 0) {
1568  // Pick-up next buffer (it may wait inside)
      // A failure here returns -1 although part of 'buffer' has already
      // been filled.
1569  if (PickUpReady() != 0)
1570  return -1;
1571  // Copy the fresh meat
1572  Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1573  memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
1574  fByteCur = ncpy;
1575  if ((fByteLeft -= ncpy) <= 0)
1576  // All used: give back
1577  PushBackSpare();
1578  // Recalculate
1579  tobecopied -= ncpy;
1580  at += ncpy;
1581  }
1582  }
1583 
1584  // Update counters
      // Only reached from the slow path above.
1585  fBytesRecv += length;
1586  fgBytesRecv += length;
1587 
1588  // Success: update usage timestamp
1589  Touch();
1590 
1591  return length;
1592 }
1593 
1594 ////////////////////////////////////////////////////////////////////////////////
1595 /// Send urgent message (interrupt) to remote server
1596 /// Returns 0 or -1 in case of error.
1597 
1599 {
1601 
1602  // Prepare request
      // A shutdown interrupt is sent as a destroy request; every other type
      // goes out as a plain interrupt request.
1603  XPClientRequest Request;
1604  memset(&Request, 0, sizeof(Request) );
1605  fConn->SetSID(Request.header.streamid);
1606  if (type == (Int_t) TProof::kShutdownInterrupt)
1607  Request.interrupt.requestid = kXP_destroy;
1608  else
1609  Request.interrupt.requestid = kXP_interrupt;
1610  Request.interrupt.sid = fSessionID;
1611  Request.interrupt.type = type; // type of interrupt (see TProof::EUrgent)
1612  Request.interrupt.dlen = 0;
1613 
1614  // Send request
      // No payload and no answer buffer; a non-null response means success.
1615  XrdClientMessage *xrsp =
1616  fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
1617  if (xrsp) {
1618  // Success: update usage timestamp
1619  Touch();
1620  // Cleanup
1621  SafeDelete(xrsp);
1622  // ok
1623  return 0;
1624  } else {
1625  // Print error msg, if any
1626  if (fConn->GetLastErr())
1627  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1628  }
1629 
1630  // Failure notification (avoid using the handler: we may be exiting)
1631  Error("SendInterrupt", "problems sending interrupt to server");
1632  return -1;
1633 }
1634 
1635 ////////////////////////////////////////////////////////////////////////////////
1636 /// Send a TMessage object. Returns the number of bytes in the TMessage
1637 /// that were sent and -1 in case of error.
1638 
1640 {
1642 
1643  if (mess.IsReading()) {
1644  Error("Send", "cannot send a message used for reading");
1645  return -1;
1646  }
1647 
1648  // send streamer infos in case schema evolution is enabled in the TMessage
1649  SendStreamerInfos(mess);
1650 
1651  // send the process id's so TRefs work
1652  SendProcessIDs(mess);
1653 
1654  mess.SetLength(); //write length in first word of buffer
1655 
      // The message is received as const but is modified here through
      // const_cast: the socket's compression settings are applied when the
      // message has none, and the compressed buffer is built.
1656  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
1657  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
1658 
1659  if (mess.GetCompressionLevel() > 0)
1660  const_cast<TMessage&>(mess).Compress();
1661 
      // Send the compressed buffer when there is one, the plain one otherwise.
1662  char *mbuf = mess.Buffer();
1663  Int_t mlen = mess.Length();
1664  if (mess.CompBuffer()) {
1665  mbuf = mess.CompBuffer();
1666  mlen = mess.CompLength();
1667  }
1668 
1669  // Parse message type to choose sending options
      // fSendOpt is saved here and restored right after SendRaw(), so any
      // per-message option only applies to this one send.
      // NOTE(review): most case bodies are not visible in this listing.
1670  kXR_int32 fSendOptDefault = fSendOpt;
1671  switch (mess.What()) {
1672  case kPROOF_PROCESS:
1674  break;
1675  case kPROOF_PROGRESS:
1676  case kPROOF_FEEDBACK:
1678  break;
1679  case kPROOF_QUERYSUBMITTED:
1682  break;
1683  case kPROOF_STARTPROCESS:
1686  break;
1687  case kPROOF_STOPPROCESS:
1689  break;
1690  case kPROOF_SETIDLE:
1693  break;
1694  case kPROOF_LOGFILE:
1695  case kPROOF_LOGDONE:
1696  if (GetClientIDSize() <= 1)
1697  fSendOpt |= kXPD_logmsg;
1698  break;
1699  default:
1700  break;
1701  }
1702 
1703  if (gDebug > 2)
1704  Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
1705 
1706  Int_t nsent = SendRaw(mbuf, mlen);
1707  fSendOpt = fSendOptDefault;
1708 
      // Errors (and a zero count) from SendRaw are passed through unchanged.
1709  if (nsent <= 0)
1710  return nsent;
1711 
1712  fBytesSent += nsent;
1713  fgBytesSent += nsent;
1714 
1715  return nsent - sizeof(UInt_t); //length - length header
1716 }
1717 
1718 ////////////////////////////////////////////////////////////////////////////////
1719 /// Receive a TMessage object. The user must delete the TMessage object.
1720 /// Returns length of message in bytes (can be 0 if other side of connection
1721 /// is closed) or -1 in case of error or -5 if pipe broken (connection invalid).
1722 /// In those case mess == 0.
1723 
1725 {
1727 
1728  if (!IsValid()) {
1729  mess = 0;
1730  return -5;
1731  }
1732 
      // Streamer-info and process-id messages are consumed internally; the
      // code jumps back here to read the next message in that case.
1733 oncemore:
1734  Int_t n;
1735  UInt_t len;
      // First word on the wire: payload length in network byte order.
1736  if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
1737  mess = 0;
1738  return n;
1739  }
1740  len = net2host(len); //from network to host byte order
1741 
      // The buffer keeps room for the length word in front of the payload;
      // the payload is read in after that slot.
      // NOTE(review): 'len' comes from the peer and is used unchecked as the
      // allocation size - confirm an upper bound is enforced elsewhere.
1742  char *buf = new char[len+sizeof(UInt_t)];
1743  if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
1744  delete [] buf;
1745  mess = 0;
1746  return n;
1747  }
1748 
1749  fBytesRecv += n + sizeof(UInt_t);
1750  fgBytesRecv += n + sizeof(UInt_t);
1751 
      // 'buf' is handed over to the new TMessage.
1752  mess = new TMessage(buf, len+sizeof(UInt_t));
1753 
1754  // receive any streamer infos
      // NOTE(review): on the two 'goto' paths 'mess' is overwritten on the
      // next round; presumably the helper disposes of it - verify.
1755  if (RecvStreamerInfos(mess))
1756  goto oncemore;
1757 
1758  // receive any process ids
1759  if (RecvProcessIDs(mess))
1760  goto oncemore;
1761 
1762  if (mess->What() & kMESS_ACK) {
1763  // Acknowledgement embedded: ignore ...
1764  mess->SetWhat(mess->What() & ~kMESS_ACK);
1765  }
1766 
      // n is the payload byte count from the second RecvRaw (length word
      // not included).
1767  return n;
1768 }
1769 
1770 ////////////////////////////////////////////////////////////////////////////////
1771 /// Send message to intermediate coordinator.
1772 /// If any output is due, this is returned as an obj string to be
1773 /// deleted by the caller
1774 
1775 TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
1776  Long64_t l64, Int_t int3, const char *)
1777 {
1778  TObjString *sout = 0;
1779 
1780  // We fill the header struct containing the request
1781  XPClientRequest reqhdr;
1782  const void *buf = 0;
1783  char *bout = 0;
1784  char **vout = 0;
1785  memset(&reqhdr, 0, sizeof(reqhdr));
1786  fConn->SetSID(reqhdr.header.streamid);
1787  reqhdr.header.requestid = kXP_admin;
1788  reqhdr.proof.int1 = kind;
1789  reqhdr.proof.int2 = int2;
1790  switch (kind) {
1791  case kQueryMssUrl:
1792  case kQueryROOTVersions:
1793  case kQuerySessions:
1794  case kQueryWorkers:
1795  reqhdr.proof.sid = 0;
1796  reqhdr.header.dlen = 0;
1797  vout = (char **)&bout;
1798  break;
1799  case kCleanupSessions:
1800  reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
1801  : (kXR_int32) kXPD_TopMaster;
1802  reqhdr.proof.int3 = int2;
1803  reqhdr.proof.sid = fSessionID;
1804  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1805  buf = (msg) ? (const void *)msg : buf;
1806  break;
1807  case kCpFile:
1808  case kGetFile:
1809  case kPutFile:
1810  case kExec:
1811  reqhdr.proof.sid = fSessionID;
1812  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1813  buf = (msg) ? (const void *)msg : buf;
1814  vout = (char **)&bout;
1815  break;
1816  case kQueryLogPaths:
1817  vout = (char **)&bout;
1818  reqhdr.proof.int3 = int3;
1819  case kReleaseWorker:
1820  case kSendMsgToUser:
1821  case kGroupProperties:
1822  case kSessionTag:
1823  case kSessionAlias:
1824  reqhdr.proof.sid = fSessionID;
1825  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1826  buf = (msg) ? (const void *)msg : buf;
1827  break;
1828  case kROOTVersion:
1829  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1830  buf = (msg) ? (const void *)msg : buf;
1831  break;
1832  case kGetWorkers:
1833  reqhdr.proof.sid = fSessionID;
1834  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1835  if (msg)
1836  buf = (const void *)msg;
1837  vout = (char **)&bout;
1838  break;
1839  case kReadBuffer:
1840  reqhdr.header.requestid = kXP_readbuf;
1841  reqhdr.readbuf.ofs = l64;
1842  reqhdr.readbuf.len = int2;
1843  if (int3 > 0 && fXrdProofdVersion < 1003) {
1844  Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
1845  " grep functionality not supported", fXrdProofdVersion);
1846  return sout;
1847  }
1848  reqhdr.readbuf.int1 = int3;
1849  if (!msg || strlen(msg) <= 0) {
1850  Info("SendCoordinator", "kReadBuffer: file path undefined");
1851  return sout;
1852  }
1853  reqhdr.header.dlen = strlen(msg);
1854  buf = (const void *)msg;
1855  vout = (char **)&bout;
1856  break;
1857  default:
1858  Info("SendCoordinator", "unknown message kind: %d", kind);
1859  return sout;
1860  }
1861 
1862  // server response header
1863  Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
1864  XrdClientMessage *xrsp =
1865  fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
1866 
1867  // If positive answer
1868  if (xrsp) {
1869  // Check if we need to create an output string
1870  if (bout && (xrsp->DataLen() > 0))
1871  sout = new TObjString(TString(bout,xrsp->DataLen()));
1872  if (bout)
1873  free(bout);
1874  // Success: update usage timestamp
1875  Touch();
1876  SafeDelete(xrsp);
1877  } else {
1878  // Print error msg, if any
1879  if (fConn->GetLastErr())
1880  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1881  }
1882 
1883  // Failure notification (avoid using the handler: we may be exiting)
1884  return sout;
1885 }
1886 
1887 ////////////////////////////////////////////////////////////////////////////////
1888 /// Send urgent message to counterpart; 'type' specifies the type of
1889 /// the message (see TXSocket::EUrgentMsgType), and 'int1', 'int2'
1890 /// two containers for additional information.
1891 
1893 {
1895 
1896  // Prepare request
      // Note the shift: 'type' goes into int1, 'int1' into int2 and 'int2'
      // into int3 of the request.
1897  XPClientRequest Request;
1898  memset(&Request, 0, sizeof(Request) );
1899  fConn->SetSID(Request.header.streamid);
1900  Request.proof.requestid = kXP_urgent;
1901  Request.proof.sid = fSessionID;
1902  Request.proof.int1 = type; // type of urgent msg (see TXSocket::EUrgentMsgType)
1903  Request.proof.int2 = int1; // 4-byte container info 1
1904  Request.proof.int3 = int2; // 4-byte container info 2
1905  Request.proof.dlen = 0;
1906 
1907  // Send request
      // No payload and no answer buffer. Failure is only printed; nothing
      // is reported back to the caller.
1908  XrdClientMessage *xrsp =
1909  fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
1910  if (xrsp) {
1911  // Success: update usage timestamp
1912  Touch();
1913  // Cleanup
1914  SafeDelete(xrsp);
1915  } else {
1916  // Print error msg, if any
1917  if (fConn->GetLastErr())
1918  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1919  }
1920 
1921  // Done
1922  return;
1923 }
1924 
1925 ////////////////////////////////////////////////////////////////////////////////
1926 /// Init environment variables for XrdClient
1927 
1929 {
      // Copies settings from gEnv into the XrdClient environment (EnvPutInt /
      // EnvPutString) and into process environment variables
      // (gSystem->Setenv), then sets fgInitDone.
1930  // Set debug level
      // Each debug level adds to the trace mask of the level below it.
      // NOTE(review): XrdProofdTrace is dereferenced unchecked when deb > 0;
      // this listing shows it defined as 0 at file scope - confirm it is
      // assigned before this function runs.
1931  Int_t deb = gEnv->GetValue("XProof.Debug", -1);
1932  EnvPutInt(NAME_DEBUG, deb);
1933  if (deb > 0) {
1934  XrdProofdTrace->What |= TRACE_REQ;
1935  if (deb > 1) {
1936  XrdProofdTrace->What |= TRACE_DBG;
1937  if (deb > 2)
1938  XrdProofdTrace->What |= TRACE_ALL;
1939  }
1940  }
      // Scratch pointer for the Getenv() checks further down.
1941  const char *cenv = 0;
1942 
1943  // List of domains where connection is allowed
1944  TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
1945  if (allowCO.Length() > 0)
1946  EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
1947 
1948  // List of domains where connection is denied
1949  TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
1950  if (denyCO.Length() > 0)
1951  EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
1952 
1953  // Max number of retries on first connect and related timeout
1955  Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
1956  EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
1957  Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
1958  EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
1959 
1960  // Reconnect Wait
      // The old variable name is consulted only when the new one yields the
      // default value.
1961  Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
1962  DFLT_RECONNECTWAIT);
1963  if (recoTO == DFLT_RECONNECTWAIT) {
1964  // Check also the old variable name
1965  recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
1966  DFLT_RECONNECTWAIT);
1967  }
1968  EnvPutInt(NAME_RECONNECTWAIT, recoTO);
1969 
1970  // Request Timeout
1971  Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
1972  EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
1973 
1974  // No automatic proofd backward-compatibility
1975  EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
1976 
1977  // Dynamic forwarding (SOCKS4)
      // Enabled only when a positive port is configured.
1978  TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
1979  Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
1980  if (socks4Port > 0) {
1981  if (socks4Host.IsNull())
1982  // Default
1983  socks4Host = "127.0.0.1";
1984  EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
1985  EnvPutInt(NAME_SOCKS4PORT, socks4Port);
1986  }
1987 
1988  // For password-based authentication
      // Settings with a non-empty default below are exported only when the
      // variable is unset or empty, so an existing value is kept. Settings
      // whose default is "" are exported whenever gEnv provides a value.
1989  TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
1990  if (autolog.Length() > 0 &&
1991  (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
1992  gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
1993 
1994  // For password-based authentication
      // XrdSecNETRC is always set, to .rootnetrc in the home directory.
1995  TString netrc;
1996  netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
1997  gSystem->Setenv("XrdSecNETRC", netrc.Data());
1998 
1999  TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
2000  if (alogfile.Length() > 0)
2001  gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
2002 
2003  TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
2004  if (verisrv.Length() > 0 &&
2005  (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2006  gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
2007 
2008  TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
2009  if (srvpuk.Length() > 0)
2010  gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
2011 
2012  // For GSI authentication
2013  TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
2014  if (cadir.Length() > 0)
2015  gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
2016 
2017  TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
2018  if (crldir.Length() > 0)
2019  gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
2020 
2021  TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
2022  if (crlext.Length() > 0)
2023  gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
2024 
2025  TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
2026  if (ucert.Length() > 0)
2027  gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
2028 
2029  TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
2030  if (ukey.Length() > 0)
2031  gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
2032 
2033  TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
2034  if (upxy.Length() > 0)
2035  gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
2036 
2037  TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
2038  if (valid.Length() > 0)
2039  gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
2040 
2041  TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
2042  if (deplen.Length() > 0 &&
2043  (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2044  gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
2045 
2046  TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
2047  if (pxybits.Length() > 0)
2048  gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
2049 
2050  TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
2051  if (crlcheck.Length() > 0 &&
2052  (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2053  gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
2054 
2055  TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
2056  if (delegpxy.Length() > 0 &&
2057  (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2058  gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
2059 
2060  TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
2061  if (signpxy.Length() > 0 &&
2062  (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2063  gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
2064 
2065  // Print the tag, if required (only once)
2066  if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
2067  ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
2068  gROOT->GetVersion());
2069 
2070  // Only once
      // The flag is set here; the check that skips a second run is not in
      // this function.
2071  fgInitDone = kTRUE;
2072 }
2073 
2074 ////////////////////////////////////////////////////////////////////////////////
2075 /// Try reconnection after failure
2076 
2078 {
      // Returns 0 when the connection is valid at the end, -1 otherwise.
      // NOTE(review): the Info call below guards fConn->IsValid() with a null
      // test but calls fConn->GetLogConnID() unguarded in the same argument
      // list; with gDebug > 0 and a null fConn this dereferences null.
2079  if (gDebug > 0) {
2080  Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2081  this, fConn, (fConn ? fConn->IsValid() : 0),
2082  fUrl.Data(), fConn->GetLogConnID());
2083  }
2084 
      // Reconnection is opt-in (TXSocket.Reconnect, default 0) and needs a
      // server with fXrdProofdVersion >= 1005.
2085  Int_t tryreconnect = gEnv->GetValue("TXSocket.Reconnect", 0);
2086  if (tryreconnect == 0 || fXrdProofdVersion < 1005) {
2087  if (tryreconnect == 0)
2088  Info("Reconnect","%p: reconnection attempts explicitly disabled!", this);
2089  else
2090  Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
2091  this, fXrdProofdVersion);
2092  return -1;
2093  }
2094 
2095  if (fConn) {
2096  if (gDebug > 0)
2097  Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
2098  fConn->ReConnect();
2099  if (fConn->IsValid()) {
2100  // Create new proofserv if not client manager or administrator or internal mode
      // NOTE(review): the mode list tested below ('m', 's', 'M', 'A') does
      // not obviously match the comment above - confirm which is right.
2101  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
2102  // We attach or create
2103  if (!Create(kTRUE)) {
2104  // Failure
      // The socket is closed when the session cannot be re-established.
2105  Error("TXSocket", "create or attach failed (%s)",
2106  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
2107  Close();
2108  return -1;
2109  }
2110  }
2111  }
2112  }
2113 
2114  if (gDebug > 0) {
2115  if (fConn) {
2116  Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
2117  (fConn->IsValid() ? "succeeded!" : "failed"),
2118  fConn->GetLogConnID() );
2119  } else {
2120  Info("Reconnect", "%p (c:0x0): attempt failed", this);
2121  }
2122  }
2123 
2124  // Done
2125  return ((fConn && fConn->IsValid()) ? 0 : -1);
2126 }
2127 
2128 ////////////////////////////////////////////////////////////////////////////////
2129 ///constructor
2130 
2132 {
      // Wrap the memory block 'bp' of 'sz' bytes; capacity (fSiz) and used
      // length (fLen) both start at 'sz'.
2133  fBuf = fMem = bp;
2134  fSiz = fLen = sz;
2135  fOwn = own;
2136  fCid = -1;
      // The global counter is increased whether or not the block is owned;
      // the destructor decreases it only for owned, non-null blocks.
2137  fgBuffMem += sz;
2138 }
2139 
2140 ////////////////////////////////////////////////////////////////////////////////
2141 ///destructor
2142 
2144 {
      // Only owned memory is released, with free(). The global counter is
      // decreased only in that case.
      // NOTE(review): the constructor adds to fgBuffMem regardless of
      // ownership, so non-owned buffers are never subtracted - confirm.
2145  if (fOwn && fMem) {
2146  free(fMem);
2147  fgBuffMem -= fSiz;
2148  }
2149 }
2150 
2151 ////////////////////////////////////////////////////////////////////////////////
2152 ///resize socket buffer
2153 
2155 {
2156  if (sz > fSiz) {
2157  if ((fMem = (Char_t *)realloc(fMem, sz))) {
2158  fgBuffMem += (sz - fSiz);
2159  fBuf = fMem;
2160  fSiz = sz;
2161  fLen = 0;
2162  }
2163  }
2164 }
2165 
2166 //_____________________________________________________________________________
2167 //
2168 // TXSockBuf static methods
2169 //
2170 
2171 ////////////////////////////////////////////////////////////////////////////////
2172 /// Return the currently allocated memory
2173 
2175 {
2176  return fgBuffMem;
2177 }
2178 
2179 ////////////////////////////////////////////////////////////////////////////////
2180 /// Return the max allocated memory allowed
2181 
2183 {
2184  return fgMemMax;
2185 }
2186 
2187 ////////////////////////////////////////////////////////////////////////////////
2188 /// Return the max allocated memory allowed
2189 
2191 {
2192  fgMemMax = memmax > 0 ? memmax : fgMemMax;
2193 }
2194 
2195 //_____________________________________________________________________________
2196 //
2197 // TXSockPipe
2198 //
2199 
2200 ////////////////////////////////////////////////////////////////////////////////
2201 /// Constructor
2202 
2203 TXSockPipe::TXSockPipe(const char *loc) : fMutex(kTRUE), fLoc(loc)
2204 {
2205  // Create the pipe
2206  if (pipe(fPipe) != 0) {
2207  Printf("TXSockPipe: problem initializing pipe for socket inputs");
2208  fPipe[0] = -1;
2209  fPipe[1] = -1;
2210  return;
2211  }
2212 }
2213 
2214 ////////////////////////////////////////////////////////////////////////////////
2215 /// Destructor
2216 
2218 {
2219  if (fPipe[0] >= 0) close(fPipe[0]);
2220  if (fPipe[1] >= 0) close(fPipe[1]);
2221 }
2222 
2223 
2224 ////////////////////////////////////////////////////////////////////////////////
2225 /// Write a byte to the global pipe to signal new availibility of
2226 /// new messages
2227 
2229 {
2230  if (!IsValid() || !s) return -1;
2231 
2232  // This must be an atomic action
2233  Int_t sz = 0;
2234  { R__LOCKGUARD(&fMutex);
2235  // Add this one
2236  fReadySock.Add(s);
2237 
2238  // Only one char
2239  Char_t c = 1;
2240  if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
2241  Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
2242  return -1;
2243  }
2244  if (gDebug > 2) sz = fReadySock.GetSize();
2245  }
2246 
2247  if (gDebug > 2)
2248  Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2249  fLoc.Data(), s, sz, fPipe[1]);
2250  // We are done
2251  return 0;
2252 }
2253 
2254 ////////////////////////////////////////////////////////////////////////////////
2255 /// Read a byte to the global pipe to synchronize message pickup
2256 
2258 {
2259  // Pipe must have been created
2260  if (!IsValid() || !s) return -1;
2261 
2262  // Only one char
2263  Int_t sz = 0;
2264  Char_t c = 0;
2265  { R__LOCKGUARD(&fMutex);
2266  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
2267  Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
2268  return -1;
2269  }
2270  // Remove this one
2271  fReadySock.Remove(s);
2272 
2273  if (gDebug > 2) sz = fReadySock.GetSize();
2274  }
2275 
2276  if (gDebug > 2)
2277  Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2278  fLoc.Data(), s, sz, fPipe[0]);
2279 
2280  // We are done
2281  return 0;
2282 }
2283 
2284 ////////////////////////////////////////////////////////////////////////////////
2285 /// Remove any reference to socket 's' from the global pipe and
2286 /// ready-socket queue
2287 
2289 {
2290  // Pipe must have been created
2291  if (!IsValid() || !s) return -1;
2292 
2293  TObject *o = 0;
2294  // This must be an atomic action
2295  { R__LOCKGUARD(&fMutex);
2296  o = fReadySock.FindObject(s);
2297 
2298  while (o) {
2299  // Remove from the list
2300  fReadySock.Remove(s);
2301  o = fReadySock.FindObject(s);
2302  // Remove one notification from the pipe
2303  Char_t c = 0;
2304  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
2305  Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
2306  }
2307  }
2308  // Flush also the socket
2309  ((TXSocket *)s)->Flush();
2310 
2311  // Notify
2312  if (gDebug > 0)
2313  Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
2314 
2315  // We are done
2316  return 0;
2317 }
2318 
2319 ////////////////////////////////////////////////////////////////////////////////
2320 /// Dump content of the ready socket list
2321 
2323 {
2324  R__LOCKGUARD(&fMutex);
2325 
2326  TString buf = Form("%d |", fReadySock.GetSize());
2327  TIter nxs(&fReadySock);
2328  TObject *o = 0;
2329  while ((o = nxs()))
2330  buf += Form(" %p",o);
2331  Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
2332 }
2333 
2334 ////////////////////////////////////////////////////////////////////////////////
2335 /// Return last ready socket
2336 
2338 {
2339  R__LOCKGUARD(&fMutex);
2340 
2341  return (TXSocket *) fReadySock.Last();
2342 }
const char * GetHost() const
Definition: TUrl.h:76
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
Definition: TMutex.h:37
double read(const std::string &file_name)
reading
Int_t fSocket
Definition: TSocket.h:100
TXSocket(const char *url, Char_t mode= 'M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Definition: TXSocket.cxx:127
Int_t fCid
Definition: TXSocket.h:261
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define kXPD_querynum
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:254
Bool_t IsValid() const
Definition: TXSocket.h:179
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:711
Int_t fTcpWindowSize
Definition: TSocket.h:101
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition: TSocket.cxx:932
virtual void SetClientID(Int_t)
Definition: TXSocket.h:182
TSemaphore fAsynProc
Definition: TXSocket.h:108
virtual Bool_t Notify()
Notify when event occurred on descriptor associated with this handler.
long long Long64_t
Definition: RtypesCore.h:69
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Definition: TXSocket.cxx:1176
kXR_unt16 fStreamid
Definition: XrdProofConn.h:73
#define kXPD_TopMaster
TSemaphore fASem
Definition: TXSocket.h:100
void SetLoc(const char *loc="")
Definition: TXSocket.h:297
Bool_t IsReading() const
Definition: TBuffer.h:81
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Int_t fPipe[2]
Definition: TXSocket.h:301
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
Bool_t IsValid() const
Definition: TXSocket.h:287
Ssiz_t Length() const
Definition: TString.h:390
Collectable string class.
Definition: TObjString.h:32
virtual void Close(Option_t *opt="")
Close connection.
Definition: TXSocket.cxx:323
const char Option_t
Definition: RtypesCore.h:62
Int_t fLogLevel
Definition: TXSocket.h:91
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition: TSocket.cxx:979
static Long64_t BuffMem()
Return the currently allocated memory.
Definition: TXSocket.cxx:2174
virtual Int_t GetClientID() const
Definition: TXSocket.h:171
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Definition: TUrl.h:41
int GetLogConnID() const
Definition: XrdProofConn.h:140
XrdProofConn * fConn
Definition: TXSocket.h:97
char fMode
Definition: TXSocket.h:84
Int_t TryWait()
If semaphore value is > 0 then decrement it and return 0.
Definition: TSemaphore.cxx:81
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition: TXSocket.cxx:380
Int_t Wait(Int_t millisec=0)
If semaphore value is > 0 then decrement it and carry on.
Definition: TSemaphore.cxx:38
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
struct XPClientInterruptRequest interrupt
char * CompBuffer() const
Definition: TMessage.h:94
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
~TXSockBuf()
destructor
Definition: TXSocket.cxx:2143
virtual const char * HomeDirectory(const char *userName=0)
Return the user's home directory.
Definition: TSystem.cxx:881
TMutex * fAMtx
Definition: TXSocket.h:101
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
Bool_t Notify()
Definition: TTimer.cxx:65
#define gROOT
Definition: TROOT.h:340
XrdOucTrace * XrdProofdTrace
Definition: TXSocket.cxx:55
Basic string class.
Definition: TString.h:137
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
Definition: TXHandler.cxx:39
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition: TSocket.cxx:650
Int_t CompLength() const
Definition: TMessage.h:95
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TXSocket()
Destructor.
Definition: TXSocket.cxx:239
const Bool_t kFALSE
Definition: Rtypes.h:92
Int_t GetCompressionLevel() const
Definition: TSocket.h:211
TMutex * fIMtx
Definition: TXSocket.h:111
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
int GetServType() const
Definition: XrdProofConn.h:143
void SetInterrupt(Bool_t i=kTRUE)
Definition: TXSocket.h:218
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
TInetAddress fAddress
Definition: TSocket.h:90
#define kXPD_async
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Definition: TXSocket.cxx:268
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
TXSocket * GetLastReady()
Return last ready socket.
Definition: TXSocket.cxx:2337
void RemoteTouch()
Remote touch functionality: contact the server to prove our vitality.
Definition: TXSocket.cxx:1305
TObject Int_t at
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availability of new messages.
Definition: TXSocket.cxx:2228
ESendRecvOptions
Definition: TSocket.h:65
static Long64_t fgBuffMem
Definition: TXSocket.h:274
Int_t Clean(TSocket *s)
Read a byte from the global pipe to synchronize message pickup.
Definition: TXSocket.cxx:2257
const char * Data() const
Definition: TString.h:349
static Long64_t fgMemMax
Definition: TXSocket.h:275
struct ClientRequestHdr header
#define SafeDelete(p)
Definition: RConfig.h:436
UShort_t net2host(UShort_t x)
Definition: Bytes.h:579
Int_t fPort
Definition: TXSocket.h:89
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TXSocket.cxx:1541
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2334
static void InitEnvs()
Init environment variables for XrdClient.
Definition: TXSocket.cxx:1928
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
Definition: memory.h:94
const char * ord
Definition: TXSlave.cxx:46
#define kXPD_setidle
#define kXPD_process
TString fUrl
Definition: TSocket.h:102
const char *const kPROOF_WorkerIdleTO
Definition: TProof.h:158
static std::list< TXSockBuf * > fgSQue
Definition: TXSocket.h:132
Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TXSocket.cxx:1639
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition: TXSocket.cxx:930
Bool_t fIForward
Definition: TXSocket.h:113
TXHandler * fHandler
Definition: TXSocket.h:95
const char * GetLastErr()
Definition: XrdProofConn.h:146
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1627
if(pyself &&pyself!=Py_None)
virtual Int_t GetClientIDSize() const
Definition: TXSocket.h:172
virtual Bool_t HandleInput(const void *in=0)
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1964
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Definition: TXSocket.cxx:1470
char * Buffer() const
Definition: TBuffer.h:91
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition: TXSocket.cxx:917
XFontStruct * id
Definition: TGX11.cxx:108
#define kXPD_internal
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
TString fUser
Definition: TXSocket.h:87
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition: TSystem.cxx:2245
int fRemoteProtocol
Definition: XrdProofConn.h:74
struct XPClientSendRcvRequest sendrcv
int GetOpenError() const
Definition: XrdProofConn.h:142
static const char * what
Definition: stlLoader.cc:6
kXR_unt16 HeaderSID()
Bool_t fOwn
Definition: TXSocket.h:260
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Definition: TXSocket.cxx:1011
Int_t fByteCur
Definition: TXSocket.h:105
Int_t Flush()
Flush the asynchronous queue.
Definition: TXSocket.cxx:962
Bool_t IsInterrupt()
Definition: TXSocket.h:222
Int_t GetPort() const
Definition: TUrl.h:87
TString fHost
Definition: TXSocket.h:88
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
Definition: TXSocket.cxx:2131
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition: TSystem.cxx:1611
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
void SetAWait(Bool_t w=kTRUE)
Definition: TXSocket.h:224
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:494
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:674
Int_t fXrdProofdVersion
Definition: TXSocket.h:123
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition: TXSocket.cxx:860
#define XrdSysError
Definition: XpdSysError.h:8
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Definition: TXSocket.cxx:1233
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
UInt_t fBytesSent
Definition: TSocket.h:92
char * Form(const char *fmt,...)
TString fBuffer
Definition: TXSocket.h:93
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
static TMutex fgSMtx
Definition: TXSocket.h:131
#define TRACE_REQ
std::list< TXSockBuf * > fAQue
Definition: TXSocket.h:103
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
#define TRACE_ALL
Int_t Post()
If any threads are blocked in Wait(), wake one of them up and increment the value of the semaphore...
Definition: TSemaphore.cxx:105
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
Definition: TXSocket.cxx:2288
Bool_t IsNull() const
Definition: TString.h:387
static TString fgLoc
Definition: TXSocket.h:127
Char_t * fBuf
Definition: TXSocket.h:259
void Touch()
Definition: TSocket.h:187
TString fLoc
Definition: TXSocket.h:302
XrdOucString fUser
Definition: XrdProofConn.h:79
TList fReadySock
Definition: TXSocket.h:303
Int_t fSiz
Definition: TXSocket.h:257
#define kXPD_AnyServer
#define kXPD_logmsg
Int_t GetCompressionLevel() const
Definition: TMessage.h:112
#define Printf
Definition: TGeoToOCC.h:18
XrdClientUrlInfo fUrl
Definition: XrdProofConn.h:102
Int_t fLen
Definition: TXSocket.h:258
virtual ~TXSockPipe()
Destructor.
Definition: TXSocket.cxx:2217
virtual Int_t Reconnect()
Try reconnection after failure.
Definition: TXSocket.cxx:2077
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition: TXSocket.cxx:280
#define TRACE_DBG
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:580
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
int Ssiz_t
Definition: RtypesCore.h:63
TXSockBuf * fBufCur
Definition: TXSocket.h:106
Short_t fSessionID
Definition: TXSocket.h:86
virtual Int_t GetSize() const
Definition: TCollection.h:95
XrdOucString fHost
Definition: XrdProofConn.h:80
Bool_t IsValid() const
Definition: TXSocket.h:315
bool IsValid() const
Test validity of this connection.
Int_t fCompress
Definition: TSocket.h:93
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
unsigned long long ULong64_t
Definition: RtypesCore.h:70
#define kXPD_fb_prog
static XrdSysLogger eLogger
Definition: TXSocket.cxx:56
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
Definition: XrdProofConn.h:93
#define R__LOCKGUARD(mutex)
UInt_t What() const
Definition: TMessage.h:80
Bool_t fDontTimeout
Definition: TXSocket.h:119
TXSockPipe(const char *loc="")
Constructor.
Definition: TXSocket.cxx:2203
XrdOucString fLastErrMsg
Definition: XrdProofConn.h:82
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TXSocket.cxx:1724
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
Char_t * fMem
Definition: TXSocket.h:273
kXR_int32 fSendOpt
Definition: TXSocket.h:85
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
EServiceType fServType
Definition: TSocket.h:99
void DumpReadySock()
Dump content of the ready socket list.
Definition: TXSocket.cxx:2322
TObject * fReference
Definition: TXSocket.h:94
Mother of all ROOT objects.
Definition: TObject.h:58
static ULong64_t fgBytesSent
Definition: TSocket.h:110
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition: TString.cxx:1806
char Char_t
Definition: RtypesCore.h:29
void PushBackSpare()
Release read buffer giving back to the spare list.
Definition: TXSocket.cxx:1520
kXR_int32 fILev
Definition: TXSocket.h:112
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition: TSocket.cxx:685
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
Definition: TError.cxx:202
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Int_t Length() const
Definition: TBuffer.h:94
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Definition: TXSocket.cxx:1379
static Bool_t fgInitDone
Definition: TXSocket.h:128
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
Definition: TXSocket.cxx:1775
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
Definition: TXSocket.cxx:1892
static TXSockPipe fgPipe
Definition: TXSocket.h:126
static ULong64_t fgBytesRecv
Definition: TSocket.h:109
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
virtual Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
Int_t fRemoteProtocol
Definition: TSocket.h:95
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:280
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Definition: TXSocket.cxx:2190
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
Definition: TXSocket.cxx:1598
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition: TXSocket.cxx:71
Int_t fPid
Definition: TXSocket.h:116
bool MatchStreamid(short sid)
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:582
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t fByteLeft
Definition: TXSocket.h:104
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
const Int_t n
Definition: legend1.C:16
void Resize(Int_t sz)
resize socket buffer
Definition: TXSocket.cxx:2154
void CtrlC()
Interrupt the remote protocol instance.
Definition: TXSocket.cxx:1343
TMutex fMutex
Definition: TXSocket.h:300
UInt_t fBytesRecv
Definition: TSocket.h:91
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2182
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:188
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904
static XrdSysError eDest(0,"Proofx")