Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TUDPSocket.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Marcelo Sousa 26/10/2011
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TUDPSocket //
15// //
16// This class implements UDP client sockets. A socket is an endpoint //
17// for communication between two machines. //
18// The actual work is done via the TSystem class (either TUnixSystem //
19// or TWinNTSystem). //
20// //
21//////////////////////////////////////////////////////////////////////////
22
23#include "Bytes.h"
24#include "Compression.h"
25#include "NetErrors.h"
26#include "TError.h"
27#include "TMessage.h"
28#include "TUDPSocket.h"
29#include "TObjString.h"
30#include "TPluginManager.h"
31#include "TROOT.h"
32#include "TString.h"
33#include "TSystem.h"
34#include "TUrl.h"
35#include "TVirtualAuth.h"
36#include "TStreamerInfo.h"
37#include "TProcessID.h"
38
41
42
44
45////////////////////////////////////////////////////////////////////////////////
46/// Create a UDP socket. Connect to the named service at address addr.
47/// The connection is requested with protocol name "udp", and only when
48/// the address carries a valid port (GetPort() != -1); otherwise the
49/// socket is left invalid (fSocket == -1).
50/// Returns when connection has been accepted by remote side. Use IsValid()
51/// to check the validity of the socket. Every socket is added to the TROOT
52/// sockets list which will make sure that any open sockets are properly
53/// closed on program termination.
54
55TUDPSocket::TUDPSocket(TInetAddress addr, const char *service)
56 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
57{
60
61 fService = service;
62 fSecContext = 0;
65 if (fService.Contains("root"))
67 if (fService.Contains("proof"))
69 fAddress = addr;
71 fBytesSent = 0;
72 fBytesRecv = 0;
73 fUUIDs = 0;
74 fLastUsageMtx = 0;
76
77 if (fAddress.GetPort() != -1) {
79 -1, "udp");
80
81 if (fSocket != -1) {
83 gROOT->GetListOfSockets()->Add(this);
84 }
85 } else
86 fSocket = -1;
87
88}
89
90
91////////////////////////////////////////////////////////////////////////////////
92/// Create a UDP socket. Connect to the specified port # at address addr.
93/// The connection is requested with protocol name "udp". If it cannot be
94/// opened (fSocket == -1) the port stored in fAddress is reset to -1;
95/// otherwise the socket is added to the TROOT sockets list.
96/// Returns when connection has been accepted by remote side. Use IsValid()
97/// to check the validity of the socket. Every socket is added to the TROOT
98/// sockets list which will make sure that any open sockets are properly
99/// closed on program termination.
100
102 : TNamed(addr.GetHostName(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
103{
106
108 fSecContext = 0;
109 fRemoteProtocol= -1;
111 if (fService.Contains("root"))
113 if (fService.Contains("proof"))
115 fAddress = addr;
116 fAddress.fPort = port;
118 fBytesSent = 0;
119 fBytesRecv = 0;
120 fUUIDs = 0;
121 fLastUsageMtx = 0;
123
125 -1, "udp");
126 if (fSocket == -1)
127 fAddress.fPort = -1;
128 else {
130 gROOT->GetListOfSockets()->Add(this);
131 }
132}
133
134////////////////////////////////////////////////////////////////////////////////
135/// Create a UDP socket. Connect to named service on the remote host.
136/// The connection is opened via gSystem->OpenConnection() with protocol
137/// "udp", and only when fAddress holds a valid port (GetPort() != -1);
138/// otherwise the socket is left invalid (fSocket == -1).
139/// Returns when connection has been accepted by remote side. Use IsValid()
140/// to check the validity of the socket. Every socket is added to the TROOT
141/// sockets list which will make sure that any open sockets are properly
142/// closed on program termination.
143
144TUDPSocket::TUDPSocket(const char *host, const char *service)
145 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
146{
149
150 fService = service;
151 fSecContext = 0;
152 fRemoteProtocol= -1;
154 if (fService.Contains("root"))
156 if (fService.Contains("proof"))
161 fBytesSent = 0;
162 fBytesRecv = 0;
163 fUUIDs = 0;
164 fLastUsageMtx = 0;
166
167 if (fAddress.GetPort() != -1) {
168 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
169 if (fSocket != -1) {
171 gROOT->GetListOfSockets()->Add(this);
172 }
173 } else
174 fSocket = -1;
175}
176
177////////////////////////////////////////////////////////////////////////////////
178/// Create a socket; see CreateAuthSocket for the form of url.
179/// Connect to the specified port # on the remote host.
180/// If user is specified in url, try authentication as user.
181/// The connection is opened via gSystem->OpenConnection() with protocol
182/// "udp"; if that fails (fSocket == -1) the port stored in fAddress is
183/// reset to -1.
184/// Returns when connection has been accepted by remote side. Use IsValid()
185/// to check the validity of the socket. Every socket is added to the TROOT
186/// sockets list which will make sure that any open sockets are properly
187/// closed on program termination.
188
189TUDPSocket::TUDPSocket(const char *url, Int_t port)
190 : TNamed(TUrl(url).GetHost(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
191{
194
195 fUrl = TString(url);
196 TString host(TUrl(fUrl).GetHost());
197
199 fSecContext = 0;
200 fRemoteProtocol= -1;
202 if (fUrl.Contains("root"))
204 if (fUrl.Contains("proof"))
207 fAddress.fPort = port;
210 fBytesSent = 0;
211 fBytesRecv = 0;
212 fUUIDs = 0;
213 fLastUsageMtx = 0;
215
216 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
217 if (fSocket == -1) {
218 fAddress.fPort = -1;
219 } else {
221 gROOT->GetListOfSockets()->Add(this);
222 }
223}
224
225////////////////////////////////////////////////////////////////////////////////
226/// Create a socket in the Unix domain on 'sockpath'.
227/// Returns when connection has been accepted by the server. Use IsValid()
228/// to check the validity of the socket. Every socket is added to the TROOT
229/// sockets list which will make sure that any open sockets are properly
230/// closed on program termination.
231
232TUDPSocket::TUDPSocket(const char *sockpath) : TNamed(sockpath, ""),
233 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
234{
237
238 fUrl = sockpath;
239
240 fService = "unix";
241 fSecContext = 0;
242 fRemoteProtocol= -1;
244 fAddress.fPort = -1;
245 fName.Form("unix:%s", sockpath);
247 fBytesSent = 0;
248 fBytesRecv = 0;
249 fUUIDs = 0;
250 fLastUsageMtx = 0;
252
253 fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");
254 if (fSocket > 0) {
256 gROOT->GetListOfSockets()->Add(this);
257 }
258}
259
260////////////////////////////////////////////////////////////////////////////////
261/// Create a socket. The socket will adopt the previously opened socket with
262/// descriptor desc. A negative desc leaves the socket invalid (fSocket == -1).
263
264TUDPSocket::TUDPSocket(Int_t desc) : TNamed("", ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
265{
268
269 fSecContext = 0;
270 fRemoteProtocol = 0;
271 fService = (char *)kSOCKD;
273 fBytesSent = 0;
274 fBytesRecv = 0;
275 fUUIDs = 0;
276 fLastUsageMtx = 0;
278
279 if (desc >= 0) {
280 fSocket = desc;
283 gROOT->GetListOfSockets()->Add(this);
284 } else
285 fSocket = -1;
286}
287
288////////////////////////////////////////////////////////////////////////////////
289/// Create a socket. The socket will adopt previously opened Unix socket with
290/// descriptor desc. The sockpath arg is for info purposes only. Use
291/// this method to adopt e.g. a socket created via socketpair().
292
293TUDPSocket::TUDPSocket(Int_t desc, const char *sockpath) : TNamed(sockpath, ""),
294 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
295{
298
299 fUrl = sockpath;
300
301 fService = "unix";
302 fSecContext = 0;
303 fRemoteProtocol= -1;
305 fAddress.fPort = -1;
306 fName.Form("unix:%s", sockpath);
308 fBytesSent = 0;
309 fBytesRecv = 0;
310 fUUIDs = 0;
311 fLastUsageMtx = 0;
313
314 if (desc >= 0) {
315 fSocket = desc;
317 gROOT->GetListOfSockets()->Add(this);
318 } else
319 fSocket = -1;
320}
321
322
323////////////////////////////////////////////////////////////////////////////////
324/// TUDPSocket copy ctor.
325
327{
328 fSocket = s.fSocket;
329 fService = s.fService;
330 fAddress = s.fAddress;
338 fUUIDs = 0;
339 fLastUsageMtx = 0;
341
342 if (fSocket != -1) {
344 gROOT->GetListOfSockets()->Add(this);
345 }
346}
347
348////////////////////////////////////////////////////////////////////////////////
349/// Close the socket. If option is "force", calls shutdown(id,2) to
350/// shut down the connection. This will close the connection also
351/// for the parent of this process. Also called via the dtor (without
352/// option "force", call explicitly Close("force") if this is desired).
353
355{
356 Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
357
358 if (fSocket != -1) {
361 gROOT->GetListOfSockets()->Remove(this);
362 }
363 fSocket = -1;
364
367}
368
369////////////////////////////////////////////////////////////////////////////////
370/// Return internet address of local host to which the socket is bound.
371/// In case of error TInetAddress::IsValid() returns kFALSE.
372
374{
375 if (IsValid()) {
376 if (fLocalAddress.GetPort() == -1)
378 return fLocalAddress;
379 }
380 return TInetAddress();
381}
382
383////////////////////////////////////////////////////////////////////////////////
384/// Return the local port # to which the socket is bound.
385/// In case of error return -1.
386
388{
389 if (IsValid()) {
390 if (fLocalAddress.GetPort() == -1)
392 return fLocalAddress.GetPort();
393 }
394 return -1;
395}
396
397////////////////////////////////////////////////////////////////////////////////
398/// Waits for this socket to change status. If interest=kRead,
399/// the socket will be watched to see if characters become available for
400/// reading; if interest=kWrite the socket will be watched to
401/// see if a write will not block.
402/// The argument 'timeout' specifies a maximum time to wait in millisec.
403/// Default no timeout.
404/// Returns 1 if a change of status of interest has been detected within
405/// timeout; 0 in case of timeout; < 0 if an error occurred.
406
408{
409 Int_t rc = 1;
410
411 // Associate a TFileHandler to this socket
412 TFileHandler fh(fSocket, interest);
413
414 // Wait for an event now
415 rc = gSystem->Select(&fh, timeout);
416
417 return rc;
418}
419
420////////////////////////////////////////////////////////////////////////////////
421/// Send a single message opcode. Use kind (opcode) to set the
422/// TMessage "what" field. Returns the number of bytes that were sent
423/// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
424/// been or'ed with kMESS_ACK, the call will only return after having
425/// received an acknowledgement, making the sending process synchronous.
426
428{
429 TMessage mess(kind);
430
431 Int_t nsent;
432 if ((nsent = Send(mess)) < 0)
433 return -1;
434
435 return nsent;
436}
437
438////////////////////////////////////////////////////////////////////////////////
439/// Send a status and a single message opcode. Use kind (opcode) to set the
440/// TMessage "what" field. Returns the number of bytes that were sent
441/// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
442/// been or'ed with kMESS_ACK, the call will only return after having
443/// received an acknowledgement, making the sending process synchronous.
444
446{
447 TMessage mess(kind);
448 mess << status;
449
450 Int_t nsent;
451 if ((nsent = Send(mess)) < 0)
452 return -1;
453
454 return nsent;
455}
456
457////////////////////////////////////////////////////////////////////////////////
458/// Send a character string buffer. Use kind to set the TMessage "what" field.
459/// Returns the number of bytes in the string str that were sent and -1 in
460/// case of error. In case the kind has been or'ed with kMESS_ACK, the call
461/// will only return after having received an acknowledgement, making the
462/// sending process synchronous.
463
464Int_t TUDPSocket::Send(const char *str, Int_t kind)
465{
466 TMessage mess(kind);
467 if (str) mess.WriteString(str);
468
469 Int_t nsent;
470 if ((nsent = Send(mess)) < 0)
471 return -1;
472
473 return nsent - sizeof(Int_t); // - TMessage::What()
474}
475
476////////////////////////////////////////////////////////////////////////////////
477/// Send a TMessage object. Returns the number of bytes in the TMessage
478/// that were sent and -1 in case of error. In case the TMessage::What
479/// has been or'ed with kMESS_ACK, the call will only return after having
480/// received an acknowledgement, making the sending process synchronous.
481/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
482/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
483/// support for streaming TStreamerInfo added by Rene Brun May 2008
484/// support for streaming TProcessID added by Rene Brun June 2008
485
487{
489
490 if (fSocket == -1) return -1;
491
492 if (mess.IsReading()) {
493 Error("Send", "cannot send a message used for reading");
494 return -1;
495 }
496
497 // send streamer infos in case schema evolution is enabled in the TMessage
498 SendStreamerInfos(mess);
499
500 // send the process id's so TRefs work
501 SendProcessIDs(mess);
502
503 mess.SetLength(); //write length in first word of buffer
504
505 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
506 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
507
508 if (mess.GetCompressionLevel() > 0)
509 const_cast<TMessage&>(mess).Compress();
510
511 char *mbuf = mess.Buffer();
512 Int_t mlen = mess.Length();
513 if (mess.CompBuffer()) {
514 mbuf = mess.CompBuffer();
515 mlen = mess.CompLength();
516 }
517
519 Int_t nsent;
520 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
521 if (nsent == -5) {
522 // Connection reset by peer or broken
524 Close();
525 }
526 return nsent;
527 }
528
529 fBytesSent += nsent;
530 fgBytesSent += nsent;
531
532 // If acknowledgement is desired, wait for it
533 if (mess.What() & kMESS_ACK) {
536 char buf[2];
537 Int_t n = 0;
538 if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
539 if (n == -5) {
540 // Connection reset by peer or broken
542 Close();
543 } else
544 n = -1;
545 return n;
546 }
547 if (strncmp(buf, "ok", 2)) {
548 Error("Send", "bad acknowledgement");
549 return -1;
550 }
551 fBytesRecv += 2;
552 fgBytesRecv += 2;
553 }
554
555 Touch(); // update usage timestamp
556
557 return nsent - sizeof(UInt_t); //length - length header
558}
559
560////////////////////////////////////////////////////////////////////////////////
561/// Send an object. Returns the number of bytes sent and -1 in case of error.
562/// In case the "kind" has been or'ed with kMESS_ACK, the call will only
563/// return after having received an acknowledgement, making the sending
564/// synchronous.
565
567{
568 //stream object to message buffer
569 TMessage mess(kind);
570 mess.WriteObject(obj);
571
572 //now sending the object itself
573 Int_t nsent;
574 if ((nsent = Send(mess)) < 0)
575 return -1;
576
577 return nsent;
578}
579
580////////////////////////////////////////////////////////////////////////////////
581/// Send a raw buffer of specified length. Using option kOob one can send
582/// OOB data. Returns the number of bytes sent or -1 in case of error.
583/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
584/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
585
587{
589
590 if (fSocket == -1) return -1;
591
593 Int_t nsent;
594 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
595 if (nsent == -5) {
596 // Connection reset or broken: close
598 Close();
599 }
600 return nsent;
601 }
602
603 fBytesSent += nsent;
604 fgBytesSent += nsent;
605
606 Touch(); // update usage timestamp
607
608 return nsent;
609}
610
611////////////////////////////////////////////////////////////////////////////////
612/// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
613/// in the object in the message is in the fInfos list of the message.
614/// We send only the TStreamerInfos not yet sent on this socket.
615
617{
618 if (mess.fInfos && mess.fInfos->GetEntries()) {
619 TIter next(mess.fInfos);
620 TStreamerInfo *info;
621 TList *minilist = 0;
622 while ((info = (TStreamerInfo*)next())) {
623 Int_t uid = info->GetNumber();
624 if (fBitsInfo.TestBitNumber(uid))
625 continue; //TStreamerInfo had already been sent
627 if (!minilist)
628 minilist = new TList();
629 if (gDebug > 0)
630 Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
631 info->GetName(),info->GetClassVersion());
632 minilist->Add(info);
633 }
634 if (minilist) {
636 messinfo.WriteObject(minilist);
637 delete minilist;
638 if (messinfo.fInfos)
639 messinfo.fInfos->Clear();
640 if (Send(messinfo) < 0)
641 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
642 }
643 }
644}
645
646////////////////////////////////////////////////////////////////////////////////
647/// Check if TProcessIDs must be sent. The list of TProcessIDs
648/// in the object in the message is found by looking in the TMessage bits.
649/// We send only the TProcessIDs not yet sent on this socket.
650
652{
653 if (mess.TestBitNumber(0)) {
655 Int_t npids = pids->GetEntries();
656 TProcessID *pid;
657 TList *minilist = 0;
658 for (Int_t ipid = 0; ipid < npids; ipid++) {
659 pid = (TProcessID*)pids->At(ipid);
660 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
661 continue;
662 //check if a pid with this title has already been sent through the socket
663 //if not add it to the fUUIDs list
664 if (!fUUIDs) {
665 fUUIDs = new TList();
666 } else {
667 if (fUUIDs->FindObject(pid->GetTitle()))
668 continue;
669 }
670 fUUIDs->Add(new TObjString(pid->GetTitle()));
671 if (!minilist)
672 minilist = new TList();
673 if (gDebug > 0)
674 Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
675 minilist->Add(pid);
676 }
677 if (minilist) {
678 TMessage messpid(kMESS_PROCESSID);
679 messpid.WriteObject(minilist);
680 delete minilist;
681 if (Send(messpid) < 0)
682 Warning("SendProcessIDs", "problems sending TProcessID's ...");
683 }
684 }
685}
686
687////////////////////////////////////////////////////////////////////////////////
688/// Receive a character string message of maximum max length. The expected
689/// message must be of type kMESS_STRING. Returns length of received string
690/// (can be 0 if the other side of the connection is closed) or -1 in case of error
691/// or -4 in case a non-blocking socket would block (i.e. there is nothing
692/// to be read).
693
695{
696 Int_t n, kind;
697
699 if ((n = Recv(str, max, kind)) <= 0) {
700 if (n == -5) {
702 n = -1;
703 }
704 return n;
705 }
706
707 if (kind != kMESS_STRING) {
708 Error("Recv", "got message of wrong kind (expected %d, got %d)",
709 kMESS_STRING, kind);
710 return -1;
711 }
712
713 return n;
714}
715
716////////////////////////////////////////////////////////////////////////////////
717/// Receive a character string message of maximum max length. Returns in
718/// kind the message type. Returns length of received string+4 (can be 0 if
719/// other side of connection is closed) or -1 in case of error or -4 in
720/// case a non-blocking socket would block (i.e. there is nothing to be read).
721
722Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
723{
724 Int_t n;
725 TMessage *mess;
726
728 if ((n = Recv(mess)) <= 0) {
729 if (n == -5) {
731 n = -1;
732 }
733 return n;
734 }
735
736 kind = mess->What();
737 if (str) {
738 if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
739 mess->ReadString(str, max);
740 else
741 str[0] = 0;
742 }
743
744 delete mess;
745
746 return n; // number of bytes read (len of str + sizeof(kind)
747}
748
749////////////////////////////////////////////////////////////////////////////////
750/// Receives a status and a message type. Returns length of received
751/// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
752/// is closed) or -1 in case of error or -4 in case a non-blocking
753/// socket would block (i.e. there is nothing to be read).
754
756{
757 Int_t n;
758 TMessage *mess;
759
761 if ((n = Recv(mess)) <= 0) {
762 if (n == -5) {
764 n = -1;
765 }
766 return n;
767 }
768
769 kind = mess->What();
770 (*mess) >> status;
771
772 delete mess;
773
774 return n; // number of bytes read (2 * sizeof(Int_t)
775}
776
777////////////////////////////////////////////////////////////////////////////////
778/// Receive a TMessage object. The user must delete the TMessage object.
779/// Returns length of message in bytes (can be 0 if other side of connection
780/// is closed) or -1 in case of error or -4 in case a non-blocking socket
781/// would block (i.e. there is nothing to be read) or -5 if pipe broken
782/// or reset by peer (EPIPE || ECONNRESET). In those cases mess == 0.
783
785{
787
788 if (fSocket == -1) {
789 mess = 0;
790 return -1;
791 }
792
793oncemore:
795 Int_t n;
796 UInt_t len;
797 if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
798 if (n == 0 || n == -5) {
799 // Connection closed, reset or broken
801 Close();
802 }
803 mess = 0;
804 return n;
805 }
806 len = net2host(len); //from network to host byte order
807
809 char *buf = new char[len+sizeof(UInt_t)];
810 if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
811 if (n == 0 || n == -5) {
812 // Connection closed, reset or broken
814 Close();
815 }
816 delete [] buf;
817 mess = 0;
818 return n;
819 }
820
821 fBytesRecv += n + sizeof(UInt_t);
822 fgBytesRecv += n + sizeof(UInt_t);
823
824 mess = new TMessage(buf, len+sizeof(UInt_t));
825
826 // receive any streamer infos
827 if (RecvStreamerInfos(mess))
828 goto oncemore;
829
830 // receive any process ids
831 if (RecvProcessIDs(mess))
832 goto oncemore;
833
834 if (mess->What() & kMESS_ACK) {
836 char ok[2] = { 'o', 'k' };
837 Int_t n2 = 0;
838 if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
839 if (n2 == -5) {
840 // Connection reset or broken
842 Close();
843 }
844 delete mess;
845 mess = 0;
846 return n2;
847 }
848 mess->SetWhat(mess->What() & ~kMESS_ACK);
849
850 fBytesSent += 2;
851 fgBytesSent += 2;
852 }
853
854 Touch(); // update usage timestamp
855
856 return n;
857}
858
859////////////////////////////////////////////////////////////////////////////////
860/// Receive a raw buffer of specified length bytes. Using option kPeek
861/// one can peek at incoming data. Returns number of received bytes.
862/// Returns -1 in case of error. In case of opt == kOob: -2 means
863/// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
864/// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
865/// peer (EPIPE || ECONNRESET).
866
868{
870
871 if (fSocket == -1) return -1;
872 if (length == 0) return 0;
873
875 Int_t n;
876 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
877 if (n == 0 || n == -5) {
878 // Connection closed, reset or broken
880 Close();
881 }
882 return n;
883 }
884
885 fBytesRecv += n;
886 fgBytesRecv += n;
887
888 Touch(); // update usage timestamp
889
890 return n;
891}
892
893////////////////////////////////////////////////////////////////////////////////
894/// Receive a message containing streamer infos. In case the message contains
895/// streamer infos they are imported, the message will be deleted and the
896/// method returns kTRUE.
897
899{
900 if (mess->What() == kMESS_STREAMERINFO) {
901 TList *list = (TList*)mess->ReadObject(TList::Class());
902 TIter next(list);
903 TStreamerInfo *info;
904 TObjLink *lnk = list->FirstLink();
905 // First call BuildCheck for regular class
906 while (lnk) {
907 info = (TStreamerInfo*)lnk->GetObject();
908 TObject *element = info->GetElements()->UncheckedAt(0);
909 Bool_t isstl = element && strcmp("This",element->GetName())==0;
910 if (!isstl) {
911 info->BuildCheck();
912 if (gDebug > 0)
913 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
914 info->GetName(), info->GetClassVersion());
915 }
916 lnk = lnk->Next();
917 }
918 // Then call BuildCheck for stl class
919 lnk = list->FirstLink();
920 while (lnk) {
921 info = (TStreamerInfo*)lnk->GetObject();
922 TObject *element = info->GetElements()->UncheckedAt(0);
923 Bool_t isstl = element && strcmp("This",element->GetName())==0;
924 if (isstl) {
925 info->BuildCheck();
926 if (gDebug > 0)
927 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
928 info->GetName(), info->GetClassVersion());
929 }
930 lnk = lnk->Next();
931 }
932 delete list;
933 delete mess;
934
935 return kTRUE;
936 }
937 return kFALSE;
938}
939
940////////////////////////////////////////////////////////////////////////////////
941/// Receive a message containing process ids. In case the message contains
942/// process ids they are imported, the message will be deleted and the
943/// method returns kTRUE.
944
946{
947 if (mess->What() == kMESS_PROCESSID) {
948 TList *list = (TList*)mess->ReadObject(TList::Class());
949 TIter next(list);
950 TProcessID *pid;
951 while ((pid = (TProcessID*)next())) {
952 // check that a similar pid is not already registered in fgPIDs
953 TObjArray *pidslist = TProcessID::GetPIDs();
954 TIter nextpid(pidslist);
955 TProcessID *p;
956 while ((p = (TProcessID*)nextpid())) {
957 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
958 delete pid;
959 pid = 0;
960 break;
961 }
962 }
963 if (pid) {
964 if (gDebug > 0)
965 Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
966 pid->IncrementCount();
967 pidslist->Add(pid);
968 Int_t ind = pidslist->IndexOf(pid);
969 pid->SetUniqueID((UInt_t)ind);
970 }
971 }
972 delete list;
973 delete mess;
974
975 return kTRUE;
976 }
977 return kFALSE;
978}
979
980////////////////////////////////////////////////////////////////////////////////
981/// Set socket options.
982
984{
985 if (fSocket == -1) return -1;
986
987 return gSystem->SetSockOpt(fSocket, opt, val);
988}
989
990////////////////////////////////////////////////////////////////////////////////
991/// Get socket options. Returns -1 in case of error.
992
994{
995 if (fSocket == -1) return -1;
996
997 return gSystem->GetSockOpt(fSocket, opt, &val);
998}
999
1000////////////////////////////////////////////////////////////////////////////////
1001/// Returns error code. Meaning depends on context where it is called.
1002/// If no error condition returns 0 else a value < 0.
1003/// For example see TServerSocket ctor.
1004
1006{
1007 if (!IsValid())
1008 return fSocket;
1009
1010 return 0;
1011}
1012
1013////////////////////////////////////////////////////////////////////////////////
1014/// See comments for function SetCompressionSettings
1015
1017{
1018 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1019 if (fCompress < 0) {
1020 // if the level is not defined yet use 4 as a default (with ZLIB was 1)
1022 } else {
1023 int level = fCompress % 100;
1024 fCompress = 100 * algorithm + level;
1025 }
1026}
1027
1028////////////////////////////////////////////////////////////////////////////////
1029/// See comments for function SetCompressionSettings
1030
1032{
1033 if (level < 0) level = 0;
1034 if (level > 99) level = 99;
1035 if (fCompress < 0) {
1036 // if the algorithm is not defined yet use 0 as a default
1037 fCompress = level;
1038 } else {
1039 int algorithm = fCompress / 100;
1040 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1041 fCompress = 100 * algorithm + level;
1042 }
1043}
1044
1045////////////////////////////////////////////////////////////////////////////////
1046/// Used to specify the compression level and algorithm:
1047/// settings = 100 * algorithm + level
1048///
1049/// level = 0, this socket does not impose compression on the messages it sends.
1050/// level = 1, minimal compression level but fast.
1051/// ....
1052/// level = 9, maximal compression level but slower and might use more memory.
1053/// (For the currently supported algorithms, the maximum level is 9)
1054/// If compress is negative it indicates the compression level is not set yet.
1055///
1056/// The enumeration ROOT::RCompressionSetting::EAlgorithm associates each
1057/// algorithm with a number. There is a utility function to help
1058/// to set the value of the argument. For example,
1059/// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1060/// will build an integer which will set the compression to use
1061/// the LZMA algorithm and compression level 1. These are defined
1062/// in the header file Compression.h.
1063///
1064/// Note that the compression settings may be changed at any time.
1065/// When the socket's compression level is > 0, Send() applies the settings
1066/// to every TMessage sent after the change that has no compression level
1067/// of its own (mess.GetCompressionLevel() == 0).
1068
1070{
1071 fCompress = settings;
1072}
1073
1074////////////////////////////////////////////////////////////////////////////////
1075/// Print error string depending on error code.
1076
1077void TUDPSocket::NetError(const char *where, Int_t err)
1078{
1079 // Make sure it is in range
1080 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1081
1082 if (gDebug > 0)
1083 ::Error(where, "%s", gRootdErrStr[err]);
1084}
1085
1086////////////////////////////////////////////////////////////////////////////////
1087/// Get total number of bytes sent via all sockets.
1088
1090{
1091 return fgBytesSent;
1092}
1093
1094////////////////////////////////////////////////////////////////////////////////
1095/// Get total number of bytes received via all sockets.
1096
1098{
1099 return fgBytesRecv;
1100}
UShort_t net2host(UShort_t x)
Definition Bytes.h:575
@ kMESS_STRING
@ kMESS_ACK
@ kMESS_PROCESSID
@ kMESS_STREAMERINFO
R__EXTERN const char * gRootdErrStr[]
Definition NetErrors.h:72
@ kErrError
Definition NetErrors.h:69
#define SafeDelete(p)
Definition RConfig.hxx:525
int Int_t
Definition RtypesCore.h:45
long Long_t
Definition RtypesCore.h:54
unsigned int UInt_t
Definition RtypesCore.h:46
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
unsigned long long ULong64_t
Definition RtypesCore.h:81
constexpr Bool_t kTRUE
Definition RtypesCore.h:100
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:377
#define R__ASSERT(e)
Definition TError.h:118
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Int_t gDebug
Definition TROOT.cxx:597
R__EXTERN TVirtualMutex * gROOTMutex
Definition TROOT.h:63
#define gROOT
Definition TROOT.h:406
ESockOptions
Definition TSystem.h:213
ESendRecvOptions
Definition TSystem.h:226
R__EXTERN TSystem * gSystem
Definition TSystem.h:555
#define R__LOCKGUARD(mutex)
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
TObject * ReadObject(const TClass *cl) override
Read object from I/O buffer.
void WriteString(const char *s) override
Write string to I/O buffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void WriteObject(const TObject *obj, Bool_t cacheReuse=kTRUE) override
Write object to I/O buffer.
Int_t BufferSize() const
Definition TBuffer.h:98
Bool_t IsReading() const
Definition TBuffer.h:86
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
virtual Int_t GetEntries() const
This class represents an Internet Protocol (IP) address.
Int_t GetPort() const
const char * GetHostName() const
A doubly linked list.
Definition TList.h:38
static TClass * Class()
void Clear(Option_t *option="") override
Remove all objects from the list.
Definition TList.cxx:400
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:576
void Add(TObject *obj) override
Definition TList.h:83
virtual TObjLink * FirstLink() const
Definition TList.h:104
UInt_t What() const
Definition TMessage.h:75
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:202
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TMessage.h:59
char * CompBuffer() const
Definition TMessage.h:89
Int_t Compress()
Compress the message.
Definition TMessage.cxx:306
Int_t GetCompressionLevel() const
Definition TMessage.h:106
TList * fInfos
Definition TMessage.h:42
Int_t CompLength() const
Definition TMessage.h:90
void SetWhat(UInt_t what)
Using this method one can change the message type a posteriori. In case you OR "what" with kMESS_ACK, the message will wait for an acknowledgment from the remote side.
Definition TMessage.cxx:222
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:164
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
const char * GetTitle() const override
Returns title of object.
Definition TNamed.h:48
TString fName
Definition TNamed.h:32
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition TNamed.cxx:140
An array of TObjects.
Definition TObjArray.h:31
Int_t IndexOf(const TObject *obj) const override
Int_t GetEntries() const override
Return the number of objects in array (i.e. number of non-empty slots).
TObject * At(Int_t idx) const override
Definition TObjArray.h:164
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:84
void Add(TObject *obj) override
Definition TObjArray.h:68
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:439
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:457
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:973
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:780
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:987
virtual void SetUniqueID(UInt_t uid)
Set the unique object id.
Definition TObject.cxx:791
virtual const char * GetTitle() const
Returns title of object.
Definition TObject.cxx:483
void ResetBit(UInt_t f)
Definition TObject.h:198
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:961
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
Int_t IncrementCount()
Increase the reference count to this object.
static TObjArray * GetPIDs()
static: returns array of TProcessIDs
Describes a persistent version of a class.
Int_t GetClassVersion() const override
Int_t GetNumber() const override
TObjArray * GetElements() const override
void BuildCheck(TFile *file=nullptr, Bool_t load=kTRUE) override
Check if built and consistent with the class dictionary.
Basic string class.
Definition TString.h:139
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2356
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:632
virtual int GetServiceByName(const char *service)
Get port # of internet service.
Definition TSystem.cxx:2318
virtual TInetAddress GetSockName(int sock)
Get Internet Protocol (IP) address of host and port #.
Definition TSystem.cxx:2309
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:284
virtual char * GetServiceByPort(int port)
Get name of internet service.
Definition TSystem.cxx:2327
virtual int SetSockOpt(int sock, int kind, int val)
Set socket option.
Definition TSystem.cxx:2436
virtual TInetAddress GetPeerName(int sock)
Get Internet Protocol (IP) address of remote host and port #.
Definition TSystem.cxx:2300
virtual int OpenConnection(const char *server, int port, int tcpwindowsize=-1, const char *protocol="tcp")
Open a connection to another host.
Definition TSystem.cxx:2336
virtual int GetSockOpt(int sock, int kind, int *val)
Get socket option.
Definition TSystem.cxx:2445
virtual int RecvRaw(int sock, void *buffer, int length, int flag)
Receive exactly length bytes into buffer.
Definition TSystem.cxx:2399
virtual Int_t Select(TList *active, Long_t timeout)
Select on active file descriptors (called by TMonitor).
Definition TSystem.cxx:445
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition TSystem.cxx:2291
virtual int SendRaw(int sock, const void *buffer, int length, int flag)
Send exactly length bytes from buffer.
Definition TSystem.cxx:2409
virtual void CloseConnection(int sock, Bool_t force=kFALSE)
Close socket connection.
Definition TSystem.cxx:2390
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
UInt_t fBytesRecv
Definition TUDPSocket.h:50
Int_t GetErrorCode() const
Returns error code.
UInt_t fBytesSent
Definition TUDPSocket.h:51
Int_t fCompress
Definition TUDPSocket.h:52
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
virtual TInetAddress GetLocalInetAddress()
Return internet address of local host to which the socket is bound.
EServiceType fServType
Definition TUDPSocket.h:58
TInetAddress fLocalAddress
Definition TUDPSocket.h:53
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
See comments for function SetCompressionSettings.
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
TVirtualMutex * fLastUsageMtx
Definition TUDPSocket.h:64
virtual Int_t GetLocalPort()
Return the local port # to which the socket is bound.
TInetAddress fAddress
Definition TUDPSocket.h:49
TString fService
Definition TUDPSocket.h:57
static ULong64_t fgBytesRecv
Definition TUDPSocket.h:67
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
virtual Bool_t IsValid() const
Definition TUDPSocket.h:119
Int_t fRemoteProtocol
Definition TUDPSocket.h:54
static void NetError(const char *where, Int_t error)
Print error string depending on error code.
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
See comments for function SetCompressionSettings.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual void Close(Option_t *opt="")
Close the socket.
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
static ULong64_t fgBytesSent
Definition TUDPSocket.h:68
Int_t GetCompressionLevel() const
Definition TUDPSocket.h:161
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
TList * fUUIDs
Definition TUDPSocket.h:62
void Touch()
Definition TUDPSocket.h:144
TBits fBitsInfo
Definition TUDPSocket.h:61
static ULong64_t GetSocketBytesRecv()
Get total number of bytes received via all sockets.
TString fUrl
Definition TUDPSocket.h:60
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Option_t * GetOption() const override
Definition TUDPSocket.h:83
static ULong64_t GetSocketBytesSent()
Get total number of bytes sent via all sockets.
Int_t fSocket
Definition TUDPSocket.h:59
virtual Int_t Select(Int_t interest=kRead, Long_t timeout=-1)
Waits for this socket to change status.
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
TSecContext * fSecContext
Definition TUDPSocket.h:55
This class represents a WWW compatible URL.
Definition TUrl.h:33
const Int_t n
Definition legend1.C:16
tbb::task_arena is an alias of tbb::interface7::task_arena, which does not allow forward-declaring tbb::task_arena without forward-declaring tbb::interface7.
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression).
Definition Compression.h:70