Loading [MathJax]/extensions/tex2jax.js
Logo ROOT   6.16/01
Reference Guide
•All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Modules Pages
XrdProofdNetMgr.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis Jan 2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11#include "XrdProofdPlatform.h"
12
13//////////////////////////////////////////////////////////////////////////
14// //
15// XrdProofdNetMgr //
16// //
17// Authors: G. Ganis, CERN, 2008 //
18// //
19// Manages connections between PROOF server daemons //
20// //
21//////////////////////////////////////////////////////////////////////////
22
23#include "XrdProofdNetMgr.h"
24
25#include "XrdProofdXrdVers.h"
26#ifndef ROOT_XrdFour
27# include "XpdSysDNS.h"
28#else
29# include "XrdNet/XrdNetAddr.hh"
30#endif
31#include "Xrd/XrdBuffer.hh"
36#include "XrdOuc/XrdOucStream.hh"
37#include "XrdSys/XrdSysPlatform.hh"
38
39#include "XrdProofdClient.h"
40#include "XrdProofdManager.h"
41#include "XrdProofdProtocol.h"
42#include "XrdProofdResponse.h"
43#include "XrdProofWorker.h"
44
45// Tracing utilities
46#include "XrdProofdTrace.h"
47
48#include <algorithm>
49#include <limits>
50#include <math.h>
51
52////////////////////////////////////////////////////////////////////////////////
53/// Send up a message from the server
54
55int MessageSender(const char *msg, int len, void *arg)
56{
58 if (r) {
59 return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
60 }
61 return -1;
62}
63
64////////////////////////////////////////////////////////////////////////////////
65/// Constructor
66
68 XrdProtocol_Config *pi, XrdSysError *e)
69 : XrdProofdConfig(pi->ConfigFN, e)
70{
71 fMgr = mgr;
73 fPROOFcfg.fName = "";
74 fPROOFcfg.fMtime = -1;
76 fDfltFallback = 0;
77 fDfltWorkers.clear();
78 fRegWorkers.clear();
79 fWorkers.clear();
80 fNodes.clear();
82 fWorkerUsrCfg = 0;
83 fRequestTO = 30;
84 fBonjourEnabled = false;
85
86 // Configuration directives
88}
89
90////////////////////////////////////////////////////////////////////////////////
91/// Register config directives
92
94{
95 Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
96 Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
97 Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
98 Register("bonjour", new XrdProofdDirective("bonjour", this, &DoDirectiveClass));
99 Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
100}
101
102////////////////////////////////////////////////////////////////////////////////
103/// Destructor
104
106{
107 // Cleanup the worker lists
108 // (the nodes list points to the same object, no cleanup is needed)
109 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
110 while (w != fRegWorkers.end()) {
111 delete *w;
112 w = fRegWorkers.erase(w);
113 }
114 w = fDfltWorkers.begin();
115 while (w != fDfltWorkers.end()) {
116 delete *w;
117 w = fDfltWorkers.erase(w);
118 }
119 fWorkers.clear();
120}
121
122////////////////////////////////////////////////////////////////////////////////
123/// Run configuration and parse the entered config directives.
124/// Return 0 on success, -1 on error
125
127{
128 XPDLOC(NMGR, "NetMgr::Config")
129
130 // Lock the method to protect the lists.
132
133 // Cleanup the worker list
134 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
135 while (w != fWorkers.end()) {
136 delete *w;
137 w = fWorkers.erase(w);
138 }
139 // Create a default master line
140 XrdOucString mm("master ", 128);
141 mm += fMgr->Host();
142 mm += " port=";
143 mm += fMgr->Port();
144 fWorkers.push_back(new XrdProofWorker(mm.c_str()));
145
146 // Run first the configurator
147 if (XrdProofdConfig::Config(rcf) != 0) {
148 XPDERR("problems parsing file ");
149 return -1;
150 }
151
152 XrdOucString msg;
153 msg = (rcf) ? "re-configuring" : "configuring";
154 TRACE(ALL, msg);
155
156 if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
157 TRACE(ALL, "PROOF config file: " <<
158 ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
159 if (fResourceType == kRTStatic) {
160 // Initialize the list of workers if a static config has been required
161 // Default file path, if none specified
162 bool dodefault = 1;
163 if (fPROOFcfg.fName.length() > 0) {
164 // Load file content in memory
165 if (ReadPROOFcfg() == 0) {
166 TRACE(ALL, "PROOF config file will " <<
167 ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
168 dodefault = 0;
169 } else {
170 if (!fDfltFallback) {
171 XPDERR("unable to find valid information in PROOF config file " <<
173 fPROOFcfg.fMtime = -1;
174 return 0;
175 } else {
176 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
177 }
178 }
179 }
180 if (dodefault) {
181 // Use default
183 }
184 } else if (fResourceType == kRTNone && fWorkers.size() <= 1 && !fBonjourEnabled) {
185 // Nothing defined: use default
187 }
188
189 // Find unique nodes
191 }
192
193 // For connection to the other xproofds we try only once
195 // Request Timeout
197
198 // Notification
199 XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
200 TRACE(ALL, msg);
201
202 // Done
203 return 0;
204}
205
206////////////////////////////////////////////////////////////////////////////////
207/// Dispatch a configuration directive to the handler matching its name.
208
210 char *val, XrdOucStream *cfg, bool rcf)
211{
212 XPDLOC(NMGR, "NetMgr::DoDirective")
213
214 if (!d)
215 // undefined inputs
216 return -1;
217
218 if (d->fName == "resource") {
219 return DoDirectiveResource(val, cfg, rcf);
220 } else if (d->fName == "adminreqto") {
221 return DoDirectiveAdminReqTO(val, cfg, rcf);
222 } else if (d->fName == "worker") {
223 return DoDirectiveWorker(val, cfg, rcf);
224 } else if (d->fName == "bonjour") {
225 return DoDirectiveBonjour(val, cfg, rcf);
226 }
227
228 TRACE(XERR, "unknown directive: " << d->fName);
229
230 return -1;
231}
232
233////////////////////////////////////////////////////////////////////////////////
234
235int XrdProofdNetMgr::DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
236{
237 XPDLOC(NMGR, "NetMgr::DoDirectiveBonjour");
238
239 // Process 'bonjour' directive
240 TRACE(DBG, "processing Bonjour directive");
241
242 if (!val || !cfg)
243 // undefined inputs
244 return -1;
245
246 TRACE(XERR, "Bonjour support is disabled");
247 return -1;
248}
249
250////////////////////////////////////////////////////////////////////////////////
251/// Reorder the worker list round-robin across nodes, proportionally to each node's worker count.
252
254{
255 list<XrdProofWorker *>::const_iterator iter, iter2;
256 list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
257 // Map to store information of the balancer.
258 map<XrdProofWorker *, BalancerInfo> info;
259 // Node with minimum number of workers distinct to 1.
260 unsigned int min = UINT_MAX;
261 // Total number of nodes and per iteration assignments.
262 unsigned int total = 0, total_perit = 0;
263 // Number of iterations to get every node filled.
264 unsigned int total_added = 0;
265 // Temporary list to store the balanced configuration
266 list<XrdProofWorker *> tempNodes;
267 // Flag for the search and destroy loop.
268 bool deleted;
269
270 // Fill the information store with the first data (number of nodes).
271 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
272 // The next code is not the same as this:
273 //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
274 // The previous piece of STL code only checks the pointer of the value
275 // stored on the list; although it is more efficient, it requires that repeated
276 // nodes point to the same object. To allow hybrid configurations, we are
277 // doing a 'manually' matching since statically configured nodes are
278 // created in multiple ways.
279 info[*iter].available = 0;
280 for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
281 if ((*iter)->Matches(*iter2)) {
282 info[*iter].available++;
283 }
284 }
285 info[*iter].added = 0;
286 // Calculate the minimum greater than 1.
287 if (info[*iter].available > 1 && info[*iter].available < min)
288 min = info[*iter].available;
289 // Calculate the totals.
290 total += info[*iter].available;
291 }
292
293 // Now, calculate the number of workers to add in each iteration of the
294 // round robin, scaling to the smaller number.
295 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
296 if (info[*iter].available > 1) {
297 info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
298 } else {
299 info[*iter].per_iteration = 1;
300 }
301 // Calculate the totals.
302 total_perit += info[*iter].per_iteration;
303 }
304
305 // Since we are going to substitute the list, don't forget to recover the
306 // default node at the first time.
307 tempNodes.push_back(fWorkers.front());
308
309 // Finally, do the round robin assignment of nodes.
310 // Stop when every node has its workers processed.
311 while (total_added < total) {
312 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
313 if (i->second.added < i->second.available) {
314 // Be careful with the remainders (on prime number of nodes).
315 unsigned int to_add = xrdmin(i->second.per_iteration,
316 (i->second.available - i->second.added));
317 // Then add the nodes.
318 for (unsigned int j = 0; j < to_add; j++) {
319 tempNodes.push_back(i->first);
320 }
321 i->second.added += to_add;
322 total_added += to_add;
323 }
324 }
325 }
326
327 // Since we are merging nodes into a single object, we must merge the current
328 // sessions of the static nodes (that can be distinct objects that represent
329 // the same node) and delete the orphaned objects. If, in the future, we can
330 // ensure that every worker has only one object in the list, this will no longer
331 // be necessary. The things that need to change are DoDirectiveWorker, which must
332 // search for a node before inserting it and, for the repeat directive, always
333 // insert the same node. Also the default configuration methods (there are
334 // two in this class) must be updated.
335 iter3 = ++(fWorkers.begin());
336 while (iter3 != fWorkers.end()) {
337 deleted = false;
338 // If the worker is not in the tempNodes list, we must process it. Note that
339 // std::count() uses a plain comparison between values, in this case, we
340 // are comparing pointers (numbers, at the end).
341 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
342 // Search for an object that matches with this in the temp list.
343 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
344 if ((*iter2)->Matches(*iter3)) {
345 // Copy data and delete the *iter object.
346 (*iter2)->MergeProofServs(*(*iter3));
347 deleted = true;
348 delete *iter3;
349 fWorkers.erase(iter3++);
350 break;
351 }
352 }
353 }
354 // Do not forget to increase the iterator.
355 if (!deleted)
356 ++iter3;
357 }
358
359 // Then, substitute the current fWorkers list with the balanced one.
360 fWorkers = tempNodes;
361}
362
363////////////////////////////////////////////////////////////////////////////////
364/// Process 'adminreqto' directive
365
366int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
367{
368 if (!val)
369 // undefined inputs
370 return -1;
371
372 // Check deprecated 'if' directive
373 if (fMgr->Host() && cfg)
374 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
375 return 0;
376
377 // Timeout on requested broadcasted to workers; there are 4 attempts,
378 // so the real timeout is 4 x fRequestTO
379 int to = strtol(val, 0, 10);
380 fRequestTO = (to > 0) ? to : fRequestTO;
381 return 0;
382}
383
384////////////////////////////////////////////////////////////////////////////////
385/// Process 'resource' directive
386
387int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
388{
389 XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
390
391 if (!val || !cfg)
392 // undefined inputs
393 return -1;
394
395 if (!strcmp("static", val)) {
396 // We just take the path of the config file here; the
397 // rest is used by the static scheduler
399 while ((val = cfg->GetWord()) && val[0]) {
400 XrdOucString s(val);
401 if (s.beginswith("ucfg:")) {
402 fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
403 } else if (s.beginswith("reload:")) {
404 fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
405 } else if (s.beginswith("dfltfallback:")) {
406 fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
407 } else if (s.beginswith("wmx:")) {
408 } else if (s.beginswith("selopt:")) {
409 } else {
410 // Config file
411 fPROOFcfg.fName = val;
412 if (fPROOFcfg.fName.beginswith("sm:")) {
413 fPROOFcfg.fName.replace("sm:", "");
414 }
416 // Make sure it exists and can be read
417 if (access(fPROOFcfg.fName.c_str(), R_OK)) {
418 if (errno == ENOENT) {
419 TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
420 } else {
421 TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
422 fPROOFcfg.fName = "";
423 fPROOFcfg.fMtime = -1;
424 }
425 }
426 }
427 }
428 }
429 return 0;
430}
431
432////////////////////////////////////////////////////////////////////////////////
433/// Process 'worker' directive
434
435int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
436{
437 XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
438
439 if (!val || !cfg)
440 // undefined inputs
441 return -1;
442
443 // Lock the method to protect the lists.
445
446 // Get the full line (w/o heading keyword)
447 cfg->RetToken();
448 XrdOucString wrd(cfg->GetWord());
449 if (wrd.length() > 0) {
450 // Build the line
451 XrdOucString line;
452 char rest[2048] = {0};
453 cfg->GetRest((char *)&rest[0], 2048);
454 XPDFORM(line, "%s %s", wrd.c_str(), rest);
455 // Parse it now
456 if (wrd == "master" || wrd == "node") {
457 // Init a master instance
458 XrdProofWorker *pw = new XrdProofWorker(line.c_str());
459 if (pw->fHost.beginswith("localhost") ||
460 pw->Matches(fMgr->Host())) {
461 // Replace the default line (the first with what found in the file)
462 XrdProofWorker *fw = fWorkers.front();
463 fw->Reset(line.c_str());
464 }
465 SafeDelete(pw);
466 } else {
467 // How many lines like this?
468 int nr = 1;
469 int ir = line.find("repeat=");
470 if (ir != STR_NPOS) {
471 XrdOucString r(line, ir + strlen("repeat="));
472 r.erase(r.find(' '));
473 nr = r.atoi();
474 if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
475 TRACE(DBG, "found repeat = " << nr);
476 }
477 while (nr--) {
478 // Build the worker object
479 XrdProofdMultiStr mline(line.c_str());
480 if (mline.IsValid()) {
481 TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
482 for (int i = 0; i < mline.N(); i++) {
483 TRACE(HDBG, "found token: " << mline.Get(i));
484 fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
485 }
486 } else {
487 TRACE(DBG, "found line: " << line);
488 fWorkers.push_back(new XrdProofWorker(line.c_str()));
489 }
490 }
491 }
492 }
493
494 // Necessary for the balancer when Bonjour is enabled. Note that this balancer
495 // can also be enabled with a static configuration. By this time is disabled
496 // due to its experimental status.
498 //BalanceNodesOrder();
499
500 return 0;
501}
502
503////////////////////////////////////////////////////////////////////////////////
504/// Broadcast a ctrlc interrupt
505/// Return 0 on success, -1 on error
506
508{
509 XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
510
511 int rc = 0;
512
513 // Loop over unique nodes
514 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
515 XrdProofWorker *w = 0;
516 while (iw != fNodes.end()) {
517 if ((w = *iw) && w->fType != 'M') {
518 // Do not send it to ourselves
519 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
520 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
521 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
522 if (!us) {
523 // Create 'url'
524 // We use the enforced username if specified in the config file; this is the case
525 // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
526 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
527 if (u.length() <= 0) u = fMgr->EffectiveUser();
528 u += '@';
529 u += w->fHost;
530 if (w->fPort != -1) {
531 u += ':';
532 u += w->fPort;
533 }
534 TRACE(HDBG, "sending request to: "<<u);
535 // Get a connection to the server
536 XrdProofConn *conn = GetProofConn(u.c_str());
537 if (conn && conn->IsValid()) {
538 // Prepare request
539 XPClientRequest reqhdr;
540 memset(&reqhdr, 0, sizeof(reqhdr));
541 conn->SetSID(reqhdr.header.streamid);
542 reqhdr.proof.requestid = kXP_ctrlc;
543 reqhdr.proof.sid = 0;
544 reqhdr.proof.dlen = 0;
545 // We need the right order
546 if (XPD::clientMarshall(&reqhdr) != 0) {
547 TRACE(XERR, "problems marshalling request");
548 return -1;
549 }
550 if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
551 TRACE(XERR, "problems sending ctrl-c request to server " << u);
552 }
553 // Clean it up, to avoid leaving open tcp connection possibly going forever
554 // into CLOSE_WAIT
555 SafeDelete(conn);
556 }
557 } else {
558 TRACE(DBG, "broadcast request for ourselves: ignore");
559 }
560 }
561 // Next worker
562 ++iw;
563 }
564
565 // Done
566 return rc;
567}
568
569////////////////////////////////////////////////////////////////////////////////
570/// Broadcast request to known potential sub-nodes.
571/// Return 0 on success, -1 on error
572
573int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
574 XrdProofdResponse *r, bool notify, int subtype)
575{
576 XPDLOC(NMGR, "NetMgr::Broadcast")
577
578 unsigned int nok = 0;
579 TRACE(REQ, "type: " << type);
580
581 // Loop over unique nodes
582 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
583 XrdProofWorker *w = 0;
584 XrdClientMessage *xrsp = 0;
585 while (iw != fNodes.end()) {
586 if ((w = *iw) && w->fType != 'M') {
587 // Do not send it to ourselves
588 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
589 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
590 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
591 if (!us) {
592 // Create 'url'
593 // We use the enforced username if specified in the config file; this is the case
594 // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
595 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
596 if (u.length() <= 0) u = fMgr->EffectiveUser();
597 u += '@';
598 u += w->fHost;
599 if (w->fPort != -1) {
600 u += ':';
601 u += w->fPort;
602 }
603 // Type of server
604 int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
605 : (kXR_int32) kXPD_Worker;
606 TRACE(HDBG, "sending request to " << u);
607 // Send request
608 if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
609 TRACE(XERR, "problems sending request to " << u);
610 } else {
611 nok++;
612 }
613 // Cleanup answer
614 SafeDelete(xrsp);
615 } else {
616 TRACE(DBG, "broadcast request for ourselves: ignore");
617 }
618 }
619 // Next worker
620 ++iw;
621 }
622
623 // Done
624 return (nok == fNodes.size()) ? 0 : -1;
625}
626
627////////////////////////////////////////////////////////////////////////////////
628/// Get a XrdProofConn for url; create a new one if not available
629
631{
632 XrdProofConn *p = 0;
633
634 // If not found create a new one
635 XrdOucString buf = " Manager connection from ";
636 buf += fMgr->Host();
637 buf += "|ord:000";
638 char m = 'A'; // log as admin
639
640 {
642 p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
643 }
644 if (p && !(p->IsValid())) SafeDelete(p);
645
646 // Done
647 return p;
648}
649
650////////////////////////////////////////////////////////////////////////////////
651/// Send a request of type 'type' to the server at 'url'.
652/// Return the server response (to be deleted by the caller), or 0 on failure
653
655 const char *msg, int srvtype,
656 XrdProofdResponse *r, bool notify,
657 int subtype)
658{
659 XPDLOC(NMGR, "NetMgr::Send")
660
661 XrdClientMessage *xrsp = 0;
662 TRACE(REQ, "type: " << type);
663
664 if (!url || strlen(url) <= 0)
665 return xrsp;
666
667 // Get a connection to the server
668 XrdProofConn *conn = GetProofConn(url);
669
670 bool ok = 1;
671 if (conn && conn->IsValid()) {
672 XrdOucString notifymsg("Send: ");
673 // Prepare request
674 XPClientRequest reqhdr;
675 const void *buf = 0;
676 char **vout = 0;
677 memset(&reqhdr, 0, sizeof(reqhdr));
678 conn->SetSID(reqhdr.header.streamid);
679 reqhdr.header.requestid = kXP_admin;
680 reqhdr.proof.int1 = type;
681 switch (type) {
682 case kROOTVersion:
683 notifymsg += "change-of-ROOT version request to ";
684 notifymsg += url;
685 notifymsg += " msg: ";
686 notifymsg += msg;
687 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
688 buf = (msg) ? (const void *)msg : buf;
689 break;
690 case kCleanupSessions:
691 notifymsg += "cleanup request to ";
692 notifymsg += url;
693 notifymsg += " for user: ";
694 notifymsg += msg;
695 reqhdr.proof.int2 = (kXR_int32) srvtype;
696 reqhdr.proof.sid = -1;
697 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
698 buf = (msg) ? (const void *)msg : buf;
699 break;
700 case kExec:
701 notifymsg += "exec ";
702 notifymsg += subtype;
703 notifymsg += "request for ";
704 notifymsg += msg;
705 reqhdr.proof.int2 = (kXR_int32) subtype;
706 reqhdr.proof.sid = -1;
707 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
708 buf = (msg) ? (const void *)msg : buf;
709 break;
710 default:
711 ok = 0;
712 TRACE(XERR, "invalid request type " << type);
713 break;
714 }
715
716 // Notify the client that we are sending the request
717 if (r && notify)
718 r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
719
720 // Activate processing of unsolicited responses
721 conn->SetAsync(conn, &MessageSender, (void *)r);
722
723 // Send over
724 if (ok)
725 xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
726
727 // Deactivate processing of unsolicited responses
728 conn->SetAsync(0, 0, (void *)0);
729
730 // Print error msg, if any
731 if (r && !xrsp && conn->GetLastErr()) {
732 XrdOucString cmsg = url;
733 cmsg += ": ";
734 cmsg += conn->GetLastErr();
735 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
736 }
737 // Clean it up, to avoid leaving open tcp connection possibly going forever
738 // into CLOSE_WAIT
739 SafeDelete(conn);
740
741 } else {
742 TRACE(XERR, "could not open connection to " << url);
743 if (r) {
744 XrdOucString cmsg = "failure attempting connection to ";
745 cmsg += url;
746 r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
747 }
748 }
749
750 // Done
751 return xrsp;
752}
753
754////////////////////////////////////////////////////////////////////////////////
755/// Check if 'host' is this local host. If checkport is true,
756/// matching of the local port with the one implied by host is also checked.
757/// Return 1 if 'local', 0 otherwise
758
759bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
760{
761 XPDLOC(NMGR, "NetMgr::IsLocal")
762
763 int rc = 0;
764 if (host && strlen(host) > 0) {
765 XrdClientUrlInfo uu(host);
766 if (uu.Port <= 0) uu.Port = 1093;
767 // Fully qualified name
768#ifndef ROOT_XrdFour
769 char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
770#else
771 XrdNetAddr aNA;
772 aNA.Set(uu.Host.c_str());
773 char *fqn = (char *) aNA.Name();
774#endif
775 TRACE(HDBG, "fqn: '"<<fqn<<"' mgrh: '"<<fMgr->Host()<<"'");
776 if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
777 !strcmp(fMgr->Host(), fqn))) {
778 if (!checkport || (uu.Port == fMgr->Port()))
779 rc = 1;
780 }
781#ifndef ROOT_XrdFour
782 SafeFree(fqn);
783#endif
784 }
785 // Done
786 return rc;
787}
788
789////////////////////////////////////////////////////////////////////////////////
790/// Process a readbuf request
791
793{
794 XPDLOC(NMGR, "NetMgr::ReadBuffer")
795
796 int rc = 0;
797 XPD_SETRESP(p, "ReadBuffer");
798
799 XrdOucString emsg;
800
801 // Unmarshall the data
802 //
803 kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
804 int len = ntohl(p->Request()->readbuf.len);
805
806 // Find out the file name
807 char *file = 0;
808 char *filen = 0;
809 char *pattern = 0;
810 int dlen = p->Request()->header.dlen;
811 int grep = ntohl(p->Request()->readbuf.int1);
812 int blen = dlen;
813 bool local = 0;
814 if (dlen > 0 && p->Argp()->buff) {
815 file = new char[dlen+1];
816 memcpy(file, p->Argp()->buff, dlen);
817 file[dlen] = 0;
818 // Check if local
820 if (ui.Host.length() > 0) {
821 // Check locality
822 local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
823 if (local) {
824 memcpy(file, ui.File.c_str(), ui.File.length());
825 file[ui.File.length()] = 0;
826 blen = ui.File.length();
827 TRACEP(p, DBG, "file is LOCAL");
828 }
829 }
830 // If grep, extract the pattern
831 if (grep > 0) {
832 // 'grep' operation: len is the length of the 'pattern' to be grepped
833 pattern = new char[len + 1];
834 int j = blen - len;
835 int i = 0;
836 while (j < blen)
837 pattern[i++] = file[j++];
838 pattern[i] = 0;
839 filen = strdup(file);
840 filen[blen - len] = 0;
841 TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
842 }
843 } else {
844 emsg = "file name not found";
845 TRACEP(p, XERR, emsg);
846 response->Send(kXR_InvalidRequest, emsg.c_str());
847 return 0;
848 }
849 if (grep) {
850 TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
851 ", pattern: " << pattern);
852 } else {
853 TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
854 }
855
856 // Get the buffer
857 int lout = len;
858 char *buf = 0;
859 if (local) {
860 if (grep > 0) {
861 // Grep local file
862 lout = blen; // initial length
863 buf = ReadBufferLocal(filen, pattern, lout, grep);
864 } else {
865 // Read portion of local file
866 buf = ReadBufferLocal(file, ofs, lout);
867 }
868 } else {
869 // Read portion of remote file
871 if (u.User.length() <= 0)
872 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
873 buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
874 }
875
876 bool sent = 0;
877 if (!buf) {
878 if (lout > 0) {
879 if (grep > 0) {
880 if (TRACING(DBG)) {
881 XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
882 TRACEP(p, DBG, emsg);
883 }
884 response->Send();
885 sent = 1;
886 } else {
887 XPDFORM(emsg, "could not read buffer from %s %s",
888 (local) ? "local file " : "remote file ", file);
889 TRACEP(p, XERR, emsg);
890 response->Send(kXR_InvalidRequest, emsg.c_str());
891 sent = 1;
892 }
893 } else {
894 // Just got an empty buffer
895 if (TRACING(DBG)) {
896 emsg = "nothing found in ";
897 emsg += (grep > 0) ? filen : file;
898 TRACEP(p, DBG, emsg);
899 }
900 }
901 }
902
903 // Send back to user
904 if (!sent)
905 response->Send(buf, lout);
906
907 // Cleanup
908 SafeFree(buf);
910 SafeFree(filen);
911 SafeDelArray(pattern);
912
913 // Done
914 return 0;
915}
916
917////////////////////////////////////////////////////////////////////////////////
918/// Locate the exact file path allowing for wildcards '*' in the file name.
919/// In case of success, returns 0 and fills file with the first matching instance.
920/// Return -1 if no matching path is found.
921
923{
924 XPDLOC(NMGR, "NetMgr::LocateLocalFile")
925
926 // If no wild cards or empty, nothing to do
927 if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
928
929 // Locate the file name and the dir
930 XrdOucString fn, dn;
931 int isl = file.rfind('/');
932 if (isl != STR_NPOS) {
933 fn.assign(file, isl + 1, -1);
934 dn.assign(file, 0, isl);
935 } else {
936 fn = file;
937 dn = "./";
938 }
939
940 XrdOucString emsg;
941 // Scan the dir
942 DIR *dirp = opendir(dn.c_str());
943 if (!dirp) {
944 XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
945 TRACE(XERR, emsg.c_str());
946 return -1;
947 }
948 struct dirent *ent = 0;
949 XrdOucString sent;
950 while ((ent = readdir(dirp))) {
951 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
952 continue;
953 // Check the match
954 sent = ent->d_name;
955 if (sent.matches(fn.c_str()) > 0) break;
956 sent = "";
957 }
958 closedir(dirp);
959
960 // If found fill a new output
961 if (sent.length() > 0) {
962 XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
963 return 0;
964 }
965
966 // Not found
967 return -1;
968}
969
970////////////////////////////////////////////////////////////////////////////////
971/// Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
972/// returned buffer must be freed by the caller.
973/// Wild cards '*' are allowed in the file name of 'path'; the first matching
974/// instance is taken.
975/// Returns 0 in case of error.
976
977char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
978{
979 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
980
981 XrdOucString emsg;
982 TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
983
984 // Check input
985 if (!path || strlen(path) <= 0) {
986 TRACE(XERR, "path undefined!");
987 return (char *)0;
988 }
989
990 // Locate the path resolving wild cards
991 XrdOucString spath(path);
992 if (LocateLocalFile(spath) != 0) {
993 TRACE(XERR, "path cannot be resolved! (" << path << ")");
994 return (char *)0;
995 }
996 const char *file = spath.c_str();
997
998 // Open the file in read mode
999 int fd = open(file, O_RDONLY);
1000 if (fd < 0) {
1001 emsg = "could not open ";
1002 emsg += file;
1003 TRACE(XERR, emsg);
1004 return (char *)0;
1005 }
1006
1007 // Size of the output
1008 struct stat st;
1009 if (fstat(fd, &st) != 0) {
1010 emsg = "could not get size of file with stat: errno: ";
1011 emsg += (int)errno;
1012 TRACE(XERR, emsg);
1013 close(fd);
1014 return (char *)0;
1015 }
1016 off_t ltot = st.st_size;
1017
1018 // Estimate offsets of the requested range
1019 // Start from ...
1020 kXR_int64 start = ofs;
1021 off_t fst = (start < 0) ? ltot + start : start;
1022 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1023 // End at ...
1024 kXR_int64 end = fst + len;
1025 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1026 TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
1027
1028 // Number of bytes to be read
1029 len = lst - fst;
1030
1031 // Output buffer
1032 char *buf = (char *)malloc(len + 1);
1033 if (!buf) {
1034 emsg = "could not allocate enough memory on the heap: errno: ";
1035 emsg += (int)errno;
1036 XPDERR(emsg);
1037 close(fd);
1038 return (char *)0;
1039 }
1040
1041 // Reposition, if needed
1042 if (fst >= 0)
1043 lseek(fd, fst, SEEK_SET);
1044
1045 int left = len;
1046 int pos = 0;
1047 int nr = 0;
1048 do {
1049 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1050 errno = 0;
1051 if (nr < 0) {
1052 TRACE(XERR, "error reading from file: errno: " << errno);
1053 break;
1054 }
1055
1056 // Update counters
1057 pos += nr;
1058 left -= nr;
1059
1060 } while (nr > 0 && left > 0);
1061
1062 // Termination
1063 buf[len] = 0;
1064 TRACE(HDBG, "read " << nr << " bytes: " << buf);
1065
1066 // Close file
1067 close(fd);
1068
1069 // Done
1070 return buf;
1071}
1072
1073////////////////////////////////////////////////////////////////////////////////
1074/// Grep lines matching 'pat' from 'path'; the returned buffer (length in 'len')
1075/// must be freed by the caller.
1076/// Wild cards '*' are allowed in the file name of 'path'; the first matching
1077/// instance is taken.
1078/// Returns 0 in case of error.
1079
1081 const char *pat, int &len, int opt)
1082{
   // NOTE(review): the first line of this signature (listing line 1080) is not
   // present in this listing; only the tail of the parameter list is visible.
   //
   // Runs a shell command over the local file 'path' through popen() and
   // returns what the command printed in a heap buffer (grown with realloc(),
   // to be released with free()). The command depends on 'opt' when 'pat' is
   // non-empty:
   //    1 -> "grep <pat> <file>"      2 -> "grep -v <pat> <file>"
   //    3 -> "cat <file> | <pat>"     other -> "cat <file>"
   // and is always "cat <file>" when 'pat' is null or empty.
   // Once the command has been started 'len' is reset to 0 and then counts the
   // bytes copied into the buffer; it is left untouched on earlier failures.
   // Returns 0 on undefined path, unresolved path, stat/popen/allocation
   // failure, or when nothing was read.
1083 XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
1084
1085 XrdOucString emsg;
   // NOTE(review): 'path' and 'pat' are streamed here before the null check
   // below - confirm the trace macro tolerates null C strings.
1086 TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
1087
1088 // Check input
1089 if (!path || strlen(path) <= 0) {
1090 TRACE(XERR, "file path undefined!");
1091 return (char *)0;
1092 }
1093
1094 // Locate the path resolving wild cards
1095 XrdOucString spath(path);
1096 if (LocateLocalFile(spath) != 0) {
1097 TRACE(XERR, "path cannot be resolved! (" << path << ")");
1098 return (char *)0;
1099 }
1100 const char *file = spath.c_str();
1101
1102 // Size of the output: the file size is used below as an upper bound on the
1103 // number of bytes collected from the command
1103 struct stat st;
1104 if (stat(file, &st) != 0) {
1105 emsg = "could not get size of file with stat: errno: ";
1106 emsg += (int)errno;
1107 TRACE(XERR, emsg);
1108 return (char *)0;
1109 }
1110 off_t ltot = st.st_size;
1111
1112 // The grep command
   // NOTE(review): 'pat' and 'file' are inserted unquoted into a command line
   // that popen() hands to the shell; confirm that callers restrict these
   // values, otherwise this is a command-injection vector.
1113 char *cmd = 0;
1114 int lcmd = 0;
1115 if (pat && strlen(pat) > 0) {
1116 lcmd = strlen(pat) + strlen(file) + 20;
1117 cmd = new char[lcmd];
1118 if (opt == 1) {
1119 snprintf(cmd, lcmd, "grep %s %s", pat, file);
1120 } else if (opt == 2) {
1121 snprintf(cmd, lcmd, "grep -v %s %s", pat, file);
1122 } else if (opt == 3) {
1123 snprintf(cmd, lcmd, "cat %s | %s", file, pat);
1124 } else { // should not be here
1125 snprintf(cmd, lcmd, "cat %s", file);
1126 }
1127 } else {
1128 lcmd = strlen(file) + 10;
1129 cmd = new char[lcmd];
1130 snprintf(cmd, lcmd, "cat %s", file);
1131 }
1132 TRACE(DBG, "cmd: " << cmd);
1133
1134 // Execute the command in a pipe
1135 FILE *fp = popen(cmd, "r");
1136 if (!fp) {
1137 emsg = "could not run '";
1138 emsg += cmd;
1139 emsg += "'";
1140 TRACE(XERR, emsg);
1141 delete[] cmd;
1142 return (char *)0;
1143 }
1144 delete[] cmd;
1145
1146 // Read line by line (chunks of at most sizeof(line) - 1 characters); the
1147 // loop ends at end of output or once 'ltot' bytes have been consumed
1147 len = 0;
1148 char *buf = 0;
1149 char line[2048];
1150 int bufsiz = 0, left = 0, lines = 0;
1151 while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
1152 // Parse the line
1153 int llen = strlen(line);
1154 ltot -= llen;
1155 lines++;
1156 // (Re-)allocate the buffer: grow by 100 times the average line length seen
1157 // so far (plus one), or by the current line length if that is larger
   // NOTE(review): the realloc() result is stored straight into 'buf', so on
   // failure the previously allocated block is no longer referenced.
1157 if (!buf || (llen > left)) {
1158 int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1159 dsiz = (dsiz > llen) ? dsiz : llen;
1160 bufsiz += dsiz;
1161 buf = (char *)realloc(buf, bufsiz + 1);
1162 left += dsiz;
1163 }
1164 if (!buf) {
1165 emsg = "could not allocate enough memory on the heap: errno: ";
1166 emsg += (int)errno;
1167 TRACE(XERR, emsg);
1168 pclose(fp);
1169 return (char *)0;
1170 }
1171 // Add line to the buffer
1172 memcpy(buf + len, line, llen);
1173 len += llen;
1174 left -= llen;
1175 if (TRACING(HDBG))
1176 fprintf(stderr, "line: %s", line);
1177 }
1178
1179 // Check the result and terminate the buffer; an allocated but empty buffer
1180 // is released so that the caller gets 0
1180 if (buf) {
1181 if (len > 0) {
1182 buf[len] = 0;
1183 } else {
1184 free(buf);
1185 buf = 0;
1186 }
1187 }
1188
1189 // Close file
1190 pclose(fp);
1191
1192 // Done
1193 return buf;
1194}
1195
1196////////////////////////////////////////////////////////////////////////////////
1197/// Send a read buffer request of length 'len' at offset 'ofs' for remote file
1198/// defined by 'url'; the returned buffer must be freed by the caller.
1199/// Returns 0 in case of error.
1200
1201char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
1202 kXR_int64 ofs, int &len, int grep)
1203{
1204 XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
1205
1206 TRACE(REQ, "url: " << (url ? url : "undef") <<
1207 ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
1208 ", len: " << len << ", grep: " << grep);
1209
1210 // Check input
1211 if (!file || strlen(file) <= 0) {
1212 TRACE(XERR, "file undefined!");
1213 return (char *)0;
1214 }
1215 XrdClientUrlInfo u(url);
1216 if (!url || strlen(url) <= 0) {
1217 // Use file as url
1218 u.TakeUrl(XrdOucString(file));
1219 if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
1220 }
1221
1222 // Get a connection (logs in)
1223 XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
1224
1225 char *buf = 0;
1226 if (conn && conn->IsValid()) {
1227 // Prepare request
1228 XPClientRequest reqhdr;
1229 memset(&reqhdr, 0, sizeof(reqhdr));
1230 conn->SetSID(reqhdr.header.streamid);
1231 reqhdr.header.requestid = kXP_readbuf;
1232 reqhdr.readbuf.ofs = ofs;
1233 reqhdr.readbuf.len = len;
1234 reqhdr.readbuf.int1 = grep;
1235 reqhdr.header.dlen = strlen(file);
1236 const void *btmp = (const void *) file;
1237 char **vout = &buf;
1238 // Send over
1239 XrdClientMessage *xrsp =
1240 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
1241
1242 // If positive answer
1243 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1244 len = xrsp->DataLen();
1245 } else {
1246 if (xrsp && !(xrsp->IsError()))
1247 // The buffer was just empty: do not call it error
1248 len = 0;
1249 SafeFree(buf);
1250 }
1251
1252 // Clean the message
1253 SafeDelete(xrsp);
1254 // Clean it up, to avoid leaving open tcp connection possibly going forever
1255 // into CLOSE_WAIT
1256 SafeDelete(conn);
1257 }
1258
1259 // Done
1260 return buf;
1261}
1262
1263////////////////////////////////////////////////////////////////////////////////
1264/// Get log paths from next tier; used in multi-master setups
1265/// Returns 0 in case of error.
1266
1267char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
1268{
1269 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1270
1271 TRACE(REQ, "url: " << (url ? url : "undef") <<
1272 ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1273
1274 // Check input
1275 if (!url || strlen(url) <= 0) {
1276 TRACE(XERR, "url undefined!");
1277 return (char *)0;
1278 }
1279
1280 // Get a connection (logs in)
1281 XrdProofConn *conn = GetProofConn(url);
1282
1283 char *buf = 0;
1284 if (conn && conn->IsValid()) {
1285 // Prepare request
1286 XPClientRequest reqhdr;
1287 memset(&reqhdr, 0, sizeof(reqhdr));
1288 conn->SetSID(reqhdr.header.streamid);
1289 reqhdr.header.requestid = kXP_admin;
1290 reqhdr.proof.int1 = kQueryLogPaths;
1291 reqhdr.proof.int2 = isess;
1292 reqhdr.proof.sid = -1;
1293 reqhdr.header.dlen = msg ? strlen(msg) : 0;
1294 const void *btmp = (const void *) msg;
1295 char **vout = &buf;
1296 // Send over
1297 XrdClientMessage *xrsp =
1298 conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
1299
1300 // If positive answer
1301 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1302 int len = xrsp->DataLen();
1303 buf = (char *) realloc((void *)buf, len + 1);
1304 if (buf)
1305 buf[len] = 0;
1306 } else {
1307 SafeFree(buf);
1308 }
1309
1310 // Clean the message
1311 SafeDelete(xrsp);
1312 // Clean it up, to avoid leaving open tcp connection possibly going forever
1313 // into CLOSE_WAIT
1314 SafeDelete(conn);
1315 }
1316
1317 // Done
1318 return buf;
1319}
1320
1321////////////////////////////////////////////////////////////////////////////////
1322/// Get log paths from all the known unique nodes other than ourselves; used in multi-master setups
1323/// Returns 0 in case of error.
1324
1325char *XrdProofdNetMgr::ReadLogPaths(const char *msg, int isess)
1326{
1327 XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1328
1329 TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1330
1331 char *buf = 0, *pbuf = buf;
1332 int len = 0;
1333 // Loop over unique nodes
1334 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
1335 XrdProofWorker *w = 0;
1336 while (iw != fNodes.end()) {
1337 if ((w = *iw)) {
1338 // Do not send it to ourselves
1339 bool us = (((w->fHost.find("localhost") != STR_NPOS ||
1340 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
1341 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
1342 if (!us) {
1343 // Create 'url'
1344 XrdOucString u = fMgr->EffectiveUser();
1345 u += '@';
1346 u += w->fHost;
1347 if (w->fPort != -1) {
1348 u += ':';
1349 u += w->fPort;
1350 }
1351 // Ask the node
1352 char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
1353 if (bmst) {
1354 len += strlen(bmst) + 1;
1355 buf = (char *) realloc((void *)buf, len);
1356 pbuf = buf + len - strlen(bmst) - 1;
1357 memcpy(pbuf, bmst, strlen(bmst) + 1);
1358 buf[len - 1] = 0;
1359 pbuf = buf + len;
1360 free(bmst);
1361 }
1362 } else {
1363 TRACE(DBG, "request for ourselves: ignore");
1364 }
1365 }
1366 // Next worker
1367 ++iw;
1368 }
1369
1370 // Done
1371 return buf;
1372}
1373
1374////////////////////////////////////////////////////////////////////////////////
1375/// Fill-in fWorkers for a localhost based on the number of
1376/// workers fNumLocalWrks.
1377
1379{
   // NOTE(review): the signature line (listing line 1378) is not present in
   // this listing, and neither are listing lines 1385 and 1417, which follow
   // the "Lock the method" and "Find unique nodes" comments below.
   //
   // Rebuilds fWorkers from the default worker list fDfltWorkers. On the first
   // call fDfltWorkers is populated with one "master <host>" entry followed by
   // fNumLocalWrks identical "worker localhost port=<port>" entries; later
   // calls reuse those objects. fWorkers receives copies of the pointers, not
   // of the objects.
1380 XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
1381
1382 TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
1383
1384 // Lock the method to protect the lists.
1386
1387 // Cleanup the worker list
1388 fWorkers.clear();
1389 // The first time we need to create the default workers
1390 if (fDfltWorkers.size() < 1) {
1391 // Create a default master line
1392 XrdOucString mm("master ", 128);
1393 mm += fMgr->Host();
1394 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1395
1396 // Create 'localhost' lines for each worker
1397 int nwrk = fNumLocalWrks;
1398 if (nwrk > 0) {
1399 mm = "worker localhost port=";
1400 mm += fMgr->Port();
1401 while (nwrk--) {
1402 fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1403 TRACE(DBG, "added line: " << mm);
1404 }
1405 }
1406 }
1407
1408 // Copy the list
1409 std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
1410 for (; w != fDfltWorkers.end(); ++w) {
1411 fWorkers.push_back(*w);
1412 }
1413
   // The first entry is the master line, hence the '- 1' in the count
1414 TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
1415
1416 // Find unique nodes
1418
1419 // We are done
1420 return;
1421}
1422
1423////////////////////////////////////////////////////////////////////////////////
1424/// Return the list of workers after having made sure that the info is
1425/// up-to-date
1426
1427std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
1428{
   // Returns a pointer to the member list fWorkers. For a static resource type
   // with a config file name set, and if fReloadPROOFcfg is on, the file is
   // first re-read via ReadPROOFcfg(1); when that fails the call either goes on
   // (fDfltFallback set) or returns 0.
   // NOTE(review): listing lines 1431 and 1438 are not present in this listing
   // (after XPDLOC and after the "Use default settings" comment respectively).
1429 XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
1430
1432
1433 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1434 // Check if there were any changes in the config file
1435 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1436 if (fDfltFallback) {
1437 // Use default settings
1439 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1440 } else {
1441 TRACE(XERR, "unable to read the configuration file");
1442 return (std::list<XrdProofWorker *> *)0;
1443 }
1444 }
1445 }
1446 TRACE(DBG, "returning list with " << fWorkers.size() << " entries");
1447
   // Print the full worker table only at the highest debug level
1448 if (TRACING(HDBG)) Dump();
1449
1450 return &fWorkers;
1451}
1452
1453////////////////////////////////////////////////////////////////////////////////
1454/// Dump status
1455
1457{
   // Prints a framed table with the size of fWorkers and, for each entry, its
   // host, port, type and the value returned by Active().
   // NOTE(review): the signature line (listing line 1456) and listing line 1460
   // are not present in this listing.
   // 'xpdloc' is defined by hand here rather than through XPDLOC; presumably it
   // is the location tag picked up by XPDPRT - confirm against the macro.
1458 const char *xpdloc = "NetMgr::Dump";
1459
1461
1462 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1463 XPDPRT("+ Active workers status");
1464 XPDPRT("+ Size: " << fWorkers.size());
1465 XPDPRT("+ ");
1466
1467 std::list<XrdProofWorker *>::iterator iw;
1468 for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
1469 XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
1470 " active sessions:" << (*iw)->Active());
1471 }
1472 XPDPRT("+ ");
1473 XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1474}
1475
1476////////////////////////////////////////////////////////////////////////////////
1477/// Return the list of unique nodes after having made sure that the info is
1478/// up-to-date
1479
1480std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
1481{
   // Returns a pointer to the member list fNodes. The refresh logic is the same
   // as in GetActiveWorkers(): for a static resource type with a config file
   // name set, and if fReloadPROOFcfg is on, the file is first re-read via
   // ReadPROOFcfg(1); when that fails the call either goes on (fDfltFallback
   // set) or returns 0.
   // NOTE(review): listing lines 1484 and 1491 are not present in this listing
   // (after XPDLOC and after the "Use default settings" comment respectively).
1482 XPDLOC(NMGR, "NetMgr::GetNodes")
1483
1485
1486 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1487 // Check if there were any changes in the config file
1488 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1489 if (fDfltFallback) {
1490 // Use default settings
1492 TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1493 } else {
1494 TRACE(XERR, "unable to read the configuration file");
1495 return (std::list<XrdProofWorker *> *)0;
1496 }
1497 }
1498 }
1499 TRACE(DBG, "returning list with " << fNodes.size() << " entries");
1500
1501 return &fNodes;
1502}
1503
1504////////////////////////////////////////////////////////////////////////////////
1505/// Read PROOF config file and load the information in fWorkers.
1506/// NB: 'master' information here is ignored, because it is passed
1507/// via the 'xpd.workdir' and 'xpd.image' config directives
1508
1510{
   // NOTE(review): the signature line (listing line 1509) is not present in
   // this listing, and neither are listing lines 1516 (after the "Lock the
   // method" comment) and 1657 (the statement guarded by the final
   // 'if (reset)').
   //
   // Parses the file named by fPROOFcfg.fName, if it changed since the last
   // successful stat, and refreshes fWorkers from it. fRegWorkers keeps every
   // worker ever read, with the master entry first; entries found in the
   // current file are flagged active and copied (as pointers) into fWorkers.
   // With 'reset' set, fWorkers is emptied before being filled.
   // Returns 0 when the file is unchanged, when it cannot be opened but more
   // than one worker is already known, or when at least one active entry was
   // copied; -1 otherwise.
1511 XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
1512
1513 TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
1514
1515 // Lock the method to protect the lists.
1517
1518 // Check inputs
1519 if (fPROOFcfg.fName.length() <= 0)
1520 return -1;
1521
1522 // Get the modification time
1523 struct stat st;
1524 if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
1525 // If the file disappeared, reset the modification time so that we are sure
1526 // to reload it if it comes back
1527 if (errno == ENOENT) fPROOFcfg.fMtime = -1;
1528 if (!fDfltFallback) {
1529 TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
1530 } else {
1531 TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
1532 }
1533 return -1;
1534 }
1535 TRACE(DBG, "time of last modification: " << st.st_mtime);
1536
1537 // File should be loaded only once
1538 if (st.st_mtime <= fPROOFcfg.fMtime)
1539 return 0;
1540
1541 // Save the modification time
   // NOTE(review): the time is recorded before the file is opened, so if the
   // fopen() below fails the next call takes the "unchanged" exit above until
   // the file is modified again.
