ROOT  6.06/09
Reference Guide
XrdProofSched.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: G. Ganis September 2007
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // XrdProofSched //
15 // //
16 // Authors: G. Ganis, CERN, 2007 //
17 // //
18 // Interface for a PROOF scheduler. //
19 // Alternative scheduler implementations should be provided as shared //
20 // library containing an implementation of this class. The library //
21 // must also provide a function to load an instance of this class //
22 // with the following signature (see commented example below): //
23 // extern "C" { //
24 // XrdProofSched *XrdgetProofSched(const char *cfg, //
25 // XrdProofdManager *mgr, //
26 // XrdProofGroupMgr *grpmgr, //
27 // XrdSysError *edest); //
28 // } //
29 // Here 'cfg' is the xrootd config file where directives to configure //
30 // the scheduler are specified, 'mgr' is the instance of the cluster //
31 // manager from where the scheduler can get info about the available //
32 // workers and their status, 'grpmgr' is the instance of the group //
33 // manager holding the definition of the groups for this run, and //
34 // 'edest' is the instance of the error logger to be used. //
35 // The scheduler is identified by a name of max 16 chars. //
36 // //
37 //////////////////////////////////////////////////////////////////////////
38 
39 #include <list>
40 
41 #include "XProofProtocol.h"
42 #include "XrdProofdManager.h"
43 #include "XrdProofdNetMgr.h"
44 #include "XrdProofdProofServMgr.h"
45 #include "XrdProofGroup.h"
46 #include "XrdProofSched.h"
47 #include "XrdProofdProofServ.h"
48 #include "XrdProofWorker.h"
49 
50 #include "XrdOuc/XrdOucString.hh"
51 #include "XrdOuc/XrdOucStream.hh"
52 
53 #include "XpdSysError.h"
54 
55 // Tracing
56 #include "XrdProofdTrace.h"
57 
58 //
59 // Example of scheduler loader for an implementation called XrdProofSchedDyn
60 //
61 // extern "C" {
62 // //______________________________________________________________________________
63 // XrdProofSched *XrdgetProofSched(const char *cfg, XrdProofdManager *mgr,
64 // XrdProofGroupMgr *grpmgr, XrdSysError *edest)
65 // {
66 // // This scheduler is meant to live in a shared library. The interface below is
67 // // used by the server to obtain a copy of the scheduler object.
68 //
69 // XrdProofSchedDyn *pss = new XrdProofSchedDyn(mgr, grpmgr, edest);
70 // if (pss && pss->Config(cfg) == 0) {
71 // return (XrdProofSched *) pss;
72 // }
73 // if (pss)
74 // delete pss;
75 // return (XrdProofSched *)0;
76 // }}
77 
78 //--------------------------------------------------------------------------
79 //
80 // XrdProofSchedCron
81 //
82 // Scheduler thread
83 //
84 ////////////////////////////////////////////////////////////////////////////////
85 /// This is an endless loop to check the system periodically or when
86 /// triggered via a message in a dedicated pipe
87 
88 void *XrdProofSchedCron(void *p)
89 {
90  XPDLOC(SCHED, "SchedCron")
91 
92  XrdProofSched *sched = (XrdProofSched *)p;
93  if (!(sched)) {
94  TRACE(XERR, "undefined scheduler: cannot start");
95  return (void *)0;
96  }
97 
98  // Time of last session check
99  int lastcheck = time(0), ckfreq = sched->CheckFrequency(), deltat = 0;
100  while(1) {
101  // We wait for processes to communicate a session status change
102  if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
103  deltat = ckfreq;
104  int pollRet = sched->Pipe()->Poll(deltat);
105 
106  if (pollRet > 0) {
107  // Read message
108  XpdMsg msg;
109  int rc = 0;
110  if ((rc = sched->Pipe()->Recv(msg)) != 0) {
111  XPDERR("problems receiving message; errno: "<<-rc);
112  continue;
113  }
114  // Parse type
115  XrdOucString buf;
116  if (msg.Type() == XrdProofSched::kReschedule) {
117 
118  TRACE(ALL, "received kReschedule");
119 
120  // Reschedule
121  sched->Reschedule();
122 
123  } else {
124 
125  TRACE(XERR, "unknown type: "<<msg.Type());
126  continue;
127  }
128  } else {
129  // Notify
130  TRACE(ALL, "running regular checks");
131  // Run regular rescheduling checks
132  sched->Reschedule();
133  // Remember when ...
134  lastcheck = time(0);
135  }
136  }
137 
138  // Should never come here
139  return (void *)0;
140 }
141 
142 ////////////////////////////////////////////////////////////////////////////////
143 /// Compare two workers for sorting
144 
145 static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
146 {
147  return ((lhs && rhs &&
148  lhs->GetNActiveSessions() < rhs->GetNActiveSessions()) ? 1 : 0);
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Generic directive processor
153 
154 int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
155 {
156  if (!d || !(d->fVal))
157  // undefined inputs
158  return -1;
159 
160  return ((XrdProofSched *)d->fVal)->ProcessDirective(d, val, cfg, rcf);
161 }
162 
163 ////////////////////////////////////////////////////////////////////////////////
164 /// Constructor
165 
167  XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, // mgr: cluster manager (worker/session info); grpmgr: group definitions
168  const char *cfn, XrdSysError *e) // cfn (config file) and e (error logger) are forwarded to XrdProofdConfig
169  : XrdProofdConfig(cfn, e)
170 { // Constructor: initialize the state, reset the configurable parameters and store the scheduler name
171  fValid = 1; // cleared by Config() when parsing or the cron thread start fails
172  fMgr = mgr;
173  fGrpMgr = grpmgr;
174  fNextWrk = 1; // round-robin cursor in the active-workers list (index 0 is the master)
175  fEDest = e;
176  fUseFIFO = 0; // set by the 'queue:fifo' schedparam option
177  ResetParameters();
178 
179  memset(fName, 0, kXPSMXNMLEN);
180  if (name)
181  memcpy(fName, name, kXPSMXNMLEN-1); // NOTE(review): always reads kXPSMXNMLEN-1 bytes from 'name', over-reading a shorter string; strncpy would stop at the terminator
182 
183  // Configuration directives
185 }
186 
187 ////////////////////////////////////////////////////////////////////////////////
188 /// Register directives for configuration
189 
191 {
192  Register("schedparam", new XrdProofdDirective("schedparam", this, &DoDirectiveClass));
193  Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
194 }
195 
196 ////////////////////////////////////////////////////////////////////////////////
197 /// Dispatch a configuration directive to the handler matching its name.
198 
200  char *val, XrdOucStream *cfg, bool rcf)
201 {
202  XPDLOC(SCHED, "Sched::DoDirective")
203 
204  if (!d)
205  // undefined inputs
206  return -1;
207 
208  if (d->fName == "schedparam") {
209  return DoDirectiveSchedParam(val, cfg, rcf);
210  } else if (d->fName == "resource") {
211  return DoDirectiveResource(val, cfg, rcf);
212  }
213  TRACE(XERR,"unknown directive: "<<d->fName);
214  return -1;
215 }
216 
217 
218 ////////////////////////////////////////////////////////////////////////////////
219 /// Reset values for the configurable parameters
220 
222 { // Reset the configurable scheduling parameters to their defaults
223  fMaxSessions = -1; // <= 0 disables the max-sessions check in GetWorkers
224  fMaxRunning = -1; // <= 0 disables the max-running-sessions check in GetWorkers
225  fWorkerMax = -1; // <= 0: no sub-selection, GetWorkers assigns all active workers
227  fOptWrksPerUnit = 1; // slots per worker counted as usable by GetNumWorkers
228  fMinForQuery = 0; // lower bound applied by GetNumWorkers (load-based mode)
229  fNodesFraction = 0.5; // fraction of the free slots assigned by GetNumWorkers
230  fCheckFrequency = 30; // presumably what CheckFrequency() returns: period (s) of the regular checks in XrdProofSchedCron
231 }
232 
233 ////////////////////////////////////////////////////////////////////////////////
234 /// Configure this instance using the content of file 'cfn'.
235 /// Return 0 on success, -1 in case of failure (file does not exist
236 /// or contains incoherent information).
237 
239 { // Run the generic configurator, trace the resulting settings and, unless re-configuring, start the cron thread
240  XPDLOC(SCHED, "Sched::Config")
241 
242  // Run first the configurator
243  if (XrdProofdConfig::Config(rcf) != 0) {
244  XPDERR("problems parsing file ");
245  fValid = 0;
246  return -1;
247  }
248 
249  int rc = 0;
250 
251  XrdOucString msg;
252 
253  // Notify
254  XPDFORM(msg, "maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
256  TRACE(DBG, msg);
257 
258  if (!rcf) { // presumably rcf is set on re-configuration, when the thread is already running
259  // Start cron thread
260  pthread_t tid;
261  if (XrdSysThread::Run(&tid, XrdProofSchedCron,
262  (void *)this, 0, "Scheduler cron thread") != 0) {
263  XPDERR("could not start cron thread");
264  fValid = 0;
265  return 0; // NOTE(review): failure is reported as success (only fValid is cleared); -1 may be intended - confirm callers check fValid
266  }
267  TRACE(ALL, "cron thread started");
268  }
269 
270  // Done
271  return rc;
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Queue a query in the session; if this is the first query, enqueue also
276 /// the session
277 
279 {
280  XPDDOM(SCHED)
281 
282  if (xps->Enqueue(query) == 1) {
283  std::list<XrdProofdProofServ *>::iterator ii;
284  for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
285  if ((*ii)->Status() == kXPD_running) break;
286  }
287  if (ii != fQueue.end()) {
288  fQueue.insert(ii, xps);
289  } else {
290  fQueue.push_back(xps);
291  }
292  }
293  if (TRACING(DBG)) DumpQueues("Enqueue");
294 
295  return 0;
296 }
297 
298 ////////////////////////////////////////////////////////////////////////////////
299 /// Dump the content of the waiting sessions queue
300 
301 void XrdProofSched::DumpQueues(const char *prefix)
302 {
303  XPDLOC(SCHED, "DumpQueues")
304 
305  TRACE(ALL," ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306  if (prefix) TRACE(ALL, " +++ Called from: "<<prefix);
307  TRACE(ALL," +++ # of waiting sessions: "<<fQueue.size());
308  std::list<XrdProofdProofServ *>::iterator ii;
309  int i = 0;
310  for (ii = fQueue.begin(); ii != fQueue.end(); ii++) {
311  TRACE(ALL," +++ #"<<++i<<" client:"<< (*ii)->Client()<<" # of queries: "<< (*ii)->Queries()->size());
312  }
313  TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
314 
315  return;
316 }
317 
318 ////////////////////////////////////////////////////////////////////////////////
319 /// Get first valid session.
320 /// The dataset information can be used to assign workers.
321 
323 {
324  XPDDOM(SCHED)
325 
326  if (fQueue.empty())
327  return 0;
328  XrdProofdProofServ *xps = fQueue.front();
329  while (xps && !(xps->IsValid())) {
330  fQueue.remove(xps);
331  xps = fQueue.front();
332  }
333  if (TRACING(DBG)) DumpQueues("FirstSession");
334  // The session will be removed after workers are assigned
335  return xps;
336 }
337 
338 ////////////////////////////////////////////////////////////////////////////////
339 /// Calculate the number of workers to be used given the state of the cluster
340 
342 {
343  XPDLOC(SCHED, "Sched::GetNumWorkers")
344 
345  // Go through the list of hosts and see how many CPUs are not used.
346  int nFreeCPUs = 0;
347  std::list<XrdProofWorker *> *wrks = fMgr->NetMgr()->GetActiveWorkers();
348  std::list<XrdProofWorker *>::iterator iter;
349  for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
350  TRACE(DBG, (*iter)->fImage<<" : # act: "<<(*iter)->fProofServs.size());
351  if ((*iter)->fType != 'M' && (*iter)->fType != 'S'
352  && (int) (*iter)->fProofServs.size() < fOptWrksPerUnit)
353  // add number of free slots
354  nFreeCPUs += fOptWrksPerUnit - (*iter)->fProofServs.size();
355  }
356 
357  float priority = 1;
358  XrdProofGroup *grp = 0;
359  if (fGrpMgr && xps->Group())
360  grp = fGrpMgr->GetGroup(xps->Group());
361  if (grp) {
362  std::list<XrdProofdProofServ *> *sessions = fMgr->SessionMgr()->ActiveSessions();
363  std::list<XrdProofdProofServ *>::iterator sesIter;
364  float summedPriority = 0;
365  for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
366  if ((*sesIter)->Group()) {
367  XrdProofGroup *g = fGrpMgr->GetGroup((*sesIter)->Group());
368  if (g)
369  summedPriority += g->Priority();
370  }
371  }
372  if (summedPriority > 0)
373  priority = (grp->Priority() * sessions->size()) / summedPriority;
374  }
375 
376  int nWrks = (int)(nFreeCPUs * fNodesFraction * priority);
377  if (nWrks <= fMinForQuery) {
378  nWrks = fMinForQuery;
379  } else if (nWrks >= (int) wrks->size()) {
380  nWrks = wrks->size() - 1;
381  }
382  TRACE(DBG, nFreeCPUs<<" : "<< nWrks);
383 
384  return nWrks;
385 }
386 
387 ////////////////////////////////////////////////////////////////////////////////
388 /// Get a list of workers that can be used by session 'xps'.
389 /// The return code is:
390 /// -1 Some failure occurred; cannot continue
391 /// 0 A new list has been assigned to the session 'xps' and
392 /// returned in 'wrks'
393 /// 1 The list currently assigned to the session is the one
394 /// to be used
395 /// 2 No worker could be assigned now; session should be queued
396 
398  std::list<XrdProofWorker *> *wrks, // out: master first, then the selected workers
399  const char *querytag)
400 { // Returns -1 on failure, 0 if a new list was filled, 1 if the current assignment is valid, 2 if the session was enqueued
401  XPDLOC(SCHED, "Sched::GetWorkers")
402 
403  int rc = 0;
404 
405  TRACE(REQ, "enter: query tag: "<< ((querytag) ? querytag : ""));
406 
407  // Static or dynamic
408  bool isDynamic = 1;
409  if (querytag && !strncmp(querytag, XPD_GW_Static, strlen(XPD_GW_Static) - 1)) { // prefix match ignoring the last char of XPD_GW_Static (presumably ':')
410  isDynamic = 0;
411  }
412 
413  // Check if the current assigned list of workers is valid
414  if (querytag && xps && xps->Workers()->Num() > 0) {
415  if (TRACING(REQ)) xps->DumpQueries();
416  const char *cqtag = (xps->CurrentQuery()) ? xps->CurrentQuery()->GetTag() : "undef";
417  TRACE(REQ, "current query tag: "<< cqtag );
418  if (!strcmp(querytag, cqtag)) {
419  // Remove the query to be processed from the queue
420  xps->RemoveQuery(cqtag);
421  TRACE(REQ, "current assignment for session "<< xps->SrvPID() << " is valid");
422  // Current assignment is valid
423  return 1;
424  }
425  }
426 
427  // The caller must provide a list where to store the result
428  if (!wrks)
429  return -1;
430 
431  // If the session has already assigned workers or there are
432  // other queries waiting - just enqueue
433  // FIFO is enforced by dynamic mode so it is checked just in case
434  if (isDynamic) {
435  if (fUseFIFO && xps->Workers()->Num() > 0) { // NOTE(review): 'xps' is dereferenced here without the null check done above
436  if (!xps->GetQuery(querytag))
437  Enqueue(xps, new XrdProofQuery(querytag));
438  if (TRACING(DBG)) xps->DumpQueries();
439  // Signal enqueuing
440  TRACE(REQ, "session has already assigned workers: enqueue");
441  return 2;
442  }
443  }
444 
445  // The current, full list
446  std::list<XrdProofWorker *> *acws = 0;
447 
448  if (!fMgr || !(acws = fMgr->NetMgr()->GetActiveWorkers()))
449  return -1;
450 
451  // Point to the master element
452  XrdProofWorker *mst = acws->front();
453  if (!mst)
454  return -1;
455 
456  if (fWorkerSel == kSSOLoadBased) {
457  // Dynamic scheduling: the scheduler will determine the #workers
458  // to be used based on the current load and assign the least loaded ones
459 
460  // Sort the workers by the load
462 
463  // Get the advised number
464  int nw = GetNumWorkers(xps);
465 
466  if (nw > 0) {
467  // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
468  wrks->push_back(mst);
469 
470  std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
471  while (nw--) { // assumes nw does not exceed the number of workers after the master
472  nxWrk++;
473  // Add export version of the info
474  // (stats are updated in XrdProofdProtocol::GetWorkers)
475  wrks->push_back(*nxWrk);
476  }
477  } else {
478  // if no workers were assigned
479  // enqueue or send a list with only the master (processing refused)
480  if (fUseFIFO) {
481  // Enqueue the query/session
482  // the returned list of workers was not filled
483  if (!xps->GetQuery(querytag))
484  Enqueue(xps, new XrdProofQuery(querytag));
485  if (TRACING(DBG)) xps->DumpQueries();
486  // Signal enqueuing
487  TRACE(REQ, "no workers currently available: session enqueued");
488  return 2;
489  } else {
490  // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
491  wrks->push_back(mst);
492  }
493  }
494  // Done
495  return 0;
496  }
497 
498  // Check if the check on the max number of sessions is enabled
499  // We need at least 1 master and a worker
500  std::list<XrdProofWorker *> *acwseff = 0;
501  int maxnum = (querytag && strcmp(querytag, XPD_GW_Static)) ? fMaxRunning : fMaxSessions;
502  bool ok = 0;
503  if (isDynamic) {
504  if (maxnum > 0) {
505  acwseff = new std::list<XrdProofWorker *>;
506  std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
507  if ((*xWrk)->Active() < maxnum) {
508  acwseff->push_back(*xWrk);
509  xWrk++;
510  for (; xWrk != acws->end(); xWrk++) {
511  if ((*xWrk)->Active() < maxnum) {
512  acwseff->push_back(*xWrk);
513  ok = 1;
514  }
515  }
516  } else if (!fUseFIFO) {
517  TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
518  }
519  // Check the result
520  if (!ok) { delete acwseff; acwseff = 0; }
521  acws = acwseff;
522  }
523  } else {
524  if (maxnum > 0) {
525  // This is over-conservative for sub-selection (random, or round-robin options)
526  // A better solution should be implemented for that.
527  int nactsess = mst->GetNActiveSessions();
528  TRACE(REQ, "act sess ... " << nactsess);
529  if (nactsess < maxnum) {
530  ok = 1;
531  } else if (!fUseFIFO) {
532  TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
533  }
534  // Check the result
535  if (!ok) acws = acwseff; // acwseff is still 0 here, so this flags "nothing found"
536  }
537  }
538 
539  // Make sure that something has been found
540  if (!acws || (acws && acws->size() <= 1)) {
541  if (fUseFIFO) {
542  // Enqueue the query/session
543  // the returned list of workers was not filled
544  if (!xps->GetQuery(querytag))
545  Enqueue(xps, new XrdProofQuery(querytag));
546  if (TRACING(REQ)) xps->DumpQueries();
547  // Notify enqueuing
548  TRACE(REQ, "no workers currently available: session enqueued");
549  SafeDel(acwseff);
550  return 2;
551  } else {
552  TRACE(XERR, "no worker available: do nothing");
553  SafeDel(acwseff);
554  return -1;
555  }
556  }
557 
558  // If a non-dynamic session already has assigned workers just return
559  if (!isDynamic && (xps->Workers()->Num() > 0)) {
560  // Current assignment is valid
561  SafeDel(acwseff);
562  return 1;
563  }
564 
565  // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
566  wrks->push_back(mst);
567 
568  if (fWorkerMax > 0 && fWorkerMax < (int) acws->size()) { // sub-select fWorkerMax workers
569 
570  // Now the workers
571  if (fWorkerSel == kSSORandom) {
572  // Random: the first time init the machine
573  static bool rndmInit = 0;
574  if (!rndmInit) {
575  const char *randdev = "/dev/urandom";
576  int fd;
577  unsigned int seed;
578  if ((fd = open(randdev, O_RDONLY)) != -1) {
579  if (read(fd, &seed, sizeof(seed)) != sizeof(seed)) {
580  TRACE(XERR, "problems reading seed; errno: "<< errno);
581  }
582  srand(seed);
583  close(fd);
584  rndmInit = 1;
585  }
586  }
587  // Selection
588  int nwt = acws->size();
589  std::vector<int> walloc(nwt, 0);
590  std::vector<XrdProofWorker *> vwrk(nwt);
591 
592  // Fill the vector with cumulative number of actives
593  int namx = -1;
594  int i = 1;
595  std::list<XrdProofWorker *>::iterator iwk = acws->begin();
596  iwk++; // Skip master
597  for ( ; iwk != acws->end(); iwk++) {
598  vwrk[i] = *iwk;
599  int na = (*iwk)->Active();
600  printf(" %d", na); // NOTE(review): writes to stdout - looks like leftover debugging output
601  walloc[i] = na + walloc[i-1];
602  i++;
603  namx = (na > namx) ? na : namx;
604  }
605  printf("\n"); // NOTE(review): leftover debugging output, see above
606  // Normalize: cumulative weights favouring the less loaded workers
607  for (i = 1; i < nwt; i++) {
608  if (namx > 0)
609  walloc[i] = namx*i - walloc[i] + i;
610  else
611  walloc[i] = i;
612  }
613  int natot = walloc[nwt - 1]; // >= nwt-1 >= fWorkerMax, so it stays > 0 for every draw below
614 
615  int nw = fWorkerMax;
616  while (nw--) {
617  // Normalized number
618  int maxAtt = 10000, natt = 0; // NOTE(review): natt is never incremented, so maxAtt does not bound the loop
619  int iw = -1;
620  while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
621  int jw = rand() % natot;
622  for (i = 0; i < nwt; i++) {
623  if (jw < walloc[i]) {
624  // re-normalize the weights for the higher index entries
625  int j = 0;
626  for (j = i; j < nwt; j++) {
627  if (walloc[j] > 0)
628  walloc[j]--;
629  }
630  natot--;
631  iw = i;
632  break;
633  }
634  }
635  }
636 
637  if (iw > -1) {
638  // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
639  wrks->push_back(vwrk[iw]);
640  } else {
641  // Unable to generate the right number
642  TRACE(XERR, "random generation failed");
643  rc = -1;
644  break;
645  }
646  }
647 
648  } else {
649  if (fNextWrk >= (int) acws->size()) // round-robin: resume from the saved cursor, skipping the master (index 0)
650  fNextWrk = 1;
651  int iw = 0;
652  std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
653  int nw = fWorkerMax;
654  while (nw--) {
655  while (iw != fNextWrk) {
656  nxWrk++;
657  iw++;
658  }
659  // Add export version of the info
660  // (stats are updated in XrdProofdProtocol::GetWorkers)
661  wrks->push_back(*nxWrk);
662  // Update next worker index
663  fNextWrk++;
664  if (fNextWrk >= (int) acws->size()) {
665  fNextWrk = 1;
666  iw = 0;
667  nxWrk = acws->begin();
668  }
669  }
670  }
671  } else {
672  // The full list
673  std::list<XrdProofWorker *>::iterator iw = acws->begin();
674  iw++;
675  while (iw != acws->end()) {
676  // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
677  wrks->push_back(*iw);
678  iw++;
679  }
680  }
681 
682  // Make sure that something has been found
683  if (wrks->size() <= 1) {
684  TRACE(XERR, "no worker found: do nothing");
685  rc = -1;
686  }
687 
688  // Cleanup
689  if (acwseff) { delete acwseff; acwseff = 0; }
690 
691  return rc;
692 }
693 
694 ////////////////////////////////////////////////////////////////////////////////
695 /// Consider starting some query from the queue.
696 /// to be called after some resources are free (e.g. by a finished query)
697 /// This method is doing the full transaction of finding the session to
698 /// resume, assigning it workers and sending a resume message.
699 /// In this way there is no possibility of interference with other GetWorkers
700 /// return 0 in case of success and -1 in case of an error
701 
703 { // Try to resume the first queued session: get workers for it via the manager and, if assigned, send it a resume message
704  XPDLOC(SCHED, "Sched::Reschedule")
705 
706  if (fUseFIFO && TRACING(DBG)) DumpQueues("Reschedule");
707 
708  if (!fQueue.empty()) {
709  // Any advanced scheduling algorithms can be done here
710 
712  if (!xps) { // NOTE(review): 'xps' is declared on a line not visible here - presumably the result of FirstSession()
713  TRACE(XERR, "got undefined session: protocol error!");
714  return -1;
715  }
716  XrdOucString wrks;
717  // Call GetWorkers in the manager to mark the assignment.
718  XrdOucString qtag;
719  if (xps && xps->CurrentQuery()) {
720  qtag = xps->CurrentQuery()->GetTag();
721  if (qtag.beginswith(XPD_GW_Static)) { // static requests are normalized to XPD_GW_Static without ':'
722  qtag = XPD_GW_Static;
723  qtag.replace(":","");
724  }
725  }
726  if (fMgr->GetWorkers(wrks, xps, qtag.c_str()) < 0 ) {
727  // Something wrong
728  TRACE(XERR, "failure from GetWorkers: protocol error!");
729  return -1;
730  } else {
731  // Send buffer
732  // if workers were assigned remove the session from the queue
733  if (wrks.length() > 0 && wrks != XPD_GW_QueryEnqueued) { // a non-empty answer that is not the "query enqueued" marker
734  // Send the resume message: the workers will be sent in response to a
735  // GetWorkers message
736  xps->Resume();
737  // Actually remove the session from the queue
738  fQueue.remove(xps);
739  // Put the session at the end of the queue
740  // > 1 because the query is kept in the queue until 2nd GetWorkers
741  if (xps->Queries()->size() > 1)
742  fQueue.push_back(xps);
743  if (TRACING(DBG)) DumpQueues("Reschedule 2");
744  } // else add workers to the running sessions (once it's possible)
745 
746  }
747 
748  } //else add workers to the running sessions (once it's possible)
749 
750  return 0;
751 }
752 
753 ////////////////////////////////////////////////////////////////////////////////
754 /// Fill sbuf with some info about our current status
755 
756 int XrdProofSched::ExportInfo(XrdOucString &sbuf)
757 {
758  // Selection type
759  const char *osel[] = { "all", "round-robin", "random", "load-based"};
760  sbuf += "Selection: ";
761  sbuf += osel[fWorkerSel+1];
762  if (fWorkerSel > -1) {
763  sbuf += ", max workers: ";
764  sbuf += fWorkerMax; sbuf += " &";
765  }
766 
767  // The full list
768  std::list<XrdProofWorker *> *acws = fMgr->NetMgr()->GetActiveWorkers();
769  std::list<XrdProofWorker *>::iterator iw;
770  for (iw = acws->begin(); iw != acws->end(); ++iw) {
771  sbuf += (*iw)->fType;
772  sbuf += ": "; sbuf += (*iw)->fHost;
773  if ((*iw)->fPort > -1) {
774  sbuf += ":"; sbuf += (*iw)->fPort;
775  } else
776  sbuf += " ";
777  sbuf += " sessions: "; sbuf += (*iw)->Active();
778  sbuf += " &";
779  }
780 
781  // Done
782  return 0;
783 }
784 
785 ////////////////////////////////////////////////////////////////////////////////
786 /// Dispatch a configuration directive to the handler matching its name.
787 
789  char *val, XrdOucStream *cfg, bool rcf)
790 {
791  XPDLOC(SCHED, "Sched::ProcessDirective")
792 
793  if (!d)
794  // undefined inputs
795  return -1;
796 
797  if (d->fName == "schedparam") {
798  return DoDirectiveSchedParam(val, cfg, rcf);
799  } else if (d->fName == "resource") {
800  return DoDirectiveResource(val, cfg, rcf);
801  }
802  TRACE(XERR, "unknown directive: "<<d->fName);
803  return -1;
804 }
805 
806 ////////////////////////////////////////////////////////////////////////////////
807 /// Process 'schedparam' directive
808 
809 int XrdProofSched::DoDirectiveSchedParam(char *val, XrdOucStream *cfg, bool) // Parse the 'key:value' tokens of a 'schedparam' directive; returns 0, or -1 on undefined inputs
810 {
811  XPDLOC(SCHED, "Sched::DoDirectiveSchedParam")
812 
813  if (!val || !cfg)
814  // undefined inputs
815  return -1;
816 
817  // Get the parameters
818  while (val && val[0]) {
819  XrdOucString s(val);
820  if (s.beginswith("wmx:")) {
821  s.replace("wmx:","");
822  fWorkerMax = strtol(s.c_str(), (char **)0, 10);
823  } else if (s.beginswith("mxsess:")) {
824  s.replace("mxsess:","");
825  fMaxSessions = strtol(s.c_str(), (char **)0, 10);
826  } else if (s.beginswith("mxrun:")) {
827  s.replace("mxrun:","");
828  fMaxRunning = strtol(s.c_str(), (char **)0, 10);
829  } else if (s.beginswith("selopt:")) { // worker selection option: random, load(-based) or default
830  if (s.endswith("random"))
832  else if (s.endswith("load"))
834  else
836  } else if (s.beginswith("fraction:")) {
837  s.replace("fraction:","");
838  fNodesFraction = strtod(s.c_str(), (char **)0);
839  } else if (s.beginswith("optnwrks:")) {
840  s.replace("optnwrks:","");
841  fOptWrksPerUnit = strtol(s.c_str(), (char **)0, 10);
842  } else if (s.beginswith("minforquery:")) {
843  s.replace("minforquery:","");
844  fMinForQuery = strtol(s.c_str(), (char **)0, 10);
845  } else if (s.beginswith("queue:")) {
846  if (s.endswith("fifo")) {
847  fUseFIFO = 1;
848  }
849  } else if (strncmp(val, "default", 7)) { // any other token not starting with "default" ends parsing
850  // This line applies to another scheduler
851  ResetParameters();
852  break;
853  }
854  val = cfg->GetWord();
855  }
856 
857  // If the max number of sessions is limited then there is no lower bound
858  // on the number of workers per query
859  if (fMaxSessions > 0) {
860  fMinForQuery = 0;
861  // And there is an upper limit on the number of running sessions
862  if (fMaxRunning < 0 || fMaxRunning > fMaxSessions)
864  }
865 
866  // A max number of running sessions makes sense only in non-load-based mode
867  if (fWorkerSel == kSSOLoadBased && fMaxRunning > 0) {
868  TRACE(ALL, "WARNING: in Load-Based mode the max number of sessions"
869  " to be run is determined dynamically");
870  }
871 
872  return 0;
873 }
874 
875 ////////////////////////////////////////////////////////////////////////////////
876 /// Process 'resource' directive
877 
878 int XrdProofSched::DoDirectiveResource(char *val, XrdOucStream *cfg, bool) // Parse a 'resource' directive; returns 0, or -1 on undefined inputs
879 {
880  if (!val || !cfg)
881  // undefined inputs
882  return -1;
883 
884  // Get the scheduler name
885  if (strncmp(val, "static", 6) && strncmp(val, "default", 7)) // only 'static...' or 'default...' resources are handled here
886  return 0;
887  // Get the values
888  while ((val = cfg->GetWord()) && val[0]) {
889  XrdOucString s(val);
890  if (s.beginswith("wmx:")) {
891  s.replace("wmx:","");
892  fWorkerMax = strtol(s.c_str(), (char **)0, 10);
893  } else if (s.beginswith("mxsess:")) {
894  s.replace("mxsess:","");
895  fMaxSessions = strtol(s.c_str(), (char **)0, 10);
896  } else if (s.beginswith("selopt:")) { // worker selection option: random or default
897  if (s.endswith("random"))
899  else
901  }
902  }
903  return 0;
904 }
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
XrdProofQuery * GetQuery(const char *tag)
Get query with tag from the list of queries.
double read(const std::string &file_name)
reading
int Resume()
Send a resume message to the this session.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
#define SafeDel(x)
Definition: XrdProofdAux.h:335
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
int Enqueue(XrdProofQuery *q)
#define TRACING(x)
int nwt
Definition: THbookFile.cxx:91
std::list< XrdProofdProofServ * > * ActiveSessions()
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define TRACE(Flag, Args)
Definition: TGHtml.h:124
const char *const XPD_GW_QueryEnqueued
char fName[kXPSMXNMLEN]
XrdProofGroupMgr * fGrpMgr
XrdProofdProofServMgr * SessionMgr() const
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
float Priority() const
Definition: XrdProofGroup.h:80
#define kXPSMXNMLEN
Definition: XrdProofSched.h:53
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Process a configuration directive by dispatching it to the handler matching its name.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
XrdProofGroup * GetGroup(const char *grp)
Returns the instance for group 'grp'.
int Type() const
Definition: XrdProofdAux.h:194
const char * GetTag()
int Recv(XpdMsg &msg)
Recv message from the pipe.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
std::list< XrdProofQuery * > * Queries() const
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process 'schedparam' directive.
std::map< std::string, std::string >::const_iterator iter
Definition: TAlienJob.cxx:54
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
#define XPDLOC(d, x)
XrdProofdPipe * Pipe()
virtual int Config(bool rcf=0)
Configure this instance using the content of file 'cfn'.
XrdProofdManager * fMgr
#define XPDERR(x)
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first query, enqueue also the session.
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Dispatch a configuration directive to the handler matching its name.
std::list< XrdProofdProofServ * > fQueue
#define XPDDOM(d)
int CheckFrequency() const
virtual void ResetParameters()
Reset values for the configurable parameters.
#define XrdSysError
Definition: XpdSysError.h:8
XrdProofdNetMgr * NetMgr() const
static void Sort(std::list< XrdProofWorker * > *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
XrdSysError * fEDest
#define XPDFORM
Definition: XrdProofdAux.h:381
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
void RemoveQuery(const char *tag)
Remove query with tag from the list of queries.
XrdOucHash< XrdProofWorker > * Workers() const
#define name(a, b)
Definition: linkTestLib0.cpp:5
virtual void RegisterDirectives()
Register directives for configuration.
double fNodesFraction
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker * > *, const char *)
Get a list of workers that can be used by session 'xps'.
const char *const XPD_GW_Static
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
XrdOucString fName
Definition: XrdProofdAux.h:111
virtual int Reschedule()
Consider starting some query from the queue.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
virtual int Config(bool rcf=0)
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdSysError *e=0)
Constructor.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
void Register(const char *dname, XrdProofdDirective *d)
const char * Group() const
XrdProofQuery * CurrentQuery()