Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
TSocket.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 18/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/**
13\file TSocket.cxx
14\class TSocket
15\brief This class implements client sockets.
16\note This class deals with sockets: the user is entirely responsible for the security of their usage, for example, but
17not limited to, the management of the connections to said sockets.
18
19A socket is an endpoint for communication between two machines. The actual work is done via the TSystem class (either
20TUnixSystem or TWinNTSystem).
21
22**/
23
24#include "Bytes.h"
25#include "Compression.h"
26#include "NetErrors.h"
27#include "TError.h"
28#include "TMessage.h"
29#include "TObjString.h"
30#include "TPSocket.h"
31#include "TPluginManager.h"
32#include "TROOT.h"
33#include "TString.h"
34#include "TSystem.h"
35#include "TUrl.h"
36#include "TStreamerInfo.h"
37#include "TProcessID.h"
38
39#include <limits>
40
43
44//
45// Client "protocol changes"
46//
47// This was in TNetFile and TAuthenticate before, but after the introduction
48// of TSocket::CreateAuthSocket the common place for all the clients is TSocket,
49// so this seems to be the right place for a version number
50//
51// 7: added support for ReOpen(), kROOTD_BYE and kROOTD_PROTOCOL2
52// 8: added support for update being a create (open stat = 2 and not 1)
53// 9: added new authentication features (see README.AUTH)
54// 10: added support for authenticated socket via TSocket::CreateAuthSocket(...)
55// 11: modified SSH protocol + support for server 'no authentication' mode
56// 12: add random tags to avoid reply attacks (password+token)
57// 13: LEGACY: authentication re-organization; cleanup in PROOF
58// 14: support for SSH authentication via SSH tunnel
59// 15: cope with fixes in TUrl::GetFile
60// 16: add env setup message exchange
61//
62Int_t TSocket::fgClientProtocol = 17; // increase when client protocol changes
63
64////////////////////////////////////////////////////////////////////////////////
65/// Create a socket. Connect to the named service at address addr.
66/// Use tcpwindowsize to specify the size of the receive buffer, it has
67/// to be specified here to make sure the window scale option is set (for
68/// tcpwindowsize > 65KB and for platforms supporting window scaling).
69/// Returns when connection has been accepted by remote side. Use IsValid()
70/// to check the validity of the socket. Every socket is added to the TROOT
71/// sockets list which will make sure that any open sockets are properly
72/// closed on program termination.
73
74TSocket::TSocket(TInetAddress addr, const char *service, Int_t tcpwindowsize)
75 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
76{
79
80 fService = service;
83 if (fService.Contains("root"))
85 fAddress = addr;
86 fAddress.fPort = gSystem->GetServiceByName(service);
87 fBytesSent = 0;
88 fBytesRecv = 0;
89 fTcpWindowSize = tcpwindowsize;
90 fUUIDs = 0;
91 fLastUsageMtx = 0;
93
94 if (fAddress.GetPort() != -1) {
95 fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
96 tcpwindowsize);
97
98 if (fSocket != kInvalid) {
99 gROOT->GetListOfSockets()->Add(this);
100 }
101 } else
103
104}
105
106////////////////////////////////////////////////////////////////////////////////
107/// Create a socket. Connect to the specified port # at address addr.
108/// Use tcpwindowsize to specify the size of the receive buffer, it has
109/// to be specified here to make sure the window scale option is set (for
110/// tcpwindowsize > 65KB and for platforms supporting window scaling).
111/// Returns when connection has been accepted by remote side. Use IsValid()
112/// to check the validity of the socket. Every socket is added to the TROOT
113/// sockets list which will make sure that any open sockets are properly
114/// closed on program termination.
115
116TSocket::TSocket(TInetAddress addr, Int_t port, Int_t tcpwindowsize)
117 : TNamed(addr.GetHostName(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
118{
121
122 fService = gSystem->GetServiceByPort(port);
123 fRemoteProtocol= -1;
125 if (fService.Contains("root"))
127 fAddress = addr;
128 fAddress.fPort = port;
130 fBytesSent = 0;
131 fBytesRecv = 0;
132 fTcpWindowSize = tcpwindowsize;
133 fUUIDs = 0;
134 fLastUsageMtx = 0;
136
137 fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
138 tcpwindowsize);
139 if (fSocket == kInvalid)
140 fAddress.fPort = -1;
141 else {
142 gROOT->GetListOfSockets()->Add(this);
143 }
144}
145
146////////////////////////////////////////////////////////////////////////////////
147/// Create a socket. Connect to named service on the remote host.
148/// Use tcpwindowsize to specify the size of the receive buffer, it has
149/// to be specified here to make sure the window scale option is set (for
150/// tcpwindowsize > 65KB and for platforms supporting window scaling).
151/// Returns when connection has been accepted by remote side. Use IsValid()
152/// to check the validity of the socket. Every socket is added to the TROOT
153/// sockets list which will make sure that any open sockets are properly
154/// closed on program termination.
155
156TSocket::TSocket(const char *host, const char *service, Int_t tcpwindowsize)
157 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
158{
161
162 fService = service;
163 fRemoteProtocol= -1;
165 if (fService.Contains("root"))
167 fAddress = gSystem->GetHostByName(host);
168 fAddress.fPort = gSystem->GetServiceByName(service);
169 SetName(fAddress.GetHostName());
170 fBytesSent = 0;
171 fBytesRecv = 0;
172 fTcpWindowSize = tcpwindowsize;
173 fUUIDs = 0;
174 fLastUsageMtx = 0;
176
177 if (fAddress.GetPort() != -1) {
178 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), tcpwindowsize);
179 if (fSocket != kInvalid) {
180 gROOT->GetListOfSockets()->Add(this);
181 }
182 } else
184}
185
186////////////////////////////////////////////////////////////////////////////////
187/// Create a socket. The url parameter has the form
188///
189/// [sockd://]host[:port][/service]
190///
191/// where
192/// [port] = is the remote port number
193/// [service] = service name used to determine the port
194/// (for backward compatibility, specification of
195/// port as priority)
196///
197/// Connect to the specified port # on the remote host.
198/// Use tcpwindowsize to specify the size of the receive buffer, it has
199/// to be specified here to make sure the window scale option is set (for
200/// tcpwindowsize > 65KB and for platforms supporting window scaling).
201/// Returns when connection has been accepted by remote side. Use IsValid()
202/// to check the validity of the socket. Every socket is added to the TROOT
203/// sockets list which will make sure that any open sockets are properly
204/// closed on program termination.
205
206TSocket::TSocket(const char *url, Int_t port, Int_t tcpwindowsize)
207 : TNamed(TUrl(url).GetHost(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
208{
211
212 fUrl = TString(url);
213 TString host(TUrl(fUrl).GetHost());
214
215 fService = gSystem->GetServiceByPort(port);
216 fRemoteProtocol= -1;
218 if (fUrl.Contains("root"))
220 fAddress = gSystem->GetHostByName(host);
221 fAddress.fPort = port;
222 SetName(fAddress.GetHostName());
224 fBytesSent = 0;
225 fBytesRecv = 0;
226 fTcpWindowSize = tcpwindowsize;
227 fUUIDs = 0;
228 fLastUsageMtx = 0;
230
231 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), tcpwindowsize);
232 if (fSocket == kInvalid) {
233 fAddress.fPort = kInvalid;
234 } else {
235 gROOT->GetListOfSockets()->Add(this);
236 }
237}
238
239////////////////////////////////////////////////////////////////////////////////
240/// Create a socket in the Unix domain on 'sockpath'.
241/// Returns when connection has been accepted by the server. Use IsValid()
242/// to check the validity of the socket. Every socket is added to the TROOT
243/// sockets list which will make sure that any open sockets are properly
244/// closed on program termination.
245
246TSocket::TSocket(const char *sockpath) : TNamed(sockpath, ""),
247 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
248{
251
252 fUrl = sockpath;
253
254 fService = "unix";
255 fRemoteProtocol= -1;
257 fAddress.fPort = -1;
258 fName.Form("unix:%s", sockpath);
260 fBytesSent = 0;
261 fBytesRecv = 0;
262 fTcpWindowSize = -1;
263 fUUIDs = 0;
264 fLastUsageMtx = 0;
266
267 fSocket = gSystem->OpenConnection(sockpath, -1, -1);
268 if (fSocket > 0) {
269 gROOT->GetListOfSockets()->Add(this);
270 }
271}
272
273////////////////////////////////////////////////////////////////////////////////
274/// Create a socket. The socket will adopt previously opened TCP socket with
275/// descriptor desc.
276
277TSocket::TSocket(Int_t desc) : TNamed("", ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
278{
281
282 fRemoteProtocol = 0;
283 fService = (char *)kSOCKD;
285 fBytesSent = 0;
286 fBytesRecv = 0;
287 fTcpWindowSize = -1;
288 fUUIDs = 0;
289 fLastUsageMtx = 0;
291
292 if (desc >= 0) {
293 fSocket = desc;
294 fAddress = gSystem->GetPeerName(fSocket);
295 gROOT->GetListOfSockets()->Add(this);
296 } else
298}
299
300////////////////////////////////////////////////////////////////////////////////
301/// Create a socket. The socket will adopt previously opened Unix socket with
302/// descriptor desc. The sockpath arg is for info purposes only. Use
303/// this method to adopt e.g. a socket created via socketpair().
304
305TSocket::TSocket(Int_t desc, const char *sockpath) : TNamed(sockpath, ""),
306 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
307{
310
311 fUrl = sockpath;
312
313 fService = "unix";
314 fRemoteProtocol= -1;
316 fAddress.fPort = -1;
317 fName.Form("unix:%s", sockpath);
319 fBytesSent = 0;
320 fBytesRecv = 0;
321 fTcpWindowSize = -1;
322 fUUIDs = 0;
323 fLastUsageMtx = 0;
325
326 if (desc >= 0) {
327 fSocket = desc;
328 gROOT->GetListOfSockets()->Add(this);
329 } else
331}
332
333
334////////////////////////////////////////////////////////////////////////////////
335/// TSocket copy ctor.
336
338{
339 fSocket = s.fSocket;
340 fService = s.fService;
341 fAddress = s.fAddress;
349 fUUIDs = 0;
350 fLastUsageMtx = 0;
352
353 if (fSocket != kInvalid) {
354 gROOT->GetListOfSockets()->Add(this);
355 }
356}
357////////////////////////////////////////////////////////////////////////////////
358/// Close the socket and mark as due to a broken connection.
359
361{
363 if (IsValid()) {
364 gSystem->CloseConnection(fSocket, kFALSE);
366 }
367
370}
371
372////////////////////////////////////////////////////////////////////////////////
373/// Close the socket. If option is "force", calls shutdown(id,2) to
374/// shut down the connection. This will close the connection also
375/// for the parent of this process. Also called via the dtor (without
376/// option "force", call explicitly Close("force") if this is desired).
377
379{
380 Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
381
382 if (fSocket != kInvalid) {
383 if (IsValid()) { // Filter out kInvalidStillInList case (disconnected but not removed from list)
384 gSystem->CloseConnection(fSocket, force);
385 }
386 gROOT->GetListOfSockets()->Remove(this);
387 }
389
392}
393
394////////////////////////////////////////////////////////////////////////////////
395/// Return internet address of local host to which the socket is bound.
396/// In case of error TInetAddress::IsValid() returns kFALSE.
397
399{
400 if (IsValid()) {
401 if (fLocalAddress.GetPort() == -1)
402 fLocalAddress = gSystem->GetSockName(fSocket);
403 return fLocalAddress;
404 }
405 return TInetAddress();
406}
407
408////////////////////////////////////////////////////////////////////////////////
409/// Return the local port # to which the socket is bound.
410/// In case of error return -1.
411
413{
414 if (IsValid()) {
415 if (fLocalAddress.GetPort() == -1)
417 return fLocalAddress.GetPort();
418 }
419 return -1;
420}
421
422////////////////////////////////////////////////////////////////////////////////
423/// Waits for this socket to change status. If interest=kRead,
424/// the socket will be watched to see if characters become available for
425/// reading; if interest=kWrite the socket will be watched to
426/// see if a write will not block.
427/// The argument 'timeout' specifies a maximum time to wait in millisec.
428/// Default no timeout.
429/// Returns 1 if a change of status of interest has been detected within
430/// timeout; 0 in case of timeout; < 0 if an error occurred.
431
433{
434 Int_t rc = 1;
435
436 // Associate a TFileHandler to this socket
437 TFileHandler fh(fSocket, interest);
438
439 // Wait for an event now
440 rc = gSystem->Select(&fh, timeout);
441
442 return rc;
443}
444
445////////////////////////////////////////////////////////////////////////////////
446/// Send a single message opcode. Use kind (opcode) to set the
447/// TMessage "what" field. Returns the number of bytes that were sent
448/// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
449/// been or'ed with kMESS_ACK, the call will only return after having
450/// received an acknowledgement, making the sending process synchronous.
451
453{
454 TMessage mess(kind);
455
456 Int_t nsent;
457 if ((nsent = Send(mess)) < 0)
458 return -1;
459
460 return nsent;
461}
462
463////////////////////////////////////////////////////////////////////////////////
464/// Send a status and a single message opcode. Use kind (opcode) to set the
465/// TMessage "what" field. Returns the number of bytes that were sent
466/// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
467/// been or'ed with kMESS_ACK, the call will only return after having
468/// received an acknowledgement, making the sending process synchronous.
469
471{
472 TMessage mess(kind);
473 mess << status;
474
475 Int_t nsent;
476 if ((nsent = Send(mess)) < 0)
477 return -1;
478
479 return nsent;
480}
481
482////////////////////////////////////////////////////////////////////////////////
483/// Send a character string buffer. Use kind to set the TMessage "what" field.
484/// Returns the number of bytes in the string str that were sent and -1 in
485/// case of error. In case the kind has been or'ed with kMESS_ACK, the call
486/// will only return after having received an acknowledgement, making the
487/// sending process synchronous.
488
489Int_t TSocket::Send(const char *str, Int_t kind)
490{
491 TMessage mess(kind);
492 if (str) mess.WriteString(str);
493
494 Int_t nsent;
495 if ((nsent = Send(mess)) < 0)
496 return -1;
497
498 return nsent - sizeof(Int_t); // - TMessage::What()
499}
500
501////////////////////////////////////////////////////////////////////////////////
502/// Send a TMessage object. Returns the number of bytes in the TMessage
503/// that were sent and -1 in case of error. In case the TMessage::What
504/// has been or'ed with kMESS_ACK, the call will only return after having
505/// received an acknowledgement, making the sending process synchronous.
506/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
507/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
508/// support for streaming TStreamerInfo added by Rene Brun May 2008
509/// support for streaming TProcessID added by Rene Brun June 2008
510
512{
514
515 if (fSocket < 0) return -1;
516
517 if (mess.IsReading()) {
518 Error("Send", "cannot send a message used for reading");
519 return -1;
520 }
521
522 // send streamer infos in case schema evolution is enabled in the TMessage
523 SendStreamerInfos(mess);
524
525 // send the process id's so TRefs work
526 SendProcessIDs(mess);
527
528 mess.SetLength(); //write length in first word of buffer
529
530 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
531 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
532
533 if (mess.GetCompressionLevel() > 0)
534 const_cast<TMessage&>(mess).Compress();
535
536 char *mbuf = mess.Buffer();
537 Int_t mlen = mess.Length();
538 if (mess.CompBuffer()) {
539 mbuf = mess.CompBuffer();
540 mlen = mess.CompLength();
541 }
542
544 Int_t nsent;
545 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
546 if (nsent == -5) {
547 // Connection reset by peer or broken
549 }
550 return nsent;
551 }
552
553 fBytesSent += nsent;
554 fgBytesSent += nsent;
555
556 // If acknowledgement is desired, wait for it
557 if (mess.What() & kMESS_ACK) {
560 char buf[2];
561 Int_t n = 0;
562 if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
563 if (n == -5) {
564 // Connection reset by peer or broken
566 } else
567 n = -1;
568 return n;
569 }
570 if (strncmp(buf, "ok", 2)) {
571 Error("Send", "bad acknowledgement");
572 return -1;
573 }
574 fBytesRecv += 2;
575 fgBytesRecv += 2;
576 }
577
578 Touch(); // update usage timestamp
579
580 return nsent - sizeof(UInt_t); //length - length header
581}
582
583////////////////////////////////////////////////////////////////////////////////
584/// Send an object. Returns the number of bytes sent and -1 in case of error.
585/// In case the "kind" has been or'ed with kMESS_ACK, the call will only
586/// return after having received an acknowledgement, making the sending
587/// synchronous.
588
590{
591 //stream object to message buffer
592 TMessage mess(kind);
593 mess.WriteObject(obj);
594
595 //now sending the object itself
596 Int_t nsent;
597 if ((nsent = Send(mess)) < 0)
598 return -1;
599
600 return nsent;
601}
602
603////////////////////////////////////////////////////////////////////////////////
604/// Send a raw buffer of specified length. Using option kOob one can send
605/// OOB data. Returns the number of bytes sent or -1 in case of error.
606/// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
607/// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
608
609Int_t TSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
610{
612
613 if (!IsValid()) return -1;
614
616 Int_t nsent;
617 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
618 if (nsent == -5) {
619 // Connection reset or broken: close
621 }
622 return nsent;
623 }
624
625 fBytesSent += nsent;
626 fgBytesSent += nsent;
627
628 Touch(); // update usage timestamp
629
630 return nsent;
631}
632
633////////////////////////////////////////////////////////////////////////////////
634/// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
635/// in the object in the message is in the fInfos list of the message.
636/// We send only the TStreamerInfos not yet sent on this socket.
637
639{
640 if (mess.fInfos && mess.fInfos->GetEntries()) {
641 TIter next(mess.fInfos);
642 TStreamerInfo *info;
643 TList *minilist = 0;
644 while ((info = (TStreamerInfo*)next())) {
645 Int_t uid = info->GetNumber();
646 if (fBitsInfo.TestBitNumber(uid))
647 continue; //TStreamerInfo had already been sent
648 fBitsInfo.SetBitNumber(uid);
649 if (!minilist)
650 minilist = new TList();
651 if (gDebug > 0)
652 Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
653 info->GetName(),info->GetClassVersion());
654 minilist->Add(info);
655 }
656 if (minilist) {
658 messinfo.WriteObject(minilist);
659 delete minilist;
660 if (messinfo.fInfos)
661 messinfo.fInfos->Clear();
662 if (Send(messinfo) < 0)
663 Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
664 }
665 }
666}
667
668////////////////////////////////////////////////////////////////////////////////
669/// Check if TProcessIDs must be sent. The list of TProcessIDs
670/// in the object in the message is found by looking in the TMessage bits.
671/// We send only the TProcessIDs not yet send on this socket.
672
674{
675 if (mess.TestBitNumber(0)) {
677 Int_t npids = pids->GetEntries();
678 TProcessID *pid;
679 TList *minilist = 0;
680 for (Int_t ipid = 0; ipid < npids; ipid++) {
681 pid = (TProcessID*)pids->At(ipid);
682 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
683 continue;
684 //check if a pid with this title has already been sent through the socket
685 //if not add it to the fUUIDs list
686 if (!fUUIDs) {
687 fUUIDs = new TList();
688 fUUIDs->SetOwner(kTRUE);
689 } else {
690 if (fUUIDs->FindObject(pid->GetTitle()))
691 continue;
692 }
693 fUUIDs->Add(new TObjString(pid->GetTitle()));
694 if (!minilist)
695 minilist = new TList();
696 if (gDebug > 0)
697 Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
698 minilist->Add(pid);
699 }
700 if (minilist) {
701 TMessage messpid(kMESS_PROCESSID);
702 messpid.WriteObject(minilist);
703 delete minilist;
704 if (Send(messpid) < 0)
705 Warning("SendProcessIDs", "problems sending TProcessID's ...");
706 }
707 }
708}
709
710////////////////////////////////////////////////////////////////////////////////
711/// Receive a character string message of maximum max length. The expected
712/// message must be of type kMESS_STRING. Returns length of received string
713/// (can be 0 if otherside of connection is closed) or -1 in case of error
714/// or -4 in case a non-blocking socket would block (i.e. there is nothing
715/// to be read).
716
717Int_t TSocket::Recv(char *str, Int_t max)
718{
719 Int_t n, kind;
720
722 if ((n = Recv(str, max, kind)) <= 0) {
723 if (n == -5) {
725 n = -1;
726 }
727 return n;
728 }
729
730 if (kind != kMESS_STRING) {
731 Error("Recv", "got message of wrong kind (expected %d, got %d)",
732 kMESS_STRING, kind);
733 return -1;
734 }
735
736 return n;
737}
738
739////////////////////////////////////////////////////////////////////////////////
740/// Receive a character string message of maximum max length. Returns in
741/// kind the message type. Returns length of received string+4 (can be 0 if
742/// other side of connection is closed) or -1 in case of error or -4 in
743/// case a non-blocking socket would block (i.e. there is nothing to be read).
744
745Int_t TSocket::Recv(char *str, Int_t max, Int_t &kind)
746{
747 Int_t n;
748 TMessage *mess;
749
751 if ((n = Recv(mess)) <= 0) {
752 if (n == -5) {
754 n = -1;
755 }
756 return n;
757 }
758
759 kind = mess->What();
760 if (str) {
761 if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
762 mess->ReadString(str, max);
763 else
764 str[0] = 0;
765 }
766
767 delete mess;
768
769 return n; // number of bytes read (len of str + sizeof(kind)
770}
771
772////////////////////////////////////////////////////////////////////////////////
773/// Receives a status and a message type. Returns length of received
774/// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
775/// is closed) or -1 in case of error or -4 in case a non-blocking
776/// socket would block (i.e. there is nothing to be read).
777
779{
780 Int_t n;
781 TMessage *mess;
782
784 if ((n = Recv(mess)) <= 0) {
785 if (n == -5) {
787 n = -1;
788 }
789 return n;
790 }
791
792 kind = mess->What();
793 (*mess) >> status;
794
795 delete mess;
796
797 return n; // number of bytes read (2 * sizeof(Int_t)
798}
799
800////////////////////////////////////////////////////////////////////////////////
801/// Receive a TMessage object. The user must delete the TMessage object.
802/// Returns length of message in bytes (can be 0 if other side of connection
803/// is closed) or -1 in case of error or -4 in case a non-blocking socket
804/// would block (i.e. there is nothing to be read) or -5 if pipe broken
805/// or reset by peer (EPIPE || ECONNRESET). In those case mess == nullptr.
806
808{
810
811 if (!IsValid()) {
812 mess = nullptr;
813 return -1;
814 }
815
816 Int_t n;
817 while (1) {
819 UInt_t len;
820 if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
821 if (n == 0 || n == -5) {
822 // Connection closed, reset or broken
824 }
825 mess = nullptr;
826 return n;
827 }
828 len = net2host(len); //from network to host byte order
829
830 if (len > (std::numeric_limits<decltype(len)>::max() - sizeof(decltype(len)))) {
831 Error("Recv", "Buffer length is %u and %u+sizeof(UInt_t) cannot be represented as an UInt_t.", len, len);
832 return -1;
833 }
834
836 char *buf = new char[len+sizeof(UInt_t)];
837 if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
838 if (n == 0 || n == -5) {
839 // Connection closed, reset or broken
841 }
842 delete [] buf;
843 mess = nullptr;
844 return n;
845 }
846
847 fBytesRecv += n + sizeof(UInt_t);
848 fgBytesRecv += n + sizeof(UInt_t);
849
850 // `buf` becomes owned by the TMessage.
851 mess = new TMessage(buf, len+sizeof(UInt_t));
852
853 // receive any streamer infos
854 bool streamerInfoReceived = RecvStreamerInfos(mess);
855 if (streamerInfoReceived) {
856 // do another loop. No need to delete `mess` because RecvStreamerInfos already did it.
857 continue;
858 }
859
860 // receive any process ids
861 bool processIdReceived = RecvProcessIDs(mess);
862 if (processIdReceived) {
863 // do another loop. No need to delete `mess` because RecvProcessIDs already did it.
864 continue;
865 }
866
867 break;
868 }
869
870 if (mess->What() & kMESS_ACK) {
872 const char ok[2] = { 'o', 'k' };
873 Int_t n2 = 0;
874 if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
875 if (n2 == -5) {
876 // Connection reset or broken
878 }
879 delete mess;
880 mess = nullptr;
881 return n2;
882 }
883 mess->SetWhat(mess->What() & ~kMESS_ACK);
884
885 fBytesSent += 2;
886 fgBytesSent += 2;
887 }
888
889 Touch(); // update usage timestamp
890
891 return n;
892}
893
894////////////////////////////////////////////////////////////////////////////////
895/// Receive a raw buffer of specified length bytes. Using option kPeek
896/// one can peek at incoming data. Returns number of received bytes.
897/// Returns -1 in case of error. In case of opt == kOob: -2 means
898/// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
899/// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
900/// peer (EPIPE || ECONNRESET).
901
903{
905
906 if (!IsValid()) return -1;
907 if (length == 0) return 0;
908
910 Int_t n;
911 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
912 if (n == 0 || n == -5) {
913 // Connection closed, reset or broken
915 }
916 return n;
917 }
918
919 fBytesRecv += n;
920 fgBytesRecv += n;
921
922 Touch(); // update usage timestamp
923
924 return n;
925}
926
927////////////////////////////////////////////////////////////////////////////////
928/// Receive a message containing streamer infos. In case the message contains
929/// streamer infos they are imported, the message will be deleted and the
930/// method returns kTRUE.
931
933{
934 if (mess->What() == kMESS_STREAMERINFO) {
935 TList *list = (TList*)mess->ReadObject(TList::Class());
936 TIter next(list);
937 TStreamerInfo *info;
938 TObjLink *lnk = list->FirstLink();
939 // First call BuildCheck for regular class
940 while (lnk) {
941 info = (TStreamerInfo*)lnk->GetObject();
942 TObject *element = info->GetElements()->UncheckedAt(0);
943 Bool_t isstl = element && strcmp("This",element->GetName())==0;
944 if (!isstl) {
945 info->BuildCheck();
946 if (gDebug > 0)
947 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
948 info->GetName(), info->GetClassVersion());
949 }
950 lnk = lnk->Next();
951 }
952 // Then call BuildCheck for stl class
953 lnk = list->FirstLink();
954 while (lnk) {
955 info = (TStreamerInfo*)lnk->GetObject();
956 TObject *element = info->GetElements()->UncheckedAt(0);
957 Bool_t isstl = element && strcmp("This",element->GetName())==0;
958 if (isstl) {
959 info->BuildCheck();
960 if (gDebug > 0)
961 Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
962 info->GetName(), info->GetClassVersion());
963 }
964 lnk = lnk->Next();
965 }
966 delete list;
967 delete mess;
968
969 return kTRUE;
970 }
971 return kFALSE;
972}
973
974////////////////////////////////////////////////////////////////////////////////
975/// Receive a message containing process ids. In case the message contains
976/// process ids they are imported, the message will be deleted and the
977/// method returns kTRUE.
978
980{
981 if (mess->What() == kMESS_PROCESSID) {
982 TList *list = (TList*)mess->ReadObject(TList::Class());
983 TIter next(list);
984 TProcessID *pid;
985 while ((pid = (TProcessID*)next())) {
986 // check that a similar pid is not already registered in fgPIDs
987 TObjArray *pidslist = TProcessID::GetPIDs();
988 TIter nextpid(pidslist);
989 TProcessID *p;
990 while ((p = (TProcessID*)nextpid())) {
991 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
992 delete pid;
993 pid = 0;
994 break;
995 }
996 }
997 if (pid) {
998 if (gDebug > 0)
999 Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
1000 pid->IncrementCount();
1001 pidslist->Add(pid);
1002 Int_t ind = pidslist->IndexOf(pid);
1003 pid->SetUniqueID((UInt_t)ind);
1004 }
1005 }
1006 delete list;
1007 delete mess;
1008
1009 return kTRUE;
1010 }
1011 return kFALSE;
1012}
1013
1014////////////////////////////////////////////////////////////////////////////////
1015/// Set socket options.
1016
1018{
1019 if (!IsValid()) return -1;
1020
1021 return gSystem->SetSockOpt(fSocket, opt, val);
1022}
1023
1024////////////////////////////////////////////////////////////////////////////////
1025/// Get socket options. Returns -1 in case of error.
1026
1028{
1029 if (!IsValid()) return -1;
1030
1031 return gSystem->GetSockOpt(fSocket, opt, &val);
1032}
1033
1034////////////////////////////////////////////////////////////////////////////////
1035/// Returns error code. Meaning depends on context where it is called.
1036/// If no error condition returns 0 else a value < 0.
1037/// For example see TServerSocket ctor.
1038
1040{
1041 if (!IsValid())
1042 return fSocket;
1043
1044 return 0;
1045}
1046
1047////////////////////////////////////////////////////////////////////////////////
1048/// See comments for function SetCompressionSettings
1049
1051{
1052 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1053 if (fCompress < 0) {
1055 } else {
1056 int level = fCompress % 100;
1057 fCompress = 100 * algorithm + level;
1058 }
1059}
1060
1061////////////////////////////////////////////////////////////////////////////////
1062/// See comments for function SetCompressionSettings
1063
1065{
1066 if (level < 0) level = 0;
1067 if (level > 99) level = 99;
1068 if (fCompress < 0) {
1069 // if the algorithm is not defined yet use 0 as a default
1070 fCompress = level;
1071 } else {
1072 int algorithm = fCompress / 100;
1073 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1074 fCompress = 100 * algorithm + level;
1075 }
1076}
1077
1078////////////////////////////////////////////////////////////////////////////////
1079/// Used to specify the compression level and algorithm:
1080/// settings = 100 * algorithm + level
1081///
1082/// level = 0, objects written to this file will not be compressed.
1083/// level = 1, minimal compression level but fast.
1084/// ....
1085/// level = 9, maximal compression level but slower and might use more memory.
1086/// (For the currently supported algorithms, the maximum level is 9)
1087/// If compress is negative it indicates the compression level is not set yet.
1088///
1089/// The enumeration ROOT::RCompressionSetting::EAlgorithm associates each
1090/// algorithm with a number. There is a utility function to help
1091/// to set the value of the argument. For example,
1092/// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1093/// will build an integer which will set the compression to use
1094/// the LZMA algorithm and compression level 1. These are defined
1095/// in the header file Compression.h.
1096///
1097/// Note that the compression settings may be changed at any time.
1098/// The new compression settings will only apply to branches created
1099/// or attached after the setting is changed and other objects written
1100/// after the setting is changed.
1101
1103{
1104 fCompress = settings;
1105}
1106
1107////////////////////////////////////////////////////////////////////////////////
1108/// Static method returning supported client protocol.
1109
1114
1115////////////////////////////////////////////////////////////////////////////////
1116/// Print error string depending on error code.
1117
1118void TSocket::NetError(const char *where, Int_t err)
1119{
1120 // Make sure it is in range
1121 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1122
1123 if (gDebug > 0)
1124 ::Error(where, "%s", gRootdErrStr[err]);
1125}
1126
1127////////////////////////////////////////////////////////////////////////////////
1128/// Get total number of bytes sent via all sockets.
1129
1134
1135////////////////////////////////////////////////////////////////////////////////
1136/// Get total number of bytes received via all sockets.
1137
UShort_t net2host(UShort_t x)
Definition Bytes.h:561
@ kMESS_STRING
@ kMESS_ACK
@ kMESS_PROCESSID
@ kMESS_STREAMERINFO
R__EXTERN const char * gRootdErrStr[]
Definition NetErrors.h:72
@ kErrError
Definition NetErrors.h:69
#define SafeDelete(p)
Definition RConfig.hxx:525
int Int_t
Signed integer 4 bytes (int).
Definition RtypesCore.h:59
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
Definition RtypesCore.h:60
long Long_t
Signed long integer 4 bytes (long). Size depends on architecture.
Definition RtypesCore.h:68
bool Bool_t
Boolean (0=false, 1=true) (bool).
Definition RtypesCore.h:77
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
unsigned long long ULong64_t
Portable unsigned long integer 8 bytes.
Definition RtypesCore.h:84
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
const char Option_t
Option string (const char).
Definition RtypesCore.h:80
#define R__ASSERT(e)
Checks condition e and reports a fatal error if it's false.
Definition TError.h:125
Double_t err
Int_t gDebug
Definition TROOT.cxx:777
#define gROOT
Definition TROOT.h:417
char * Compress(const char *str)
Remove all blanks from the string str.
Definition TString.cxx:2579
ESockOptions
Definition TSystem.h:229
ESendRecvOptions
Definition TSystem.h:242
externTSystem * gSystem
Definition TSystem.h:582
TObject * ReadObject(const TClass *cl) override
Read object from I/O buffer.
void WriteString(const char *s) override
Write string to I/O buffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void WriteObject(const TObject *obj, Bool_t cacheReuse=kTRUE) override
Write object to I/O buffer.
Int_t BufferSize() const
Definition TBuffer.h:98
Bool_t IsReading() const
Definition TBuffer.h:86
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
virtual Int_t GetEntries() const
This class represents an Internet Protocol (IP) address.
const char * GetHostName() const
A doubly linked list.
Definition TList.h:38
static TClass * Class()
void Clear(Option_t *option="") override
Remove all objects from the list.
Definition TList.cxx:532
void Add(TObject *obj) override
Definition TList.h:81
UInt_t What() const
Definition TMessage.h:77
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:208
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TMessage.h:58
char * CompBuffer() const
Definition TMessage.h:91
Int_t GetCompressionLevel() const
Definition TMessage.h:108
TList * fInfos
Definition TMessage.h:41
Int_t CompLength() const
Definition TMessage.h:92
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
Definition TMessage.cxx:228
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:173
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
const char * GetTitle() const override
Returns title of object.
Definition TNamed.h:50
TNamed()
Definition TNamed.h:38
TString fName
Definition TNamed.h:32
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition TNamed.cxx:149
An array of TObjects.
Definition TObjArray.h:31
Int_t IndexOf(const TObject *obj) const override
Int_t GetEntries() const override
Return the number of objects in array (i.e.
TObject * At(Int_t idx) const override
Definition TObjArray.h:170
TObject * UncheckedAt(Int_t i) const
Definition TObjArray.h:90
void Add(TObject *obj) override
Definition TObjArray.h:68
Collectable string class.
Definition TObjString.h:28
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:462
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:480
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:1084
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:888
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1098
virtual void SetUniqueID(UInt_t uid)
Set the unique object id.
Definition TObject.cxx:899
TObject()
TObject constructor.
Definition TObject.h:259
void ResetBit(UInt_t f)
Definition TObject.h:203
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:1072
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
Int_t IncrementCount()
Increase the reference count to this object.
static TObjArray * GetPIDs()
static: returns array of TProcessIDs
TInetAddress fAddress
Definition TSocket.h:63
Int_t fCompress
Definition TSocket.h:66
virtual Int_t SetOption(ESockOptions opt, Int_t val)
Set socket options.
Definition TSocket.cxx:1017
Int_t fSocket
Definition TSocket.h:71
Int_t GetErrorCode() const
Returns error code.
Definition TSocket.cxx:1039
TVirtualMutex * fLastUsageMtx
Definition TSocket.h:77
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
See comments for function SetCompressionSettings.
Definition TSocket.cxx:1064
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition TSocket.cxx:638
@ kInvalidStillInList
Definition TSocket.h:61
@ kInvalid
Definition TSocket.h:57
TString fUrl
Definition TSocket.h:73
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
See comments for function SetCompressionSettings.
Definition TSocket.cxx:1050
TSocket()
Definition TSocket.h:85
static ULong64_t GetSocketBytesSent()
Get total number of bytes sent via all sockets.
Definition TSocket.cxx:1130
TString fService
Definition TSocket.h:69
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition TSocket.cxx:932
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:807
TList * fUUIDs
Definition TSocket.h:75
static Int_t GetClientProtocol()
Static method returning supported client protocol.
Definition TSocket.cxx:1110
TBits fBitsInfo
Definition TSocket.h:74
TInetAddress fLocalAddress
Definition TSocket.h:67
static ULong64_t fgBytesRecv
Definition TSocket.h:80
@ kBrokenConn
Definition TSocket.h:45
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:378
void MarkBrokenConnection()
Close the socket and mark as due to a broken connection.
Definition TSocket.cxx:360
void Touch()
Definition TSocket.h:155
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition TSocket.cxx:979
Int_t GetCompressionLevel() const
Definition TSocket.h:174
virtual Int_t RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition TSocket.cxx:902
static ULong64_t fgBytesSent
Definition TSocket.h:81
virtual Int_t SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt=kDefault)
Send a raw buffer of specified length.
Definition TSocket.cxx:609
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition TSocket.cxx:673
static Int_t fgClientProtocol
Definition TSocket.h:83
virtual TInetAddress GetLocalInetAddress()
Return internet address of local host to which the socket is bound.
Definition TSocket.cxx:398
virtual Int_t Select(Int_t interest=kRead, Long_t timeout=-1)
Waits for this socket to change status.
Definition TSocket.cxx:432
virtual Int_t GetLocalPort()
Return the local port # to which the socket is bound.
Definition TSocket.cxx:412
Int_t fTcpWindowSize
Definition TSocket.h:72
Option_t * GetOption() const override
Definition TSocket.h:99
@ kSOCKD
Definition TSocket.h:48
@ kROOTD
Definition TSocket.h:48
static void NetError(const char *where, Int_t error)
Print error string depending on error code.
Definition TSocket.cxx:1118
EServiceType fServType
Definition TSocket.h:70
virtual Int_t SendObject(const TObject *obj, Int_t kind=kMESS_OBJECT)
Send an object.
Definition TSocket.cxx:589
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
Definition TSocket.cxx:1102
UInt_t fBytesSent
Definition TSocket.h:65
Int_t fRemoteProtocol
Definition TSocket.h:68
UInt_t fBytesRecv
Definition TSocket.h:64
virtual Bool_t IsValid() const
Definition TSocket.h:131
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:511
static ULong64_t GetSocketBytesRecv()
Get total number of bytes received via all sockets.
Definition TSocket.cxx:1138
Describes a persistent version of a class.
Int_t GetClassVersion() const override
Int_t GetNumber() const override
TObjArray * GetElements() const override
void BuildCheck(TFile *file=nullptr, Bool_t load=kTRUE) override
Check if built and consistent with the class dictionary.
Basic string class.
Definition TString.h:138
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:286
This class represents a WWW compatible URL.
Definition TUrl.h:33
const Int_t n
Definition legend1.C:16
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression).
Definition Compression.h:72