Logo ROOT   6.18/05
Reference Guide
XrdProofSched.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis September 2007
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// XrdProofSched //
15// //
16// Authors: G. Ganis, CERN, 2007 //
17// //
18// Interface for a PROOF scheduler. //
19// Alternative scheduler implementations should be provided as shared //
20// library containing an implementation of this class. The library //
21// must also provide a function to load an instance of this class //
22// with the following signature (see commented example below): //
23// extern "C" { //
24// XrdProofSched *XrdgetProofSched(const char *cfg, //
25// XrdProofdManager *mgr, //
26// XrdProofGroupMgr *grpmgr, //
27// XrdSysError *edest); //
28// } //
29// Here 'cfg' is the xrootd config file where directives to configure //
30// the scheduler are specified, 'mgr' is the instance of the cluster //
31// manager from where the scheduler can get info about the available //
32// workers and their status, 'grpmgr' is the instance of the group //
33// bringing the definition of the groups for this run, and 'edest' is //
34// instance of the error logger to be used. //
35// The scheduler is identified by a name of max 16 chars. //
36// //
37//////////////////////////////////////////////////////////////////////////
38
39#include <list>
40
41#include "XProofProtocol.h"
42#include "XrdProofdManager.h"
43#include "XrdProofdNetMgr.h"
45#include "XrdProofGroup.h"
46#include "XrdProofSched.h"
47#include "XrdProofdProofServ.h"
48#include "XrdProofWorker.h"
49
50#include "XrdOuc/XrdOucString.hh"
51#include "XrdOuc/XrdOucStream.hh"
52
53#include "XpdSysError.h"
54
55// Tracing
56#include "XrdProofdTrace.h"
57
58//
59// Example of scheduler loader for an implementation called XrdProofSchedDyn
60//
61// extern "C" {
62// //______________________________________________________________________________
63// XrdProofSched *XrdgetProofSched(const char *cfg, XrdProofdManager *mgr,
64// XrdProofGroupMgr *grpmgr, XrdSysError *edest)
65// {
66// // This scheduler is meant to live in a shared library. The interface below is
67// // used by the server to obtain a copy of the scheduler object.
68//
69// XrdProofSchedDyn *pss = new XrdProofSchedDyn(mgr, grpmgr, edest);
70// if (pss && pss->Config(cfg) == 0) {
71// return (XrdProofSched *) pss;
72// }
73// if (pss)
74// delete pss;
75// return (XrdProofSched *)0;
76// }}
77
78//--------------------------------------------------------------------------
79//
80// XrdProofSchedCron
81//
82// Scheduler thread
83//
84////////////////////////////////////////////////////////////////////////////////
85/// This is an endless loop to check the system periodically or when
86/// triggered via a message in a dedicated pipe
87
88void *XrdProofSchedCron(void *p)
89{
90 XPDLOC(SCHED, "SchedCron")
91
92 XrdProofSched *sched = (XrdProofSched *)p;
93 if (!(sched)) {
94 TRACE(XERR, "undefined scheduler: cannot start");
95 return (void *)0;
96 }
97
98 // Time of last session check
99 int lastcheck = time(0), ckfreq = sched->CheckFrequency(), deltat = 0;
100 while(1) {
101 // We wait for processes to communicate a session status change
102 if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
103 deltat = ckfreq;
104 int pollRet = sched->Pipe()->Poll(deltat);
105
106 if (pollRet > 0) {
107 // Read message
108 XpdMsg msg;
109 int rc = 0;
110 if ((rc = sched->Pipe()->Recv(msg)) != 0) {
111 XPDERR("problems receiving message; errno: "<<-rc);
112 continue;
113 }
114 // Parse type
115 XrdOucString buf;
116 if (msg.Type() == XrdProofSched::kReschedule) {
117
118 TRACE(ALL, "received kReschedule");
119
120 // Reschedule
121 sched->Reschedule();
122
123 } else {
124
125 TRACE(XERR, "unknown type: "<<msg.Type());
126 continue;
127 }
128 } else {
129 // Notify
130 TRACE(ALL, "running regular checks");
131 // Run regular rescheduling checks
132 sched->Reschedule();
133 // Remember when ...
134 lastcheck = time(0);
135 }
136 }
137
138 // Should never come here
139 return (void *)0;
140}
141
142////////////////////////////////////////////////////////////////////////////////
143/// Compare two workers for sorting
144
145static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
146{
147 return ((lhs && rhs &&
148 lhs->GetNActiveSessions() < rhs->GetNActiveSessions()) ? 1 : 0);
149}
150
151////////////////////////////////////////////////////////////////////////////////
152/// Generic directive processor
153
154int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
155{
156 if (!d || !(d->fVal))
157 // undefined inputs
158 return -1;
159
160 return ((XrdProofSched *)d->fVal)->ProcessDirective(d, val, cfg, rcf);
161}
162
163////////////////////////////////////////////////////////////////////////////////
164/// Constructor
165
168 const char *cfn, XrdSysError *e)
169 : XrdProofdConfig(cfn, e)
170{
171 fValid = 1;
172 fMgr = mgr;
173 fGrpMgr = grpmgr;
174 fNextWrk = 1;
175 fEDest = e;
176 fUseFIFO = 0;
178
179 memset(fName, 0, kXPSMXNMLEN);
180 if (name)
181 memcpy(fName, name, kXPSMXNMLEN-1);
182
183 // Configuration directives
185}
186
187////////////////////////////////////////////////////////////////////////////////
188/// Register directives for configuration
189
191{
192 Register("schedparam", new XrdProofdDirective("schedparam", this, &DoDirectiveClass));
193 Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
194}
195
196////////////////////////////////////////////////////////////////////////////////
197/// Update the priorities of the active sessions.
198
200 char *val, XrdOucStream *cfg, bool rcf)
201{
202 XPDLOC(SCHED, "Sched::DoDirective")
203
204 if (!d)
205 // undefined inputs
206 return -1;
207
208 if (d->fName == "schedparam") {
209 return DoDirectiveSchedParam(val, cfg, rcf);
210 } else if (d->fName == "resource") {
211 return DoDirectiveResource(val, cfg, rcf);
212 }
213 TRACE(XERR,"unknown directive: "<<d->fName);
214 return -1;
215}
216
217
218////////////////////////////////////////////////////////////////////////////////
219/// Reset values for the configurable parameters
220
222{
223 fMaxSessions = -1;
224 fMaxRunning = -1;
225 fWorkerMax = -1;
227 fOptWrksPerUnit = 1;
228 fMinForQuery = 0;
229 fNodesFraction = 0.5;
230 fCheckFrequency = 30;
231}
232
233////////////////////////////////////////////////////////////////////////////////
234/// Configure this instance using the content of file 'cfn'.
235/// Return 0 on success, -1 in case of failure (file does not exists
236/// or containing incoherent information).
237
239{
240 XPDLOC(SCHED, "Sched::Config")
241
242 // Run first the configurator
243 if (XrdProofdConfig::Config(rcf) != 0) {
244 XPDERR("problems parsing file ");
245 fValid = 0;
246 return -1;
247 }
248
249 int rc = 0;
250
251 XrdOucString msg;
252
253 // Notify
254 XPDFORM(msg, "maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
256 TRACE(DBG, msg);
257
258 if (!rcf) {
259 // Start cron thread
260 pthread_t tid;
261 if (XrdSysThread::Run(&tid, XrdProofSchedCron,
262 (void *)this, 0, "Scheduler cron thread") != 0) {
263 XPDERR("could not start cron thread");
264 fValid = 0;
265 return 0;
266 }
267 TRACE(ALL, "cron thread started");
268 }
269
270 // Done
271 return rc;
272}
273
274////////////////////////////////////////////////////////////////////////////////
275/// Queue a query in the session; if this is the first querym enqueue also
276/// the session
277
279{
280 XPDDOM(SCHED)
281
282 if (xps->Enqueue(query) == 1) {
283 std::list<XrdProofdProofServ *>::iterator ii;
284 for (ii = fQueue.begin(); ii != fQueue.end(); ++ii) {
285 if ((*ii)->Status() == kXPD_running) break;
286 }
287 if (ii != fQueue.end()) {
288 fQueue.insert(ii, xps);
289 } else {
290 fQueue.push_back(xps);
291 }
292 }
293 if (TRACING(DBG)) DumpQueues("Enqueue");
294
295 return 0;
296}
297
298////////////////////////////////////////////////////////////////////////////////
299/// Dump the content of the waiting sessions queue
300
301void XrdProofSched::DumpQueues(const char *prefix)
302{
303 XPDLOC(SCHED, "DumpQueues")
304
305 TRACE(ALL," ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306 if (prefix) TRACE(ALL, " +++ Called from: "<<prefix);
307 TRACE(ALL," +++ # of waiting sessions: "<<fQueue.size());
308 std::list<XrdProofdProofServ *>::iterator ii;
309 int i = 0;
310 for (ii = fQueue.begin(); ii != fQueue.end(); ++ii) {
311 TRACE(ALL," +++ #"<<++i<<" client:"<< (*ii)->Client()<<" # of queries: "<< (*ii)->Queries()->size());
312 }
313 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
314
315 return;
316}
317
318////////////////////////////////////////////////////////////////////////////////
319/// Get first valid session.
320/// The dataset information can be used to assign workers.
321
323{
324 XPDDOM(SCHED)
325
326 if (fQueue.empty())
327 return 0;
328 XrdProofdProofServ *xps = fQueue.front();
329 while (xps && !(xps->IsValid())) {
330 fQueue.remove(xps);
331 xps = fQueue.front();
332 }
333 if (TRACING(DBG)) DumpQueues("FirstSession");
334 // The session will be removed after workers are assigned
335 return xps;
336}
337
338////////////////////////////////////////////////////////////////////////////////
339/// Calculate the number of workers to be used given the state of the cluster
340
342{
343 XPDLOC(SCHED, "Sched::GetNumWorkers")
344
345 // Go through the list of hosts and see how many CPUs are not used.
346 int nFreeCPUs = 0;
347 std::list<XrdProofWorker *> *wrks = fMgr->NetMgr()->GetActiveWorkers();
348 std::list<XrdProofWorker *>::iterator iter;
349 for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
350 TRACE(DBG, (*iter)->fImage<<" : # act: "<<(*iter)->fProofServs.size());
351 if ((*iter)->fType != 'M' && (*iter)->fType != 'S'
352 && (int) (*iter)->fProofServs.size() < fOptWrksPerUnit)
353 // add number of free slots
354 nFreeCPUs += fOptWrksPerUnit - (*iter)->fProofServs.size();
355 }
356
357 float priority = 1;
358 XrdProofGroup *grp = 0;
359 if (fGrpMgr && xps->Group())
360 grp = fGrpMgr->GetGroup(xps->Group());
361 if (grp) {
362 std::list<XrdProofdProofServ *> *sessions = fMgr->SessionMgr()->ActiveSessions();
363 std::list<XrdProofdProofServ *>::iterator sesIter;
364 float summedPriority = 0;
365 for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
366 if ((*sesIter)->Group()) {
367 XrdProofGroup *g = fGrpMgr->GetGroup((*sesIter)->Group());
368 if (g)
369 summedPriority += g->Priority();
370 }
371 }
372 if (summedPriority > 0)
373 priority = (grp->Priority() * sessions->size()) / summedPriority;
374 }
375
376 int nWrks = (int)(nFreeCPUs * fNodesFraction * priority);
377 if (nWrks <= fMinForQuery) {
378 nWrks = fMinForQuery;
379 } else if (nWrks >= (int) wrks->size()) {
380 nWrks = wrks->size() - 1;
381 }
382 TRACE(DBG, nFreeCPUs<<" : "<< nWrks);
383
384 return nWrks;
385}
386
387////////////////////////////////////////////////////////////////////////////////
388/// Get a list of workers that can be used by session 'xps'.
389/// The return code is:
390/// -1 Some failure occured; cannot continue
391/// 0 A new list has been assigned to the session 'xps' and
392/// returned in 'wrks'
393/// 1 The list currently assigned to the session is the one
394/// to be used
395/// 2 No worker could be assigned now; session should be queued
396
398 std::list<XrdProofWorker *> *wrks,
399 const char *querytag)
400{
401 XPDLOC(SCHED, "Sched::GetWorkers")
402
403 int rc = 0;
404
405 // The caller must provide a proofserv session handler
406 if (!xps)
407 return -1;
408
409 TRACE(REQ, "enter: query tag: "<< ((querytag) ? querytag : ""));
410
411 // Static or dynamic
412 bool isDynamic = 1;
413 if (querytag && !strncmp(querytag, XPD_GW_Static, strlen(XPD_GW_Static) - 1)) {
414 isDynamic = 0;
415 }
416
417 // Check if the current assigned list of workers is valid
418 if (querytag && xps && xps->Workers()->Num() > 0) {
419 if (TRACING(REQ)) xps->DumpQueries();
420 const char *cqtag = (xps->CurrentQuery()) ? xps->CurrentQuery()->GetTag() : "undef";
421 TRACE(REQ, "current query tag: "<< cqtag );
422 if (!strcmp(querytag, cqtag)) {
423 // Remove the query to be processed from the queue
424 xps->RemoveQuery(cqtag);
425 TRACE(REQ, "current assignment for session "<< xps->SrvPID() << " is valid");
426 // Current assignement is valid
427 return 1;
428 }
429 }
430
431 // The caller must provide a list where to store the result
432 if (!wrks)
433 return -1;
434
435 // If the session has already assigned workers or there are
436 // other queries waiting - just enqueue
437 // FIFO is enforced by dynamic mode so it is checked just in case
438 if (isDynamic) {
439 if (fUseFIFO && xps->Workers()->Num() > 0) {
440 if (!xps->GetQuery(querytag))
441 Enqueue(xps, new XrdProofQuery(querytag));
442 if (TRACING(DBG)) xps->DumpQueries();
443 // Signal enqueing
444 TRACE(REQ, "session has already assigned workers: enqueue");
445 return 2;
446 }
447 }
448
449 // The current, full list
450 std::list<XrdProofWorker *> *acws = 0;
451
452 if (!fMgr || !(acws = fMgr->NetMgr()->GetActiveWorkers()))
453 return -1;
454
455 // Point to the master element
456 XrdProofWorker *mst = acws->front();
457 if (!mst)
458 return -1;
459
460 if (fWorkerSel == kSSOLoadBased) {
461 // Dynamic scheduling: the scheduler will determine the #workers
462 // to be used based on the current load and assign the least loaded ones
463
464 // Sort the workers by the load
466
467 // Get the advised number
468 int nw = GetNumWorkers(xps);
469
470 if (nw > 0) {
471 // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
472 wrks->push_back(mst);
473
474 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
475 while (nw--) {
476 ++nxWrk;
477 // Add export version of the info
478 // (stats are updated in XrdProofdProtocol::GetWorkers)
479 wrks->push_back(*nxWrk);
480 }
481 } else {
482 // if no workers were assigned
483 // enqueue or send a list with only the master (processing refused)
484 if (fUseFIFO) {
485 // Enqueue the query/session
486 // the returned list of workers was not filled
487 if (!xps->GetQuery(querytag))
488 Enqueue(xps, new XrdProofQuery(querytag));
489 if (TRACING(DBG)) xps->DumpQueries();
490 // Signal enqueing
491 TRACE(REQ, "no workers currently available: session enqueued");
492 return 2;
493 } else {
494 // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
495 wrks->push_back(mst);
496 }
497 }
498 // Done
499 return 0;
500 }
501
502 // Check if the check on the max number of sessions is enabled
503 // We need at least 1 master and a worker
504 std::list<XrdProofWorker *> *acwseff = 0;
505 int maxnum = (querytag && strcmp(querytag, XPD_GW_Static)) ? fMaxRunning : fMaxSessions;
506 bool ok = 0;
507 if (isDynamic) {
508 if (maxnum > 0) {
509 acwseff = new std::list<XrdProofWorker *>;
510 std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
511 if ((*xWrk)->Active() < maxnum) {
512 acwseff->push_back(*xWrk);
513 ++xWrk;
514 for (; xWrk != acws->end(); ++xWrk) {
515 if ((*xWrk)->Active() < maxnum) {
516 acwseff->push_back(*xWrk);
517 ok = 1;
518 }
519 }
520 } else if (!fUseFIFO) {
521 TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
522 }
523 // Check the result
524 if (!ok) { delete acwseff; acwseff = 0; }
525 acws = acwseff;
526 }
527 } else {
528 if (maxnum > 0) {
529 // This is over-conservative for sub-selection (random, or round-robin options)
530 // A better solution should be implemented for that.
531 int nactsess = mst->GetNActiveSessions();
532 TRACE(REQ, "act sess ... " << nactsess);
533 if (nactsess < maxnum) {
534 ok = 1;
535 } else if (!fUseFIFO) {
536 TRACE(REQ, "max number of sessions reached - ("<< maxnum <<")");
537 }
538 // Check the result
539 if (!ok) acws = acwseff;
540 }
541 }
542
543 // Make sure that something has been found
544 if (!acws || (acws && acws->size() <= 1)) {
545 if (fUseFIFO) {
546 // Enqueue the query/session
547 // the returned list of workers was not filled
548 if (!xps->GetQuery(querytag))
549 Enqueue(xps, new XrdProofQuery(querytag));
550 if (TRACING(REQ)) xps->DumpQueries();
551 // Notify enqueing
552 TRACE(REQ, "no workers currently available: session enqueued");
553 SafeDel(acwseff);
554 return 2;
555 } else {
556 TRACE(XERR, "no worker available: do nothing");
557 SafeDel(acwseff);
558 return -1;
559 }
560 }
561
562 // If a non-dynamic session already has assigned workers just return
563 if (!isDynamic && (xps->Workers()->Num() > 0)) {
564 // Current assignement is valid
565 SafeDel(acwseff);
566 return 1;
567 }
568
569 // The master first (stats are updated in XrdProofdProtocol::GetWorkers)
570 wrks->push_back(mst);
571
572 if (fWorkerMax > 0 && fWorkerMax < (int) acws->size()) {
573
574 // Now the workers
575 if (fWorkerSel == kSSORandom) {
576 // Random: the first time init the machine
577 static bool rndmInit = 0;
578 if (!rndmInit) {
579 const char *randdev = "/dev/urandom";
580 int fd;
581 unsigned int seed;
582 if ((fd = open(randdev, O_RDONLY)) != -1) {
583 if (read(fd, &seed, sizeof(seed)) != sizeof(seed)) {
584 TRACE(XERR, "problems reading seed; errno: "<< errno);
585 }
586 srand(seed);
587 close(fd);
588 rndmInit = 1;
589 }
590 }
591 // Selection
592 int nwt = acws->size();
593 std::vector<int> walloc(nwt, 0);
594 std::vector<XrdProofWorker *> vwrk(nwt);
595
596 // Fill the vector with cumulative number of actives
597 int namx = -1;
598 int i = 1;
599 std::list<XrdProofWorker *>::iterator iwk = acws->begin();
600 ++iwk; // Skip master
601 for ( ; iwk != acws->end(); ++iwk) {
602 vwrk[i] = *iwk;
603 int na = (*iwk)->Active();
604 printf(" %d", na);
605 walloc[i] = na + walloc[i-1];
606 i++;
607 namx = (na > namx) ? na : namx;
608 }
609 printf("\n");
610 // Normalize
611 for (i = 1; i < nwt; i++) {
612 if (namx > 0)
613 walloc[i] = namx*i - walloc[i] + i;
614 else
615 walloc[i] = i;
616 }
617 int natot = walloc[nwt - 1];
618
619 int nw = fWorkerMax;
620 while (nw--) {
621 // Normalized number
622 int maxAtt = 10000, natt = 0;
623 int iw = -1;
624 while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
625 int jw = rand() % natot;
626 for (i = 0; i < nwt; i++) {
627 if (jw < walloc[i]) {
628 // re-normalize the weights for the higher index entries
629 int j = 0;
630 for (j = i; j < nwt; j++) {
631 if (walloc[j] > 0)
632 walloc[j]--;
633 }
634 natot--;
635 iw = i;
636 break;
637 }
638 }
639 }
640
641 if (iw > -1) {
642 // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
643 wrks->push_back(vwrk[iw]);
644 } else {
645 // Unable to generate the right number
646 TRACE(XERR, "random generation failed");
647 rc = -1;
648 break;
649 }
650 }
651
652 } else {
653 if (fNextWrk >= (int) acws->size())
654 fNextWrk = 1;
655 int iw = 0;
656 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
657 int nw = fWorkerMax;
658 while (nw--) {
659 while (iw != fNextWrk) {
660 ++nxWrk;
661 iw++;
662 }
663 // Add export version of the info
664 // (stats are updated in XrdProofdProtocol::GetWorkers)
665 wrks->push_back(*nxWrk);
666 // Update next worker index
667 fNextWrk++;
668 if (fNextWrk >= (int) acws->size()) {
669 fNextWrk = 1;
670 iw = 0;
671 nxWrk = acws->begin();
672 }
673 }
674 }
675 } else {
676 // The full list
677 std::list<XrdProofWorker *>::iterator iw = acws->begin();
678 ++iw;
679 while (iw != acws->end()) {
680 // Add to the list (stats are updated in XrdProofdProtocol::GetWorkers)
681 wrks->push_back(*iw);
682 ++iw;
683 }
684 }
685
686 // Make sure that something has been found
687 if (wrks->size() <= 1) {
688 TRACE(XERR, "no worker found: do nothing");
689 rc = -1;
690 }
691
692 // Cleanup
693 if (acwseff) { delete acwseff; acwseff = 0; }
694
695 return rc;
696}
697
698////////////////////////////////////////////////////////////////////////////////
699/// Consider starting some query from the queue.
700/// to be called after some resources are free (e.g. by a finished query)
701/// This method is doing the full transaction of finding the session to
702/// resume, assigning it workers and sending a resume message.
703/// In this way there is not possibility of interference with other GetWorkers
704/// return 0 in case of success and -1 in case of an error
705
707{
708 XPDLOC(SCHED, "Sched::Reschedule")
709
710 if (fUseFIFO && TRACING(DBG)) DumpQueues("Reschedule");
711
712 if (!fQueue.empty()) {
713 // Any advanced scheduling algorithms can be done here
714
716 if (!xps) {
717 TRACE(XERR, "got undefined session: protocol error!");
718 return -1;
719 }
720 XrdOucString wrks;
721 // Call GetWorkers in the manager to mark the assignment.
722 XrdOucString qtag;
723 if (xps && xps->CurrentQuery()) {
724 qtag = xps->CurrentQuery()->GetTag();
725 if (qtag.beginswith(XPD_GW_Static)) {
726 qtag = XPD_GW_Static;
727 qtag.replace(":","");
728 }
729 }
730 if (fMgr->GetWorkers(wrks, xps, qtag.c_str()) < 0 ) {
731 // Something wrong
732 TRACE(XERR, "failure from GetWorkers: protocol error!");
733 return -1;
734 } else {
735 // Send buffer
736 // if workers were assigned remove the session from the queue
737 if (wrks.length() > 0 && wrks != XPD_GW_QueryEnqueued) {
738 // Send the resume message: the workers will be send in response to a
739 // GetWorkers message
740 xps->Resume();
741 // Acually remove the session from the queue
742 fQueue.remove(xps);
743 // Put the session at the end of the queue
744 // > 1 because the query is kept in the queue until 2nd GetWorkers
745 if (xps->Queries()->size() > 1)
746 fQueue.push_back(xps);
747 if (TRACING(DBG)) DumpQueues("Reschedule 2");
748 } // else add workers to the running sessions (once it's possible)
749
750 }
751
752 } //else add workers to the running sessions (once it's possible)
753
754 return 0;
755}
756
757////////////////////////////////////////////////////////////////////////////////
758/// Fill sbuf with some info about our current status
759
760int XrdProofSched::ExportInfo(XrdOucString &sbuf)
761{
762 // Selection type
763 const char *osel[] = { "all", "round-robin", "random", "load-based"};
764 sbuf += "Selection: ";
765 sbuf += osel[fWorkerSel+1];
766 if (fWorkerSel > -1) {
767 sbuf += ", max workers: ";
768 sbuf += fWorkerMax; sbuf += " &";
769 }
770
771 // The full list
772 std::list<XrdProofWorker *> *acws = fMgr->NetMgr()->GetActiveWorkers();
773 std::list<XrdProofWorker *>::iterator iw;
774 for (iw = acws->begin(); iw != acws->end(); ++iw) {
775 sbuf += (*iw)->fType;
776 sbuf += ": "; sbuf += (*iw)->fHost;
777 if ((*iw)->fPort > -1) {
778 sbuf += ":"; sbuf += (*iw)->fPort;
779 } else
780 sbuf += " ";
781 sbuf += " sessions: "; sbuf += (*iw)->Active();
782 sbuf += " &";
783 }
784
785 // Done
786 return 0;
787}
788
789////////////////////////////////////////////////////////////////////////////////
790/// Update the priorities of the active sessions.
791
793 char *val, XrdOucStream *cfg, bool rcf)
794{
795 XPDLOC(SCHED, "Sched::ProcessDirective")
796
797 if (!d)
798 // undefined inputs
799 return -1;
800
801 if (d->fName == "schedparam") {
802 return DoDirectiveSchedParam(val, cfg, rcf);
803 } else if (d->fName == "resource") {
804 return DoDirectiveResource(val, cfg, rcf);
805 }
806 TRACE(XERR, "unknown directive: "<<d->fName);
807 return -1;
808}
809
810////////////////////////////////////////////////////////////////////////////////
811/// Process 'schedparam' directive
812
813int XrdProofSched::DoDirectiveSchedParam(char *val, XrdOucStream *cfg, bool)
814{
815 XPDLOC(SCHED, "Sched::DoDirectiveSchedParam")
816
817 if (!val || !cfg)
818 // undefined inputs
819 return -1;
820
821 // Get the parameters
822 while (val && val[0]) {
823 XrdOucString s(val);
824 if (s.beginswith("wmx:")) {
825 s.replace("wmx:","");
826 fWorkerMax = strtol(s.c_str(), (char **)0, 10);
827 } else if (s.beginswith("mxsess:")) {
828 s.replace("mxsess:","");
829 fMaxSessions = strtol(s.c_str(), (char **)0, 10);
830 } else if (s.beginswith("mxrun:")) {
831 s.replace("mxrun:","");
832 fMaxRunning = strtol(s.c_str(), (char **)0, 10);
833 } else if (s.beginswith("selopt:")) {
834 if (s.endswith("random"))
836 else if (s.endswith("load"))
838 else
840 } else if (s.beginswith("fraction:")) {
841 s.replace("fraction:","");
842 fNodesFraction = strtod(s.c_str(), (char **)0);
843 } else if (s.beginswith("optnwrks:")) {
844 s.replace("optnwrks:","");
845 fOptWrksPerUnit = strtol(s.c_str(), (char **)0, 10);
846 } else if (s.beginswith("minforquery:")) {
847 s.replace("minforquery:","");
848 fMinForQuery = strtol(s.c_str(), (char **)0, 10);
849 } else if (s.beginswith("queue:")) {
850 if (s.endswith("fifo")) {
851 fUseFIFO = 1;
852 }
853 } else if (strncmp(val, "default", 7)) {
854 // This line applies to another scheduler
856 break;
857 }
858 val = cfg->GetWord();
859 }
860
861 // If the max number of sessions is limited then there is no lower bound
862 // the number of workers per query
863 if (fMaxSessions > 0) {
864 fMinForQuery = 0;
865 // And there is an upper limit on the number of running sessions
866 if (fMaxRunning < 0 || fMaxRunning > fMaxSessions)
868 }
869
870 // The FIFO size make sense only in non-load based mode
871 if (fWorkerSel == kSSOLoadBased && fMaxRunning > 0) {
872 TRACE(ALL, "WARNING: in Load-Based mode the max number of sessions"
873 " to be run is determined dynamically");
874 }
875
876 return 0;
877}
878
879////////////////////////////////////////////////////////////////////////////////
880/// Process 'resource' directive
881
882int XrdProofSched::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
883{
884 if (!val || !cfg)
885 // undefined inputs
886 return -1;
887
888 // Get the scheduler name
889 if (strncmp(val, "static", 6) && strncmp(val, "default", 7))
890 return 0;
891 // Get the values
892 while ((val = cfg->GetWord()) && val[0]) {
893 XrdOucString s(val);
894 if (s.beginswith("wmx:")) {
895 s.replace("wmx:","");
896 fWorkerMax = strtol(s.c_str(), (char **)0, 10);
897 } else if (s.beginswith("mxsess:")) {
898 s.replace("mxsess:","");
899 fMaxSessions = strtol(s.c_str(), (char **)0, 10);
900 } else if (s.beginswith("selopt:")) {
901 if (s.endswith("random"))
903 else
905 }
906 }
907 return 0;
908}
#define d(i)
Definition: RSha256.hxx:102
#define g(i)
Definition: RSha256.hxx:105
#define e(i)
Definition: RSha256.hxx:103
#define TRACE(Flag, Args)
Definition: TGHtml.h:120
char name[80]
Definition: TGX11.cxx:109
int nwt
Definition: THbookFile.cxx:91
@ kSSORandom
@ kSSOLoadBased
@ kSSORoundRobin
@ kXPD_running
const char *const XPD_GW_QueryEnqueued
const char *const XPD_GW_Static
#define XrdSysError
Definition: XpdSysError.h:8
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
#define kXPSMXNMLEN
Definition: XrdProofSched.h:53
#define XPDFORM
Definition: XrdProofdAux.h:378
#define SafeDel(x)
Definition: XrdProofdAux.h:332
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define XPDDOM(d)
#define XPDLOC(d, x)
#define TRACING(x)
#define XPDERR(x)
int Type() const
Definition: XrdProofdAux.h:191
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group 'grp.
float Priority() const
Definition: XrdProofGroup.h:80
const char * GetTag()
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker * > *, const char *)
Get a list of workers that can be used by session 'xps'.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual int Reschedule()
Consider starting some query from the queue.
virtual void RegisterDirectives()
Register directives for configuration.
char fName[kXPSMXNMLEN]
XrdSysError * fEDest
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first querym enqueue also the session.
XrdProofdPipe * Pipe()
double fNodesFraction
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
std::list< XrdProofdProofServ * > fQueue
int CheckFrequency() const
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdSysError *e=0)
Constructor.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
XrdProofGroupMgr * fGrpMgr
virtual int Config(bool rcf=0)
Configure this instance using the content of file 'cfn'.
virtual void ResetParameters()
Reset values for the configurable parameters.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process 'schedparam' directive.
XrdProofdManager * fMgr
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
static void Sort(std::list< XrdProofWorker * > *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
XrdProofdNetMgr * NetMgr() const
XrdProofdProofServMgr * SessionMgr() const
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
int Recv(XpdMsg &msg)
Recv message from the pipe.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
std::list< XrdProofdProofServ * > * ActiveSessions()
int Resume()
Send a resume message to the this session.
int Enqueue(XrdProofQuery *q)
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
XrdOucHash< XrdProofWorker > * Workers() const
std::list< XrdProofQuery * > * Queries() const
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
const char * Group() const
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
XrdProofQuery * CurrentQuery()
static constexpr double s