Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TXSocket.cxx
Go to the documentation of this file.
1// @(#)root/proofx:$Id$
2// Author: Gerardo Ganis 12/12/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TXSocket
13\ingroup proofx
14
15High level handler of connections to XProofD.
16See TSocket for details.
17
18*/
19
20#include "MessageTypes.h"
21#include "TEnv.h"
22#include "TError.h"
23#include "TException.h"
24#include "TMonitor.h"
25#include "TObjString.h"
26#include "TProof.h"
27#include "TSlave.h"
28#include "TRegexp.h"
29#include "TROOT.h"
30#include "TUrl.h"
31#include "TXHandler.h"
32#include "TXSocket.h"
33#include "XProofProtocol.h"
34
35#include "XrdProofConn.h"
36
42
43#ifndef WIN32
44#include <sys/socket.h>
45#else
46#include <Winsock2.h>
47#endif
48
49
50#include "XpdSysError.h"
51#include "XpdSysLogger.h"
52
53// ---- Tracing utils ----------------------------------------------------------
54#include "XrdProofdTrace.h"
55XrdOucTrace *XrdProofdTrace = 0;
57static XrdSysError eDest(0, "Proofx");
58
59#ifdef WIN32
62#endif
63
64//______________________________________________________________________________
65
66//---- error handling ----------------------------------------------------------
67
68////////////////////////////////////////////////////////////////////////////////
69/// Interface to ErrorHandler (protected).
70
71void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
72{
73 ::ErrorHandler(level, Form("TXSocket::%s", location), fmt, va);
74}
75
76//----- Ping handler -----------------------------------------------------------
77////////////////////////////////////////////////////////////////////////////////
78
81public:
83 : TFileHandler(fd, 1) { fSocket = s; }
84 Bool_t Notify();
85 Bool_t ReadNotify() { return Notify(); }
86};
87
88////////////////////////////////////////////////////////////////////////////////
89/// Ping the socket
90
92{
93 fSocket->Ping("ping handler");
94
95 return kTRUE;
96}
97
98// Env variables init flag
100
101// Static variables for input notification
102TXSockPipe TXSocket::fgPipe; // Pipe for input monitoring
103TString TXSocket::fgLoc = "undef"; // Location string
104
105// Static buffer manager
106std::mutex TXSocket::fgSMtx; // To protect spare list
107std::list<TXSockBuf *> TXSocket::fgSQue; // list of spare buffers
108Long64_t TXSockBuf::fgBuffMem = 0; // Total allocated memory
109Long64_t TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 MB]
110
111////////////////////////////////////////////////////////////////////////////////
112/// Constructor
113/// Open the connection to a remote XrdProofd instance and start a PROOF
114/// session.
115/// The mode 'm' indicates the role of this connection:
116/// 'a' Administrator; used by an XPD to contact the head XPD
117/// 'i' Internal; used by a TXProofServ to call back its creator
118/// (see XrdProofUnixConn)
119/// 'C' PROOF manager: open connection only (do not start a session)
120/// 'M' Client creating a top master
121/// 'A' Client attaching to top master
122/// 'm' Top master creating a submaster
123/// 's' Master creating a slave
124/// The buffer 'logbuf' is a null terminated string to be sent over at
125/// login.
126
127TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
128 const char *logbuf, Int_t loglevel, TXHandler *handler)
129 : TSocket(), fMode(m), fLogLevel(loglevel),
130 fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131 fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
132{
133 fUrl = url;
134 // Enable tracing in the XrdProof client, if not done already
135 eDest.logger(&eLogger);
136 if (!XrdProofdTrace)
137 XrdProofdTrace = new XrdOucTrace(&eDest);
138
139 // Init envs the first time
140 if (!fgInitDone)
141 InitEnvs();
142
143 // Async queue related stuff
144 fAQue.clear();
145
146 // Interrupts queue related stuff
147 fILev = -1;
149
150 // Init some variables
151 fByteLeft = 0;
152 fByteCur = 0;
153 fBufCur = 0;
154 fServType = kPROOFD; // for consistency
155 fTcpWindowSize = -1;
156 fRemoteProtocol = -1;
157 // By default forward directly to end-point
159 fSessionID = (fMode == 'C') ? -1 : psid;
160 fSocket = -1;
161
162 // This is used by external code to create a link between this object
163 // and another one
164 fReference = 0;
165
166 // The global pipe
167 if (!fgPipe.IsValid()) {
168 Error("TXSocket", "internal pipe is invalid");
169 return;
170 }
171
172 // Some initial values
173 TUrl u(url);
175 u.SetProtocol("proof", kTRUE);
176 fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
177
178 // Set the asynchronous handler
179 fHandler = handler;
180
181 if (url) {
182
183 // Create connection (for managers the type of the connection is the same
184 // as for top masters)
185 char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
186 fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
187 if (!fConn || !(fConn->IsValid())) {
189 if (gDebug > 0)
190 Error("TXSocket", "fatal error occurred while opening a connection"
191 " to server [%s]: %s", url, fConn->GetLastErr());
192 return;
193 }
194
195 // Fill some info
196 fUser = fConn->fUser.c_str();
197 fHost = fConn->fHost.c_str();
198 fPort = fConn->fPort;
199
200 // Create new proofserv if not client manager or administrator or internal mode
201 if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A'|| fMode == 'L') {
202 // We attach or create
203 if (!Create()) {
204 // Failure
205 Error("TXSocket", "create or attach failed (%s)",
206 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
207 Close();
208 return;
209 }
210 }
211
212 // Fill some other info available if Create is successful
213 if (fMode == 'C') {
216 }
217
218 // Also in the base class
219 fUrl = fConn->fUrl.GetUrl().c_str();
222
223 // This is needed for the reader thread to signal an interrupt
224 fPid = gSystem->GetPid();
225 }
226}
227
228////////////////////////////////////////////////////////////////////////////////
229/// Destructor
230
232{
233 // Disconnect from remote server (the connection manager is
234 // responsible of the underlying physical connection, so we do not
235 // force its closing)
236 Close();
237}
238
239////////////////////////////////////////////////////////////////////////////////
240/// Set location string
241
242void TXSocket::SetLocation(const char *loc)
243{
244 if (loc) {
245 fgLoc = loc;
246 fgPipe.SetLoc(loc);
247 } else {
248 fgLoc = "";
249 fgPipe.SetLoc("");
250 }
251}
252
253////////////////////////////////////////////////////////////////////////////////
254/// Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
255
257{
258 if (id < 0 && fConn)
259 fConn->SetAsync(0);
260 fSessionID = id;
261}
262
263////////////////////////////////////////////////////////////////////////////////
264/// Disconnect a session. Use opt= "S" or "s" to
265/// shutdown remote session.
266/// Default is opt = "".
267
269{
270 // Make sure we are connected
271 if (!IsValid()) {
272 if (gDebug > 0)
273 Info("DisconnectSession","not connected: nothing to do");
274 return;
275 }
276
277 Bool_t shutdown = opt && (strchr(opt,'S') || strchr(opt,'s'));
278 Bool_t all = opt && (strchr(opt,'A') || strchr(opt,'a'));
279
280 if (id > -1 || all) {
281 // Prepare request
282 XPClientRequest Request;
283 memset(&Request, 0, sizeof(Request) );
284 fConn->SetSID(Request.header.streamid);
285 if (shutdown)
286 Request.proof.requestid = kXP_destroy;
287 else
288 Request.proof.requestid = kXP_detach;
289 Request.proof.sid = id;
290
291 // Send request
292 XrdClientMessage *xrsp =
293 fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
294
295 // Print error msg, if any
296 if (!xrsp && fConn->GetLastErr())
297 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
298
299 // Cleanup
300 SafeDelete(xrsp);
301 }
302}
303
304////////////////////////////////////////////////////////////////////////////////
305/// Close connection. Available options are (case insensitive)
306/// 'P' force closing of the underlying physical connection
307/// 'S' shutdown remote session, if any
308/// A session ID can be given using #...# signature, e.g. "#1#".
309/// Default is opt = "".
310
312{
313 Int_t to = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
314 if (fAsynProc.Wait(to*1000) != 0)
315 Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
316
317 // Remove any reference in the global pipe and ready-sock queue
319
320 // Make sure we have a connection
321 if (!fConn) {
322 if (gDebug > 0)
323 Info("Close","no connection: nothing to do");
324 fAsynProc.Post();
325 return;
326 }
327
328 // Disconnect the asynchronous requests handler
329 fConn->SetAsync(0);
330
331 // If we are connected we disconnect
332 if (IsValid()) {
333
334 // Parse options
335 TString o(opt);
336 Int_t sessID = fSessionID;
337 if (o.Index("#") != kNPOS) {
338 o.Remove(0,o.Index("#")+1);
339 if (o.Index("#") != kNPOS) {
340 o.Remove(o.Index("#"));
341 sessID = o.IsDigit() ? o.Atoi() : sessID;
342 }
343 }
344
345 if (sessID > -1) {
346 // Warn the remote session, if any (after destroy the session is gone)
347 DisconnectSession(sessID, opt);
348 } else {
349 // We are the manager: close underlying connection
350 fConn->Close(opt);
351 }
352 }
353
354 // Delete the connection module
356
357 // Post semaphore
358 fAsynProc.Post();
359}
360
361////////////////////////////////////////////////////////////////////////////////
362/// We are here if an unsolicited response comes from a logical conn
363/// The response comes in the form of an XrdClientMessage *, that must NOT be
364/// destroyed after processing. It is destroyed by the first sender.
365/// Remember that we are in a separate thread, since unsolicited
366/// responses are asynchronous by nature.
367
370{
372
373 // If we are closing we will not do anything
375 if (!semg.IsValid()) {
376 Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
377 return kUNSOL_CONTINUE;
378 }
379
380 if (!m) {
381 if (gDebug > 2)
382 Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
383 // Some one is perhaps interested in empty messages
384 return kUNSOL_CONTINUE;
385 } else {
386 if (gDebug > 2)
387 Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
388 this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
389 }
390
391 // Error notification
392 if (m->IsError()) {
393 if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
394 if (gDebug > 0)
395 Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
396 XHandleErr_t herr = {1, 0};
397 if (!fHandler || fHandler->HandleError((const void *)&herr)) {
398 if (gDebug > 0)
399 Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
400 // Avoid to contact the server any more
401 fSessionID = -1;
402 } else {
403 // Connection still usable: update usage timestamp
404 Touch();
405 }
406 } else {
407 // Time out
408 if (gDebug > 2)
409 Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
410 }
411 // Propagate the message to other possible handlers
412 return kUNSOL_CONTINUE;
413 }
414
415 // From now on make sure is for us (but only if not during setup, i.e. fConn == 0; otherwise
416 // we may miss some important server message)
417 if (fConn && !m->MatchStreamid(fConn->fStreamid)) {
418 if (gDebug > 1)
419 Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
420 return kUNSOL_CONTINUE;
421 }
422
423 // Local processing ...
424 Int_t len = 0;
425 if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
426 Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
428 return rc;
429 }
430
431 // Activity on the line: update usage timestamp
432 Touch();
433
434 // The first 4 bytes contain the action code
435 kXR_int32 acod = 0;
436 memcpy(&acod, m->GetData(), sizeof(kXR_int32));
437 if (acod > 10000)
438 Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
439 this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
440 //
441 // Update pointer to data
442 void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
443 len -= sizeof(kXR_int32);
444 if (gDebug > 1)
445 Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
446 this, acod, len, m->HeaderSID());
447
448 if (gDebug > 3)
450
451 // Case by case
452 kXR_int32 ilev = -1;
453 const char *lab = 0;
454
455 switch (acod) {
456 case kXPD_ping:
457 //
458 // Special interrupt
459 ilev = TProof::kPing;
460 lab = "kXPD_ping";
461 case kXPD_interrupt:
462 //
463 // Interrupt
464 lab = !lab ? "kXPD_interrupt" : lab;
465 { std::lock_guard<std::recursive_mutex> lock(fIMtx);
466 if (acod == kXPD_interrupt) {
467 memcpy(&ilev, pdata, sizeof(kXR_int32));
468 ilev = net2host(ilev);
469 // Update pointer to data
470 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
471 len -= sizeof(kXR_int32);
472 }
473 // The next 4 bytes contain the forwarding option
474 kXR_int32 ifw = 0;
475 if (len > 0) {
476 memcpy(&ifw, pdata, sizeof(kXR_int32));
477 ifw = net2host(ifw);
478 if (gDebug > 1)
479 Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
480 }
481 //
482 // Save the interrupt
483 fILev = ilev;
484 fIForward = (ifw == 1) ? kTRUE : kFALSE;
485
486 // Handle this input in this thread to avoid queuing on the
487 // main thread
488 XHandleIn_t hin = {acod, 0, 0, 0};
489 if (fHandler)
490 fHandler->HandleInput((const void *)&hin);
491 else
492 Error("ProcessUnsolicitedMsg","handler undefined");
493 }
494 break;
495 case kXPD_timer:
496 //
497 // Set shutdown timer
498 {
499 kXR_int32 opt = 1;
500 kXR_int32 delay = 0;
501 // The next 4 bytes contain the shutdown option
502 if (len > 0) {
503 memcpy(&opt, pdata, sizeof(kXR_int32));
504 opt = net2host(opt);
505 if (gDebug > 1)
506 Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
507 // Update pointer to data
508 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
509 len -= sizeof(kXR_int32);
510 }
511 // The next 4 bytes contain the delay
512 if (len > 0) {
513 memcpy(&delay, pdata, sizeof(kXR_int32));
514 delay = net2host(delay);
515 if (gDebug > 1)
516 Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
517 // Update pointer to data
518 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
519 len -= sizeof(kXR_int32);
520 }
521
522 // Handle this input in this thread to avoid queuing on the
523 // main thread
524 XHandleIn_t hin = {acod, opt, delay, 0};
525 if (fHandler)
526 fHandler->HandleInput((const void *)&hin);
527 else
528 Error("ProcessUnsolicitedMsg","handler undefined");
529 }
530 break;
531 case kXPD_inflate:
532 //
533 // Set inflate factor
534 {
535 kXR_int32 inflate = 1000;
536 if (len > 0) {
537 memcpy(&inflate, pdata, sizeof(kXR_int32));
538 inflate = net2host(inflate);
539 if (gDebug > 1)
540 Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
541 // Update pointer to data
542 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
543 len -= sizeof(kXR_int32);
544 }
545 // Handle this input in this thread to avoid queuing on the
546 // main thread
547 XHandleIn_t hin = {acod, inflate, 0, 0};
548 if (fHandler)
549 fHandler->HandleInput((const void *)&hin);
550 else
551 Error("ProcessUnsolicitedMsg","handler undefined");
552 }
553 break;
554 case kXPD_priority:
555 //
556 // Broadcast group priority
557 {
558 kXR_int32 priority = -1;
559 if (len > 0) {
560 memcpy(&priority, pdata, sizeof(kXR_int32));
561 priority = net2host(priority);
562 if (gDebug > 1)
563 Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
564 // Update pointer to data
565 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
566 len -= sizeof(kXR_int32);
567 }
568 // Handle this input in this thread to avoid queuing on the
569 // main thread
570 XHandleIn_t hin = {acod, priority, 0, 0};
571 if (fHandler)
572 fHandler->HandleInput((const void *)&hin);
573 else
574 Error("ProcessUnsolicitedMsg","handler undefined");
575 }
576 break;
577 case kXPD_flush:
578 //
579 // Flush request
580 {
581 // Handle this input in this thread to avoid queuing on the
582 // main thread
583 XHandleIn_t hin = {acod, 0, 0, 0};
584 if (fHandler)
585 fHandler->HandleInput((const void *)&hin);
586 else
587 Error("ProcessUnsolicitedMsg","handler undefined");
588 }
589 break;
590 case kXPD_urgent:
591 //
592 // Urgent message
593 {
594 // The next 4 bytes contain the urgent msg type
595 kXR_int32 type = -1;
596 if (len > 0) {
597 memcpy(&type, pdata, sizeof(kXR_int32));
598 type = net2host(type);
599 if (gDebug > 1)
600 Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
601 // Update pointer to data
602 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
603 len -= sizeof(kXR_int32);
604 }
605 // The next 4 bytes contain the first info container
606 kXR_int32 int1 = -1;
607 if (len > 0) {
608 memcpy(&int1, pdata, sizeof(kXR_int32));
609 int1 = net2host(int1);
610 if (gDebug > 1)
611 Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
612 // Update pointer to data
613 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
614 len -= sizeof(kXR_int32);
615 }
616 // The next 4 bytes contain the second info container
617 kXR_int32 int2 = -1;
618 if (len > 0) {
619 memcpy(&int2, pdata, sizeof(kXR_int32));
620 int2 = net2host(int2);
621 if (gDebug > 1)
622 Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
623 // Update pointer to data
624 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
625 len -= sizeof(kXR_int32);
626 }
627
628 // Handle this input in this thread to avoid queuing on the
629 // main thread
630 XHandleIn_t hin = {acod, type, int1, int2};
631 if (fHandler)
632 fHandler->HandleInput((const void *)&hin);
633 else
634 Error("ProcessUnsolicitedMsg","handler undefined");
635 }
636 break;
637 case kXPD_msg:
638 //
639 // Data message
640 { std::lock_guard<std::recursive_mutex> lock(fAMtx);
641
642 // Get a spare buffer
644 if (!b) {
645 Error("ProcessUnsolicitedMsg","could allocate spare buffer");
646 return rc;
647 }
648 memcpy(b->fBuf, pdata, len);
649 b->fLen = len;
650
651 // Update counters
652 fBytesRecv += len;
653
654 // Produce the message
655 fAQue.push_back(b);
656
657 // Post the global pipe
658 fgPipe.Post(this);
659
660 // Signal it and release the mutex
661 if (gDebug > 2)
662 Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
663 this, GetTitle(), &fASem, len);
664 fASem.Post();
665 }
666
667 break;
668 case kXPD_feedback:
669 Info("ProcessUnsolicitedMsg",
670 "kXPD_feedback treatment not yet implemented");
671 break;
672 case kXPD_srvmsg:
673 //
674 // Service message
675 {
676 // The next 4 bytes may contain a flag to control the way the message is displayed
677 kXR_int32 opt = 0;
678 memcpy(&opt, pdata, sizeof(kXR_int32));
679 opt = net2host(opt);
680 if (opt >= 0 && opt <= 4) {
681 // Update pointer to data
682 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
683 len -= sizeof(kXR_int32);
684 } else {
685 opt = 1;
686 }
687
688 if (opt == 0) {
689 // One line
690 Printf("| %.*s", len, (char *)pdata);
691 } else if (opt == 2) {
692 // Raw displaying
693 Printf("%.*s", len, (char *)pdata);
694 } else if (opt == 3) {
695 // Incremental displaying
696 fprintf(stderr, "%.*s", len, (char *)pdata);
697 } else if (opt == 4) {
698 // Rewind
699 fprintf(stderr, "%.*s\r", len, (char *)pdata);
700 } else {
701 // A small header
702 Printf(" ");
703 Printf("| Message from server:");
704 Printf("| %.*s", len, (char *)pdata);
705 }
706 }
707 break;
708 case kXPD_errmsg:
709 //
710 // Error condition with message
711 Printf("\n\n");
712 Printf("| Error condition occured: message from server:");
713 Printf("| %.*s", len, (char *)pdata);
714 Printf("\n");
715 // Handle error
716 if (fHandler)
718 else
719 Error("ProcessUnsolicitedMsg","handler undefined");
720 break;
721 case kXPD_msgsid:
722 //
723 // Data message
724 { std::lock_guard<std::recursive_mutex> lock(fAMtx);
725
726 // The next 4 bytes contain the session id
727 kXR_int32 cid = 0;
728 memcpy(&cid, pdata, sizeof(kXR_int32));
729 cid = net2host(cid);
730
731 if (gDebug > 1)
732 Info("ProcessUnsolicitedMsg","found cid: %d", cid);
733
734 // Update pointer to data
735 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
736 len -= sizeof(kXR_int32);
737
738 // Get a spare buffer
740 if (!b) {
741 Error("ProcessUnsolicitedMsg","could allocate spare buffer");
742 return rc;
743 }
744 memcpy(b->fBuf, pdata, len);
745 b->fLen = len;
746
747 // Set the sid
748 b->fCid = cid;
749
750 // Update counters
751 fBytesRecv += len;
752
753 // Produce the message
754 fAQue.push_back(b);
755
756 // Post the global pipe
757 fgPipe.Post(this);
758
759 // Signal it and release the mutex
760 if (gDebug > 2)
761 Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
762 this, cid, &fASem, len);
763 fASem.Post();
764 }
765
766 break;
767 case kXPD_wrkmortem:
768 //
769 // A worker died
770 { TString what = TString::Format("%.*s", len, (char *)pdata);
771 if (what.BeginsWith("idle-timeout")) {
772 // Notify the idle timeout
774 } else {
775 Printf(" ");
776 Printf("| %s", what.Data());
777 // Handle error
778 if (fHandler)
780 else
781 Error("ProcessUnsolicitedMsg","handler undefined");
782 }
783 }
784 break;
785
786 case kXPD_touch:
787 //
788 // Request for remote touch: post a message to do that
790 break;
791 case kXPD_resume:
792 //
793 // process the next query (in the TXProofServ)
795 break;
796 case kXPD_clusterinfo:
797 //
798 // Broadcast cluster information
799 {
800 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
801 if (len > 0) {
802 // Total sessions
803 memcpy(&nsess, pdata, sizeof(kXR_int32));
804 nsess = net2host(nsess);
805 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
806 len -= sizeof(kXR_int32);
807 // Active sessions
808 memcpy(&nacti, pdata, sizeof(kXR_int32));
809 nacti = net2host(nacti);
810 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
811 len -= sizeof(kXR_int32);
812 // Effective sessions
813 memcpy(&neffs, pdata, sizeof(kXR_int32));
814 neffs = net2host(neffs);
815 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
816 len -= sizeof(kXR_int32);
817 }
818 if (gDebug > 1)
819 Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
820 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
821 // Handle this input in this thread to avoid queuing on the
822 // main thread
823 XHandleIn_t hin = {acod, nsess, nacti, neffs};
824 if (fHandler)
825 fHandler->HandleInput((const void *)&hin);
826 else
827 Error("ProcessUnsolicitedMsg","handler undefined");
828 }
829 break;
830 default:
831 Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
832 this, acod, GetTitle());
834 break;
835 }
836
837 // We are done
838 return rc;
839}
840
841////////////////////////////////////////////////////////////////////////////////
842/// Post a message of type 'type' into the read messages queue.
843/// If 'msg' is defined it is also added as TString.
844/// This is used, for example, with kPROOF_FATAL to force the main thread
845/// to mark this socket as bad, avoiding race condition when a worker
846/// dies while in processing state.
847
848void TXSocket::PostMsg(Int_t type, const char *msg)
849{
850 // Create the message
851 TMessage m(type);
852
853 // Add the string if any
854 if (msg && strlen(msg) > 0)
855 m << TString(msg);
856
857 // Write length in first word of buffer
858 m.SetLength();
859
860 // Get pointer to the message buffer
861 char *mbuf = m.Buffer();
862 Int_t mlen = m.Length();
863 if (m.CompBuffer()) {
864 mbuf = m.CompBuffer();
865 mlen = m.CompLength();
866 }
867
868 //
869 // Data message
870 std::lock_guard<std::recursive_mutex> lock(fAMtx);
871
872 // Get a spare buffer
873 TXSockBuf *b = PopUpSpare(mlen);
874 if (!b) {
875 Error("PostMsg", "could allocate spare buffer");
876 return;
877 }
878
879 // Fill the pipe buffer
880 memcpy(b->fBuf, mbuf, mlen);
881 b->fLen = mlen;
882
883 // Update counters
884 fBytesRecv += mlen;
885
886 // Produce the message
887 fAQue.push_back(b);
888
889 // Post the global pipe
890 fgPipe.Post(this);
891
892 // Signal it and release the mutex
893 if (gDebug > 0)
894 Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
895 this, type, &fASem, mlen);
896 fASem.Post();
897
898 // Done
899 return;
900}
901
902////////////////////////////////////////////////////////////////////////////////
903/// Wake up all threads waiting at the semaphore (used by TXSlave)
904
906{
907 std::lock_guard<std::recursive_mutex> lock(fAMtx);
908
909 // Post semaphore to wake up anybody waiting; send as many posts as needed
910 while (fASem.TryWait() != 1)
911 fASem.Post();
912
913 return;
914}
915
916////////////////////////////////////////////////////////////////////////////////
917/// Getter for logical connection ID
918
920{
921 return (fConn ? fConn->GetLogConnID() : -1);
922}
923
924////////////////////////////////////////////////////////////////////////////////
925/// Getter for last error
926
928{
929 return (fConn ? fConn->GetOpenError() : -1);
930}
931
932////////////////////////////////////////////////////////////////////////////////
933/// Getter for server type
934
936{
937 return (fConn ? fConn->GetServType() : -1);
938}
939
940////////////////////////////////////////////////////////////////////////////////
941/// Getter for session ID
942
944{
945 return (fConn ? fConn->GetSessionID() : -1);
946}
947
948////////////////////////////////////////////////////////////////////////////////
949/// Getter for validity status
950
952{
953 return (fConn ? (fConn->IsValid()) : kFALSE);
954}
955
956////////////////////////////////////////////////////////////////////////////////
957/// Return kTRUE if the remote server is a 'proofd'
958
960{
962 return kTRUE;
963
964 // Failure
965 return kFALSE;
966}
967
968////////////////////////////////////////////////////////////////////////////////
969/// Get latest interrupt level and reset it; if the interrupt has to be
970/// propagated to lower stages forward will be kTRUE after the call
971
973{
974 if (gDebug > 2)
975 Info("GetInterrupt","%p: waiting to lock mutex", this);
976
977 std::lock_guard<std::recursive_mutex> lock(fIMtx);
978
979 // Reset values
980 Int_t ilev = -1;
981 forward = kFALSE;
982
983 // Check if filled
984 if (fILev == -1)
985 Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
986
987 // Fill output
988 ilev = fILev;
989 forward = fIForward;
990
991 // Reset values (we process it only once)
992 fILev = -1;
994
995 // Return what we got
996 return ilev;
997}
998
999////////////////////////////////////////////////////////////////////////////////
1000/// Flush the asynchronous queue.
1001/// Typically called when a kHardInterrupt is received.
1002/// Returns number of bytes in flushed buffers.
1003
1005{
1006 Int_t nf = 0;
1007 list<TXSockBuf *> splist;
1008 list<TXSockBuf *>::iterator i;
1009
1010 { std::lock_guard<std::recursive_mutex> lock(fAMtx);
1011
1012 // Must have something to flush
1013 if (fAQue.size() > 0) {
1014
1015 // Save size for later semaphore cleanup
1016 Int_t sz = fAQue.size();
1017 // Move every queued buffer to the local spare list, counting its bytes
1018 for (i = fAQue.begin(); i != fAQue.end();) {
1019 if (*i) {
1020 splist.push_back(*i);
1021 nf += (*i)->fLen;
1022 i = fAQue.erase(i);
1023 }
1024 }
1025
1026 // Reset the asynchronous queue
1027 while (sz--) {
1028 if (fASem.TryWait() == 1)
1029 Printf("Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
1030 }
1031 fAQue.clear();
1032 }
1033 }
1034
1035 // Move spares to the spare queue
1036 { std::lock_guard<std::mutex> lock(fgSMtx);
1037 if (splist.size() > 0) {
1038 for (i = splist.begin(); i != splist.end();) {
1039 fgSQue.push_back(*i);
1040 i = splist.erase(i);
1041 }
1042 }
1043 }
1044
1045 // We are done
1046 return nf;
1047}
1048
1049////////////////////////////////////////////////////////////////////////////////
1050/// This method sends a request for creation of (or attachment to) a remote
1051/// server application.
1052
1054{
1055 // Make sure we are connected
1056 if (!IsValid()) {
1057 if (gDebug > 0)
1058 Info("Create","not connected: nothing to do");
1059 return kFALSE;
1060 }
1061
1062 Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
1063
1064 while (retriesleft--) {
1065
1066 XPClientRequest reqhdr;
1067
1068 // We fill the header struct containing the request for login
1069 memset( &reqhdr, 0, sizeof(reqhdr));
1070 fConn->SetSID(reqhdr.header.streamid);
1071
1072 // This will be a kXP_attach or kXP_create request
1073 if (fMode == 'A' || attach) {
1074 reqhdr.header.requestid = kXP_attach;
1075 reqhdr.proof.sid = fSessionID;
1076 } else {
1077 reqhdr.header.requestid = kXP_create;
1078 }
1079
1080 // Send log level
1081 reqhdr.proof.int1 = fLogLevel;
1082
1083 // Send also the chosen alias
1084 const void *buf = (const void *)(fBuffer.Data());
1085 reqhdr.header.dlen = fBuffer.Length();
1086 if (gDebug >= 2)
1087 Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
1088
1089 // We call SendReq, the function devoted to sending commands.
1090 if (gDebug > 1)
1091 Info("Create", "creating session of server %s", fUrl.Data());
1092
1093 // server response header
1094 char *answData = 0;
1095 XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
1096 &answData, "TXSocket::Create", 0);
1097 struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
1098
1099 // If any, the URL the data pool entry point will be stored here
1100 fBuffer = "";
1101 if (xrsp) {
1102
1103 //
1104 // Pointer to data
1105 void *pdata = (void *)(xrsp->GetData());
1106 Int_t len = xrsp->DataLen();
1107
1108 if (len >= (Int_t)sizeof(kXR_int32)) {
1109 // The first 4 bytes contain the session ID
1110 kXR_int32 psid = 0;
1111 memcpy(&psid, pdata, sizeof(kXR_int32));
1112 fSessionID = net2host(psid);
1113 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1114 len -= sizeof(kXR_int32);
1115 } else {
1116 Error("Create","session ID is undefined!");
1117 fSessionID = -1;
1118 if (srvresp) free(srvresp);
1119 return kFALSE;
1120 }
1121
1122 if (len >= (Int_t)sizeof(kXR_int16)) {
1123 // The second 2 bytes contain the remote PROOF protocol version
1124 kXR_int16 dver = 0;
1125 memcpy(&dver, pdata, sizeof(kXR_int16));
1126 fRemoteProtocol = net2host(dver);
1127 pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1128 len -= sizeof(kXR_int16);
1129 } else {
1130 Warning("Create","protocol version of the remote PROOF undefined!");
1131 }
1132
1133 if (fRemoteProtocol == 0) {
1134 // We are dealing with an older server: the PROOF protocol is on 4 bytes
1135 len += sizeof(kXR_int16);
1136 kXR_int32 dver = 0;
1137 memcpy(&dver, pdata, sizeof(kXR_int32));
1138 fRemoteProtocol = net2host(dver);
1139 pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1140 len -= sizeof(kXR_int32);
1141 } else {
1142 if (len >= (Int_t)sizeof(kXR_int16)) {
1143 // The third 2 bytes contain the remote XrdProofdProtocol version
1144 kXR_int16 dver = 0;
1145 memcpy(&dver, pdata, sizeof(kXR_int16));
1147 pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1148 len -= sizeof(kXR_int16);
1149 } else {
1150 Warning("Create","version of the remote XrdProofdProtocol undefined!");
1151 }
1152 }
1153
1154 if (len > 0) {
1155 // From top masters, the url of the data pool
1156 char *url = new char[len+1];
1157 memcpy(url, pdata, len);
1158 url[len] = 0;
1159 fBuffer = url;
1160 delete[] url;
1161 }
1162
1163 // Cleanup
1164 SafeDelete(xrsp);
1165 if (srvresp) free(srvresp);
1166
1167 // Notify
1168 return kTRUE;
1169 } else {
1170 // Extract log file path, if any
1171 Ssiz_t ilog = kNPOS;
1172 if (retriesleft <= 0 && fConn->GetLastErr()) {
1174 if ((ilog = fBuffer.Index("|log:")) != kNPOS) fBuffer.Remove(0, ilog);
1175 }
1176 // If not free resources now, just give up
1178 // Avoid to contact the server any more
1179 fSessionID = -1;
1180 if (srvresp) free(srvresp);
1181 return kFALSE;
1182 } else {
1183 // Print error msg, if any
1184 if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr()) {
1185 TString emsg(fConn->GetLastErr());
1186 if ((ilog = emsg.Index("|log:")) != kNPOS) emsg.Remove(ilog);
1187 Printf("%s: %s", fHost.Data(), emsg.Data());
1188 }
1189 }
1190 }
1191
1192 if (gDebug > 0)
1193 Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
1194 if (retriesleft <= 0)
1195 Error("Create", "%d creation/attachment attempts failed: no attempts left",
1196 gEnv->GetValue("XProof.CreationRetries", 4));
1197
1198 if (srvresp) free(srvresp);
1199 } // Creation retries
1200
1201 // The session is invalid: reset the sessionID to invalid state (it was our protocol
1202 // number during creation)
1203 fSessionID = -1;
1204
1205 // Notify failure
1206 Error("Create:",
1207 "problems creating or attaching to a remote server (%s)",
1208 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
1209 return kFALSE;
1210}
1211
1212////////////////////////////////////////////////////////////////////////////////
1213/// Send a raw buffer of specified length.
1214/// Use opt = kDontBlock to ask xproofd to push the message into the proofsrv.
1215/// (by default is appended to a queue waiting for a request from proofsrv).
1216/// Returns the number of bytes sent or -1 in case of error.
1217
1219{
1221
1222 // Options and request ID
1223 fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
1224 : (~kXPD_async & fSendOpt) ;
1225
1226 // Prepare request
1227 XPClientRequest Request;
1228 memset( &Request, 0, sizeof(Request) );
1229 fConn->SetSID(Request.header.streamid);
1230 Request.sendrcv.requestid = kXP_sendmsg;
1231 Request.sendrcv.sid = fSessionID;
1232 Request.sendrcv.opt = fSendOpt;
1233 Request.sendrcv.cid = GetClientID();
1234 Request.sendrcv.dlen = length;
1235 if (gDebug >= 2)
1236 Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
1237
1238 // Send request
1239 XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
1240
1241 if (xrsp) {
1242 // Prepare return info
1243 Int_t nsent = length;
1244
1245 // Update counters
1246 fBytesSent += length;
1247
1248 // Cleanup
1249 SafeDelete(xrsp);
1250
1251 // Success: update usage timestamp
1252 Touch();
1253
1254 // ok
1255 return nsent;
1256 } else {
1257 // Print error message, if any
1258 if (fConn->GetLastErr())
1259 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1260 else
1261 Printf("%s: error occured but no message from server", fHost.Data());
1262 }
1263
1264 // Failure notification (avoid using the handler: we may be exiting)
1265 Error("SendRaw", "%s: problems sending %d bytes to server",
1266 fHost.Data(), length);
1267 return -1;
1268}
1269
1270////////////////////////////////////////////////////////////////////////////////
1271/// Ping functionality: contact the server to check its vitality.
1272/// If external, the server waits for a reply from the server
1273/// Returns kTRUE if OK or kFALSE in case of error.
1274
1275Bool_t TXSocket::Ping(const char *ord)
1276{
1278
1279 if (gDebug > 0)
1280 Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
1281
1282 // Make sure we are connected
1283 if (!IsValid()) {
1284 Error("Ping","not connected: nothing to do");
1285 return kFALSE;
1286 }
1287
1288 // Options
1289 kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
1290
1291 // Prepare request
1292 XPClientRequest Request;
1293 memset( &Request, 0, sizeof(Request) );
1294 fConn->SetSID(Request.header.streamid);
1295 Request.sendrcv.requestid = kXP_ping;
1296 Request.sendrcv.sid = fSessionID;
1297 Request.sendrcv.opt = options;
1298 Request.sendrcv.dlen = 0;
1299
1300 // Send request
1301 Bool_t res = kFALSE;
1302 if (fMode != 'i') {
1303 char *pans = 0;
1304 XrdClientMessage *xrsp =
1305 fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
1306 kXR_int32 *pres = (kXR_int32 *) pans;
1307
1308 // Get the result
1309 if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
1310 *pres = net2host(*pres);
1311 res = (*pres == 1) ? kTRUE : kFALSE;
1312 // Success: update usage timestamp
1313 Touch();
1314 } else {
1315 // Print error msg, if any
1316 if (fConn->GetLastErr())
1317 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1318 }
1319
1320 // Cleanup
1321 SafeDelete(xrsp);
1322 if (pans) free(pans);
1323
1324 } else {
1325 if (XPD::clientMarshall(&Request) == 0) {
1326 XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
1327 res = (e == kOK) ? kTRUE : kFALSE;
1328 } else {
1329 Error("Ping", "%p: int: problems marshalling request", this);
1330 }
1331 }
1332
1333 // Failure notification (avoid using the handler: we may be exiting)
1334 if (!res) {
1335 Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
1336 } else if (gDebug > 0) {
1337 Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
1338 }
1339
1340 return res;
1341}
1342
1343////////////////////////////////////////////////////////////////////////////////
1344/// Remote touch functionality: contact the server to prove our vitality.
1345/// No reply from server is expected.
1346
1348{
1350
1351 if (gDebug > 0)
1352 Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
1353
1354 // Make sure we are connected
1355 if (!IsValid()) {
1356 Error("RemoteTouch","not connected: nothing to do");
1357 return;
1358 }
1359
1360 // Prepare request
1361 XPClientRequest Request;
1362 memset( &Request, 0, sizeof(Request) );
1363 fConn->SetSID(Request.header.streamid);
1364 Request.sendrcv.requestid = kXP_touch;
1365 Request.sendrcv.sid = fSessionID;
1366 Request.sendrcv.opt = 0;
1367 Request.sendrcv.dlen = 0;
1368
1369 // We need the right order
1370 if (XPD::clientMarshall(&Request) != 0) {
1371 Error("Touch", "%p: problems marshalling request ", this);
1372 return;
1373 }
1374 if (fConn->LowWrite(&Request, 0, 0) != kOK)
1375 Error("Touch", "%p: problems sending touch request to server", this);
1376
1377 // Done
1378 return;
1379}
1380
1381////////////////////////////////////////////////////////////////////////////////
1382/// Interrupt the remote protocol instance. Used to propagate Ctrl-C.
1383/// No reply from server is expected.
1384
1386{
1388
1389 if (gDebug > 0)
1390 Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
1391
1392 // Make sure we are connected
1393 if (!IsValid()) {
1394 Error("CtrlC","not connected: nothing to do");
1395 return;
1396 }
1397
1398 // Prepare request
1399 XPClientRequest Request;
1400 memset( &Request, 0, sizeof(Request) );
1401 fConn->SetSID(Request.header.streamid);
1402 Request.proof.requestid = kXP_ctrlc;
1403 Request.proof.sid = 0;
1404 Request.proof.dlen = 0;
1405
1406 // We need the right order
1407 if (XPD::clientMarshall(&Request) != 0) {
1408 Error("CtrlC", "%p: problems marshalling request ", this);
1409 return;
1410 }
1411 if (fConn->LowWrite(&Request, 0, 0) != kOK)
1412 Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
1413
1414 // Done
1415 return;
1416}
1417
1418////////////////////////////////////////////////////////////////////////////////
1419/// Wait and pick-up next buffer from the asynchronous queue
1420
1422{
1423 fBufCur = 0;
1424 fByteLeft = 0;
1425 fByteCur = 0;
1426 if (gDebug > 2)
1427 Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
1428
1429 // User can choose whether to wait forever or for a fixed amount of time
1430 if (!fDontTimeout) {
1431 static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
1432 static Int_t dt = 2000;
1433 Int_t to = timeout;
1435 while (to && !IsInterrupt()) {
1436 SetAWait(kTRUE);
1437 if (fASem.Wait(dt) != 0) {
1438 to -= dt;
1439 if (to <= 0) {
1440 Error("PickUpReady","error waiting at semaphore");
1441 return -1;
1442 } else {
1443 if (gDebug > 0)
1444 Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
1445 this, GetTitle(), to/1000);
1446 }
1447 } else
1448 break;
1450 }
1451 // Bail out if the wait was interrupted
1452 if (IsInterrupt()) {
1453 if (gDebug > 2)
1454 Info("PickUpReady","interrupted");
1457 return -1;
1458 }
1459 } else {
1460 // We wait forever
1461 SetAWait(kTRUE);
1462 if (fASem.Wait() != 0) {
1463 Error("PickUpReady","error waiting at semaphore");
1465 return -1;
1466 }
1468 }
1469 if (gDebug > 2)
1470 Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
1471
1472 std::lock_guard<std::recursive_mutex> lock(fAMtx);
1473
1474 // Get message, if any
1475 if (fAQue.size() <= 0) {
1476 Error("PickUpReady","queue is empty - protocol error ?");
1477 return -1;
1478 }
1479 if (!(fBufCur = fAQue.front())) {
1480 Error("PickUpReady","got invalid buffer - protocol error ?");
1481 return -1;
1482 }
1483 // Remove message from the queue
1484 fAQue.pop_front();
1485
1486 // Set number of available bytes
1488
1489 if (gDebug > 2)
1490 Info("PickUpReady", "%p: %s: got message (%d bytes)",
1491 this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
1492
1493 // Update counters
1495
1496 // Set session ID
1497 if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
1499
1500 // Clean entry in the underlying pipe
1501 fgPipe.Clean(this);
1502
1503 // We are done
1504 return 0;
1505}
1506
1507////////////////////////////////////////////////////////////////////////////////
1508/// Pop-up a buffer of at least size bytes from the spare list
1509/// If none is found either one is reallocated or a new one
1510/// created
1511
1513{
1514 TXSockBuf *buf = 0;
1515 static Int_t nBuf = 0;
1516
1517 std::lock_guard<std::mutex> lock(fgSMtx);
1518
1519 Int_t maxsz = 0;
1520 if (fgSQue.size() > 0) {
1521 list<TXSockBuf *>::iterator i;
1522 for (i = fgSQue.begin(); i != fgSQue.end(); ++i) {
1523 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1524 if ((*i) && (*i)->fSiz >= size) {
1525 buf = *i;
1526 if (gDebug > 2)
1527 Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1528 size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
1529 // Drop from this list
1530 fgSQue.erase(i);
1531 return buf;
1532 }
1533 }
1534 // All buffers are too small: enlarge the first one
1535 buf = fgSQue.front();
1536 buf->Resize(size);
1537 if (gDebug > 2)
1538 Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1539 size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1540 // Drop from this list
1541 fgSQue.pop_front();
1542 return buf;
1543 }
1544
1545 // Create a new buffer
1546 buf = new TXSockBuf((char *)malloc(size), size);
1547 nBuf++;
1548
1549 if (gDebug > 2)
1550 Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1551 size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1552
1553 // We are done
1554 return buf;
1555}
1556
1557////////////////////////////////////////////////////////////////////////////////
1558/// Release read buffer giving back to the spare list
1559
1561{
1562 std::lock_guard<std::mutex> lock(fgSMtx);
1563
1564 if (gDebug > 2)
1565 Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
1567
1569 fgSQue.push_back(fBufCur);
1570 } else {
1571 delete fBufCur;
1572 }
1573 fBufCur = 0;
1574 fByteCur = 0;
1575 fByteLeft = 0;
1576}
1577
1578////////////////////////////////////////////////////////////////////////////////
1579/// Receive a raw buffer of specified length bytes.
1580
1582{
 // Copy 'length' bytes into 'buffer', taking them from the current read
 // buffer (fBufCur) and, if that is not enough, from the following buffers
 // obtained via PickUpReady().
 // Returns 'length' on success, -1 on invalid arguments or when
 // PickUpReady() fails.
1583 // Inputs must make sense
1584 if (!buffer || (length <= 0))
1585 return -1;
1586
1587 // Wait and pick-up a read buffer if we do not have one
1588 if (!fBufCur && (PickUpReady() != 0))
1589 return -1;
1590
1591 // Use it
1592 if (fByteLeft >= length) {
 // Fast path: the current buffer holds everything that was asked for.
 // Copy from the current read offset and advance it.
1593 memcpy(buffer, fBufCur->fBuf + fByteCur, length);
1594 fByteCur += length;
1595 if ((fByteLeft -= length) <= 0)
1596 // All used: give back
1597 PushBackSpare();
1598 // Success: update usage timestamp
1599 Touch();
 // NOTE(review): this path returns before the fBytesRecv update at the
 // bottom of the function - confirm whether that is intended.
1600 return length;
1601 } else {
 // Slow path: drain what is left in the current buffer, then keep
 // picking up new buffers until 'length' bytes have been copied.
1602 // Copy the first part
1603 memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
 // 'at' is the write offset into 'buffer'; 'tobecopied' the bytes still missing
1604 Int_t at = fByteLeft;
1605 Int_t tobecopied = length - fByteLeft;
1606 PushBackSpare();
1607 while (tobecopied > 0) {
1608 // Pick-up next buffer (it may wait inside)
 // On failure -1 is returned even though part of 'buffer' has
 // already been filled.
1609 if (PickUpReady() != 0)
1610 return -1;
1611 // Copy the fresh meat
 // Take at most what is still missing; a fresh buffer is read from its start
1612 Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1613 memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
1614 fByteCur = ncpy;
1615 if ((fByteLeft -= ncpy) <= 0)
1616 // All used: give back
1617 PushBackSpare();
1618 // Recalculate
1619 tobecopied -= ncpy;
1620 at += ncpy;
1621 }
1622 }
1623
1624 // Update counters
1625 fBytesRecv += length;
1627
1628 // Success: update usage timestamp
1629 Touch();
1630
1631 return length;
1632}
1633
1634////////////////////////////////////////////////////////////////////////////////
1635/// Send urgent message (interrupt) to remote server
1636/// Returns 0 or -1 in case of error.
1637
1639{
1641
1642 // Prepare request
1643 XPClientRequest Request;
1644 memset(&Request, 0, sizeof(Request) );
1645 fConn->SetSID(Request.header.streamid);
1648 else
1650 Request.interrupt.sid = fSessionID;
1651 Request.interrupt.type = type; // type of interrupt (see TProof::EUrgent)
1652 Request.interrupt.dlen = 0;
1653
1654 // Send request
1655 XrdClientMessage *xrsp =
1656 fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
1657 if (xrsp) {
1658 // Success: update usage timestamp
1659 Touch();
1660 // Cleanup
1661 SafeDelete(xrsp);
1662 // ok
1663 return 0;
1664 } else {
1665 // Print error msg, if any
1666 if (fConn->GetLastErr())
1667 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1668 }
1669
1670 // Failure notification (avoid using the handler: we may be exiting)
1671 Error("SendInterrupt", "problems sending interrupt to server");
1672 return -1;
1673}
1674
1675////////////////////////////////////////////////////////////////////////////////
1676
1678{
1679 std::lock_guard<std::recursive_mutex> lock(fAMtx);
1680 fRDInterrupt = i;
1681 if (i && fConn) fConn->SetInterrupt();
1682 if (i && fAWait) fASem.Post();
1683}
1684
1685////////////////////////////////////////////////////////////////////////////////
1686/// Send a TMessage object. Returns the number of bytes in the TMessage
1687/// that were sent and -1 in case of error.
1688
1690{
1692
1693 if (mess.IsReading()) {
1694 Error("Send", "cannot send a message used for reading");
1695 return -1;
1696 }
1697
1698 // send streamer infos in case schema evolution is enabled in the TMessage
1699 SendStreamerInfos(mess);
1700
1701 // send the process id's so TRefs work
1702 SendProcessIDs(mess);
1703
1704 mess.SetLength(); //write length in first word of buffer
1705
1706 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
1707 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
1708
1709 if (mess.GetCompressionLevel() > 0)
1710 const_cast<TMessage&>(mess).Compress();
1711
1712 char *mbuf = mess.Buffer();
1713 Int_t mlen = mess.Length();
1714 if (mess.CompBuffer()) {
1715 mbuf = mess.CompBuffer();
1716 mlen = mess.CompLength();
1717 }
1718
1719 // Parse message type to choose sending options
1720 kXR_int32 fSendOptDefault = fSendOpt;
1721 switch (mess.What()) {
1722 case kPROOF_PROCESS:
1724 break;
1725 case kPROOF_PROGRESS:
1726 case kPROOF_FEEDBACK:
1728 break;
1732 break;
1736 break;
1737 case kPROOF_STOPPROCESS:
1739 break;
1740 case kPROOF_SETIDLE:
1743 break;
1744 case kPROOF_LOGFILE:
1745 case kPROOF_LOGDONE:
1746 if (GetClientIDSize() <= 1)
1748 break;
1749 default:
1750 break;
1751 }
1752
1753 if (gDebug > 2)
1754 Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
1755
1756 Int_t nsent = SendRaw(mbuf, mlen);
1757 fSendOpt = fSendOptDefault;
1758
1759 if (nsent <= 0)
1760 return nsent;
1761
1762 fBytesSent += nsent;
1763 fgBytesSent += nsent;
1764
1765 return nsent - sizeof(UInt_t); //length - length header
1766}
1767
1768////////////////////////////////////////////////////////////////////////////////
1769/// Receive a TMessage object. The user must delete the TMessage object.
1770/// Returns length of message in bytes (can be 0 if other side of connection
1771/// is closed) or -1 in case of error or -5 if pipe broken (connection invalid).
1772/// In those cases mess == 0.
1773
1775{
1777
1778 if (!IsValid()) {
1779 mess = 0;
1780 return -5;
1781 }
1782
1783oncemore:
1784 Int_t n;
1785 UInt_t len;
1786 if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
1787 mess = 0;
1788 return n;
1789 }
1790 len = net2host(len); //from network to host byte order
1791
1792 char *buf = new char[len+sizeof(UInt_t)];
1793 if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
1794 delete [] buf;
1795 mess = 0;
1796 return n;
1797 }
1798
1799 fBytesRecv += n + sizeof(UInt_t);
1800 fgBytesRecv += n + sizeof(UInt_t);
1801
1802 mess = new TMessage(buf, len+sizeof(UInt_t));
1803
1804 // receive any streamer infos
1805 if (RecvStreamerInfos(mess))
1806 goto oncemore;
1807
1808 // receive any process ids
1809 if (RecvProcessIDs(mess))
1810 goto oncemore;
1811
1812 if (mess->What() & kMESS_ACK) {
1813 // Acknowledgement embedded: ignore ...
1814 mess->SetWhat(mess->What() & ~kMESS_ACK);
1815 }
1816
1817 return n;
1818}
1819
1820////////////////////////////////////////////////////////////////////////////////
1821/// Send message to intermediate coordinator.
1822/// If any output is due, this is returned as an obj string to be
1823/// deleted by the caller
1824
1826 Long64_t l64, Int_t int3, const char *)
1827{
1828 TObjString *sout = 0;
1829
1830 // We fill the header struct containing the request
1831 XPClientRequest reqhdr;
1832 const void *buf = 0;
1833 char *bout = 0;
1834 char **vout = 0;
1835 memset(&reqhdr, 0, sizeof(reqhdr));
1836 fConn->SetSID(reqhdr.header.streamid);
1837 reqhdr.header.requestid = kXP_admin;
1838 reqhdr.proof.int1 = kind;
1839 reqhdr.proof.int2 = int2;
1840 switch (kind) {
1841 case kQueryMssUrl:
1842 case kQueryROOTVersions:
1843 case kQuerySessions:
1844 case kQueryWorkers:
1845 reqhdr.proof.sid = 0;
1846 reqhdr.header.dlen = 0;
1847 vout = (char **)&bout;
1848 break;
1849 case kCleanupSessions:
1850 reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
1851 : (kXR_int32) kXPD_TopMaster;
1852 reqhdr.proof.int3 = int2;
1853 reqhdr.proof.sid = fSessionID;
1854 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1855 buf = (msg) ? (const void *)msg : buf;
1856 break;
1857 case kCpFile:
1858 case kGetFile:
1859 case kPutFile:
1860 case kExec:
1861 reqhdr.proof.sid = fSessionID;
1862 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1863 buf = (msg) ? (const void *)msg : buf;
1864 vout = (char **)&bout;
1865 break;
1866 case kQueryLogPaths:
1867 vout = (char **)&bout;
1868 reqhdr.proof.int3 = int3;
1869 case kReleaseWorker:
1870 case kSendMsgToUser:
1871 case kGroupProperties:
1872 case kSessionTag:
1873 case kSessionAlias:
1874 reqhdr.proof.sid = fSessionID;
1875 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1876 buf = (msg) ? (const void *)msg : buf;
1877 break;
1878 case kROOTVersion:
1879 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1880 buf = (msg) ? (const void *)msg : buf;
1881 break;
1882 case kGetWorkers:
1883 reqhdr.proof.sid = fSessionID;
1884 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1885 if (msg)
1886 buf = (const void *)msg;
1887 vout = (char **)&bout;
1888 break;
1889 case kReadBuffer:
1890 reqhdr.header.requestid = kXP_readbuf;
1891 reqhdr.readbuf.ofs = l64;
1892 reqhdr.readbuf.len = int2;
1893 if (int3 > 0 && fXrdProofdVersion < 1003) {
1894 Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
1895 " grep functionality not supported", fXrdProofdVersion);
1896 return sout;
1897 }
1898 reqhdr.readbuf.int1 = int3;
1899 if (!msg || strlen(msg) <= 0) {
1900 Info("SendCoordinator", "kReadBuffer: file path undefined");
1901 return sout;
1902 }
1903 reqhdr.header.dlen = strlen(msg);
1904 buf = (const void *)msg;
1905 vout = (char **)&bout;
1906 break;
1907 default:
1908 Info("SendCoordinator", "unknown message kind: %d", kind);
1909 return sout;
1910 }
1911
1912 // server response header
1913 Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
1914 XrdClientMessage *xrsp =
1915 fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
1916
1917 // If positive answer
1918 if (xrsp) {
1919 // Check if we need to create an output string
1920 if (bout && (xrsp->DataLen() > 0))
1921 sout = new TObjString(TString(bout,xrsp->DataLen()));
1922 if (bout)
1923 free(bout);
1924 // Success: update usage timestamp
1925 Touch();
1926 SafeDelete(xrsp);
1927 } else {
1928 // Print error msg, if any
1929 if (fConn->GetLastErr())
1930 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1931 }
1932
1933 // Failure notification (avoid using the handler: we may be exiting)
1934 return sout;
1935}
1936
1937////////////////////////////////////////////////////////////////////////////////
1938/// Send urgent message to counterpart; 'type' specifies the type of
1939/// the message (see TXSocket::EUrgentMsgType), and 'int1', 'int2'
1940/// two containers for additional information.
1941
1943{
1945
1946 // Prepare request
1947 XPClientRequest Request;
1948 memset(&Request, 0, sizeof(Request) );
1949 fConn->SetSID(Request.header.streamid);
1950 Request.proof.requestid = kXP_urgent;
1951 Request.proof.sid = fSessionID;
1952 Request.proof.int1 = type; // type of urgent msg (see TXSocket::EUrgentMsgType)
1953 Request.proof.int2 = int1; // 4-byte container info 1
1954 Request.proof.int3 = int2; // 4-byte container info 2
1955 Request.proof.dlen = 0;
1956
1957 // Send request
1958 XrdClientMessage *xrsp =
1959 fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
1960 if (xrsp) {
1961 // Success: update usage timestamp
1962 Touch();
1963 // Cleanup
1964 SafeDelete(xrsp);
1965 } else {
1966 // Print error msg, if any
1967 if (fConn->GetLastErr())
1968 Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1969 }
1970
1971 // Done
1972 return;
1973}
1974
1975////////////////////////////////////////////////////////////////////////////////
1976
1978 return (fConn ? fConn->GetLowSocket() : -1);
1979}
1980
1981////////////////////////////////////////////////////////////////////////////////
1982/// Init environment variables for XrdClient
1983
1985{
1986 // Set debug level
1987 Int_t deb = gEnv->GetValue("XProof.Debug", -1);
1988 EnvPutInt(NAME_DEBUG, deb);
1989 if (deb > 0) {
1990 XrdProofdTrace->What |= TRACE_REQ;
1991 if (deb > 1) {
1992 XrdProofdTrace->What |= TRACE_DBG;
1993 if (deb > 2)
1994 XrdProofdTrace->What |= TRACE_ALL;
1995 }
1996 }
1997 const char *cenv = 0;
1998
1999 // List of domains where connection is allowed
2000 TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
2001 if (allowCO.Length() > 0)
2003
2004 // List of domains where connection is denied
2005 TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
2006 if (denyCO.Length() > 0)
2008
2009 // Max number of retries on first connect and related timeout
2011 Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
2013 Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
2015
2016 // Reconnect Wait
2017 Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
2019 if (recoTO == DFLT_RECONNECTWAIT) {
2020 // Check also the old variable name
2021 recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
2023 }
2025
2026 // Request Timeout
2027 Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
2029
2030 // No automatic proofd backward-compatibility
2032
2033 // Dynamic forwarding (SOCKS4)
2034 TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
2035 Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
2036 if (socks4Port > 0) {
2037 if (socks4Host.IsNull())
2038 // Default
2039 socks4Host = "127.0.0.1";
2040 EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
2041 EnvPutInt(NAME_SOCKS4PORT, socks4Port);
2042 }
2043
2044 // For password-based authentication
2045 TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
2046 if (autolog.Length() > 0 &&
2047 (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2048 gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
2049
2050 // For password-based authentication
2051 TString netrc;
2052 netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
2053 gSystem->Setenv("XrdSecNETRC", netrc.Data());
2054
2055 TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
2056 if (alogfile.Length() > 0)
2057 gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
2058
2059 TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
2060 if (verisrv.Length() > 0 &&
2061 (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2062 gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
2063
2064 TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
2065 if (srvpuk.Length() > 0)
2066 gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
2067
2068 // For GSI authentication
2069 TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
2070 if (cadir.Length() > 0)
2071 gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
2072
2073 TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
2074 if (crldir.Length() > 0)
2075 gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
2076
2077 TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
2078 if (crlext.Length() > 0)
2079 gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
2080
2081 TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
2082 if (ucert.Length() > 0)
2083 gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
2084
2085 TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
2086 if (ukey.Length() > 0)
2087 gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
2088
2089 TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
2090 if (upxy.Length() > 0)
2091 gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
2092
2093 TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
2094 if (valid.Length() > 0)
2095 gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
2096
2097 TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
2098 if (deplen.Length() > 0 &&
2099 (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2100 gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
2101
2102 TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
2103 if (pxybits.Length() > 0)
2104 gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
2105
2106 TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
2107 if (crlcheck.Length() > 0 &&
2108 (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2109 gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
2110
2111 TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
2112 if (delegpxy.Length() > 0 &&
2113 (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2114 gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
2115
2116 TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
2117 if (signpxy.Length() > 0 &&
2118 (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2119 gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
2120
2121 // Print the tag, if required (only once)
2122 if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
2123 ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
2124 gROOT->GetVersion());
2125
2126 // Only once
2127 fgInitDone = kTRUE;
2128}
2129
2130////////////////////////////////////////////////////////////////////////////////
2131/// Try reconnection after failure
2132
2134{
2135 if (gDebug > 0) {
2136 Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2137 this, fConn, (fConn ? fConn->IsValid() : 0),
2138 fUrl.Data(), (fConn ? fConn->GetLogConnID() : -1));
2139 }
2140
2141 Int_t tryreconnect = gEnv->GetValue("TXSocket.Reconnect", 0);
2142 if (tryreconnect == 0 || fXrdProofdVersion < 1005) {
2143 if (tryreconnect == 0)
2144 Info("Reconnect","%p: reconnection attempts explicitly disabled!", this);
2145 else
2146 Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
2147 this, fXrdProofdVersion);
2148 return -1;
2149 }
2150
2151 if (fConn) {
2152 if (gDebug > 0)
2153 Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
2154 fConn->ReConnect();
2155 if (fConn->IsValid()) {
2156 // Create new proofserv if not client manager or administrator or internal mode
2157 if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
2158 // We attach or create
2159 if (!Create(kTRUE)) {
2160 // Failure
2161 Error("TXSocket", "create or attach failed (%s)",
2162 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
2163 Close();
2164 return -1;
2165 }
2166 }
2167 }
2168 }
2169
2170 if (gDebug > 0) {
2171 if (fConn) {
2172 Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
2173 (fConn->IsValid() ? "succeeded!" : "failed"),
2174 fConn->GetLogConnID() );
2175 } else {
2176 Info("Reconnect", "%p (c:0x0): attempt failed", this);
2177 }
2178 }
2179
2180 // Done
2181 return ((fConn && fConn->IsValid()) ? 0 : -1);
2182}
2183
2184////////////////////////////////////////////////////////////////////////////////
2185///constructor
2186
2188{
2189 fBuf = fMem = bp;
2190 fSiz = fLen = sz;
2191 fOwn = own;
2192 fCid = -1;
2193 fgBuffMem += sz;
2194}
2195
2196////////////////////////////////////////////////////////////////////////////////
2197///destructor
2198
2200{
2201 if (fOwn && fMem) {
2202 free(fMem);
2203 fgBuffMem -= fSiz;
2204 }
2205}
2206
2207////////////////////////////////////////////////////////////////////////////////
2208///resize socket buffer
2209
2211{
2212 if (sz > fSiz) {
2213 if ((fMem = (Char_t *)realloc(fMem, sz))) {
2214 fgBuffMem += (sz - fSiz);
2215 fBuf = fMem;
2216 fSiz = sz;
2217 fLen = 0;
2218 }
2219 }
2220}
2221
2222//_____________________________________________________________________________
2223//
2224// TXSockBuf static methods
2225//
2226
2227////////////////////////////////////////////////////////////////////////////////
2228/// Return the currently allocated memory
2229
2231{
2232 return fgBuffMem;
2233}
2234
2235////////////////////////////////////////////////////////////////////////////////
2236/// Return the max allocated memory allowed
2237
2239{
2240 return fgMemMax;
2241}
2242
2243////////////////////////////////////////////////////////////////////////////////
2244/// Set the max allocated memory allowed (non-positive values are ignored)
2245
2247{
2248 fgMemMax = memmax > 0 ? memmax : fgMemMax;
2249}
2250
2251//_____________________________________________________________________________
2252//
2253// TXSockPipe
2254//
2255
2256////////////////////////////////////////////////////////////////////////////////
2257/// Constructor
2258
2259TXSockPipe::TXSockPipe(const char *loc) : fLoc(loc)
2260{
2261 // Create the pipe
2262 if (pipe(fPipe) != 0) {
2263 Printf("TXSockPipe: problem initializing pipe for socket inputs");
2264 fPipe[0] = -1;
2265 fPipe[1] = -1;
2266 return;
2267 }
2268}
2269
2270////////////////////////////////////////////////////////////////////////////////
2271/// Destructor
2272
2274{
2275 if (fPipe[0] >= 0) close(fPipe[0]);
2276 if (fPipe[1] >= 0) close(fPipe[1]);
2277}
2278
2279
2280////////////////////////////////////////////////////////////////////////////////
2281/// Write a byte to the global pipe to signal the availability of
2282/// new messages
2283
2285{
2286 if (!IsValid() || !s) return -1;
2287
2288 // This must be an atomic action
2289 Int_t sz = 0;
2290 { std::lock_guard<std::recursive_mutex> lock(fMutex);
2291 // Add this one
2292 fReadySock.Add(s);
2293
2294 // Only one char
2295 Char_t c = 1;
2296 if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
2297 Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
2298 return -1;
2299 }
2300 if (gDebug > 2) sz = fReadySock.GetSize();
2301 }
2302
2303 if (gDebug > 2)
2304 Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2305 fLoc.Data(), s, sz, fPipe[1]);
2306 // We are done
2307 return 0;
2308}
2309
2310////////////////////////////////////////////////////////////////////////////////
2311/// Read a byte from the global pipe to synchronize message pickup
2312
2314{
2315 // Pipe must have been created
2316 if (!IsValid() || !s) return -1;
2317
2318 // Only one char
2319 Int_t sz = 0;
2320 Char_t c = 0;
2321 { std::lock_guard<std::recursive_mutex> lock(fMutex);
2322 if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
2323 Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
2324 return -1;
2325 }
2326 // Remove this one
2327 fReadySock.Remove(s);
2328
2329 if (gDebug > 2) sz = fReadySock.GetSize();
2330 }
2331
2332 if (gDebug > 2)
2333 Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2334 fLoc.Data(), s, sz, fPipe[0]);
2335
2336 // We are done
2337 return 0;
2338}
2339
2340////////////////////////////////////////////////////////////////////////////////
2341/// Remove any reference to socket 's' from the global pipe and
2342/// ready-socket queue
2343
2345{
2346 // Pipe must have been created
2347 if (!IsValid() || !s) return -1;
2348
2349 TObject *o = 0;
2350 // This must be an atomic action
2351 { std::lock_guard<std::recursive_mutex> lock(fMutex);
2352 o = fReadySock.FindObject(s);
2353
2354 while (o) {
2355 // Remove from the list
2356 fReadySock.Remove(s);
2357 o = fReadySock.FindObject(s);
2358 // Remove one notification from the pipe
2359 Char_t c = 0;
2360 if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
2361 Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
2362 }
2363 }
2364 // Flush also the socket
2365 ((TXSocket *)s)->Flush();
2366
2367 // Notify
2368 if (gDebug > 0)
2369 Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
2370
2371 // We are done
2372 return 0;
2373}
2374
2375////////////////////////////////////////////////////////////////////////////////
2376/// Dump content of the ready socket list
2377
2379{
2380 std::lock_guard<std::recursive_mutex> lock(fMutex);
2381
2382 TString buf = Form("%d |", fReadySock.GetSize());
2383 TIter nxs(&fReadySock);
2384 TObject *o = 0;
2385 while ((o = nxs()))
2386 buf += Form(" %p",o);
2387 Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
2388}
2389
2390////////////////////////////////////////////////////////////////////////////////
2391/// Return last ready socket
2392
2394{
2395 std::lock_guard<std::recursive_mutex> lock(fMutex);
2396
2397 return (TXSocket *) fReadySock.Last();
2398}
UShort_t net2host(UShort_t x)
Definition Bytes.h:575
std::string fBuffer
@ kPROOF_PROCESS
@ kPROOF_PROGRESS
@ kPROOF_TOUCH
@ kPROOF_STOPPROCESS
@ kPROOF_LOGFILE
@ kMESS_ACK
@ kPROOF_FATAL
@ kPROOF_QUERYSUBMITTED
@ kPROOF_FEEDBACK
@ kPROOF_LOGDONE
@ kPROOF_SETIDLE
@ kPROOF_STARTPROCESS
@ kPROOF_STOP
#define SafeDelete(p)
Definition RConfig.hxx:525
#define b(i)
Definition RSha256.hxx:100
#define c(i)
Definition RSha256.hxx:101
#define e(i)
Definition RSha256.hxx:103
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
bool Bool_t
Definition RtypesCore.h:63
char Char_t
Definition RtypesCore.h:37
unsigned int UInt_t
Definition RtypesCore.h:46
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
constexpr Ssiz_t kNPOS
Definition RtypesCore.h:124
long long Long64_t
Definition RtypesCore.h:80
unsigned long long ULong64_t
Definition RtypesCore.h:81
constexpr Bool_t kTRUE
Definition RtypesCore.h:100
const char Option_t
Definition RtypesCore.h:66
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
void ErrorHandler(int level, const char *location, const char *fmt, std::va_list va)
General error handler function. It calls the user set error handler.
Definition TError.cxx:109
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize id
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
const char *const kPROOF_WorkerIdleTO
Definition TProof.h:135
Int_t gDebug
Definition TROOT.cxx:597
#define gROOT
Definition TROOT.h:407
char * Form(const char *fmt,...)
Formats a string in a circular formatting buffer.
Definition TString.cxx:2467
void Printf(const char *fmt,...)
Formats a string in a circular formatting buffer and prints the string.
Definition TString.cxx:2481
ESendRecvOptions
Definition TSystem.h:228
@ kDontBlock
Definition TSystem.h:232
R__EXTERN TSystem * gSystem
Definition TSystem.h:560
static XrdSysError eDest(0, "Proofx")
static XrdSysLogger eLogger
Definition TXSocket.cxx:56
XrdOucTrace * XrdProofdTrace
Definition TXSocket.cxx:55
#define kXPD_logmsg
#define kXPD_internal
@ kXP_create
@ kXP_destroy
@ kXP_sendmsg
@ kXP_ping
@ kXP_urgent
@ kXP_touch
@ kXP_detach
@ kXP_attach
@ kXP_interrupt
@ kXP_readbuf
@ kXP_admin
@ kXP_ctrlc
#define kXPD_TopMaster
#define kXPD_AnyServer
#define kXPD_startprocess
@ kXP_TooManySess
#define kXPD_querynum
#define kXPD_async
#define kXPD_fb_prog
@ kCleanupSessions
@ kPutFile
@ kGroupProperties
@ kExec
@ kQueryLogPaths
@ kCpFile
@ kQuerySessions
@ kSendMsgToUser
@ kROOTVersion
@ kReleaseWorker
@ kSessionAlias
@ kQueryWorkers
@ kGetFile
@ kGetWorkers
@ kQueryMssUrl
@ kQueryROOTVersions
@ kReadBuffer
@ kSessionTag
#define kXPD_setidle
#define kXPD_process
@ kXPD_flush
@ kXPD_msgsid
@ kXPD_msg
@ kXPD_urgent
@ kXPD_wrkmortem
@ kXPD_feedback
@ kXPD_resume
@ kXPD_clusterinfo
@ kXPD_interrupt
@ kXPD_ping
@ kXPD_touch
@ kXPD_inflate
@ kXPD_timer
@ kXPD_priority
@ kXPD_errmsg
@ kXPD_srvmsg
#define XrdSysError
Definition XpdSysError.h:8
#define XrdSysLogger
Definition XpdSysLogger.h:8
#define NAME_SOCKS4PORT
#define NAME_CONNECTTIMEOUT
#define NAME_REQUESTTIMEOUT
#define NAME_DEBUG
#define NAME_SOCKS4HOST
#define NAME_FIRSTCONNECTMAXCNT
#define NAME_RECONNECTWAIT
#define NAME_CONNECTDOMAINDENY_RE
#define NAME_CONNECTDOMAINALLOW_RE
#define NAME_KEEPSOCKOPENIFNOTXRD
#define DFLT_RECONNECTWAIT
#define EnvPutInt(name, val)
#define EnvPutString(name, val)
UnsolRespProcResult
@ kUNSOL_KEEP
@ kUNSOL_CONTINUE
#define TRACE_REQ
#define TRACE_DBG
R__EXTERN XrdOucTrace * XrdProofdTrace
#define TRACE_ALL
#define realloc
Definition civetweb.c:1538
#define free
Definition civetweb.c:1539
#define malloc
Definition civetweb.c:1536
Bool_t IsReading() const
Definition TBuffer.h:86
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:578
void Add(TObject *obj) override
Definition TList.h:81
TObject * Remove(TObject *obj) override
Remove object from the list.
Definition TList.cxx:822
TObject * Last() const override
Return the last object in the list. Returns 0 when list is empty.
Definition TList.cxx:693
UInt_t What() const
Definition TMessage.h:75
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:202
char * CompBuffer() const
Definition TMessage.h:89
Int_t Compress()
Compress the message.
Definition TMessage.cxx:306
Int_t GetCompressionLevel() const
Definition TMessage.h:106
Int_t CompLength() const
Definition TMessage.h:90
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
Definition TMessage.cxx:222
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
const char * GetTitle() const override
Returns title of object.
Definition TNamed.h:48
Collectable string class.
Definition TObjString.h:28
Mother of all ROOT objects.
Definition TObject.h:41
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:973
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:987
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:961
@ kShutdownInterrupt
Definition TProof.h:398
@ kPing
Definition TProof.h:395
Int_t TryWait()
If the semaphore value is > 0 then decrement it and return 0.
Int_t Post()
Increment the value of the semaphore.
Int_t Wait()
If the semaphore value is > 0 then decrement it and carry on, else block, waiting on the condition un...
TInetAddress fAddress
Definition TSocket.h:59
Int_t fCompress
Definition TSocket.h:62
Int_t fSocket
Definition TSocket.h:69
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition TSocket.cxx:649
TString fUrl
Definition TSocket.h:71
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition TSocket.cxx:928
static ULong64_t fgBytesRecv
Definition TSocket.h:78
void Touch()
Definition TSocket.h:157
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition TSocket.cxx:975
Int_t GetCompressionLevel() const
Definition TSocket.h:181
static ULong64_t fgBytesSent
Definition TSocket.h:79
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition TSocket.cxx:684
Int_t fTcpWindowSize
Definition TSocket.h:70
@ kPROOFD
Definition TSocket.h:52
EServiceType fServType
Definition TSocket.h:68
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm: settings = 100 * algorithm + level.
Definition TSocket.cxx:1098
UInt_t fBytesSent
Definition TSocket.h:61
Int_t fRemoteProtocol
Definition TSocket.h:64
UInt_t fBytesRecv
Definition TSocket.h:60
Basic string class.
Definition TString.h:139
Ssiz_t Length() const
Definition TString.h:421
Int_t Atoi() const
Return integer value of string.
Definition TString.cxx:1966
const char * Data() const
Definition TString.h:380
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition TString.cxx:1808
Bool_t IsNull() const
Definition TString.h:418
TString & Remove(Ssiz_t pos)
Definition TString.h:685
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString.
Definition TString.cxx:2356
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2334
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition TString.h:651
static void ResetErrno()
Static function resetting system error number.
Definition TSystem.cxx:271
virtual int GetPid()
Get process id.
Definition TSystem.cxx:694
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1650
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition TSystem.cxx:2276
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition TSystem.cxx:1634
virtual const char * HomeDirectory(const char *userName=nullptr)
Return the user's home directory.
Definition TSystem.cxx:874
This class represents a WWW compatible URL.
Definition TUrl.h:33
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition TUrl.cxx:523
const char * GetHost() const
Definition TUrl.h:67
Int_t GetPort() const
Definition TUrl.h:78
Handler of asynchronous events for XProofD sockets.
Definition TXHandler.h:28
virtual Bool_t HandleInput(const void *in=0)
Handler of asynchronous input events.
Definition TXHandler.cxx:28
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
Definition TXHandler.cxx:37
Bool_t IsValid() const
Definition TXSocket.h:300
static void SetMemMax(Long64_t memmax)
Set the max allocated memory allowed.
Bool_t fOwn
Definition TXSocket.h:245
Int_t fSiz
Definition TXSocket.h:242
Int_t fLen
Definition TXSocket.h:243
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Char_t * fMem
Definition TXSocket.h:258
static Long64_t BuffMem()
Return the currently allocated memory.
static Long64_t fgMemMax
Definition TXSocket.h:260
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
~TXSockBuf()
destructor
static Long64_t fgBuffMem
Definition TXSocket.h:259
Char_t * fBuf
Definition TXSocket.h:244
void Resize(Int_t sz)
resize socket buffer
Int_t fCid
Definition TXSocket.h:246
TXSockPipe(const char *loc="")
Constructor.
void SetLoc(const char *loc="")
Definition TXSocket.h:282
TList fReadySock
Definition TXSocket.h:288
Int_t Flush(TSocket *s)
Remove any reference to socket 's' from the global pipe and ready-socket queue.
std::recursive_mutex fMutex
Definition TXSocket.h:285
Int_t Clean(TSocket *s)
Read a byte from the global pipe to synchronize message pickup.
virtual ~TXSockPipe()
Destructor.
void DumpReadySock()
Dump content of the ready socket list.
Bool_t IsValid() const
Definition TXSocket.h:272
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal the availability of new messages.
TXSocket * GetLastReady()
Return last ready socket.
TString fLoc
Definition TXSocket.h:287
Int_t fPipe[2]
Definition TXSocket.h:286
TXSocket * fSocket
Definition TXSocket.cxx:80
TXSocketPingHandler(TXSocket *s, Int_t fd)
Definition TXSocket.cxx:82
Bool_t ReadNotify()
Notify when something can be read from the descriptor associated with this handler.
Definition TXSocket.cxx:85
Bool_t Notify()
Ping the socket.
Definition TXSocket.cxx:91
High level handler of connections to XProofD.
Definition TXSocket.h:59
void SetSessionID(Int_t id)
Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
Definition TXSocket.cxx:256
static std::list< TXSockBuf * > fgSQue
Definition TXSocket.h:117
TXSockBuf * fBufCur
Definition TXSocket.h:91
Int_t GetSessionID() const
Getter for session ID.
Definition TXSocket.cxx:943
virtual Int_t GetClientID() const
Definition TXSocket.h:155
TString fUser
Definition TXSocket.h:72
void PostSemAll()
Wake up all threads waiting for at the semaphore (used by TXSlave)
Definition TXSocket.cxx:905
Int_t GetLogConnID() const
Getter for logical connection ID.
Definition TXSocket.cxx:919
Bool_t fRDInterrupt
Definition TXSocket.h:105
Int_t GetOpenError() const
Getter for last error.
Definition TXSocket.cxx:927
std::list< TXSockBuf * > fAQue
Definition TXSocket.h:88
Int_t GetServType() const
Getter for server type.
Definition TXSocket.cxx:935
static void SetLocation(const char *loc="")
Set location string.
Definition TXSocket.cxx:242
TXSocket(const char *url, Char_t mode='M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Definition TXSocket.cxx:127
Int_t fPort
Definition TXSocket.h:74
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition TXSocket.cxx:71
Int_t fByteLeft
Definition TXSocket.h:89
std::recursive_mutex fAMtx
Definition TXSocket.h:86
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
virtual Int_t GetClientIDSize() const
Definition TXSocket.h:156
virtual ~TXSocket()
Destructor.
Definition TXSocket.cxx:231
void PushBackSpare()
Release read buffer giving back to the spare list.
Int_t fPid
Definition TXSocket.h:101
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition TXSocket.cxx:368
TSemaphore fAsynProc
Definition TXSocket.h:93
kXR_int32 fSendOpt
Definition TXSocket.h:70
static std::mutex fgSMtx
Definition TXSocket.h:116
XrdProofConn * fConn
Definition TXSocket.h:82
static void InitEnvs()
Init environment variables for XrdClient.
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Bool_t fIForward
Definition TXSocket.h:98
Bool_t IsInterrupt()
Definition TXSocket.h:203
Int_t fXrdProofdVersion
Definition TXSocket.h:108
std::recursive_mutex fIMtx
Definition TXSocket.h:96
TObject * fReference
Definition TXSocket.h:79
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Int_t GetLowSocket() const
static Bool_t fgInitDone
Definition TXSocket.h:113
Int_t fLogLevel
Definition TXSocket.h:76
virtual void Close(Option_t *opt="")
Close connection.
Definition TXSocket.cxx:311
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
void SetInterrupt(Bool_t i=kTRUE)
static TString fgLoc
Definition TXSocket.h:112
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; 'type' specifies the type of the message (see TXSocket::EUrgentMs...
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
virtual Int_t Reconnect()
Try reconnection after failure.
TSemaphore fASem
Definition TXSocket.h:85
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
TString fBuffer
Definition TXSocket.h:78
TXHandler * fHandler
Definition TXSocket.h:80
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Bool_t fDontTimeout
Definition TXSocket.h:104
Int_t Flush()
Flush the asynchronous queue.
Int_t fByteCur
Definition TXSocket.h:90
static TXSockPipe fgPipe
Definition TXSocket.h:111
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition TXSocket.cxx:268
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition TXSocket.cxx:972
Short_t fSessionID
Definition TXSocket.h:71
virtual void SetClientID(Int_t)
Definition TXSocket.h:166
char fMode
Definition TXSocket.h:69
Int_t Send(const TMessage &mess)
Send a TMessage object.
kXR_int32 fILev
Definition TXSocket.h:97
void CtrlC()
Interrupt the remote protocol instance.
TString fHost
Definition TXSocket.h:73
Bool_t IsServProofd()
Return kTRUE if the remote server is a 'proofd'.
Definition TXSocket.cxx:959
void PostMsg(Int_t type, const char *msg=0)
Post a message of type 'type' into the read messages queue.
Definition TXSocket.cxx:848
void SetAWait(Bool_t w=kTRUE)
Definition TXSocket.h:207
Bool_t IsValid() const
Getter for validity status.
Definition TXSocket.cxx:951
void RemoteTouch()
Remote touch functionality: contact the server to proof our vitality.
Bool_t fAWait
Definition TXSocket.h:87
XrdOucString GetUrl()
XrdClientPhyConnection * fPhyConn
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
int GetLogConnID() const
bool IsValid() const
Test validity of this connection.
virtual void Close(const char *opt="")
Close connection.
XrdClientUrlInfo fUrl
kXR_unt16 fStreamid
short GetSessionID() const
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XrdOucString fUser
XrdOucString fLastErrMsg
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
int GetLowSocket()
Return the socket descriptor of the underlying connection.
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
int GetOpenError() const
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
int GetServType() const
XrdOucString fHost
void SetInterrupt()
Interrupt the underlying socket.
const Int_t n
Definition legend1.C:16
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...
static const char * what
Definition stlLoader.cc:6
TMarker m
Definition textangle.C:8
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientInterruptRequest interrupt
struct XPClientSendRcvRequest sendrcv
struct XPClientReadbufRequest readbuf