23#include "XrdOuc/XrdOucStream.hh"
37 std::list<XrdProofdSessionEntry *> *fSortedList;
39} XpdCreateActiveList_t;
53 XPDLOC(PMGR,
"PriorityCron")
57 TRACE(REQ,
"undefined manager: cannot start");
67 if ((rc = mgr->
Pipe()->
Recv(msg)) != 0) {
68 XPDERR(
"problems receiving message; errno: "<<-rc);
73 XrdOucString usr, grp;
74 int opt = 0, pid = -1;
76 rc = (rc == 0) ? msg.
Get(usr) : rc;
77 rc = (rc == 0) ? msg.
Get(grp) : rc;
78 rc = (rc == 0) ? msg.
Get(pid) : rc;
80 XPDERR(
"kChangeStatus: problems parsing message : '"<<msg.
Buf()<<
"'; errno: "<<-rc);
88 mgr->
AddSession(usr.c_str(), grp.c_str(), pid);
90 XPDERR(
"kChangeStatus: invalid opt: "<< opt);
96 rc = (rc == 0) ? msg.
Get(prio) : rc;
98 XPDERR(
"kSetGroupPriority: problems parsing message; errno: "<<-rc);
104 XPDERR(
"unknown message type: "<< msg.
Type());
108 XPDERR(
"problem setting nice values ");
124 XPDLOC(PMGR,
"XrdProofdPriorityMgr")
133 TRACE(XERR,
"unable to generate pipe for the priority poller");
146 XPDLOC(PMGR,
"DumpPriorityChanges")
152 XPDFORM(msg,
"priority will be changed by %d for user(s): %s",
169 XPDLOC(PMGR,
"PriorityMgr::Config")
173 XPDERR(
"problems parsing file ");
178 msg = (rcf) ?
"re-configuring" :
"configuring";
185 TRACE(ALL,
"no priority changes requested");
190 XPDFORM(msg,
"worker sched based on '%s' priorities",
199 (
void *)
this, 0,
"PriorityMgr poller thread") != 0) {
200 XPDERR(
"could not start poller thread");
203 TRACE(ALL,
"poller thread started");
223 char *val, XrdOucStream *cfg,
bool rcf)
225 XPDLOC(PMGR,
"PriorityMgr::DoDirective")
231 if (
d->fName ==
"priority") {
233 }
else if (
d->fName ==
"schedopt") {
236 TRACE(XERR,
"unknown directive: "<<
d->fName);
247 g->SetPriority((
float)priority);
276 XPDLOC(PMGR,
"CreateActiveList")
278 XpdCreateActiveList_t *cal = (XpdCreateActiveList_t *)
s;
283 std::list<XrdProofdSessionEntry *> *sorted = cal->fSortedList;
287 float ef =
g->FracEff() /
g->Active();
288 int nsrv =
g->Active(
e->fUser.c_str());
293 std::list<XrdProofdSessionEntry *>::iterator ssvi;
294 for (ssvi = sorted->begin() ; ssvi != sorted->end(); ++ssvi) {
295 if (ef >= (*ssvi)->fFracEff) {
296 sorted->insert(ssvi,
e);
302 sorted->push_back(
e);
307 emsg =
"no srv sessions for active client";
310 emsg =
"group not found: "; emsg +=
e->fGroup.c_str();
313 emsg =
"group manager undefined";
316 emsg =
"input structure or entry undefined";
320 if (cal) cal->error = 1;
321 TRACE(XERR, (
e ?
e->fUser :
"---") <<
": protocol error: "<<emsg);
336 XPDLOC(PMGR,
"PriorityMgr::SetNiceValues")
338 TRACE(REQ,
"------------------- Start ----------------------");
340 TRACE(REQ,
"opt: "<<opt);
344 TRACE(REQ,
"------------------- End ------------------------");
356 TRACE(REQ,
"------------------- End ------------------------");
366 TRACE(XERR,
"failure from SetEffectiveFractions");
371 TRACE(DBG,
"creating a list of active sessions sorted by decreasing effective fraction ");
372 std::list<XrdProofdSessionEntry *> sorted;
373 XpdCreateActiveList_t cal = {
fMgr->
GroupsMgr(), &sorted, 0 };
380 std::list<XrdProofdSessionEntry *>::iterator ssvi;
382 for (ssvi = sorted.begin() ; ssvi != sorted.end(); ++ssvi)
383 TRACE(HDBG, i++ <<
" eff: "<< (*ssvi)->fFracEff);
386 TRACE(DBG,
"calculating nice values");
389 ssvi = sorted.begin();
390 float xmax = (*ssvi)->fFracEff;
394 (*ssvi)->SetPriority(nice);
397 while (ssvi != sorted.end()) {
401 TRACE(DBG,
" --> nice value for client "<< (*ssvi)->fUser<<
" is "<<nice);
402 (*ssvi)->SetPriority(nice);
406 TRACE(XERR,
"negative or null max effective fraction: "<<
xmax);
410 TRACE(XERR,
"failure from CreateActiveList");
413 TRACE(REQ,
"------------------- End ------------------------");
429 int dp = strtol(val,0,10);
432 if ((val = cfg->GetWord()) && !strncmp(val,
"if",2)) {
433 if ((val = cfg->GetWord()) && val[0]) {
447 XPDLOC(PMGR,
"PriorityMgr::DoDirectiveSchedOpt")
457 while (val && val[0]) {
458 XrdOucString o = val;
459 if (o.beginswith(
"min:")) {
461 o.replace(
"min:",
"");
463 }
else if (o.beginswith(
"max:")) {
465 o.replace(
"max:",
"");
470 else if (o ==
"local")
478 val = cfg->GetWord();
492 TRACE(XERR,
"inconsistent value for fPriorityMin (> fPriorityMax) ["<<
506 XrdOucString key; key += pid;
517 XrdOucString key; key += pid;
536 XPDLOC(PMGR,
"PriorityMgr::SetProcessPriority")
546 if ((priority = getpriority(PRIO_PROCESS, pid)) == -1 && errno != 0) {
547 TRACE(XERR,
"getpriority: errno: " << errno);
551 int newp = priority + dp;
556 TRACE(XERR,
"could not get privileges");
559 TRACE(REQ,
"got privileges ");
561 if (setpriority(PRIO_PROCESS, pid, newp) != 0) {
562 TRACE(XERR,
"setpriority: errno: " << errno);
563 return ((errno != 0) ? -errno : -1);
565 if ((getpriority(PRIO_PROCESS, pid)) != newp && errno != 0) {
566 TRACE(XERR,
"did not succeed: errno: " << errno);
583 : fUser(u), fGroup(
g), fPid(pid), fFracEff(0.)
585 XPDLOC(PMGR,
"XrdProofdSessionEntry")
590 int prio = getpriority(PRIO_PROCESS, pid);
592 TRACE(XERR,
" getpriority: errno: " << errno);
611 XPDLOC(PMGR,
"SessionEntry::SetPriority")
622 TRACE(XERR,
"could not get privileges");
626 if (setpriority(PRIO_PROCESS,
fPid, priority) != 0) {
627 TRACE(XERR,
"setpriority: errno: " << errno);
#define TRACE(Flag, Args)
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define XpdBadPGuard(g, u)
static int CreateActiveList(const char *, XrdProofdSessionEntry *e, void *s)
Run thorugh entries to create the sorted list of active entries.
static int DumpPriorityChanges(const char *, XrdProofdPriority *p, void *s)
Reset the priority on entries.
void * XrdProofdPriorityCron(void *p)
This is an endless loop to periodically check the system.
static int ResetEntryPriority(const char *, XrdProofdSessionEntry *e, void *)
Reset the priority on entries.
#define XrdSysMutexHelper
int Get(int &i)
Get next token and interpret it as an int.
XrdProofGroup * GetGroup(const char *grp)
Returns the instance of for group 'grp.
int SetEffectiveFractions(bool optprio)
Go through the list of active groups (those having at least a non-idle member) and determine the effe...
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdProofGroupMgr * GroupsMgr() const
const char * Host() const
int Recv(XpdMsg &msg)
Recv message from the pipe.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
void RegisterDirectives()
Register directives for configuration.
int SetProcessPriority(int pid, const char *usr, int &dp)
Change priority of process pid belonging to user, if needed.
int SetNiceValues(int opt=0)
Recalculate nice values taking into account all active users and their priorities.
XrdProofdPriorityMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
int AddSession(const char *u, const char *g, int pid)
Add to the active list a session with ID pid.
XrdOucHash< XrdProofdSessionEntry > fSessions
void SetGroupPriority(const char *grp, int priority)
Change group priority. Used when a master pushes a priority to a worker.
int DoDirectiveSchedOpt(char *, XrdOucStream *, bool)
Process 'schedopt' directive.
XrdOucHash< XrdProofdPriority > fPriorities
int DoDirectivePriority(char *, XrdOucStream *, bool)
Process 'priority' directive.
void SetSchedOpt(int opt)
int RemoveSession(int pid)
Remove from the active list the session with ID pid.
virtual ~XrdProofdSessionEntry()
Destructor.
XrdProofdSessionEntry(const char *u, const char *g, int pid)
Constructor.
int SetPriority(int priority=XPPM_NOPRIORITY)
Change process priority.
static constexpr double s
static constexpr double pi