Logo ROOT   6.18/05
Reference Guide
XrdProofdProofServ.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: Gerardo Ganis 12/12/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11#include <sys/stat.h>
12
13#include "XrdNet/XrdNet.hh"
14
15#include "XrdProofdAux.h"
16#include "XrdProofdProofServ.h"
17#include "XrdProofWorker.h"
18#include "XrdProofSched.h"
19#include "XrdProofdManager.h"
20
21// Tracing utils
22#include "XrdProofdTrace.h"
23
24////////////////////////////////////////////////////////////////////////////////
25/// Constructor
26
28{
30 fResponse = 0;
31 fProtocol = 0;
32 fParent = 0;
33 fPingSem = 0;
34 fStartMsg = 0;
36 fSrvPID = -1;
38 fPLiteNWrks = -1;
39 fID = -1;
40 fIsShutdown = false;
41 fIsValid = true; // It is created for a valid server ...
42 fSkipCheck = false;
43 fProtVer = -1;
44 fNClients = 0;
45 fClients.reserve(10);
46 fDisconnectTime = -1;
47 fSetIdleTime = time(0);
48 fROOT = 0;
49 // Strings
50 fAdminPath = "";
51 fAlias = "";
52 fClient = "";
53 fFileout = "";
54 fGroup = "";
55 fOrdinal = "";
56 fTag = "";
57 fUserEnvs = "";
58 fUNIXSock = 0;
59 fUNIXSockPath = "";
60 fQueries.clear();
61}
62
63////////////////////////////////////////////////////////////////////////////////
64/// Destructor
65
67{
70
71 std::vector<XrdClientID *>::iterator i;
72 for (i = fClients.begin(); i != fClients.end(); ++i)
73 if (*i)
74 delete (*i);
75 fClients.clear();
76
77 // Cleanup worker info
79
80 // Cleanup queries info
81 fQueries.clear();
82
83 // Remove the associated UNIX socket path
84 unlink(fUNIXSockPath.c_str());
85
87}
88
89////////////////////////////////////////////////////////////////////////////////
90/// Decrease active session counters on worker w
91
// Per-entry callback passed to fWorkers.Apply() by ClearWorkers, which hands
// over 'this' as the opaque argument 'x'. The key argument is unused.
// Returns 0 to continue with the next entry, 1 to stop.
// NOTE(review): listing line 96, which presumably derives 'xps' from 'x',
// is not visible in this listing -- confirm against the original file.
92static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
93{
94 XPDLOC(PMGR, "DecreaseWorkerCounters")
95
97
98 if (w && xps) {
99 w->RemoveProofServ(xps); // detach the session 'xps' from worker 'w'
100 TRACE(REQ, w->fHost.c_str() <<" done");
101 // Entry processed: go on with the next one
102 return 0;
103 }
104
105 // Worker or session missing: stop here
106 return 1;
107}
108
109////////////////////////////////////////////////////////////////////////////////
110/// Trace key, host, port and the value of Active() for worker w
111
// Per-entry callback for fWorkers.Apply() (used by RemoveWorker when HDBG
// tracing is on). The opaque third argument is unused.
// Returns 0 to continue with the next entry, 1 to stop.
112static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
113{
114 XPDLOC(PMGR, "DumpWorkerCounters")
115
116 if (w) {
117 TRACE(ALL, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
118 // Entry traced: go on with the next one
119 return 0;
120 }
121
122 // No worker to describe: stop here
123 return 1;
124}
125
126////////////////////////////////////////////////////////////////////////////////
127/// Decrease worker counters and clean-up the list
128
130{
132
133 // Decrease workers' counters and remove this from workers
134 fWorkers.Apply(DecreaseWorkerCounters, this);
135 fWorkers.Purge();
136}
137
138////////////////////////////////////////////////////////////////////////////////
139/// Add a worker assigned to this session with label 'o'
140
142{
143 if (!o || !w) return;
144
146
147 fWorkers.Add(o, w, 0, Hash_keepdata);
148}
149
150////////////////////////////////////////////////////////////////////////////////
151/// Release worker assigned to this session with label 'o'
152
154{
155 XPDLOC(SMGR, "ProofServ::RemoveWorker")
156
157 if (!o) return;
158
159 TRACE(DBG,"removing: "<<o);
160
162
163 XrdProofWorker *w = fWorkers.Find(o);
164 if (w) w->RemoveProofServ(this);
165 fWorkers.Del(o);
166 if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
167}
168
169////////////////////////////////////////////////////////////////////////////////
170/// Reset this instance, broadcasting a message to the clients.
171/// return 1 if top master, 0 otherwise
172
// msg  : text broadcast to the attached clients (replaced by "idle-timeout"
//        when the status file contains 4)
// type : message type forwarded to Broadcast()
// NOTE(review): listing line 194 is not visible in this listing.
173int XrdProofdProofServ::Reset(const char *msg, int type)
174{
175 XPDLOC(SMGR, "ProofServ::Reset")
176
177 int rc = 0;
178 // Read the status from '<fAdminPath>.status'; 'st' stays -1 if the file
// cannot be opened or read
179 int st = -1;
180 XrdOucString fn;
181 XPDFORM(fn, "%s.status", fAdminPath.c_str());
182 FILE *fpid = fopen(fn.c_str(), "r");
183 if (fpid) {
184 char line[64];
185 if (fgets(line, sizeof(line), fpid)) {
// Strip a trailing newline before converting to integer
186 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = 0;
187 st = atoi(line);
188 } else {
189 TRACE(XERR,"problems reading from file "<<fn);
190 }
191 fclose(fpid);
192 }
193 TRACE(DBG,"file: "<<fn<<", st:"<<st);
195 // Broadcast: a stored status of 4 is announced as "idle-timeout" instead of msg
// NOTE(review): the meaning of status value 4 is not defined in this listing
196 if (st == 4) {
197 Broadcast("idle-timeout", type);
198 } else {
199 Broadcast(msg, type);
200 }
201 // Return value is 1 only for top masters
202 if (fSrvType == kXPD_TopMaster) rc = 1;
203 // Reset instance (argument-less overload)
204 Reset();
205 // Done
206 return rc;
207}
208
209////////////////////////////////////////////////////////////////////////////////
210/// Reset this instance
211
213{
215
216 fResponse = 0;
217 fProtocol = 0;
218 fParent = 0;
221 fSrvPID = -1;
222 fID = -1;
223 fIsShutdown = false;
224 fIsValid = false;
225 fSkipCheck = false;
226 fProtVer = -1;
227 fNClients = 0;
228 fClients.clear();
229 fDisconnectTime = -1;
230 fSetIdleTime = -1;
231 fROOT = 0;
232 // Cleanup worker info
233 ClearWorkers();
234 // ClearWorkers depends on the fSrvType and fStatus
236 fPLiteNWrks = -1;
238 // Cleanup queries info
239 fQueries.clear();
240 // Strings
241 fAdminPath = "";
242 fAlias = "";
243 fClient = "";
244 fFileout = "";
245 fGroup = "";
246 fOrdinal = "";
247 fTag = "";
248 fUserEnvs = "";
250}
251
252////////////////////////////////////////////////////////////////////////////////
253/// Delete the current UNIX socket
254
256{
258 unlink(fUNIXSockPath.c_str());
259 fUNIXSockPath = "";
260}
261
262////////////////////////////////////////////////////////////////////////////////
263/// Return the value of fSkipCheck and reset it to false
264
266{
268
269 bool rc = fSkipCheck;
270 fSkipCheck = false;
271 return rc;
272}
273
274////////////////////////////////////////////////////////////////////////////////
275/// Get instance corresponding to cid
276
278{
279 XPDLOC(SMGR, "ProofServ::GetClientID")
280
281 XrdClientID *csid = 0;
282
283 if (cid < 0) {
284 TRACE(XERR, "negative ID: protocol error!");
285 return csid;
286 }
287
288 XrdOucString msg;
290
291 // Count new attached client
292 fNClients++;
293
294 // If in the allocate range reset the corresponding instance and
295 // return it
296 if (cid < (int)fClients.size()) {
297 csid = fClients.at(cid);
298 csid->Reset();
299
300 // Notification message
301 if (TRACING(DBG)) {
302 XPDFORM(msg, "cid: %d, size: %d", cid, fClients.size());
303 }
304 }
305
306 if (!csid) {
307 // If not, allocate a new one; we need to resize (double it)
308 if (cid >= (int)fClients.capacity())
309 fClients.reserve(2*fClients.capacity());
310
311 // Allocate new elements (for fast access we need all of them)
312 int ic = (int)fClients.size();
313 for (; ic <= cid; ic++)
314 fClients.push_back((csid = new XrdClientID()));
315
316 // Notification message
317 if (TRACING(DBG)) {
318 XPDFORM(msg, "cid: %d, new size: %d", cid, fClients.size());
319 }
320 }
321 }
322 TRACE(DBG, msg);
323
324 // We are done
325 return csid;
326}
327
328////////////////////////////////////////////////////////////////////////////////
329/// Free instance corresponding to protocol connecting process 'pid'
330
332{
333 XPDLOC(SMGR, "ProofServ::FreeClientID")
334
335 TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
336 fStatus<<", # clients: "<< fNClients);
337 int rc = -1;
338 if (pid <= 0) {
339 TRACE(XERR, "undefined pid!");
340 return rc;
341 }
342 if (!IsValid()) return rc;
343
345
346 // Remove this from the list of clients
347 std::vector<XrdClientID *>::iterator i;
348 for (i = fClients.begin(); i != fClients.end(); ++i) {
349 if ((*i) && (*i)->P()) {
350 if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
351 if (fProtocol == (*i)->P()) {
352 SetProtocol(0);
353 SetConnection(0);
354 }
355 (*i)->Reset();
356 if (fParent == (*i)) SetParent(0);
357 fNClients--;
358 // Record time of last disconnection
359 if (fNClients <= 0)
360 fDisconnectTime = time(0);
361 rc = 0;
362 break;
363 }
364 }
365 }
366 }
367 if (TRACING(REQ) && (rc == 0)) {
368 int spid = SrvPID();
369 TRACE(REQ, spid<<": slot for client pid: "<<pid<<" has been reset");
370 }
371
372 // Out of range
373 return rc;
374}
375
376////////////////////////////////////////////////////////////////////////////////
377/// Get the number of connected clients. If check is true check that
378/// they are still valid ones and free the slots for the invalid ones
379
381{
383
384 if (check) {
385 fNClients = 0;
386 // Remove this from the list of clients
387 std::vector<XrdClientID *>::iterator i;
388 for (i = fClients.begin(); i != fClients.end(); ++i) {
389 if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
390 }
391 }
392
393 // Done
394 return fNClients;
395}
396
397////////////////////////////////////////////////////////////////////////////////
398/// Return the time (in secs) all clients have been disconnected.
399/// Return -1 if the session is running
400
402{
404
405 int disct = -1;
406 if (fDisconnectTime > 0)
407 disct = time(0) - fDisconnectTime;
408 return ((disct > 0) ? disct : -1);
409}
410
411////////////////////////////////////////////////////////////////////////////////
412/// Return the time (in secs) the session has been idle.
413/// Return -1 if the session is running
414
416{
418
419 int idlet = -1;
420 if (fStatus == kXPD_idle)
421 idlet = time(0) - fSetIdleTime;
422 return ((idlet > 0) ? idlet : -1);
423}
424
425////////////////////////////////////////////////////////////////////////////////
426/// Set status to idle and update the related time stamp
427///
428
430{
432
434 fSetIdleTime = time(0);
435}
436
437////////////////////////////////////////////////////////////////////////////////
438/// Set status to running and reset the related time stamp
439///
440
442{
444
446 fSetIdleTime = -1;
447}
448
449////////////////////////////////////////////////////////////////////////////////
450/// Broadcast message 'msg' at 'type' to the attached clients
451
// msg  : NUL-terminated text to send; nothing is sent if null or empty
// type : action code sent with the message (cast to XProofActionCode)
// NOTE(review): listing line 466 is not visible in this listing; the '}' closing
// listing line 468 presumably pairs with a scope opened there.
452void XrdProofdProofServ::Broadcast(const char *msg, int type)
453{
454 XPDLOC(SMGR, "ProofServ::Broadcast")
455
456 // Backward-compatibility check: 18 for types >= kXPD_wrkmortem, -1 otherwise;
// the value is handed to XPD_CLNT_VERSION_OK below
457 int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
458
459 XrdOucString m;
460 int len = 0, nc = 0; // nc counts the clients the message was sent to
461 if (msg && (len = strlen(msg)) > 0) {
462 XrdProofdProtocol *p = 0;
463 int ic = 0, ncz = 0, sid = -1;
// Snapshot the number of client slots while holding the mutex
464 { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
465 for (ic = 0; ic < ncz; ic++) {
// NOTE(review): the slot pointer is dereferenced without a null check here,
// while other loops over fClients in this file test '*i' first
467 p = fClients.at(ic)->P();
468 sid = fClients.at(ic)->Sid(); }
469 // Send message to this client if its protocol passes the version check
470 if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
471 XrdProofdResponse *response = p->Response(sid);
472 if (response) {
473 response->Send(kXR_attn, (XProofActionCode)type, (void *)msg, len);
474 nc++;
475 } else {
476 XPDFORM(m, "response instance for sid: %d not found", sid);
477 }
478 }
// Report a per-client error, if any, and clear it for the next iteration
479 if (m.length() > 0)
480 TRACE(XERR, m);
481 m = "";
482 }
483 }
484 if (TRACING(DBG)) {
// NOTE(review): 'msg' may be null here and is formatted with '%s'
485 XPDFORM(m, "type: %d, message: '%s' notified to %d clients", type, msg, nc);
486 XPDPRT(m);
487 }
488}
489
490////////////////////////////////////////////////////////////////////////////////
491/// Terminate the associated process.
492/// A shutdown interrupt message is forwarded.
493/// If add is TRUE (default) the pid is added to the list of processes
494/// requested to terminate.
495/// Return the pid of the terminated process on success, -1 if not allowed
496/// or other errors occurred.
497
499{
500 XPDLOC(SMGR, "ProofServ::TerminateProofServ")
501
502 int pid = fSrvPID;
503 TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
504
505 // Send a terminate signal to the proofserv
506 if (pid > -1) {
507 XrdProofUI ui;
509 if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
510 TRACE(XERR, "ord: problems signalling process: "<<fSrvPID);
511 }
513 fIsShutdown = true;
514 }
515
516 // Failed
517 return -1;
518}
519
520////////////////////////////////////////////////////////////////////////////////
521/// Check if the associated proofserv process is alive. This is done
522/// asynchronously by asking the process to call back and prove its vitality.
523/// We do not block here: the caller may setup a waiting structure if
524/// required.
525/// If forward is true, the process will forward the request to the following
526/// tiers.
527/// Return 0 if the request was sent successfully, -1 in case of error.
528
530{
531 XPDLOC(SMGR, "ProofServ::VerifyProofServ")
532
533 TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
534
535 int rc = 0;
536 XrdOucString msg;
537
538 // Prepare buffer
539 int len = sizeof(kXR_int32);
540 char *buf = new char[len];
541 // Option
542 kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
543 ifw = static_cast<kXR_int32>(htonl(ifw));
544 memcpy(buf, &ifw, sizeof(kXR_int32));
545
547 // Propagate the ping request
548 if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
549 msg = "could not propagate ping to proofsrv";
550 rc = -1;
551 }
552 }
553 // Cleanup
554 delete[] buf;
555
556 // Notify errors, if any
557 if (rc != 0)
558 TRACE(XERR, msg);
559
560 // Done
561 return rc;
562}
563
564////////////////////////////////////////////////////////////////////////////////
565/// Broadcast a new group priority value to the worker servers.
566/// Called by masters.
567
569{
570 XPDLOC(SMGR, "ProofServ::BroadcastPriority")
571
573
574 // Prepare buffer
575 int len = sizeof(kXR_int32);
576 char *buf = new char[len];
577 kXR_int32 itmp = priority;
578 itmp = static_cast<kXR_int32>(htonl(itmp));
579 memcpy(buf, &itmp, sizeof(kXR_int32));
580 // Send over
581 if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
582 // Failure
583 TRACE(XERR,"problems telling proofserv");
584 SafeDelArray(buf);
585 return -1;
586 }
587 SafeDelArray(buf);
588 TRACE(DBG, "priority "<<priority<<" sent over");
589 // Done
590 return 0;
591}
592
593////////////////////////////////////////////////////////////////////////////////
594/// Send data to client cid.
595
// cid  : index of the target client in fClients
// buff : payload, sent as a kXPD_msg attention message
// len  : payload length in bytes
// Returns 0 if the message was sent, -1 otherwise.
// NOTE(review): listing line 607 is not visible in this listing; the '}' on
// listing line 617 presumably closes a scope opened there.
596int XrdProofdProofServ::SendData(int cid, void *buff, int len)
597{
598 XPDLOC(SMGR, "ProofServ::SendData")
599
600 TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
601
602 int rs = 0;
603 XrdOucString msg;
604
605 // Get corresponding instance: cid must be a valid index with a non-null slot
606 XrdClientID *csid = 0;
608 if (cid < 0 || cid > (int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
609 XPDFORM(msg, "client ID not found (cid: %d, size: %d)", cid, fClients.size());
610 rs = -1;
611 }
// The slot must also have a response instance attached
612 if (!rs && !(csid->R())) {
613 XPDFORM(msg, "client not connected: csid: %p, cid: %d, fSid: %d",
614 csid, cid, csid->Sid());
615 rs = -1;
616 }
617 }
618
619 //
620 // The message is strictly for the client requiring it
621 if (!rs) {
622 rs = -1; // assume failure until Send() reports success (returns 0)
623 XrdProofdResponse *response = csid->R() ? csid->R() : 0;
624 if (response)
625 if (!response->Send(kXR_attn, kXPD_msg, buff, len))
626 rs = 0;
627 } else {
628 // Notify the lookup error recorded above
629 TRACE(XERR, msg);
630 }
631
632 // Done
633 return rs;
634}
635
636////////////////////////////////////////////////////////////////////////////////
637/// Send data over the open client links of this session.
638/// Used when all the connected clients are eligible to receive the message.
639
// buff : payload, sent as a kXPD_msg attention message
// len  : payload length in bytes
// Returns 0 if every send succeeded, -1 at the first failure.
// NOTE(review): listing line 648 is not visible in this listing.
640int XrdProofdProofServ::SendDataN(void *buff, int len)
641{
642 XPDLOC(SMGR, "ProofServ::SendDataN")
643
644 TRACE(HDBG, "length: "<<len<<" bytes");
645
646 XrdOucString msg; // not used in the visible lines
647
649
650 // Send to every slot that has a protocol instance attached
651 XrdClientID *csid = 0;
652 int ic = 0;
653 for (ic = 0; ic < (int) fClients.size(); ic++) {
654 if ((csid = fClients.at(ic)) && csid->P()) {
655 XrdProofdResponse *resp = csid->R();
// A missing response or a failed send aborts the loop: clients in later
// slots are not served
656 if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
657 return -1;
658 }
659 }
660
661 // Done
662 return 0;
663}
664
665////////////////////////////////////////////////////////////////////////////////
666/// Fill buf with relevant info about this session
667
// buf : output string; any previous content is discarded. On return it holds
//       " | <id> <tag> <alias> <status> <number of clients>".
// NOTE(review): listing line 675 is not visible in this listing; the '}' ending
// listing line 680 presumably closes a scope opened there.
668void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
669{
670 XPDLOC(SMGR, "ProofServ::ExportBuf")
671
672 buf = "";
673 int id, status, nc;
674 XrdOucString tag, alias;
// Copy the members into locals first, then format from the copies
676 id = fID;
677 status = fStatus;
678 nc = fNClients;
679 tag = fTag;
680 alias = fAlias; }
681 XPDFORM(buf, " | %d %s %s %d %d", id, tag.c_str(), alias.c_str(), status, nc);
682 TRACE(HDBG, "buf: "<< buf);
683
684 // Done
685 return;
686}
687
688////////////////////////////////////////////////////////////////////////////////
689/// Create UNIX socket for internal connections
690
692{
693 XPDLOC(SMGR, "ProofServ::CreateUNIXSock")
694
695 TRACE(DBG, "enter");
696
697 // Make sure we do not have already a socket
698 if (fUNIXSock) {
699 TRACE(DBG,"UNIX socket exists already! ("<<fUNIXSockPath<<")");
700 return 0;
701 }
702
703 // Create socket
704 fUNIXSock = new XrdNet(edest);
705
706 // Make sure the admin path exists
707 if (fAdminPath.length() > 0) {
708 FILE *fadm = fopen(fAdminPath.c_str(), "a");
709 if (fadm) {
710 fclose(fadm);
711 } else {
712 TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
713 return -1;
714 }
715 }
716
717 // Check the path
718 bool ok = 0;
719 if (unlink(fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
720 XPDPRT("WARNING: path exists: unable to delete it:"
721 " try to use it anyway " <<fUNIXSockPath);
722 ok = 1;
723 }
724
725 // Create the path
726 int fd = 0;
727 if (!ok) {
728 if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
729 TRACE(XERR, "unable to create path: " <<fUNIXSockPath);
730 return -1;
731 }
732 close(fd);
733 }
734 if (fd > -1) {
735 // Change ownership
736 if (fUNIXSock->Bind((char *)fUNIXSockPath.c_str())) {
737 TRACE(XERR, " problems binding to UNIX socket; path: " <<fUNIXSockPath);
738 return -1;
739 } else
740 TRACE(DBG, "path for UNIX for socket is " <<fUNIXSockPath);
741 } else {
742 TRACE(XERR, "unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
743 return -1;
744 }
745
746 // Change ownership if running as super-user
747 if (!geteuid()) {
748 XrdProofUI ui;
750 if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
751 TRACE(XERR, "unable to change ownership of the UNIX socket"<<fUNIXSockPath);
752 return -1;
753 }
754 }
755
756 // We are done
757 return 0;
758}
759
760////////////////////////////////////////////////////////////////////////////////
761/// Set the admin path and make sure the file exists
762
// a      : admin path; stored in fAdminPath
// assert : if false only fAdminPath is set; if true the admin file and the
//          '<a>.status' file are opened/created
// setown : if true the status file is handed over to the user named by fClient
// Returns 0 on success, -1 on any failure.
// NOTE(review): listing line 767 is not visible in this listing.
// NOTE(review): 'a' is used without a null check in the visible lines.
763int XrdProofdProofServ::SetAdminPath(const char *a, bool assert, bool setown)
764{
765 XPDLOC(SMGR, "ProofServ::SetAdminPath")
766
768
769 fAdminPath = a;
770
771 // If we are not asked to assert the file we are done
772 if (!assert) return 0;
773
774 // Make sure the session file exists: append mode creates it if missing
// and leaves existing content untouched
775 FILE *fpid = fopen(a, "a");
776 if (fpid) {
777 fclose(fpid);
778 } else {
779 TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
780 return -1;
781 }
782
783 // Make sure the status file exists and append the current status to it
784 XrdOucString fn;
785 XPDFORM(fn, "%s.status", a);
786 if ((fpid = fopen(fn.c_str(), "a"))) {
787 fprintf(fpid, "%d", fStatus);
788 fclose(fpid);
789 } else {
790 TRACE(XERR, "unable to open / create status path "<< fn << "; errno = "<<errno);
791 return -1;
792 }
793
794 if (setown) {
795 // Set the ownership of the status file to the user
796 XrdProofUI ui;
797 if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
798 TRACE(XERR, "unable to get info for user "<<fClient<<"; errno = "<<errno);
799 return -1;
800 }
801 if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
802 TRACE(XERR, "unable to give ownership of the status file "<< fn << " to user; errno = "<<errno);
803 return -1;
804 }
805 }
806
807 // Done
808 return 0;
809}
810
811////////////////////////////////////////////////////////////////////////////////
812/// Send a resume message to this session. It is assumed that the session
813/// has at least one async query to process and will immediately send
814/// a getworkers request (the workers are already assigned).
815
817{
818 XPDLOC(SMGR, "ProofServ::Resume")
819
820 TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
821
822 int rc = 0;
823 XrdOucString msg;
824
826 //
827 if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
828 msg = "could not propagate resume to proofsrv";
829 rc = -1;
830 }
831 }
832
833 // Notify errors, if any
834 if (rc != 0)
835 TRACE(XERR, msg);
836
837 // Done
838 return rc;
839}
840
841////////////////////////////////////////////////////////////////////////////////
842/// Add the exported description of worker w to the string pointed to by s
843
// Per-entry callback for fWorkers.Apply() (see ExportWorkers). Entries are
// separated by '&'; an entry of type 'M' (master) is inserted at the front,
// all others are appended. Returns 0 to continue, 1 to stop.
844static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
845{
846 XPDLOC(PMGR, "ExportWorkerDescription")
847
848 XrdOucString *wrks = (XrdOucString *)s;
849 if (w && wrks) {
850 // Master at the beginning; note that it is exported without the key k
851 if (w->fType == 'M') {
852 if (wrks->length() > 0) wrks->insert('&',0);
853 wrks->insert(w->Export(), 0);
854 } else {
855 // Add separator if not the first
856 if (wrks->length() > 0)
857 (*wrks) += '&';
858 // Add export version of the info, passing the key k to Export()
859 (*wrks) += w->Export(k);
860 }
861 TRACE(HDBG, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
862 // Entry exported: go on with the next one
863 return 0;
864 }
865
866 // Worker or output string missing: stop here
867 return 1;
868}
869
870////////////////////////////////////////////////////////////////////////////////
871/// Export the assigned workers in the format understood by proofserv
872
// wrks : output string; cleared first, then filled by applying
//        ExportWorkerDescription to every entry of fWorkers.
// NOTE(review): listing line 875 is not visible in this listing.
873void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
874{
876 wrks = "";
877 fWorkers.Apply(ExportWorkerDescription, (void *)&wrks);
878}
879
880////////////////////////////////////////////////////////////////////////////////
881/// Dump the list of queries of this session (tag, dataset name and size)
882
884{
885 XPDLOC(PMGR, "DumpQueries")
886
888
889 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
890 TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
891 ", # of queries: "<< fQueries.size());
892 std::list<XrdProofQuery *>::iterator ii;
893 int i = 0;
894 for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
895 i++;
896 TRACE(ALL," +++ #"<<i<<" tag:"<< (*ii)->GetTag()<<" dset: "<<
897 (*ii)->GetDSName()<<" size:"<<(*ii)->GetDSSize());
898 }
899 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
900}
901
902////////////////////////////////////////////////////////////////////////////////
903/// Get query with tag from the list of queries
904
906{
907 XrdProofQuery *q = 0;
908 if (!tag || strlen(tag) <= 0) return q;
909
911
912 if (fQueries.size() <= 0) return q;
913
914 std::list<XrdProofQuery *>::iterator ii;
915 for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
916 q = *ii;
917 if (!strcmp(tag, q->GetTag())) break;
918 q = 0;
919 }
920 // Done
921 return q;
922}
923
924////////////////////////////////////////////////////////////////////////////////
925/// Remove query with tag from the list of queries
926
928{
929 XrdProofQuery *q = 0;
930 if (!tag || strlen(tag) <= 0) return;
931
933
934 if (fQueries.size() <= 0) return;
935
936 std::list<XrdProofQuery *>::iterator ii;
937 for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
938 q = *ii;
939 if (!strcmp(tag, q->GetTag())) break;
940 q = 0;
941 }
942 // remove it
943 if (q) {
944 fQueries.remove(q);
945 delete q;
946 }
947
948 // Done
949 return;
950}
951
952////////////////////////////////////////////////////////////////////////////////
953/// Add the number of active sessions on worker w to the counter pointed to by s
954
// Per-entry callback for fWorkers.Apply() (see SendClusterInfo); 's' must
// point to an int accumulator. The key argument is unused.
// Returns 0 to continue with the next entry, 1 to stop.
955static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
956{
957 int *actw = (int *)s;
958 if (w && actw) {
959 *actw += w->GetNActiveSessions();
960 // Entry counted: go on with the next one
961 return 0;
962 }
963
964 // Worker or accumulator missing: stop here
965 return 1;
966}
967
968////////////////////////////////////////////////////////////////////////////////
969/// Calculate the effective number of users on this session nodes
970/// and communicate it to the master together with the total number
971/// of sessions and the number of active sessions. for monitoring issues.
972
// nsess : total number of sessions, forwarded as is
// nacti : number of active sessions, forwarded as is
// Nothing is sent when no workers are assigned to this session.
// NOTE(review): listing line 986 is not visible in this listing.
973void XrdProofdProofServ::SendClusterInfo(int nsess, int nacti)
974{
975 XPDLOC(PMGR, "SendClusterInfo")
976
977 // Only if we are active, i.e. at least one worker is assigned
// (this also guarantees a non-zero divisor below)
978 if (fWorkers.Num() <= 0) return;
979
// Sum of the active sessions over all assigned workers
980 int actw = 0;
981 fWorkers.Apply(CountEffectiveSessions, (void *)&actw);
982 // The number of effective sessions * 1000 (integer average per worker,
// scaled by 1000)
983 int neffs = (actw*1000)/fWorkers.Num();
984 TRACE(DBG, "# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.);
985
987
988 // Prepare buffer: three 32-bit integers, each converted with htonl, in the
// order nsess, nacti, neffs
989 int len = 3*sizeof(kXR_int32);
990 char *buf = new char[len];
991 kXR_int32 off = 0;
992 kXR_int32 itmp = nsess;
993 itmp = static_cast<kXR_int32>(htonl(itmp));
994 memcpy(buf + off, &itmp, sizeof(kXR_int32));
995 off += sizeof(kXR_int32);
996 itmp = nacti;
997 itmp = static_cast<kXR_int32>(htonl(itmp));
998 memcpy(buf + off, &itmp, sizeof(kXR_int32));
999 off += sizeof(kXR_int32);
1000 itmp = neffs;
1001 itmp = static_cast<kXR_int32>(htonl(itmp));
1002 memcpy(buf + off, &itmp, sizeof(kXR_int32));
1003 // Send over as a kXPD_clusterinfo attention message
1004 if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
1005 // Failure is only traced; there is no return value to report it
1006 TRACE(XERR,"problems sending proofserv");
1007 }
1008 SafeDelArray(buf);
1009}
1010
1011////////////////////////////////////////////////////////////////////////////////
1012/// Count the clients attached to this session (returned in 'nc', -1 if the
1013/// check is skipped) and, if no client is left and the shutdown conditions
1014/// are met, signal the proofserv process. Return 1 in that case, 0 otherwise.
1015
// oldvers   : if true the check runs even when fSkipCheck was set, and the
//             'isrec' veto is ignored
// isrec     : if true (and !oldvers) the session is never shut down here
// shutopt   : 1 = compare the idle time with shutdel,
//             2 = compare the time since last disconnection with shutdel
// shutdel   : delay in seconds used by the comparison above
// changeown : forwarded to XrdProofdAux::KillProcess
// nc        : output, number of clients having both a protocol and a link
// NOTE(review): listing lines 1024 and 1051 are not visible in this listing; the
// extra '}' on listing line 1061 presumably closes a scope opened on line 1024,
// and 'ui' is presumably filled on line 1051.
1016int XrdProofdProofServ::CheckSession(bool oldvers, bool isrec,
1017 int shutopt, int shutdel, bool changeown, int &nc)
1018{
// NOTE(review): the label below reads "SendClusterInfo", unlike the other
// XPDLOC uses in this file which carry the enclosing function's name
1019 XPDLOC(PMGR, "SendClusterInfo")
1020
1021 XrdOucString emsg;
1022 bool rmsession = 0;
1023 nc = -1;
1025
// Read and clear the skip flag in one go
1026 bool skipcheck = fSkipCheck;
1027 fSkipCheck = false;
1028
1029 if (!skipcheck || oldvers) {
1030 nc = 0;
1031 // Count the clients that have both a protocol instance and a link
1032 std::vector<XrdClientID *>::iterator i;
1033 for (i = fClients.begin(); i != fClients.end(); ++i) {
1034 if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
1035 }
1036 // Check if we need to shutdown it
1037 if (nc <= 0 && (!isrec || oldvers)) {
// Idle and disconnection times in seconds; -1 when not applicable
1038 int idlet = -1, disct = -1, now = time(0);
1039 if (fStatus == kXPD_idle)
1040 idlet = now - fSetIdleTime;
1041 if (idlet <= 0) idlet = -1;
1042 if (fDisconnectTime > 0)
1043 disct = now - fDisconnectTime;
1044 if (disct <= 0) disct = -1;
// Sessions that are not top masters are always shut down at this point;
// top masters only when the selected timeout has expired
1045 if ((fSrvType != kXPD_TopMaster) ||
1046 (shutopt == 1 && (idlet >= shutdel)) ||
1047 (shutopt == 2 && (disct >= shutdel))) {
1048 // Send a terminate signal to the proofserv
1049 if (fSrvPID > -1) {
1050 XrdProofUI ui;
1052 if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
1053 XPDFORM(emsg, "ord: problems signalling process: %d", fSrvPID);
1054 }
// Flagged as shut down even if signalling failed
1055 fIsShutdown = true;
1056 }
1057 rmsession = 1;
1058 }
1059 }
1060 }
1061 }
1062 // Notify error, if any
1063 if (emsg.length() > 0) {
1064 TRACE(XERR,emsg.c_str());
1065 }
1066 // Done
1067 return rmsession;
1068}
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
XFontStruct * id
Definition: TGX11.cxx:108
int type
Definition: TGX11.cxx:120
float * q
Definition: THbookFile.cxx:87
@ kXPD_running
@ kXPD_idle
#define kXPD_TopMaster
#define kXPD_AnyServer
XProofActionCode
@ kXPD_msg
@ kXPD_wrkmortem
@ kXPD_resume
@ kXPD_clusterinfo
@ kXPD_ping
@ kXPD_priority
#define XrdSysError
Definition: XpdSysError.h:8
#define XPDFORM
Definition: XrdProofdAux.h:378
#define SafeDel(x)
Definition: XrdProofdAux.h:332
#define SafeDelArray(x)
Definition: XrdProofdAux.h:335
static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
Decrease active session counters on worker w.
static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
Decrease active session counters on worker w.
static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
Decrease active session counters on worker w.
static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
Decrease active session counters on worker w.
#define XPD_CLNT_VERSION_OK(p, v)
#define XPDPRT(x)
#define XPDLOC(d, x)
#define TRACING(x)
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
#define XrdSysRecMutex
Definition: XrdSysToOuc.h:18
XrdProofdResponse * R() const
unsigned short Sid() const
XrdProofdProtocol * P() const
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
void RemoveProofServ(XrdProofdProofServ *xps)
int Active() const
XrdOucString fHost
const char * Export(const char *ord=0)
Export current content in a form understood by parsing algorithms inside the PROOF session,...
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
int Resume()
Send a resume message to the this session.
XrdOucHash< XrdProofWorker > fWorkers
XrdSrvBuffer * fStartMsg
void SetIdle()
Set status to idle and update the related time stamp.
void ClearWorkers()
Decrease worker counters and clean-up the list.
void AddWorker(const char *o, XrdProofWorker *w)
Add a worker assigned to this session with label 'o'.
XrdSysRecMutex * fMutex
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
void DeleteUNIXSock()
Delete the current UNIX socket.
void SetConnection(XrdProofdResponse *r)
int DisconnectTime()
Return the time (in secs) all clients have been disconnected.
std::list< XrdProofQuery * > fQueries
void ExportBuf(XrdOucString &buf)
Fill buf with relevant info about this session.
void SetProtocol(XrdProofdProtocol *p)
XrdProofdProofServ()
Constructor.
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
int SendData(int cid, void *buff, int len)
Send data to client cid.
XrdSysSemWait * fPingSem
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
int TerminateProofServ(bool changeown)
Terminate the associated process.
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
int SendDataN(void *buff, int len)
Send data over the open client links of this session.
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
XrdOucString fUNIXSockPath
void SendClusterInfo(int nsess, int nacti)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
XrdProofdProtocol * fProtocol
void ExportWorkers(XrdOucString &wrks)
Export the assigned workers in the format understood by proofserv.
int GetNClients(bool check)
Get the number of connected clients.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
void Reset()
Reset this instance.
~XrdProofdProofServ()
Destructor.
void Broadcast(const char *msg, int type=kXPD_srvmsg)
Broadcast message 'msg' at 'type' to the attached clients.
std::vector< XrdClientID * > fClients
int IdleTime()
Return the time (in secs) the session has been idle.
XrdProofdResponse * fResponse
bool SkipCheck()
Return the value of fSkipCheck and reset it to false.
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
void SetRunning()
Set status to running and reset the related time stamp.
void SetParent(XrdClientID *cid)
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session nodes and communicate it to the master togeth...
XrdProofdResponse * Response(kXR_unt16 rid)
Get response instance corresponding to stream ID 'sid'.
int Send(void)
Auxilliary Send method.
TLine * line
Double_t x[n]
Definition: legend1.C:17
static constexpr double s
void forward(const LAYERDATA &prevLayerData, LAYERDATA &currLayerData)
apply the weights (and functions) in forward direction of the DNN
Definition: NeuralNet.icc:544
int changeown(const std::string &path, uid_t u, gid_t g)
Change the ownership of 'path' to the entity described by {u,g}.
Definition: proofexecv.cxx:738
auto * m
Definition: textangle.C:8
auto * a
Definition: textangle.C:12