1542 fPROOFcfg.fMtime = st.st_mtime;
1543
1544 // Open the defined path.
1545 FILE *fin = 0;
1546 if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
1547 if (fWorkers.size() > 1) {
1548 TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
1549 TRACE(XERR, "continuing with existing list of workers.");
1550 return 0;
1551 } else {
1552 return -1;
1553 }
1554 }
1555
1556 if (reset) {
1557 // Cleanup the worker list
1558 fWorkers.clear();
1559 }
1560
1561 // Add default a master line if not yet there
1562 if (fRegWorkers.size() < 1) {
1563 XrdOucString mm("master ", 128);
1564 mm += fMgr->Host();
1565 fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
1566 } else {
1567 // Deactivate all current active workers
1568 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1569 // Skip the master line
1570 ++w;
1571 for (; w != fRegWorkers.end(); ++w) {
1572 (*w)->fActive = 0;
1573 }
1574 }
1575
1576 // Read now the directives
1577 int nw = 0;
1578 char lin[2048];
1579 while (fgets(lin, sizeof(lin), fin)) {
1580 // Skip empty lines: 'p' ends up on the first character that is not a blank
1581 int p = 0;
1582 while (lin[p++] == ' ') {
1583 ;
1584 }
1585 p--;
1586 if (lin[p] == '\0' || lin[p] == '\n')
1587 continue;
1588
1589 // Skip comments
   // NOTE(review): only column 0 is tested, so a '#' preceded by blanks is not
   // treated as a comment here - confirm this is intended.
1590 if (lin[0] == '#')
1591 continue;
1592
1593 // Remove trailing '\n';
1594 if (lin[strlen(lin)-1] == '\n')
1595 lin[strlen(lin)-1] = '\0';
1596
1597 TRACE(DBG, "found line: " << lin);
1598
1599 // Parse the line
1600 XrdProofWorker *pw = new XrdProofWorker(lin);
1601
   // Lines starting with "master" or "node" never add a worker: at most they
   // replace the content of the first (master) entry
1602 const char *pfx[2] = { "master", "node" };
1603 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1604 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1605 // Init a master instance
1606 if (pw->fHost.beginswith("localhost") ||
1607 pw->Matches(fMgr->Host())) {
1608 // Replace the default line (the first with what found in the file)
1609 XrdProofWorker *fw = fRegWorkers.front();
1610 fw->Reset(lin);
1611 }
1612 // Ignore it
1613 SafeDelete(pw);
1614 } else {
1615 // Check if we have already it: only entries not yet re-activated in this
1616 // pass are candidates, so a repeated host:port line claims a further entry
1616 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1617 // Skip the master line
1618 ++w;
1619 bool haveit = 0;
1620 while (w != fRegWorkers.end()) {
1621 if (!((*w)->fActive)) {
1622 if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
1623 (*w)->fActive = 1;
1624 haveit = 1;
1625 break;
1626 }
1627 }
1628 // Go to next
1629 ++w;
1630 }
1631 // If we do not have it, build a new worker object
1632 if (!haveit) {
1633 // Keep it
1634 fRegWorkers.push_back(pw);
1635 } else {
1636 // Drop it
1637 SafeDelete(pw);
1638 }
1639 }
1640 }
1641
1642 // Copy the active workers; 'nw' counts every entry copied, the first
1643 // (master) entry included when it is flagged active
1643 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1644 while (w != fRegWorkers.end()) {
1645 if ((*w)->fActive) {
1646 fWorkers.push_back(*w);
1647 nw++;
1648 }
1649 ++w;
1650 }
1651
1652 // Close files
1653 fclose(fin);
1654
1655 // Find unique nodes
   // NOTE(review): the statement belonging to this 'if' (listing line 1657) is
   // missing from the listing; as shown, the 'if' would bind to the return.
