Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
XrdProofdPriorityMgr.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis Feb 2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// XrdProofdPriorityMgr //
15// //
16// Author: G. Ganis, CERN, 2007 //
17// //
18// Class managing session priorities. //
19// //
20//////////////////////////////////////////////////////////////////////////
21#include "XrdProofdPlatform.h"
22
23#include "XrdOuc/XrdOucStream.hh"
24#include "XrdSys/XrdSysPriv.hh"
25
26#include "XrdProofdAux.h"
27#include "XrdProofdManager.h"
29#include "XrdProofGroup.h"
30
31// Tracing utilities
32#include "XrdProofdTrace.h"
33
34// Aux structures for scan through operations
35typedef struct {
36 XrdProofGroupMgr *fGroupMgr;
37 std::list<XrdProofdSessionEntry *> *fSortedList;
38 bool error;
39} XpdCreateActiveList_t;
40
41//--------------------------------------------------------------------------
42//
43// XrdProofdPriorityCron
44//
45// Function run in separate thread watching changes in session status
46// frequency
47//
48////////////////////////////////////////////////////////////////////////////////
49/// This is an endless loop to periodically check the system
50
52{
53 XPDLOC(PMGR, "PriorityCron")
54
56 if (!(mgr)) {
57 TRACE(REQ, "undefined manager: cannot start");
58 return (void *)0;
59 }
60
61 while(1) {
62 // We wait for processes to communicate a session status change
63 int pollRet = mgr->Pipe()->Poll(-1);
64 if (pollRet > 0) {
65 int rc = 0;
66 XpdMsg msg;
67 if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
68 XPDERR("problems receiving message; errno: "<<-rc);
69 continue;
70 }
71 // Parse type
73 XrdOucString usr, grp;
74 int opt = 0, pid = -1;
75 rc = msg.Get(opt);
76 rc = (rc == 0) ? msg.Get(usr) : rc;
77 rc = (rc == 0) ? msg.Get(grp) : rc;
78 rc = (rc == 0) ? msg.Get(pid) : rc;
79 if (rc != 0) {
80 XPDERR("kChangeStatus: problems parsing message : '"<<msg.Buf()<<"'; errno: "<<-rc);
81 continue;
82 }
83 if (opt < 0) {
84 // Remove
85 mgr->RemoveSession(pid);
86 } else if (opt > 0) {
87 // Add
88 mgr->AddSession(usr.c_str(), grp.c_str(), pid);
89 } else {
90 XPDERR("kChangeStatus: invalid opt: "<< opt);
91 }
93 XrdOucString grp;
94 int prio = -1;
95 rc = msg.Get(grp);
96 rc = (rc == 0) ? msg.Get(prio) : rc;
97 if (rc != 0) {
98 XPDERR("kSetGroupPriority: problems parsing message; errno: "<<-rc);
99 continue;
100 }
101 // Change group priority
102 mgr->SetGroupPriority(grp.c_str(), prio);
103 } else {
104 XPDERR("unknown message type: "<< msg.Type());
105 }
106 // Communicate new priorities
107 if (mgr->SetNiceValues() != 0) {
108 XPDERR("problem setting nice values ");
109 }
110 }
111 }
112
113 // Should never come here
114 return (void *)0;
115}
116
117////////////////////////////////////////////////////////////////////////////////
118/// Constructor
119
121 XrdProtocol_Config *pi, XrdSysError *e)
122 : XrdProofdConfig(pi->ConfigFN, e)
123{
124 XPDLOC(PMGR, "XrdProofdPriorityMgr")
125
126 fMgr = mgr;
128 fPriorityMax = 20;
129 fPriorityMin = 1;
130
131 // Init pipe for the poller
132 if (!fPipe.IsValid()) {
133 TRACE(XERR, "unable to generate pipe for the priority poller");
134 return;
135 }
136
137 // Configuration directives
139}
140
141////////////////////////////////////////////////////////////////////////////////
142/// Reset the priority on entries
143
144static int DumpPriorityChanges(const char *, XrdProofdPriority *p, void *s)
145{
146 XPDLOC(PMGR, "DumpPriorityChanges")
147
148 XrdSysError *e = (XrdSysError *)s;
149
150 if (p && e) {
151 XrdOucString msg;
152 XPDFORM(msg, "priority will be changed by %d for user(s): %s",
153 p->fDeltaPriority, p->fUser.c_str());
154 TRACE(ALL, msg);
155 // Check next
156 return 0;
157 }
158
159 // Not enough info: stop
160 return 1;
161}
162
163////////////////////////////////////////////////////////////////////////////////
164/// Run configuration and parse the entered config directives.
165/// Return 0 on success, -1 on error
166
168{
169 XPDLOC(PMGR, "PriorityMgr::Config")
170
171 // Run first the configurator
172 if (XrdProofdConfig::Config(rcf) != 0) {
173 XPDERR("problems parsing file ");
174 return -1;
175 }
176
177 XrdOucString msg;
178 msg = (rcf) ? "re-configuring" : "configuring";
179 TRACE(ALL, msg);
180
181 // Notify change priority rules
182 if (fPriorities.Num() > 0) {
183 fPriorities.Apply(DumpPriorityChanges, (void *)fEDest);
184 } else {
185 TRACE(ALL, "no priority changes requested");
186 }
187
188 // Scheduling option
189 if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->Num() > 1 && fSchedOpt != kXPD_sched_off) {
190 XPDFORM(msg, "worker sched based on '%s' priorities",
191 (fSchedOpt == kXPD_sched_central) ? "central" : "local");
192 TRACE(ALL, msg);
193 }
194
195 if (!rcf) {
196 // Start poller thread
197 pthread_t tid;
198 if (XrdSysThread::Run(&tid, XrdProofdPriorityCron,
199 (void *)this, 0, "PriorityMgr poller thread") != 0) {
200 XPDERR("could not start poller thread");
201 return 0;
202 }
203 TRACE(ALL, "poller thread started");
204 }
205
206 // Done
207 return 0;
208}
209
210////////////////////////////////////////////////////////////////////////////////
211/// Register directives for configuration
212
214{
215 Register("schedopt", new XrdProofdDirective("schedopt", this, &DoDirectiveClass));
216 Register("priority", new XrdProofdDirective("priority", this, &DoDirectiveClass));
217}
218
219////////////////////////////////////////////////////////////////////////////////
220/// Dispatch directive 'd' to its handler ('priority' or 'schedopt').
221
223 char *val, XrdOucStream *cfg, bool rcf)
224{
225 XPDLOC(PMGR, "PriorityMgr::DoDirective")
226
227 if (!d)
228 // undefined inputs
229 return -1;
230
231 if (d->fName == "priority") {
232 return DoDirectivePriority(val, cfg, rcf);
233 } else if (d->fName == "schedopt") {
234 return DoDirectiveSchedOpt(val, cfg, rcf);
235 }
236 TRACE(XERR, "unknown directive: "<<d->fName);
237 return -1;
238}
239
240////////////////////////////////////////////////////////////////////////////////
241/// Change group priority. Used when a master pushes a priority to a worker.
242
243void XrdProofdPriorityMgr::SetGroupPriority(const char *grp, int priority)
244{
246 if (g)
247 g->SetPriority((float)priority);
248
249 // Make sure scheduling is ON
251
252 // Done
253 return;
254}
255
256////////////////////////////////////////////////////////////////////////////////
257/// Reset the priority on entries
258
259static int ResetEntryPriority(const char *, XrdProofdSessionEntry *e, void *)
260{
261 if (e) {
262 e->SetPriority();
263 // Check next
264 return 0;
265 }
266
267 // Not enough info: stop
268 return 1;
269}
270
271////////////////////////////////////////////////////////////////////////////////
272/// Run thorugh entries to create the sorted list of active entries
273
274static int CreateActiveList(const char *, XrdProofdSessionEntry *e, void *s)
275{
276 XPDLOC(PMGR, "CreateActiveList")
277
278 XpdCreateActiveList_t *cal = (XpdCreateActiveList_t *)s;
279
280 XrdOucString emsg;
281 if (e && cal) {
282 XrdProofGroupMgr *gm = cal->fGroupMgr;
283 std::list<XrdProofdSessionEntry *> *sorted = cal->fSortedList;
284 if (gm) {
285 XrdProofGroup *g = gm->GetGroup(e->fGroup.c_str());
286 if (g) {
287 float ef = g->FracEff() / g->Active();
288 int nsrv = g->Active(e->fUser.c_str());
289 if (nsrv > 0) {
290 ef /= nsrv;
291 e->fFracEff = ef;
292 bool pushed = 0;
293 std::list<XrdProofdSessionEntry *>::iterator ssvi;
294 for (ssvi = sorted->begin() ; ssvi != sorted->end(); ++ssvi) {
295 if (ef >= (*ssvi)->fFracEff) {
296 sorted->insert(ssvi, e);
297 pushed = 1;
298 break;
299 }
300 }
301 if (!pushed)
302 sorted->push_back(e);
303 // Go to next
304 return 0;
305
306 } else {
307 emsg = "no srv sessions for active client";
308 }
309 } else {
310 emsg = "group not found: "; emsg += e->fGroup.c_str();
311 }
312 } else {
313 emsg = "group manager undefined";
314 }
315 } else {
316 emsg = "input structure or entry undefined";
317 }
318
319 // Some problem
320 if (cal) cal->error = 1;
321 TRACE(XERR, (e ? e->fUser : "---") << ": protocol error: "<<emsg);
322 return 1;
323}
324
325////////////////////////////////////////////////////////////////////////////////
326/// Recalculate nice values taking into account all active users
327/// and their priorities.
328/// The type of sessions considered depend on 'opt':
329/// 0 all active sessions
330/// 1 master sessions
331/// 2 worker sessions
332/// Return 0 on success, -1 otherwise.
333
335{
336 XPDLOC(PMGR, "PriorityMgr::SetNiceValues")
337
338 TRACE(REQ, "------------------- Start ----------------------");
339
340 TRACE(REQ, "opt: "<<opt);
341
342 if (!fMgr->GroupsMgr() || fMgr->GroupsMgr()->Num() <= 1 || !IsSchedOn()) {
343 // Nothing to do
344 TRACE(REQ, "------------------- End ------------------------");
345 return 0;
346 }
347
348 // At least two active session
349 int nact = fSessions.Num();
350 TRACE(DBG, fMgr->GroupsMgr()->Num()<<" groups, " << nact<<" active sessions");
351 if (nact <= 1) {
352 // Restore default values
353 if (nact == 1)
355 // Nothing else to do
356 TRACE(REQ, "------------------- End ------------------------");
357 return 0;
358 }
359
360 XrdSysMutexHelper mtxh(&fMutex);
361
362 // Determine which groups are active and their effective fractions
363 int rc = 0;
364 if ((rc = fMgr->GroupsMgr()->SetEffectiveFractions(IsSchedOn())) != 0) {
365 // Failure
366 TRACE(XERR, "failure from SetEffectiveFractions");
367 rc = -1;
368 }
369
370 // Now create a list of active sessions sorted by decreasing effective fraction
371 TRACE(DBG, "creating a list of active sessions sorted by decreasing effective fraction ");
372 std::list<XrdProofdSessionEntry *> sorted;
373 XpdCreateActiveList_t cal = { fMgr->GroupsMgr(), &sorted, 0 };
374 if (rc == 0)
375 fSessions.Apply(CreateActiveList, (void *)&cal);
376
377 if (!cal.error) {
378 // Notify
379 int i = 0;
380 std::list<XrdProofdSessionEntry *>::iterator ssvi;
381 if (TRACING(HDBG)) {
382 for (ssvi = sorted.begin() ; ssvi != sorted.end(); ++ssvi)
383 TRACE(HDBG, i++ <<" eff: "<< (*ssvi)->fFracEff);
384 }
385
386 TRACE(DBG, "calculating nice values");
387
388 // The first has the max priority
389 ssvi = sorted.begin();
390 float xmax = (*ssvi)->fFracEff;
391 if (xmax > 0.) {
392 // This is for Unix
393 int nice = 20 - fPriorityMax;
394 (*ssvi)->SetPriority(nice);
395 // The others have priorities rescaled wrt their effective fractions
396 ++ssvi;
397 while (ssvi != sorted.end()) {
398 int xpri = (int) ((*ssvi)->fFracEff / xmax * (fPriorityMax - fPriorityMin))
399 + fPriorityMin;
400 nice = 20 - xpri;
401 TRACE(DBG, " --> nice value for client "<< (*ssvi)->fUser<<" is "<<nice);
402 (*ssvi)->SetPriority(nice);
403 ++ssvi;
404 }
405 } else {
406 TRACE(XERR, "negative or null max effective fraction: "<<xmax);
407 rc = -1;
408 }
409 } else {
410 TRACE(XERR, "failure from CreateActiveList");
411 rc = -1;
412 }
413 TRACE(REQ, "------------------- End ------------------------");
414
415 // Done
416 return rc;
417}
418
419////////////////////////////////////////////////////////////////////////////////
420/// Process 'priority' directive
421
422int XrdProofdPriorityMgr::DoDirectivePriority(char *val, XrdOucStream *cfg, bool)
423{
424 if (!val || !cfg)
425 // undefined inputs
426 return -1;
427
428 // Priority change directive: get delta_priority
429 int dp = strtol(val,0,10);
430 XrdProofdPriority *p = new XrdProofdPriority("*", dp);
431 // Check if an 'if' condition is present
432 if ((val = cfg->GetWord()) && !strncmp(val,"if",2)) {
433 if ((val = cfg->GetWord()) && val[0]) {
434 p->fUser = val;
435 }
436 }
437 // Add to the list
438 fPriorities.Rep(p->fUser.c_str(), p);
439 return 0;
440}
441
442////////////////////////////////////////////////////////////////////////////////
443/// Process 'schedopt' directive
444
445int XrdProofdPriorityMgr::DoDirectiveSchedOpt(char *val, XrdOucStream *cfg, bool)
446{
447 XPDLOC(PMGR, "PriorityMgr::DoDirectiveSchedOpt")
448
449 if (!val || !cfg)
450 // undefined inputs
451 return -1;
452
453 int pmin = -1;
454 int pmax = -1;
455 int opt = -1;
456 // Defines scheduling options
457 while (val && val[0]) {
458 XrdOucString o = val;
459 if (o.beginswith("min:")) {
460 // The overall inflating factor
461 o.replace("min:","");
462 pmin = o.atoi();
463 } else if (o.beginswith("max:")) {
464 // The overall inflating factor
465 o.replace("max:","");
466 pmax = o.atoi();
467 } else {
468 if (o == "central")
469 opt = kXPD_sched_central;
470 else if (o == "local")
471 opt = kXPD_sched_local;
472 }
473 // Check deprecated 'if' directive
474 if (fMgr->Host() && cfg)
475 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
476 return 0;
477 // Next
478 val = cfg->GetWord();
479 }
480
481 // Set the values (we need to do it here to avoid setting wrong values
482 // when a non-matching 'if' condition is found)
483 if (pmin > -1)
484 fPriorityMin = (pmin >= 1 && pmin <= 40) ? pmin : fPriorityMin;
485 if (pmax > -1)
486 fPriorityMax = (pmax >= 1 && pmax <= 40) ? pmax : fPriorityMax;
487 if (opt > -1)
488 fSchedOpt = opt;
489
490 // Make sure that min is <= max
492 TRACE(XERR, "inconsistent value for fPriorityMin (> fPriorityMax) ["<<
493 fPriorityMin << ", "<<fPriorityMax<<"] - correcting");
495 }
496
497 return 0;
498}
499
500////////////////////////////////////////////////////////////////////////////////
501/// Remove from the active list the session with ID pid.
502/// Return -ENOENT if not found, or 0.
503
505{
506 XrdOucString key; key += pid;
507 return fSessions.Del(key.c_str());
508}
509
510////////////////////////////////////////////////////////////////////////////////
511/// Add to the active list a session with ID pid. Check for duplications.
512/// Returns 1 if the entry existed already and it has been replaced; or 0.
513
514int XrdProofdPriorityMgr::AddSession(const char *u, const char *g, int pid)
515{
516 int rc = 0;
517 XrdOucString key; key += pid;
518 XrdProofdSessionEntry *oldent = fSessions.Find(key.c_str());
519 if (oldent) {
520 rc = 1;
521 fSessions.Rep(key.c_str(), new XrdProofdSessionEntry(u, g, pid));
522 } else {
523 fSessions.Add(key.c_str(), new XrdProofdSessionEntry(u, g, pid));
524 }
525
526 // Done
527 return rc;
528}
529
530////////////////////////////////////////////////////////////////////////////////
531/// Change priority of process pid belonging to user, if needed.
532/// Return 0 on success, -errno in case of error
533
534int XrdProofdPriorityMgr::SetProcessPriority(int pid, const char *user, int &dp)
535{
536 XPDLOC(PMGR, "PriorityMgr::SetProcessPriority")
537
538 // Change child process priority, if required
539 if (fPriorities.Num() > 0) {
540 XrdProofdPriority *pu = fPriorities.Find(user);
541 if (pu) {
542 dp = pu->fDeltaPriority;
543 // Change the priority
544 errno = 0;
545 int priority = XPPM_NOPRIORITY;
546 if ((priority = getpriority(PRIO_PROCESS, pid)) == -1 && errno != 0) {
547 TRACE(XERR, "getpriority: errno: " << errno);
548 return -errno;
549 }
550 // Set the priority
551 int newp = priority + dp;
552 XrdProofUI ui;
553 XrdProofdAux::GetUserInfo(geteuid(), ui);
554 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
555 if (XpdBadPGuard(pGuard, ui.fUid)) {
556 TRACE(XERR, "could not get privileges");
557 return -1;
558 }
559 TRACE(REQ, "got privileges ");
560 errno = 0;
561 if (setpriority(PRIO_PROCESS, pid, newp) != 0) {
562 TRACE(XERR, "setpriority: errno: " << errno);
563 return ((errno != 0) ? -errno : -1);
564 }
565 if ((getpriority(PRIO_PROCESS, pid)) != newp && errno != 0) {
566 TRACE(XERR, "did not succeed: errno: " << errno);
567 return -errno;
568 }
569 }
570 }
571
572 // We are done
573 return 0;
574}
575
576//
577// Small class to describe an active session
578//
579////////////////////////////////////////////////////////////////////////////////
580/// Constructor
581
582XrdProofdSessionEntry::XrdProofdSessionEntry(const char *u, const char *g, int pid)
583 : fUser(u), fGroup(g), fPid(pid), fFracEff(0.)
584{
585 XPDLOC(PMGR, "XrdProofdSessionEntry")
586
589 errno = 0;
590 int prio = getpriority(PRIO_PROCESS, pid);
591 if (errno != 0) {
592 TRACE(XERR, " getpriority: errno: " << errno);
593 return;
594 }
595 fDefaultPriority = prio;
596}
597
598////////////////////////////////////////////////////////////////////////////////
599/// Destructor
600
602{
604}
605
606////////////////////////////////////////////////////////////////////////////////
607/// Change process priority
608
610{
611 XPDLOC(PMGR, "SessionEntry::SetPriority")
612
613 if (priority != XPPM_NOPRIORITY)
614 priority = fDefaultPriority;
615
616 if (priority != fPriority) {
617 // Set priority to the default value
618 XrdProofUI ui;
619 XrdProofdAux::GetUserInfo(geteuid(), ui);
620 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
621 if (XpdBadPGuard(pGuard, ui.fUid)) {
622 TRACE(XERR, "could not get privileges");
623 return -1;
624 }
625 errno = 0;
626 if (setpriority(PRIO_PROCESS, fPid, priority) != 0) {
627 TRACE(XERR, "setpriority: errno: " << errno);
628 return -1;
629 }
630 fPriority = priority;
631 }
632
633 // Done
634 return 0;
635}
#define d(i)
Definition RSha256.hxx:102
#define g(i)
Definition RSha256.hxx:105
#define e(i)
Definition RSha256.hxx:103
#define TRACE(Flag, Args)
Definition TGHtml.h:120
float xmax
@ kXPD_sched_central
@ kXPD_sched_off
@ kXPD_sched_local
#define XPDFORM
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define XpdBadPGuard(g, u)
static int CreateActiveList(const char *, XrdProofdSessionEntry *e, void *s)
Run through entries to create the sorted list of active entries.
static int DumpPriorityChanges(const char *, XrdProofdPriority *p, void *s)
Trace the priority change configured for each entry.
void * XrdProofdPriorityCron(void *p)
This is an endless loop to periodically check the system.
static int ResetEntryPriority(const char *, XrdProofdSessionEntry *e, void *)
Reset the priority on entries.
#define XPPM_NOPRIORITY
#define XPDLOC(d, x)
#define TRACING(x)
#define XPDERR(x)
int Type() const
const char * Buf() const
int Get(int &i)
Get next token and interpret it as an int.
XrdProofGroup * Apply(int(*f)(const char *, XrdProofGroup *, void *), void *arg)
Apply function 'f' to the hash table of groups; 'arg' is passed to 'f' in the last argument.
XrdProofGroup * GetGroup(const char *grp)
Returns the XrdProofGroup instance for group 'grp'.
int SetEffectiveFractions(bool optprio)
Go through the list of active groups (those having at least a non-idle member) and determine the effe...
float FracEff() const
void SetPriority(float p)
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdSysError * fEDest
XrdProofGroupMgr * GroupsMgr() const
const char * Host() const
bool IsValid() const
int Recv(XpdMsg &msg)
Recv message from the pipe.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
void RegisterDirectives()
Register directives for configuration.
int SetProcessPriority(int pid, const char *usr, int &dp)
Change priority of process pid belonging to user, if needed.
int SetNiceValues(int opt=0)
Recalculate nice values taking into account all active users and their priorities.
XrdProofdPriorityMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Dispatch directive 'd' to its handler ('priority' or 'schedopt').
int AddSession(const char *u, const char *g, int pid)
Add to the active list a session with ID pid.
XrdOucHash< XrdProofdSessionEntry > fSessions
void SetGroupPriority(const char *grp, int priority)
Change group priority. Used when a master pushes a priority to a worker.
int DoDirectiveSchedOpt(char *, XrdOucStream *, bool)
Process 'schedopt' directive.
XrdOucHash< XrdProofdPriority > fPriorities
int DoDirectivePriority(char *, XrdOucStream *, bool)
Process 'priority' directive.
int RemoveSession(int pid)
Remove from the active list the session with ID pid.
XrdProofdManager * fMgr
XrdOucString fUser
virtual ~XrdProofdSessionEntry()
Destructor.
XrdProofdSessionEntry(const char *u, const char *g, int pid)
Constructor.
int SetPriority(int priority=XPPM_NOPRIORITY)
Change process priority.