Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TUDPSocket.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Marcelo Sousa 26/10/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/**
13\file TUDPSocket.cxx
14\class TUDPSocket
15\brief This class implements UDP client sockets.
16\note This class deals with sockets: the user is entirely responsible for the security of their usage, for example, but
17not limited to, the management of the connections to said sockets.
18
19A socket is an endpoint for communication between two machines. The actual work is done via the TSystem class (either
20TUnixSystem or TWinNTSystem).
21**/
22
23#include "Bytes.h"
24#include "Compression.h"
25#include "NetErrors.h"
26#include "TError.h"
27#include "TMessage.h"
28#include "TUDPSocket.h"
29#include "TObjString.h"
30#include "TPluginManager.h"
31#include "TROOT.h"
32#include "TString.h"
33#include "TSystem.h"
34#include "TUrl.h"
35#include "TVirtualAuth.h"
36#include "TStreamerInfo.h"
37#include "TProcessID.h"
38
39#include <limits>
40
43
44
45
46////////////////////////////////////////////////////////////////////////////////
47/// Create a socket. Connect to the named service at address addr.
48/// Use tcpwindowsize to specify the size of the receive buffer, it has
49/// to be specified here to make sure the window scale option is set (for
50/// tcpwindowsize > 65KB and for platforms supporting window scaling).
51/// Returns when connection has been accepted by remote side. Use IsValid()
52/// to check the validity of the socket. Every socket is added to the TROOT
53/// sockets list which will make sure that any open sockets are properly
54/// closed on program termination.
55
57 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
58{
61
63 fSecContext = 0;
66 if (fService.Contains("root"))
68 fAddress = addr;
70 fBytesSent = 0;
71 fBytesRecv = 0;
72 fUUIDs = 0;
73 fLastUsageMtx = 0;
75
76 if (fAddress.GetPort() != -1) {
78 -1, "upd");
79
80 if (fSocket != -1) {
82 gROOT->GetListOfSockets()->Add(this);
83 }
84 } else
85 fSocket = -1;
86
87}
88
89
90////////////////////////////////////////////////////////////////////////////////
91/// Create a socket. Connect to the specified port # at address addr.
92/// Use tcpwindowsize to specify the size of the receive buffer, it has
93/// to be specified here to make sure the window scale option is set (for
94/// tcpwindowsize > 65KB and for platforms supporting window scaling).
95/// Returns when connection has been accepted by remote side. Use IsValid()
96/// to check the validity of the socket. Every socket is added to the TROOT
97/// sockets list which will make sure that any open sockets are properly
98/// closed on program termination.
99
101 : TNamed(addr.GetHostName(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
102{
105
107 fSecContext = 0;
108 fRemoteProtocol= -1;
110 if (fService.Contains("root"))
112 fAddress = addr;
113 fAddress.fPort = port;
115 fBytesSent = 0;
116 fBytesRecv = 0;
117 fUUIDs = 0;
118 fLastUsageMtx = 0;
120
122 -1, "upd");
123 if (fSocket == -1)
124 fAddress.fPort = -1;
125 else {
127 gROOT->GetListOfSockets()->Add(this);
128 }
129}
130
131////////////////////////////////////////////////////////////////////////////////
132/// Create a socket. Connect to named service on the remote host.
133/// Use tcpwindowsize to specify the size of the receive buffer, it has
134/// to be specified here to make sure the window scale option is set (for
135/// tcpwindowsize > 65KB and for platforms supporting window scaling).
136/// Returns when connection has been accepted by remote side. Use IsValid()
137/// to check the validity of the socket. Every socket is added to the TROOT
138/// sockets list which will make sure that any open sockets are properly
139/// closed on program termination.
140
141TUDPSocket::TUDPSocket(const char *host, const char *service)
142 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
143{
146
148 fSecContext = 0;
149 fRemoteProtocol= -1;
151 if (fService.Contains("root"))
156 fBytesSent = 0;
157 fBytesRecv = 0;
158 fUUIDs = 0;
159 fLastUsageMtx = 0;
161
162 if (fAddress.GetPort() != -1) {
163 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "upd");
164 if (fSocket != -1) {
166 gROOT->GetListOfSockets()->Add(this);
167 }
168 } else
169 fSocket = -1;
170}
171
172////////////////////////////////////////////////////////////////////////////////
173/// Create a socket; see CreateAuthSocket for the form of url.
174/// Connect to the specified port # on the remote host.
175/// If user is specified in url, try authentication as user.
176/// Use tcpwindowsize to specify the size of the receive buffer, it has
177/// to be specified here to make sure the window scale option is set (for
178/// tcpwindowsize > 65KB and for platforms supporting window scaling).
179/// Returns when connection has been accepted by remote side. Use IsValid()
180/// to check the validity of the socket. Every socket is added to the TROOT
181/// sockets list which will make sure that any open sockets are properly
182/// closed on program termination.
183
185 : TNamed(TUrl(url).GetHost(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
186{
189
190 fUrl = TString(url);
191 TString host(TUrl(fUrl).GetHost());
192
194 fSecContext = 0;
195 fRemoteProtocol= -1;
197 if (fUrl.Contains("root"))
200 fAddress.fPort = port;
203 fBytesSent = 0;
204 fBytesRecv = 0;
205 fUUIDs = 0;
206 fLastUsageMtx = 0;
208
209 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
210 if (fSocket == -1) {
211 fAddress.fPort = -1;
212 } else {
214 gROOT->GetListOfSockets()->Add(this);
215 }
216}
217
218////////////////////////////////////////////////////////////////////////////////
219/// Create a socket in the Unix domain on 'sockpath'.
220/// Returns when connection has been accepted by the server. Use IsValid()
221/// to check the validity of the socket. Every socket is added to the TROOT
222/// sockets list which will make sure that any open sockets are properly
223/// closed on program termination.
224
226 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
227{
230
231 fUrl = sockpath;
232
233 fService = "unix";
234 fSecContext = 0;
235 fRemoteProtocol= -1;
237 fAddress.fPort = -1;
238 fName.Form("unix:%s", sockpath);
240 fBytesSent = 0;
241 fBytesRecv = 0;
242 fUUIDs = 0;
243 fLastUsageMtx = 0;
245
246 fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");
247 if (fSocket > 0) {
249 gROOT->GetListOfSockets()->Add(this);
250 }
251}
252
253////////////////////////////////////////////////////////////////////////////////
254/// Create a socket. The socket will adopt previously opened TCP socket with
255/// descriptor desc.
256
257TUDPSocket::TUDPSocket(Int_t desc) : TNamed("", ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
258{
261
262 fSecContext = 0;
263 fRemoteProtocol = 0;
264 fService = (char *)kSOCKD;
266 fBytesSent = 0;
267 fBytesRecv = 0;
268 fUUIDs = 0;
269 fLastUsageMtx = 0;
271
272 if (desc >= 0) {
273 fSocket = desc;
276 gROOT->GetListOfSockets()->Add(this);
277 } else
278 fSocket = -1;
279}
280
281////////////////////////////////////////////////////////////////////////////////
282/// Create a socket. The socket will adopt previously opened Unix socket with
283/// descriptor desc. The sockpath arg is for info purposes only. Use
284/// this method to adopt e.g. a socket created via socketpair().
285
287 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
288{
291
292 fUrl = sockpath;
293
294 fService = "unix";
295 fSecContext = 0;
296 fRemoteProtocol= -1;
298 fAddress.fPort = -1;
299 fName.Form("unix:%s", sockpath);
301 fBytesSent = 0;
302 fBytesRecv = 0;
303 fUUIDs = 0;
304 fLastUsageMtx = 0;
306
307 if (desc >= 0) {
308 fSocket = desc;
310 gROOT->GetListOfSockets()->Add(this);
311 } else
312 fSocket = -1;
313}
314
315
316////////////////////////////////////////////////////////////////////////////////
317/// TUDPSocket copy ctor.
318
320{
321 fSocket = s.fSocket;
322 fService = s.fService;
323 fAddress = s.fAddress;
331 fUUIDs = 0;
332 fLastUsageMtx = 0;
334
335 if (fSocket != -1) {
337 gROOT->GetListOfSockets()->Add(this);
338 }
339}
340
341////////////////////////////////////////////////////////////////////////////////
342/// Close the socket. If option is "force", calls shutdown(id,2) to
343/// shut down the connection. This will close the connection also
344/// for the parent of this process. Also called via the dtor (without
345/// option "force", call explicitly Close("force") if this is desired).
346
348{
349 Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
350
351 if (fSocket != -1) {
354 gROOT->GetListOfSockets()->Remove(this);
355 }
356 fSocket = -1;
357
360}
361
362////////////////////////////////////////////////////////////////////////////////
363/// Return internet address of local host to which the socket is bound.
364/// In case of error TInetAddress::IsValid() returns kFALSE.
365
367{
368 if (IsValid()) {
369 if (fLocalAddress.GetPort() == -1)
371 return fLocalAddress;
372 }
373 return TInetAddress();
374}
375
376////////////////////////////////////////////////////////////////////////////////
377/// Return the local port # to which the socket is bound.
378/// In case of error return -1.
379
381{
382 if (IsValid()) {
383 if (fLocalAddress.GetPort() == -1)
385 return fLocalAddress.GetPort();
386 }
387 return -1;
388}
389
390////////////////////////////////////////////////////////////////////////////////
391/// Waits for this socket to change status. If interest=kRead,
392/// the socket will be watched to see if characters become available for
393/// reading; if interest=kWrite the socket will be watched to
394/// see if a write will not block.
395/// The argument 'timeout' specifies a maximum time to wait in millisec.
396/// Default no timeout.
397/// Returns 1 if a change of status of interest has been detected within
398/// timeout; 0 in case of timeout; < 0 if an error occurred.
399
401{
402 Int_t rc = 1;
403
404 // Associate a TFileHandler to this socket
406
407 // Wait for an event now
408 rc = gSystem->Select(&fh, timeout);
409
410 return rc;
411}
412
413////////////////////////////////////////////////////////////////////////////////
414/// Send a single message opcode. Use kind (opcode) to set the
415/// TMessage "what" field. Returns the number of bytes that were sent
416/// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
417/// been or'ed with kMESS_ACK, the call will only return after having
418/// received an acknowledgement, making the sending process synchronous.
419
421{
422 TMessage mess(kind);
423
424 Int_t nsent;
425 if ((nsent = Send(mess)) < 0)
426 return -1;
427
428 return nsent;
429}
430
431////////////////////////////////////////////////////////////////////////////////
432/// Send a status and a single message opcode. Use kind (opcode) to set the
433/// TMessage "what" field. Returns the number of bytes that were sent
434/// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
435/// been or'ed with kMESS_ACK, the call will only return after having
436/// received an acknowledgement, making the sending process synchronous.
437
439{
440 TMessage mess(kind);
441 mess << status;
442
443 Int_t nsent;
444 if ((nsent = Send(mess)) < 0)
445 return -1;
446
447 return nsent;
448}
449
450////////////////////////////////////////////////////////////////////////////////
451/// Send a character string buffer. Use kind to set the TMessage "what" field.
452/// Returns the number of bytes in the string str that were sent and -1 in
453/// case of error. In case the kind has been or'ed with kMESS_ACK, the call
454/// will only return after having received an acknowledgement, making the
455/// sending process synchronous.
456
457Int_t TUDPSocket::Send(const char *str, Int_t kind)
458{
459 TMessage mess(kind);
460 if (str) mess.WriteString(str);
461
462 Int_t nsent;
463 if ((nsent = Send(mess)) < 0)
464 return -1;
465
466 return nsent - sizeof(Int_t); // - TMessage::What()
467}
468
469////////////////////////////////////////////////////////////////////////////////
470/// Send a TMessage object. Returns the number of bytes in the TMessage
471/// that were sent and -1 in case of error. In case the TMessage::What
472/// has been or'ed with kMESS_ACK, the call will only return after having
473/// received an acknowledgement, making the sending process synchronous.
474/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
475/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
476/// support for streaming TStreamerInfo added by Rene Brun May 2008
477/// support for streaming TProcessID added by Rene Brun June 2008
478
480{
482
483 if (fSocket == -1) return -1;
484
485 if (mess.IsReading()) {
486 Error("Send", "cannot send a message used for reading");
487 return -1;
488 }
489
490 // send streamer infos in case schema evolution is enabled in the TMessage
492
493 // send the process id's so TRefs work
495
496 mess.SetLength(); //write length in first word of buffer
497
498 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
500
501 if (mess.GetCompressionLevel() > 0)
502 const_cast<TMessage&>(mess).Compress();
503
504 char *mbuf = mess.Buffer();
505 Int_t mlen = mess.Length();
506 if (mess.CompBuffer()) {
507 mbuf = mess.CompBuffer();
508 mlen = mess.CompLength();
509 }
510
512 Int_t nsent;
513 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
514 if (nsent == -5) {
515 // Connection reset by peer or broken
517 Close();
518 }
519 return nsent;
520 }
521
522 fBytesSent += nsent;
524
525 // If acknowledgement is desired, wait for it
526 if (mess.What() & kMESS_ACK) {
529 char buf[2];
530 Int_t n = 0;
531 if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
532 if (n == -5) {
533 // Connection reset by peer or broken
535 Close();
536 } else
537 n = -1;
538 return n;
539 }
540 if (strncmp(buf, "ok", 2)) {
541 Error("Send", "bad acknowledgement");
542 return -1;
543 }
544 fBytesRecv += 2;
545 fgBytesRecv += 2;
546 }
547
548 Touch(); // update usage timestamp
549
550 return nsent - sizeof(UInt_t); //length - length header
551}
552
553////////////////////////////////////////////////////////////////////////////////
554/// Send an object. Returns the number of bytes sent and -1 in case of error.
555/// In case the "kind" has been or'ed with kMESS_ACK, the call will only
556/// return after having received an acknowledgement, making the sending
557/// synchronous.
558
560{
561 //stream object to message buffer
562 TMessage mess(kind);
563 mess.WriteObject(obj);
564
565 //now sending the object itself
566 Int_t nsent;
567 if ((nsent = Send(mess)) < 0)
568 return -1;
569
570 return nsent;
571}
572
573////////////////////////////////////////////////////////////////////////////////
574/// Send a raw buffer of specified length. Using option kOob one can send
575/// OOB data. Returns the number of bytes sent or -1 in case of error.
576/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
577/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
578
580{
582
583 if (fSocket == -1) return -1;
584
586 Int_t nsent;
587 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
588 if (nsent == -5) {
589 // Connection reset or broken: close
591 Close();
592 }
593 return nsent;
594 }
595
596 fBytesSent += nsent;
598
599 Touch(); // update usage timestamp
600
601 return nsent;
602}
603
604////////////////////////////////////////////////////////////////////////////////
605/// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
606/// in the object in the message is in the fInfos list of the message.
607/// We send only the TStreamerInfos not yet sent on this socket.
608
610{
611 if (mess.fInfos && mess.fInfos->GetEntries()) {
612 TIter next(mess.fInfos);
614 TList *minilist = 0;
615 while ((info = (TStreamerInfo*)next())) {
616 Int_t uid = info->GetNumber();
617 if (fBitsInfo.TestBitNumber(uid))
618 continue; //TStreamerInfo had already been sent
620 if (!minilist)
621 minilist = new TList();
622 if (gDebug > 0)
623 Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
624 info->GetName(),info->GetClassVersion());
625 minilist->Add(info);
626 }
627 if (minilist) {
629 messinfo.WriteObject(minilist);
630 delete minilist;
631 if (messinfo.fInfos)
632 messinfo.fInfos->Clear();
633 if (Send(messinfo) < 0)
634 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
635 }
636 }
637}
638
639////////////////////////////////////////////////////////////////////////////////
640/// Check if TProcessIDs must be sent. The list of TProcessIDs
641/// in the object in the message is found by looking in the TMessage bits.
642/// We send only the TProcessIDs not yet sent on this socket.
643
645{
646 if (mess.TestBitNumber(0)) {
648 Int_t npids = pids->GetEntries();
649 TProcessID *pid;
650 TList *minilist = 0;
651 for (Int_t ipid = 0; ipid < npids; ipid++) {
652 pid = (TProcessID*)pids->At(ipid);
653 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
654 continue;
655 //check if a pid with this title has already been sent through the socket
656 //if not add it to the fUUIDs list
657 if (!fUUIDs) {
658 fUUIDs = new TList();
659 } else {
660 if (fUUIDs->FindObject(pid->GetTitle()))
661 continue;
662 }
663 fUUIDs->Add(new TObjString(pid->GetTitle()));
664 if (!minilist)
665 minilist = new TList();
666 if (gDebug > 0)
667 Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
668 minilist->Add(pid);
669 }
670 if (minilist) {
672 messpid.WriteObject(minilist);
673 delete minilist;
674 if (Send(messpid) < 0)
675 Warning("SendProcessIDs", "problems sending TProcessID's ...");
676 }
677 }
678}
679
680////////////////////////////////////////////////////////////////////////////////
681/// Receive a character string message of maximum max length. The expected
682/// message must be of type kMESS_STRING. Returns length of received string
683/// (can be 0 if other side of connection is closed) or -1 in case of error
684/// or -4 in case a non-blocking socket would block (i.e. there is nothing
685/// to be read).
686
688{
689 Int_t n, kind;
690
692 if ((n = Recv(str, max, kind)) <= 0) {
693 if (n == -5) {
695 n = -1;
696 }
697 return n;
698 }
699
700 if (kind != kMESS_STRING) {
701 Error("Recv", "got message of wrong kind (expected %d, got %d)",
702 kMESS_STRING, kind);
703 return -1;
704 }
705
706 return n;
707}
708
709////////////////////////////////////////////////////////////////////////////////
710/// Receive a character string message of maximum max length. Returns in
711/// kind the message type. Returns length of received string+4 (can be 0 if
712/// other side of connection is closed) or -1 in case of error or -4 in
713/// case a non-blocking socket would block (i.e. there is nothing to be read).
714
715Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
716{
717 Int_t n;
718 TMessage *mess;
719
721 if ((n = Recv(mess)) <= 0) {
722 if (n == -5) {
724 n = -1;
725 }
726 return n;
727 }
728
729 kind = mess->What();
730 if (str) {
731 if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
732 mess->ReadString(str, max);
733 else
734 str[0] = 0;
735 }
736
737 delete mess;
738
739 return n; // number of bytes read (len of str + sizeof(kind)
740}
741
742////////////////////////////////////////////////////////////////////////////////
743/// Receives a status and a message type. Returns length of received
744/// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
745/// is closed) or -1 in case of error or -4 in case a non-blocking
746/// socket would block (i.e. there is nothing to be read).
747
749{
750 Int_t n;
751 TMessage *mess;
752
754 if ((n = Recv(mess)) <= 0) {
755 if (n == -5) {
757 n = -1;
758 }
759 return n;
760 }
761
762 kind = mess->What();
763 (*mess) >> status;
764
765 delete mess;
766
767 return n; // number of bytes read (2 * sizeof(Int_t)
768}
769
770////////////////////////////////////////////////////////////////////////////////
771/// Receive a TMessage object. The user must delete the TMessage object.
772/// Returns length of message in bytes (can be 0 if other side of connection
773/// is closed) or -1 in case of error or -4 in case a non-blocking socket
774/// would block (i.e. there is nothing to be read) or -5 if pipe broken
775/// or reset by peer (EPIPE || ECONNRESET). In those cases mess == 0.
776
778{
780
781 if (fSocket == -1) {
782 mess = 0;
783 return -1;
784 }
785
788 Int_t n;
789 UInt_t len;
790 if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
791 if (n == 0 || n == -5) {
792 // Connection closed, reset or broken
794 Close();
795 }
796 mess = 0;
797 return n;
798 }
799 len = net2host(len); //from network to host byte order
800
801 if (len > (std::numeric_limits<decltype(len)>::max() - sizeof(decltype(len)))) {
802 Error("Recv", "Buffer length is %u and %u+sizeof(UInt_t) cannot be represented as an UInt_t.", len, len);
803 return -1;
804 }
805
807 char *buf = new char[len+sizeof(UInt_t)];
808 if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
809 if (n == 0 || n == -5) {
810 // Connection closed, reset or broken
812 Close();
813 }
814 delete [] buf;
815 mess = 0;
816 return n;
817 }
818
819 fBytesRecv += n + sizeof(UInt_t);
820 fgBytesRecv += n + sizeof(UInt_t);
821
822 mess = new TMessage(buf, len+sizeof(UInt_t));
823
824 // receive any streamer infos
826 goto oncemore;
827
828 // receive any process ids
829 if (RecvProcessIDs(mess))
830 goto oncemore;
831
832 if (mess->What() & kMESS_ACK) {
834 char ok[2] = { 'o', 'k' };
835 Int_t n2 = 0;
836 if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
837 if (n2 == -5) {
838 // Connection reset or broken
840 Close();
841 }
842 delete mess;
843 mess = 0;
844 return n2;
845 }
846 mess->SetWhat(mess->What() & ~kMESS_ACK);
847
848 fBytesSent += 2;
849 fgBytesSent += 2;
850 }
851
852 Touch(); // update usage timestamp
853
854 return n;
855}
856
857////////////////////////////////////////////////////////////////////////////////
858/// Receive a raw buffer of specified length bytes. Using option kPeek
859/// one can peek at incoming data. Returns number of received bytes.
860/// Returns -1 in case of error. In case of opt == kOob: -2 means
861/// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
862/// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
863/// peer (EPIPE || ECONNRESET).
864
866{
868
869 if (fSocket == -1) return -1;
870 if (length == 0) return 0;
871
873 Int_t n;
874 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
875 if (n == 0 || n == -5) {
876 // Connection closed, reset or broken
878 Close();
879 }
880 return n;
881 }
882
883 fBytesRecv += n;
884 fgBytesRecv += n;
885
886 Touch(); // update usage timestamp
887
888 return n;
889}
890
891////////////////////////////////////////////////////////////////////////////////
892/// Receive a message containing streamer infos. In case the message contains
893/// streamer infos they are imported, the message will be deleted and the
894/// method returns kTRUE.
895
897{
898 if (mess->What() == kMESS_STREAMERINFO) {
899 TList *list = (TList*)mess->ReadObject(TList::Class());
900 TIter next(list);
902 TObjLink *lnk = list->FirstLink();
903 // First call BuildCheck for regular class
904 while (lnk) {
905 info = (TStreamerInfo*)lnk->GetObject();
906 TObject *element = info->GetElements()->UncheckedAt(0);
907 Bool_t isstl = element && strcmp("This",element->GetName())==0;
908 if (!isstl) {
909 info->BuildCheck();
910 if (gDebug > 0)
911 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
912 info->GetName(), info->GetClassVersion());
913 }
914 lnk = lnk->Next();
915 }
916 // Then call BuildCheck for stl class
917 lnk = list->FirstLink();
918 while (lnk) {
919 info = (TStreamerInfo*)lnk->GetObject();
920 TObject *element = info->GetElements()->UncheckedAt(0);
921 Bool_t isstl = element && strcmp("This",element->GetName())==0;
922 if (isstl) {
923 info->BuildCheck();
924 if (gDebug > 0)
925 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
926 info->GetName(), info->GetClassVersion());
927 }
928 lnk = lnk->Next();
929 }
930 delete list;
931 delete mess;
932
933 return kTRUE;
934 }
935 return kFALSE;
936}
937
938////////////////////////////////////////////////////////////////////////////////
939/// Receive a message containing process ids. In case the message contains
940/// process ids they are imported, the message will be deleted and the
941/// method returns kTRUE.
942
944{
945 if (mess->What() == kMESS_PROCESSID) {
946 TList *list = (TList*)mess->ReadObject(TList::Class());
947 TIter next(list);
948 TProcessID *pid;
949 while ((pid = (TProcessID*)next())) {
950 // check that a similar pid is not already registered in fgPIDs
953 TProcessID *p;
954 while ((p = (TProcessID*)nextpid())) {
955 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
956 delete pid;
957 pid = 0;
958 break;
959 }
960 }
961 if (pid) {
962 if (gDebug > 0)
963 Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
964 pid->IncrementCount();
965 pidslist->Add(pid);
966 Int_t ind = pidslist->IndexOf(pid);
967 pid->SetUniqueID((UInt_t)ind);
968 }
969 }
970 delete list;
971 delete mess;
972
973 return kTRUE;
974 }
975 return kFALSE;
976}
977
978////////////////////////////////////////////////////////////////////////////////
979/// Set socket options.
980
982{
983 if (fSocket == -1) return -1;
984
985 return gSystem->SetSockOpt(fSocket, opt, val);
986}
987
988////////////////////////////////////////////////////////////////////////////////
989/// Get socket options. Returns -1 in case of error.
990
992{
993 if (fSocket == -1) return -1;
994
995 return gSystem->GetSockOpt(fSocket, opt, &val);
996}
997
998////////////////////////////////////////////////////////////////////////////////
999/// Returns error code. Meaning depends on context where it is called.
1000/// If no error condition returns 0 else a value < 0.
1001/// For example see TServerSocket ctor.
1002
1004{
1005 if (!IsValid())
1006 return fSocket;
1007
1008 return 0;
1009}
1010
1011////////////////////////////////////////////////////////////////////////////////
1012/// See comments for function SetCompressionSettings
1013
1015{
1017 if (fCompress < 0) {
1018 // if the level is not defined yet use 4 as a default (with ZLIB was 1)
1020 } else {
1021 int level = fCompress % 100;
1022 fCompress = 100 * algorithm + level;
1023 }
1024}
1025
1026////////////////////////////////////////////////////////////////////////////////
1027/// See comments for function SetCompressionSettings
1028
1030{
1031 if (level < 0) level = 0;
1032 if (level > 99) level = 99;
1033 if (fCompress < 0) {
1034 // if the algorithm is not defined yet use 0 as a default
1035 fCompress = level;
1036 } else {
1037 int algorithm = fCompress / 100;
1039 fCompress = 100 * algorithm + level;
1040 }
1041}
1042
1043////////////////////////////////////////////////////////////////////////////////
1044/// Used to specify the compression level and algorithm:
1045/// settings = 100 * algorithm + level
1046///
1047/// level = 0, objects written to this file will not be compressed.
1048/// level = 1, minimal compression level but fast.
1049/// ....
1050/// level = 9, maximal compression level but slower and might use more memory.
1051/// (For the currently supported algorithms, the maximum level is 9)
1052/// If compress is negative it indicates the compression level is not set yet.
1053///
1054/// The enumeration ROOT::RCompressionSetting::EAlgorithm associates each
1055/// algorithm with a number. There is a utility function to help
1056/// to set the value of the argument. For example,
1057/// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1058/// will build an integer which will set the compression to use
1059/// the LZMA algorithm and compression level 1. These are defined
1060/// in the header file Compression.h.
1061///
1062/// Note that the compression settings may be changed at any time.
1063/// The new compression settings will only apply to branches created
1064/// or attached after the setting is changed and other objects written
1065/// after the setting is changed.
1066
1071
1072////////////////////////////////////////////////////////////////////////////////
1073/// Print error string depending on error code.
1074
1075void TUDPSocket::NetError(const char *where, Int_t err)
1076{
1077 // Make sure it is in range
1078 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1079
1080 if (gDebug > 0)
1081 ::Error(where, "%s", gRootdErrStr[err]);
1082}
1083
1084////////////////////////////////////////////////////////////////////////////////
1085/// Get total number of bytes sent via all sockets.
1086
1091
1092////////////////////////////////////////////////////////////////////////////////
1093/// Get total number of bytes received via all sockets.
1094
UShort_t net2host(UShort_t x)
Definition Bytes.h:575
@ kMESS_STRING
@ kMESS_ACK
@ kMESS_PROCESSID
@ kMESS_STREAMERINFO
R__EXTERN const char * gRootdErrStr[]
Definition NetErrors.h:72
@ kErrError
Definition NetErrors.h:69
#define SafeDelete(p)
Definition RConfig.hxx:533
int Int_t
Signed integer 4 bytes (int)
Definition RtypesCore.h:59
long Long_t
Signed long integer 4 bytes (long). Size depends on architecture.
Definition RtypesCore.h:68
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int)
Definition RtypesCore.h:60
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
const char Option_t
Option string (const char)
Definition RtypesCore.h:80
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Int_t gDebug
Global variable setting the debug level. Set to 0 to disable, increase it in steps of 1 to increase t...
Definition TROOT.cxx:627
R__EXTERN TVirtualMutex * gROOTMutex
Definition TROOT.h:63
#define gROOT
Definition TROOT.h:411
ESockOptions
Definition TSystem.h:229
ESendRecvOptions
Definition TSystem.h:242
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
#define R__LOCKGUARD(mutex)
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
This class represents an Internet Protocol (IP) address.
Int_t GetPort() const
const char * GetHostName() const
A doubly linked list.
Definition TList.h:38
static TClass * Class()
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:575
void Add(TObject *obj) override
Definition TList.h:81
Int_t Compress()
Compress the message.
Definition TMessage.cxx:318
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:173
const char * GetTitle() const override
Returns title of object.
Definition TNamed.h:50
TString fName
Definition TNamed.h:32
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition TNamed.cxx:149
An array of TObjects.
Definition TObjArray.h:31
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:41
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:475
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:1057
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:864
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1071
virtual void SetUniqueID(UInt_t uid)
Set the unique object id.
Definition TObject.cxx:875
void ResetBit(UInt_t f)
Definition TObject.h:201
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:1045
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
Int_t IncrementCount()
Increase the reference count to this object.
static TObjArray * GetPIDs()
Static: returns the array of TProcessIDs.
Describes a persistent version of a class.
Basic string class.
Definition TString.h:138
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2362
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:641
virtual int GetServiceByName(const char *service)
Get port # of internet service.
Definition TSystem.cxx:2329
virtual TInetAddress GetSockName(int sock)
Get Internet Protocol (IP) address of host and port #.
Definition TSystem.cxx:2320
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:282
virtual char * GetServiceByPort(int port)
Get name of internet service.
Definition TSystem.cxx:2338
virtual int SetSockOpt(int sock, int kind, int val)
Set socket option.
Definition TSystem.cxx:2447
virtual TInetAddress GetPeerName(int sock)
Get Internet Protocol (IP) address of remote host and port #.
Definition TSystem.cxx:2311
virtual int OpenConnection(const char *server, int port, int tcpwindowsize=-1, const char *protocol="tcp")
Open a connection to another host.
Definition TSystem.cxx:2347
virtual int GetSockOpt(int sock, int kind, int *val)
Get socket option.
Definition TSystem.cxx:2456
virtual int RecvRaw(int sock, void *buffer, int length, int flag)
Receive exactly length bytes into buffer.
Definition TSystem.cxx:2410
virtual Int_t Select(TList *active, Long_t timeout)
Select on active file descriptors (called by TMonitor).
Definition TSystem.cxx:443
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition TSystem.cxx:2302
virtual int SendRaw(int sock, const void *buffer, int length, int flag)
Send exactly length bytes from buffer.
Definition TSystem.cxx:2420
virtual void CloseConnection(int sock, Bool_t force=kFALSE)
Close socket connection.
Definition TSystem.cxx:2401
This class implements UDP client sockets.
Definition TUDPSocket.h:37
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
UInt_t fBytesRecv
Definition TUDPSocket.h:50
Int_t GetErrorCode() const
Returns error code.
UInt_t fBytesSent
Definition TUDPSocket.h:51
Int_t fCompress
Definition TUDPSocket.h:52
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual TInetAddress GetLocalInetAddress()
Return internet address of local host to which the socket is bound.
EServiceType fServType
Definition TUDPSocket.h:58
TInetAddress fLocalAddress
Definition TUDPSocket.h:53
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
See comments for function SetCompressionSettings.
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
TVirtualMutex * fLastUsageMtx
Definition TUDPSocket.h:64
virtual Int_t GetLocalPort()
Return the local port # to which the socket is bound.
TInetAddress fAddress
Definition TUDPSocket.h:49
TString fService
Definition TUDPSocket.h:57
static ULong64_t fgBytesRecv
Definition TUDPSocket.h:67
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
virtual Bool_t IsValid() const
Definition TUDPSocket.h:119
Int_t fRemoteProtocol
Definition TUDPSocket.h:54
static void NetError(const char *where, Int_t error)
Print error string depending on error code.
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
See comments for function SetCompressionSettings.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void Close(Option_t *opt="")
Close the socket.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
static ULong64_t fgBytesSent
Definition TUDPSocket.h:68
Int_t GetCompressionLevel() const
Definition TUDPSocket.h:161
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
TList * fUUIDs
Definition TUDPSocket.h:62
void Touch()
Definition TUDPSocket.h:144
TBits fBitsInfo
Definition TUDPSocket.h:61
static ULong64_t GetSocketBytesRecv()
Get total number of bytes received via all sockets.
TString fUrl
Definition TUDPSocket.h:60
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Option_t * GetOption() const override
Definition TUDPSocket.h:83
static ULong64_t GetSocketBytesSent()
Get total number of bytes sent via all sockets.
Int_t fSocket
Definition TUDPSocket.h:59
virtual Int_t Select(Int_t interest=kRead, Long_t timeout=-1)
Waits for this socket to change status.
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
TSecContext * fSecContext
Definition TUDPSocket.h:55
This class represents a WWW compatible URL.
Definition TUrl.h:33
const Int_t n
Definition legend1.C:16
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition Compression.h:72