50 #include "XrdOuc/XrdOucString.hh"
51 #include "XrdOuc/XrdOucStream.hh"
94 TRACE(XERR,
"undefined scheduler: cannot start");
99 int lastcheck = time(0), ckfreq = sched->
CheckFrequency(), deltat = 0;
102 if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
104 int pollRet = sched->
Pipe()->
Poll(deltat);
110 if ((rc = sched->
Pipe()->
Recv(msg)) != 0) {
111 XPDERR(
"problems receiving message; errno: "<<-rc);
118 TRACE(ALL,
"received kReschedule");
125 TRACE(XERR,
"unknown type: "<<msg.
Type());
130 TRACE(ALL,
"running regular checks");
147 return ((lhs && rhs &&
156 if (!d || !(d->
fVal))
200 char *val, XrdOucStream *cfg,
bool rcf)
202 XPDLOC(SCHED,
"Sched::DoDirective")
208 if (d->
fName ==
"schedparam") {
210 }
else if (d->
fName ==
"resource") {
240 XPDLOC(SCHED,
"Sched::Config")
244 XPDERR(
"problems parsing file ");
254 XPDFORM(msg,
"maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
262 (
void *)
this, 0,
"Scheduler cron thread") != 0) {
263 XPDERR(
"could not start cron thread");
267 TRACE(ALL,
"cron thread started");
282 if (xps->
Enqueue(query) == 1) {
283 std::list<XrdProofdProofServ *>::iterator ii;
303 XPDLOC(SCHED,
"DumpQueues")
305 TRACE(ALL,
" ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306 if (prefix)
TRACE(ALL,
" +++ Called from: "<<prefix);
307 TRACE(ALL,
" +++ # of waiting sessions: "<<
fQueue.size());
308 std::list<XrdProofdProofServ *>::iterator ii;
311 TRACE(ALL,
" +++ #"<<++i<<
" client:"<< (*ii)->Client()<<
" # of queries: "<< (*ii)->Queries()->size());
313 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
329 while (xps && !(xps->
IsValid())) {
343 XPDLOC(SCHED,
"Sched::GetNumWorkers")
348 std::list<XrdProofWorker *>::iterator
iter;
349 for (iter = wrks->begin(); iter != wrks->end(); ++
iter) {
350 TRACE(DBG, (*iter)->fImage<<
" : # act: "<<(*iter)->fProofServs.size());
351 if ((*iter)->fType !=
'M' && (*iter)->fType !=
'S'
363 std::list<XrdProofdProofServ *>::iterator sesIter;
364 float summedPriority = 0;
365 for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
366 if ((*sesIter)->Group()) {
372 if (summedPriority > 0)
373 priority = (grp->
Priority() * sessions->size()) / summedPriority;
379 }
else if (nWrks >= (
int) wrks->size()) {
380 nWrks = wrks->size() - 1;
382 TRACE(DBG, nFreeCPUs<<
" : "<< nWrks);
398 std::list<XrdProofWorker *> *wrks,
399 const char *querytag)
401 XPDLOC(SCHED,
"Sched::GetWorkers")
405 TRACE(REQ,
"enter: query tag: "<< ((querytag) ? querytag :
""));
414 if (querytag && xps && xps->
Workers()->Num() > 0) {
417 TRACE(REQ,
"current query tag: "<< cqtag );
418 if (!strcmp(querytag, cqtag)) {
421 TRACE(REQ,
"current assignment for session "<< xps->
SrvPID() <<
" is valid");
440 TRACE(REQ,
"session has already assigned workers: enqueue");
446 std::list<XrdProofWorker *> *acws = 0;
468 wrks->push_back(mst);
470 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
475 wrks->push_back(*nxWrk);
487 TRACE(REQ,
"no workers currently available: session enqueued");
491 wrks->push_back(mst);
500 std::list<XrdProofWorker *> *acwseff = 0;
505 acwseff =
new std::list<XrdProofWorker *>;
506 std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
507 if ((*xWrk)->Active() < maxnum) {
508 acwseff->push_back(*xWrk);
510 for (; xWrk != acws->end(); xWrk++) {
511 if ((*xWrk)->Active() < maxnum) {
512 acwseff->push_back(*xWrk);
517 TRACE(REQ,
"max number of sessions reached - ("<< maxnum <<
")");
520 if (!ok) {
delete acwseff; acwseff = 0; }
528 TRACE(REQ,
"act sess ... " << nactsess);
529 if (nactsess < maxnum) {
532 TRACE(REQ,
"max number of sessions reached - ("<< maxnum <<
")");
535 if (!ok) acws = acwseff;
540 if (!acws || (acws && acws->size() <= 1)) {
548 TRACE(REQ,
"no workers currently available: session enqueued");
552 TRACE(XERR,
"no worker available: do nothing");
559 if (!isDynamic && (xps->
Workers()->Num() > 0)) {
566 wrks->push_back(mst);
573 static bool rndmInit = 0;
575 const char *randdev =
"/dev/urandom";
578 if ((fd =
open(randdev, O_RDONLY)) != -1) {
579 if (
read(fd, &seed,
sizeof(seed)) !=
sizeof(seed)) {
580 TRACE(XERR,
"problems reading seed; errno: "<< errno);
588 int nwt = acws->size();
589 std::vector<int> walloc(nwt, 0);
590 std::vector<XrdProofWorker *> vwrk(nwt);
595 std::list<XrdProofWorker *>::iterator iwk = acws->begin();
597 for ( ; iwk != acws->end(); iwk++) {
599 int na = (*iwk)->Active();
601 walloc[i] = na + walloc[i-1];
603 namx = (na > namx) ? na : namx;
607 for (i = 1; i <
nwt; i++) {
609 walloc[i] = namx*i - walloc[i] + i;
613 int natot = walloc[nwt - 1];
618 int maxAtt = 10000, natt = 0;
620 while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
621 int jw = rand() % natot;
622 for (i = 0; i <
nwt; i++) {
623 if (jw < walloc[i]) {
626 for (j = i; j <
nwt; j++) {
639 wrks->push_back(vwrk[iw]);
642 TRACE(XERR,
"random generation failed");
652 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
661 wrks->push_back(*nxWrk);
664 if (
fNextWrk >= (
int) acws->size()) {
667 nxWrk = acws->begin();
673 std::list<XrdProofWorker *>::iterator iw = acws->begin();
675 while (iw != acws->end()) {
677 wrks->push_back(*iw);
683 if (wrks->size() <= 1) {
684 TRACE(XERR,
"no worker found: do nothing");
689 if (acwseff) {
delete acwseff; acwseff = 0; }
704 XPDLOC(SCHED,
"Sched::Reschedule")
713 TRACE(XERR,
"got undefined session: protocol error!");
723 qtag.replace(
":",
"");
728 TRACE(XERR,
"failure from GetWorkers: protocol error!");
741 if (xps->
Queries()->size() > 1)
759 const char *osel[] = {
"all",
"round-robin",
"random",
"load-based"};
760 sbuf +=
"Selection: ";
763 sbuf +=
", max workers: ";
769 std::list<XrdProofWorker *>::iterator iw;
770 for (iw = acws->begin(); iw != acws->end(); ++iw) {
771 sbuf += (*iw)->fType;
772 sbuf +=
": "; sbuf += (*iw)->fHost;
773 if ((*iw)->fPort > -1) {
774 sbuf +=
":"; sbuf += (*iw)->fPort;
777 sbuf +=
" sessions: "; sbuf += (*iw)->Active();
789 char *val, XrdOucStream *cfg,
bool rcf)
791 XPDLOC(SCHED,
"Sched::ProcessDirective")
797 if (d->
fName ==
"schedparam") {
799 }
else if (d->
fName ==
"resource") {
811 XPDLOC(SCHED,
"Sched::DoDirectiveSchedParam")
818 while (val && val[0]) {
820 if (s.beginswith(
"wmx:")) {
821 s.replace(
"wmx:",
"");
822 fWorkerMax = strtol(s.c_str(), (
char **)0, 10);
823 }
else if (s.beginswith(
"mxsess:")) {
824 s.replace(
"mxsess:",
"");
826 }
else if (s.beginswith(
"mxrun:")) {
827 s.replace(
"mxrun:",
"");
829 }
else if (s.beginswith(
"selopt:")) {
830 if (s.endswith(
"random"))
832 else if (s.endswith(
"load"))
836 }
else if (s.beginswith(
"fraction:")) {
837 s.replace(
"fraction:",
"");
839 }
else if (s.beginswith(
"optnwrks:")) {
840 s.replace(
"optnwrks:",
"");
842 }
else if (s.beginswith(
"minforquery:")) {
843 s.replace(
"minforquery:",
"");
845 }
else if (s.beginswith(
"queue:")) {
846 if (s.endswith(
"fifo")) {
849 }
else if (strncmp(val,
"default", 7)) {
854 val = cfg->GetWord();
868 TRACE(ALL,
"WARNING: in Load-Based mode the max number of sessions"
869 " to be run is determined dynamically");
885 if (strncmp(val,
"static", 6) && strncmp(val,
"default", 7))
888 while ((val = cfg->GetWord()) && val[0]) {
890 if (s.beginswith(
"wmx:")) {
891 s.replace(
"wmx:",
"");
892 fWorkerMax = strtol(s.c_str(), (
char **)0, 10);
893 }
else if (s.beginswith(
"mxsess:")) {
894 s.replace(
"mxsess:",
"");
896 }
else if (s.beginswith(
"selopt:")) {
897 if (s.endswith(
"random"))
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
XrdProofQuery * GetQuery(const char *tag)
Get query with tag form the list of queries.
double read(const std::string &file_name)
reading
int Resume()
Send a resume message to the this session.
void DumpQueries()
Export the assigned workers in the format understood by proofserv.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
int Enqueue(XrdProofQuery *q)
std::list< XrdProofdProofServ * > * ActiveSessions()
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define TRACE(Flag, Args)
const char *const XPD_GW_QueryEnqueued
XrdProofGroupMgr * fGrpMgr
XrdProofdProofServMgr * SessionMgr() const
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group 'grp.
int Recv(XpdMsg &msg)
Recv message from the pipe.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
std::list< XrdProofQuery * > * Queries() const
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process 'schedparam' directive.
std::map< std::string, std::string >::const_iterator iter
int GetNActiveSessions()
Calculate the number of workers existing on this node which are currently running.
virtual int Config(bool rcf=0)
Configure this instance using the content of file 'cfn'.
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first querym enqueue also the session.
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
std::list< XrdProofdProofServ * > fQueue
int CheckFrequency() const
virtual void ResetParameters()
Reset values for the configurable parameters.
XrdProofdNetMgr * NetMgr() const
static void Sort(std::list< XrdProofWorker * > *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
void RemoveQuery(const char *tag)
remove query with tag form the list of queries
XrdOucHash< XrdProofWorker > * Workers() const
virtual void RegisterDirectives()
Register directives for configuration.
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker * > *, const char *)
Get a list of workers that can be used by session 'xps'.
const char *const XPD_GW_Static
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
virtual int Reschedule()
Consider starting some query from the queue.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
virtual int Config(bool rcf=0)
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdSysError *e=0)
Constructor.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
void Register(const char *dname, XrdProofdDirective *d)
const char * Group() const
XrdProofQuery * CurrentQuery()