Logo ROOT   6.14/05
Reference Guide
XrdProofdNetMgr.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: G. Ganis Jan 2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 #include "XrdProofdPlatform.h"
12 
13 //////////////////////////////////////////////////////////////////////////
14 // //
15 // XrdProofdNetMgr //
16 // //
17 // Authors: G. Ganis, CERN, 2008 //
18 // //
19 // Manages connections between PROOF server daemons //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #include "XrdProofdNetMgr.h"
24 
25 #include "XrdProofdXrdVers.h"
26 #ifndef ROOT_XrdFour
27 # include "XpdSysDNS.h"
28 #else
29 # include "XrdNet/XrdNetAddr.hh"
30 #endif
31 #include "Xrd/XrdBuffer.hh"
36 #include "XrdOuc/XrdOucStream.hh"
37 #include "XrdSys/XrdSysPlatform.hh"
38 
39 #include "XrdProofdClient.h"
40 #include "XrdProofdManager.h"
41 #include "XrdProofdProtocol.h"
42 #include "XrdProofdResponse.h"
43 #include "XrdProofWorker.h"
44 
45 // Tracing utilities
46 #include "XrdProofdTrace.h"
47 
48 #include <algorithm>
49 #include <limits>
50 #include <math.h>
51 
52 ////////////////////////////////////////////////////////////////////////////////
53 /// Send up a message from the server
54 
55 int MessageSender(const char *msg, int len, void *arg)
56 {
58  if (r) {
59  return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
60  }
61  return -1;
62 }
63 
64 ////////////////////////////////////////////////////////////////////////////////
65 /// Constructor
66 
68  XrdProtocol_Config *pi, XrdSysError *e)
69  : XrdProofdConfig(pi->ConfigFN, e)
70 {
71  fMgr = mgr;
73  fPROOFcfg.fName = "";
74  fPROOFcfg.fMtime = -1;
75  fReloadPROOFcfg = 1;
76  fDfltFallback = 0;
77  fDfltWorkers.clear();
78  fRegWorkers.clear();
79  fWorkers.clear();
80  fNodes.clear();
82  fWorkerUsrCfg = 0;
83  fRequestTO = 30;
84  fBonjourEnabled = false;
85 
86  // Configuration directives
88 }
89 
90 ////////////////////////////////////////////////////////////////////////////////
91 /// Register config directives
92 
94 {
95  Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
96  Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
97  Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
98  Register("bonjour", new XrdProofdDirective("bonjour", this, &DoDirectiveClass));
99  Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
100 }
101 
102 ////////////////////////////////////////////////////////////////////////////////
103 /// Destructor
104 
106 {
107  // Cleanup the worker lists
108  // (the nodes list points to the same object, no cleanup is needed)
109  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
110  while (w != fRegWorkers.end()) {
111  delete *w;
112  w = fRegWorkers.erase(w);
113  }
114  w = fDfltWorkers.begin();
115  while (w != fDfltWorkers.end()) {
116  delete *w;
117  w = fDfltWorkers.erase(w);
118  }
119  fWorkers.clear();
120 }
121 
122 ////////////////////////////////////////////////////////////////////////////////
123 /// Run configuration and parse the entered config directives.
124 /// Return 0 on success, -1 on error
125 
127 {
128  XPDLOC(NMGR, "NetMgr::Config")
129 
130  // Lock the method to protect the lists.
132 
133  // Cleanup the worker list
134  std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
135  while (w != fWorkers.end()) {
136  delete *w;
137  w = fWorkers.erase(w);
138  }
139  // Create a default master line
140  XrdOucString mm("master ", 128);
141  mm += fMgr->Host();
142  mm += " port=";
143  mm += fMgr->Port();
144  fWorkers.push_back(new XrdProofWorker(mm.c_str()));
145 
146  // Run first the configurator
147  if (XrdProofdConfig::Config(rcf) != 0) {
148  XPDERR("problems parsing file ");
149  return -1;
150  }
151 
152  XrdOucString msg;
153  msg = (rcf) ? "re-configuring" : "configuring";
154  TRACE(ALL, msg);
155 
156  if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
157  TRACE(ALL, "PROOF config file: " <<
158  ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
159  if (fResourceType == kRTStatic) {
160  // Initialize the list of workers if a static config has been required
161  // Default file path, if none specified
162  bool dodefault = 1;
163  if (fPROOFcfg.fName.length() > 0) {
164  // Load file content in memory
165  if (ReadPROOFcfg() == 0) {
166  TRACE(ALL, "PROOF config file will " <<
167  ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
168  dodefault = 0;
169  } else {
170  if (!fDfltFallback) {
171  XPDERR("unable to find valid information in PROOF config file " <<
172  fPROOFcfg.fName);
173  fPROOFcfg.fMtime = -1;
174  return 0;
175  } else {
176  TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
177  }
178  }
179  }
180  if (dodefault) {
181  // Use default
183  }
184  } else if (fResourceType == kRTNone && fWorkers.size() <= 1 && !fBonjourEnabled) {
185  // Nothing defined: use default
187  }
188 
189  // Find unique nodes
190  FindUniqueNodes();
191  }
192 
193  // For connection to the other xproofds we try only once
195  // Request Timeout
197 
198  // Notification
199  XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
200  TRACE(ALL, msg);
201 
202  // Done
203  return 0;
204 }
205 
206 ////////////////////////////////////////////////////////////////////////////////
207 /// Dispatch directive 'd' to the matching DoDirective* handler; return -1 if 'd' is undefined or its name is unknown.
208 
210  char *val, XrdOucStream *cfg, bool rcf)
211 {
212  XPDLOC(NMGR, "NetMgr::DoDirective")
213 
214  if (!d)
215  // undefined inputs
216  return -1;
217 
218  if (d->fName == "resource") {
219  return DoDirectiveResource(val, cfg, rcf);
220  } else if (d->fName == "adminreqto") {
221  return DoDirectiveAdminReqTO(val, cfg, rcf);
222  } else if (d->fName == "worker") {
223  return DoDirectiveWorker(val, cfg, rcf);
224  } else if (d->fName == "bonjour") {
225  return DoDirectiveBonjour(val, cfg, rcf);
226  }
227 
228  TRACE(XERR, "unknown directive: " << d->fName);
229 
230  return -1;
231 }
232 
233 ////////////////////////////////////////////////////////////////////////////////
234 
235 int XrdProofdNetMgr::DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
236 {
237  XPDLOC(NMGR, "NetMgr::DoDirectiveBonjour");
238 
239  // Process 'bonjour' directive
240  TRACE(DBG, "processing Bonjour directive");
241 
242  if (!val || !cfg)
243  // undefined inputs
244  return -1;
245 
246  TRACE(XERR, "Bonjour support is disabled");
247  return -1;
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Re-order the workers list in round-robin fashion, each unique node contributing in proportion to the number of workers it provides.
252 
254 {
255  list<XrdProofWorker *>::const_iterator iter, iter2;
256  list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
257  // Map to store information of the balancer.
258  map<XrdProofWorker *, BalancerInfo> info;
259  // Node with minimum number of workers distinct to 1.
260  unsigned int min = UINT_MAX;
261  // Total number of nodes and per iteration assignments.
262  unsigned int total = 0, total_perit = 0;
263  // Number of iterations to get every node filled.
264  unsigned int total_added = 0;
265  // Temporary list to store the balanced configuration
266  list<XrdProofWorker *> tempNodes;
267  // Flag for the search and destroy loop.
268  bool deleted;
269 
270  // Fill the information store with the first data (number of nodes).
271  for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
272  // The next code is not the same as this:
273  //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
274  // The previous piece of STL code only checks the pointer of the value
275  // stored on the list, altough it is more efficient, it needs that repeated
276  // nodes point to the same object. To allow hybrid configurations, we are
277  // doing a 'manually' matching since statically configured nodes are
278  // created in multiple ways.
279  info[*iter].available = 0;
280  for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
281  if ((*iter)->Matches(*iter2)) {
282  info[*iter].available++;
283  }
284  }
285  info[*iter].added = 0;
286  // Calculate the minimum greater than 1.
287  if (info[*iter].available > 1 && info[*iter].available < min)
288  min = info[*iter].available;
289  // Calculate the totals.
290  total += info[*iter].available;
291  }
292 
293  // Now, calculate the number of workers to add in each iteration of the
294  // round robin, scaling to the smaller number.
295  for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
296  if (info[*iter].available > 1) {
297  info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
298  } else {
299  info[*iter].per_iteration = 1;
300  }
301  // Calculate the totals.
302  total_perit += info[*iter].per_iteration;
303  }
304 
305  // Since we are going to substitute the list, don't forget to recover the
306  // default node at the fist time.
307  tempNodes.push_back(fWorkers.front());
308 
309  // Finally, do the round robin assignment of nodes.
310  // Stop when every node has its workers processed.
311  while (total_added < total) {
312  for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
313  if (i->second.added < i->second.available) {
314  // Be careful with the remainders (on prime number of nodes).
315  unsigned int to_add = xrdmin(i->second.per_iteration,
316  (i->second.available - i->second.added));
317  // Then add the nodes.
318  for (unsigned int j = 0; j < to_add; j++) {
319  tempNodes.push_back(i->first);
320  }
321  i->second.added += to_add;
322  total_added += to_add;
323  }
324  }
325  }
326 
327  // Since we are mergin nodes in only one object, we must merge the current
328  // sessions of the static nodes (that can be distinct objects that represent
329  // the same node) and delete the orphaned objects. If, in the future, we can
330  // assure that every worker has only one object in the list, this is not more
331  // necessary. The things needed to change are the DoDirectiveWorker, it must
332  // search for a node before inserting it, and in the repeat directive insert
333  // the same node always. Also the default configuration methods (there are
334  // two in this class) must be updated.
335  iter3 = ++(fWorkers.begin());
336  while (iter3 != fWorkers.end()) {
337  deleted = false;
338  // If the worker is not in the fWorkers list, we must process it. Note that
339  // std::count() uses a plain comparison between values, in this case, we
340  // are comparing pointers (numbers, at the end).
341  if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
342  // Search for an object that matches with this in the temp list.
343  for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
344  if ((*iter2)->Matches(*iter3)) {
345  // Copy data and delete the *iter object.
346  (*iter2)->MergeProofServs(*(*iter3));
347  deleted = true;
348  delete *iter3;
349  fWorkers.erase(iter3++);
350  break;
351  }
352  }
353  }
354  // Do not forget to increase the iterator.
355  if (!deleted)
356  ++iter3;
357  }
358 
359  // Then, substitute the current fWorkers list with the balanced one.
360  fWorkers = tempNodes;
361 }
362 
363 ////////////////////////////////////////////////////////////////////////////////
364 /// Process 'adminreqto' directive
365 
366 int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
367 {
368  if (!val)
369  // undefined inputs
370  return -1;
371 
372  // Check deprecated 'if' directive
373  if (fMgr->Host() && cfg)
374  if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
375  return 0;
376 
377  // Timeout on requested broadcasted to workers; there are 4 attempts,
378  // so the real timeout is 4 x fRequestTO
379  int to = strtol(val, 0, 10);
380  fRequestTO = (to > 0) ? to : fRequestTO;
381  return 0;
382 }
383 
384 ////////////////////////////////////////////////////////////////////////////////
385 /// Process 'resource' directive
386 
387 int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
388 {
389  XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
390 
391  if (!val || !cfg)
392  // undefined inputs
393  return -1;
394 
395  if (!strcmp("static", val)) {
396  // We just take the path of the config file here; the
397  // rest is used by the static scheduler
399  while ((val = cfg->GetWord()) && val[0]) {
400  XrdOucString s(val);
401  if (s.beginswith("ucfg:")) {
402  fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
403  } else if (s.beginswith("reload:")) {
404  fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
405  } else if (s.beginswith("dfltfallback:")) {
406  fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
407  } else if (s.beginswith("wmx:")) {
408  } else if (s.beginswith("selopt:")) {
409  } else {
410  // Config file
411  fPROOFcfg.fName = val;
412  if (fPROOFcfg.fName.beginswith("sm:")) {
413  fPROOFcfg.fName.replace("sm:", "");
414  }
416  // Make sure it exists and can be read
417  if (access(fPROOFcfg.fName.c_str(), R_OK)) {
418  if (errno == ENOENT) {
419  TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
420  } else {
421  TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
422  fPROOFcfg.fName = "";
423  fPROOFcfg.fMtime = -1;
424  }
425  }
426  }
427  }
428  }
429  return 0;
430 }
431 
432 ////////////////////////////////////////////////////////////////////////////////
433 /// Process 'worker' directive
434 
435 int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
436 {
437  XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
438 
439  if (!val || !cfg)
440  // undefined inputs
441  return -1;
442 
443  // Lock the method to protect the lists.
445 
446  // Get the full line (w/o heading keyword)
447  cfg->RetToken();
448  XrdOucString wrd(cfg->GetWord());
449  if (wrd.length() > 0) {
450  // Build the line
451  XrdOucString line;
452  char rest[2048] = {0};
453  cfg->GetRest((char *)&rest[0], 2048);
454  XPDFORM(line, "%s %s", wrd.c_str(), rest);
455  // Parse it now
456  if (wrd == "master" || wrd == "node") {
457  // Init a master instance
458  XrdProofWorker *pw = new XrdProofWorker(line.c_str());
459  if (pw->fHost.beginswith("localhost") ||
460  pw->Matches(fMgr->Host())) {
461  // Replace the default line (the first with what found in the file)
462  XrdProofWorker *fw = fWorkers.front();
463  fw->Reset(line.c_str());
464  }
465  SafeDelete(pw);
466  } else {
467  // How many lines like this?
468  int nr = 1;
469  int ir = line.find("repeat=");
470  if (ir != STR_NPOS) {
471  XrdOucString r(line, ir + strlen("repeat="));
472  r.erase(r.find(' '));
473  nr = r.atoi();
474  if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
475  TRACE(DBG, "found repeat = " << nr);
476  }
477  while (nr--) {
478  // Build the worker object
479  XrdProofdMultiStr mline(line.c_str());
480  if (mline.IsValid()) {
481  TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
482  for (int i = 0; i < mline.N(); i++) {
483  TRACE(HDBG, "found token: " << mline.Get(i));
484  fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
485  }
486  } else {
487  TRACE(DBG, "found line: " << line);
488  fWorkers.push_back(new XrdProofWorker(line.c_str()));
489  }
490  }
491  }
492  }
493 
494  // Necessary for the balancer when Bonjour is enabled. Note that this balancer
495  // can also be enabled with a static configuration. By this time is disabled
496  // due to its experimental status.
497  FindUniqueNodes();
498  //BalanceNodesOrder();
499 
500  return 0;
501 }
502 
503 ////////////////////////////////////////////////////////////////////////////////
504 /// Broadcast a ctrlc interrupt
505 /// Return 0 on success, -1 on error
506 
508 {
509  XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
510 
511  int rc = 0;
512 
513  // Loop over unique nodes
514  std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
515  XrdProofWorker *w = 0;
516  while (iw != fNodes.end()) {
517  if ((w = *iw) && w->fType != 'M') {
518  // Do not send it to ourselves
519  bool us = (((w->fHost.find("localhost") != STR_NPOS ||
520  XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
521  (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
522  if (!us) {
523  // Create 'url'
524  // We use the enforced username if specified in the config file; this is the case
525  // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
526  XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
527  if (u.length() <= 0) u = fMgr->EffectiveUser();
528  u += '@';
529  u += w->fHost;
530  if (w->fPort != -1) {
531  u += ':';
532  u += w->fPort;
533  }
534  TRACE(HDBG, "sending request to: "<<u);
535  // Get a connection to the server
536  XrdProofConn *conn = GetProofConn(u.c_str());
537  if (conn && conn->IsValid()) {
538  // Prepare request
539  XPClientRequest reqhdr;
540  memset(&reqhdr, 0, sizeof(reqhdr));
541  conn->SetSID(reqhdr.header.streamid);
542  reqhdr.proof.requestid = kXP_ctrlc;
543  reqhdr.proof.sid = 0;
544  reqhdr.proof.dlen = 0;
545  // We need the right order
546  if (XPD::clientMarshall(&reqhdr) != 0) {
547  TRACE(XERR, "problems marshalling request");
548  return -1;
549  }
550  if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
551  TRACE(XERR, "problems sending ctrl-c request to server " << u);
552  }
553  // Clean it up, to avoid leaving open tcp connection possibly going forever
554  // into CLOSE_WAIT
555  SafeDelete(conn);
556  }
557  } else {
558  TRACE(DBG, "broadcast request for ourselves: ignore");
559  }
560  }
561  // Next worker
562  ++iw;
563  }
564 
565  // Done
566  return rc;
567 }
568 
569 ////////////////////////////////////////////////////////////////////////////////
570 /// Broadcast request to known potential sub-nodes.
571 /// Return 0 on success, -1 on error
572 
573 int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
574  XrdProofdResponse *r, bool notify, int subtype)
575 {
576  XPDLOC(NMGR, "NetMgr::Broadcast")
577 
578  unsigned int nok = 0;
579  TRACE(REQ, "type: " << type);
580 
581  // Loop over unique nodes
582  std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
583  XrdProofWorker *w = 0;
584  XrdClientMessage *xrsp = 0;
585  while (iw != fNodes.end()) {
586  if ((w = *iw) && w->fType != 'M') {
587  // Do not send it to ourselves
588  bool us = (((w->fHost.find("localhost") != STR_NPOS ||
589  XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
590  (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
591  if (!us) {
592  // Create 'url'
593  // We use the enforced username if specified in the config file; this is the case
594  // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
595  XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
596  if (u.length() <= 0) u = fMgr->EffectiveUser();
597  u += '@';
598  u += w->fHost;
599  if (w->fPort != -1) {
600  u += ':';
601  u += w->fPort;
602  }
603  // Type of server
604  int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
605  : (kXR_int32) kXPD_Worker;
606  TRACE(HDBG, "sending request to " << u);
607  // Send request
608  if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
609  TRACE(XERR, "problems sending request to " << u);
610  } else {
611  nok++;
612  }
613  // Cleanup answer
614  SafeDelete(xrsp);
615  } else {
616  TRACE(DBG, "broadcast request for ourselves: ignore");
617  }
618  }
619  // Next worker
620  ++iw;
621  }
622 
623  // Done
624  return (nok == fNodes.size()) ? 0 : -1;
625 }
626 
627 ////////////////////////////////////////////////////////////////////////////////
628 /// Get a XrdProofConn for url; create a new one if not available
629 
631 {
632  XrdProofConn *p = 0;
633 
634  // If not found create a new one
635  XrdOucString buf = " Manager connection from ";
636  buf += fMgr->Host();
637  buf += "|ord:000";
638  char m = 'A'; // log as admin
639 
640  {
642  p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
643  }
644  if (p && !(p->IsValid())) SafeDelete(p);
645 
646  // Done
647  return p;
648 }
649 
650 ////////////////////////////////////////////////////////////////////////////////
651 /// Send an admin request of the given type to the daemon at 'url'.
652 /// Return the server response (to be deleted by the caller), or 0 on error
653 
655  const char *msg, int srvtype,
656  XrdProofdResponse *r, bool notify,
657  int subtype)
658 {
659  XPDLOC(NMGR, "NetMgr::Send")
660 
661  XrdClientMessage *xrsp = 0;
662  TRACE(REQ, "type: " << type);
663 
664  if (!url || strlen(url) <= 0)
665  return xrsp;
666 
667  // Get a connection to the server
668  XrdProofConn *conn = GetProofConn(url);
669 
670  bool ok = 1;
671  if (conn && conn->IsValid()) {
672  XrdOucString notifymsg("Send: ");
673  // Prepare request
674  XPClientRequest reqhdr;
675  const void *buf = 0;
676  char **vout = 0;
677  memset(&reqhdr, 0, sizeof(reqhdr));
678  conn->SetSID(reqhdr.header.streamid);
679  reqhdr.header.requestid = kXP_admin;
680  reqhdr.proof.int1 = type;
681  switch (type) {
682  case kROOTVersion:
683  notifymsg += "change-of-ROOT version request to ";
684  notifymsg += url;
685  notifymsg += " msg: ";
686  notifymsg += msg;
687  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
688  buf = (msg) ? (const void *)msg : buf;
689  break;
690  case kCleanupSessions:
691  notifymsg += "cleanup request to ";
692  notifymsg += url;
693  notifymsg += " for user: ";
694  notifymsg += msg;
695  reqhdr.proof.int2 = (kXR_int32) srvtype;
696  reqhdr.proof.sid = -1;
697  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
698  buf = (msg) ? (const void *)msg : buf;
699  break;
700  case kExec:
701  notifymsg += "exec ";
702  notifymsg += subtype;
703  notifymsg += "request for ";
704  notifymsg += msg;
705  reqhdr.proof.int2 = (kXR_int32) subtype;
706  reqhdr.proof.sid = -1;
707  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
708  buf = (msg) ? (const void *)msg : buf;
709  break;
710  default:
711  ok = 0;
712  TRACE(XERR, "invalid request type " << type);
713  break;
714  }
715 
716  // Notify the client that we are sending the request
717  if (r && notify)
718  r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
719 
720  // Activate processing of unsolicited responses
721  conn->SetAsync(conn, &MessageSender, (void *)r);
722 
723  // Send over
724  if (ok)
725  xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
726 
727  // Deactivate processing of unsolicited responses
728  conn->SetAsync(0, 0, (void *)0);
729 
730  // Print error msg, if any
731  if (r && !xrsp && conn->GetLastErr()) {
732  XrdOucString cmsg = url;
733  cmsg += ": ";
734  cmsg += conn->GetLastErr();
735  r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
736  }
737  // Clean it up, to avoid leaving open tcp connection possibly going forever
738  // into CLOSE_WAIT
739  SafeDelete(conn);
740 
741  } else {
742  TRACE(XERR, "could not open connection to " << url);
743  if (r) {
744  XrdOucString cmsg = "failure attempting connection to ";
745  cmsg += url;
746  r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
747  }
748  }
749 
750  // Done
751  return xrsp;
752 }
753 
754 ////////////////////////////////////////////////////////////////////////////////
755 /// Check if 'host' is this local host. If checkport is true,
756 /// matching of the local port with the one implied by host is also checked.
757 /// Return 1 if 'local', 0 otherwise
758 
759 bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
760 {
761  XPDLOC(NMGR, "NetMgr::IsLocal")
762 
763  int rc = 0;
764  if (host && strlen(host) > 0) {
765  XrdClientUrlInfo uu(host);
766  if (uu.Port <= 0) uu.Port = 1093;
767  // Fully qualified name
768 #ifndef ROOT_XrdFour
769  char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
770 #else
771  XrdNetAddr aNA;
772  aNA.Set(uu.Host.c_str());
773  char *fqn = (char *) aNA.Name();
774 #endif
775  TRACE(HDBG, "fqn: '"<<fqn<<"' mgrh: '"<<fMgr->Host()<<"'");
776  if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
777  !strcmp(fMgr->Host(), fqn))) {
778  if (!checkport || (uu.Port == fMgr->Port()))
779  rc = 1;
780  }
781 #ifndef ROOT_XrdFour
782  SafeFree(fqn);
783 #endif
784  }
785  // Done
786  return rc;
787 }
788 
789 ////////////////////////////////////////////////////////////////////////////////
790 /// Process a readbuf request
791 
793 {
794  XPDLOC(NMGR, "NetMgr::ReadBuffer")
795 
796  int rc = 0;
797  XPD_SETRESP(p, "ReadBuffer");
798 
799  XrdOucString emsg;
800 
801  // Unmarshall the data
802  //
803  kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
804  int len = ntohl(p->Request()->readbuf.len);
805 
806  // Find out the file name
807  char *file = 0;
808  char *filen = 0;
809  char *pattern = 0;
810  int dlen = p->Request()->header.dlen;
811  int grep = ntohl(p->Request()->readbuf.int1);
812  int blen = dlen;
813  bool local = 0;
814  if (dlen > 0 && p->Argp()->buff) {
815  file = new char[dlen+1];
816  memcpy(file, p->Argp()->buff, dlen);
817  file[dlen] = 0;
818  // Check if local
819  XrdClientUrlInfo ui(file);
820  if (ui.Host.length() > 0) {
821  // Check locality
822  local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
823  if (local) {
824  memcpy(file, ui.File.c_str(), ui.File.length());
825  file[ui.File.length()] = 0;
826  blen = ui.File.length();
827  TRACEP(p, DBG, "file is LOCAL");
828  }
829  }
830  // If grep, extract the pattern
831  if (grep > 0) {
832  // 'grep' operation: len is the length of the 'pattern' to be grepped
833  pattern = new char[len + 1];
834  int j = blen - len;
835  int i = 0;
836  while (j < blen)
837  pattern[i++] = file[j++];
838  pattern[i] = 0;
839  filen = strdup(file);
840  filen[blen - len] = 0;
841  TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
842  }
843  } else {
844  emsg = "file name not found";
845  TRACEP(p, XERR, emsg);
846  response->Send(kXR_InvalidRequest, emsg.c_str());
847  return 0;
848  }
849  if (grep) {
850  TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
851  ", pattern: " << pattern);
852  } else {
853  TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
854  }
855 
856  // Get the buffer
857  int lout = len;
858  char *buf = 0;
859  if (local) {
860  if (grep > 0) {
861  // Grep local file
862  lout = blen; // initial length
863  buf = ReadBufferLocal(filen, pattern, lout, grep);
864  } else {
865  // Read portion of local file
866  buf = ReadBufferLocal(file, ofs, lout);
867  }
868  } else {
869  // Read portion of remote file
870  XrdClientUrlInfo u(file);
871  if (u.User.length() <= 0)
872  u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
873  buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
874  }
875 
876  bool sent = 0;
877  if (!buf) {
878  if (lout > 0) {
879  if (grep > 0) {
880  if (TRACING(DBG)) {
881  XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
882  TRACEP(p, DBG, emsg);
883  }
884  response->Send();
885  sent = 1;
886  } else {
887  XPDFORM(emsg, "could not read buffer from %s %s",
888  (local) ? "local file " : "remote file ", file);
889  TRACEP(p, XERR, emsg);
890  response->Send(kXR_InvalidRequest, emsg.c_str());
891  sent = 1;
892  }
893  } else {
894  // Just got an empty buffer
895  if (TRACING(DBG)) {
896  emsg = "nothing found in ";
897  emsg += (grep > 0) ? filen : file;
898  TRACEP(p, DBG, emsg);
899  }
900  }
901  }
902 
903  // Send back to user
904  if (!sent)
905  response->Send(buf, lout);
906 
907  // Cleanup
908  SafeFree(buf);
909  SafeDelArray(file);
910  SafeFree(filen);
911  SafeDelArray(pattern);
912 
913  // Done
914  return 0;
915 }
916 
917 ////////////////////////////////////////////////////////////////////////////////
918 /// Locate the exact file path allowing for wildcards '*' in the file name.
919 /// In case of success, returns 0 and fills file with the first matching instance.
920 /// Return -1 if no matching path is found.
921 
923 {
924  XPDLOC(NMGR, "NetMgr::LocateLocalFile")
925 
926  // If no wild cards or empty, nothing to do
927  if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
928 
929  // Locate the file name and the dir
930  XrdOucString fn, dn;
931  int isl = file.rfind('/');
932  if (isl != STR_NPOS) {
933  fn.assign(file, isl + 1, -1);
934  dn.assign(file, 0, isl);
935  } else {
936  fn = file;
937  dn = "./";
938  }
939 
940  XrdOucString emsg;
941  // Scan the dir
942  DIR *dirp = opendir(dn.c_str());
943  if (!dirp) {
944  XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
945  TRACE(XERR, emsg.c_str());
946  return -1;
947  }
948  struct dirent *ent = 0;
949  XrdOucString sent;
950  while ((ent = readdir(dirp))) {
951  if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
952  continue;
953  // Check the match
954  sent = ent->d_name;
955  if (sent.matches(fn.c_str()) > 0) break;
956  sent = "";
957  }
958  closedir(dirp);
959 
960  // If found fill a new output
961  if (sent.length() > 0) {
962  XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
963  return 0;
964  }
965 
966  // Not found
967  return -1;
968 }
969 
970 ////////////////////////////////////////////////////////////////////////////////
971 /// Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
972 /// returned buffer must be freed by the caller.
973 /// Wild cards '*' are allowed in the file name of 'path'; the first matching
974 /// instance is taken.
975 /// Returns 0 in case of error.
976 
977 char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
978 {
979  XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
980 
981  XrdOucString emsg;
982  TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
983 
984  // Check input
985  if (!path || strlen(path) <= 0) {
986  TRACE(XERR, "path undefined!");
987  return (char *)0;
988  }
989 
990  // Locate the path resolving wild cards
991  XrdOucString spath(path);
992  if (LocateLocalFile(spath) != 0) {
993  TRACE(XERR, "path cannot be resolved! (" << path << ")");
994  return (char *)0;
995  }
996  const char *file = spath.c_str();
997 
998  // Open the file in read mode
999  int fd = open(file, O_RDONLY);
1000  if (fd < 0) {
1001  emsg = "could not open ";
1002  emsg += file;
1003  TRACE(XERR, emsg);
1004  return (char *)0;
1005  }
1006 
1007  // Size of the output
1008  struct stat st;
1009  if (fstat(fd, &st) != 0) {
1010  emsg = "could not get size of file with stat: errno: ";
1011  emsg += (int)errno;
1012  TRACE(XERR, emsg);
1013  close(fd);
1014  return (char *)0;
1015  }
1016  off_t ltot = st.st_size;
1017 
1018  // Estimate offsets of the requested range
1019  // Start from ...
1020  kXR_int64 start = ofs;
1021  off_t fst = (start < 0) ? ltot + start : start;
1022  fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1023  // End at ...
1024  kXR_int64 end = fst + len;
1025  off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1026  TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
1027 
1028  // Number of bytes to be read
1029  len = lst - fst;
1030 
1031  // Output buffer
1032  char *buf = (char *)malloc(len + 1);
1033  if (!buf) {
1034  emsg = "could not allocate enough memory on the heap: errno: ";
1035  emsg += (int)errno;
1036  XPDERR(emsg);
1037  close(fd);
1038  return (char *)0;
1039  }
1040 
1041  // Reposition, if needed
1042  if (fst >= 0)
1043  lseek(fd, fst, SEEK_SET);
1044 
1045  int left = len;
1046  int pos = 0;
1047  int nr = 0;
1048  do {
1049  while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1050  errno = 0;
1051  if (nr < 0) {
1052  TRACE(XERR, "error reading from file: errno: " << errno);
1053  break;
1054  }
1055 
1056  // Update counters
1057  pos += nr;
1058  left -= nr;
1059 
1060  } while (nr > 0 && left > 0);
1061 
1062  // Termination
1063  buf[len] = 0;
1064  TRACE(HDBG, "read " << nr << " bytes: " << buf);
1065 
1066  // Close file
1067  close(fd);
1068 
1069  // Done
1070  return buf;
1071 }
1072 
1073 ////////////////////////////////////////////////////////////////////////////////
1074 /// Grep lines matching 'pat' form 'path'; the returned buffer (length in 'len')
1075 /// must be freed by the caller.
1076 /// Wild cards '*' are allowed in the file name of 'path'; the first matching
1077 /// instance is taken.
1078 /// Returns 0 in case of error.
1079 
1080 char *XrdProofdNetMgr::ReadBufferLocal(const char *path,
1081  const char *pat, int &len, int opt)
1082 {
1083  XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
1084 
1085  XrdOucString emsg;
1086  TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
1087 
1088  // Check input
1089  if (!path || strlen(path) <= 0) {
1090  TRACE(XERR, "file path undefined!");
1091  return (char *)0;
1092  }
1093 
1094  // Locate the path resolving wild cards
1095  XrdOucString spath(path);
1096  if (LocateLocalFile(spath) != 0) {
1097  TRACE(XERR, "path cannot be resolved! (" << path << ")");
1098  return (char *)0;
1099  }
1100  const char *file = spath.c_str();
1101 
1102  // Size of the output
1103  struct stat st;
1104  if (stat(file, &st) != 0) {
1105  emsg = "could not get size of file with stat: errno: ";
1106  emsg += (int)errno;
1107  TRACE(XERR, emsg);
1108  return (char *)0;
1109  }
1110  off_t ltot = st.st_size;
1111 
1112  // The grep command
1113  char *cmd = 0;
1114  int lcmd = 0;
1115  if (pat && strlen(pat) > 0) {
1116  lcmd = strlen(pat) + strlen(file) + 20;
1117  cmd = new char[lcmd];
1118  if (opt == 1) {
1119  snprintf(cmd, lcmd, "grep %s %s", pat, file);
1120  } else if (opt == 2) {
1121  snprintf(cmd, lcmd, "grep -v %s %s", pat, file);
1122  } else if (opt == 3) {
1123  snprintf(cmd, lcmd, "cat %s | %s", file, pat);
1124  } else { // should not be here
1125  snprintf(cmd, lcmd, "cat %s", file);
1126  }
1127  } else {
1128  lcmd = strlen(file) + 10;
1129  cmd = new char[lcmd];
1130  snprintf(cmd, lcmd, "cat %s", file);
1131  }
1132  TRACE(DBG, "cmd: " << cmd);
1133 
1134  // Execute the command in a pipe
1135  FILE *fp = popen(cmd, "r");
1136  if (!fp) {
1137  emsg = "could not run '";
1138  emsg += cmd;
1139  emsg += "'";
1140  TRACE(XERR, emsg);
1141  delete[] cmd;
1142  return (char *)0;
1143  }
1144  delete[] cmd;
1145 
1146  // Read line by line
1147  len = 0;
1148  char *buf = 0;
1149  char line[2048];
1150  int bufsiz = 0, left = 0, lines = 0;
1151  while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
1152  // Parse the line
1153  int llen = strlen(line);
1154  ltot -= llen;
1155  lines++;
1156  // (Re-)allocate the buffer
1157  if (!buf || (llen > left)) {
1158  int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1159  dsiz = (dsiz > llen) ? dsiz : llen;
1160  bufsiz += dsiz;
1161  buf = (char *)realloc(buf, bufsiz + 1);
1162  left += dsiz;
1163  }
1164  if (!buf) {
1165  emsg = "could not allocate enough memory on the heap: errno: ";
1166  emsg += (int)errno;
1167  TRACE(XERR, emsg);
1168  pclose(fp);
1169  return (char *)0;
1170  }
1171  // Add line to the buffer
1172  memcpy(buf + len, line, llen);
1173  len += llen;
1174  left -= llen;
1175  if (TRACING(HDBG))
1176  fprintf(stderr, "line: %s", line);
1177  }
1178 
1179  // Check the result and terminate the buffer
1180  if (buf) {
1181  if (len > 0) {
1182  buf[len] = 0;
1183  } else {
1184  free(buf);
1185  buf = 0;
1186  }
1187  }
1188 
1189  // Close file
1190  pclose(fp);
1191 
1192  // Done
1193  return buf;
1194 }
1195 
1196 ////////////////////////////////////////////////////////////////////////////////
1197 /// Send a read buffer request of length 'len' at offset 'ofs' for remote file
1198 /// defined by 'url'; the returned buffer must be freed by the caller.
1199 /// Returns 0 in case of error.
1200 
1201 char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
1202  kXR_int64 ofs, int &len, int grep)
1203 {
1204  XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
1205 
1206  TRACE(REQ, "url: " << (url ? url : "undef") <<
1207  ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
1208  ", len: " << len << ", grep: " << grep);
1209 
1210  // Check input
1211  if (!file || strlen(file) <= 0) {
1212  TRACE(XERR, "file undefined!");
1213  return (char *)0;
1214  }
1215  XrdClientUrlInfo u(url);
1216  if (!url || strlen(url) <= 0) {
1217  // Use file as url
1218  u.TakeUrl(XrdOucString(file));
1219  if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
1220  }
1221 
1222  // Get a connection (logs in)
1223  XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
1224 
1225  char *buf = 0;
1226  if (conn && conn->IsValid()) {
1227  // Prepare request
1228  XPClientRequest reqhdr;
1229  memset(&reqhdr, 0, sizeof(reqhdr));
1230  conn->SetSID(reqhdr.header.streamid);
1231  reqhdr.header.requestid = kXP_readbuf;
1232  reqhdr.readbuf.ofs = ofs;
1233  reqhdr.readbuf.len = len;
1234  reqhdr.readbuf.int1 = grep;
1235  reqhdr.header.dlen = strlen(file);
1236  const void *btmp = (const void *) file;
1237  char **vout = &buf;
1238  // Send over
1239  XrdClientMessage *xrsp =
1240  conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
1241 
1242  // If positive answer
1243  if (xrsp && buf && (xrsp->DataLen() > 0)) {
1244  len = xrsp->DataLen();
1245  } else {
1246  if (xrsp && !(xrsp->IsError()))
1247  // The buffer was just empty: do not call it error
1248  len = 0;
1249  SafeFree(buf);
1250  }
1251 
1252  // Clean the message
1253  SafeDelete(xrsp);
1254  // Clean it up, to avoid leaving open tcp connection possibly going forever
1255  // into CLOSE_WAIT
1256  SafeDelete(conn);
1257  }
1258 
1259  // Done
1260  return buf;
1261 }
1262 
1263 ////////////////////////////////////////////////////////////////////////////////
1264 /// Get log paths from next tier; used in multi-master setups
1265 /// Returns 0 in case of error.
1266 
1267 char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
1268 {
1269  XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1270 
1271  TRACE(REQ, "url: " << (url ? url : "undef") <<
1272  ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1273 
1274  // Check input
1275  if (!url || strlen(url) <= 0) {
1276  TRACE(XERR, "url undefined!");
1277  return (char *)0;
1278  }
1279 
1280  // Get a connection (logs in)
1281  XrdProofConn *conn = GetProofConn(url);
1282 
1283  char *buf = 0;
1284  if (conn && conn->IsValid()) {
1285  // Prepare request
1286  XPClientRequest reqhdr;
1287  memset(&reqhdr, 0, sizeof(reqhdr));
1288  conn->SetSID(reqhdr.header.streamid);
1289  reqhdr.header.requestid = kXP_admin;
1290  reqhdr.proof.int1 = kQueryLogPaths;
1291  reqhdr.proof.int2 = isess;
1292  reqhdr.proof.sid = -1;
1293  reqhdr.header.dlen = msg ? strlen(msg) : 0;
1294  const void *btmp = (const void *) msg;
1295  char **vout = &buf;
1296  // Send over
1297  XrdClientMessage *xrsp =
1298  conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
1299 
1300  // If positive answer
1301  if (xrsp && buf && (xrsp->DataLen() > 0)) {
1302  int len = xrsp->DataLen();
1303  buf = (char *) realloc((void *)buf, len + 1);
1304  if (buf)
1305  buf[len] = 0;
1306  } else {
1307  SafeFree(buf);
1308  }
1309 
1310  // Clean the message
1311  SafeDelete(xrsp);
1312  // Clean it up, to avoid leaving open tcp connection possibly going forever
1313  // into CLOSE_WAIT
1314  SafeDelete(conn);
1315  }
1316 
1317  // Done
1318  return buf;
1319 }
1320 
1321 ////////////////////////////////////////////////////////////////////////////////
1322 /// Get log paths from next tier; used in multi-master setups
1323 /// Returns 0 in case of error.
1324 
1325 char *XrdProofdNetMgr::ReadLogPaths(const char *msg, int isess)
1326 {
1327  XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1328 
1329  TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1330 
1331  char *buf = 0, *pbuf = buf;
1332  int len = 0;
1333  // Loop over unique nodes
1334  std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
1335  XrdProofWorker *w = 0;
1336  while (iw != fNodes.end()) {
1337  if ((w = *iw)) {
1338  // Do not send it to ourselves
1339  bool us = (((w->fHost.find("localhost") != STR_NPOS ||
1340  XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
1341  (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
1342  if (!us) {
1343  // Create 'url'
1344  XrdOucString u = fMgr->EffectiveUser();
1345  u += '@';
1346  u += w->fHost;
1347  if (w->fPort != -1) {
1348  u += ':';
1349  u += w->fPort;
1350  }
1351  // Ask the node
1352  char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
1353  if (bmst) {
1354  len += strlen(bmst) + 1;
1355  buf = (char *) realloc((void *)buf, len);
1356  pbuf = buf + len - strlen(bmst) - 1;
1357  memcpy(pbuf, bmst, strlen(bmst) + 1);
1358  buf[len - 1] = 0;
1359  pbuf = buf + len;
1360  free(bmst);
1361  }
1362  } else {
1363  TRACE(DBG, "request for ourselves: ignore");
1364  }
1365  }
1366  // Next worker
1367  ++iw;
1368  }
1369 
1370  // Done
1371  return buf;
1372 }
1373 
1374 ////////////////////////////////////////////////////////////////////////////////
1375 /// Fill-in fWorkers for a localhost based on the number of
1376 /// workers fNumLocalWrks.
1377 
// NOTE(review): the signature line (listing line 1378) is missing from this
// extract; the cross-reference index of the page lists this method as
// 'void CreateDefaultPROOFcfg()'.
// Rebuilds fWorkers from the default list fDfltWorkers, which is created on
// first use with one "master <host>" entry followed by fNumLocalWrks
// "worker localhost port=<port>" entries, then refreshes the unique nodes.
1379 {
1380  XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
1381 
1382  TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
1383 
1384  // Lock the method to protect the lists.
// NOTE(review): listing line 1385 is missing here; presumably the statement
// taking the lock announced by the comment above - confirm against the
// original file.
1386 
1387  // Cleanup the worker list
// Only the pointers are dropped from the list: the objects are not deleted
1388  fWorkers.clear();
1389  // The first time we need to create the default workers
1390  if (fDfltWorkers.size() < 1) {
1391  // Create a default master line
1392  XrdOucString mm("master ", 128);
1393  mm += fMgr->Host();
1394  fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1395 
1396  // Create 'localhost' lines for each worker
1397  int nwrk = fNumLocalWrks;
1398  if (nwrk > 0) {
1399  mm = "worker localhost port=";
1400  mm += fMgr->Port();
// All the local workers are built from the same line
1401  while (nwrk--) {
1402  fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1403  TRACE(DBG, "added line: " << mm);
1404  }
1405  }
1406  }
1407 
1408  // Copy the list
// fWorkers receives the same pointers held by fDfltWorkers (no deep copy)
1409  std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
1410  for (; w != fDfltWorkers.end(); ++w) {
1411  fWorkers.push_back(*w);
1412  }
1413 
// The first entry is the master line, hence the '- 1'
1414  TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
1415 
1416  // Find unique nodes
1417  FindUniqueNodes();
1418 
1419  // We are done
1420  return;
1421 }
1422 
1423 ////////////////////////////////////////////////////////////////////////////////
1424 /// Return the list of workers after having made sure that the info is
1425 /// up-to-date
1426 
1427 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
1428 {
1429  XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
1430 
// NOTE(review): listing line 1431 is missing from this extract; presumably
// the statement locking the lists, as in the other methods - confirm.
1432 
// The configuration file is considered only for static resources and when a
// file name is defined
1433  if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1434  // Check if there were any changes in the config file
// ReadPROOFcfg is attempted only if reloading is enabled (fReloadPROOFcfg);
// a non-zero return value means that the file could not be used
1435  if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1436  if (fDfltFallback) {
1437  // Use default settings
// NOTE(review): listing line 1438 is missing here; presumably the call
// building the default worker list (CreateDefaultPROOFcfg?) - confirm.
1439  TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1440  } else {
1441  TRACE(XERR, "unable to read the configuration file");
// No fallback allowed: the caller gets a null pointer
1442  return (std::list<XrdProofWorker *> *)0;
1443  }
1444  }
1445  }
1446  TRACE(DBG, "returning list with " << fWorkers.size() << " entries");
1447 
1448  if (TRACING(HDBG)) Dump();
1449 
// The address of the member list is returned, not a copy
1450  return &fWorkers;
1451 }
1452 
1453 ////////////////////////////////////////////////////////////////////////////////
1454 /// Dump status
1455 
// NOTE(review): the signature line (listing line 1456) is missing from this
// extract; the cross-reference index of the page lists this method as
// 'void Dump()'.
// Prints, via XPDPRT, the size of fWorkers and one line per entry with host,
// port, type and the value returned by Active().
1457 {
// Location tag set by hand instead of via the XPDLOC macro used elsewhere
1458  const char *xpdloc = "NetMgr::Dump";
1459 
// NOTE(review): listing line 1460 is missing from this extract; presumably
// the statement locking the lists - confirm.
1461 
1462  XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1463  XPDPRT("+ Active workers status");
1464  XPDPRT("+ Size: " << fWorkers.size());
1465  XPDPRT("+ ");
1466 
// Entries are dereferenced without a null check
1467  std::list<XrdProofWorker *>::iterator iw;
1468  for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
1469  XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
1470  " active sessions:" << (*iw)->Active());
1471  }
1472  XPDPRT("+ ");
1473  XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1474 }
1475 
1476 ////////////////////////////////////////////////////////////////////////////////
1477 /// Return the list of unique nodes after having made sure that the info is
1478 /// up-to-date
1479 
1480 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
1481 {
1482  XPDLOC(NMGR, "NetMgr::GetNodes")
1483 
// NOTE(review): listing line 1484 is missing from this extract; presumably
// the statement locking the lists, as in the other methods - confirm.
1485 
// The configuration file is considered only for static resources and when a
// file name is defined
1486  if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1487  // Check if there were any changes in the config file
// ReadPROOFcfg is attempted only if reloading is enabled (fReloadPROOFcfg);
// a non-zero return value means that the file could not be used
1488  if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1489  if (fDfltFallback) {
1490  // Use default settings
// NOTE(review): listing line 1491 is missing here; presumably the call
// building the default worker list (CreateDefaultPROOFcfg?) - confirm.
1492  TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1493  } else {
1494  TRACE(XERR, "unable to read the configuration file");
// No fallback allowed: the caller gets a null pointer
1495  return (std::list<XrdProofWorker *> *)0;
1496  }
1497  }
1498  }
1499  TRACE(DBG, "returning list with " << fNodes.size() << " entries");
1500 
// The address of the member list is returned, not a copy
1501  return &fNodes;
1502 }
1503 
1504 ////////////////////////////////////////////////////////////////////////////////
1505 /// Read PROOF config file and load the information in fWorkers.
1506 /// NB: 'master' information here is ignored, because it is passed
1507 /// via the 'xpd.workdir' and 'xpd.image' config directives
1508 
// NOTE(review): the signature line (listing line 1509) is missing from this
// extract; the cross-reference index of the page lists this method as
// 'int ReadPROOFcfg(bool reset=1)'.
// Returns 0 if the file did not change since the last load, if it cannot be
// opened but more than one worker entry is already known, or if at least one
// active entry was copied to fWorkers; returns -1 otherwise.
1510 {
1511  XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
1512 
1513  TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
1514 
1515  // Lock the method to protect the lists.
// NOTE(review): listing line 1516 is missing here; presumably the statement
// taking the lock announced by the comment above - confirm.
1517 
1518  // Check inputs
1519  if (fPROOFcfg.fName.length() <= 0)
1520  return -1;
1521 
1522  // Get the modification time
1523  struct stat st;
1524  if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
1525  // If the file disappeared, reset the modification time so that we are sure
1526  // to reload it if it comes back
1527  if (errno == ENOENT) fPROOFcfg.fMtime = -1;
// The failure is always reported with -1; only the log level differs when a
// fallback to the default configuration is enabled
1528  if (!fDfltFallback) {
1529  TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
1530  } else {
1531  TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
1532  }
1533  return -1;
1534  }
1535  TRACE(DBG, "time of last modification: " << st.st_mtime);
1536 
1537  // File should be loaded only once
1538  if (st.st_mtime <= fPROOFcfg.fMtime)
1539  return 0;
1540 
1541  // Save the modification time
// NOTE(review): the time is saved before the file is opened, so if the fopen
// below fails the file is not retried until its modification time changes
1542  fPROOFcfg.fMtime = st.st_mtime;
1543 
1544  // Open the defined path.
1545  FILE *fin = 0;
1546  if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
// More than one entry means that there is something beyond the master line
1547  if (fWorkers.size() > 1) {
1548  TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
1549  TRACE(XERR, "continuing with existing list of workers.");
1550  return 0;
1551  } else {
1552  return -1;
1553  }
1554  }
1555 
1556  if (reset) {
1557  // Cleanup the worker list
1558  fWorkers.clear();
1559  }
// NOTE(review): without 'reset' the active entries are appended below to the
// current content of fWorkers, which may then contain repeated entries
1560 
1561  // Add default a master line if not yet there
1562  if (fRegWorkers.size() < 1) {
1563  XrdOucString mm("master ", 128);
1564  mm += fMgr->Host();
1565  fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
1566  } else {
1567  // Deactivate all current active workers
1568  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1569  // Skip the master line
1570  ++w;
1571  for (; w != fRegWorkers.end(); ++w) {
1572  (*w)->fActive = 0;
1573  }
1574  }
1575 
1576  // Read now the directives
1577  int nw = 0;
1578  char lin[2048];
1579  while (fgets(lin, sizeof(lin), fin)) {
1580  // Skip empty lines
// Only blanks (' ') are skipped here, not tabs
1581  int p = 0;
1582  while (lin[p++] == ' ') {
1583  ;
1584  }
1585  p--;
1586  if (lin[p] == '\0' || lin[p] == '\n')
1587  continue;
1588 
1589  // Skip comments
// The test is on the first character of the line: a '#' preceded by blanks
// is not recognized as a comment here
1590  if (lin[0] == '#')
1591  continue;
1592 
1593  // Remove trailing '\n';
// Safe: empty lines have been skipped above, so strlen(lin) is at least 1
1594  if (lin[strlen(lin)-1] == '\n')
1595  lin[strlen(lin)-1] = '\0';
1596 
1597  TRACE(DBG, "found line: " << lin);
1598 
1599  // Parse the line
1600  XrdProofWorker *pw = new XrdProofWorker(lin);
1601 
// Lines starting with "master" or "node" can only update the master entry
1602  const char *pfx[2] = { "master", "node" };
1603  if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1604  !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1605  // Init a master instance
1606  if (pw->fHost.beginswith("localhost") ||
1607  pw->Matches(fMgr->Host())) {
1608  // Replace the default line (the first with what found in the file)
1609  XrdProofWorker *fw = fRegWorkers.front();
1610  fw->Reset(lin);
1611  }
1612  // Ignore it
1613  SafeDelete(pw);
1614  } else {
1615  // Check if we have already it
1616  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1617  // Skip the master line
1618  ++w;
1619  bool haveit = 0;
1620  while (w != fRegWorkers.end()) {
// Only entries not yet re-activated can match: a host:port pair appearing
// several times in the file therefore gets as many entries
1621  if (!((*w)->fActive)) {
1622  if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
1623  (*w)->fActive = 1;
1624  haveit = 1;
1625  break;
1626  }
1627  }
1628  // Go to next
1629  ++w;
1630  }
1631  // If we do not have it, build a new worker object
1632  if (!haveit) {
1633  // Keep it
// Ownership of 'pw' goes to fRegWorkers
1634  fRegWorkers.push_back(pw);
1635  } else {
1636  // Drop it
1637  SafeDelete(pw);
1638  }
1639  }
1640  }
1641 
1642  // Copy the active workers
// The scan starts from the first entry, i.e. the master line is included;
// entries no longer active stay in fRegWorkers but are not copied
1643  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1644  while (w != fRegWorkers.end()) {
1645  if ((*w)->fActive) {
1646  fWorkers.push_back(*w);
1647  nw++;
1648  }
1649  ++w;
1650  }
1651 
1652  // Close files
1653  fclose(fin);
1654 
1655  // Find unique nodes
1656  if (reset)
1657  FindUniqueNodes();
1658 
1659  // We are done
// Having copied nothing is reported as a failure
1660  return ((nw == 0) ? -1 : 0);
1661 }
1662 
1663 ////////////////////////////////////////////////////////////////////////////////
1664 /// Scan fWorkers for unique nodes (stored in fNodes).
1665 /// Return the number of unique nodes.
1666 /// NB: 'master' information here is ignored, because it is passed
1667 /// via the 'xpd.workdir' and 'xpd.image' config directives
1668 
// NOTE(review): the signature line (listing line 1669) is missing from this
// extract; the cross-reference index of the page lists this method as
// 'int FindUniqueNodes()'.
// Rebuilds fNodes from the active entries of fWorkers, keeping for each group
// of entries matching each other (see XrdProofWorker::Matches) only the first
// one found; returns the size of fNodes.
1670 {
1671  XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
1672 
1673  TRACE(REQ, "# workers: " << fWorkers.size());
1674 
1675  // Cleanup the nodes list
// Only the pointers are dropped: the objects are shared with fWorkers
1676  fNodes.clear();
1677 
1678  // Build the list of unique nodes (skip the master line);
// With one entry or less there is nothing beyond the master line
1679  if (fWorkers.size() > 1) {
1680  std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
1681  ++w;
1682  for (; w != fWorkers.end(); ++w) if ((*w)->fActive) {
// Linear search among the nodes found so far
1683  bool add = 1;
1684  std::list<XrdProofWorker *>::iterator n;
1685  for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
1686  if ((*n)->Matches(*w)) {
1687  add = 0;
1688  break;
1689  }
1690  }
1691  if (add)
1692  fNodes.push_back(*w);
1693  }
1694  }
1695  TRACE(REQ, "found " << fNodes.size() << " unique nodes");
1696 
1697  // We are done
1698  return fNodes.size();
1699 }
int DoDirectiveBonjour(char *val, XrdOucStream *cfg, bool)
static int GetNumCPUs()
Find out and return the number of CPUs in the local machine.
int DoDirectiveAdminReqTO(char *, XrdOucStream *, bool)
Process 'adminreqto' directive.
void RegisterDirectives()
Register config directives.
XrdOucString fName
Definition: XrdProofdAux.h:76
void TakeUrl(XrdOucString url)
static constexpr double pi
auto * m
Definition: textangle.C:8
#define TRACING(x)
TLine * line
char * ReadBufferLocal(const char *file, kXR_int64 ofs, int &len)
Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the returned buffer must be freed...
int MessageSender(const char *msg, int len, void *arg)
Send up a message from the server.
static constexpr double us
struct XPClientReadbufRequest readbuf
XrdProofdNetMgr * NetMgr() const
std::list< XrdProofWorker * > fRegWorkers
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, number of retries and wait time between attempts (in se...
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
int LocateLocalFile(XrdOucString &file)
Locate the exact file path allowing for wildcards '*' in the file name.
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
XrdProofdFile fPROOFcfg
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
int ReadPROOFcfg(bool reset=1)
Read PROOF config file and load the information in fWorkers.
XrdProofConn * GetProofConn(const char *url)
Get a XrdProofConn for url; create a new one if not available.
XrdSysRecMutex fMutex
void Reset(const char *str)
Set content from a config file-like string.
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server's response.
XrdOucString fUser
static constexpr double mm
#define XPD_LONGOK(x)
#define malloc
Definition: civetweb.c:1347
int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdProofdManager * fMgr
#define XPDPRT(x)
struct ClientRequestHdr header
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
std::list< XrdProofWorker * > fNodes
int BroadcastCtrlC(const char *usr)
Broadcast a ctrlc interrupt. Return 0 on success, -1 on error.
const char * User() const
XrdBuffer * Argp() const
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups. Returns 0 in case of error.
#define SafeDelArray(x)
Definition: XrdProofdAux.h:338
const char * GetLastErr()
Definition: XrdProofConn.h:136
#define EnvPutInt(name, val)
Definition: XrdClientEnv.hh:47
virtual ~XrdProofdNetMgr()
Destructor.
int SrvType() const
bool Matches(const char *host)
Check compatibility of host with this instance.
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
#define realloc
Definition: civetweb.c:1349
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
#define XPDLOC(d, x)
int FindUniqueNodes()
Scan fWorkers for unique nodes (stored in fNodes).
void CreateDefaultPROOFcfg()
Fill-in fWorkers for a localhost based on the number of workers fNumLocalWrks.
#define XPDERR(x)
ROOT::R::TRInterface & r
Definition: Object.C:4
std::list< XrdProofWorker * > * GetNodes()
Return the list of unique nodes after having made sure that the info is up-to-date.
#define XrdSysMutexHelper
Definition: XrdSysToOuc.h:17
struct XPClientProofRequest proof
const char * Host() const
#define XrdSysError
Definition: XpdSysError.h:8
#define TRACEP(p, act, x)
double floor(double)
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
int DoDirectiveWorker(char *, XrdOucStream *, bool)
Process 'worker' directive.
int ReadBuffer(XrdProofdProtocol *p)
Process a readbuf request.
#define kXPD_AnyServer
XrdOucString GetUrl()
static unsigned int total
#define SafeDelete(p)
Definition: RConfig.h:529
#define d(i)
Definition: RSha256.hxx:102
std::list< XrdProofWorker * > fWorkers
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
#define XPDFORM
Definition: XrdProofdAux.h:381
#define kXPD_Worker
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive. If none (valid) is found, return -1.
std::list< XrdProofWorker * > fDfltWorkers
static char * Expand(char *p)
Expand path 'p' relative to: $HOME if begins with ~/ <user>'s $HOME if begins with ~<user>/ $PWD if d...
#define xrdmin(a, b)
int type
Definition: TGX11.cxx:120
#define SafeFree(x)
Definition: XrdProofdAux.h:341
#define free
Definition: civetweb.c:1350
static constexpr double s
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
const char * EffectiveUser() const
void BalanceNodesOrder()
Indices (this will be used twice).
#define XPD_SETRESP(p, x)
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
#define NAME_REQUESTTIMEOUT
XrdOucString fName
Definition: XrdProofdAux.h:111
Definition: file.py:1
void Dump()
Dump status.
char * ReadBufferRemote(const char *url, const char *file, kXR_int64 ofs, int &len, int grep)
Send a read buffer request of length 'len' at offset 'ofs' for remote file defined by 'url'; the retu...
#define snprintf
Definition: civetweb.c:1351
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
bool IsValid() const
Test validity of this connection.
#define kXPD_Master
XrdProofdClient * Client() const
virtual int Config(bool rcf=0)
int Send(void)
Auxiliary Send method.
const Int_t n
Definition: legend1.C:16
void Register(const char *dname, XrdProofdDirective *d)
XPClientRequest * Request() const
XrdProofdNetMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
XrdOucString fHost