Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
XrdProofdNetMgr.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis Jan 2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11#include "XrdProofdPlatform.h"
12
13//////////////////////////////////////////////////////////////////////////
14// //
15// XrdProofdNetMgr //
16// //
17// Authors: G. Ganis, CERN, 2008 //
18// //
19// Manages connections between PROOF server daemons //
20// //
21//////////////////////////////////////////////////////////////////////////
22
23#include "XrdProofdNetMgr.h"
24
25#include "XrdProofdXrdVers.h"
26#ifndef ROOT_XrdFour
27# include "XpdSysDNS.h"
28#else
29# include "XrdNet/XrdNetAddr.hh"
30#endif
31#include "Xrd/XrdBuffer.hh"
36#include "XrdOuc/XrdOucStream.hh"
37#include "XrdSys/XrdSysPlatform.hh"
38
39#include "XrdProofdClient.h"
40#include "XrdProofdManager.h"
41#include "XrdProofdProtocol.h"
42#include "XrdProofdResponse.h"
43#include "XrdProofWorker.h"
44
45// Tracing utilities
46#include "XrdProofdTrace.h"
47
48#include <algorithm>
49#include <limits>
50#include <math.h>
51
52////////////////////////////////////////////////////////////////////////////////
53/// Send up a message from the server
54
55int MessageSender(const char *msg, int len, void *arg)
56{
58 if (r) {
59 return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
60 }
61 return -1;
62}
63
64////////////////////////////////////////////////////////////////////////////////
65/// Constructor
66
70{
71 fMgr = mgr;
73 fPROOFcfg.fName = "";
74 fPROOFcfg.fMtime = -1;
76 fDfltFallback = 0;
77 fDfltWorkers.clear();
78 fRegWorkers.clear();
79 fWorkers.clear();
80 fNodes.clear();
82 fWorkerUsrCfg = 0;
83 fRequestTO = 30;
84
85 // Configuration directives
87}
88
89////////////////////////////////////////////////////////////////////////////////
90/// Register config directives
91
93{
94 Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
95 Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
96 Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
97 Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
98}
99
100////////////////////////////////////////////////////////////////////////////////
101/// Destructor
102
104{
105 // Cleanup the worker lists
106 // (the nodes list points to the same object, no cleanup is needed)
107 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
108 while (w != fRegWorkers.end()) {
109 delete *w;
110 w = fRegWorkers.erase(w);
111 }
112 w = fDfltWorkers.begin();
113 while (w != fDfltWorkers.end()) {
114 delete *w;
115 w = fDfltWorkers.erase(w);
116 }
117 fWorkers.clear();
118}
119
120////////////////////////////////////////////////////////////////////////////////
121/// Run configuration and parse the entered config directives.
122/// Return 0 on success, -1 on error
123
125{
126 XPDLOC(NMGR, "NetMgr::Config")
127
128 // Lock the method to protect the lists.
130
131 // Cleanup the worker list
132 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
133 while (w != fWorkers.end()) {
134 delete *w;
135 w = fWorkers.erase(w);
136 }
137 // Create a default master line
138 XrdOucString mm("master ", 128);
139 mm += fMgr->Host();
140 mm += " port=";
141 mm += fMgr->Port();
142 fWorkers.push_back(new XrdProofWorker(mm.c_str()));
143
144 // Run first the configurator
145 if (XrdProofdConfig::Config(rcf) != 0) {
146 XPDERR("problems parsing file ");
147 return -1;
148 }
149
150 XrdOucString msg;
151 msg = (rcf) ? "re-configuring" : "configuring";
152 TRACE(ALL, msg);
153
154 if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
155 TRACE(ALL, "PROOF config file: " <<
156 ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
157 if (fResourceType == kRTStatic) {
158 // Initialize the list of workers if a static config has been required
159 // Default file path, if none specified
160 bool dodefault = 1;
161 if (fPROOFcfg.fName.length() > 0) {
162 // Load file content in memory
163 if (ReadPROOFcfg() == 0) {
164 TRACE(ALL, "PROOF config file will " <<
165 ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
166 dodefault = 0;
167 } else {
168 if (!fDfltFallback) {
169 XPDERR("unable to find valid information in PROOF config file " <<
171 fPROOFcfg.fMtime = -1;
172 return 0;
173 } else {
174 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
175 }
176 }
177 }
178 if (dodefault) {
179 // Use default
181 }
182 } else if (fResourceType == kRTNone && fWorkers.size() <= 1) {
183 // Nothing defined: use default
185 }
186
187 // Find unique nodes
189 }
190
191 // For connection to the other xproofds we try only once
193 // Request Timeout
195
196 // Notification
197 XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
198 TRACE(ALL, msg);
199
200 // Done
201 return 0;
202}
203
204////////////////////////////////////////////////////////////////////////////////
205/// Dispatch a configuration directive to the handler matching its name.
206
208 char *val, XrdOucStream *cfg, bool rcf)
209{
210 XPDLOC(NMGR, "NetMgr::DoDirective")
211
212 if (!d)
213 // undefined inputs
214 return -1;
215
216 if (d->fName == "resource") {
217 return DoDirectiveResource(val, cfg, rcf);
218 } else if (d->fName == "adminreqto") {
219 return DoDirectiveAdminReqTO(val, cfg, rcf);
220 } else if (d->fName == "worker") {
221 return DoDirectiveWorker(val, cfg, rcf);
222 }
223
224 TRACE(XERR, "unknown directive: " << d->fName);
225
226 return -1;
227}
228
229////////////////////////////////////////////////////////////////////////////////
230/// Indices (this will be used twice).
231
233{
234 list<XrdProofWorker *>::const_iterator iter, iter2;
235 list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
236 // Map to store information of the balancer.
238 // Node with minimum number of workers distinct to 1.
239 unsigned int min = UINT_MAX;
240 // Total number of nodes and per iteration assignments.
241 unsigned int total = 0, total_perit = 0;
242 // Number of iterations to get every node filled.
243 unsigned int total_added = 0;
244 // Temporary list to store the balanced configuration
246 // Flag for the search and destroy loop.
247 bool deleted;
248
249 // Fill the information store with the first data (number of nodes).
250 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
251 // The next code is not the same as this:
252 //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
253 // The previous piece of STL code only checks the pointer of the value
254 // stored on the list; although it is more efficient, it requires that repeated
255 // nodes point to the same object. To allow hybrid configurations, we are
256 // doing a 'manually' matching since statically configured nodes are
257 // created in multiple ways.
258 info[*iter].available = 0;
259 for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
260 if ((*iter)->Matches(*iter2)) {
261 info[*iter].available++;
262 }
263 }
264 info[*iter].added = 0;
265 // Calculate the minimum greater than 1.
266 if (info[*iter].available > 1 && info[*iter].available < min)
267 min = info[*iter].available;
268 // Calculate the totals.
269 total += info[*iter].available;
270 }
271
272 // Now, calculate the number of workers to add in each iteration of the
273 // round robin, scaling to the smaller number.
274 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
275 if (info[*iter].available > 1) {
276 info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
277 } else {
278 info[*iter].per_iteration = 1;
279 }
280 // Calculate the totals.
281 total_perit += info[*iter].per_iteration;
282 }
283
284 // Since we are going to substitute the list, don't forget to recover the
285 // default node at the first time.
286 tempNodes.push_back(fWorkers.front());
287
288 // Finally, do the round robin assignment of nodes.
289 // Stop when every node has its workers processed.
290 while (total_added < total) {
291 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
292 if (i->second.added < i->second.available) {
293 // Be careful with the remainders (on prime number of nodes).
294 unsigned int to_add = xrdmin(i->second.per_iteration,
295 (i->second.available - i->second.added));
296 // Then add the nodes.
297 for (unsigned int j = 0; j < to_add; j++) {
298 tempNodes.push_back(i->first);
299 }
300 i->second.added += to_add;
302 }
303 }
304 }
305
306 // Since we are merging nodes into only one object, we must merge the current
307 // sessions of the static nodes (that can be distinct objects that represent
308 // the same node) and delete the orphaned objects. If, in the future, we can
309 // assure that every worker has only one object in the list, this is not more
310 // necessary. The things needed to change are the DoDirectiveWorker, it must
311 // search for a node before inserting it, and in the repeat directive insert
312 // the same node always. Also the default configuration methods (there are
313 // two in this class) must be updated.
314 iter3 = ++(fWorkers.begin());
315 while (iter3 != fWorkers.end()) {
316 deleted = false;
317 // If the worker is not in the fWorkers list, we must process it. Note that
318 // std::count() uses a plain comparison between values, in this case, we
319 // are comparing pointers (numbers, at the end).
320 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
321 // Search for an object that matches with this in the temp list.
322 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
323 if ((*iter2)->Matches(*iter3)) {
324 // Copy data and delete the *iter object.
325 (*iter2)->MergeProofServs(*(*iter3));
326 deleted = true;
327 delete *iter3;
328 fWorkers.erase(iter3++);
329 break;
330 }
331 }
332 }
333 // Do not forget to increase the iterator.
334 if (!deleted)
335 ++iter3;
336 }
337
338 // Then, substitute the current fWorkers list with the balanced one.
340}
341
342////////////////////////////////////////////////////////////////////////////////
343/// Process 'adminreqto' directive
344
346{
347 if (!val)
348 // undefined inputs
349 return -1;
350
351 // Check deprecated 'if' directive
352 if (fMgr->Host() && cfg)
353 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
354 return 0;
355
356 // Timeout on requested broadcasted to workers; there are 4 attempts,
357 // so the real timeout is 4 x fRequestTO
358 int to = strtol(val, 0, 10);
359 fRequestTO = (to > 0) ? to : fRequestTO;
360 return 0;
361}
362
363////////////////////////////////////////////////////////////////////////////////
364/// Process 'resource' directive
365
367{
368 XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
369
370 if (!val || !cfg)
371 // undefined inputs
372 return -1;
373
374 if (!strcmp("static", val)) {
375 // We just take the path of the config file here; the
376 // rest is used by the static scheduler
378 while ((val = cfg->GetWord()) && val[0]) {
379 XrdOucString s(val);
380 if (s.beginswith("ucfg:")) {
381 fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
382 } else if (s.beginswith("reload:")) {
383 fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
384 } else if (s.beginswith("dfltfallback:")) {
385 fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
386 } else if (s.beginswith("wmx:")) {
387 } else if (s.beginswith("selopt:")) {
388 } else {
389 // Config file
390 fPROOFcfg.fName = val;
391 if (fPROOFcfg.fName.beginswith("sm:")) {
392 fPROOFcfg.fName.replace("sm:", "");
393 }
395 // Make sure it exists and can be read
396 if (access(fPROOFcfg.fName.c_str(), R_OK)) {
397 if (errno == ENOENT) {
398 TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
399 } else {
400 TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
401 fPROOFcfg.fName = "";
402 fPROOFcfg.fMtime = -1;
403 }
404 }
405 }
406 }
407 }
408 return 0;
409}
410
411////////////////////////////////////////////////////////////////////////////////
412/// Process 'worker' directive
413
415{
416 XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
417
418 if (!val || !cfg)
419 // undefined inputs
420 return -1;
421
422 // Lock the method to protect the lists.
424
425 // Get the full line (w/o heading keyword)
426 cfg->RetToken();
427 XrdOucString wrd(cfg->GetWord());
428 if (wrd.length() > 0) {
429 // Build the line
430 XrdOucString line;
431 char rest[2048] = {0};
432 cfg->GetRest((char *)&rest[0], 2048);
433 XPDFORM(line, "%s %s", wrd.c_str(), rest);
434 // Parse it now
435 if (wrd == "master" || wrd == "node") {
436 // Init a master instance
437 XrdProofWorker *pw = new XrdProofWorker(line.c_str());
438 if (pw->fHost.beginswith("localhost") ||
439 pw->Matches(fMgr->Host())) {
440 // Replace the default line (the first with what found in the file)
441 XrdProofWorker *fw = fWorkers.front();
442 fw->Reset(line.c_str());
443 }
444 SafeDelete(pw);
445 } else {
446 // How many lines like this?
447 int nr = 1;
448 int ir = line.find("repeat=");
449 if (ir != STR_NPOS) {
450 XrdOucString r(line, ir + strlen("repeat="));
451 r.erase(r.find(' '));
452 nr = r.atoi();
453 if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
454 TRACE(DBG, "found repeat = " << nr);
455 }
456 while (nr--) {
457 // Build the worker object
459 if (mline.IsValid()) {
460 TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
461 for (int i = 0; i < mline.N(); i++) {
462 TRACE(HDBG, "found token: " << mline.Get(i));
463 fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
464 }
465 } else {
466 TRACE(DBG, "found line: " << line);
467 fWorkers.push_back(new XrdProofWorker(line.c_str()));
468 }
469 }
470 }
471 }
472
473 // Necessary for the balancer when Bonjour is enabled. Note that this balancer
474 // can also be enabled with a static configuration. By this time is disabled
475 // due to its experimental status.
477 //BalanceNodesOrder();
478
479 return 0;
480}
481
482////////////////////////////////////////////////////////////////////////////////
483/// Broadcast a ctrlc interrupt
484/// Return 0 on success, -1 on error
485
487{
488 XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
489
490 int rc = 0;
491
492 // Loop over unique nodes
493 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
494 XrdProofWorker *w = 0;
495 while (iw != fNodes.end()) {
496 if ((w = *iw) && w->fType != 'M') {
497 // Do not send it to ourselves
498 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
499 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
500 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
501 if (!us) {
502 // Create 'url'
503 // We use the enforced username if specified in the config file; this is the case
504 // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
505 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
506 if (u.length() <= 0) u = fMgr->EffectiveUser();
507 u += '@';
508 u += w->fHost;
509 if (w->fPort != -1) {
510 u += ':';
511 u += w->fPort;
512 }
513 TRACE(HDBG, "sending request to: "<<u);
514 // Get a connection to the server
515 XrdProofConn *conn = GetProofConn(u.c_str());
516 if (conn && conn->IsValid()) {
517 // Prepare request
519 memset(&reqhdr, 0, sizeof(reqhdr));
520 conn->SetSID(reqhdr.header.streamid);
521 reqhdr.proof.requestid = kXP_ctrlc;
522 reqhdr.proof.sid = 0;
523 reqhdr.proof.dlen = 0;
524 // We need the right order
525 if (XPD::clientMarshall(&reqhdr) != 0) {
526 TRACE(XERR, "problems marshalling request");
527 return -1;
528 }
529 if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
530 TRACE(XERR, "problems sending ctrl-c request to server " << u);
531 }
532 // Clean it up, to avoid leaving open tcp connection possibly going forever
533 // into CLOSE_WAIT
534 SafeDelete(conn);
535 }
536 } else {
537 TRACE(DBG, "broadcast request for ourselves: ignore");
538 }
539 }
540 // Next worker
541 ++iw;
542 }
543
544 // Done
545 return rc;
546}
547
548////////////////////////////////////////////////////////////////////////////////
549/// Broadcast request to known potential sub-nodes.
550/// Return 0 on success, -1 on error
551
552int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
553 XrdProofdResponse *r, bool notify, int subtype)
554{
555 XPDLOC(NMGR, "NetMgr::Broadcast")
556
557 unsigned int nok = 0;
558 TRACE(REQ, "type: " << type);
559
560 // Loop over unique nodes
561 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
562 XrdProofWorker *w = 0;
564 while (iw != fNodes.end()) {
565 if ((w = *iw) && w->fType != 'M') {
566 // Do not send it to ourselves
567 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
568 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
569 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
570 if (!us) {
571 // Create 'url'
572 // We use the enforced username if specified in the config file; this is the case
573 // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
574 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
575 if (u.length() <= 0) u = fMgr->EffectiveUser();
576 u += '@';
577 u += w->fHost;
578 if (w->fPort != -1) {
579 u += ':';
580 u += w->fPort;
581 }
582 // Type of server
583 int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
584 : (kXR_int32) kXPD_Worker;
585 TRACE(HDBG, "sending request to " << u);
586 // Send request
587 if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
588 TRACE(XERR, "problems sending request to " << u);
589 } else {
590 nok++;
591 }
592 // Cleanup answer
594 } else {
595 TRACE(DBG, "broadcast request for ourselves: ignore");
596 }
597 }
598 // Next worker
599 ++iw;
600 }
601
602 // Done
603 return (nok == fNodes.size()) ? 0 : -1;
604}
605
606////////////////////////////////////////////////////////////////////////////////
607/// Get a XrdProofConn for url; create a new one if not available
608
610{
611 XrdProofConn *p = 0;
612
613 // If not found create a new one
614 XrdOucString buf = " Manager connection from ";
615 buf += fMgr->Host();
616 buf += "|ord:000";
617 char m = 'A'; // log as admin
618
619 {
621 p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
622 }
623 if (p && !(p->IsValid())) SafeDelete(p);
624
625 // Done
626 return p;
627}
628
629////////////////////////////////////////////////////////////////////////////////
630/// Send a request of type 'type' to the server at 'url'.
631/// Return the server response in case of success, 0 otherwise
632
634 const char *msg, int srvtype,
636 int subtype)
637{
638 XPDLOC(NMGR, "NetMgr::Send")
639
641 TRACE(REQ, "type: " << type);
642
643 if (!url || strlen(url) <= 0)
644 return xrsp;
645
646 // Get a connection to the server
648
649 bool ok = 1;
650 if (conn && conn->IsValid()) {
651 XrdOucString notifymsg("Send: ");
652 // Prepare request
654 const void *buf = 0;
655 char **vout = 0;
656 memset(&reqhdr, 0, sizeof(reqhdr));
657 conn->SetSID(reqhdr.header.streamid);
658 reqhdr.header.requestid = kXP_admin;
659 reqhdr.proof.int1 = type;
660 switch (type) {
661 case kROOTVersion:
662 notifymsg += "change-of-ROOT version request to ";
663 notifymsg += url;
664 notifymsg += " msg: ";
665 notifymsg += msg;
666 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
667 buf = (msg) ? (const void *)msg : buf;
668 break;
669 case kCleanupSessions:
670 notifymsg += "cleanup request to ";
671 notifymsg += url;
672 notifymsg += " for user: ";
673 notifymsg += msg;
674 reqhdr.proof.int2 = (kXR_int32) srvtype;
675 reqhdr.proof.sid = -1;
676 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
677 buf = (msg) ? (const void *)msg : buf;
678 break;
679 case kExec:
680 notifymsg += "exec ";
682 notifymsg += "request for ";
683 notifymsg += msg;
684 reqhdr.proof.int2 = (kXR_int32) subtype;
685 reqhdr.proof.sid = -1;
686 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
687 buf = (msg) ? (const void *)msg : buf;
688 break;
689 default:
690 ok = 0;
691 TRACE(XERR, "invalid request type " << type);
692 break;
693 }
694
695 // Notify the client that we are sending the request
696 if (r && notify)
697 r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
698
699 // Activate processing of unsolicited responses
700 conn->SetAsync(conn, &MessageSender, (void *)r);
701
702 // Send over
703 if (ok)
704 xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
705
706 // Deactivate processing of unsolicited responses
707 conn->SetAsync(0, 0, (void *)0);
708
709 // Print error msg, if any
710 if (r && !xrsp && conn->GetLastErr()) {
711 XrdOucString cmsg = url;
712 cmsg += ": ";
713 cmsg += conn->GetLastErr();
714 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
715 }
716 // Clean it up, to avoid leaving open tcp connection possibly going forever
717 // into CLOSE_WAIT
718 SafeDelete(conn);
719
720 } else {
721 TRACE(XERR, "could not open connection to " << url);
722 if (r) {
723 XrdOucString cmsg = "failure attempting connection to ";
724 cmsg += url;
725 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
726 }
727 }
728
729 // Done
730 return xrsp;
731}
732
733////////////////////////////////////////////////////////////////////////////////
734/// Check if 'host' is this local host. If checkport is true,
735/// matching of the local port with the one implied by host is also checked.
736/// Return 1 if 'local', 0 otherwise
737
738bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
739{
740 XPDLOC(NMGR, "NetMgr::IsLocal")
741
742 int rc = 0;
743 if (host && strlen(host) > 0) {
744 XrdClientUrlInfo uu(host);
745 if (uu.Port <= 0) uu.Port = 1093;
746 // Fully qualified name
747#ifndef ROOT_XrdFour
748 char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
749#else
751 aNA.Set(uu.Host.c_str());
752 char *fqn = (char *) aNA.Name();
753#endif
754 TRACE(HDBG, "fqn: '"<<fqn<<"' mgrh: '"<<fMgr->Host()<<"'");
755 if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
756 !strcmp(fMgr->Host(), fqn))) {
757 if (!checkport || (uu.Port == fMgr->Port()))
758 rc = 1;
759 }
760#ifndef ROOT_XrdFour
761 SafeFree(fqn);
762#endif
763 }
764 // Done
765 return rc;
766}
767
768////////////////////////////////////////////////////////////////////////////////
769/// Process a readbuf request
770
772{
773 XPDLOC(NMGR, "NetMgr::ReadBuffer")
774
775 int rc = 0;
776 XPD_SETRESP(p, "ReadBuffer");
777
778 XrdOucString emsg;
779
780 // Unmarshall the data
781 //
782 kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
783 int len = ntohl(p->Request()->readbuf.len);
784
785 // Find out the file name
786 char *file = 0;
787 char *filen = 0;
788 char *pattern = 0;
789 int dlen = p->Request()->header.dlen;
790 int grep = ntohl(p->Request()->readbuf.int1);
791 int blen = dlen;
792 bool local = 0;
793 if (dlen > 0 && p->Argp()->buff) {
794 file = new char[dlen+1];
795 memcpy(file, p->Argp()->buff, dlen);
796 file[dlen] = 0;
797 // Check if local
798 XrdClientUrlInfo ui(file);
799 if (ui.Host.length() > 0) {
800 // Check locality
801 local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
802 if (local) {
803 memcpy(file, ui.File.c_str(), ui.File.length());
804 file[ui.File.length()] = 0;
805 blen = ui.File.length();
806 TRACEP(p, DBG, "file is LOCAL");
807 }
808 }
809 // If grep, extract the pattern
810 if (grep > 0) {
811 // 'grep' operation: len is the length of the 'pattern' to be grepped
812 pattern = new char[len + 1];
813 int j = blen - len;
814 int i = 0;
815 while (j < blen)
816 pattern[i++] = file[j++];
817 pattern[i] = 0;
818 filen = strdup(file);
819 filen[blen - len] = 0;
820 TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
821 }
822 } else {
823 emsg = "file name not found";
824 TRACEP(p, XERR, emsg);
825 response->Send(kXR_InvalidRequest, emsg.c_str());
826 return 0;
827 }
828 if (grep) {
829 TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
830 ", pattern: " << pattern);
831 } else {
832 TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
833 }
834
835 // Get the buffer
836 int lout = len;
837 char *buf = 0;
838 if (local) {
839 if (grep > 0) {
840 // Grep local file
841 lout = blen; // initial length
842 buf = ReadBufferLocal(filen, pattern, lout, grep);
843 } else {
844 // Read portion of local file
845 buf = ReadBufferLocal(file, ofs, lout);
846 }
847 } else {
848 // Read portion of remote file
849 XrdClientUrlInfo u(file);
850 if (u.User.length() <= 0)
851 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
852 buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
853 }
854
855 bool sent = 0;
856 if (!buf) {
857 if (lout > 0) {
858 if (grep > 0) {
859 if (TRACING(DBG)) {
860 XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
861 TRACEP(p, DBG, emsg);
862 }
863 response->Send();
864 sent = 1;
865 } else {
866 XPDFORM(emsg, "could not read buffer from %s %s",
867 (local) ? "local file " : "remote file ", file);
868 TRACEP(p, XERR, emsg);
869 response->Send(kXR_InvalidRequest, emsg.c_str());
870 sent = 1;
871 }
872 } else {
873 // Just got an empty buffer
874 if (TRACING(DBG)) {
875 emsg = "nothing found in ";
876 emsg += (grep > 0) ? filen : file;
877 TRACEP(p, DBG, emsg);
878 }
879 }
880 }
881
882 // Send back to user
883 if (!sent)
884 response->Send(buf, lout);
885
886 // Cleanup
887 SafeFree(buf);
888 SafeDelArray(file);
890 SafeDelArray(pattern);
891
892 // Done
893 return 0;
894}
895
896////////////////////////////////////////////////////////////////////////////////
897/// Locate the exact file path allowing for wildcards '*' in the file name.
898/// In case of success, returns 0 and fills file wity the first matching instance.
899/// Return -1 if no matching pat is found.
900
901int XrdProofdNetMgr::LocateLocalFile(XrdOucString &file)
902{
903 XPDLOC(NMGR, "NetMgr::LocateLocalFile")
904
905 // If no wild cards or empty, nothing to do
906 if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
907
908 // Locate the file name and the dir
909 XrdOucString fn, dn;
910 int isl = file.rfind('/');
911 if (isl != STR_NPOS) {
912 fn.assign(file, isl + 1, -1);
913 dn.assign(file, 0, isl);
914 } else {
915 fn = file;
916 dn = "./";
917 }
918
919 XrdOucString emsg;
920 // Scan the dir
921 DIR *dirp = opendir(dn.c_str());
922 if (!dirp) {
923 XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
924 TRACE(XERR, emsg.c_str());
925 return -1;
926 }
927 struct dirent *ent = 0;
928 XrdOucString sent;
929 while ((ent = readdir(dirp))) {
930 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
931 continue;
932 // Check the match
933 sent = ent->d_name;
934 if (sent.matches(fn.c_str()) > 0) break;
935 sent = "";
936 }
937 closedir(dirp);
938
939 // If found fill a new output
940 if (sent.length() > 0) {
941 XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
942 return 0;
943 }
944
945 // Not found
946 return -1;
947}
948
949////////////////////////////////////////////////////////////////////////////////
950/// Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
951/// returned buffer must be freed by the caller.
952/// Wild cards '*' are allowed in the file name of 'path'; the first matching
953/// instance is taken.
954/// Returns 0 in case of error.
955
956char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
957{
958 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
959
960 XrdOucString emsg;
961 TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
962
963 // Check input
964 if (!path || strlen(path) <= 0) {
965 TRACE(XERR, "path undefined!");
966 return (char *)0;
967 }
968
969 // Locate the path resolving wild cards
970 XrdOucString spath(path);
971 if (LocateLocalFile(spath) != 0) {
972 TRACE(XERR, "path cannot be resolved! (" << path << ")");
973 return (char *)0;
974 }
975 const char *file = spath.c_str();
976
977 // Open the file in read mode
978 int fd = open(file, O_RDONLY);
979 if (fd < 0) {
980 emsg = "could not open ";
981 emsg += file;
982 TRACE(XERR, emsg);
983 return (char *)0;
984 }
985
986 // Size of the output
987 struct stat st;
988 if (fstat(fd, &st) != 0) {
989 emsg = "could not get size of file with stat: errno: ";
990 emsg += (int)errno;
991 TRACE(XERR, emsg);
992 close(fd);
993 return (char *)0;
994 }
995 off_t ltot = st.st_size;
996
997 // Estimate offsets of the requested range
998 // Start from ...
999 kXR_int64 start = ofs;
1000 off_t fst = (start < 0) ? ltot + start : start;
1001 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1002 // End at ...
1003 kXR_int64 end = fst + len;
1004 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1005 TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
1006
1007 // Number of bytes to be read
1008 len = lst - fst;
1009
1010 // Output buffer
1011 char *buf = (char *)malloc(len + 1);
1012 if (!buf) {
1013 emsg = "could not allocate enough memory on the heap: errno: ";
1014 emsg += (int)errno;
1015 XPDERR(emsg);
1016 close(fd);
1017 return (char *)0;
1018 }
1019
1020 // Reposition, if needed
1021 if (fst >= 0)
1022 lseek(fd, fst, SEEK_SET);
1023
1024 int left = len;
1025 int pos = 0;
1026 int nr = 0;
1027 do {
1028 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1029 errno = 0;
1030 if (nr < 0) {
1031 TRACE(XERR, "error reading from file: errno: " << errno);
1032 break;
1033 }
1034
1035 // Update counters
1036 pos += nr;
1037 left -= nr;
1038
1039 } while (nr > 0 && left > 0);
1040
1041 // Termination
1042 buf[len] = 0;
1043 TRACE(HDBG, "read " << nr << " bytes: " << buf);
1044
1045 // Close file
1046 close(fd);
1047
1048 // Done
1049 return buf;
1050}
1051
1052////////////////////////////////////////////////////////////////////////////////
1053/// Grep lines matching 'pat' from 'path'; the returned buffer (length in 'len')
1054/// must be freed by the caller.
1055/// Wild cards '*' are allowed in the file name of 'path'; the first matching
1056/// instance is taken.
1057/// Returns 0 in case of error.
1058
1060 const char *pat, int &len, int opt)
1061{
1062 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
1063
1064 XrdOucString emsg;
1065 TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
1066
1067 // Check input
1068 if (!path || strlen(path) <= 0) {
1069 TRACE(XERR, "file path undefined!");
1070 return (char *)0;
1071 }
1072
1073 // Locate the path resolving wild cards
1074 XrdOucString spath(path);
1075 if (LocateLocalFile(spath) != 0) {
1076 TRACE(XERR, "path cannot be resolved! (" << path << ")");
1077 return (char *)0;
1078 }
1079 const char *file = spath.c_str();
1080
1081 // Size of the output
1082 struct stat st;
1083 if (stat(file, &st) != 0) {
1084 emsg = "could not get size of file with stat: errno: ";
1085 emsg += (int)errno;
1086 TRACE(XERR, emsg);
1087 return (char *)0;
1088 }
1089 off_t ltot = st.st_size;
1090
1091 // The grep command
1092 char *cmd = 0;
1093 int lcmd = 0;
1094 if (pat && strlen(pat) > 0) {
1095 lcmd = strlen(pat) + strlen(file) + 20;
1096 cmd = new char[lcmd];
1097 if (opt == 1) {
1098 snprintf(cmd, lcmd, "grep %s %s", pat, file);
1099 } else if (opt == 2) {
1100 snprintf(cmd, lcmd, "grep -v %s %s", pat, file);
1101 } else if (opt == 3) {
1102 snprintf(cmd, lcmd, "cat %s | %s", file, pat);
1103 } else { // should not be here
1104 snprintf(cmd, lcmd, "cat %s", file);
1105 }
1106 } else {
1107 lcmd = strlen(file) + 10;
1108 cmd = new char[lcmd];
1109 snprintf(cmd, lcmd, "cat %s", file);
1110 }
1111 TRACE(DBG, "cmd: " << cmd);
1112
1113 // Execute the command in a pipe
1114 FILE *fp = popen(cmd, "r");
1115 if (!fp) {
1116 emsg = "could not run '";
1117 emsg += cmd;
1118 emsg += "'";
1119 TRACE(XERR, emsg);
1120 delete[] cmd;
1121 return (char *)0;
1122 }
1123 delete[] cmd;
1124
1125 // Read line by line
1126 len = 0;
1127 char *buf = 0;
1128 char line[2048];
1129 int bufsiz = 0, left = 0, lines = 0;
1130 while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
1131 // Parse the line
1132 int llen = strlen(line);
1133 ltot -= llen;
1134 lines++;
1135 // (Re-)allocate the buffer
1136 if (!buf || (llen > left)) {
1137 int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1138 dsiz = (dsiz > llen) ? dsiz : llen;
1139 bufsiz += dsiz;
1140 buf = (char *)realloc(buf, bufsiz + 1);
1141 left += dsiz;
1142 }
1143 if (!buf) {
1144 emsg = "could not allocate enough memory on the heap: errno: ";
1145 emsg += (int)errno;
1146 TRACE(XERR, emsg);
1147 pclose(fp);
1148 return (char *)0;
1149 }
1150 // Add line to the buffer
1151 memcpy(buf + len, line, llen);
1152 len += llen;
1153 left -= llen;
1154 if (TRACING(HDBG))
1155 fprintf(stderr, "line: %s", line);
1156 }
1157
1158 // Check the result and terminate the buffer
1159 if (buf) {
1160 if (len > 0) {
1161 buf[len] = 0;
1162 } else {
1163 free(buf);
1164 buf = 0;
1165 }
1166 }
1167
1168 // Close file
1169 pclose(fp);
1170
1171 // Done
1172 return buf;
1173}
1174
1175////////////////////////////////////////////////////////////////////////////////
1176/// Send a read buffer request of length 'len' at offset 'ofs' for remote file
1177/// defined by 'url'; the returned buffer must be freed by the caller.
1178/// Returns 0 in case of error.
1179
1180char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
1181 kXR_int64 ofs, int &len, int grep)
1182{
1183 XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
1184
1185 TRACE(REQ, "url: " << (url ? url : "undef") <<
1186 ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
1187 ", len: " << len << ", grep: " << grep);
1188
1189 // Check input
1190 if (!file || strlen(file) <= 0) {
1191 TRACE(XERR, "file undefined!");
1192 return (char *)0;
1193 }
1195 if (!url || strlen(url) <= 0) {
1196 // Use file as url
1197 u.TakeUrl(XrdOucString(file));
1198 if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
1199 }
1200
1201 // Get a connection (logs in)
1202 XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
1203
1204 char *buf = 0;
1205 if (conn && conn->IsValid()) {
1206 // Prepare request
1208 memset(&reqhdr, 0, sizeof(reqhdr));
1209 conn->SetSID(reqhdr.header.streamid);
1210 reqhdr.header.requestid = kXP_readbuf;
1211 reqhdr.readbuf.ofs = ofs;
1212 reqhdr.readbuf.len = len;
1213 reqhdr.readbuf.int1 = grep;
1214 reqhdr.header.dlen = strlen(file);
1215 const void *btmp = (const void *) file;
1216 char **vout = &buf;
1217 // Send over
1219 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
1220
1221 // If positive answer
1222 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1223 len = xrsp->DataLen();
1224 } else {
1225 if (xrsp && !(xrsp->IsError()))
1226 // The buffer was just empty: do not call it error
1227 len = 0;
1228 SafeFree(buf);
1229 }
1230
1231 // Clean the message
1233 // Clean it up, to avoid leaving open tcp connection possibly going forever
1234 // into CLOSE_WAIT
1235 SafeDelete(conn);
1236 }
1237
1238 // Done
1239 return buf;
1240}
1241
1242////////////////////////////////////////////////////////////////////////////////
1243/// Get log paths from next tier; used in multi-master setups
1244/// Returns 0 in case of error.
1245
1246char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
1247{
1248 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1249
1250 TRACE(REQ, "url: " << (url ? url : "undef") <<
1251 ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1252
1253 // Check input
1254 if (!url || strlen(url) <= 0) {
1255 TRACE(XERR, "url undefined!");
1256 return (char *)0;
1257 }
1258
1259 // Get a connection (logs in)
1260 XrdProofConn *conn = GetProofConn(url);
1261
1262 char *buf = 0;
1263 if (conn && conn->IsValid()) {
1264 // Prepare request
1266 memset(&reqhdr, 0, sizeof(reqhdr));
1267 conn->SetSID(reqhdr.header.streamid);
1268 reqhdr.header.requestid = kXP_admin;
1269 reqhdr.proof.int1 = kQueryLogPaths;
1270 reqhdr.proof.int2 = isess;
1271 reqhdr.proof.sid = -1;
1272 reqhdr.header.dlen = msg ? strlen(msg) : 0;
1273 const void *btmp = (const void *) msg;
1274 char **vout = &buf;
1275 // Send over
1277 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
1278
1279 // If positive answer
1280 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1281 int len = xrsp->DataLen();
1282 buf = (char *) realloc((void *)buf, len + 1);
1283 if (buf)
1284 buf[len] = 0;
1285 } else {
1286 SafeFree(buf);
1287 }
1288
1289 // Clean the message
1291 // Clean it up, to avoid leaving open tcp connection possibly going forever
1292 // into CLOSE_WAIT
1293 SafeDelete(conn);
1294 }
1295
1296 // Done
1297 return buf;
1298}
1299
1300////////////////////////////////////////////////////////////////////////////////
1301/// Get log paths from next tier; used in multi-master setups
1302/// Returns 0 in case of error.
1303
1305{
1306 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1307
1308 TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1309
1310 char *buf = 0, *pbuf = buf;
1311 int len = 0;
1312 // Loop over unique nodes
1313 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
1314 XrdProofWorker *w = 0;
1315 while (iw != fNodes.end()) {
1316 if ((w = *iw)) {
1317 // Do not send it to ourselves
1318 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
1319 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
1320 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
1321 if (!us) {
1322 // Create 'url'
1323 XrdOucString u = fMgr->EffectiveUser();
1324 u += '@';
1325 u += w->fHost;
1326 if (w->fPort != -1) {
1327 u += ':';
1328 u += w->fPort;
1329 }
1330 // Ask the node
1331 char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
1332 if (bmst) {
1333 len += strlen(bmst) + 1;
1334 buf = (char *) realloc((void *)buf, len);
1335 pbuf = buf + len - strlen(bmst) - 1;
1336 memcpy(pbuf, bmst, strlen(bmst) + 1);
1337 buf[len - 1] = 0;
1338 pbuf = buf + len;
1339 free(bmst);
1340 }
1341 } else {
1342 TRACE(DBG, "request for ourselves: ignore");
1343 }
1344 }
1345 // Next worker
1346 ++iw;
1347 }
1348
1349 // Done
1350 return buf;
1351}
1352
1353////////////////////////////////////////////////////////////////////////////////
1354/// Fill-in fWorkers for a localhost based on the number of
1355/// workers fNumLocalWrks.
1356
1358{
1359 XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
1360
1361 TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
1362
1363 // Lock the method to protect the lists.
1365
1366 // Cleanup the worker list
1367 fWorkers.clear();
1368 // The first time we need to create the default workers
1369 if (fDfltWorkers.size() < 1) {
1370 // Create a default master line
1371 XrdOucString mm("master ", 128);
1372 mm += fMgr->Host();
1373 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1374
1375 // Create 'localhost' lines for each worker
1376 int nwrk = fNumLocalWrks;
1377 if (nwrk > 0) {
1378 mm = "worker localhost port=";
1379 mm += fMgr->Port();
1380 while (nwrk--) {
1381 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1382 TRACE(DBG, "added line: " << mm);
1383 }
1384 }
1385 }
1386
1387 // Copy the list
1388 std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
1389 for (; w != fDfltWorkers.end(); ++w) {
1390 fWorkers.push_back(*w);
1391 }
1392
1393 TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
1394
1395 // Find unique nodes
1397
1398 // We are done
1399 return;
1400}
1401
1402////////////////////////////////////////////////////////////////////////////////
1403/// Return the list of workers after having made sure that the info is
1404/// up-to-date
1405
1406std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
1407{
1408 XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
1409
1411
1412 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1413 // Check if there were any changes in the config file
1414 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1415 if (fDfltFallback) {
1416 // Use default settings
1418 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1419 } else {
1420 TRACE(XERR, "unable to read the configuration file");
1421 return (std::list<XrdProofWorker *> *)0;
1422 }
1423 }
1424 }
1425 TRACE(DBG, "returning list with " << fWorkers.size() << " entries");
1426
1427 if (TRACING(HDBG)) Dump();
1428
1429 return &fWorkers;
1430}
1431
1432////////////////////////////////////////////////////////////////////////////////
1433/// Dump status
1434
1436{
1437 const char *xpdloc = "NetMgr::Dump";
1438
1440
1441 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1442 XPDPRT("+ Active workers status");
1443 XPDPRT("+ Size: " << fWorkers.size());
1444 XPDPRT("+ ");
1445
1446 std::list<XrdProofWorker *>::iterator iw;
1447 for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
1448 XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
1449 " active sessions:" << (*iw)->Active());
1450 }
1451 XPDPRT("+ ");
1452 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1453}
1454
1455////////////////////////////////////////////////////////////////////////////////
1456/// Return the list of unique nodes after having made sure that the info is
1457/// up-to-date
1458
1459std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
1460{
1461 XPDLOC(NMGR, "NetMgr::GetNodes")
1462
1464
1465 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1466 // Check if there were any changes in the config file
1467 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1468 if (fDfltFallback) {
1469 // Use default settings
1471 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1472 } else {
1473 TRACE(XERR, "unable to read the configuration file");
1474 return (std::list<XrdProofWorker *> *)0;
1475 }
1476 }
1477 }
1478 TRACE(DBG, "returning list with " << fNodes.size() << " entries");
1479
1480 return &fNodes;
1481}
1482
1483////////////////////////////////////////////////////////////////////////////////
1484/// Read PROOF config file and load the information in fWorkers.
1485/// NB: 'master' information here is ignored, because it is passed
1486/// via the 'xpd.workdir' and 'xpd.image' config directives
1487
1489{
1490 XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
1491
1492 TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
1493
1494 // Lock the method to protect the lists.
1496
1497 // Check inputs
1498 if (fPROOFcfg.fName.length() <= 0)
1499 return -1;
1500
1501 // Get the modification time
1502 struct stat st;
1503 if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
1504 // If the file disappeared, reset the modification time so that we are sure
1505 // to reload it if it comes back
1506 if (errno == ENOENT) fPROOFcfg.fMtime = -1;
1507 if (!fDfltFallback) {
1508 TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
1509 } else {
1510 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
1511 }
1512 return -1;
1513 }
1514 TRACE(DBG, "time of last modification: " << st.st_mtime);
1515
1516 // File should be loaded only once
1517 if (st.st_mtime <= fPROOFcfg.fMtime)
1518 return 0;
1519
1520 // Save the modification time
1521 fPROOFcfg.fMtime = st.st_mtime;
1522
1523 // Open the defined path.
1524 FILE *fin = 0;
1525 if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
1526 if (fWorkers.size() > 1) {
1527 TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
1528 TRACE(XERR, "continuing with existing list of workers.");
1529 return 0;
1530 } else {
1531 return -1;
1532 }
1533 }
1534
1535 if (reset) {
1536 // Cleanup the worker list
1537 fWorkers.clear();
1538 }
1539
1540 // Add default a master line if not yet there
1541 if (fRegWorkers.size() < 1) {
1542 XrdOucString mm("master ", 128);
1543 mm += fMgr->Host();
1544 fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
1545 } else {
1546 // Deactivate all current active workers
1547 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1548 // Skip the master line
1549 ++w;
1550 for (; w != fRegWorkers.end(); ++w) {
1551 (*w)->fActive = 0;
1552 }
1553 }
1554
1555 // Read now the directives
1556 int nw = 0;
1557 char lin[2048];
1558 while (fgets(lin, sizeof(lin), fin)) {
1559 // Skip empty lines
1560 int p = 0;
1561 while (lin[p++] == ' ') {
1562 ;
1563 }
1564 p--;
1565 if (lin[p] == '\0' || lin[p] == '\n')
1566 continue;
1567
1568 // Skip comments
1569 if (lin[0] == '#')
1570 continue;
1571
1572 // Remove trailing '\n';
1573 if (lin[strlen(lin)-1] == '\n')
1574 lin[strlen(lin)-1] = '\0';
1575
1576 TRACE(DBG, "found line: " << lin);
1577
1578 // Parse the line
1580
1581 const char *pfx[2] = { "master", "node" };
1582 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1583 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1584 // Init a master instance
1585 if (pw->fHost.beginswith("localhost") ||
1586 pw->Matches(fMgr->Host())) {
1587 // Replace the default line (the first with what found in the file)
1588 XrdProofWorker *fw = fRegWorkers.front();
1589 fw->Reset(lin);
1590 }
1591 // Ignore it
1592 SafeDelete(pw);
1593 } else {
1594 // Check if we have already it
1595 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1596 // Skip the master line
1597 ++w;
1598 bool haveit = 0;
1599 while (w != fRegWorkers.end()) {
1600 if (!((*w)->fActive)) {
1601 if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
1602 (*w)->fActive = 1;
1603 haveit = 1;
1604 break;
1605 }
1606 }
1607 // Go to next
1608 ++w;
1609 }
1610 // If we do not have it, build a new worker object
1611 if (!haveit) {
1612 // Keep it
1613 fRegWorkers.push_back(pw);
1614 } else {
1615 // Drop it
1616 SafeDelete(pw);
1617 }
1618 }
1619 }
1620
1621 // Copy the active workers
1622 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1623 while (w != fRegWorkers.end()) {
1624 if ((*w)->fActive) {
1625 fWorkers.push_back(*w);
1626 nw++;
1627 }
1628 ++w;
1629 }
1630
1631 // Close files
1632 fclose(fin);
1633
1634 // Find unique nodes
1635 if (reset)
1637
1638 // We are done
1639 return ((nw == 0) ? -1 : 0);
1640}
1641
1642////////////////////////////////////////////////////////////////////////////////
1643/// Scan fWorkers for unique nodes (stored in fNodes).
1644/// Return the number of unque nodes.
1645/// NB: 'master' information here is ignored, because it is passed
1646/// via the 'xpd.workdir' and 'xpd.image' config directives
1647
1649{
1650 XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
1651
1652 TRACE(REQ, "# workers: " << fWorkers.size());
1653
1654 // Cleanup the nodes list
1655 fNodes.clear();
1656
1657 // Build the list of unique nodes (skip the master line);
1658 if (fWorkers.size() > 1) {
1659 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
1660 ++w;
1661 for (; w != fWorkers.end(); ++w) if ((*w)->fActive) {
1662 bool add = 1;
1663 std::list<XrdProofWorker *>::iterator n;
1664 for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
1665 if ((*n)->Matches(*w)) {
1666 add = 0;
1667 break;
1668 }
1669 }
1670 if (add)
1671 fNodes.push_back(*w);
1672 }
1673 }
1674 TRACE(REQ, "found " << fNodes.size() << " unique nodes");
1675
1676 // We are done
1677 return fNodes.size();
1678}
#define SafeDelete(p)
Definition RConfig.hxx:517
#define d(i)
Definition RSha256.hxx:102
#define e(i)
Definition RSha256.hxx:103
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define TRACE(Flag, Args)
Definition TGHtml.h:121
static unsigned int total
winID h TVirtualViewer3D TVirtualGLPainter p
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t type
@ kXP_readbuf
@ kXP_admin
@ kXP_ctrlc
#define kXPD_Master
@ kRTStatic
@ kRTNone
#define kXPD_AnyServer
@ kCleanupSessions
@ kExec
@ kQueryLogPaths
@ kROOTVersion
#define kXPD_Worker
@ kXPD_srvmsg
#define XrdSysError
Definition XpdSysError.h:8
#define NAME_REQUESTTIMEOUT
#define xrdmin(a, b)
#define EnvPutInt(name, val)
#define XPDFORM
#define SafeFree(x)
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define SafeDelArray(x)
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
#define XPD_LONGOK(x)
#define XPD_SETRESP(p, x)
#define XPDPRT(x)
#define XPDLOC(d, x)
#define TRACEP(p, act, x)
#define TRACING(x)
#define XPDERR(x)
#define XrdSysMutexHelper
Definition XrdSysToOuc.h:17
#define realloc
Definition civetweb.c:1538
#define free
Definition civetweb.c:1539
#define snprintf
Definition civetweb.c:1540
#define malloc
Definition civetweb.c:1536
const_iterator begin() const
const_iterator end() const
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdOucString fName
XrdProofdNetMgr * NetMgr() const
const char * Host() const
const char * EffectiveUser() const
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
std::list< XrdProofWorker * > fDfltWorkers
void BalanceNodesOrder()
Indices (this will be used twice).
std::list< XrdProofWorker * > fNodes
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdOucError *e)
Constructor.
XrdProofdFile fPROOFcfg
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
void Dump()
Dump status.
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
void RegisterDirectives()
Register config directives.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
std::list< XrdProofWorker * > fWorkers
XrdProofdManager * fMgr
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
std::list< XrdProofWorker * > fRegWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
XrdOucRecMutex fMutex
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual ~XrdProofdNetMgr()
Destructor.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
TLine * line
const Int_t n
Definition legend1.C:16
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...
TMarker m
Definition textangle.C:8