Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TUDPSocket.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Marcelo Sousa 26/10/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/**
13\file TUDPSocket.cxx
14\class TUDPSocket
15\brief This class implements UDP client sockets.
16\note This class deals with sockets: the user is entirely responsible for the security of their usage, for example, but
17not limited to, the management of the connections to said sockets.
18
19A socket is an endpoint for communication between two machines. The actual work is done via the TSystem class (either
20TUnixSystem or TWinNTSystem).
21**/
22
23#include "Bytes.h"
24#include "Compression.h"
25#include "NetErrors.h"
26#include "TError.h"
27#include "TMessage.h"
28#include "TUDPSocket.h"
29#include "TObjString.h"
30#include "TPluginManager.h"
31#include "TROOT.h"
32#include "TString.h"
33#include "TSystem.h"
34#include "TUrl.h"
35#include "TStreamerInfo.h"
36#include "TProcessID.h"
37
38#include <limits>
39
42
43
44
45////////////////////////////////////////////////////////////////////////////////
46/// Create a socket. Connect to the named service at address addr.
47/// Use tcpwindowsize to specify the size of the receive buffer, it has
48/// to be specified here to make sure the window scale option is set (for
49/// tcpwindowsize > 65KB and for platforms supporting window scaling).
50/// Returns when connection has been accepted by remote side. Use IsValid()
51/// to check the validity of the socket. Every socket is added to the TROOT
52/// sockets list which will make sure that any open sockets are properly
53/// closed on program termination.
54
56 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
57{
60
62 fSecContext = 0;
65 if (fService.Contains("root"))
67 fAddress = addr;
69 fBytesSent = 0;
70 fBytesRecv = 0;
71 fUUIDs = 0;
72 fLastUsageMtx = 0;
74
75 if (fAddress.GetPort() != -1) {
77 -1, "upd");
78
79 if (fSocket != -1) {
81 gROOT->GetListOfSockets()->Add(this);
82 }
83 } else
84 fSocket = -1;
85
86}
87
88
89////////////////////////////////////////////////////////////////////////////////
90/// Create a socket. Connect to the specified port # at address addr.
91/// Use tcpwindowsize to specify the size of the receive buffer, it has
92/// to be specified here to make sure the window scale option is set (for
93/// tcpwindowsize > 65KB and for platforms supporting window scaling).
94/// Returns when connection has been accepted by remote side. Use IsValid()
95/// to check the validity of the socket. Every socket is added to the TROOT
96/// sockets list which will make sure that any open sockets are properly
97/// closed on program termination.
98
100 : TNamed(addr.GetHostName(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
101{
104
106 fSecContext = 0;
107 fRemoteProtocol= -1;
109 if (fService.Contains("root"))
111 fAddress = addr;
112 fAddress.fPort = port;
114 fBytesSent = 0;
115 fBytesRecv = 0;
116 fUUIDs = 0;
117 fLastUsageMtx = 0;
119
121 -1, "upd");
122 if (fSocket == -1)
123 fAddress.fPort = -1;
124 else {
126 gROOT->GetListOfSockets()->Add(this);
127 }
128}
129
130////////////////////////////////////////////////////////////////////////////////
131/// Create a socket. Connect to named service on the remote host.
132/// Use tcpwindowsize to specify the size of the receive buffer, it has
133/// to be specified here to make sure the window scale option is set (for
134/// tcpwindowsize > 65KB and for platforms supporting window scaling).
135/// Returns when connection has been accepted by remote side. Use IsValid()
136/// to check the validity of the socket. Every socket is added to the TROOT
137/// sockets list which will make sure that any open sockets are properly
138/// closed on program termination.
139
140TUDPSocket::TUDPSocket(const char *host, const char *service)
141 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
142{
145
147 fSecContext = 0;
148 fRemoteProtocol= -1;
150 if (fService.Contains("root"))
155 fBytesSent = 0;
156 fBytesRecv = 0;
157 fUUIDs = 0;
158 fLastUsageMtx = 0;
160
161 if (fAddress.GetPort() != -1) {
162 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "upd");
163 if (fSocket != -1) {
165 gROOT->GetListOfSockets()->Add(this);
166 }
167 } else
168 fSocket = -1;
169}
170
171////////////////////////////////////////////////////////////////////////////////
172/// Create a socket; see CreateAuthSocket for the form of url.
173/// Connect to the specified port # on the remote host.
174/// If user is specified in url, try authentication as user.
175/// Use tcpwindowsize to specify the size of the receive buffer, it has
176/// to be specified here to make sure the window scale option is set (for
177/// tcpwindowsize > 65KB and for platforms supporting window scaling).
178/// Returns when connection has been accepted by remote side. Use IsValid()
179/// to check the validity of the socket. Every socket is added to the TROOT
180/// sockets list which will make sure that any open sockets are properly
181/// closed on program termination.
182
184 : TNamed(TUrl(url).GetHost(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
185{
188
189 fUrl = TString(url);
190 TString host(TUrl(fUrl).GetHost());
191
193 fSecContext = 0;
194 fRemoteProtocol= -1;
196 if (fUrl.Contains("root"))
199 fAddress.fPort = port;
202 fBytesSent = 0;
203 fBytesRecv = 0;
204 fUUIDs = 0;
205 fLastUsageMtx = 0;
207
208 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
209 if (fSocket == -1) {
210 fAddress.fPort = -1;
211 } else {
213 gROOT->GetListOfSockets()->Add(this);
214 }
215}
216
217////////////////////////////////////////////////////////////////////////////////
218/// Create a socket in the Unix domain on 'sockpath'.
219/// Returns when connection has been accepted by the server. Use IsValid()
220/// to check the validity of the socket. Every socket is added to the TROOT
221/// sockets list which will make sure that any open sockets are properly
222/// closed on program termination.
223
225 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
226{
229
230 fUrl = sockpath;
231
232 fService = "unix";
233 fSecContext = 0;
234 fRemoteProtocol= -1;
236 fAddress.fPort = -1;
237 fName.Form("unix:%s", sockpath);
239 fBytesSent = 0;
240 fBytesRecv = 0;
241 fUUIDs = 0;
242 fLastUsageMtx = 0;
244
245 fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");
246 if (fSocket > 0) {
248 gROOT->GetListOfSockets()->Add(this);
249 }
250}
251
252////////////////////////////////////////////////////////////////////////////////
253/// Create a socket. The socket will adopt previously opened TCP socket with
254/// descriptor desc.
255
256TUDPSocket::TUDPSocket(Int_t desc) : TNamed("", ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
257{
260
261 fSecContext = 0;
262 fRemoteProtocol = 0;
263 fService = (char *)kSOCKD;
265 fBytesSent = 0;
266 fBytesRecv = 0;
267 fUUIDs = 0;
268 fLastUsageMtx = 0;
270
271 if (desc >= 0) {
272 fSocket = desc;
275 gROOT->GetListOfSockets()->Add(this);
276 } else
277 fSocket = -1;
278}
279
280////////////////////////////////////////////////////////////////////////////////
281/// Create a socket. The socket will adopt previously opened Unix socket with
282/// descriptor desc. The sockpath arg is for info purposes only. Use
283/// this method to adopt e.g. a socket created via socketpair().
284
286 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
287{
290
291 fUrl = sockpath;
292
293 fService = "unix";
294 fSecContext = 0;
295 fRemoteProtocol= -1;
297 fAddress.fPort = -1;
298 fName.Form("unix:%s", sockpath);
300 fBytesSent = 0;
301 fBytesRecv = 0;
302 fUUIDs = 0;
303 fLastUsageMtx = 0;
305
306 if (desc >= 0) {
307 fSocket = desc;
309 gROOT->GetListOfSockets()->Add(this);
310 } else
311 fSocket = -1;
312}
313
314
315////////////////////////////////////////////////////////////////////////////////
316/// TUDPSocket copy ctor.
317
319{
320 fSocket = s.fSocket;
321 fService = s.fService;
322 fAddress = s.fAddress;
330 fUUIDs = 0;
331 fLastUsageMtx = 0;
333
334 if (fSocket != -1) {
336 gROOT->GetListOfSockets()->Add(this);
337 }
338}
339
340////////////////////////////////////////////////////////////////////////////////
341/// Close the socket. If option is "force", calls shutdown(id,2) to
342/// shut down the connection. This will close the connection also
343/// for the parent of this process. Also called via the dtor (without
344/// option "force", call explicitly Close("force") if this is desired).
345
347{
348 Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
349
350 if (fSocket != -1) {
353 gROOT->GetListOfSockets()->Remove(this);
354 }
355 fSocket = -1;
356
359}
360
361////////////////////////////////////////////////////////////////////////////////
362/// Return internet address of local host to which the socket is bound.
363/// In case of error TInetAddress::IsValid() returns kFALSE.
364
366{
367 if (IsValid()) {
368 if (fLocalAddress.GetPort() == -1)
370 return fLocalAddress;
371 }
372 return TInetAddress();
373}
374
375////////////////////////////////////////////////////////////////////////////////
376/// Return the local port # to which the socket is bound.
377/// In case of error return -1.
378
380{
381 if (IsValid()) {
382 if (fLocalAddress.GetPort() == -1)
384 return fLocalAddress.GetPort();
385 }
386 return -1;
387}
388
389////////////////////////////////////////////////////////////////////////////////
390/// Waits for this socket to change status. If interest=kRead,
391/// the socket will be watched to see if characters become available for
392/// reading; if interest=kWrite the socket will be watched to
393/// see if a write will not block.
394/// The argument 'timeout' specifies a maximum time to wait in millisec.
395/// Default no timeout.
396/// Returns 1 if a change of status of interest has been detected within
397/// timeout; 0 in case of timeout; < 0 if an error occurred.
398
400{
401 Int_t rc = 1;
402
403 // Associate a TFileHandler to this socket
405
406 // Wait for an event now
407 rc = gSystem->Select(&fh, timeout);
408
409 return rc;
410}
411
412////////////////////////////////////////////////////////////////////////////////
413/// Send a single message opcode. Use kind (opcode) to set the
414/// TMessage "what" field. Returns the number of bytes that were sent
415/// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
416/// been or'ed with kMESS_ACK, the call will only return after having
417/// received an acknowledgement, making the sending process synchronous.
418
420{
421 TMessage mess(kind);
422
423 Int_t nsent;
424 if ((nsent = Send(mess)) < 0)
425 return -1;
426
427 return nsent;
428}
429
430////////////////////////////////////////////////////////////////////////////////
431/// Send a status and a single message opcode. Use kind (opcode) to set the
432/// TMessage "what" field. Returns the number of bytes that were sent
433/// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
434/// been or'ed with kMESS_ACK, the call will only return after having
435/// received an acknowledgement, making the sending process synchronous.
436
438{
439 TMessage mess(kind);
440 mess << status;
441
442 Int_t nsent;
443 if ((nsent = Send(mess)) < 0)
444 return -1;
445
446 return nsent;
447}
448
449////////////////////////////////////////////////////////////////////////////////
450/// Send a character string buffer. Use kind to set the TMessage "what" field.
451/// Returns the number of bytes in the string str that were sent and -1 in
452/// case of error. In case the kind has been or'ed with kMESS_ACK, the call
453/// will only return after having received an acknowledgement, making the
454/// sending process synchronous.
455
456Int_t TUDPSocket::Send(const char *str, Int_t kind)
457{
458 TMessage mess(kind);
459 if (str) mess.WriteString(str);
460
461 Int_t nsent;
462 if ((nsent = Send(mess)) < 0)
463 return -1;
464
465 return nsent - sizeof(Int_t); // - TMessage::What()
466}
467
468////////////////////////////////////////////////////////////////////////////////
469/// Send a TMessage object. Returns the number of bytes in the TMessage
470/// that were sent and -1 in case of error. In case the TMessage::What
471/// has been or'ed with kMESS_ACK, the call will only return after having
472/// received an acknowledgement, making the sending process synchronous.
473/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
474/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
475/// support for streaming TStreamerInfo added by Rene Brun May 2008
476/// support for streaming TProcessID added by Rene Brun June 2008
477
479{
481
482 if (fSocket == -1) return -1;
483
484 if (mess.IsReading()) {
485 Error("Send", "cannot send a message used for reading");
486 return -1;
487 }
488
489 // send streamer infos in case schema evolution is enabled in the TMessage
491
492 // send the process id's so TRefs work
494
495 mess.SetLength(); //write length in first word of buffer
496
497 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
499
500 if (mess.GetCompressionLevel() > 0)
501 const_cast<TMessage&>(mess).Compress();
502
503 char *mbuf = mess.Buffer();
504 Int_t mlen = mess.Length();
505 if (mess.CompBuffer()) {
506 mbuf = mess.CompBuffer();
507 mlen = mess.CompLength();
508 }
509
511 Int_t nsent;
512 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
513 if (nsent == -5) {
514 // Connection reset by peer or broken
516 Close();
517 }
518 return nsent;
519 }
520
521 fBytesSent += nsent;
523
524 // If acknowledgement is desired, wait for it
525 if (mess.What() & kMESS_ACK) {
528 char buf[2];
529 Int_t n = 0;
530 if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
531 if (n == -5) {
532 // Connection reset by peer or broken
534 Close();
535 } else
536 n = -1;
537 return n;
538 }
539 if (strncmp(buf, "ok", 2)) {
540 Error("Send", "bad acknowledgement");
541 return -1;
542 }
543 fBytesRecv += 2;
544 fgBytesRecv += 2;
545 }
546
547 Touch(); // update usage timestamp
548
549 return nsent - sizeof(UInt_t); //length - length header
550}
551
552////////////////////////////////////////////////////////////////////////////////
553/// Send an object. Returns the number of bytes sent and -1 in case of error.
554/// In case the "kind" has been or'ed with kMESS_ACK, the call will only
555/// return after having received an acknowledgement, making the sending
556/// synchronous.
557
559{
560 //stream object to message buffer
561 TMessage mess(kind);
562 mess.WriteObject(obj);
563
564 //now sending the object itself
565 Int_t nsent;
566 if ((nsent = Send(mess)) < 0)
567 return -1;
568
569 return nsent;
570}
571
572////////////////////////////////////////////////////////////////////////////////
573/// Send a raw buffer of specified length. Using option kOob one can send
574/// OOB data. Returns the number of bytes sent or -1 in case of error.
575/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
576/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
577
579{
581
582 if (fSocket == -1) return -1;
583
585 Int_t nsent;
586 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
587 if (nsent == -5) {
588 // Connection reset or broken: close
590 Close();
591 }
592 return nsent;
593 }
594
595 fBytesSent += nsent;
597
598 Touch(); // update usage timestamp
599
600 return nsent;
601}
602
603////////////////////////////////////////////////////////////////////////////////
604/// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
605/// in the object in the message is in the fInfos list of the message.
606/// We send only the TStreamerInfos not yet sent on this socket.
607
609{
610 if (mess.fInfos && mess.fInfos->GetEntries()) {
611 TIter next(mess.fInfos);
613 TList *minilist = 0;
614 while ((info = (TStreamerInfo*)next())) {
615 Int_t uid = info->GetNumber();
616 if (fBitsInfo.TestBitNumber(uid))
617 continue; //TStreamerInfo had already been sent
619 if (!minilist)
620 minilist = new TList();
621 if (gDebug > 0)
622 Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
623 info->GetName(),info->GetClassVersion());
624 minilist->Add(info);
625 }
626 if (minilist) {
628 messinfo.WriteObject(minilist);
629 delete minilist;
630 if (messinfo.fInfos)
631 messinfo.fInfos->Clear();
632 if (Send(messinfo) < 0)
633 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
634 }
635 }
636}
637
638////////////////////////////////////////////////////////////////////////////////
639/// Check if TProcessIDs must be sent. The list of TProcessIDs
640/// in the object in the message is found by looking in the TMessage bits.
641/// We send only the TProcessIDs not yet send on this socket.
642
644{
645 if (mess.TestBitNumber(0)) {
647 Int_t npids = pids->GetEntries();
648 TProcessID *pid;
649 TList *minilist = 0;
650 for (Int_t ipid = 0; ipid < npids; ipid++) {
651 pid = (TProcessID*)pids->At(ipid);
652 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
653 continue;
654 //check if a pid with this title has already been sent through the socket
655 //if not add it to the fUUIDs list
656 if (!fUUIDs) {
657 fUUIDs = new TList();
658 } else {
659 if (fUUIDs->FindObject(pid->GetTitle()))
660 continue;
661 }
662 fUUIDs->Add(new TObjString(pid->GetTitle()));
663 if (!minilist)
664 minilist = new TList();
665 if (gDebug > 0)
666 Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
667 minilist->Add(pid);
668 }
669 if (minilist) {
671 messpid.WriteObject(minilist);
672 delete minilist;
673 if (Send(messpid) < 0)
674 Warning("SendProcessIDs", "problems sending TProcessID's ...");
675 }
676 }
677}
678
679////////////////////////////////////////////////////////////////////////////////
680/// Receive a character string message of maximum max length. The expected
681/// message must be of type kMESS_STRING. Returns length of received string
682/// (can be 0 if otherside of connection is closed) or -1 in case of error
683/// or -4 in case a non-blocking socket would block (i.e. there is nothing
684/// to be read).
685
687{
688 Int_t n, kind;
689
691 if ((n = Recv(str, max, kind)) <= 0) {
692 if (n == -5) {
694 n = -1;
695 }
696 return n;
697 }
698
699 if (kind != kMESS_STRING) {
700 Error("Recv", "got message of wrong kind (expected %d, got %d)",
701 kMESS_STRING, kind);
702 return -1;
703 }
704
705 return n;
706}
707
708////////////////////////////////////////////////////////////////////////////////
709/// Receive a character string message of maximum max length. Returns in
710/// kind the message type. Returns length of received string+4 (can be 0 if
711/// other side of connection is closed) or -1 in case of error or -4 in
712/// case a non-blocking socket would block (i.e. there is nothing to be read).
713
714Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
715{
716 Int_t n;
717 TMessage *mess;
718
720 if ((n = Recv(mess)) <= 0) {
721 if (n == -5) {
723 n = -1;
724 }
725 return n;
726 }
727
728 kind = mess->What();
729 if (str) {
730 if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
731 mess->ReadString(str, max);
732 else
733 str[0] = 0;
734 }
735
736 delete mess;
737
738 return n; // number of bytes read (len of str + sizeof(kind)
739}
740
741////////////////////////////////////////////////////////////////////////////////
742/// Receives a status and a message type. Returns length of received
743/// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
744/// is closed) or -1 in case of error or -4 in case a non-blocking
745/// socket would block (i.e. there is nothing to be read).
746
748{
749 Int_t n;
750 TMessage *mess;
751
753 if ((n = Recv(mess)) <= 0) {
754 if (n == -5) {
756 n = -1;
757 }
758 return n;
759 }
760
761 kind = mess->What();
762 (*mess) >> status;
763
764 delete mess;
765
766 return n; // number of bytes read (2 * sizeof(Int_t)
767}
768
769////////////////////////////////////////////////////////////////////////////////
770/// Receive a TMessage object. The user must delete the TMessage object.
771/// Returns length of message in bytes (can be 0 if other side of connection
772/// is closed) or -1 in case of error or -4 in case a non-blocking socket
773/// would block (i.e. there is nothing to be read) or -5 if pipe broken
774/// or reset by peer (EPIPE || ECONNRESET). In those case mess == 0.
775
777{
779
780 if (fSocket == -1) {
781 mess = 0;
782 return -1;
783 }
784
787 Int_t n;
788 UInt_t len;
789 if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
790 if (n == 0 || n == -5) {
791 // Connection closed, reset or broken
793 Close();
794 }
795 mess = 0;
796 return n;
797 }
798 len = net2host(len); //from network to host byte order
799
800 if (len > (std::numeric_limits<decltype(len)>::max() - sizeof(decltype(len)))) {
801 Error("Recv", "Buffer length is %u and %u+sizeof(UInt_t) cannot be represented as an UInt_t.", len, len);
802 return -1;
803 }
804
806 char *buf = new char[len+sizeof(UInt_t)];
807 if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
808 if (n == 0 || n == -5) {
809 // Connection closed, reset or broken
811 Close();
812 }
813 delete [] buf;
814 mess = 0;
815 return n;
816 }
817
818 fBytesRecv += n + sizeof(UInt_t);
819 fgBytesRecv += n + sizeof(UInt_t);
820
821 mess = new TMessage(buf, len+sizeof(UInt_t));
822
823 // receive any streamer infos
825 goto oncemore;
826
827 // receive any process ids
828 if (RecvProcessIDs(mess))
829 goto oncemore;
830
831 if (mess->What() & kMESS_ACK) {
833 char ok[2] = { 'o', 'k' };
834 Int_t n2 = 0;
835 if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
836 if (n2 == -5) {
837 // Connection reset or broken
839 Close();
840 }
841 delete mess;
842 mess = 0;
843 return n2;
844 }
845 mess->SetWhat(mess->What() & ~kMESS_ACK);
846
847 fBytesSent += 2;
848 fgBytesSent += 2;
849 }
850
851 Touch(); // update usage timestamp
852
853 return n;
854}
855
856////////////////////////////////////////////////////////////////////////////////
857/// Receive a raw buffer of specified length bytes. Using option kPeek
858/// one can peek at incoming data. Returns number of received bytes.
859/// Returns -1 in case of error. In case of opt == kOob: -2 means
860/// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
861/// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
862/// peer (EPIPE || ECONNRESET).
863
865{
867
868 if (fSocket == -1) return -1;
869 if (length == 0) return 0;
870
872 Int_t n;
873 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
874 if (n == 0 || n == -5) {
875 // Connection closed, reset or broken
877 Close();
878 }
879 return n;
880 }
881
882 fBytesRecv += n;
883 fgBytesRecv += n;
884
885 Touch(); // update usage timestamp
886
887 return n;
888}
889
890////////////////////////////////////////////////////////////////////////////////
891/// Receive a message containing streamer infos. In case the message contains
892/// streamer infos they are imported, the message will be deleted and the
893/// method returns kTRUE.
894
896{
897 if (mess->What() == kMESS_STREAMERINFO) {
898 TList *list = (TList*)mess->ReadObject(TList::Class());
899 TIter next(list);
901 TObjLink *lnk = list->FirstLink();
902 // First call BuildCheck for regular class
903 while (lnk) {
904 info = (TStreamerInfo*)lnk->GetObject();
905 TObject *element = info->GetElements()->UncheckedAt(0);
906 Bool_t isstl = element && strcmp("This",element->GetName())==0;
907 if (!isstl) {
908 info->BuildCheck();
909 if (gDebug > 0)
910 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
911 info->GetName(), info->GetClassVersion());
912 }
913 lnk = lnk->Next();
914 }
915 // Then call BuildCheck for stl class
916 lnk = list->FirstLink();
917 while (lnk) {
918 info = (TStreamerInfo*)lnk->GetObject();
919 TObject *element = info->GetElements()->UncheckedAt(0);
920 Bool_t isstl = element && strcmp("This",element->GetName())==0;
921 if (isstl) {
922 info->BuildCheck();
923 if (gDebug > 0)
924 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
925 info->GetName(), info->GetClassVersion());
926 }
927 lnk = lnk->Next();
928 }
929 delete list;
930 delete mess;
931
932 return kTRUE;
933 }
934 return kFALSE;
935}
936
937////////////////////////////////////////////////////////////////////////////////
938/// Receive a message containing process ids. In case the message contains
939/// process ids they are imported, the message will be deleted and the
940/// method returns kTRUE.
941
943{
944 if (mess->What() == kMESS_PROCESSID) {
945 TList *list = (TList*)mess->ReadObject(TList::Class());
946 TIter next(list);
947 TProcessID *pid;
948 while ((pid = (TProcessID*)next())) {
949 // check that a similar pid is not already registered in fgPIDs
952 TProcessID *p;
953 while ((p = (TProcessID*)nextpid())) {
954 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
955 delete pid;
956 pid = 0;
957 break;
958 }
959 }
960 if (pid) {
961 if (gDebug > 0)
962 Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
963 pid->IncrementCount();
964 pidslist->Add(pid);
965 Int_t ind = pidslist->IndexOf(pid);
966 pid->SetUniqueID((UInt_t)ind);
967 }
968 }
969 delete list;
970 delete mess;
971
972 return kTRUE;
973 }
974 return kFALSE;
975}
976
977////////////////////////////////////////////////////////////////////////////////
978/// Set socket options.
979
981{
982 if (fSocket == -1) return -1;
983
984 return gSystem->SetSockOpt(fSocket, opt, val);
985}
986
987////////////////////////////////////////////////////////////////////////////////
988/// Get socket options. Returns -1 in case of error.
989
991{
992 if (fSocket == -1) return -1;
993
994 return gSystem->GetSockOpt(fSocket, opt, &val);
995}
996
997////////////////////////////////////////////////////////////////////////////////
998/// Returns error code. Meaning depends on context where it is called.
999/// If no error condition returns 0 else a value < 0.
1000/// For example see TServerSocket ctor.
1001
1003{
1004 if (!IsValid())
1005 return fSocket;
1006
1007 return 0;
1008}
1009
1010////////////////////////////////////////////////////////////////////////////////
1011/// See comments for function SetCompressionSettings
1012
1014{
1016 if (fCompress < 0) {
1017 // if the level is not defined yet use 4 as a default (with ZLIB was 1)
1019 } else {
1020 int level = fCompress % 100;
1021 fCompress = 100 * algorithm + level;
1022 }
1023}
1024
1025////////////////////////////////////////////////////////////////////////////////
1026/// See comments for function SetCompressionSettings
1027
1029{
1030 if (level < 0) level = 0;
1031 if (level > 99) level = 99;
1032 if (fCompress < 0) {
1033 // if the algorithm is not defined yet use 0 as a default
1034 fCompress = level;
1035 } else {
1036 int algorithm = fCompress / 100;
1038 fCompress = 100 * algorithm + level;
1039 }
1040}
1041
1042////////////////////////////////////////////////////////////////////////////////
1043/// Used to specify the compression level and algorithm:
1044/// settings = 100 * algorithm + level
1045///
1046/// level = 0, objects written to this file will not be compressed.
1047/// level = 1, minimal compression level but fast.
1048/// ....
1049/// level = 9, maximal compression level but slower and might use more memory.
1050/// (For the currently supported algorithms, the maximum level is 9)
1051/// If compress is negative it indicates the compression level is not set yet.
1052///
1053/// The enumeration ROOT::RCompressionSetting::EAlgorithm associates each
1054/// algorithm with a number. There is a utility function to help
1055/// to set the value of the argument. For example,
1056/// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1057/// will build an integer which will set the compression to use
1058/// the LZMA algorithm and compression level 1. These are defined
1059/// in the header file Compression.h.
1060///
1061/// Note that the compression settings may be changed at any time.
1062/// The new compression settings will only apply to branches created
1063/// or attached after the setting is changed and other objects written
1064/// after the setting is changed.
1065
1070
1071////////////////////////////////////////////////////////////////////////////////
1072/// Print error string depending on error code.
1073
1074void TUDPSocket::NetError(const char *where, Int_t err)
1075{
1076 // Make sure it is in range
1077 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1078
1079 if (gDebug > 0)
1080 ::Error(where, "%s", gRootdErrStr[err]);
1081}
1082
1083////////////////////////////////////////////////////////////////////////////////
1084/// Get total number of bytes sent via all sockets.
1085
1090
1091////////////////////////////////////////////////////////////////////////////////
1092/// Get total number of bytes received via all sockets.
1093
UShort_t net2host(UShort_t x)
Definition Bytes.h:575
@ kMESS_STRING
@ kMESS_ACK
@ kMESS_PROCESSID
@ kMESS_STREAMERINFO
R__EXTERN const char * gRootdErrStr[]
Definition NetErrors.h:72
@ kErrError
Definition NetErrors.h:69
#define SafeDelete(p)
Definition RConfig.hxx:531
int Int_t
Signed integer 4 bytes (int)
Definition RtypesCore.h:59
long Long_t
Signed long integer 4 bytes (long). Size depends on architecture.
Definition RtypesCore.h:68
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int)
Definition RtypesCore.h:60
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
const char Option_t
Option string (const char)
Definition RtypesCore.h:80
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Int_t gDebug
Global variable setting the debug level. Set to 0 to disable, increase it in steps of 1 to increase t...
Definition TROOT.cxx:783
R__EXTERN TVirtualMutex * gROOTMutex
Definition TROOT.h:63
#define gROOT
Definition TROOT.h:426
ESockOptions
Definition TSystem.h:229
ESendRecvOptions
Definition TSystem.h:242
R__EXTERN TSystem * gSystem
Definition TSystem.h:582
#define R__LOCKGUARD(mutex)
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
This class represents an Internet Protocol (IP) address.
Int_t GetPort() const
const char * GetHostName() const
A doubly linked list.
Definition TList.h:38
static TClass * Class()
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:708
void Add(TObject *obj) override
Definition TList.h:81
Int_t Compress()
Compress the message.
Definition TMessage.cxx:318
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:173
const char * GetTitle() const override
Returns title of object.
Definition TNamed.h:50
TString fName
Definition TNamed.h:32
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition TNamed.cxx:149
An array of TObjects.
Definition TObjArray.h:31
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:42
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:477
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:1081
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:885
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1095
virtual void SetUniqueID(UInt_t uid)
Set the unique object id.
Definition TObject.cxx:896
void ResetBit(UInt_t f)
Definition TObject.h:203
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:1069
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
Int_t IncrementCount()
Increase the reference count to this object.
static TObjArray * GetPIDs()
static: returns array of TProcessIDs
Describes a persistent version of a class.
Basic string class.
Definition TString.h:138
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2362
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:641
virtual int GetServiceByName(const char *service)
Get port # of internet service.
Definition TSystem.cxx:2329
virtual TInetAddress GetSockName(int sock)
Get Internet Protocol (IP) address of host and port #.
Definition TSystem.cxx:2320
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:282
virtual char * GetServiceByPort(int port)
Get name of internet service.
Definition TSystem.cxx:2338
virtual int SetSockOpt(int sock, int kind, int val)
Set socket option.
Definition TSystem.cxx:2447
virtual TInetAddress GetPeerName(int sock)
Get Internet Protocol (IP) address of remote host and port #.
Definition TSystem.cxx:2311
virtual int OpenConnection(const char *server, int port, int tcpwindowsize=-1, const char *protocol="tcp")
Open a connection to another host.
Definition TSystem.cxx:2347
virtual int GetSockOpt(int sock, int kind, int *val)
Get socket option.
Definition TSystem.cxx:2456
virtual int RecvRaw(int sock, void *buffer, int length, int flag)
Receive exactly length bytes into buffer.
Definition TSystem.cxx:2410
virtual Int_t Select(TList *active, Long_t timeout)
Select on active file descriptors (called by TMonitor).
Definition TSystem.cxx:443
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition TSystem.cxx:2302
virtual int SendRaw(int sock, const void *buffer, int length, int flag)
Send exactly length bytes from buffer.
Definition TSystem.cxx:2420
virtual void CloseConnection(int sock, Bool_t force=kFALSE)
Close socket connection.
Definition TSystem.cxx:2401
This class implements UDP client sockets.
Definition TUDPSocket.h:36
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
UInt_t fBytesRecv
Definition TUDPSocket.h:49
Int_t GetErrorCode() const
Returns error code.
UInt_t fBytesSent
Definition TUDPSocket.h:50
Int_t fCompress
Definition TUDPSocket.h:51
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual TInetAddress GetLocalInetAddress()
Return internet address of local host to which the socket is bound.
EServiceType fServType
Definition TUDPSocket.h:57
TInetAddress fLocalAddress
Definition TUDPSocket.h:52
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
See comments for function SetCompressionSettings.
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
TVirtualMutex * fLastUsageMtx
Definition TUDPSocket.h:63
virtual Int_t GetLocalPort()
Return the local port # to which the socket is bound.
TInetAddress fAddress
Definition TUDPSocket.h:48
TString fService
Definition TUDPSocket.h:56
static ULong64_t fgBytesRecv
Definition TUDPSocket.h:66
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
virtual Bool_t IsValid() const
Definition TUDPSocket.h:119
Int_t fRemoteProtocol
Definition TUDPSocket.h:53
static void NetError(const char *where, Int_t error)
Print error string depending on error code.
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
See comments for function SetCompressionSettings.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void Close(Option_t *opt="")
Close the socket.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
static ULong64_t fgBytesSent
Definition TUDPSocket.h:67
Int_t GetCompressionLevel() const
Definition TUDPSocket.h:162
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
TList * fUUIDs
Definition TUDPSocket.h:61
void Touch()
Definition TUDPSocket.h:145
TBits fBitsInfo
Definition TUDPSocket.h:60
static ULong64_t GetSocketBytesRecv()
Get total number of bytes received via all sockets.
ROOT::Deprecated::TSecContext * fSecContext
Definition TUDPSocket.h:54
TString fUrl
Definition TUDPSocket.h:59
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Option_t * GetOption() const override
Definition TUDPSocket.h:82
static ULong64_t GetSocketBytesSent()
Get total number of bytes sent via all sockets.
Int_t fSocket
Definition TUDPSocket.h:58
virtual Int_t Select(Int_t interest=kRead, Long_t timeout=-1)
Waits for this socket to change status.
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
This class represents a WWW compatible URL.
Definition TUrl.h:33
const Int_t n
Definition legend1.C:16
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition Compression.h:72