Logo ROOT   6.08/07
Reference Guide
TUDPSocket.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Marcelo Sousa 26/10/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TUDPSocket //
15 // //
16 // This class implements UDP client sockets. A socket is an endpoint //
17 // for communication between two machines. //
18 // The actual work is done via the TSystem class (either TUnixSystem //
19 // or TWinNTSystem). //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #include "Bytes.h"
24 #include "Compression.h"
25 #include "NetErrors.h"
26 #include "TEnv.h"
27 #include "TError.h"
28 #include "TMessage.h"
29 #include "TUDPSocket.h"
30 #include "TPluginManager.h"
31 #include "TROOT.h"
32 #include "TString.h"
33 #include "TSystem.h"
34 #include "TUrl.h"
35 #include "TVirtualAuth.h"
36 #include "TStreamerInfo.h"
37 #include "TProcessID.h"
38 
41 
42 
44 
45 ////////////////////////////////////////////////////////////////////////////////
46 /// Create a socket. Connect to the named service at address addr.
47 /// Use tcpwindowsize to specify the size of the receive buffer, it has
48 /// to be specified here to make sure the window scale option is set (for
49 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
50 /// Returns when connection has been accepted by remote side. Use IsValid()
51 /// to check the validity of the socket. Every socket is added to the TROOT
52 /// sockets list which will make sure that any open sockets are properly
53 /// closed on program termination.
54 
55 TUDPSocket::TUDPSocket(TInetAddress addr, const char *service)
56  : TNamed(addr.GetHostName(), service)
57 {
60 
61  fService = service;
62  fSecContext = 0;
63  fRemoteProtocol= -1;
64  fServType = kSOCKD;
65  if (fService.Contains("root"))
66  fServType = kROOTD;
67  if (fService.Contains("proof"))
68  fServType = kPROOFD;
69  fAddress = addr;
70  fAddress.fPort = gSystem->GetServiceByName(service);
71  fBytesSent = 0;
72  fBytesRecv = 0;
73  fCompress = 0;
74  fUUIDs = 0;
75  fLastUsageMtx = 0;
76  ResetBit(TUDPSocket::kBrokenConn);
77 
78  if (fAddress.GetPort() != -1) {
79  fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
80  -1, "upd");
81 
82  if (fSocket != -1) {
84  gROOT->GetListOfSockets()->Add(this);
85  }
86  } else
87  fSocket = -1;
88 
89 }
90 
91 
92 ////////////////////////////////////////////////////////////////////////////////
93 /// Create a socket. Connect to the specified port # at address addr.
94 /// Use tcpwindowsize to specify the size of the receive buffer, it has
95 /// to be specified here to make sure the window scale option is set (for
96 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
97 /// Returns when connection has been accepted by remote side. Use IsValid()
98 /// to check the validity of the socket. Every socket is added to the TROOT
99 /// sockets list which will make sure that any open sockets are properly
100 /// closed on program termination.
101 
103  : TNamed(addr.GetHostName(), "")
104 {
105  R__ASSERT(gROOT);
107 
109  fSecContext = 0;
110  fRemoteProtocol= -1;
111  fServType = kSOCKD;
112  if (fService.Contains("root"))
113  fServType = kROOTD;
114  if (fService.Contains("proof"))
115  fServType = kPROOFD;
116  fAddress = addr;
117  fAddress.fPort = port;
119  fBytesSent = 0;
120  fBytesRecv = 0;
121  fCompress = 0;
122  fUUIDs = 0;
123  fLastUsageMtx = 0;
125 
127  -1, "upd");
128  if (fSocket == -1)
129  fAddress.fPort = -1;
130  else {
132  gROOT->GetListOfSockets()->Add(this);
133  }
134 }
135 
136 ////////////////////////////////////////////////////////////////////////////////
137 /// Create a socket. Connect to named service on the remote host.
138 /// Use tcpwindowsize to specify the size of the receive buffer, it has
139 /// to be specified here to make sure the window scale option is set (for
140 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
141 /// Returns when connection has been accepted by remote side. Use IsValid()
142 /// to check the validity of the socket. Every socket is added to the TROOT
143 /// sockets list which will make sure that any open sockets are properly
144 /// closed on program termination.
145 
146 TUDPSocket::TUDPSocket(const char *host, const char *service)
147  : TNamed(host, service)
148 {
149  R__ASSERT(gROOT);
151 
152  fService = service;
153  fSecContext = 0;
154  fRemoteProtocol= -1;
155  fServType = kSOCKD;
156  if (fService.Contains("root"))
157  fServType = kROOTD;
158  if (fService.Contains("proof"))
159  fServType = kPROOFD;
160  fAddress = gSystem->GetHostByName(host);
163  fBytesSent = 0;
164  fBytesRecv = 0;
165  fCompress = 0;
166  fUUIDs = 0;
167  fLastUsageMtx = 0;
169 
170  if (fAddress.GetPort() != -1) {
171  fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "upd");
172  if (fSocket != -1) {
174  gROOT->GetListOfSockets()->Add(this);
175  }
176  } else
177  fSocket = -1;
178 }
179 
180 ////////////////////////////////////////////////////////////////////////////////
181 /// Create a socket; see CreateAuthSocket for the form of url.
182 /// Connect to the specified port # on the remote host.
183 /// If user is specified in url, try authentication as user.
184 /// Use tcpwindowsize to specify the size of the receive buffer, it has
185 /// to be specified here to make sure the window scale option is set (for
186 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
187 /// Returns when connection has been accepted by remote side. Use IsValid()
188 /// to check the validity of the socket. Every socket is added to the TROOT
189 /// sockets list which will make sure that any open sockets are properly
190 /// closed on program termination.
191 
192 TUDPSocket::TUDPSocket(const char *url, Int_t port)
193  : TNamed(TUrl(url).GetHost(), "")
194 {
195  R__ASSERT(gROOT);
197 
198  fUrl = TString(url);
199  TString host(TUrl(fUrl).GetHost());
200 
202  fSecContext = 0;
203  fRemoteProtocol= -1;
204  fServType = kSOCKD;
205  if (fUrl.Contains("root"))
206  fServType = kROOTD;
207  if (fUrl.Contains("proof"))
208  fServType = kPROOFD;
209  fAddress = gSystem->GetHostByName(host);
210  fAddress.fPort = port;
213  fBytesSent = 0;
214  fBytesRecv = 0;
215  fCompress = 0;
216  fUUIDs = 0;
217  fLastUsageMtx = 0;
219 
220  fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
221  if (fSocket == -1) {
222  fAddress.fPort = -1;
223  } else {
225  gROOT->GetListOfSockets()->Add(this);
226  }
227 }
228 
229 ////////////////////////////////////////////////////////////////////////////////
230 /// Create a socket in the Unix domain on 'sockpath'.
231 /// Returns when connection has been accepted by the server. Use IsValid()
232 /// to check the validity of the socket. Every socket is added to the TROOT
233 /// sockets list which will make sure that any open sockets are properly
234 /// closed on program termination.
235 
236 TUDPSocket::TUDPSocket(const char *sockpath) : TNamed(sockpath, "")
237 {
238  R__ASSERT(gROOT);
240 
241  fUrl = sockpath;
242 
243  fService = "unix";
244  fSecContext = 0;
245  fRemoteProtocol= -1;
246  fServType = kSOCKD;
247  fAddress.fPort = -1;
248  fName.Form("unix:%s", sockpath);
250  fBytesSent = 0;
251  fBytesRecv = 0;
252  fCompress = 0;
253  fUUIDs = 0;
254  fLastUsageMtx = 0;
256 
257  fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");
258  if (fSocket > 0) {
260  gROOT->GetListOfSockets()->Add(this);
261  }
262 }
263 
264 ////////////////////////////////////////////////////////////////////////////////
265 /// Create a socket. The socket will adopt previously opened TCP socket with
266 /// descriptor desc.
267 
269 {
270  R__ASSERT(gROOT);
272 
273  fSecContext = 0;
274  fRemoteProtocol = 0;
275  fService = (char *)kSOCKD;
276  fServType = kSOCKD;
277  fBytesSent = 0;
278  fBytesRecv = 0;
279  fCompress = 0;
280  fUUIDs = 0;
281  fLastUsageMtx = 0;
283 
284  if (desc >= 0) {
285  fSocket = desc;
288  gROOT->GetListOfSockets()->Add(this);
289  } else
290  fSocket = -1;
291 }
292 
293 ////////////////////////////////////////////////////////////////////////////////
294 /// Create a socket. The socket will adopt previously opened Unix socket with
295 /// descriptor desc. The sockpath arg is for info purposes only. Use
296 /// this method to adopt e.g. a socket created via socketpair().
297 
298 TUDPSocket::TUDPSocket(Int_t desc, const char *sockpath) : TNamed(sockpath, "")
299 {
300  R__ASSERT(gROOT);
302 
303  fUrl = sockpath;
304 
305  fService = "unix";
306  fSecContext = 0;
307  fRemoteProtocol= -1;
308  fServType = kSOCKD;
309  fAddress.fPort = -1;
310  fName.Form("unix:%s", sockpath);
312  fBytesSent = 0;
313  fBytesRecv = 0;
314  fCompress = 0;
315  fUUIDs = 0;
316  fLastUsageMtx = 0;
318 
319  if (desc >= 0) {
320  fSocket = desc;
322  gROOT->GetListOfSockets()->Add(this);
323  } else
324  fSocket = -1;
325 }
326 
327 
328 ////////////////////////////////////////////////////////////////////////////////
329 /// TUDPSocket copy ctor.
330 
332 {
333  fSocket = s.fSocket;
334  fService = s.fService;
335  fAddress = s.fAddress;
339  fCompress = s.fCompress;
342  fServType = s.fServType;
343  fUUIDs = 0;
344  fLastUsageMtx = 0;
346 
347  if (fSocket != -1) {
349  gROOT->GetListOfSockets()->Add(this);
350  }
351 }
352 
353 ////////////////////////////////////////////////////////////////////////////////
354 /// Close the socket. If option is "force", calls shutdown(id,2) to
355 /// shut down the connection. This will close the connection also
356 /// for the parent of this process. Also called via the dtor (without
357 /// option "force", call explicitly Close("force") if this is desired).
358 
360 {
361  Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
362 
363  if (fSocket != -1) {
366  gROOT->GetListOfSockets()->Remove(this);
367  }
368  fSocket = -1;
369 
372 }
373 
374 ////////////////////////////////////////////////////////////////////////////////
375 /// Return internet address of local host to which the socket is bound.
376 /// In case of error TInetAddress::IsValid() returns kFALSE.
377 
379 {
380  if (IsValid()) {
381  if (fLocalAddress.GetPort() == -1)
383  return fLocalAddress;
384  }
385  return TInetAddress();
386 }
387 
388 ////////////////////////////////////////////////////////////////////////////////
389 /// Return the local port # to which the socket is bound.
390 /// In case of error return -1.
391 
393 {
394  if (IsValid()) {
395  if (fLocalAddress.GetPort() == -1)
397  return fLocalAddress.GetPort();
398  }
399  return -1;
400 }
401 
402 ////////////////////////////////////////////////////////////////////////////////
403 /// Waits for this socket to change status. If interest=kRead,
404 /// the socket will be watched to see if characters become available for
405 /// reading; if interest=kWrite the socket will be watched to
406 /// see if a write will not block.
407 /// The argument 'timeout' specifies a maximum time to wait in millisec.
408 /// Default no timeout.
409 /// Returns 1 if a change of status of interest has been detected within
410 /// timeout; 0 in case of timeout; < 0 if an error occurred.
411 
413 {
414  Int_t rc = 1;
415 
416  // Associate a TFileHandler to this socket
417  TFileHandler fh(fSocket, interest);
418 
419  // Wait for an event now
420  rc = gSystem->Select(&fh, timeout);
421 
422  return rc;
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Send a single message opcode. Use kind (opcode) to set the
427 /// TMessage "what" field. Returns the number of bytes that were sent
428 /// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
429 /// been or'ed with kMESS_ACK, the call will only return after having
430 /// received an acknowledgement, making the sending process synchronous.
431 
433 {
434  TMessage mess(kind);
435 
436  Int_t nsent;
437  if ((nsent = Send(mess)) < 0)
438  return -1;
439 
440  return nsent;
441 }
442 
443 ////////////////////////////////////////////////////////////////////////////////
444 /// Send a status and a single message opcode. Use kind (opcode) to set the
445 /// TMessage "what" field. Returns the number of bytes that were sent
446 /// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
447 /// been or'ed with kMESS_ACK, the call will only return after having
448 /// received an acknowledgement, making the sending process synchronous.
449 
451 {
452  TMessage mess(kind);
453  mess << status;
454 
455  Int_t nsent;
456  if ((nsent = Send(mess)) < 0)
457  return -1;
458 
459  return nsent;
460 }
461 
462 ////////////////////////////////////////////////////////////////////////////////
463 /// Send a character string buffer. Use kind to set the TMessage "what" field.
464 /// Returns the number of bytes in the string str that were sent and -1 in
465 /// case of error. In case the kind has been or'ed with kMESS_ACK, the call
466 /// will only return after having received an acknowledgement, making the
467 /// sending process synchronous.
468 
469 Int_t TUDPSocket::Send(const char *str, Int_t kind)
470 {
471  TMessage mess(kind);
472  if (str) mess.WriteString(str);
473 
474  Int_t nsent;
475  if ((nsent = Send(mess)) < 0)
476  return -1;
477 
478  return nsent - sizeof(Int_t); // - TMessage::What()
479 }
480 
481 ////////////////////////////////////////////////////////////////////////////////
482 /// Send a TMessage object. Returns the number of bytes in the TMessage
483 /// that were sent and -1 in case of error. In case the TMessage::What
484 /// has been or'ed with kMESS_ACK, the call will only return after having
485 /// received an acknowledgement, making the sending process synchronous.
486 /// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
487 /// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
488 /// support for streaming TStreamerInfo added by Rene Brun May 2008
489 /// support for streaming TProcessID added by Rene Brun June 2008
490 
492 {
494 
495  if (fSocket == -1) return -1;
496 
497  if (mess.IsReading()) {
498  Error("Send", "cannot send a message used for reading");
499  return -1;
500  }
501 
502  // send streamer infos in case schema evolution is enabled in the TMessage
503  SendStreamerInfos(mess);
504 
505  // send the process id's so TRefs work
506  SendProcessIDs(mess);
507 
508  mess.SetLength(); //write length in first word of buffer
509 
510  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
511  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
512 
513  if (mess.GetCompressionLevel() > 0)
514  const_cast<TMessage&>(mess).Compress();
515 
516  char *mbuf = mess.Buffer();
517  Int_t mlen = mess.Length();
518  if (mess.CompBuffer()) {
519  mbuf = mess.CompBuffer();
520  mlen = mess.CompLength();
521  }
522 
524  Int_t nsent;
525  if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
526  if (nsent == -5) {
527  // Connection reset by peer or broken
529  Close();
530  }
531  return nsent;
532  }
533 
534  fBytesSent += nsent;
535  fgBytesSent += nsent;
536 
537  // If acknowledgement is desired, wait for it
538  if (mess.What() & kMESS_ACK) {
541  char buf[2];
542  Int_t n = 0;
543  if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
544  if (n == -5) {
545  // Connection reset by peer or broken
547  Close();
548  } else
549  n = -1;
550  return n;
551  }
552  if (strncmp(buf, "ok", 2)) {
553  Error("Send", "bad acknowledgement");
554  return -1;
555  }
556  fBytesRecv += 2;
557  fgBytesRecv += 2;
558  }
559 
560  Touch(); // update usage timestamp
561 
562  return nsent - sizeof(UInt_t); //length - length header
563 }
564 
565 ////////////////////////////////////////////////////////////////////////////////
566 /// Send an object. Returns the number of bytes sent and -1 in case of error.
567 /// In case the "kind" has been or'ed with kMESS_ACK, the call will only
568 /// return after having received an acknowledgement, making the sending
569 /// synchronous.
570 
572 {
573  //stream object to message buffer
574  TMessage mess(kind);
575  mess.WriteObject(obj);
576 
577  //now sending the object itself
578  Int_t nsent;
579  if ((nsent = Send(mess)) < 0)
580  return -1;
581 
582  return nsent;
583 }
584 
585 ////////////////////////////////////////////////////////////////////////////////
586 /// Send a raw buffer of specified length. Using option kOob one can send
587 /// OOB data. Returns the number of bytes sent or -1 in case of error.
588 /// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
589 /// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
590 
591 Int_t TUDPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
592 {
594 
595  if (fSocket == -1) return -1;
596 
598  Int_t nsent;
599  if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
600  if (nsent == -5) {
601  // Connection reset or broken: close
603  Close();
604  }
605  return nsent;
606  }
607 
608  fBytesSent += nsent;
609  fgBytesSent += nsent;
610 
611  Touch(); // update usage timestamp
612 
613  return nsent;
614 }
615 
616 ////////////////////////////////////////////////////////////////////////////////
617 /// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
618 /// in the object in the message is in the fInfos list of the message.
619 /// We send only the TStreamerInfos not yet sent on this socket.
620 
622 {
623  if (mess.fInfos && mess.fInfos->GetEntries()) {
624  TIter next(mess.fInfos);
625  TStreamerInfo *info;
626  TList *minilist = 0;
627  while ((info = (TStreamerInfo*)next())) {
628  Int_t uid = info->GetNumber();
629  if (fBitsInfo.TestBitNumber(uid))
630  continue; //TStreamerInfo had already been sent
631  fBitsInfo.SetBitNumber(uid);
632  if (!minilist)
633  minilist = new TList();
634  if (gDebug > 0)
635  Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
636  info->GetName(),info->GetClassVersion());
637  minilist->Add(info);
638  }
639  if (minilist) {
640  TMessage messinfo(kMESS_STREAMERINFO);
641  messinfo.WriteObject(minilist);
642  delete minilist;
643  if (messinfo.fInfos)
644  messinfo.fInfos->Clear();
645  if (Send(messinfo) < 0)
646  Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
647  }
648  }
649 }
650 
651 ////////////////////////////////////////////////////////////////////////////////
652 /// Check if TProcessIDs must be sent. The list of TProcessIDs
653 /// in the object in the message is found by looking in the TMessage bits.
654 /// We send only the TProcessIDs not yet sent on this socket.
655 
657 {
658  if (mess.TestBitNumber(0)) {
659  TObjArray *pids = TProcessID::GetPIDs();
660  Int_t npids = pids->GetEntries();
661  TProcessID *pid;
662  TList *minilist = 0;
663  for (Int_t ipid = 0; ipid < npids; ipid++) {
664  pid = (TProcessID*)pids->At(ipid);
665  if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
666  continue;
667  //check if a pid with this title has already been sent through the socket
668  //if not add it to the fUUIDs list
669  if (!fUUIDs) {
670  fUUIDs = new TList();
671  } else {
672  if (fUUIDs->FindObject(pid->GetTitle()))
673  continue;
674  }
675  fUUIDs->Add(new TObjString(pid->GetTitle()));
676  if (!minilist)
677  minilist = new TList();
678  if (gDebug > 0)
679  Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
680  minilist->Add(pid);
681  }
682  if (minilist) {
683  TMessage messpid(kMESS_PROCESSID);
684  messpid.WriteObject(minilist);
685  delete minilist;
686  if (Send(messpid) < 0)
687  Warning("SendProcessIDs", "problems sending TProcessID's ...");
688  }
689  }
690 }
691 
692 ////////////////////////////////////////////////////////////////////////////////
693 /// Receive a character string message of maximum max length. The expected
694 /// message must be of type kMESS_STRING. Returns length of received string
695 /// (can be 0 if otherside of connection is closed) or -1 in case of error
696 /// or -4 in case a non-blocking socket would block (i.e. there is nothing
697 /// to be read).
698 
699 Int_t TUDPSocket::Recv(char *str, Int_t max)
700 {
701  Int_t n, kind;
702 
704  if ((n = Recv(str, max, kind)) <= 0) {
705  if (n == -5) {
707  n = -1;
708  }
709  return n;
710  }
711 
712  if (kind != kMESS_STRING) {
713  Error("Recv", "got message of wrong kind (expected %d, got %d)",
714  kMESS_STRING, kind);
715  return -1;
716  }
717 
718  return n;
719 }
720 
721 ////////////////////////////////////////////////////////////////////////////////
722 /// Receive a character string message of maximum max length. Returns in
723 /// kind the message type. Returns length of received string+4 (can be 0 if
724 /// other side of connection is closed) or -1 in case of error or -4 in
725 /// case a non-blocking socket would block (i.e. there is nothing to be read).
726 
727 Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
728 {
729  Int_t n;
730  TMessage *mess;
731 
733  if ((n = Recv(mess)) <= 0) {
734  if (n == -5) {
736  n = -1;
737  }
738  return n;
739  }
740 
741  kind = mess->What();
742  if (str) {
743  if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
744  mess->ReadString(str, max);
745  else
746  str[0] = 0;
747  }
748 
749  delete mess;
750 
751  return n; // number of bytes read (len of str + sizeof(kind)
752 }
753 
754 ////////////////////////////////////////////////////////////////////////////////
755 /// Receives a status and a message type. Returns length of received
756 /// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
757 /// is closed) or -1 in case of error or -4 in case a non-blocking
758 /// socket would block (i.e. there is nothing to be read).
759 
761 {
762  Int_t n;
763  TMessage *mess;
764 
766  if ((n = Recv(mess)) <= 0) {
767  if (n == -5) {
769  n = -1;
770  }
771  return n;
772  }
773 
774  kind = mess->What();
775  (*mess) >> status;
776 
777  delete mess;
778 
779  return n; // number of bytes read (2 * sizeof(Int_t)
780 }
781 
782 ////////////////////////////////////////////////////////////////////////////////
783 /// Receive a TMessage object. The user must delete the TMessage object.
784 /// Returns length of message in bytes (can be 0 if other side of connection
785 /// is closed) or -1 in case of error or -4 in case a non-blocking socket
786 /// would block (i.e. there is nothing to be read) or -5 if pipe broken
787 /// or reset by peer (EPIPE || ECONNRESET). In those cases mess == 0.
788 
790 {
792 
793  if (fSocket == -1) {
794  mess = 0;
795  return -1;
796  }
797 
798 oncemore:
800  Int_t n;
801  UInt_t len;
802  if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
803  if (n == 0 || n == -5) {
804  // Connection closed, reset or broken
806  Close();
807  }
808  mess = 0;
809  return n;
810  }
811  len = net2host(len); //from network to host byte order
812 
814  char *buf = new char[len+sizeof(UInt_t)];
815  if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
816  if (n == 0 || n == -5) {
817  // Connection closed, reset or broken
819  Close();
820  }
821  delete [] buf;
822  mess = 0;
823  return n;
824  }
825 
826  fBytesRecv += n + sizeof(UInt_t);
827  fgBytesRecv += n + sizeof(UInt_t);
828 
829  mess = new TMessage(buf, len+sizeof(UInt_t));
830 
831  // receive any streamer infos
832  if (RecvStreamerInfos(mess))
833  goto oncemore;
834 
835  // receive any process ids
836  if (RecvProcessIDs(mess))
837  goto oncemore;
838 
839  if (mess->What() & kMESS_ACK) {
841  char ok[2] = { 'o', 'k' };
842  Int_t n2 = 0;
843  if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
844  if (n2 == -5) {
845  // Connection reset or broken
847  Close();
848  }
849  delete mess;
850  mess = 0;
851  return n2;
852  }
853  mess->SetWhat(mess->What() & ~kMESS_ACK);
854 
855  fBytesSent += 2;
856  fgBytesSent += 2;
857  }
858 
859  Touch(); // update usage timestamp
860 
861  return n;
862 }
863 
864 ////////////////////////////////////////////////////////////////////////////////
865 /// Receive a raw buffer of specified length bytes. Using option kPeek
866 /// one can peek at incoming data. Returns number of received bytes.
867 /// Returns -1 in case of error. In case of opt == kOob: -2 means
868 /// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
869 /// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
870 /// peer (EPIPE || ECONNRESET).
871 
873 {
875 
876  if (fSocket == -1) return -1;
877  if (length == 0) return 0;
878 
880  Int_t n;
881  if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
882  if (n == 0 || n == -5) {
883  // Connection closed, reset or broken
885  Close();
886  }
887  return n;
888  }
889 
890  fBytesRecv += n;
891  fgBytesRecv += n;
892 
893  Touch(); // update usage timestamp
894 
895  return n;
896 }
897 
898 ////////////////////////////////////////////////////////////////////////////////
899 /// Receive a message containing streamer infos. In case the message contains
900 /// streamer infos they are imported, the message will be deleted and the
901 /// method returns kTRUE.
902 
904 {
905  if (mess->What() == kMESS_STREAMERINFO) {
906  TList *list = (TList*)mess->ReadObject(TList::Class());
907  TIter next(list);
908  TStreamerInfo *info;
909  TObjLink *lnk = list->FirstLink();
910  // First call BuildCheck for regular class
911  while (lnk) {
912  info = (TStreamerInfo*)lnk->GetObject();
913  TObject *element = info->GetElements()->UncheckedAt(0);
914  Bool_t isstl = element && strcmp("This",element->GetName())==0;
915  if (!isstl) {
916  info->BuildCheck();
917  if (gDebug > 0)
918  Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
919  info->GetName(), info->GetClassVersion());
920  }
921  lnk = lnk->Next();
922  }
923  // Then call BuildCheck for stl class
924  lnk = list->FirstLink();
925  while (lnk) {
926  info = (TStreamerInfo*)lnk->GetObject();
927  TObject *element = info->GetElements()->UncheckedAt(0);
928  Bool_t isstl = element && strcmp("This",element->GetName())==0;
929  if (isstl) {
930  info->BuildCheck();
931  if (gDebug > 0)
932  Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
933  info->GetName(), info->GetClassVersion());
934  }
935  lnk = lnk->Next();
936  }
937  delete list;
938  delete mess;
939 
940  return kTRUE;
941  }
942  return kFALSE;
943 }
944 
945 ////////////////////////////////////////////////////////////////////////////////
946 /// Receive a message containing process ids. In case the message contains
947 /// process ids they are imported, the message will be deleted and the
948 /// method returns kTRUE.
949 
951 {
952  if (mess->What() == kMESS_PROCESSID) {
953  TList *list = (TList*)mess->ReadObject(TList::Class());
954  TIter next(list);
955  TProcessID *pid;
956  while ((pid = (TProcessID*)next())) {
957  // check that a similar pid is not already registered in fgPIDs
958  TObjArray *pidslist = TProcessID::GetPIDs();
959  TIter nextpid(pidslist);
960  TProcessID *p;
961  while ((p = (TProcessID*)nextpid())) {
962  if (!strcmp(p->GetTitle(), pid->GetTitle())) {
963  delete pid;
964  pid = 0;
965  break;
966  }
967  }
968  if (pid) {
969  if (gDebug > 0)
970  Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
971  pid->IncrementCount();
972  pidslist->Add(pid);
973  Int_t ind = pidslist->IndexOf(pid);
974  pid->SetUniqueID((UInt_t)ind);
975  }
976  }
977  delete list;
978  delete mess;
979 
980  return kTRUE;
981  }
982  return kFALSE;
983 }
984 
985 ////////////////////////////////////////////////////////////////////////////////
986 /// Set socket options.
987 
989 {
990  if (fSocket == -1) return -1;
991 
992  return gSystem->SetSockOpt(fSocket, opt, val);
993 }
994 
995 ////////////////////////////////////////////////////////////////////////////////
996 /// Get socket options. Returns -1 in case of error.
997 
999 {
1000  if (fSocket == -1) return -1;
1001 
1002  return gSystem->GetSockOpt(fSocket, opt, &val);
1003 }
1004 
1005 ////////////////////////////////////////////////////////////////////////////////
1006 /// Returns error code. Meaning depends on context where it is called.
1007 /// If no error condition returns 0 else a value < 0.
1008 /// For example see TServerSocket ctor.
1009 
1011 {
1012  if (!IsValid())
1013  return fSocket;
1014 
1015  return 0;
1016 }
1017 
1018 ////////////////////////////////////////////////////////////////////////////////
1019 /// See comments for function SetCompressionSettings
1020 
1022 {
1023  if (algorithm < 0 || algorithm >= ROOT::kUndefinedCompressionAlgorithm) algorithm = 0;
1024  if (fCompress < 0) {
1025  // if the level is not defined yet use 1 as a default
1026  fCompress = 100 * algorithm + 1;
1027  } else {
1028  int level = fCompress % 100;
1029  fCompress = 100 * algorithm + level;
1030  }
1031 }
1032 
1033 ////////////////////////////////////////////////////////////////////////////////
1034 /// See comments for function SetCompressionSettings
1035 
1037 {
1038  if (level < 0) level = 0;
1039  if (level > 99) level = 99;
1040  if (fCompress < 0) {
1041  // if the algorithm is not defined yet use 0 as a default
1042  fCompress = level;
1043  } else {
1044  int algorithm = fCompress / 100;
1045  if (algorithm >= ROOT::kUndefinedCompressionAlgorithm) algorithm = 0;
1046  fCompress = 100 * algorithm + level;
1047  }
1048 }
1049 
1050 ////////////////////////////////////////////////////////////////////////////////
1051 /// Used to specify the compression level and algorithm:
1052 /// settings = 100 * algorithm + level
1053 ///
1054 /// level = 0, objects written to this file will not be compressed.
1055 /// level = 1, minimal compression level but fast.
1056 /// ....
1057 /// level = 9, maximal compression level but slower and might use more memory.
1058 /// (For the currently supported algorithms, the maximum level is 9)
1059 /// If compress is negative it indicates the compression level is not set yet.
1060 ///
1061 /// The enumeration ROOT::ECompressionAlgorithm associates each
1062 /// algorithm with a number. There is a utility function to help
1063 /// to set the value of the argument. For example,
1064 /// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1065 /// will build an integer which will set the compression to use
1066 /// the LZMA algorithm and compression level 1. These are defined
1067 /// in the header file Compression.h.
1068 ///
1069 /// Note that the compression settings may be changed at any time.
1070 /// The new compression settings will only apply to branches created
1071 /// or attached after the setting is changed and other objects written
1072 /// after the setting is changed.
1073 
1075 {
1076  fCompress = settings;
1077 }
1078 
1079 ////////////////////////////////////////////////////////////////////////////////
1080 /// Print error string depending on error code.
1081 
1082 void TUDPSocket::NetError(const char *where, Int_t err)
1083 {
1084  // Make sure it is in range
1085  err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1086 
1087  if (gDebug > 0)
1088  ::Error(where, "%s", gRootdErrStr[err]);
1089 }
1090 
1091 ////////////////////////////////////////////////////////////////////////////////
1092 /// Get total number of bytes sent via all sockets.
1093 
1095 {
1096  return fgBytesSent;
1097 }
1098 
1099 ////////////////////////////////////////////////////////////////////////////////
1100 /// Get total number of bytes received via all sockets.
1101 
1103 {
1104  return fgBytesRecv;
1105 }
void SetCompressionSettings(Int_t settings=1)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level...
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
Definition: TUDPSocket.cxx:591
Describe Streamer information for one class version.
Definition: TStreamerInfo.h:47
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition: TObject.cxx:434
Bool_t IsReading() const
Definition: TBuffer.h:83
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TUDPSocket.cxx:359
virtual void WriteString(const char *s)
Write string to I/O buffer.
An array of TObjects.
Definition: TObjArray.h:39
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition: TUDPSocket.cxx:903
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
void Touch()
Definition: TUDPSocket.h:162
char * CompBuffer() const
Definition: TMessage.h:94
R__EXTERN const char * gRootdErrStr[]
Definition: NetErrors.h:74
Int_t fCompress
Definition: TUDPSocket.h:70
virtual TInetAddress GetLocalInetAddress()
Return internet address of local host to which the socket is bound.
Definition: TUDPSocket.cxx:378
Collectable string class.
Definition: TObjString.h:32
TList * fUUIDs
Definition: TUDPSocket.h:80
const char Option_t
Definition: RtypesCore.h:62
const char * GetHostName() const
Definition: TInetAddress.h:75
ESockOptions
Definition: TSocket.h:52
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition: TUDPSocket.cxx:656
This class represents a WWW compatible URL.
Definition: TUrl.h:41
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition: TNamed.cxx:131
virtual Int_t GetEntries() const
Definition: TCollection.h:92
This class represents an Internet Protocol (IP) address.
Definition: TInetAddress.h:40
virtual void CloseConnection(int sock, Bool_t force=kFALSE)
Close socket connection.
Definition: TSystem.cxx:2372
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition: TUDPSocket.cxx:950
#define R__ASSERT(e)
Definition: TError.h:98
#define gROOT
Definition: TROOT.h:364
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
R__EXTERN TVirtualMutex * gROOTMutex
Definition: TROOT.h:63
const Bool_t kFALSE
Definition: Rtypes.h:92
Int_t GetCompressionLevel() const
Definition: TUDPSocket.h:179
TObject * At(Int_t idx) const
Definition: TObjArray.h:167
TString fService
Definition: TUDPSocket.h:75
const char * Class
Definition: TXMLSetup.cxx:64
virtual TInetAddress GetPeerName(int sock)
Get Internet Protocol (IP) address of remote host and port #.
Definition: TSystem.cxx:2282
virtual int SendRaw(int sock, const void *buffer, int length, int flag)
Send exactly length bytes from buffer.
Definition: TSystem.cxx:2391
virtual char * GetServiceByPort(int port)
Get name of internet service.
Definition: TSystem.cxx:2309
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:739
ESendRecvOptions
Definition: TSocket.h:65
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
Int_t Length() const
Definition: TBuffer.h:96
Int_t fSocket
Definition: TUDPSocket.h:77
virtual TObject * ReadObject(const TClass *cl)
Read object from I/O buffer.
static TObjArray * GetPIDs()
static: returns array of TProcessIDs
Definition: TProcessID.cxx:321
#define SafeDelete(p)
Definition: RConfig.h:507
UShort_t net2host(UShort_t x)
Definition: Bytes.h:579
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition: TUDPSocket.cxx:988
void SetCompressionLevel(Int_t level=1)
See comments for function SetCompressionSettings.
static ULong64_t fgBytesRecv
Definition: TUDPSocket.h:85
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:33
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TUDPSocket.cxx:491
char * Buffer() const
Definition: TBuffer.h:93
Int_t CompLength() const
Definition: TMessage.h:95
Int_t GetPort() const
Definition: TInetAddress.h:77
virtual int RecvRaw(int sock, void *buffer, int length, int flag)
Receive exactly length bytes into buffer.
Definition: TSystem.cxx:2381
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition: TProcessID.h:73
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition: TBits.h:223
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition: TSystem.cxx:2273
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
Definition: TUDPSocket.cxx:571
TString fUrl
Definition: TUDPSocket.h:78
virtual void SetUniqueID(UInt_t uid)
Set the unique object id.
Definition: TObject.cxx:750
A doubly linked list.
Definition: TList.h:47
Option_t * GetOption() const
Definition: TUDPSocket.h:101
TInetAddress fAddress
Definition: TUDPSocket.h:67
Int_t GetErrorCode() const
Returns error code.
static ULong64_t fgBytesSent
Definition: TUDPSocket.h:86
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
virtual Int_t GetLocalPort()
Return the local port # to which the socket is bound.
Definition: TUDPSocket.cxx:392
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2322
Int_t IncrementCount()
Increase the reference count to this object.
Definition: TProcessID.cxx:283
unsigned int UInt_t
Definition: RtypesCore.h:42
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
TSecContext * fSecContext
Definition: TUDPSocket.h:73
TList * fInfos
Definition: TMessage.h:47
TString fName
Definition: TNamed.h:36
virtual TObjLink * FirstLink() const
Definition: TList.h:101
UInt_t fBytesRecv
Definition: TUDPSocket.h:68
#define R__LOCKGUARD2(mutex)
virtual int OpenConnection(const char *server, int port, int tcpwindowsize=-1, const char *protocol="tcp")
Open a connection to another host.
Definition: TSystem.cxx:2318
virtual int SetSockOpt(int sock, int kind, int val)
Set socket option.
Definition: TSystem.cxx:2418
virtual Int_t Select(TList *active, Long_t timeout)
Select on active file descriptors (called by TMonitor).
Definition: TSystem.cxx:450
static ULong64_t GetSocketBytesRecv()
Get total number of bytes received via all sockets.
UInt_t What() const
Definition: TMessage.h:80
void SetCompressionAlgorithm(Int_t algorithm=0)
See comments for function SetCompressionSettings.
long Long_t
Definition: RtypesCore.h:50
TObject * UncheckedAt(Int_t i) const
Definition: TObjArray.h:91
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TUDPSocket.cxx:789
#define ClassImp(name)
Definition: Rtypes.h:279
UInt_t fBytesSent
Definition: TUDPSocket.h:69
TBits fBitsInfo
Definition: TUDPSocket.h:79
TInetAddress fLocalAddress
Definition: TUDPSocket.h:71
virtual Int_t Select(Int_t interest=kRead, Long_t timeout=-1)
Waits for this socket to change status.
Definition: TUDPSocket.cxx:412
Int_t IndexOf(const TObject *obj) const
Definition: TObjArray.cxx:552
unsigned long long ULong64_t
Definition: RtypesCore.h:70
virtual Bool_t IsValid() const
Definition: TUDPSocket.h:137
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:567
Int_t BufferSize() const
Definition: TBuffer.h:94
virtual TInetAddress GetSockName(int sock)
Get Internet Protocol (IP) address of host and port #.
Definition: TSystem.cxx:2291
Int_t GetCompressionLevel() const
Definition: TMessage.h:112
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:349
Mother of all ROOT objects.
Definition: TObject.h:37
TObjArray * GetElements() const
Int_t GetClassVersion() const
static void NetError(const char *where, Int_t error)
Print error string depending on error code.
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:188
void BuildCheck(TFile *file=0)
Check if built and consistent with the class dictionary.
virtual int GetServiceByName(const char *service)
Get port # of internet service.
Definition: TSystem.cxx:2300
virtual void Add(TObject *obj)
Definition: TList.h:81
EServiceType fServType
Definition: TUDPSocket.h:76
Int_t fRemoteProtocol
Definition: TUDPSocket.h:72
void WriteObject(const TObject *obj)
Write object to message buffer.
Definition: TMessage.cxx:418
Int_t GetEntries() const
Return the number of objects in array (i.e. number of non-empty slots).
Definition: TObjArray.cxx:494
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
void Add(TObject *obj)
Definition: TObjArray.h:75
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:281
void ResetBit(UInt_t f)
Definition: TObject.h:156
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition: TMessage.h:64
TVirtualMutex * fLastUsageMtx
Definition: TUDPSocket.h:82
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition: TUDPSocket.cxx:621
static ULong64_t GetSocketBytesSent()
Get total number of bytes sent via all sockets.
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:416
virtual int GetSockOpt(int sock, int kind, int *val)
Get socket option.
Definition: TSystem.cxx:2427
Int_t GetNumber() const
const Bool_t kTRUE
Definition: Rtypes.h:91
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TUDPSocket.cxx:872
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition: TNamed.cxx:155
const Int_t n
Definition: legend1.C:16
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:198
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52