50 #include "XrdOuc/XrdOucString.hh" 51 #include "XrdOuc/XrdOucStream.hh" 94 TRACE(XERR,
"undefined scheduler: cannot start");
99 int lastcheck = time(0), ckfreq = sched->
CheckFrequency(), deltat = 0;
102 if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
104 int pollRet = sched->
Pipe()->
Poll(deltat);
110 if ((rc = sched->
Pipe()->
Recv(msg)) != 0) {
111 XPDERR(
"problems receiving message; errno: "<<-rc);
118 TRACE(ALL,
"received kReschedule");
125 TRACE(XERR,
"unknown type: "<<msg.
Type());
130 TRACE(ALL,
"running regular checks");
147 return ((lhs && rhs &&
156 if (!d || !(d->
fVal))
200 char *val, XrdOucStream *cfg,
bool rcf)
202 XPDLOC(SCHED,
"Sched::DoDirective")
208 if (d->
fName ==
"schedparam") {
210 }
else if (d->
fName ==
"resource") {
240 XPDLOC(SCHED,
"Sched::Config")
244 XPDERR(
"problems parsing file ");
254 XPDFORM(msg,
"maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
262 (
void *)
this, 0,
"Scheduler cron thread") != 0) {
263 XPDERR(
"could not start cron thread");
267 TRACE(ALL,
"cron thread started");
282 if (xps->
Enqueue(query) == 1) {
283 std::list<XrdProofdProofServ *>::iterator ii;
303 XPDLOC(SCHED,
"DumpQueues")
305 TRACE(ALL,
" ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306 if (prefix)
TRACE(ALL,
" +++ Called from: "<<prefix);
307 TRACE(ALL,
" +++ # of waiting sessions: "<<
fQueue.size());
308 std::list<XrdProofdProofServ *>::iterator ii;
311 TRACE(ALL,
" +++ #"<<++i<<
" client:"<< (*ii)->Client()<<
" # of queries: "<< (*ii)->Queries()->size());
313 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
329 while (xps && !(xps->
IsValid())) {
343 XPDLOC(SCHED,
"Sched::GetNumWorkers")
348 std::list<XrdProofWorker *>::iterator iter;
349 for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
350 TRACE(DBG, (*iter)->fImage<<
" : # act: "<<(*iter)->fProofServs.size());
351 if ((*iter)->fType !=
'M' && (*iter)->fType !=
'S' 363 std::list<XrdProofdProofServ *>::iterator sesIter;
364 float summedPriority = 0;
365 for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
366 if ((*sesIter)->Group()) {
372 if (summedPriority > 0)
373 priority = (grp->
Priority() * sessions->size()) / summedPriority;
379 }
else if (nWrks >= (
int) wrks->size()) {
380 nWrks = wrks->size() - 1;
382 TRACE(DBG, nFreeCPUs<<
" : "<< nWrks);
398 std::list<XrdProofWorker *> *wrks,
399 const char *querytag)
401 XPDLOC(SCHED,
"Sched::GetWorkers")
409 TRACE(REQ,
"enter: query tag: "<< ((querytag) ? querytag :
""));
418 if (querytag && xps && xps->
Workers()->Num() > 0) {
421 TRACE(REQ,
"current query tag: "<< cqtag );
422 if (!strcmp(querytag, cqtag)) {
425 TRACE(REQ,
"current assignment for session "<< xps->
SrvPID() <<
" is valid");
444 TRACE(REQ,
"session has already assigned workers: enqueue");
450 std::list<XrdProofWorker *> *acws = 0;
472 wrks->push_back(mst);
474 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
479 wrks->push_back(*nxWrk);
491 TRACE(REQ,
"no workers currently available: session enqueued");
495 wrks->push_back(mst);
504 std::list<XrdProofWorker *> *acwseff = 0;
509 acwseff =
new std::list<XrdProofWorker *>;
510 std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
511 if ((*xWrk)->Active() < maxnum) {
512 acwseff->push_back(*xWrk);
514 for (; xWrk != acws->end(); xWrk++) {
515 if ((*xWrk)->Active() < maxnum) {
516 acwseff->push_back(*xWrk);
521 TRACE(REQ,
"max number of sessions reached - ("<< maxnum <<
")");
524 if (!ok) {
delete acwseff; acwseff = 0; }
532 TRACE(REQ,
"act sess ... " << nactsess);
533 if (nactsess < maxnum) {
536 TRACE(REQ,
"max number of sessions reached - ("<< maxnum <<
")");
539 if (!ok) acws = acwseff;
544 if (!acws || (acws && acws->size() <= 1)) {
552 TRACE(REQ,
"no workers currently available: session enqueued");
556 TRACE(XERR,
"no worker available: do nothing");
563 if (!isDynamic && (xps->
Workers()->Num() > 0)) {
570 wrks->push_back(mst);
577 static bool rndmInit = 0;
579 const char *randdev =
"/dev/urandom";
582 if ((fd =
open(randdev, O_RDONLY)) != -1) {
583 if (read(fd, &seed,
sizeof(seed)) !=
sizeof(seed)) {
584 TRACE(XERR,
"problems reading seed; errno: "<< errno);
592 int nwt = acws->size();
593 std::vector<int> walloc(nwt, 0);
594 std::vector<XrdProofWorker *> vwrk(nwt);
599 std::list<XrdProofWorker *>::iterator iwk = acws->begin();
601 for ( ; iwk != acws->end(); iwk++) {
603 int na = (*iwk)->Active();
605 walloc[i] = na + walloc[i-1];
607 namx = (na > namx) ? na : namx;
611 for (i = 1; i <
nwt; i++) {
613 walloc[i] = namx*i - walloc[i] + i;
617 int natot = walloc[nwt - 1];
622 int maxAtt = 10000, natt = 0;
624 while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
625 int jw = rand() % natot;
626 for (i = 0; i <
nwt; i++) {
627 if (jw < walloc[i]) {
630 for (j = i; j <
nwt; j++) {
643 wrks->push_back(vwrk[iw]);
646 TRACE(XERR,
"random generation failed");
656 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
665 wrks->push_back(*nxWrk);
668 if (
fNextWrk >= (
int) acws->size()) {
671 nxWrk = acws->begin();
677 std::list<XrdProofWorker *>::iterator iw = acws->begin();
679 while (iw != acws->end()) {
681 wrks->push_back(*iw);
687 if (wrks->size() <= 1) {
688 TRACE(XERR,
"no worker found: do nothing");
693 if (acwseff) {
delete acwseff; acwseff = 0; }
708 XPDLOC(SCHED,
"Sched::Reschedule")
717 TRACE(XERR,
"got undefined session: protocol error!");
727 qtag.replace(
":",
"");
732 TRACE(XERR,
"failure from GetWorkers: protocol error!");
745 if (xps->
Queries()->size() > 1)
763 const char *osel[] = {
"all",
"round-robin",
"random",
"load-based"};
764 sbuf +=
"Selection: ";
767 sbuf +=
", max workers: ";
773 std::list<XrdProofWorker *>::iterator iw;
774 for (iw = acws->begin(); iw != acws->end(); ++iw) {
775 sbuf += (*iw)->fType;
776 sbuf +=
": "; sbuf += (*iw)->fHost;
777 if ((*iw)->fPort > -1) {
778 sbuf +=
":"; sbuf += (*iw)->fPort;
781 sbuf +=
" sessions: "; sbuf += (*iw)->Active();
793 char *val, XrdOucStream *cfg,
bool rcf)
795 XPDLOC(SCHED,
"Sched::ProcessDirective")
801 if (d->
fName ==
"schedparam") {
803 }
else if (d->
fName ==
"resource") {
815 XPDLOC(SCHED,
"Sched::DoDirectiveSchedParam")
822 while (val && val[0]) {
824 if (s.beginswith(
"wmx:")) {
825 s.replace(
"wmx:",
"");
826 fWorkerMax = strtol(s.c_str(), (
char **)0, 10);
827 }
else if (s.beginswith(
"mxsess:")) {
828 s.replace(
"mxsess:",
"");
830 }
else if (s.beginswith(
"mxrun:")) {
831 s.replace(
"mxrun:",
"");
833 }
else if (s.beginswith(
"selopt:")) {
834 if (s.endswith(
"random"))
836 else if (s.endswith(
"load"))
840 }
else if (s.beginswith(
"fraction:")) {
841 s.replace(
"fraction:",
"");
843 }
else if (s.beginswith(
"optnwrks:")) {
844 s.replace(
"optnwrks:",
"");
846 }
else if (s.beginswith(
"minforquery:")) {
847 s.replace(
"minforquery:",
"");
849 }
else if (s.beginswith(
"queue:")) {
850 if (s.endswith(
"fifo")) {
853 }
else if (strncmp(val,
"default", 7)) {
858 val = cfg->GetWord();
872 TRACE(ALL,
"WARNING: in Load-Based mode the max number of sessions" 873 " to be run is determined dynamically");
889 if (strncmp(val,
"static", 6) && strncmp(val,
"default", 7))
892 while ((val = cfg->GetWord()) && val[0]) {
894 if (s.beginswith(
"wmx:")) {
895 s.replace(
"wmx:",
"");
896 fWorkerMax = strtol(s.c_str(), (
char **)0, 10);
897 }
else if (s.beginswith(
"mxsess:")) {
898 s.replace(
"mxsess:",
"");
900 }
else if (s.beginswith(
"selopt:")) {
901 if (s.endswith(
"random"))
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
int Resume()
Send a resume message to the this session.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
const char * Group() const
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
int Enqueue(XrdProofQuery *q)
XrdProofdProofServMgr * SessionMgr() const
std::list< XrdProofdProofServ * > * ActiveSessions()
XrdOucHash< XrdProofWorker > * Workers() const
XrdProofdNetMgr * NetMgr() const
int CheckFrequency() const
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define TRACE(Flag, Args)
const char *const XPD_GW_QueryEnqueued
std::list< XrdProofQuery * > * Queries() const
XrdProofGroupMgr * fGrpMgr
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group 'grp.
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker *> *, const char *)
Get a list of workers that can be used by session 'xps'.
int Recv(XpdMsg &msg)
Recv message from the pipe.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process 'schedparam' directive.
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
virtual int Config(bool rcf=0)
Configure this instance using the content of file 'cfn'.
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first querym enqueue also the session.
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
std::list< XrdProofdProofServ * > fQueue
virtual void ResetParameters()
Reset values for the configurable parameters.
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
static constexpr double s
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
static void Sort(std::list< XrdProofWorker *> *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
virtual void RegisterDirectives()
Register directives for configuration.
const char *const XPD_GW_Static
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
virtual int Reschedule()
Consider starting some query from the queue.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
virtual int Config(bool rcf=0)
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdSysError *e=0)
Constructor.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
void Register(const char *dname, XrdProofdDirective *d)
XrdProofQuery * CurrentQuery()
static constexpr double g