Logo ROOT   6.10/09
Reference Guide
XrdProofSched.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: G. Ganis September 2007
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // XrdProofSched //
15 // //
16 // Authors: G. Ganis, CERN, 2007 //
17 // //
18 // Interface for a PROOF scheduler. //
19 // Alternative scheduler implementations should be provided as shared //
20 // library containing an implementation of this class. The library //
21 // must also provide a function to load an instance of this class //
22 // with the following signature (see commented example below): //
23 // extern "C" { //
24 // XrdProofSched *XrdgetProofSched(const char *cfg, //
25 // XrdProofdManager *mgr, //
26 // XrdProofGroupMgr *grpmgr, //
27 // XrdSysError *edest); //
28 // } //
29 // Here 'cfg' is the xrootd config file where directives to configure //
30 // the scheduler are specified, 'mgr' is the instance of the cluster //
31 // manager from where the scheduler can get info about the available //
32 // workers and their status, 'grpmgr' is the instance of the group //
33 // bringing the definition of the groups for this run, and 'edest' is //
34 // instance of the error logger to be used. //
35 // The scheduler is identified by a name of max 16 chars. //
36 // //
37 //////////////////////////////////////////////////////////////////////////
38 
39 #include <list>
40 
41 #include "XProofProtocol.h"
42 #include "XrdProofdManager.h"
43 #include "XrdProofdNetMgr.h"
44 #include "XrdProofdProofServMgr.h"
45 #include "XrdProofGroup.h"
46 #include "XrdProofSched.h"
47 #include "XrdProofdProofServ.h"
48 #include "XrdProofWorker.h"
49 
50 #include "XrdOuc/XrdOucString.hh"
51 #include "XrdOuc/XrdOucStream.hh"
52 
53 #include "XpdSysError.h"
54 
55 // Tracing
56 #include "XrdProofdTrace.h"
57 
58 //
59 // Example of scheduler loader for an implementation called XrdProofSchedDyn
60 //
61 // extern "C" {
62 // //______________________________________________________________________________
63 // XrdProofSched *XrdgetProofSched(const char *cfg, XrdProofdManager *mgr,
64 // XrdProofGroupMgr *grpmgr, XrdSysError *edest)
65 // {
66 // // This scheduler is meant to live in a shared library. The interface below is
67 // // used by the server to obtain a copy of the scheduler object.
68 //
69 // XrdProofSchedDyn *pss = new XrdProofSchedDyn(mgr, grpmgr, edest);
70 // if (pss && pss->Config(cfg) == 0) {
71 // return (XrdProofSched *) pss;
72 // }
73 // if (pss)
74 // delete pss;
75 // return (XrdProofSched *)0;
76 // }}
77 
78 //--------------------------------------------------------------------------
79 //
80 // XrdProofSchedCron
81 //
82 // Scheduler thread
83 //
84 ////////////////////////////////////////////////////////////////////////////////
85 /// This is an endless loop to check the system periodically or when
86 /// triggered via a message in a dedicated pipe
87 
88 void *XrdProofSchedCron(void *p)
89 {
90  XPDLOC(SCHED, "SchedCron")
91 
92  XrdProofSched *sched = (XrdProofSched *)p;
93  if (!(sched)) {
94  TRACE(XERR, "undefined scheduler: cannot start");
95  return (void *)0;
96  }
97 
98  // Time of last session check
99  int lastcheck = time(0), ckfreq = sched->CheckFrequency(), deltat = 0;
100  while(1) {
101  // We wait for processes to communicate a session status change
102  if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
103  deltat = ckfreq;
104  int pollRet = sched->Pipe()->Poll(deltat);
105 
106  if (pollRet > 0) {
107  // Read message
108  XpdMsg msg;
109  int rc = 0;
110  if ((rc = sched->Pipe()->Recv(msg)) != 0) {
111  XPDERR("problems receiving message; errno: "<<-rc);
112  continue;
113  }
114  // Parse type
115  XrdOucString buf;
116  if (msg.Type() == XrdProofSched::kReschedule) {
117 
118  TRACE(ALL, "received kReschedule");
119 
120  // Reschedule
121  sched->Reschedule();
122 
123  } else {
124 
125  TRACE(XERR, "unknown type: "<<msg.Type());
126  continue;
127  }
128  } else {
129  // Notify
130  TRACE(ALL, "running regular checks");
131  // Run regular rescheduling checks
132  sched->Reschedule();
133  // Remember when ...
134  lastcheck = time(0);
135  }
136  }
137 
138  // Should never come here
139  return (void *)0;
140 }
141 
142 ////////////////////////////////////////////////////////////////////////////////
143 /// Compare two workers for sorting
144 
145 static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
146 {
147  return ((lhs && rhs &&
148  lhs->GetNActiveSessions() < rhs->GetNActiveSessions()) ? 1 : 0);
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Generic directive processor
153 
154 int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
155 {
156  if (!d || !(d->fVal))
157  // undefined inputs
158  return -1;
159 
160  return ((XrdProofSched *)d->fVal)->ProcessDirective(d, val, cfg, rcf);
161 }
162 
163 ////////////////////////////////////////////////////////////////////////////////
164 /// Constructor
165 
167  XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr,
168  const char *cfn, XrdSysError *e)
169  : XrdProofdConfig(cfn, e)
170 {
171  fValid = 1;
172  fMgr = mgr;
173  fGrpMgr = grpmgr;
174  fNextWrk = 1;
175  fEDest = e;
176  fUseFIFO = 0;
177  ResetParameters();
178 
179  memset(fName, 0, kXPSMXNMLEN);
180  if (name)
181  memcpy(fName, name, kXPSMXNMLEN-1);
182 
183  // Configuration directives
185 }
186 
187 ////////////////////////////////////////////////////////////////////////////////
188 /// Register directives for configuration
189 
191 {
192  Register("schedparam", new XrdProofdDirective("schedparam", this, &DoDirectiveClass));
193  Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
194 }
195 
196 ////////////////////////////////////////////////////////////////////////////////
197 /// Update the priorities of the active sessions.
198 
200  char *val, XrdOucStream *cfg, bool rcf)
201 {
202  XPDLOC(SCHED, "Sched::DoDirective")
203 
204  if (!d)
205  // undefined inputs
206  return -1;
207 
208  if (d->fName == "schedparam") {
209  return DoDirectiveSchedParam(val, cfg, rcf);
210  } else if (d->fName == "resource") {
211  return DoDirectiveResource(val, cfg, rcf);
212  }
213  TRACE(XERR,"unknown directive: "<<d->fName);
214  return -1;
215 }
216 
217 
218 ////////////////////////////////////////////////////////////////////////////////
219 /// Reset values for the configurable parameters
220 
222 {
223  fMaxSessions = -1;
224  fMaxRunning = -1;
225  fWorkerMax = -1;
227  fOptWrksPerUnit = 1;
228  fMinForQuery = 0;
229  fNodesFraction = 0.5;
230  fCheckFrequency = 30;
231 }
232 
233 ////////////////////////////////////////////////////////////////////////////////
234 /// Configure this instance using the content of file 'cfn'.
235 /// Return 0 on success, -1 in case of failure (file does not exists
236 /// or containing incoherent information).
237 
239 {
240  XPDLOC(SCHED, "Sched::Config")
241 
242  // Run first the configurator
243  if (XrdProofdConfig::Config(rcf) != 0) {
244  XPDERR("problems parsing file ");
245  fValid = 0;
246  return -1;
247  }
248 
249  int rc = 0;
250 
251  XrdOucString msg;
252 
253  // Notify
254  XPDFORM(msg, "maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
256  TRACE(DBG, msg);
257 
258  if (!rcf) {
259  // Start cron thread
260  pthread_t tid;
261  if (XrdSysThread::Run(&tid, XrdProofSchedCron,
262  (void *)this, 0, "Scheduler cron thread") != 0) {
263  XPDERR("could not start cron thread");
264  fValid = 0;
265  return 0;
266  }
267  TRACE(ALL, "cron thread started");
268  }
269 
270  // Done
271  return rc;
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Queue a query in the session; if this is the first querym enqueue also
276 /// the session
277 
279 {
280  XPDDOM(SCHED)
281 
282  if (xps->Enqueue(query) == 1) {
283  std::list<XrdProofdProofServ *>::iterator ii;
284  for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
285  if ((*ii)->Status() == kXPD_running) break;
286  }
287  if (ii != fQueue.end()) {
288  fQueue.insert(ii, xps);
289  } else {
290  fQueue.push_back(xps);
291  }
292  }
293  if (TRACING(DBG)) DumpQueues("Enqueue");
294 
295  return 0;
296 }
297 
298 ////////////////////////////////////////////////////////////////////////////////
299 /// Dump the content of the waiting sessions queue
300 
301 void XrdProofSched::DumpQueues(const char *prefix)
302 {
303  XPDLOC(SCHED, "DumpQueues")
304 
305  TRACE(ALL," ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306  if (prefix) TRACE(ALL, " +++ Called from: "<<prefix);
307  TRACE(ALL," +++ # of waiting sessions: "<<fQueue.size());
308  std::list<XrdProofdProofServ *>::iterator ii;
309  int i = 0;
310  for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
311  TRACE(ALL," +++ #"<<++i<<" client:"<< (*ii)->Client()<<" # of queries: "<< (*ii)->Queries()->size());
312  }
313  TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
314 
315  return;
316 }
317 
318 ////////////////////////////////////////////////////////////////////////////////
319 /// Get first valid session.
320 /// The dataset information can be used to assign workers.
321 
323 {
324  XPDDOM(SCHED)
325 
326  if (fQueue.empty())
327  return 0;
328  XrdProofdProofServ *xps = fQueue.front();
329  while (xps && !(xps->IsValid())) {
330  fQueue.remove(xps);
331  xps = fQueue.front();
332  }
333  if (TRACING(DBG)) DumpQueues("FirstSession");
334  // The session will be removed after workers are assigned
335  return xps;
336 }
337 
338 ////////////////////////////////////////////////////////////////////////////////
339 /// Calculate the number of workers to be used given the state of the cluster
340 
342 {
343  XPDLOC(SCHED, "Sched::GetNumWorkers")
344 
345  // Go through the list of hosts and see how many CPUs are not used.
346  int nFreeCPUs = 0;
347  std::list<XrdProofWorker *> *wrks = fMgr->NetMgr()->GetActiveWorkers();
348  std::list<XrdProofWorker *>::iterator iter;
349  for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
350  TRACE(DBG, (*iter)->fImage<<" : # act: "<<(*iter)->fProofServs.size());
351  if ((*iter)->fType != 'M' && (*iter)->fType != 'S'
352  && (int) (*iter)->fProofServs.size() < fOptWrksPerUnit)
353  // add number of free slots
354  nFreeCPUs += fOptWrksPerUnit - (*iter)->fProofServs.size();
355  }
356 
357  float priority = 1;
358  XrdProofGroup *grp = 0;
359  if (fGrpMgr && xps->Group())
360  grp = fGrpMgr->GetGroup(xps->Group());
361  if (grp) {
362  std::list<XrdProofdProofServ *> *sessions = fMgr->SessionMgr()->ActiveSessions();
363  std::list<XrdProofdProofServ *>::iterator sesIter;
364  float summedPriority = 0;
365  for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
366  if ((*sesIter)->Group()) {
367  XrdProofGroup *g = fGrpMgr->GetGroup((*sesIter)->Group());
368  if (g)
369  summedPriority += g->Priority();
370  }
371  }
372  if (summedPriority > 0)
373  priority = (grp->Priority() * sessions->size()) / summedPriority;
374  }
375 
376  int nWrks = (int)(nFreeCPUs * fNodesFraction * priority);
377  if (nWrks <= fMinForQuery) {
378  nWrks = fMinForQuery;
379  } else if (nWrks >= (int) wrks->size()) {
380  nWrks = wrks->size() - 1;
381  }
382  TRACE(DBG, nFreeCPUs<<" : "<< nWrks);
383 
384  return nWrks;
385 }
386 
387 ////////////////////////////////////////////////////////////////////////////////
388 /// Get a list of workers that can be used by session 'xps'.
389 /// The return code is:
390 /// -1 Some failure occured; cannot continue
391 /// 0 A new list has been assigned to the session 'xps' and
392 /// returned in 'wrks'
393 /// 1 The list currently assigned to the session is the one
394 /// to be used
395 /// 2 No worker could be assigned now; session should be queued
396 
398  std::list<XrdProofWorker *> *wrks,
399  const char *querytag)
400 {
401  XPDLOC(SCHED, "Sched::GetWorkers")
402 
403  int rc = 0;
404 
405  // The caller must provide a proofserv session handler
406  if (!xps)
407  return -1;
408 
409  TRACE(REQ, "enter: query tag: "<< ((querytag) ? querytag : ""));
410 
411  // Static or dynamic
412  bool isDynamic = 1;
413  if (querytag && !strncmp(querytag, XPD_GW_Static, strlen(XPD_GW_Static) - 1)) {
414  isDynamic = 0;
415  }
416 
417  // Check if the current assigned list of workers is valid
418  if (querytag && xps && xps->Workers()->Num() > 0) {
419  if (TRACING(REQ)) xps->DumpQueries();
420  const char *cqtag = (xps->CurrentQuery()) ? xps->CurrentQuery()->GetTag() : "undef";
421  TRACE(REQ, "current query tag: "<< cqtag );
422  if (!strcmp(querytag, cqtag)) {
423  // Remove the query to be processed from the queue
424  xps->RemoveQuery(cqtag);
425  TRACE(REQ, "current assignment for session "<< xps->SrvPID() << " is valid");
426  // Current assignement is valid
427  return 1;
428  }
429  }
430 
431  // The caller must provide a list where to store the result
432  if (!wrks)
433  return -1;
434 
435  // If the session has already assigned workers or there are
436  // other queries waiting - just enqueue
437  // FIFO is enforced by dynamic mode so it is checked just in case
438  if (isDynamic) {
439  if (fUseFIFO && xps->Workers()->Num() > 0) {
440  if (!xps->GetQuery(querytag))
441  Enqueue(xps, new XrdProofQuery(querytag));
442  if (TRACING(DBG)) xps->DumpQueries();
443  // Signal enqueing
444  TRACE(REQ, "session has already assigned workers: enqueue");
445  return 2;
446  }
447  }
448 
449  // The current, full list
450  std::list<XrdProofWorker *> *acws = 0;
451 
452  if (!fMgr || !(acws = fMgr->NetMgr()->GetActiveWorkers()))
453  return -1;
454 
455  // Point to the master element
456  XrdProofWorker *mst = acws->front();
457  if (!mst)
458  return -1;
459 
460  if (fWorkerSel == kSSOLoadBased) {
461  // Dynamic scheduling: the scheduler will determine the #workers
462  // to be used based on the current load and assign the least loaded ones
463 
464  // Sort the workers by the load
466 
467  // Get the advised number
468  int nw = GetNumWorkers(xps);
469 
470  if (nw > 0) {
471  // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
472  wrks->push_back(mst);
473 
474  std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
475  while (nw--) {
476  nxWrk++;
477  // Add export version of the info
478  // (stats are updated in XrdProofdProtocol::GetWorkers)
479  wrks->push_back(*nxWrk);
480  }
481  } else {
482  // if no workers were assigned
483  // enqueue or send a list with only the master (processing refused)
484  if (fUseFIFO) {
485  // Enqueue the query/session
486  // the returned list of workers was not filled
487  if (!xps->GetQuery(querytag))
488  Enqueue(xps, new XrdProofQuery(querytag));
489  if (TRACING(DBG)) xps->DumpQueries();
490  // Signal enqueing
491  TRACE(REQ, "no workers currently available: session enqueued");
492  return 2;
493  } else {
494  // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
495  wrks->push_back(mst);
496  }
497  }
498  // Done
499  return 0;
500  }
501 
502  // Check if the check on the max number of sessions is enabled
503  // We need at least 1 master and a worker
504  std::list<XrdProofWorker *> *acwseff = 0;
505  int maxnum = (querytag && strcmp(querytag, XPD_GW_Static)) ? fMaxRunning : fMaxSessions;
506  bool ok = 0;
507  if (isDynamic) {
508  if (maxnum > 0) {
509  acwseff = new std::list<XrdProofWorker *>;
510  std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
511  if ((*xWrk)->Active() < maxnum) {
512  acwseff->push_back(*xWrk);
513  xWrk++;
514  for (; xWrk != acws->end(); xWrk++) {
515  if ((*xWrk)->Active() < maxnum) {
516  acwseff->push_back(*xWrk);
517  ok = 1;
518  }
519  }
520  } else if (!fUseFIFO) {
521  TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
522  }
523  // Check the result
524  if (!ok) { delete acwseff; acwseff = 0; }
525  acws = acwseff;
526  }
527  } else {
528  if (maxnum > 0) {
529  // This is over-conservative for sub-selection (random, or round-robin options)
530  // A better solution should be implemented for that.
531  int nactsess = mst->GetNActiveSessions();
532  TRACE(REQ, "act sess ... " << nactsess);
533  if (nactsess < maxnum) {
534  ok = 1;
535  } else if (!fUseFIFO) {
536  TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
537  }
538  // Check the result
539  if (!ok) acws = acwseff;
540  }
541  }
542 
543  // Make sure that something has been found
544  if (!acws || (acws && acws->size() <= 1)) {
545  if (fUseFIFO) {
546  // Enqueue the query/session
547  // the returned list of workers was not filled
548  if (!xps->GetQuery(querytag))
549  Enqueue(xps, new XrdProofQuery(querytag));
550  if (TRACING(REQ)) xps->DumpQueries();
551  // Notify enqueing
552  TRACE(REQ, "no workers currently available: session enqueued");
553  SafeDel(acwseff);
554  return 2;
555  } else {
556  TRACE(XERR, "no worker available: do nothing");
557  SafeDel(acwseff);
558  return -1;
559  }
560  }
561 
562  // If a non-dynamic session already has assigned workers just return
563  if (!isDynamic && (xps->Workers()->Num() > 0)) {
564  // Current assignement is valid
565  SafeDel(acwseff);
566  return 1;
567  }
568 
569  // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
570  wrks->push_back(mst);
571 
572  if (fWorkerMax > 0 && fWorkerMax < (int) acws->size()) {
573 
574  // Now the workers
575  if (fWorkerSel == kSSORandom) {
576  // Random: the first time init the machine
577  static bool rndmInit = 0;
578  if (!rndmInit) {
579  const char *randdev = "/dev/urandom";
580  int fd;
581  unsigned int seed;
582  if ((fd = open(randdev, O_RDONLY)) != -1) {
583  if (read(fd, &seed, sizeof(seed)) != sizeof(seed)) {
584  TRACE(XERR, "problems reading seed; errno: "<< errno);
585  }
586  srand(seed);
587  close(fd);
588  rndmInit = 1;
589  }
590  }
591  // Selection
592  int nwt = acws->size();
593  std::vector<int> walloc(nwt, 0);
594  std::vector<XrdProofWorker *> vwrk(nwt);
595 
596  // Fill the vector with cumulative number of actives
597  int namx = -1;
598  int i = 1;
599  std::list<XrdProofWorker *>::iterator iwk = acws->begin();
600  iwk++; // Skip master
601  for ( ; iwk != acws->end(); iwk++) {
602  vwrk[i] = *iwk;
603  int na = (*iwk)->Active();
604  printf(" %d", na);
605  walloc[i] = na + walloc[i-1];
606  i++;
607  namx = (na > namx) ? na : namx;
608  }
609  printf("\n");
610  // Normalize
611  for (i = 1; i < nwt; i++) {
612  if (namx > 0)
613  walloc[i] = namx*i - walloc[i] + i;
614  else
615  walloc[i] = i;
616  }
617  int natot = walloc[nwt - 1];
618 
619  int nw = fWorkerMax;
620  while (nw--) {
621  // Normalized number
622  int maxAtt = 10000, natt = 0;
623  int iw = -1;
624  while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
625  int jw = rand() % natot;
626  for (i = 0; i < nwt; i++) {
627  if (jw < walloc[i]) {
628  // re-normalize the weights for the higher index entries
629  int j = 0;
630  for (j = i; j < nwt; j++) {
631  if (walloc[j] > 0)
632  walloc[j]--;
633  }
634  natot--;
635  iw = i;
636  break;
637  }
638  }
639  }
640 
641  if (iw > -1) {
642  // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
643  wrks->push_back(vwrk[iw]);
644  } else {
645  // Unable to generate the right number
646  TRACE(XERR, "random generation failed");
647  rc = -1;
648  break;
649  }
650  }
651 
652  } else {
653  if (fNextWrk >= (int) acws->size())
654  fNextWrk = 1;
655  int iw = 0;
656  std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
657  int nw = fWorkerMax;
658  while (nw--) {
659  while (iw != fNextWrk) {
660  nxWrk++;
661  iw++;
662  }
663  // Add export version of the info
664  // (stats are updated in XrdProofdProtocol::GetWorkers)
665  wrks->push_back(*nxWrk);
666  // Update next worker index
667  fNextWrk++;
668  if (fNextWrk >= (int) acws->size()) {
669  fNextWrk = 1;
670  iw = 0;
671  nxWrk = acws->begin();
672  }
673  }
674  }
675  } else {
676  // The full list
677  std::list<XrdProofWorker *>::iterator iw = acws->begin();
678  iw++;
679  while (iw != acws->end()) {
680  // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
681  wrks->push_back(*iw);
682  iw++;
683  }
684  }
685 
686  // Make sure that something has been found
687  if (wrks->size() <= 1) {
688  TRACE(XERR, "no worker found: do nothing");
689  rc = -1;
690  }
691 
692  // Cleanup
693  if (acwseff) { delete acwseff; acwseff = 0; }
694 
695  return rc;
696 }
697 
698 ////////////////////////////////////////////////////////////////////////////////
699 /// Consider starting some query from the queue.
700 /// to be called after some resources are free (e.g. by a finished query)
701 /// This method is doing the full transaction of finding the session to
702 /// resume, assigning it workers and sending a resume message.
703 /// In this way there is not possibility of interference with other GetWorkers
704 /// return 0 in case of success and -1 in case of an error
705 
707 {
708  XPDLOC(SCHED, "Sched::Reschedule")
709 
710  if (fUseFIFO && TRACING(DBG)) DumpQueues("Reschedule");
711 
712  if (!fQueue.empty()) {
713  // Any advanced scheduling algorithms can be done here
714 
716  if (!xps) {
717  TRACE(XERR, "got undefined session: protocol error!");
718  return -1;
719  }
720  XrdOucString wrks;
721  // Call GetWorkers in the manager to mark the assignment.
722  XrdOucString qtag;
723  if (xps && xps->CurrentQuery()) {
724  qtag = xps->CurrentQuery()->GetTag();
725  if (qtag.beginswith(XPD_GW_Static)) {
726  qtag = XPD_GW_Static;
727  qtag.replace(":","");
728  }
729  }
730  if (fMgr->GetWorkers(wrks, xps, qtag.c_str()) < 0 ) {
731  // Something wrong
732  TRACE(XERR, "failure from GetWorkers: protocol error!");
733  return -1;
734  } else {
735  // Send buffer
736  // if workers were assigned remove the session from the queue
737  if (wrks.length() > 0 && wrks != XPD_GW_QueryEnqueued) {
738  // Send the resume message: the workers will be send in response to a
739  // GetWorkers message
740  xps->Resume();
741  // Acually remove the session from the queue
742  fQueue.remove(xps);
743  // Put the session at the end of the queue
744  // > 1 because the query is kept in the queue until 2nd GetWorkers
745  if (xps->Queries()->size() > 1)
746  fQueue.push_back(xps);
747  if (TRACING(DBG)) DumpQueues("Reschedule 2");
748  } // else add workers to the running sessions (once it's possible)
749 
750  }
751 
752  } //else add workers to the running sessions (once it's possible)
753 
754  return 0;
755 }
756 
757 ////////////////////////////////////////////////////////////////////////////////
758 /// Fill sbuf with some info about our current status
759 
760 int XrdProofSched::ExportInfo(XrdOucString &sbuf)
761 {
762  // Selection type
763  const char *osel[] = { "all", "round-robin", "random", "load-based"};
764  sbuf += "Selection: ";
765  sbuf += osel[fWorkerSel+1];
766  if (fWorkerSel > -1) {
767  sbuf += ", max workers: ";
768  sbuf += fWorkerMax; sbuf += " &";
769  }
770 
771  // The full list
772  std::list<XrdProofWorker *> *acws = fMgr->NetMgr()->GetActiveWorkers();
773  std::list<XrdProofWorker *>::iterator iw;
774  for (iw = acws->begin(); iw != acws->end(); ++iw) {
775  sbuf += (*iw)->fType;
776  sbuf += ": "; sbuf += (*iw)->fHost;
777  if ((*iw)->fPort > -1) {
778  sbuf += ":"; sbuf += (*iw)->fPort;
779  } else
780  sbuf += " ";
781  sbuf += " sessions: "; sbuf += (*iw)->Active();
782  sbuf += " &";
783  }
784 
785  // Done
786  return 0;
787 }
788 
789 ////////////////////////////////////////////////////////////////////////////////
790 /// Update the priorities of the active sessions.
791 
793  char *val, XrdOucStream *cfg, bool rcf)
794 {
795  XPDLOC(SCHED, "Sched::ProcessDirective")
796 
797  if (!d)
798  // undefined inputs
799  return -1;
800 
801  if (d->fName == "schedparam") {
802  return DoDirectiveSchedParam(val, cfg, rcf);
803  } else if (d->fName == "resource") {
804  return DoDirectiveResource(val, cfg, rcf);
805  }
806  TRACE(XERR, "unknown directive: "<<d->fName);
807  return -1;
808 }
809 
810 ////////////////////////////////////////////////////////////////////////////////
811 /// Process 'schedparam' directive
812 
813 int XrdProofSched::DoDirectiveSchedParam(char *val, XrdOucStream *cfg, bool)
814 {
815  XPDLOC(SCHED, "Sched::DoDirectiveSchedParam")
816 
817  if (!val || !cfg)
818  // undefined inputs
819  return -1;
820 
821  // Get the parameters
822  while (val && val[0]) {
823  XrdOucString s(val);
824  if (s.beginswith("wmx:")) {
825  s.replace("wmx:","");
826  fWorkerMax = strtol(s.c_str(), (char **)0, 10);
827  } else if (s.beginswith("mxsess:")) {
828  s.replace("mxsess:","");
829  fMaxSessions = strtol(s.c_str(), (char **)0, 10);
830  } else if (s.beginswith("mxrun:")) {
831  s.replace("mxrun:","");
832  fMaxRunning = strtol(s.c_str(), (char **)0, 10);
833  } else if (s.beginswith("selopt:")) {
834  if (s.endswith("random"))
836  else if (s.endswith("load"))
838  else
840  } else if (s.beginswith("fraction:")) {
841  s.replace("fraction:","");
842  fNodesFraction = strtod(s.c_str(), (char **)0);
843  } else if (s.beginswith("optnwrks:")) {
844  s.replace("optnwrks:","");
845  fOptWrksPerUnit = strtol(s.c_str(), (char **)0, 10);
846  } else if (s.beginswith("minforquery:")) {
847  s.replace("minforquery:","");
848  fMinForQuery = strtol(s.c_str(), (char **)0, 10);
849  } else if (s.beginswith("queue:")) {
850  if (s.endswith("fifo")) {
851  fUseFIFO = 1;
852  }
853  } else if (strncmp(val, "default", 7)) {
854  // This line applies to another scheduler
855  ResetParameters();
856  break;
857  }
858  val = cfg->GetWord();
859  }
860 
861  // If the max number of sessions is limited then there is no lower bound
862  // the number of workers per query
863  if (fMaxSessions > 0) {
864  fMinForQuery = 0;
865  // And there is an upper limit on the number of running sessions
866  if (fMaxRunning < 0 || fMaxRunning > fMaxSessions)
868  }
869 
870  // The FIFO size make sense only in non-load based mode
871  if (fWorkerSel == kSSOLoadBased && fMaxRunning > 0) {
872  TRACE(ALL, "WARNING: in Load-Based mode the max number of sessions"
873  " to be run is determined dynamically");
874  }
875 
876  return 0;
877 }
878 
879 ////////////////////////////////////////////////////////////////////////////////
880 /// Process 'resource' directive
881 
882 int XrdProofSched::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
883 {
884  if (!val || !cfg)
885  // undefined inputs
886  return -1;
887 
888  // Get the scheduler name
889  if (strncmp(val, "static", 6) && strncmp(val, "default", 7))
890  return 0;
891  // Get the values
892  while ((val = cfg->GetWord()) && val[0]) {
893  XrdOucString s(val);
894  if (s.beginswith("wmx:")) {
895  s.replace("wmx:","");
896  fWorkerMax = strtol(s.c_str(), (char **)0, 10);
897  } else if (s.beginswith("mxsess:")) {
898  s.replace("mxsess:","");
899  fMaxSessions = strtol(s.c_str(), (char **)0, 10);
900  } else if (s.beginswith("selopt:")) {
901  if (s.endswith("random"))
903  else
905  }
906  }
907  return 0;
908 }
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
double read(const std::string &file_name)
reading
int Resume()
Send a resume message to the this session.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
const char * Group() const
#define SafeDel(x)
Definition: XrdProofdAux.h:335
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
int Enqueue(XrdProofQuery *q)
#define TRACING(x)
XrdProofdProofServMgr * SessionMgr() const
int nwt
Definition: THbookFile.cxx:91
std::list< XrdProofdProofServ * > * ActiveSessions()
XrdOucHash< XrdProofWorker > * Workers() const
XrdProofdNetMgr * NetMgr() const
int CheckFrequency() const
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
const char *const XPD_GW_QueryEnqueued
char fName[kXPSMXNMLEN]
int Type() const
Definition: XrdProofdAux.h:194
std::list< XrdProofQuery * > * Queries() const
XrdProofGroupMgr * fGrpMgr
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
#define kXPSMXNMLEN
Definition: XrdProofSched.h:53
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group &#39;grp.
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker *> *, const char *)
Get a list of workers that can be used by session &#39;xps&#39;.
const char * GetTag()
int Recv(XpdMsg &msg)
Recv message from the pipe.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process &#39;schedparam&#39; directive.
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
#define XPDLOC(d, x)
XrdProofdPipe * Pipe()
virtual int Config(bool rcf=0)
Configure this instance using the content of file &#39;cfn&#39;.
XrdProofdManager * fMgr
#define XPDERR(x)
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first querym enqueue also the session.
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
float Priority() const
Definition: XrdProofGroup.h:80
std::list< XrdProofdProofServ * > fQueue
#define XPDDOM(d)
virtual void ResetParameters()
Reset values for the configurable parameters.
#define XrdSysError
Definition: XpdSysError.h:8
XrdSysError * fEDest
#define XPDFORM
Definition: XrdProofdAux.h:381
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
Definition: TRolke.cxx:630
static void Sort(std::list< XrdProofWorker *> *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by &#39;f&#39;; &#39;f&#39; should return &#39;tru...
virtual void RegisterDirectives()
Register directives for configuration.
double fNodesFraction
const char *const XPD_GW_Static
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process &#39;resource&#39; directive.
XrdOucString fName
Definition: XrdProofdAux.h:111
virtual int Reschedule()
Consider starting some query from the queue.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
virtual int Config(bool rcf=0)
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdSysError *e=0)
Constructor.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
void Register(const char *dname, XrdProofdDirective *d)
XrdProofQuery * CurrentQuery()