1656 if (reset)
1658
1659 // We are done
1660 return ((nw == 0) ? -1 : 0);
1661}
1662
1663////////////////////////////////////////////////////////////////////////////////
1664/// Scan fWorkers for unique nodes (stored in fNodes).
1665/// Return the number of unique nodes.
1666/// NB: 'master' information here is ignored, because it is passed
1667/// via the 'xpd.workdir' and 'xpd.image' config directives
1668
1670{
   // NOTE(review): the signature line (listing line 1669) is not present in
   // this listing.
   //
   // Rebuilds fNodes from fWorkers: the first entry of fWorkers (the master
   // line) is skipped, and each remaining entry flagged active is appended
   // unless an entry already in fNodes reports Matches() for it.
   // fNodes holds pointers to the same objects as fWorkers.
   // Returns the resulting size of fNodes.
1671 XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
1672
1673 TRACE(REQ, "# workers: " << fWorkers.size());
1674
1675 // Cleanup the nodes list
1676 fNodes.clear();
1677
1678 // Build the list of unique nodes (skip the master line);
1679 if (fWorkers.size() > 1) {
1680 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
1681 ++w;
1682 for (; w != fWorkers.end(); ++w) if ((*w)->fActive) {
   // Linear search among the nodes accepted so far
1683 bool add = 1;
1684 std::list<XrdProofWorker *>::iterator n;
1685 for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
1686 if ((*n)->Matches(*w)) {
1687 add = 0;
1688 break;
1689 }
1690 }
1691 if (add)
1692 fNodes.push_back(*w);
1693 }
1694 }
1695 TRACE(REQ, "found " << fNodes.size() << " unique nodes");
1696
1697 // We are done
1698 return fNodes.size();
1699}
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:529
#define d(i)
Definition: RSha256.hxx:102
#define e(i)
Definition: RSha256.hxx:103
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
static unsigned int total
int type
Definition: TGX11.cxx:120
double floor(double)
@ kXP_readbuf
@ kXP_admin
@ kXP_ctrlc
#define kXPD_Master
@ kRTStatic
@ kRTNone
#define kXPD_AnyServer
@ kCleanupSessions
@ kExec
@ kQueryLogPaths
@ kROOTVersion
#define kXPD_Worker
@ kXPD_srvmsg
#define XrdSysError
Definition: XpdSysError.h:8
#define NAME_REQUESTTIMEOUT
#define xrdmin(a, b)
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
#define XPDFORM
Definition: XrdProofdAux.h:381
#define SafeFree(x)
Definition: XrdProofdAux.h:341
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define SafeDelArray(x)
Definition: XrdProofdAux.h:338
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
#define XPD_LONGOK(x)
#define XPD_SETRESP(p, x)
#define XPDPRT(x)
#define XPDLOC(d, x)
#define TRACEP(p, act, x)
#define TRACING(x)
#define XPDERR(x)
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
#define realloc
Definition: civetweb.c:1538
#define free
Definition: civetweb.c:1539
#define snprintf
Definition: civetweb.c:1540
#define malloc
Definition: civetweb.c:1536
void TakeUrl(XrdOucString url)
XrdOucString GetUrl()
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
bool IsValid() const
Test validity of this connection.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen)
const char * GetLastErr()
Definition: XrdProofConn.h:136
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
void Reset(const char *str)
Set content from a config file-like string.
bool Matches(const char *host)
Check compatibility of host with this instance.
XrdOucString fUser
XrdOucString fHost
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
const char * User() const
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdOucString fName
Definition: XrdProofdAux.h:76
XrdProofdNetMgr * NetMgr() const
const char * Host() const
int SrvType() const
const char * EffectiveUser() const
XrdOucString Get(int i)
Return i-th combination (i : 0 -> fN-1)
bool IsValid() const
Definition: XrdProofdAux.h:167
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
std::list< XrdProofWorker * > fDfltWorkers
void BalanceNodesOrder()
Indices (this will be used twice).
std::list< XrdProofWorker * > fNodes
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdSysRecMutex fMutex
int DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdProofdFile fPROOFcfg
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
void Dump()
Dump status.
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt Return 0 on success, -1 on error.
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
void RegisterDirectives()
Register config directives.
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
std::list< XrdProofWorker * > fWorkers
XrdProofdManager * fMgr
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
std::list< XrdProofWorker * > fRegWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual ~XrdProofdNetMgr()
Destructor.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
XrdProofdClient * Client() const
XrdBuffer * Argp() const
XPClientRequest * Request() const
TLine * line
const Int_t n
Definition: legend1.C:16
static constexpr double us
static constexpr double s
static constexpr double mm
static constexpr double pi
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer,...
Definition: file.py:1
auto * m
Definition: textangle.C:8
struct ClientRequestHdr header
struct XPClientProofRequest proof
struct XPClientReadbufRequest readbuf