Logo ROOT  
Reference Guide
XrdProofdNetMgr.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis Jan 2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11#include "XrdProofdPlatform.h"
12
13//////////////////////////////////////////////////////////////////////////
14// //
15// XrdProofdNetMgr //
16// //
17// Authors: G. Ganis, CERN, 2008 //
18// //
19// Manages connections between PROOF server daemons //
20// //
21//////////////////////////////////////////////////////////////////////////
22
23#include "XrdProofdNetMgr.h"
24
25#include "XrdProofdXrdVers.h"
26#ifndef ROOT_XrdFour
27# include "XpdSysDNS.h"
28#else
29# include "XrdNet/XrdNetAddr.hh"
30#endif
31#include "Xrd/XrdBuffer.hh"
36#include "XrdOuc/XrdOucStream.hh"
37#include "XrdSys/XrdSysPlatform.hh"
38
39#include "XrdProofdClient.h"
40#include "XrdProofdManager.h"
41#include "XrdProofdProtocol.h"
42#include "XrdProofdResponse.h"
43#include "XrdProofWorker.h"
44
45// Tracing utilities
46#include "XrdProofdTrace.h"
47
48#include <algorithm>
49#include <limits>
50#include <math.h>
51
52////////////////////////////////////////////////////////////////////////////////
53/// Send up a message from the server
54
55int MessageSender(const char *msg, int len, void *arg)
56{
58 if (r) {
59 return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
60 }
61 return -1;
62}
63
64////////////////////////////////////////////////////////////////////////////////
65/// Constructor
66
68 XrdProtocol_Config *pi, XrdSysError *e)
69 : XrdProofdConfig(pi->ConfigFN, e)
70{
71 fMgr = mgr;
73 fPROOFcfg.fName = "";
74 fPROOFcfg.fMtime = -1;
76 fDfltFallback = 0;
77 fDfltWorkers.clear();
78 fRegWorkers.clear();
79 fWorkers.clear();
80 fNodes.clear();
82 fWorkerUsrCfg = 0;
83 fRequestTO = 30;
84
85 // Configuration directives
87}
88
89////////////////////////////////////////////////////////////////////////////////
90/// Register config directives
91
93{
94 Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
95 Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
96 Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
97 Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
98}
99
100////////////////////////////////////////////////////////////////////////////////
101/// Destructor
102
104{
105 // Cleanup the worker lists
106 // (the nodes list points to the same object, no cleanup is needed)
107 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
108 while (w != fRegWorkers.end()) {
109 delete *w;
110 w = fRegWorkers.erase(w);
111 }
112 w = fDfltWorkers.begin();
113 while (w != fDfltWorkers.end()) {
114 delete *w;
115 w = fDfltWorkers.erase(w);
116 }
117 fWorkers.clear();
118}
119
120////////////////////////////////////////////////////////////////////////////////
121/// Run configuration and parse the entered config directives.
122/// Return 0 on success, -1 on error
123
125{
126 XPDLOC(NMGR, "NetMgr::Config")
127
128 // Lock the method to protect the lists.
130
131 // Cleanup the worker list
132 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
133 while (w != fWorkers.end()) {
134 delete *w;
135 w = fWorkers.erase(w);
136 }
137 // Create a default master line
138 XrdOucString mm("master ", 128);
139 mm += fMgr->Host();
140 mm += " port=";
141 mm += fMgr->Port();
142 fWorkers.push_back(new XrdProofWorker(mm.c_str()));
143
144 // Run first the configurator
145 if (XrdProofdConfig::Config(rcf) != 0) {
146 XPDERR("problems parsing file ");
147 return -1;
148 }
149
150 XrdOucString msg;
151 msg = (rcf) ? "re-configuring" : "configuring";
152 TRACE(ALL, msg);
153
154 if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
155 TRACE(ALL, "PROOF config file: " <<
156 ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
157 if (fResourceType == kRTStatic) {
158 // Initialize the list of workers if a static config has been required
159 // Default file path, if none specified
160 bool dodefault = 1;
161 if (fPROOFcfg.fName.length() > 0) {
162 // Load file content in memory
163 if (ReadPROOFcfg() == 0) {
164 TRACE(ALL, "PROOF config file will " <<
165 ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
166 dodefault = 0;
167 } else {
168 if (!fDfltFallback) {
169 XPDERR("unable to find valid information in PROOF config file " <<
171 fPROOFcfg.fMtime = -1;
172 return 0;
173 } else {
174 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
175 }
176 }
177 }
178 if (dodefault) {
179 // Use default
181 }
182 } else if (fResourceType == kRTNone && fWorkers.size() <= 1) {
183 // Nothing defined: use default
185 }
186
187 // Find unique nodes
189 }
190
191 // For connection to the other xproofds we try only once
193 // Request Timeout
195
196 // Notification
197 XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
198 TRACE(ALL, msg);
199
200 // Done
201 return 0;
202}
203
204////////////////////////////////////////////////////////////////////////////////
205/// Update the priorities of the active sessions.
206
208 char *val, XrdOucStream *cfg, bool rcf)
209{
210 XPDLOC(NMGR, "NetMgr::DoDirective")
211
212 if (!d)
213 // undefined inputs
214 return -1;
215
216 if (d->fName == "resource") {
217 return DoDirectiveResource(val, cfg, rcf);
218 } else if (d->fName == "adminreqto") {
219 return DoDirectiveAdminReqTO(val, cfg, rcf);
220 } else if (d->fName == "worker") {
221 return DoDirectiveWorker(val, cfg, rcf);
222 }
223
224 TRACE(XERR, "unknown directive: " << d->fName);
225
226 return -1;
227}
228
229////////////////////////////////////////////////////////////////////////////////
230/// Indices (this will be used twice).
231
233{
234 list<XrdProofWorker *>::const_iterator iter, iter2;
235 list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
236 // Map to store information of the balancer.
237 map<XrdProofWorker *, BalancerInfo> info;
238 // Node with minimum number of workers distinct to 1.
239 unsigned int min = UINT_MAX;
240 // Total number of nodes and per iteration assignments.
241 unsigned int total = 0, total_perit = 0;
242 // Number of iterations to get every node filled.
243 unsigned int total_added = 0;
244 // Temporary list to store the balanced configuration
245 list<XrdProofWorker *> tempNodes;
246 // Flag for the search and destroy loop.
247 bool deleted;
248
249 // Fill the information store with the first data (number of nodes).
250 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
251 // The next code is not the same as this:
252 //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
253 // The previous piece of STL code only checks the pointer of the value
254 // stored on the list, altough it is more efficient, it needs that repeated
255 // nodes point to the same object. To allow hybrid configurations, we are
256 // doing a 'manually' matching since statically configured nodes are
257 // created in multiple ways.
258 info[*iter].available = 0;
259 for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
260 if ((*iter)->Matches(*iter2)) {
261 info[*iter].available++;
262 }
263 }
264 info[*iter].added = 0;
265 // Calculate the minimum greater than 1.
266 if (info[*iter].available > 1 && info[*iter].available < min)
267 min = info[*iter].available;
268 // Calculate the totals.
269 total += info[*iter].available;
270 }
271
272 // Now, calculate the number of workers to add in each iteration of the
273 // round robin, scaling to the smaller number.
274 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
275 if (info[*iter].available > 1) {
276 info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
277 } else {
278 info[*iter].per_iteration = 1;
279 }
280 // Calculate the totals.
281 total_perit += info[*iter].per_iteration;
282 }
283
284 // Since we are going to substitute the list, don't forget to recover the
285 // default node at the fist time.
286 tempNodes.push_back(fWorkers.front());
287
288 // Finally, do the round robin assignment of nodes.
289 // Stop when every node has its workers processed.
290 while (total_added < total) {
291 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
292 if (i->second.added < i->second.available) {
293 // Be careful with the remainders (on prime number of nodes).
294 unsigned int to_add = xrdmin(i->second.per_iteration,
295 (i->second.available - i->second.added));
296 // Then add the nodes.
297 for (unsigned int j = 0; j < to_add; j++) {
298 tempNodes.push_back(i->first);
299 }
300 i->second.added += to_add;
301 total_added += to_add;
302 }
303 }
304 }
305
306 // Since we are mergin nodes in only one object, we must merge the current
307 // sessions of the static nodes (that can be distinct objects that represent
308 // the same node) and delete the orphaned objects. If, in the future, we can
309 // assure that every worker has only one object in the list, this is not more
310 // necessary. The things needed to change are the DoDirectiveWorker, it must
311 // search for a node before inserting it, and in the repeat directive insert
312 // the same node always. Also the default configuration methods (there are
313 // two in this class) must be updated.
314 iter3 = ++(fWorkers.begin());
315 while (iter3 != fWorkers.end()) {
316 deleted = false;
317 // If the worker is not in the fWorkers list, we must process it. Note that
318 // std::count() uses a plain comparison between values, in this case, we
319 // are comparing pointers (numbers, at the end).
320 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
321 // Search for an object that matches with this in the temp list.
322 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
323 if ((*iter2)->Matches(*iter3)) {
324 // Copy data and delete the *iter object.
325 (*iter2)->MergeProofServs(*(*iter3));
326 deleted = true;
327 delete *iter3;
328 fWorkers.erase(iter3++);
329 break;
330 }
331 }
332 }
333 // Do not forget to increase the iterator.
334 if (!deleted)
335 ++iter3;
336 }
337
338 // Then, substitute the current fWorkers list with the balanced one.
339 fWorkers = tempNodes;
340}
341
342////////////////////////////////////////////////////////////////////////////////
343/// Process 'adminreqto' directive
344
345int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
346{
347 if (!val)
348 // undefined inputs
349 return -1;
350
351 // Check deprecated 'if' directive
352 if (fMgr->Host() && cfg)
353 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
354 return 0;
355
356 // Timeout on requested broadcasted to workers; there are 4 attempts,
357 // so the real timeout is 4 x fRequestTO
358 int to = strtol(val, 0, 10);
359 fRequestTO = (to > 0) ? to : fRequestTO;
360 return 0;
361}
362
363////////////////////////////////////////////////////////////////////////////////
364/// Process 'resource' directive
365
366int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
367{
368 XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
369
370 if (!val || !cfg)
371 // undefined inputs
372 return -1;
373
374 if (!strcmp("static", val)) {
375 // We just take the path of the config file here; the
376 // rest is used by the static scheduler
378 while ((val = cfg->GetWord()) && val[0]) {
379 XrdOucString s(val);
380 if (s.beginswith("ucfg:")) {
381 fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
382 } else if (s.beginswith("reload:")) {
383 fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
384 } else if (s.beginswith("dfltfallback:")) {
385 fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
386 } else if (s.beginswith("wmx:")) {
387 } else if (s.beginswith("selopt:")) {
388 } else {
389 // Config file
390 fPROOFcfg.fName = val;
391 if (fPROOFcfg.fName.beginswith("sm:")) {
392 fPROOFcfg.fName.replace("sm:", "");
393 }
395 // Make sure it exists and can be read
396 if (access(fPROOFcfg.fName.c_str(), R_OK)) {
397 if (errno == ENOENT) {
398 TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
399 } else {
400 TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
401 fPROOFcfg.fName = "";
402 fPROOFcfg.fMtime = -1;
403 }
404 }
405 }
406 }
407 }
408 return 0;
409}
410
411////////////////////////////////////////////////////////////////////////////////
412/// Process 'worker' directive
413
414int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
415{
416 XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
417
418 if (!val || !cfg)
419 // undefined inputs
420 return -1;
421
422 // Lock the method to protect the lists.
424
425 // Get the full line (w/o heading keyword)
426 cfg->RetToken();
427 XrdOucString wrd(cfg->GetWord());
428 if (wrd.length() > 0) {
429 // Build the line
430 XrdOucString line;
431 char rest[2048] = {0};
432 cfg->GetRest((char *)&rest[0], 2048);
433 XPDFORM(line, "%s %s", wrd.c_str(), rest);
434 // Parse it now
435 if (wrd == "master" || wrd == "node") {
436 // Init a master instance
437 XrdProofWorker *pw = new XrdProofWorker(line.c_str());
438 if (pw->fHost.beginswith("localhost") ||
439 pw->Matches(fMgr->Host())) {
440 // Replace the default line (the first with what found in the file)
441 XrdProofWorker *fw = fWorkers.front();
442 fw->Reset(line.c_str());
443 }
444 SafeDelete(pw);
445 } else {
446 // How many lines like this?
447 int nr = 1;
448 int ir = line.find("repeat=");
449 if (ir != STR_NPOS) {
450 XrdOucString r(line, ir + strlen("repeat="));
451 r.erase(r.find(' '));
452 nr = r.atoi();
453 if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
454 TRACE(DBG, "found repeat = " << nr);
455 }
456 while (nr--) {
457 // Build the worker object
458 XrdProofdMultiStr mline(line.c_str());
459 if (mline.IsValid()) {
460 TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
461 for (int i = 0; i < mline.N(); i++) {
462 TRACE(HDBG, "found token: " << mline.Get(i));
463 fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
464 }
465 } else {
466 TRACE(DBG, "found line: " << line);
467 fWorkers.push_back(new XrdProofWorker(line.c_str()));
468 }
469 }
470 }
471 }
472
473 // Necessary for the balancer when Bonjour is enabled. Note that this balancer
474 // can also be enabled with a static configuration. By this time is disabled
475 // due to its experimental status.
477 //BalanceNodesOrder();
478
479 return 0;
480}
481
482////////////////////////////////////////////////////////////////////////////////
483/// Broadcast a ctrlc interrupt
484/// Return 0 on success, -1 on error
485
487{
488 XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
489
490 int rc = 0;
491
492 // Loop over unique nodes
493 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
494 XrdProofWorker *w = 0;
495 while (iw != fNodes.end()) {
496 if ((w = *iw) && w->fType != 'M') {
497 // Do not send it to ourselves
498 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
499 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
500 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
501 if (!us) {
502 // Create 'url'
503 // We use the enforced username if specified in the config file; this is the case
504 // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
505 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
506 if (u.length() <= 0) u = fMgr->EffectiveUser();
507 u += '@';
508 u += w->fHost;
509 if (w->fPort != -1) {
510 u += ':';
511 u += w->fPort;
512 }
513 TRACE(HDBG, "sending request to: "<<u);
514 // Get a connection to the server
515 XrdProofConn *conn = GetProofConn(u.c_str());
516 if (conn && conn->IsValid()) {
517 // Prepare request
518 XPClientRequest reqhdr;
519 memset(&reqhdr, 0, sizeof(reqhdr));
520 conn->SetSID(reqhdr.header.streamid);
521 reqhdr.proof.requestid = kXP_ctrlc;
522 reqhdr.proof.sid = 0;
523 reqhdr.proof.dlen = 0;
524 // We need the right order
525 if (XPD::clientMarshall(&reqhdr) != 0) {
526 TRACE(XERR, "problems marshalling request");
527 return -1;
528 }
529 if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
530 TRACE(XERR, "problems sending ctrl-c request to server " << u);
531 }
532 // Clean it up, to avoid leaving open tcp connection possibly going forever
533 // into CLOSE_WAIT
534 SafeDelete(conn);
535 }
536 } else {
537 TRACE(DBG, "broadcast request for ourselves: ignore");
538 }
539 }
540 // Next worker
541 ++iw;
542 }
543
544 // Done
545 return rc;
546}
547
548////////////////////////////////////////////////////////////////////////////////
549/// Broadcast request to known potential sub-nodes.
550/// Return 0 on success, -1 on error
551
552int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
553 XrdProofdResponse *r, bool notify, int subtype)
554{
555 XPDLOC(NMGR, "NetMgr::Broadcast")
556
557 unsigned int nok = 0;
558 TRACE(REQ, "type: " << type);
559
560 // Loop over unique nodes
561 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
562 XrdProofWorker *w = 0;
563 XrdClientMessage *xrsp = 0;
564 while (iw != fNodes.end()) {
565 if ((w = *iw) && w->fType != 'M') {
566 // Do not send it to ourselves
567 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
568 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
569 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
570 if (!us) {
571 // Create 'url'
572 // We use the enforced username if specified in the config file; this is the case
573 // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
574 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
575 if (u.length() <= 0) u = fMgr->EffectiveUser();
576 u += '@';
577 u += w->fHost;
578 if (w->fPort != -1) {
579 u += ':';
580 u += w->fPort;
581 }
582 // Type of server
583 int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
584 : (kXR_int32) kXPD_Worker;
585 TRACE(HDBG, "sending request to " << u);
586 // Send request
587 if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
588 TRACE(XERR, "problems sending request to " << u);
589 } else {
590 nok++;
591 }
592 // Cleanup answer
593 SafeDelete(xrsp);
594 } else {
595 TRACE(DBG, "broadcast request for ourselves: ignore");
596 }
597 }
598 // Next worker
599 ++iw;
600 }
601
602 // Done
603 return (nok == fNodes.size()) ? 0 : -1;
604}
605
606////////////////////////////////////////////////////////////////////////////////
607/// Get a XrdProofConn for url; create a new one if not available
608
610{
611 XrdProofConn *p = 0;
612
613 // If not found create a new one
614 XrdOucString buf = " Manager connection from ";
615 buf += fMgr->Host();
616 buf += "|ord:000";
617 char m = 'A'; // log as admin
618
619 {
621 p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
622 }
623 if (p && !(p->IsValid())) SafeDelete(p);
624
625 // Done
626 return p;
627}
628
629////////////////////////////////////////////////////////////////////////////////
630/// Broadcast request to known potential sub-nodes.
631/// Return 0 on success, -1 on error
632
634 const char *msg, int srvtype,
635 XrdProofdResponse *r, bool notify,
636 int subtype)
637{
638 XPDLOC(NMGR, "NetMgr::Send")
639
640 XrdClientMessage *xrsp = 0;
641 TRACE(REQ, "type: " << type);
642
643 if (!url || strlen(url) <= 0)
644 return xrsp;
645
646 // Get a connection to the server
647 XrdProofConn *conn = GetProofConn(url);
648
649 bool ok = 1;
650 if (conn && conn->IsValid()) {
651 XrdOucString notifymsg("Send: ");
652 // Prepare request
653 XPClientRequest reqhdr;
654 const void *buf = 0;
655 char **vout = 0;
656 memset(&reqhdr, 0, sizeof(reqhdr));
657 conn->SetSID(reqhdr.header.streamid);
658 reqhdr.header.requestid = kXP_admin;
659 reqhdr.proof.int1 = type;
660 switch (type) {
661 case kROOTVersion:
662 notifymsg += "change-of-ROOT version request to ";
663 notifymsg += url;
664 notifymsg += " msg: ";
665 notifymsg += msg;
666 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
667 buf = (msg) ? (const void *)msg : buf;
668 break;
669 case kCleanupSessions:
670 notifymsg += "cleanup request to ";
671 notifymsg += url;
672 notifymsg += " for user: ";
673 notifymsg += msg;
674 reqhdr.proof.int2 = (kXR_int32) srvtype;
675 reqhdr.proof.sid = -1;
676 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
677 buf = (msg) ? (const void *)msg : buf;
678 break;
679 case kExec:
680 notifymsg += "exec ";
681 notifymsg += subtype;
682 notifymsg += "request for ";
683 notifymsg += msg;
684 reqhdr.proof.int2 = (kXR_int32) subtype;
685 reqhdr.proof.sid = -1;
686 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
687 buf = (msg) ? (const void *)msg : buf;
688 break;
689 default:
690 ok = 0;
691 TRACE(XERR, "invalid request type " << type);
692 break;
693 }
694
695 // Notify the client that we are sending the request
696 if (r && notify)
697 r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
698
699 // Activate processing of unsolicited responses
700 conn->SetAsync(conn, &MessageSender, (void *)r);
701
702 // Send over
703 if (ok)
704 xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
705
706 // Deactivate processing of unsolicited responses
707 conn->SetAsync(0, 0, (void *)0);
708
709 // Print error msg, if any
710 if (r && !xrsp && conn->GetLastErr()) {
711 XrdOucString cmsg = url;
712 cmsg += ": ";
713 cmsg += conn->GetLastErr();
714 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
715 }
716 // Clean it up, to avoid leaving open tcp connection possibly going forever
717 // into CLOSE_WAIT
718 SafeDelete(conn);
719
720 } else {
721 TRACE(XERR, "could not open connection to " << url);
722 if (r) {
723 XrdOucString cmsg = "failure attempting connection to ";
724 cmsg += url;
725 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
726 }
727 }
728
729 // Done
730 return xrsp;
731}
732
733////////////////////////////////////////////////////////////////////////////////
734/// Check if 'host' is this local host. If checkport is true,
735/// matching of the local port with the one implied by host is also checked.
736/// Return 1 if 'local', 0 otherwise
737
738bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
739{
740 XPDLOC(NMGR, "NetMgr::IsLocal")
741
742 int rc = 0;
743 if (host && strlen(host) > 0) {
744 XrdClientUrlInfo uu(host);
745 if (uu.Port <= 0) uu.Port = 1093;
746 // Fully qualified name
747#ifndef ROOT_XrdFour
748 char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
749#else
750 XrdNetAddr aNA;
751 aNA.Set(uu.Host.c_str());
752 char *fqn = (char *) aNA.Name();
753#endif
754 TRACE(HDBG, "fqn: '"<<fqn<<"' mgrh: '"<<fMgr->Host()<<"'");
755 if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
756 !strcmp(fMgr->Host(), fqn))) {
757 if (!checkport || (uu.Port == fMgr->Port()))
758 rc = 1;
759 }
760#ifndef ROOT_XrdFour
761 SafeFree(fqn);
762#endif
763 }
764 // Done
765 return rc;
766}
767
768////////////////////////////////////////////////////////////////////////////////
769/// Process a readbuf request
770
772{
773 XPDLOC(NMGR, "NetMgr::ReadBuffer")
774
775 int rc = 0;
776 XPD_SETRESP(p, "ReadBuffer");
777
778 XrdOucString emsg;
779
780 // Unmarshall the data
781 //
782 kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
783 int len = ntohl(p->Request()->readbuf.len);
784
785 // Find out the file name
786 char *file = 0;
787 char *filen = 0;
788 char *pattern = 0;
789 int dlen = p->Request()->header.dlen;
790 int grep = ntohl(p->Request()->readbuf.int1);
791 int blen = dlen;
792 bool local = 0;
793 if (dlen > 0 && p->Argp()->buff) {
794 file = new char[dlen+1];
795 memcpy(file, p->Argp()->buff, dlen);
796 file[dlen] = 0;
797 // Check if local
799 if (ui.Host.length() > 0) {
800 // Check locality
801 local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
802 if (local) {
803 memcpy(file, ui.File.c_str(), ui.File.length());
804 file[ui.File.length()] = 0;
805 blen = ui.File.length();
806 TRACEP(p, DBG, "file is LOCAL");
807 }
808 }
809 // If grep, extract the pattern
810 if (grep > 0) {
811 // 'grep' operation: len is the length of the 'pattern' to be grepped
812 pattern = new char[len + 1];
813 int j = blen - len;
814 int i = 0;
815 while (j < blen)
816 pattern[i++] = file[j++];
817 pattern[i] = 0;
818 filen = strdup(file);
819 filen[blen - len] = 0;
820 TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
821 }
822 } else {
823 emsg = "file name not found";
824 TRACEP(p, XERR, emsg);
825 response->Send(kXR_InvalidRequest, emsg.c_str());
826 return 0;
827 }
828 if (grep) {
829 TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
830 ", pattern: " << pattern);
831 } else {
832 TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
833 }
834
835 // Get the buffer
836 int lout = len;
837 char *buf = 0;
838 if (local) {
839 if (grep > 0) {
840 // Grep local file
841 lout = blen; // initial length
842 buf = ReadBufferLocal(filen, pattern, lout, grep);
843 } else {
844 // Read portion of local file
845 buf = ReadBufferLocal(file, ofs, lout);
846 }
847 } else {
848 // Read portion of remote file
850 if (u.User.length() <= 0)
851 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
852 buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
853 }
854
855 bool sent = 0;
856 if (!buf) {
857 if (lout > 0) {
858 if (grep > 0) {
859 if (TRACING(DBG)) {
860 XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
861 TRACEP(p, DBG, emsg);
862 }
863 response->Send();
864 sent = 1;
865 } else {
866 XPDFORM(emsg, "could not read buffer from %s %s",
867 (local) ? "local file " : "remote file ", file);
868 TRACEP(p, XERR, emsg);
869 response->Send(kXR_InvalidRequest, emsg.c_str());
870 sent = 1;
871 }
872 } else {
873 // Just got an empty buffer
874 if (TRACING(DBG)) {
875 emsg = "nothing found in ";
876 emsg += (grep > 0) ? filen : file;
877 TRACEP(p, DBG, emsg);
878 }
879 }
880 }
881
882 // Send back to user
883 if (!sent)
884 response->Send(buf, lout);
885
886 // Cleanup
887 SafeFree(buf);
889 SafeFree(filen);
890 SafeDelArray(pattern);
891
892 // Done
893 return 0;
894}
895
896////////////////////////////////////////////////////////////////////////////////
897/// Locate the exact file path allowing for wildcards '*' in the file name.
898/// In case of success, returns 0 and fills file wity the first matching instance.
899/// Return -1 if no matching pat is found.
900
902{
903 XPDLOC(NMGR, "NetMgr::LocateLocalFile")
904
905 // If no wild cards or empty, nothing to do
906 if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
907
908 // Locate the file name and the dir
909 XrdOucString fn, dn;
910 int isl = file.rfind('/');
911 if (isl != STR_NPOS) {
912 fn.assign(file, isl + 1, -1);
913 dn.assign(file, 0, isl);
914 } else {
915 fn = file;
916 dn = "./";
917 }
918
919 XrdOucString emsg;
920 // Scan the dir
921 DIR *dirp = opendir(dn.c_str());
922 if (!dirp) {
923 XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
924 TRACE(XERR, emsg.c_str());
925 return -1;
926 }
927 struct dirent *ent = 0;
928 XrdOucString sent;
929 while ((ent = readdir(dirp))) {
930 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
931 continue;
932 // Check the match
933 sent = ent->d_name;
934 if (sent.matches(fn.c_str()) > 0) break;
935 sent = "";
936 }
937 closedir(dirp);
938
939 // If found fill a new output
940 if (sent.length() > 0) {
941 XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
942 return 0;
943 }
944
945 // Not found
946 return -1;
947}
948
949////////////////////////////////////////////////////////////////////////////////
950/// Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
951/// returned buffer must be freed by the caller.
952/// Wild cards '*' are allowed in the file name of 'path'; the first matching
953/// instance is taken.
954/// Returns 0 in case of error.
955
956char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
957{
958 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
959
960 XrdOucString emsg;
961 TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
962
963 // Check input
964 if (!path || strlen(path) <= 0) {
965 TRACE(XERR, "path undefined!");
966 return (char *)0;
967 }
968
969 // Locate the path resolving wild cards
970 XrdOucString spath(path);
971 if (LocateLocalFile(spath) != 0) {
972 TRACE(XERR, "path cannot be resolved! (" << path << ")");
973 return (char *)0;
974 }
975 const char *file = spath.c_str();
976
977 // Open the file in read mode
978 int fd = open(file, O_RDONLY);
979 if (fd < 0) {
980 emsg = "could not open ";
981 emsg += file;
982 TRACE(XERR, emsg);
983 return (char *)0;
984 }
985
986 // Size of the output
987 struct stat st;
988 if (fstat(fd, &st) != 0) {
989 emsg = "could not get size of file with stat: errno: ";
990 emsg += (int)errno;
991 TRACE(XERR, emsg);
992 close(fd);
993 return (char *)0;
994 }
995 off_t ltot = st.st_size;
996
997 // Estimate offsets of the requested range
998 // Start from ...
999 kXR_int64 start = ofs;
1000 off_t fst = (start < 0) ? ltot + start : start;
1001 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1002 // End at ...
1003 kXR_int64 end = fst + len;
1004 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1005 TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
1006
1007 // Number of bytes to be read
1008 len = lst - fst;
1009
1010 // Output buffer
1011 char *buf = (char *)malloc(len + 1);
1012 if (!buf) {
1013 emsg = "could not allocate enough memory on the heap: errno: ";
1014 emsg += (int)errno;
1015 XPDERR(emsg);
1016 close(fd);
1017 return (char *)0;
1018 }
1019
1020 // Reposition, if needed
1021 if (fst >= 0)
1022 lseek(fd, fst, SEEK_SET);
1023
1024 int left = len;
1025 int pos = 0;
1026 int nr = 0;
1027 do {
1028 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1029 errno = 0;
1030 if (nr < 0) {
1031 TRACE(XERR, "error reading from file: errno: " << errno);
1032 break;
1033 }
1034
1035 // Update counters
1036 pos += nr;
1037 left -= nr;
1038
1039 } while (nr > 0 && left > 0);
1040
1041 // Termination
1042 buf[len] = 0;
1043 TRACE(HDBG, "read " << nr << " bytes: " << buf);
1044
1045 // Close file
1046 close(fd);
1047
1048 // Done
1049 return buf;
1050}
1051
1052////////////////////////////////////////////////////////////////////////////////
1053/// Grep lines matching 'pat' form 'path'; the returned buffer (length in 'len')
1054/// must be freed by the caller.
1055/// Wild cards '*' are allowed in the file name of 'path'; the first matching
1056/// instance is taken.
1057/// Returns 0 in case of error.
1058
1060 const char *pat, int &len, int opt)
1061{
1062 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
1063
1064 XrdOucString emsg;
1065 TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
1066
1067 // Check input
1068 if (!path || strlen(path) <= 0) {
1069 TRACE(XERR, "file path undefined!");
1070 return (char *)0;
1071 }
1072
1073 // Locate the path resolving wild cards
1074 XrdOucString spath(path);
1075 if (LocateLocalFile(spath) != 0) {
1076 TRACE(XERR, "path cannot be resolved! (" << path << ")");
1077 return (char *)0;
1078 }
1079 const char *file = spath.c_str();
1080
1081 // Size of the output
1082 struct stat st;
1083 if (stat(file, &st) != 0) {
1084 emsg = "could not get size of file with stat: errno: ";
1085 emsg += (int)errno;
1086 TRACE(XERR, emsg);
1087 return (char *)0;
1088 }
1089 off_t ltot = st.st_size;
1090
1091 // The grep command
1092 char *cmd = 0;
1093 int lcmd = 0;
1094 if (pat && strlen(pat) > 0) {
1095 lcmd = strlen(pat) + strlen(file) + 20;
1096 cmd = new char[lcmd];
1097 if (opt == 1) {
1098 snprintf(cmd, lcmd, "grep %s %s", pat, file);
1099 } else if (opt == 2) {
1100 snprintf(cmd, lcmd, "grep -v %s %s", pat, file);
1101 } else if (opt == 3) {
1102 snprintf(cmd, lcmd, "cat %s | %s", file, pat);
1103 } else { // should not be here
1104 snprintf(cmd, lcmd, "cat %s", file);
1105 }
1106 } else {
1107 lcmd = strlen(file) + 10;
1108 cmd = new char[lcmd];
1109 snprintf(cmd, lcmd, "cat %s", file);
1110 }
1111 TRACE(DBG, "cmd: " << cmd);
1112
1113 // Execute the command in a pipe
1114 FILE *fp = popen(cmd, "r");
1115 if (!fp) {
1116 emsg = "could not run '";
1117 emsg += cmd;
1118 emsg += "'";
1119 TRACE(XERR, emsg);
1120 delete[] cmd;
1121 return (char *)0;
1122 }
1123 delete[] cmd;
1124
1125 // Read line by line
1126 len = 0;
1127 char *buf = 0;
1128 char line[2048];
1129 int bufsiz = 0, left = 0, lines = 0;
1130 while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
1131 // Parse the line
1132 int llen = strlen(line);
1133 ltot -= llen;
1134 lines++;
1135 // (Re-)allocate the buffer
1136 if (!buf || (llen > left)) {
1137 int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1138 dsiz = (dsiz > llen) ? dsiz : llen;
1139 bufsiz += dsiz;
1140 buf = (char *)realloc(buf, bufsiz + 1);
1141 left += dsiz;
1142 }
1143 if (!buf) {
1144 emsg = "could not allocate enough memory on the heap: errno: ";
1145 emsg += (int)errno;
1146 TRACE(XERR, emsg);
1147 pclose(fp);
1148 return (char *)0;
1149 }
1150 // Add line to the buffer
1151 memcpy(buf + len, line, llen);
1152 len += llen;
1153 left -= llen;
1154 if (TRACING(HDBG))
1155 fprintf(stderr, "line: %s", line);
1156 }
1157
1158 // Check the result and terminate the buffer
1159 if (buf) {
1160 if (len > 0) {
1161 buf[len] = 0;
1162 } else {
1163 free(buf);
1164 buf = 0;
1165 }
1166 }
1167
1168 // Close file
1169 pclose(fp);
1170
1171 // Done
1172 return buf;
1173}
1174
1175////////////////////////////////////////////////////////////////////////////////
1176/// Send a read buffer request of length 'len' at offset 'ofs' for remote file
1177/// defined by 'url'; the returned buffer must be freed by the caller.
1178/// Returns 0 in case of error.
1179
1180char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
1181 kXR_int64 ofs, int &len, int grep)
1182{
1183 XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
1184
1185 TRACE(REQ, "url: " << (url ? url : "undef") <<
1186 ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
1187 ", len: " << len << ", grep: " << grep);
1188
1189 // Check input
1190 if (!file || strlen(file) <= 0) {
1191 TRACE(XERR, "file undefined!");
1192 return (char *)0;
1193 }
1194 XrdClientUrlInfo u(url);
1195 if (!url || strlen(url) <= 0) {
1196 // Use file as url
1197 u.TakeUrl(XrdOucString(file));
1198 if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
1199 }
1200
1201 // Get a connection (logs in)
1202 XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
1203
1204 char *buf = 0;
1205 if (conn && conn->IsValid()) {
1206 // Prepare request
1207 XPClientRequest reqhdr;
1208 memset(&reqhdr, 0, sizeof(reqhdr));
1209 conn->SetSID(reqhdr.header.streamid);
1210 reqhdr.header.requestid = kXP_readbuf;
1211 reqhdr.readbuf.ofs = ofs;
1212 reqhdr.readbuf.len = len;
1213 reqhdr.readbuf.int1 = grep;
1214 reqhdr.header.dlen = strlen(file);
1215 const void *btmp = (const void *) file;
1216 char **vout = &buf;
1217 // Send over
1218 XrdClientMessage *xrsp =
1219 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
1220
1221 // If positive answer
1222 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1223 len = xrsp->DataLen();
1224 } else {
1225 if (xrsp && !(xrsp->IsError()))
1226 // The buffer was just empty: do not call it error
1227 len = 0;
1228 SafeFree(buf);
1229 }
1230
1231 // Clean the message
1232 SafeDelete(xrsp);
1233 // Clean it up, to avoid leaving open tcp connection possibly going forever
1234 // into CLOSE_WAIT
1235 SafeDelete(conn);
1236 }
1237
1238 // Done
1239 return buf;
1240}
1241
1242////////////////////////////////////////////////////////////////////////////////
1243/// Get log paths from next tier; used in multi-master setups
1244/// Returns 0 in case of error.
1245
1246char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
1247{
1248 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1249
1250 TRACE(REQ, "url: " << (url ? url : "undef") <<
1251 ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1252
1253 // Check input
1254 if (!url || strlen(url) <= 0) {
1255 TRACE(XERR, "url undefined!");
1256 return (char *)0;
1257 }
1258
1259 // Get a connection (logs in)
1260 XrdProofConn *conn = GetProofConn(url);
1261
1262 char *buf = 0;
1263 if (conn && conn->IsValid()) {
1264 // Prepare request
1265 XPClientRequest reqhdr;
1266 memset(&reqhdr, 0, sizeof(reqhdr));
1267 conn->SetSID(reqhdr.header.streamid);
1268 reqhdr.header.requestid = kXP_admin;
1269 reqhdr.proof.int1 = kQueryLogPaths;
1270 reqhdr.proof.int2 = isess;
1271 reqhdr.proof.sid = -1;
1272 reqhdr.header.dlen = msg ? strlen(msg) : 0;
1273 const void *btmp = (const void *) msg;
1274 char **vout = &buf;
1275 // Send over
1276 XrdClientMessage *xrsp =
1277 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
1278
1279 // If positive answer
1280 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1281 int len = xrsp->DataLen();
1282 buf = (char *) realloc((void *)buf, len + 1);
1283 if (buf)
1284 buf[len] = 0;
1285 } else {
1286 SafeFree(buf);
1287 }
1288
1289 // Clean the message
1290 SafeDelete(xrsp);
1291 // Clean it up, to avoid leaving open tcp connection possibly going forever
1292 // into CLOSE_WAIT
1293 SafeDelete(conn);
1294 }
1295
1296 // Done
1297 return buf;
1298}
1299
1300////////////////////////////////////////////////////////////////////////////////
1301/// Get log paths from next tier; used in multi-master setups
1302/// Returns 0 in case of error.
1303
1304char *XrdProofdNetMgr::ReadLogPaths(const char *msg, int isess)
1305{
1306 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1307
1308 TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1309
1310 char *buf = 0, *pbuf = buf;
1311 int len = 0;
1312 // Loop over unique nodes
1313 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
1314 XrdProofWorker *w = 0;
1315 while (iw != fNodes.end()) {
1316 if ((w = *iw)) {
1317 // Do not send it to ourselves
1318 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
1319 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
1320 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
1321 if (!us) {
1322 // Create 'url'
1323 XrdOucString u = fMgr->EffectiveUser();
1324 u += '@';
1325 u += w->fHost;
1326 if (w->fPort != -1) {
1327 u += ':';
1328 u += w->fPort;
1329 }
1330 // Ask the node
1331 char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
1332 if (bmst) {
1333 len += strlen(bmst) + 1;
1334 buf = (char *) realloc((void *)buf, len);
1335 pbuf = buf + len - strlen(bmst) - 1;
1336 memcpy(pbuf, bmst, strlen(bmst) + 1);
1337 buf[len - 1] = 0;
1338 pbuf = buf + len;
1339 free(bmst);
1340 }
1341 } else {
1342 TRACE(DBG, "request for ourselves: ignore");
1343 }
1344 }
1345 // Next worker
1346 ++iw;
1347 }
1348
1349 // Done
1350 return buf;
1351}
1352
1353////////////////////////////////////////////////////////////////////////////////
1354/// Fill-in fWorkers for a localhost based on the number of
1355/// workers fNumLocalWrks.
1356
1358{
1359 XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
1360
1361 TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
1362
1363 // Lock the method to protect the lists.
1365
1366 // Cleanup the worker list
1367 fWorkers.clear();
1368 // The first time we need to create the default workers
1369 if (fDfltWorkers.size() < 1) {
1370 // Create a default master line
1371 XrdOucString mm("master ", 128);
1372 mm += fMgr->Host();
1373 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1374
1375 // Create 'localhost' lines for each worker
1376 int nwrk = fNumLocalWrks;
1377 if (nwrk > 0) {
1378 mm = "worker localhost port=";
1379 mm += fMgr->Port();
1380 while (nwrk--) {
1381 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1382 TRACE(DBG, "added line: " << mm);
1383 }
1384 }
1385 }
1386
1387 // Copy the list
1388 std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
1389 for (; w != fDfltWorkers.end(); ++w) {
1390 fWorkers.push_back(*w);
1391 }
1392
1393 TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
1394
1395 // Find unique nodes
1397
1398 // We are done
1399 return;
1400}
1401
1402////////////////////////////////////////////////////////////////////////////////
1403/// Return the list of workers after having made sure that the info is
1404/// up-to-date
1405
1406std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
1407{
1408 XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
1409
1411
1412 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1413 // Check if there were any changes in the config file
1414 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1415 if (fDfltFallback) {
1416 // Use default settings
1418 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1419 } else {
1420 TRACE(XERR, "unable to read the configuration file");
1421 return (std::list<XrdProofWorker *> *)0;
1422 }
1423 }
1424 }
1425 TRACE(DBG, "returning list with " << fWorkers.size() << " entries");
1426
1427 if (TRACING(HDBG)) Dump();
1428
1429 return &fWorkers;
1430}
1431
1432////////////////////////////////////////////////////////////////////////////////
1433/// Dump status
1434
1436{
1437 const char *xpdloc = "NetMgr::Dump";
1438
1440
1441 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1442 XPDPRT("+ Active workers status");
1443 XPDPRT("+ Size: " << fWorkers.size());
1444 XPDPRT("+ ");
1445
1446 std::list<XrdProofWorker *>::iterator iw;
1447 for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
1448 XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
1449 " active sessions:" << (*iw)->Active());
1450 }
1451 XPDPRT("+ ");
1452 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1453}
1454
1455////////////////////////////////////////////////////////////////////////////////
1456/// Return the list of unique nodes after having made sure that the info is
1457/// up-to-date
1458
1459std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
1460{
1461 XPDLOC(NMGR, "NetMgr::GetNodes")
1462
1464
1465 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1466 // Check if there were any changes in the config file
1467 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1468 if (fDfltFallback) {
1469 // Use default settings
1471 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1472 } else {
1473 TRACE(XERR, "unable to read the configuration file");
1474 return (std::list<XrdProofWorker *> *)0;
1475 }
1476 }
1477 }
1478 TRACE(DBG, "returning list with " << fNodes.size() << " entries");
1479
1480 return &fNodes;
1481}
1482
1483////////////////////////////////////////////////////////////////////////////////
1484/// Read PROOF config file and load the information in fWorkers.
1485/// NB: 'master' information here is ignored, because it is passed
1486/// via the 'xpd.workdir' and 'xpd.image' config directives
1487
1489{
1490 XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
1491
1492 TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
1493
1494 // Lock the method to protect the lists.
1496
1497 // Check inputs
1498 if (fPROOFcfg.fName.length() <= 0)
1499 return -1;
1500
1501 // Get the modification time
1502 struct stat st;
1503 if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
1504 // If the file disappeared, reset the modification time so that we are sure
1505 // to reload it if it comes back
1506 if (errno == ENOENT) fPROOFcfg.fMtime = -1;
1507 if (!fDfltFallback) {
1508 TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
1509 } else {
1510 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
1511 }
1512 return -1;
1513 }
1514 TRACE(DBG, "time of last modification: " << st.st_mtime);
1515
1516 // File should be loaded only once
1517 if (st.st_mtime <= fPROOFcfg.fMtime)
1518 return 0;
1519
1520 // Save the modification time
1521 fPROOFcfg.fMtime = st.st_mtime;
1522
1523 // Open the defined path.
1524 FILE *fin = 0;
1525 if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
1526 if (fWorkers.size() > 1) {
1527 TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
1528 TRACE(XERR, "continuing with existing list of workers.");
1529 return 0;
1530 } else {
1531 return -1;
1532 }
1533 }
1534
1535 if (reset) {
1536 // Cleanup the worker list
1537 fWorkers.clear();
1538 }
1539
1540 // Add default a master line if not yet there
1541 if (fRegWorkers.size() < 1) {
1542 XrdOucString mm("master ", 128);
1543 mm += fMgr->Host();
1544 fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
1545 } else {
1546 // Deactivate all current active workers
1547 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1548 // Skip the master line
1549 ++w;
1550 for (; w != fRegWorkers.end(); ++w) {
1551 (*w)->fActive = 0;
1552 }
1553 }
1554
1555 // Read now the directives
1556 int nw = 0;
1557 char lin[2048];
1558 while (fgets(lin, sizeof(lin), fin)) {
1559 // Skip empty lines
1560 int p = 0;
1561 while (lin[p++] == ' ') {
1562 ;
1563 }
1564 p--;
1565 if (lin[p] == '\0' || lin[p] == '\n')
1566 continue;
1567
1568 // Skip comments
1569 if (lin[0] == '#')
1570 continue;
1571
1572 // Remove trailing '\n';
1573 if (lin[strlen(lin)-1] == '\n')
1574 lin[strlen(lin)-1] = '\0';
1575
1576 TRACE(DBG, "found line: " << lin);
1577
1578 // Parse the line
1579 XrdProofWorker *pw = new XrdProofWorker(lin);
1580
1581 const char *pfx[2] = { "master", "node" };
1582 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1583 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1584 // Init a master instance
1585 if (pw->fHost.beginswith("localhost") ||
1586 pw->Matches(fMgr->Host())) {
1587 // Replace the default line (the first with what found in the file)
1588 XrdProofWorker *fw = fRegWorkers.front();
1589 fw->Reset(lin);
1590 }
1591 // Ignore it
1592 SafeDelete(pw);
1593 } else {
1594 // Check if we have already it
1595 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1596 // Skip the master line
1597 ++w;
1598 bool haveit = 0;
1599 while (w != fRegWorkers.end()) {
1600 if (!((*w)->fActive)) {
1601 if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
1602 (*w)->fActive = 1;
1603 haveit = 1;
1604 break;
1605 }
1606 }
1607 // Go to next
1608 ++w;
1609 }
1610 // If we do not have it, build a new worker object
1611 if (!haveit) {
1612 // Keep it
1613 fRegWorkers.push_back(pw);
1614 } else {
1615 // Drop it
1616 SafeDelete(pw);
1617 }
1618 }
1619 }
1620
1621 // Copy the active workers
1622 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1623 while (w != fRegWorkers.end()) {
1624 if ((*w)->fActive) {
1625 fWorkers.push_back(*w);
1626 nw++;
1627 }
1628 ++w;
1629 }
1630
1631 // Close files
1632 fclose(fin);
1633
1634 // Find unique nodes
1635 if (reset)
1637
1638 // We are done
1639 return ((nw == 0) ? -1 : 0);
1640}
1641
1642////////////////////////////////////////////////////////////////////////////////
1643/// Scan fWorkers for unique nodes (stored in fNodes).
1644/// Return the number of unque nodes.
1645/// NB: 'master' information here is ignored, because it is passed
1646/// via the 'xpd.workdir' and 'xpd.image' config directives
1647
1649{
1650 XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
1651
1652 TRACE(REQ, "# workers: " << fWorkers.size());
1653
1654 // Cleanup the nodes list
1655 fNodes.clear();
1656
1657 // Build the list of unique nodes (skip the master line);
1658 if (fWorkers.size() > 1) {
1659 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
1660 ++w;
1661 for (; w != fWorkers.end(); ++w) if ((*w)->fActive) {
1662 bool add = 1;
1663 std::list<XrdProofWorker *>::iterator n;
1664 for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
1665 if ((*n)->Matches(*w)) {
1666 add = 0;
1667 break;
1668 }
1669 }
1670 if (add)
1671 fNodes.push_back(*w);
1672 }
1673 }
1674 TRACE(REQ, "found " << fNodes.size() << " unique nodes");
1675
1676 // We are done
1677 return fNodes.size();
1678}
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:550
#define d(i)
Definition: RSha256.hxx:102
#define e(i)
Definition: RSha256.hxx:103
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
static unsigned int total
int type
Definition: TGX11.cxx:120
double floor(double)
@ kXP_readbuf
@ kXP_admin
@ kXP_ctrlc
#define kXPD_Master
@ kRTStatic
@ kRTNone
#define kXPD_AnyServer
@ kCleanupSessions
@ kExec
@ kQueryLogPaths
@ kROOTVersion
#define kXPD_Worker
@ kXPD_srvmsg
#define XrdSysError
Definition: XpdSysError.h:8
#define NAME_REQUESTTIMEOUT
#define xrdmin(a, b)
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
#define XPDFORM
Definition: XrdProofdAux.h:378
#define SafeFree(x)
Definition: XrdProofdAux.h:338
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define SafeDelArray(x)
Definition: XrdProofdAux.h:335
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
#define XPD_LONGOK(x)
#define XPD_SETRESP(p, x)
#define XPDPRT(x)
#define XPDLOC(d, x)
#define TRACEP(p, act, x)
#define TRACING(x)
#define XPDERR(x)
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
#define realloc
Definition: civetweb.c:1538
#define free
Definition: civetweb.c:1539
#define snprintf
Definition: civetweb.c:1540
#define malloc
Definition: civetweb.c:1536
void TakeUrl(XrdOucString url)
XrdOucString GetUrl()
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
Definition: XrdProofConn.h:136
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
void Reset(const char *str)
Set content from a config file-like string.
bool Matches(const char *host)
Check compatibility of host with this instance.
XrdOucString fUser
XrdOucString fHost
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
const char * User() const
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdOucString fName
Definition: XrdProofdAux.h:73
XrdProofdNetMgr * NetMgr() const
const char * Host() const
int SrvType() const
const char * EffectiveUser() const
XrdOucString Get(int i)
Return i-th combination (i : 0 -> fN-1)
bool IsValid() const
Definition: XrdProofdAux.h:164
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
std::list< XrdProofWorker * > fDfltWorkers
void BalanceNodesOrder()
Indices (this will be used twice).
std::list< XrdProofWorker * > fNodes
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdSysRecMutex fMutex
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdProofdFile fPROOFcfg
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
void Dump()
Dump status.
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
void RegisterDirectives()
Register config directives.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
std::list< XrdProofWorker * > fWorkers
XrdProofdManager * fMgr
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
std::list< XrdProofWorker * > fRegWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual ~XrdProofdNetMgr()
Destructor.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
XrdProofdClient * Client() const
XrdBuffer * Argp() const
XPClientRequest * Request() const
TLine * line
const Int_t n
Definition: legend1.C:16
static constexpr double us
static constexpr double s
static constexpr double pi
static constexpr double mm
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...
Definition: file.py:1
auto * m
Definition: textangle.C:8
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientReadbufRequest readbuf