50#include "XrdOuc/XrdOucString.hh"
51#include "XrdOuc/XrdOucStream.hh"
94 TRACE(
XERR,
"undefined scheduler: cannot start");
111 XPDERR(
"problems receiving message; errno: "<<-
rc);
130 TRACE(
ALL,
"running regular checks");
148 lhs->GetNActiveSessions() <
rhs->GetNActiveSessions()) ? 1 : 0);
156 if (!
d || !(
d->fVal))
208 if (
d->fName ==
"schedparam") {
210 }
else if (
d->fName ==
"resource") {
244 XPDERR(
"problems parsing file ");
254 XPDFORM(
msg,
"maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
262 (
void *)
this, 0,
"Scheduler cron thread") != 0) {
263 XPDERR(
"could not start cron thread");
283 std::list<XrdProofdProofServ *>::iterator
ii;
305 TRACE(
ALL,
" ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306 if (prefix)
TRACE(
ALL,
" +++ Called from: "<<prefix);
308 std::list<XrdProofdProofServ *>::iterator
ii;
311 TRACE(
ALL,
" +++ #"<<++i<<
" client:"<< (*ii)->Client()<<
" # of queries: "<< (*ii)->Queries()->size());
313 TRACE(
ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
329 while (
xps && !(
xps->IsValid())) {
348 std::list<XrdProofWorker *>::iterator iter;
350 TRACE(
DBG, (*iter)->fImage<<
" : # act: "<<(*iter)->fProofServs.size());
351 if ((*iter)->fType !=
'M' && (*iter)->fType !=
'S'
363 std::list<XrdProofdProofServ *>::iterator
sesIter;
366 if ((*sesIter)->Group()) {
379 }
else if (
nWrks >= (
int)
wrks->size()) {
398 std::list<XrdProofWorker *> *
wrks,
420 const char *
cqtag = (
xps->CurrentQuery()) ?
xps->CurrentQuery()->GetTag() :
"undef";
425 TRACE(
REQ,
"current assignment for session "<<
xps->SrvPID() <<
" is valid");
444 TRACE(
REQ,
"session has already assigned workers: enqueue");
450 std::list<XrdProofWorker *> *
acws = 0;
491 TRACE(
REQ,
"no workers currently available: session enqueued");
504 std::list<XrdProofWorker *> *
acwseff = 0;
509 acwseff =
new std::list<XrdProofWorker *>;
511 if ((*xWrk)->Active() <
maxnum) {
515 if ((*xWrk)->Active() <
maxnum) {
552 TRACE(
REQ,
"no workers currently available: session enqueued");
556 TRACE(
XERR,
"no worker available: do nothing");
579 const char *
randdev =
"/dev/urandom";
583 if (read(fd, &seed,
sizeof(seed)) !=
sizeof(seed)) {
594 std::vector<XrdProofWorker *>
vwrk(
nwt);
599 std::list<XrdProofWorker *>::iterator
iwk =
acws->
begin();
603 int na = (*iwk)->Active();
611 for (i = 1; i <
nwt; i++) {
626 for (i = 0; i <
nwt; i++) {
630 for (
j = i;
j <
nwt;
j++) {
677 std::list<XrdProofWorker *>::iterator
iw =
acws->
begin();
687 if (
wrks->size() <= 1) {
688 TRACE(
XERR,
"no worker found: do nothing");
717 TRACE(
XERR,
"got undefined session: protocol error!");
723 if (
xps &&
xps->CurrentQuery()) {
724 qtag =
xps->CurrentQuery()->GetTag();
727 qtag.replace(
":",
"");
732 TRACE(
XERR,
"failure from GetWorkers: protocol error!");
745 if (
xps->Queries()->size() > 1)
763 const char *
osel[] = {
"all",
"round-robin",
"random",
"load-based"};
764 sbuf +=
"Selection: ";
767 sbuf +=
", max workers: ";
773 std::list<XrdProofWorker *>::iterator
iw;
775 sbuf += (*iw)->fType;
777 if ((*iw)->fPort > -1) {
781 sbuf +=
" sessions: ";
sbuf += (*iw)->Active();
801 if (
d->fName ==
"schedparam") {
803 }
else if (
d->fName ==
"resource") {
822 while (val && val[0]) {
824 if (s.beginswith(
"wmx:")) {
825 s.replace(
"wmx:",
"");
827 }
else if (s.beginswith(
"mxsess:")) {
828 s.replace(
"mxsess:",
"");
830 }
else if (s.beginswith(
"mxrun:")) {
831 s.replace(
"mxrun:",
"");
833 }
else if (s.beginswith(
"selopt:")) {
834 if (s.endswith(
"random"))
836 else if (s.endswith(
"load"))
840 }
else if (s.beginswith(
"fraction:")) {
841 s.replace(
"fraction:",
"");
843 }
else if (s.beginswith(
"optnwrks:")) {
844 s.replace(
"optnwrks:",
"");
846 }
else if (s.beginswith(
"minforquery:")) {
847 s.replace(
"minforquery:",
"");
849 }
else if (s.beginswith(
"queue:")) {
850 if (s.endswith(
"fifo")) {
853 }
else if (
strncmp(val,
"default", 7)) {
858 val = cfg->GetWord();
872 TRACE(
ALL,
"WARNING: in Load-Based mode the max number of sessions"
873 " to be run is determined dynamically");
892 while ((val = cfg->GetWord()) && val[0]) {
894 if (s.beginswith(
"wmx:")) {
895 s.replace(
"wmx:",
"");
897 }
else if (s.beginswith(
"mxsess:")) {
898 s.replace(
"mxsess:",
"");
900 }
else if (s.beginswith(
"selopt:")) {
901 if (s.endswith(
"random"))
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
#define TRACE(Flag, Args)
winID h TVirtualViewer3D TVirtualGLPainter p
const char *const XPD_GW_QueryEnqueued
const char *const XPD_GW_Static
int DoSchedDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Generic directive processor.
static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
Compare two workers for sorting.
void * XrdProofSchedCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
const_iterator begin() const
const_iterator end() const
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group 'grp.
virtual int DoDirectiveResource(char *, XrdOucStream *, bool)
Process 'resource' directive.
virtual int GetWorkers(XrdProofdProofServ *xps, std::list< XrdProofWorker * > *, const char *)
Get a list of workers that can be used by session 'xps'.
XrdProofSched(const char *name, XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr, const char *cfn, XrdOucError *e=0)
Constructor.
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
virtual int ProcessDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
virtual int Reschedule()
Consider starting some query from the queue.
virtual void RegisterDirectives()
Register directives for configuration.
virtual int Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
Queue a query in the session; if this is the first querym enqueue also the session.
virtual int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
std::list< XrdProofdProofServ * > fQueue
virtual XrdProofdProofServ * FirstSession()
Get first valid session.
virtual void DumpQueues(const char *prefix=0)
Dump the content of the waiting sessions queue.
XrdProofGroupMgr * fGrpMgr
virtual int Config(bool rcf=0)
Configure this instance using the content of file 'cfn'.
virtual void ResetParameters()
Reset values for the configurable parameters.
virtual int GetNumWorkers(XrdProofdProofServ *xps)
Calculate the number of workers to be used given the state of the cluster.
virtual int DoDirectiveSchedParam(char *, XrdOucStream *, bool)
Process 'schedparam' directive.
static void Sort(std::list< XrdProofWorker * > *lst, bool(*f)(XrdProofWorker *&lhs, XrdProofWorker *&rhs))
Sort ascendingly the list according to the comparing algorithm defined by 'f'; 'f' should return 'tru...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
XrdProofdNetMgr * NetMgr() const
XrdProofdProofServMgr * SessionMgr() const
std::list< XrdProofWorker * > * GetActiveWorkers()
Return the list of workers after having made sure that the info is up-to-date.
std::list< XrdProofdProofServ * > * ActiveSessions()