Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
XrdProofdProofServMgr.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis Jan 2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// XrdProofdProofServMgr //
15// //
16// Author: G. Ganis, CERN, 2008 //
17// //
18// Class managing proofserv sessions. //
19// //
20//////////////////////////////////////////////////////////////////////////
21#include "XrdProofdPlatform.h"
22
23#include "XpdSysDNS.h"
24#include "XpdSysError.h"
25#include "XpdSysLogger.h"
26
27#include "Xrd/XrdBuffer.hh"
28#include "Xrd/XrdPoll.hh"
29#include "Xrd/XrdScheduler.hh"
30#include "XrdNet/XrdNet.hh"
31#include "XrdOuc/XrdOucRash.hh"
32#include "XrdOuc/XrdOucStream.hh"
33#include "XrdSys/XrdSysPriv.hh"
34#include "XrdSys/XrdSysPlugin.hh"
35#include "XrdProofdClient.h"
36#include "XrdProofdClientMgr.h"
37#include "XrdProofdManager.h"
38#include "XrdProofdNetMgr.h"
41#include "XrdProofdProtocol.h"
42#include "XrdProofGroup.h"
43#include "XrdProofSched.h"
44#include "XrdROOT.h"
45
46#include <grp.h>
47#include <map>
48#include <unistd.h>
49
50// Aux structures for scan through operations
// Argument bundle for one of the "scan through" operations mentioned above.
// How the fields are consumed is not visible in this part of the file.
51typedef struct {
// Group manager -- presumably consulted by the scan callback; TODO confirm
52 XrdProofGroupMgr *fGroupMgr;
// Pointer to an int -- presumably a running count of broadcasts; TODO confirm
53 int *fNBroadcast;
54} XpdBroadcastPriority_t;
// Argument bundle for one of the "scan through" operations mentioned above.
// How the fields are consumed is not visible in this part of the file.
55typedef struct {
// Top-level manager instance
56 XrdProofdManager *fMgr;
// Client instance -- presumably the one the environment is written for; TODO confirm
57 XrdProofdClient *fClient;
// stdio stream -- presumably the environment file being written; TODO confirm
58 FILE *fEnv;
// NOTE(review): looks like the "also export into the process environment"
// switch (cf. the 'e' argument of PutEnv) -- confirm
59 bool fExport;
60} XpdWriteEnv_t;
61
// PutEnv(x,e): when 'e' is true pass the string 'x' to putenv(); otherwise
// release 'x' with delete[]. 'x' is therefore expected to come from new[].
// NOTE(review): putenv() keeps the pointer it is given instead of copying the
// string, which would explain why 'x' is not freed on the first branch -- confirm.
// NOTE(review): the expansion is a bare { } block rather than
// do { } while (0), so 'if (c) PutEnv(a,b); else ...' will not parse.
// Defined only if no PutEnv macro exists already.
62#ifndef PutEnv
63#define PutEnv(x,e) { if (e) { putenv(x); } else { delete[] x; } }
64#endif
65
66// Tracing utilities
67#include "XrdProofdTrace.h"
68
70
71//--------------------------------------------------------------------------
72//
73// XrdProofdProofServCron
74//
75// Function run in separate thread watching changes in session status
76// frequency
77//
78////////////////////////////////////////////////////////////////////////////////
79/// This is an endless loop to check the system periodically or when
80/// triggered via a message in a dedicated pipe
81
83{
84 XPDLOC(SMGR, "ProofServCron")
85
88 XrdProofSched *sched = mc->fProofSched;
89 if (!(mgr)) {
90 TRACE(XERR, "undefined session manager: cannot start");
91 return (void *)0;
92 }
93
94 // Quick checks for client disconnections: frequency (5 secs) and
95 // flag for disconnections effectively occurring
96 int quickcheckfreq = 5;
97 int clnlostscale = 0;
98
99 // Time of last full sessions check
100 int lastrun = time(0);
101 int lastcheck = lastrun, ckfreq = mgr->CheckFrequency(), waitt = 0;
102 int deltat = ((int)(0.1*ckfreq) >= 1) ? (int)(0.1*ckfreq) : 1;
103 int maxdelay = 5*ckfreq; // Force check after 5 times the check frequency
104 mgr->SetNextSessionsCheck(lastcheck + ckfreq);
105 TRACE(ALL, "next full sessions check in "<<ckfreq<<" secs");
106 while(1) {
107 // We check for client disconnections every 'quickcheckfreq' secs; we do
108 // a full check every mgr->CheckFrequency() secs; we make sure that we
109 // do not pass a negative value (meaning no timeout)
110 waitt = ckfreq - (time(0) - lastcheck);
111 if (waitt > quickcheckfreq || waitt <= 0)
112 waitt = quickcheckfreq;
113 int pollRet = mgr->Pipe()->Poll(waitt);
114
115 if (pollRet > 0) {
116 // Read message
117 XpdMsg msg;
118 int rc = 0;
119 if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
120 TRACE(XERR, "problems receiving message; errno: "<<-rc);
121 continue;
122 }
123 // Parse type
125 // A session has just gone: read process id
126 XrdOucString fpid;
127 if ((rc = msg.Get(fpid)) != 0) {
128 TRACE(XERR, "kSessionRemoval: problems receiving process ID (buf: '"<<
129 msg.Buf()<<"'); errno: "<<-rc);
130 continue;
131 }
132 XrdSysMutexHelper mhp(mgr->Mutex());
133 // Remove it from the hash list
134 mgr->DeleteFromSessions(fpid.c_str());
135 // Move the entry to the terminated sessions area
136 mgr->MvSession(fpid.c_str());
137 // Notify the scheduler too
138 if (sched) {
139 if (sched->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
140 TRACE(XERR, "kSessionRemoval: problem posting the scheduler pipe");
141 }
142 }
143 // Notify action
144 TRACE(REQ, "kSessionRemoval: session: "<<fpid<<
145 " has been removed from the active list");
147 // Obsolete
148 TRACE(XERR, "obsolete type: XrdProofdProofServMgr::kClientDisconnect");
149 } else if (msg.Type() == XrdProofdProofServMgr::kCleanSessions) {
150 // Request for cleanup all sessions of a client (or all clients)
152 XrdOucString usr;
153 rc = msg.Get(usr);
154 int svrtype = kXPD_AnyServer;
155 rc = (rc == 0) ? msg.Get(svrtype) : rc;
156 if (rc != 0) {
157 TRACE(XERR, "kCleanSessions: problems parsing message (buf: '"<<
158 msg.Buf()<<"'); errno: "<<-rc);
159 continue;
160 }
161 // Notify action
162 TRACE(REQ, "kCleanSessions: request for user: '"<<usr<<"', server type: "<<svrtype);
163 // Clean sessions
164 mgr->CleanClientSessions(usr.c_str(), svrtype);
165 // Check if there are any orphaned sessions and clean them up
167 } else if (msg.Type() == XrdProofdProofServMgr::kProcessReq) {
168 // Process request from some client: if we are here it means they can go ahead
169 mgr->ProcessSem()->Post();
170 } else if (msg.Type() == XrdProofdProofServMgr::kChgSessionSt) {
171 // Propagate cluster information to active sessions after one session changed its state
173 } else {
174 TRACE(XERR, "unknown type: "<<msg.Type());
175 continue;
176 }
177 } else {
178
179 // The current time
180 int now = time(0);
181
182 // If there is any activity in mgr->Process() we postpone the checks in 5 secs
184 if (cnt > 0) {
185 if ((now - lastrun) < maxdelay) {
186 // The current time
187 lastcheck = now + 5 - ckfreq;
188 mgr->SetNextSessionsCheck(now + 5);
189 // Notify
190 TRACE(ALL, "postponing sessions check (will retry in 5 secs)");
191 continue;
192 } else {
193 // Max time without checks reached: force a check
194 TRACE(ALL, "Max time without checks reached ("<<maxdelay<<"): force a session check");
195 // Reset the counter
197 }
198 }
199
200 bool full = (now > mgr->NextSessionsCheck() - deltat) ? 1 : 0;
201 if (full) {
202 // Run periodical full checks
203 mgr->CheckActiveSessions();
205 if (clnlostscale <= 0) {
207 clnlostscale = 10;
208 } else {
209 clnlostscale--;
210 }
211 // How many active sessions do we have
212 int cursess = mgr->CurrentSessions(1);
213 TRACE(ALL, cursess << " sessions are currently active");
214 // Remember when ...
215 lastrun = now;
216 lastcheck = now;
217 mgr->SetNextSessionsCheck(lastcheck + mgr->CheckFrequency());
218 // Notify
219 TRACE(ALL, "next sessions check in "<<mgr->CheckFrequency()<<" secs");
220 } else {
221 TRACE(HDBG, "nothing to do; "<<mgr->NextSessionsCheck()-now<<" secs to full check");
222 }
223 }
224 }
225
226 // Should never come here
227 return (void *)0;
228}
229
230//--------------------------------------------------------------------------
231//
232// XrdProofdProofServRecover
233//
234// Function run in a separate thread waiting for session to recover after
235// an abrupt shutdown
236//
237////////////////////////////////////////////////////////////////////////////////
238/// Waiting for session to recover after an abrupt shutdown
239
241{
242 XPDLOC(SMGR, "ProofServRecover")
243
246 if (!(mgr)) {
247 TRACE(XERR, "undefined session manager: cannot start");
248 return (void *)0;
249 }
250
251 // Recover active sessions
252 int rc = mgr->RecoverActiveSessions();
253
254 // Notify end of recovering
255 if (rc > 0) {
256 TRACE(ALL, "timeout recovering sessions: "<<rc<<" sessions not recovered");
257 } else if (rc < 0) {
258 TRACE(XERR, "some problem occured while recovering sessions");
259 } else {
260 TRACE(ALL, "recovering successfully terminated");
261 }
262
263 // Done
264 return (void *)0;
265}
266
267////////////////////////////////////////////////////////////////////////////////
268/// Constructor
269
271 XrdProtocol_Config *pi, XrdSysError *e)
272 : XrdProofdConfig(pi->ConfigFN, e), fProcessSem(0)
273{
274 XPDLOC(SMGR, "XrdProofdProofServMgr")
275
276 fMgr = mgr;
277 fLogger = pi->eDest->logger();
278 fInternalWait = 10;
279 fActiveSessions.clear();
280 fShutdownOpt = 1;
281 fShutdownDelay = 0;
282 fReconnectTime = -1;
283 fReconnectTimeOut = 300;
285 // Init internal counters
286 for (int i = 0; i < PSMMAXCNTS; i++) {
287 fCounters[i] = 0;
288 }
290
291 fSeqSessionN = 0;
292
293 fCredsSaver = 0;
294
295 // Defaults can be changed via 'proofservmgr'
296 fCheckFrequency = 30;
299 fRecoverTimeOut = 10;
300 fCheckLost = 1;
301 fUseFork = 1;
302 fParentExecs = "xproofd,xrootd";
303
304 // Recover-related quantities
305 fRecoverClients = 0;
306 fRecoverDeadline = -1;
307
308 // Init pipe for the poller
309 if (!fPipe.IsValid()) {
310 TRACE(XERR, "unable to generate pipe for the session poller");
311 return;
312 }
313
314 // Configuration directives
316}
317
318////////////////////////////////////////////////////////////////////////////////
319/// Run configuration and parse the entered config directives.
320/// Return 0 on success, -1 on error
321
323{
324 XPDLOC(SMGR, "ProofServMgr::Config")
325
326 XrdSysMutexHelper mhp(fEnvsMutex);
327
328 bool notify = (rcf) ? 0 : 1;
329 if (rcf && ReadFile(0)) {
330 // Cleanup lists of envs and RCs
331 fProofServRCs.clear();
332 fProofServEnvs.clear();
333 // Notify possible new settings
334 notify = 1;
335 }
336
337 // Run first the configurator
338 if (XrdProofdConfig::Config(rcf) != 0) {
339 TRACE(XERR, "problems parsing file ");
340 return -1;
341 }
342
343 XrdOucString msg;
344 msg = (rcf) ? "re-configuring" : "configuring";
345 if (notify) XPDPRT(msg);
346
347 // Notify timeout on internal communications
348 XPDFORM(msg, "setting internal timeout to %d secs", fInternalWait);
349 if (notify) XPDPRT(msg);
350
351 // Shutdown options
352 msg = "client sessions shutdown after disconnection";
353 if (fShutdownOpt > 0) {
354 XPDFORM(msg, "client sessions kept %sfor %d secs after disconnection",
355 (fShutdownOpt == 1) ? "idle " : "", fShutdownDelay);
356 }
357 if (notify) XPDPRT(msg);
358
359 if (!rcf) {
360 // Admin paths
362 fActiAdminPath += "/activesessions";
364 fTermAdminPath += "/terminatedsessions";
365
366 // Make sure they exist
367 XrdProofUI ui;
369 if (XrdProofdAux::AssertDir(fActiAdminPath.c_str(), ui, 1) != 0) {
370 TRACE(XERR, "unable to assert the admin path: "<<fActiAdminPath);
371 fActiAdminPath = "";
372 return -1;
373 }
374 XPDPRT("active sessions admin path set to: "<<fActiAdminPath);
375
376 if (XrdProofdAux::AssertDir(fTermAdminPath.c_str(), ui, 1) != 0) {
377 TRACE(XERR, "unable to assert the admin path "<<fTermAdminPath);
378 fTermAdminPath = "";
379 return -1;
380 }
381 XPDPRT("terminated sessions admin path set to "<<fTermAdminPath);
382 }
383
384 if (notify) {
385 XPDPRT("RC settings: "<< fProofServRCs.size());
386 if (fProofServRCs.size() > 0) {
387 std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
388 for ( ; ircs != fProofServRCs.end(); ++ircs) { (*ircs).Print("rc"); }
389 }
390 XPDPRT("ENV settings: "<< fProofServEnvs.size());
391 if (fProofServEnvs.size() > 0) {
392 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
393 for ( ; ienvs != fProofServEnvs.end(); ++ienvs) { (*ienvs).Print("env"); }
394 }
395 }
396
397 // Notify sessions startup technology
398 XPDFORM(msg, "using %s to start proofserv sessions", fUseFork ? "fork()" : "system()");
399 if (notify) XPDPRT(msg);
400
401 if (!rcf) {
402 // Try to recover active session previously started
403 int nr = -1;
404 if ((nr = PrepareSessionRecovering()) < 0) {
405 TRACE(XERR, "problems trying to recover active sessions");
406 } else if (nr > 0) {
407 XPDFORM(msg, "%d active sessions have been recovered", nr);
408 XPDPRT(msg);
409 }
410
411 // Start cron thread
412 pthread_t tid;
413 // Fill manager pointers structure
416 if (XrdSysThread::Run(&tid, XrdProofdProofServCron,
417 (void *)&fManagerCron, 0, "ProofServMgr cron thread") != 0) {
418 TRACE(XERR, "could not start cron thread");
419 return 0;
420 }
421 XPDPRT("cron thread started");
422 }
423
424 // Done
425 return 0;
426}
427
428////////////////////////////////////////////////////////////////////////////////
429/// Add new active session
430
432{
433 XPDLOC(SMGR, "ProofServMgr::AddSession")
434
435 TRACE(REQ, "adding new active session ...");
436
437 // Check inputs
438 if (!s || !p->Client()) {
439 TRACE(XERR,"invalid inputs: "<<(s ? "" : "s, ") <<", "<< (p->Client() ? "" : "p->Client()"));
440 return -1;
441 }
442 XrdProofdClient *c = p->Client();
443
444 // Path
445 XrdOucString path;
446 XPDFORM(path, "%s/%s.%s.%d", fActiAdminPath.c_str(), c->User(), c->Group(), s->SrvPID());
447
448 // Save session info to file
449 XrdProofSessionInfo info(c, s);
450 int rc = info.SaveToFile(path.c_str());
451
452 return rc;
453}
454
455////////////////////////////////////////////////////////////////////////////////
456/// Checks if fpid is the path of a session UNIX socket.
457/// Returns TRUE if yes; cleans the socket if the session is gone.
458
460{
461 XPDLOC(SMGR, "ProofServMgr::IsSessionSocket")
462
463 TRACE(REQ, "checking "<<fpid<<" ...");
464
465 // Check inputs
466 if (!fpid || strlen(fpid) <= 0) {
467 TRACE(XERR, "invalid input: "<<(fpid ? fpid : "<nul>"));
468 return 0;
469 }
470
471 // Paths
472 XrdOucString spath(fpid);
473 if (!spath.endswith(".sock")) return 0;
474 if (!spath.beginswith(fActiAdminPath.c_str())) {
475 // We are given a partial path: create full paths
476 XPDFORM(spath, "%s/%s", fActiAdminPath.c_str(), fpid);
477 }
478 XrdOucString apath = spath;
479 apath.replace(".sock", "");
480
481 // Check the admin path
482 struct stat st;
483 if (stat(apath.c_str(), &st) != 0 && (errno == ENOENT)) {
484 // Remove the socket path if not during creation
485 if (CheckCounter(kCreateCnt) <= 0) {
486 unlink(spath.c_str());
487 TRACE(REQ, "missing admin path: removing "<<spath<<" ...");
488 }
489 }
490
491 // Done
492 return 1;
493}
494
495////////////////////////////////////////////////////////////////////////////////
496/// Move session file from the active to the terminated areas
497
499{
500 XPDLOC(SMGR, "ProofServMgr::MvSession")
501
502 TRACE(REQ, "moving "<<fpid<<" ...");
503
504 // Check inputs
505 if (!fpid || strlen(fpid) <= 0) {
506 TRACE(XERR, "invalid input: "<<(fpid ? fpid : "<nul>"));
507 return -1;
508 }
509
510 // Paths
511 XrdOucString opath(fpid), npath;
512 if (!opath.beginswith(fActiAdminPath.c_str())) {
513 // We are given a partial path: create full paths
514 XPDFORM(opath, "%s/%s", fActiAdminPath.c_str(), fpid);
515 opath.replace(".status", "");
516 } else {
517 // Full path: just create the new path
518 opath.replace(".status", "");
519 }
520 // The target path
521 npath = opath;
522 npath.replace(fActiAdminPath.c_str(), fTermAdminPath.c_str());
523
524 // Remove the socket path
525 XrdOucString spath = opath;
526 spath += ".sock";
527 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
528 TRACE(XERR, "problems removing the UNIX socket path: "<<spath<<"; errno: "<<errno);
529 spath.replace(".sock", ".status");
530 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
531 TRACE(XERR, "problems removing the status file: "<<spath<<"; errno: "<<errno);
532
533 // Move the file
534 errno = 0;
535 int rc = 0;
536 if ((rc = rename(opath.c_str(), npath.c_str())) == 0 || (errno == ENOENT)) {
537 if (!rc)
538 // Record the time when we did this
539 TouchSession(fpid, npath.c_str());
540 return 0;
541 }
542
543 TRACE(XERR, "session pid file cannot be moved: "<<opath<<
544 "; target file: "<<npath<<"; errno: "<<errno);
545 return -1;
546}
547
548////////////////////////////////////////////////////////////////////////////////
549/// Remove session file from the terminated sessions area
550
552{
553 XPDLOC(SMGR, "ProofServMgr::RmSession")
554
555 TRACE(REQ, "removing "<<fpid<<" ...");
556
557 // Check inputs
558 if (!fpid || strlen(fpid) <= 0) {
559 TRACE(XERR, "invalid input: "<< (fpid ? fpid : "<nul>"));
560 return -1;
561 }
562
563 // Path
564 XrdOucString path;
565 XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), fpid);
566
567 // remove the file
568 if (unlink(path.c_str()) == 0)
569 return 0;
570
571 TRACE(XERR, "session pid file cannot be unlinked: "<<
572 path<<"; error: "<<errno);
573 return -1;
574}
575
576////////////////////////////////////////////////////////////////////////////////
577/// Update the access time for the session pid file to the current time
578
579int XrdProofdProofServMgr::TouchSession(const char *fpid, const char *fpath)
580{
581 XPDLOC(SMGR, "ProofServMgr::TouchSession")
582
583 TRACE(REQ, "touching "<<(fpid ? fpid : "<nul>")<<", "<<(fpath ? fpath : "<nul>")<<" ...");
584
585 // Check inputs
586 if (!fpid || strlen(fpid) <= 0) {
587 TRACE(XERR, "invalid input: "<<(fpid ? fpid : "<nul>"));
588 return -1;
589 }
590
591 // Path
592 XrdOucString path(fpath);
593 if (!fpath || !fpath[0])
594 XPDFORM(path, "%s/%s.status", fActiAdminPath.c_str(), fpid);
595
596 // Update file time stamps
597 if (utime(path.c_str(), 0) == 0)
598 return 0;
599
600 TRACE(XERR, "time stamps for session pid file cannot be updated: "<<
601 path<<"; error: "<<errno);
602 return -1;
603}
604
605////////////////////////////////////////////////////////////////////////////////
606/// Check if the session is alive, i.e. if it has recently touched its admin file.
607/// Return 0 if alive, 1 if not-responding, -1 in case of error.
608/// The timeout for verification is 'to' if positive, else fVerifyTimeOut;
609/// the admin file is looked under 'fpath' if defined, else fActiAdminPath.
610
612 int to, const char *fpath)
613{
614 XPDLOC(SMGR, "ProofServMgr::VerifySession")
615
616 // Check inputs
617 if (!fpid || strlen(fpid) <= 0) {
618 TRACE(XERR, "invalid input: "<<(fpid ? fpid : "<nul>"));
619 return -1;
620 }
621
622 // Path
623 XrdOucString path;
624 if (fpath && strlen(fpath) > 0)
625 XPDFORM(path, "%s/%s", fpath, fpid);
626 else
627 XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), fpid);
628
629 // Check first the new file but also the old one, for backward compatibility
630 int deltat = -1;
631 bool checkmore = 1;
632 while (checkmore) {
633 // Current settings
634 struct stat st;
635 if (stat(path.c_str(), &st)) {
636 TRACE(XERR, "session status file cannot be stat'ed: "<<
637 path<<"; error: "<<errno);
638 return -1;
639 }
640 // Check times
641 int xto = (to > 0) ? to : fVerifyTimeOut;
642 deltat = time(0) - st.st_mtime;
643 if (deltat > xto) {
644 if (path.endswith(".status")) {
645 // Check the old one too
646 path.erase(path.rfind(".status"));
647 } else {
648 // Dead
649 TRACE(DBG, "admin path for session "<<fpid<<" hase not been touched"
650 " since at least "<< xto <<" secs");
651 return 1;
652 }
653 } else {
654 // We are done
655 checkmore = 0;
656 }
657 }
658
659 // Alive
660 TRACE(DBG, "admin path for session "<<fpid<<" was touched " <<
661 deltat <<" secs ago");
662 return 0;
663}
664
665////////////////////////////////////////////////////////////////////////////////
666/// Delete from the hash list the session with ID pid.
667/// Return -ENOENT if not found, or 0.
668
670{
671 XPDLOC(SMGR, "ProofServMgr::DeleteFromSessions")
672
673 TRACE(REQ, "session: "<<fpid);
674
675 // Check inputs
676 if (!fpid || strlen(fpid) <= 0) {
677 TRACE(XERR, "invalid input: "<<(fpid ? fpid : "<nul>"));
678 return -1;
679 }
680
681 XrdOucString key = fpid;
682 key.replace(".status", "");
683 key.erase(0, key.rfind('.') + 1);
684 XrdProofdProofServ *xps = 0;
685 { XrdSysMutexHelper mhp(fMutex); xps = fSessions.Find(key.c_str()); }
686 if (xps) {
687 // Tell other attached clients, if any, that this session is gone
688 XrdOucString msg;
689 XPDFORM(msg, "session: %s terminated by peer", fpid);
690 TRACE(DBG, msg);
691 // Reset this instance
692 int tp = xps->Reset(msg.c_str(), kXPD_wrkmortem);
693 // Update counters and lists
694 XrdSysMutexHelper mhp(fMutex);
695 if (tp == 1) fCurrentSessions--;
696 // remove from the list of active sessions
697 fActiveSessions.remove(xps);
698 }
699 int rc = -1;
700 { XrdSysMutexHelper mhp(fMutex); rc = fSessions.Del(key.c_str()); }
701 return rc;
702}
703
704////////////////////////////////////////////////////////////////////////////////
705/// Go through the active sessions admin path and prepare reconnection of those
706/// still alive.
707/// Called at start-up.
708
710{
711 XPDLOC(SMGR, "ProofServMgr::PrepareSessionRecovering")
712
713 // Open dir
714 DIR *dir = opendir(fActiAdminPath.c_str());
715 if (!dir) {
716 TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
717 return -1;
718 }
719 TRACE(REQ, "preparing recovering of active sessions ...");
720
721 // Scan the active sessions admin path
722 fRecoverClients = new std::list<XpdClientSessions *>;
723 struct dirent *ent = 0;
724 while ((ent = (struct dirent *)readdir(dir))) {
725 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
726 // Get the session instance (skip non-digital entries)
727 XrdOucString rest, a;
728 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
729 if (!XPD_LONGOK(pid) || pid <= 0) continue;
730 if (a.length() > 0) continue;
731 bool rmsession = 1;
732 // Check if the process is still alive
733 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
734 if (ResolveSession(ent->d_name) == 0) {
735 TRACE(DBG, "found active session: "<<pid);
736 rmsession = 0;
737 }
738 }
739 // Remove the session, if needed
740 if (rmsession)
741 MvSession(ent->d_name);
742 }
743 // Close the directory
744 closedir(dir);
745
746 // Start the recovering thread, if needed
747 int nrc = 0;
748 { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
749 if (nrc > 0) {
750 // Start recovering thread
751 pthread_t tid;
752 // Fill manager pointers structure
756 if (XrdSysThread::Run(&tid, XrdProofdProofServRecover, (void *)&fManagerCron,
757 0, "ProofServMgr session recover thread") != 0) {
758 TRACE(XERR, "could not start session recover thread");
759 return 0;
760 }
761 XPDPRT("session recover thread started");
762 } else {
763 // End reconnect state if there is nothing to reconnect
764 if (fMgr->ClientMgr() && fMgr->ClientMgr()->GetNClients() <= 0)
766 }
767
768 // Done
769 return 0;
770}
771
772
773////////////////////////////////////////////////////////////////////////////////
774/// Accept connections from sessions still alive. This is run in a dedicated
775/// thread.
776/// Returns -1 in case of failure, 0 if all alive sessions reconnected or the
777/// number of sessions not reconnected if the timeout (fRecoverTimeOut per client)
778/// expired.
779
781{
782 XPDLOC(SMGR, "ProofServMgr::RecoverActiveSessions")
783
784 int rc = 0;
785
786 if (!fRecoverClients) {
787 // Invalid input
788 TRACE(XERR, "recovering clients list undefined");
789 return -1;
790 }
791
792 int nrc = 0;
793 { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
794 TRACE(REQ, "start recovering of "<<nrc<<" clients");
795
796 // Recovering deadline
797 { XrdSysMutexHelper mhp(fRecoverMutex);
798 fRecoverDeadline = time(0) + fRecoverTimeOut * nrc; }
799
800 // Respect the deadline
801 int nr = 0;
802 XpdClientSessions *cls = 0;
803 bool go = true;
804 while (go) {
805
806 // Pickup the first one in the list
807 { XrdSysMutexHelper mhp(fRecoverMutex); cls = fRecoverClients->front(); }
808 if (cls) {
810 nr += Recover(cls);
811
812 // If all client sessions reconnected remove the client from the list
813 { XrdSysMutexHelper mhp(cls->fMutex);
814 if (cls->fProofServs.size() <= 0) {
815 XrdSysMutexHelper mhpr(fRecoverMutex);
816 fRecoverClients->remove(cls);
817 // We may be over
818 if ((nrc = fRecoverClients->size()) <= 0)
819 break;
820 }
821 }
822 }
823 TRACE(REQ, nrc<<" clients still to recover");
824
825 // Check the deadline
826 { XrdSysMutexHelper mhp(fRecoverMutex);
827 go = (time(0) < fRecoverDeadline) ? true : false; }
828 }
829 // End reconnect state
831
832 // If we reached the deadline, calculate the number of sessions not reconnected
833 rc = 0;
834 { XrdSysMutexHelper mhp(fRecoverMutex);
835 if (fRecoverClients->size() > 0) {
836 std::list<XpdClientSessions* >::iterator ii = fRecoverClients->begin();
837 for (; ii != fRecoverClients->end(); ++ii) {
838 rc += (*ii)->fProofServs.size();
839 }
840 }
841 }
842
843 // Delete the recovering clients list
844 { XrdSysMutexHelper mhp(fRecoverMutex);
845 fRecoverClients->clear();
846 delete fRecoverClients;
847 fRecoverClients = 0;
848 fRecoverDeadline = -1;
849 }
850
851 // Done
852 return rc;
853}
854
855////////////////////////////////////////////////////////////////////////////////
856/// Returns true (an the recovering deadline) if the client has sessions in
857/// recovering state; returns false otherwise.
858/// Called during for attach requests.
859
860bool XrdProofdProofServMgr::IsClientRecovering(const char *usr, const char *grp,
861 int &deadline)
862{
863 XPDLOC(SMGR, "ProofServMgr::IsClientRecovering")
864
865 if (!usr || !grp) {
866 TRACE(XERR, "invalid inputs: usr: "<<(usr ? usr : "")<<", grp:"<<(grp ? grp : "")<<" ...");
867 return false;
868 }
869
870 deadline = -1;
871 int rc = false;
872 { XrdSysMutexHelper mhp(fRecoverMutex);
873 if (fRecoverClients && fRecoverClients->size() > 0) {
874 std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
875 for (; ii != fRecoverClients->end(); ++ii) {
876 if ((*ii)->fClient && (*ii)->fClient->Match(usr, grp)) {
877 rc = true;
878 deadline = fRecoverDeadline;
879 break;
880 }
881 }
882 }
883 }
884 TRACE(DBG, "checking usr: "<<usr<<", grp:"<<grp<<" ... recovering? "<<
885 rc<<", until: "<<deadline);
886
887 // Done
888 return rc;
889}
890
891////////////////////////////////////////////////////////////////////////////////
892/// Go through the active sessions admin path and make sure sessions are alive.
893/// If 'verify' is true also ask the sessions to prove that they are alive
894/// via asynchronous ping (the result will be checked at the next check).
895/// Move those not responding in the terminated sessions admin path.
896
898{
899 XPDLOC(SMGR, "ProofServMgr::CheckActiveSessions")
900
901 TRACE(REQ, "checking active sessions ...");
902
903 // Open dir
904 DIR *dir = opendir(fActiAdminPath.c_str());
905 if (!dir) {
906 TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
907 return -1;
908 }
909
910 // Scan the active sessions admin path
911 struct dirent *ent = 0;
912 while ((ent = (struct dirent *)readdir(dir))) {
913 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
914 // If a socket path, make sure that the associated session still exists
915 // and go to the next
916 if (strstr(ent->d_name, ".sock") && IsSessionSocket(ent->d_name)) continue;
917 // Get the session instance (skip non-digital entries)
918 XrdOucString rest, key, after;
919 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, after);
920 // If not a status path, go to the next
921 if (after != "status") continue;
922 // If not a good pid
923 if (!XPD_LONGOK(pid) || pid <= 0) continue;
924 key += pid;
925 //
926 XrdProofdProofServ *xps = 0;
927 { XrdSysMutexHelper mhp(fMutex);
928 xps = fSessions.Find(key.c_str());
929 }
930
931 bool sessionalive = (VerifySession(ent->d_name) == 0) ? 1 : 0;
932 bool rmsession = 0;
933 if (xps) {
934 if (!xps->IsValid() || !sessionalive) rmsession = 1;
935 } else {
936 // Session not yet registered, possibly starting
937 // Skip further checks if the admin file verification was OK
938 if (sessionalive) continue;
939 rmsession = 1;
940 }
941
942 // For backward compatibility we need to check the session version
943 bool oldvers = (xps && xps->ROOT() && xps->ROOT()->SrvProtVers() >= 18) ? 0 : 1;
944
945 // If somebody is interested in this session, we give them some
946 // more time by skipping the connected clients check this time
947 int nc = -1;
948 if (!rmsession)
949 rmsession = xps->CheckSession(oldvers, IsReconnecting(),
951
952 // Verify the session: this just sends a request to the session
953 // to touch the session file; all this will be done asynchronously;
954 // the result will be checked next time.
955 // We do not want further propagation at this stage.
956 if (!rmsession && verify && !oldvers) {
957 if (xps->VerifyProofServ(0) != 0) {
958 // This means that the connection is already gone
959 rmsession = 1;
960 }
961 }
962 TRACE(REQ, "session: "<<ent->d_name<<"; nc: "<<nc<<"; rm: "<<rmsession);
963 // Remove the session, if needed
964 if (rmsession)
965 MvSession(ent->d_name);
966 }
967 // Close the directory
968 closedir(dir);
969
970 // Done
971 return 0;
972}
973
974////////////////////////////////////////////////////////////////////////////////
975/// Go through the terminated sessions admin path and make sure the sessions
976/// are gone.
977/// Hard-kill those still alive.
978
980{
981 XPDLOC(SMGR, "ProofServMgr::CheckTerminatedSessions")
982
983 TRACE(REQ, "checking terminated sessions ...");
984
985 // Open dir
986 DIR *dir = opendir(fTermAdminPath.c_str());
987 if (!dir) {
988 TRACE(XERR, "cannot open dir "<<fTermAdminPath<<" ; error: "<<errno);
989 return -1;
990 }
991
992 // Scan the terminated sessions admin path
993 int now = -1;
994 struct dirent *ent = 0;
995 while ((ent = (struct dirent *)readdir(dir))) {
996 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
997 // Get the session instance (skip non-digital entries)
998 XrdOucString rest, a;
999 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
1000 if (!XPD_LONGOK(pid) || pid <= 0) continue;
1001
1002 // Current time
1003 now = (now > 0) ? now : time(0);
1004
1005 // Full path
1006 XrdOucString path;
1007 XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), ent->d_name);
1008
1009 // Check termination time
1010 struct stat st;
1011 int rcst = stat(path.c_str(), &st);
1012 TRACE(DBG, pid<<": rcst: "<<rcst<<", now - mtime: "<<now - st.st_mtime<<" secs")
1013 if ((now - st.st_mtime) > fTerminationTimeOut || rcst != 0) {
1014 // Check if the process is still alive
1015 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1016 // Send again an hard-kill signal
1017 XrdProofSessionInfo info(path.c_str());
1018 XrdProofUI ui;
1019 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
1021 } else {
1022 // Delete the entry
1023 RmSession(ent->d_name);
1024 }
1025 }
1026 }
1027 // Close the directory
1028 closedir(dir);
1029
1030 // Done
1031 return 0;
1032}
1033
1034////////////////////////////////////////////////////////////////////////////////
1035/// Go through the sessions admin path and clean all sessions belonging to 'usr'.
1036/// Move those not responding in the terminated sessions admin path.
1037
1038int XrdProofdProofServMgr::CleanClientSessions(const char *usr, int srvtype)
1039{
1040 XPDLOC(SMGR, "ProofServMgr::CleanClientSessions")
1041
1042 TRACE(REQ, "cleaning "<<usr<<" ...");
1043
1044 // Check which client
1045 bool all = (!usr || strlen(usr) <= 0 || !strcmp(usr, "all")) ? 1 : 0;
1046
1047 // Get user info
1048 XrdProofUI ui;
1049 if (!all)
1051 XrdOucString path, rest, key, a;
1052
1053 // We need lock to avoid session actions request while we are doing this
1054 XrdSysRecMutex *mtx = 0;
1055 if (all) {
1056 // Lock us all
1057 mtx = &fMutex;
1058 } else {
1059 // Lock the client
1061 if (c) mtx = c->Mutex();
1062 }
1063
1064 std::list<int> tobedel;
1065 { XrdSysMutexHelper mtxh(mtx);
1066
1067 // Check the terminated session dir first
1068 DIR *dir = opendir(fTermAdminPath.c_str());
1069 if (!dir) {
1070 TRACE(XERR, "cannot open dir "<<fTermAdminPath<<" ; error: "<<errno);
1071 } else {
1072 // Go through
1073 struct dirent *ent = 0;
1074 while ((ent = (struct dirent *)readdir(dir))) {
1075 // Skip basic entries
1076 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
1077 // Get the session instance
1078 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
1079 if (!XPD_LONGOK(pid) || pid <= 0) continue;
1080 // Read info from file and check that we are interested in this session
1081 XPDFORM(path, "%s/%s", fTermAdminPath.c_str(), ent->d_name);
1082 XrdProofSessionInfo info(path.c_str());
1083 // Check user
1084 if (!all && info.fUser != usr) continue;
1085 // Check server type
1086 if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype) continue;
1087 // Refresh user info, if needed
1088 if (all)
1089 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
1090 // Check if the process is still alive
1091 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1092 // Send a hard-kill signal
1094 } else {
1095 // Delete the entry
1096 RmSession(ent->d_name);
1097 }
1098 }
1099 // Close the directory
1100 closedir(dir);
1101 }
1102
1103 // Check the active session dir now
1104 dir = opendir(fActiAdminPath.c_str());
1105 if (!dir) {
1106 TRACE(XERR, "cannot open dir "<<fActiAdminPath<<" ; error: "<<errno);
1107 return -1;
1108 }
1109
1110 // Scan the active sessions admin path
1111 struct dirent *ent = 0;
1112 while ((ent = (struct dirent *)readdir(dir))) {
1113 // Skip basic entries
1114 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
1115 // Get the session instance
1116 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
1117 if (a == "status") continue;
1118 if (!XPD_LONGOK(pid) || pid <= 0) continue;
1119 // Read info from file and check that we are interested in this session
1120 XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), ent->d_name);
1121 XrdProofSessionInfo info(path.c_str());
1122 if (!all && info.fUser != usr) continue;
1123 // Check server type
1124 if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype) continue;
1125 // Refresh user info, if needed
1126 if (all)
1127 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
1128 // Check if the process is still alive
1129 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1130 // We will remove this later
1131 tobedel.push_back(pid);
1132 // Send a termination signal
1134 }
1135 // Flag as terminated
1136 MvSession(ent->d_name);
1137 }
1138 // Close the directory
1139 closedir(dir);
1140 }
1141
1142 // Cleanup fSessions
1143 std::list<int>::iterator ii = tobedel.begin();
1144 while (ii != tobedel.end()) {
1145 XPDFORM(key, "%d", *ii);
1146 XrdSysMutexHelper mhp(fMutex);
1147 XrdProofdProofServ *xps = fSessions.Find(key.c_str());
1148 bool active = 0;
1149 std::list<XrdProofdProofServ *>::iterator ixps = fActiveSessions.begin();
1150 while (ixps != fActiveSessions.end()) {
1151 if (*ixps == xps) {
1152 active = 1;
1153 break;
1154 }
1155 ++ixps;
1156 }
1157 if (!active) fSessions.Del(key.c_str());
1158 ++ii;
1159 }
1160
1161 // Done
1162 return 0;
1163}
1164
1165////////////////////////////////////////////////////////////////////////////////
1166/// Register directives for configuration
1167
1169{
1170 // Register special config directives
1171 Register("proofservmgr", new XrdProofdDirective("proofservmgr", this, &DoDirectiveClass));
1172 Register("putenv", new XrdProofdDirective("putenv", this, &DoDirectiveClass));
1173 Register("putrc", new XrdProofdDirective("putrc", this, &DoDirectiveClass));
1174 Register("shutdown", new XrdProofdDirective("shutdown", this, &DoDirectiveClass));
1175 // Register config directives for ints
1176 Register("intwait",
1177 new XrdProofdDirective("intwait", (void *)&fInternalWait, &DoDirectiveInt));
1178 Register("reconnto",
1179 new XrdProofdDirective("reconnto", (void *)&fReconnectTimeOut, &DoDirectiveInt));
1180 // Register config directives for strings
1181 Register("proofplugin",
1182 new XrdProofdDirective("proofplugin", (void *)&fProofPlugin, &DoDirectiveString));
1183 Register("proofservparents",
1184 new XrdProofdDirective("proofservparents", (void *)&fParentExecs, &DoDirectiveString));
1185}
1186
1187////////////////////////////////////////////////////////////////////////////////
1188/// Update the priorities of the active sessions.
1189
1191 char *val, XrdOucStream *cfg, bool rcf)
1192{
1193 XPDLOC(SMGR, "ProofServMgr::DoDirective")
1194
1195 if (!d)
1196 // undefined inputs
1197 return -1;
1198
1199 if (d->fName == "proofservmgr") {
1200 return DoDirectiveProofServMgr(val, cfg, rcf);
1201 } else if (d->fName == "putenv") {
1202 return DoDirectivePutEnv(val, cfg, rcf);
1203 } else if (d->fName == "putrc") {
1204 return DoDirectivePutRc(val, cfg, rcf);
1205 } else if (d->fName == "shutdown") {
1206 return DoDirectiveShutdown(val, cfg, rcf);
1207 }
1208 TRACE(XERR,"unknown directive: "<<d->fName);
1209 return -1;
1210}
1211
1212////////////////////////////////////////////////////////////////////////////////
1213/// Process 'proofswrvmgr' directive
1214/// eg: xpd.proofswrvmgr checkfq:120 termto:100 verifyto:5 recoverto:20
1215
1216int XrdProofdProofServMgr::DoDirectiveProofServMgr(char *val, XrdOucStream *cfg, bool rcf)
1217{
1218 XPDLOC(SMGR, "ProofServMgr::DoDirectiveProofServMgr")
1219
1220 if (!val || !cfg)
1221 // undefined inputs
1222 return -1;
1223
1224 if (rcf)
1225 // Do not reconfigure this (need to check what happens with the cron thread ...
1226 return 0;
1227
1228 int checkfq = -1;
1229 int termto = -1;
1230 int verifyto = -1;
1231 int recoverto = -1;
1232 int checklost = 0;
1233 int usefork = 0;
1234
1235 while (val) {
1236 XrdOucString tok(val);
1237 if (tok.beginswith("checkfq:")) {
1238 tok.replace("checkfq:", "");
1239 checkfq = strtol(tok.c_str(), 0, 10);
1240 } else if (tok.beginswith("termto:")) {
1241 tok.replace("termto:", "");
1242 termto = strtol(tok.c_str(), 0, 10);
1243 } else if (tok.beginswith("verifyto:")) {
1244 tok.replace("verifyto:", "");
1245 verifyto = strtol(tok.c_str(), 0, 10);
1246 } else if (tok.beginswith("recoverto:")) {
1247 tok.replace("recoverto:", "");
1248 recoverto = strtol(tok.c_str(), 0, 10);
1249 } else if (tok.beginswith("checklost:")) {
1250 tok.replace("checklost:", "");
1251 checklost = strtol(tok.c_str(), 0, 10);
1252 } else if (tok.beginswith("usefork:")) {
1253 tok.replace("usefork:", "");
1254 usefork = strtol(tok.c_str(), 0, 10);
1255 }
1256 // Get next
1257 val = cfg->GetWord();
1258 }
1259
1260 // Check deprecated 'if' directive
1261 if (fMgr->Host() && cfg)
1262 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
1263 return 0;
1264
1265 // Set the values
1266 fCheckFrequency = (XPD_LONGOK(checkfq) && checkfq > 0) ? checkfq : fCheckFrequency;
1267 fTerminationTimeOut = (XPD_LONGOK(termto) && termto > 0) ? termto : fTerminationTimeOut;
1268 fVerifyTimeOut = (XPD_LONGOK(verifyto) && (verifyto > fCheckFrequency + 1))
1269 ? verifyto : fVerifyTimeOut;
1270 fRecoverTimeOut = (XPD_LONGOK(recoverto) && recoverto > 0) ? recoverto : fRecoverTimeOut;
1271 if (XPD_LONGOK(checklost)) fCheckLost = (checklost != 0) ? 1 : 0;
1272 if (XPD_LONGOK(usefork)) fUseFork = (usefork != 0) ? 1 : 0;
1273
1274 XrdOucString msg;
1275 XPDFORM(msg, "checkfq: %d s, termto: %d s, verifyto: %d s, recoverto: %d s, checklost: %d, usefork: %d",
1277 TRACE(ALL, msg);
1278
1279 return 0;
1280}
1281
1282////////////////////////////////////////////////////////////////////////////////
1283/// Process 'putenv' directives
1284
1285int XrdProofdProofServMgr::DoDirectivePutEnv(char *val, XrdOucStream *cfg, bool)
1286{
1287 if (!val)
1288 // undefined inputs
1289 return -1;
1290
1291 // Parse env variables to be passed to 'proofserv':
1292 XrdOucString users, groups, rcval, rcnam;
1293 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1294 bool hex = 0;
1295 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1296
1297 // Adjust name of the variable
1298 int iequ = rcnam.find('=');
1299 if (iequ == STR_NPOS) return -1;
1300 rcnam.erase(iequ);
1301
1302 // Fill entries
1303 FillEnvList(&fProofServEnvs, rcnam.c_str(), rcval.c_str(),
1304 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
1305
1306 return 0;
1307}
1308
1309////////////////////////////////////////////////////////////////////////////////
1310/// Process 'putrc' directives.
1311/// Syntax:
1312/// xpd.putrc [u:<usr1>,<usr2>,...] [g:<grp1>,<grp2>,...]
1313/// [s:[svnmin][-][svnmax]] [v:[vermin][-][vermax]] RcVarName RcVarValue
1314/// NB: <usr1>,... and <grp1>,... may contain the wild card '*'
1315
1316int XrdProofdProofServMgr::DoDirectivePutRc(char *val, XrdOucStream *cfg, bool)
1317{
1318 if (!val || !cfg)
1319 // undefined inputs
1320 return -1;
1321
1322 // Parse rootrc variables to be passed to 'proofserv':
1323 XrdOucString users, groups, rcval, rcnam;
1324 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1325 bool hex = 0;
1326 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1327
1328 // Fill entries
1329 FillEnvList(&fProofServRCs, rcnam.c_str(), rcval.c_str(),
1330 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
1331
1332 return 0;
1333}
1334
1335////////////////////////////////////////////////////////////////////////////////
1336/// Extract env information from the stream 'cfg'
1337
1338void XrdProofdProofServMgr::ExtractEnv(char *val, XrdOucStream *cfg,
1339 XrdOucString &users, XrdOucString &groups,
1340 XrdOucString &rcval, XrdOucString &rcnam,
1341 int &smi, int &smx, int &vmi, int &vmx, bool &hex)
1342{
1343 XrdOucString ssvn, sver;
1344 int idash = -1;
1345 while (val && val[0]) {
1346 if (!strncmp(val, "u:", 2)) {
1347 users = val;
1348 users.erase(0,2);
1349 } else if (!strncmp(val, "g:", 2)) {
1350 groups = val;
1351 groups.erase(0,2);
1352 } else if (!strncmp(val, "s:", 2)) {
1353 ssvn = val;
1354 ssvn.erase(0,2);
1355 idash = ssvn.find('-');
1356 if (idash != STR_NPOS) {
1357 if (ssvn.isdigit(0, idash-1)) smi = ssvn.atoi(0, idash-1);
1358 if (ssvn.isdigit(idash+1)) smx = ssvn.atoi(idash+1);
1359 } else {
1360 if (ssvn.isdigit()) smi = ssvn.atoi();
1361 }
1362 } else if (!strncmp(val, "v:", 2)) {
1363 sver = val;
1364 sver.erase(0,2);
1365 hex = 0;
1366 if (sver.beginswith('x')) {
1367 hex = 1;
1368 sver.erase(0,1);
1369 }
1370 idash = sver.find('-');
1371 if (idash != STR_NPOS) {
1372 if (sver.isdigit(0, idash-1)) vmi = sver.atoi(0, idash-1);
1373 if (sver.isdigit(idash+1)) vmx = sver.atoi(idash+1);
1374 } else {
1375 if (sver.isdigit()) vmi = sver.atoi();
1376 }
1377 } else {
1378 if (rcval.length() > 0) {
1379 rcval += ' ';
1380 } else {
1381 rcnam = val;
1382 }
1383 rcval += val;
1384 }
1385 val = cfg->GetWord();
1386 }
1387 // Done
1388 return;
1389}
1390
1391////////////////////////////////////////////////////////////////////////////////
1392/// Fill env entry(ies) in the relevant list
1393
1394void XrdProofdProofServMgr::FillEnvList(std::list<XpdEnv> *el, const char *nam, const char *val,
1395 const char *usrs, const char *grps,
1396 int smi, int smx, int vmi, int vmx, bool hex)
1397{
1398 XPDLOC(SMGR, "ProofServMgr::FillEnvList")
1399
1400 if (!el) {
1401 TRACE(ALL, "env list undefined!");
1402 return;
1403 }
1404
1405 XrdOucString users(usrs), groups(grps);
1406 // Transform version numbers in the human unreadable format used internally (version code)
1407 if (vmi > 0) vmi = XpdEnv::ToVersCode(vmi, hex);
1408 if (vmx > 0) vmx = XpdEnv::ToVersCode(vmx, hex);
1409 // Create the entry
1410 XpdEnv xpe(nam, val, users.c_str(), groups.c_str(), smi, smx, vmi, vmx);
1411 if (users.length() > 0) {
1412 XrdOucString usr;
1413 int from = 0;
1414 while ((from = users.tokenize(usr, from, ',')) != -1) {
1415 if (usr.length() > 0) {
1416 if (groups.length() > 0) {
1417 XrdOucString grp;
1418 int fromg = 0;
1419 while ((fromg = groups.tokenize(grp, from, ',')) != -1) {
1420 if (grp.length() > 0) {
1421 xpe.Reset(nam, val, usr.c_str(), grp.c_str(), smi, smx, vmi, vmx);
1422 el->push_back(xpe);
1423 }
1424 }
1425 } else {
1426 xpe.Reset(nam, val, usr.c_str(), 0, smi, smx, vmi, vmx);
1427 el->push_back(xpe);
1428 }
1429 }
1430 }
1431 } else {
1432 if (groups.length() > 0) {
1433 XrdOucString grp;
1434 int fromg = 0;
1435 while ((fromg = groups.tokenize(grp, fromg, ',')) != -1) {
1436 if (grp.length() > 0) {
1437 xpe.Reset(nam, val, 0, grp.c_str(), smi, smx, vmi, vmx);
1438 el->push_back(xpe);
1439 }
1440 }
1441 } else {
1442 el->push_back(xpe);
1443 }
1444 }
1445 // Done
1446 return;
1447}
1448
1449////////////////////////////////////////////////////////////////////////////////
1450/// Process 'shutdown' directive
1451
1452int XrdProofdProofServMgr::DoDirectiveShutdown(char *val, XrdOucStream *cfg, bool)
1453{
1454 if (!val || !cfg)
1455 // undefined inputs
1456 return -1;
1457
1458 int opt = -1;
1459 int delay = -1;
1460
1461 // Shutdown option
1462 int dp = strtol(val,0,10);
1463 if (dp >= 0 && dp <= 2)
1464 opt = dp;
1465 // Shutdown delay
1466 if ((val = cfg->GetWord())) {
1467 int l = strlen(val);
1468 int f = 1;
1469 XrdOucString tval = val;
1470 // Parse
1471 if (val[l-1] == 's') {
1472 val[l-1] = 0;
1473 } else if (val[l-1] == 'm') {
1474 f = 60;
1475 val[l-1] = 0;
1476 } else if (val[l-1] == 'h') {
1477 f = 3600;
1478 val[l-1] = 0;
1479 } else if (val[l-1] < 48 || val[l-1] > 57) {
1480 f = -1;
1481 }
1482 if (f > 0) {
1483 int de = strtol(val,0,10);
1484 if (de > 0)
1485 delay = de * f;
1486 }
1487 }
1488
1489 // Check deprecated 'if' directive
1490 if (fMgr->Host() && cfg)
1491 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
1492 return 0;
1493
1494 // Set the values
1495 fShutdownOpt = (opt > -1) ? opt : fShutdownOpt;
1496 fShutdownDelay = (delay > -1) ? delay : fShutdownDelay;
1497
1498 return 0;
1499}
1500
1501////////////////////////////////////////////////////////////////////////////////
1502/// Process manager request
1503
1505{
1506 XPDLOC(SMGR, "ProofServMgr::Process")
1507
1508 int rc = 1;
1509 XPD_SETRESP(p, "Process");
1510
1511 TRACEP(p, REQ, "enter: req id: " << p->Request()->header.requestid << " (" <<
1512 XrdProofdAux::ProofRequestTypes(p->Request()->header.requestid) << ")");
1513
1514 XrdSysMutexHelper mtxh(p->Client()->Mutex());
1515
1516 // Once logged-in, the user can request the real actions
1517 XrdOucString emsg("Invalid request code: ");
1518
1519 int twait = 20;
1520
1521 if (Pipe()->Post(XrdProofdProofServMgr::kProcessReq, 0) != 0) {
1522 response->Send(kXR_ServerError,
1523 "ProofServMgr::Process: error posting internal pipe for authorization to proceed");
1524 return 0;
1525 }
1526 if (fProcessSem.Wait(twait) != 0) {
1527 response->Send(kXR_ServerError,
1528 "ProofServMgr::Process: timed-out waiting for authorization to proceed - retry later");
1529 return 0;
1530 }
1531
1532 // This is needed to block the session checks
1534
1535 switch(p->Request()->header.requestid) {
1536 case kXP_create:
1537 return Create(p);
1538 case kXP_destroy:
1539 return Destroy(p);
1540 case kXP_attach:
1541 return Attach(p);
1542 case kXP_detach:
1543 return Detach(p);
1544 default:
1545 emsg += p->Request()->header.requestid;
1546 break;
1547 }
1548
1549 // Whatever we have, it's not valid
1550 response->Send(kXR_InvalidRequest, emsg.c_str());
1551 return 0;
1552}
1553
1554////////////////////////////////////////////////////////////////////////////////
1555/// Handle a request to attach to an existing session
1556
1558{
1559 XPDLOC(SMGR, "ProofServMgr::Attach")
1560
1561 int psid = -1, rc = 0;
1562 XPD_SETRESP(p, "Attach");
1563
1564 // Unmarshall the data
1565 psid = ntohl(p->Request()->proof.sid);
1566 TRACEP(p, REQ, "psid: "<<psid<<", CID = "<<p->CID());
1567
1568 // The client instance must be defined
1569 XrdProofdClient *c = p->Client();
1570 if (!c) {
1571 TRACEP(p, XERR, "client instance undefined");
1572 response->Send(kXR_ServerError,"client instance undefined");
1573 return 0;
1574 }
1575
1576 // Find server session; sessions maybe recovering, so we need to take
1577 // that into account
1578 XrdProofdProofServ *xps = 0;
1579 int now = time(0);
1580 int deadline = -1, defdeadline = now + fRecoverTimeOut;
1581 while ((deadline < 0) || (now < deadline)) {
1582 if (!(xps = c->GetServer(psid)) || !xps->IsValid()) {
1583 // If the client is recovering start regular checks
1584 if (!IsClientRecovering(c->User(), c->Group(), deadline)) {
1585 // Failure
1586 TRACEP(p, XERR, "session ID not found: "<<psid);
1587 response->Send(kXR_InvalidRequest,"session ID not found");
1588 return 0;
1589 } else {
1590 // Make dure we do not enter an infinite loop
1591 deadline = (deadline > 0) ? deadline : defdeadline;
1592 // Wait until deadline in 1 sec steps
1593 sleep(1);
1594 now++;
1595 }
1596 } else {
1597 // Found
1598 break;
1599 }
1600 }
1601 // If we deadline we should fail now
1602 if (!xps || !xps->IsValid()) {
1603 TRACEP(p, XERR, "session ID not found: "<<psid);
1604 response->Send(kXR_InvalidRequest,"session ID not found");
1605 return 0;
1606 }
1607 TRACEP(p, DBG, "xps: "<<xps<<", status: "<< xps->Status());
1608
1609 // Stream ID
1610 unsigned short sid;
1611 memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
1612
1613 // We associate this instance to the corresponding slot in the
1614 // session vector of attached clients
1615 XrdClientID *csid = xps->GetClientID(p->CID());
1616 csid->SetP(p);
1617 csid->SetSid(sid);
1618
1619 // Take parentship, if orphalin
1620 if (!(xps->Parent()))
1621 xps->SetParent(csid);
1622
1623 // Notify to user
1624 int protvers = (xps && xps->ROOT()) ? xps->ROOT()->SrvProtVers() : -1;
1625 if (p->ConnType() == kXPD_ClientMaster) {
1626 // Send also back the data pool url
1627 XrdOucString dpu = fMgr->PoolURL();
1628 if (!dpu.endswith('/'))
1629 dpu += '/';
1630 dpu += fMgr->NameSpace();
1631 response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN,
1632 (void *) dpu.c_str(), dpu.length());
1633 } else
1634 response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN);
1635
1636 // Send saved start processing message, if not idle
1637 if (xps->Status() == kXPD_running && xps->StartMsg()) {
1638 TRACEP(p, XERR, "sending start process message ("<<xps->StartMsg()->fSize<<" bytes)");
1639 response->Send(kXR_attn, kXPD_msg,
1640 xps->StartMsg()->fBuff, xps->StartMsg()->fSize);
1641 }
1642
1643 // Over
1644 return 0;
1645}
1646
1647////////////////////////////////////////////////////////////////////////////////
1648/// Allocate and prepare the XrdProofdProofServ object describing this session
1649
1652 unsigned short &sid)
1653{
1654 XPDLOC(SMGR, "ProofServMgr::PrepareProofServ")
1655
1656 // Allocate next free server ID and fill in the basic stuff
1658 xps->SetClient(p->Client()->User());
1659 xps->SetSrvType(p->ConnType());
1660
1661 // Prepare the stream identifier
1662 memcpy((void *)&sid, (const void *)&(p->Request()->header.streamid[0]), 2);
1663 // We associate this instance to the corresponding slot in the
1664 // session vector of attached clients
1665 XrdClientID *csid = xps->GetClientID(p->CID());
1666 csid->SetSid(sid);
1667 csid->SetP(p);
1668 // Take parentship, if orphalin
1669 xps->SetParent(csid);
1670
1671 // The ROOT version to be used
1672 xps->SetROOT(p->Client()->ROOT());
1673 XrdOucString msg;
1674 XPDFORM(msg, "using ROOT version: %s", xps->ROOT()->Export());
1675 TRACEP(p, REQ, msg);
1676 if (p->ConnType() == kXPD_ClientMaster) {
1677 // Notify the client if using a version different from the default one
1678 if (fMgr && p->Client()->ROOT() != fMgr->ROOTMgr()->DefaultVersion()) {
1679 XPDFORM(msg, "++++ Using NON-default ROOT version: %s ++++\n", xps->ROOT()->Export());
1680 r->Send(kXR_attn, kXPD_srvmsg, (char *) msg.c_str(), msg.length());
1681 }
1682 }
1683
1684 // Done
1685 return xps;
1686}
1687
1688////////////////////////////////////////////////////////////////////////////////
1689/// Extract relevant quantities from the buffer received during a create request
1690
1692 XrdProofdProofServ *xps,
1693 XrdOucString &tag, XrdOucString &ord,
1694 XrdOucString &cffile,
1695 XrdOucString &uenvs, int &intwait)
1696{
1697 XPDLOC(SMGR, "ProofServMgr::ParseCreateBuffer")
1698
1699 // Parse buffer
1700 char *buf = p->Argp()->buff;
1701 int len = p->Request()->proof.dlen;
1702
1703 // Extract session tag
1704 tag.assign(buf,0,len-1);
1705
1706 TRACEP(p, DBG, "received buf: "<<tag);
1707
1708 tag.erase(tag.find('|'));
1709 xps->SetTag(tag.c_str());
1710 TRACEP(p, DBG, "tag: "<<tag);
1711
1712 // Extract ordinal number
1713 ord = "0";
1714 if ((p->ConnType() == kXPD_MasterWorker) || (p->ConnType() == kXPD_MasterMaster)) {
1715 ord.assign(buf,0,len-1);
1716 int iord = ord.find("|ord:");
1717 if (iord != STR_NPOS) {
1718 ord.erase(0,iord+5);
1719 ord.erase(ord.find("|"));
1720 } else
1721 ord = "0";
1722 }
1723 xps->SetOrdinal(ord.c_str());
1724
1725 // Extract config file, if any (for backward compatibility)
1726 cffile.assign(buf,0,len-1);
1727 int icf = cffile.find("|cf:");
1728 if (icf != STR_NPOS) {
1729 cffile.erase(0,icf+4);
1730 cffile.erase(cffile.find("|"));
1731 } else
1732 cffile = "";
1733
1734 // Extract # number of workers, if plite master
1735 XrdOucString plitenwk;
1736 plitenwk.assign(buf,0,len-1);
1737 int inwk = plitenwk.find("|plite:");
1738 if (inwk != STR_NPOS) {
1739 plitenwk.erase(0,inwk+7);
1740 plitenwk.erase(plitenwk.find("|"));
1741 int nwk = plitenwk.atoi();
1742 if (nwk > -1) {
1743 xps->SetPLiteNWrks(nwk);
1744 TRACEP(p, DBG, "P-Lite master with "<<nwk<<" workers (0 means # or cores)");
1745 }
1746 }
1747
1748 // Extract user envs, if any
1749 uenvs.assign(buf,0,len-1);
1750 int ienv = uenvs.find("|envs:");
1751 if (ienv != STR_NPOS) {
1752 uenvs.erase(0,ienv+6);
1753 uenvs.erase(uenvs.find("|"));
1754 xps->SetUserEnvs(uenvs.c_str());
1755 } else
1756 uenvs = "";
1757
1758 // Check if the user wants to wait more for the session startup
1759 intwait = fInternalWait;
1760 if (uenvs.length() > 0) {
1761 TRACEP(p, DBG, "user envs: "<<uenvs);
1762 int iiw = STR_NPOS;
1763 if ((iiw = uenvs.find("PROOF_INTWAIT=")) != STR_NPOS) {
1764 XrdOucString s(uenvs, iiw + strlen("PROOF_INTWAIT="));
1765 s.erase(s.find(','));
1766 if (s.isdigit()) {
1767 intwait = s.atoi();
1768 TRACEP(p, ALL, "startup internal wait set by user to "<<intwait);
1769 }
1770 }
1771 }
1772}
1773
1774////////////////////////////////////////////////////////////////////////////////
1775/// Handle a request to create a new session
1776
1778{
1779 XPDLOC(SMGR, "ProofServMgr::Create")
1780
1781 int psid = -1, rc = 0;
1782 XPD_SETRESP(p, "Create");
1783
1784 TRACEP(p, DBG, "enter");
1785 XrdOucString msg;
1786
1787 XpdSrvMgrCreateGuard mcGuard;
1788
1789 // Check if we are allowed to start a new session
1790 int mxsess = (fMgr && fMgr->ProofSched()) ? fMgr->ProofSched()->MaxSessions() : -1;
1791 if (p->ConnType() == kXPD_ClientMaster && mxsess > 0) {
1792 XrdSysMutexHelper mhp(fMutex);
1793 int cursess = CurrentSessions();
1794 TRACEP(p,ALL," cursess: "<<cursess);
1795 if (mxsess <= cursess) {
1796 XPDFORM(msg, " ++++ Max number of sessions reached (%d) - please retry later ++++ \n", cursess);
1797 response->Send(kXR_attn, kXPD_srvmsg, (char *) msg.c_str(), msg.length());
1798 response->Send(kXP_TooManySess, "cannot start a new session");
1799 return 0;
1800 }
1801 // If we fail this guarantees that the counters are decreased, if needed
1802 mcGuard.Set(&fCurrentSessions);
1803 }
1804
1805 // Update counter to control checks during creation
1806 XpdSrvMgrCreateCnt cnt(this, kCreateCnt);
1807 if (TRACING(DBG)) {
1808 int nc = CheckCounter(kCreateCnt);
1809 TRACEP(p, DBG, nc << " threads are creating a new session");
1810 }
1811
1812 // Allocate and prepare the XrdProofdProofServ object describing this session
1813 unsigned short sid;
1814 XrdProofdProofServ *xps = PrepareProofServ(p, response, sid);
1815 psid = xps->ID();
1816
1817 // Unmarshall log level
1818 int loglevel = ntohl(p->Request()->proof.int1);
1819
1820 // Parse buffer
1821 int intwait;
1822 XrdOucString tag, ord, cffile, uenvs;
1823 ParseCreateBuffer(p, xps, tag, ord, cffile, uenvs, intwait);
1824
1825 // Notify
1826 TRACEP(p, DBG, "{ord,cfg,psid,cid,log}: {"<<ord<<","<<cffile<<","<<psid
1827 <<","<<p->CID()<<","<<loglevel<<"}");
1828
1829 // Here we fork: for some weird problem on SMP machines there is a
1830 // non-zero probability for a deadlock situation in system mutexes.
1831 // The semaphore seems to have solved the problem.
1832 if (fForkSem.Wait(10) != 0) {
1833 xps->Reset();
1834 // Timeout acquire fork semaphore
1835 response->Send(kXP_ServerError, "timed-out acquiring fork semaphore");
1836 return 0;
1837 }
1838
1839 // Pipe for child-to-parent communications during setup
1840 XrdProofdPipe fpc, fcp;
1841 if (!(fpc.IsValid()) || !(fcp.IsValid())) {
1842 xps->Reset();
1843 // Failure creating pipe
1844 response->Send(kXP_ServerError,
1845 "unable to create pipes for communication during setup");
1846 return 0;
1847 }
1848
1849 // Start setting up the unique tag and relevant dirs for this session
1850 ProofServEnv_t in = {xps, loglevel, cffile.c_str(), "", "", tag.c_str(), "", "", 1};
1851 GetTagDirs(0, p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
1852
1853 // Fork an agent process to handle this session
1854 int pid = -1;
1855 TRACEP(p, FORK,"Forking external proofsrv");
1856 if (!(pid = fMgr->Sched()->Fork("proofsrv"))) {
1857
1858 // Finalize unique tag and relevant dirs for this session
1859 GetTagDirs((int)getpid(),
1860 p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
1861
1862 // Create log file path
1863 FormFileNameInSessionDir(p, xps, in.fSessionDir.c_str(), "log", in.fLogFile);
1864
1865 // Log to the session log file from now on
1866 if (fLogger) fLogger->Bind(in.fLogFile.c_str());
1867 TRACE(FORK, "log file: "<<in.fLogFile);
1868
1869 XrdOucString pmsg = "*** spawned child process ";
1870 pmsg += (int) getpid();
1871 pmsg += " ***";
1872 TRACE(ALL, pmsg);
1873
1874 // These files belongs to the client
1875 if (chown(in.fLogFile.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid) != 0)
1876 TRACE(XERR, "chown on '"<<in.fLogFile.c_str()<<"'; errno: "<<errno);
1877
1878 XpdMsg xmsg;
1879 XrdOucString path, sockpath, emsg;
1880
1881 // Receive the admin path from the parent
1882 if (fpc.Poll() < 0) {
1883 TRACE(XERR, "error while polling to receive the admin path from parent - EXIT" );
1884 exit(1);
1885 }
1886 if (fpc.Recv(xmsg) != 0) {
1887 TRACE(XERR, "error reading message while waiting for the admin path from parent - EXIT" );
1888 exit(1);
1889 }
1890 if (xmsg.Type() < 0) {
1891 TRACE(XERR, "the parent failed to setup the admin path - EXIT" );
1892 exit(1);
1893 }
1894 // Set the path w/o asserting the related files
1895 path = xmsg.Buf();
1896 xps->SetAdminPath(path.c_str(), 0, fMgr->ChangeOwn());
1897 TRACE(FORK, "admin path: "<<path);
1898
1899 xmsg.Reset();
1900 // Receive the sock path from the parent
1901 if (fpc.Poll() < 0) {
1902 TRACE(XERR, "error while polling to receive the sock path from parent - EXIT" );
1903 exit(1);
1904 }
1905 if (fpc.Recv(xmsg) != 0) {
1906 TRACE(XERR, "error reading message while waiting for the sock path from parent - EXIT" );
1907 exit(1);
1908 }
1909 if (xmsg.Type() < 0) {
1910 TRACE(XERR, "the parent failed to setup the sock path - EXIT" );
1911 exit(1);
1912 }
1913 // Set the UNIX sock path
1914 sockpath = xmsg.Buf();
1915 xps->SetUNIXSockPath(sockpath.c_str());
1916 TRACE(FORK, "UNIX sock path: "<<sockpath);
1917
1918 // We set to the user ownerships and create relevant dirs
1919 bool asserdatadir = 1;
1920 int srvtype = xps->SrvType();
1921 TRACE(ALL,"srvtype = "<< srvtype);
1922 if (xps->SrvType() != kXPD_Worker && !strchr(fMgr->DataDirOpts(), 'M')) {
1923 asserdatadir = 0;
1924 } else if (xps->SrvType() == kXPD_Worker && !strchr(fMgr->DataDirOpts(), 'W')) {
1925 asserdatadir = 0;
1926 }
1927 const char *pord = asserdatadir ? ord.c_str() : 0;
1928 const char *ptag = asserdatadir ? in.fSessionTag.c_str() : 0;
1929 if (SetUserOwnerships(p, pord, ptag) != 0) {
1930 emsg = "SetUserOwnerships did not return OK - EXIT";
1931 TRACE(XERR, emsg);
1932 if (fcp.Post(0, emsg.c_str()) != 0)
1933 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
1934 exit(1);
1935 }
1936
1937 // We set to the user environment
1938 if (SetUserEnvironment(p) != 0) {
1939 emsg = "SetUserEnvironment did not return OK - EXIT";
1940 TRACE(XERR, emsg);
1941 if (fcp.Post(0, emsg.c_str()) != 0)
1942 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
1943 exit(1);
1944 }
1945
1946 char *argvv[7] = {0};
1947
1948 // We set to the user environment
1949 if (!fMgr) {
1950 emsg = "XrdProofdManager instance undefined!";
1951 TRACE(XERR, emsg);
1952 if (fcp.Post(0, emsg.c_str()) != 0)
1953 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
1954 exit(1);
1955 }
1956 char *sxpd = 0;
1957 if (fMgr->AdminPath()) {
1958 // We add our admin path to be able to identify processes coming from us
1959 size_t len = strlen(fMgr->AdminPath()) + strlen("xpdpath:") + 1;
1960 sxpd = new char[len];
1961 snprintf(sxpd, len, "xpdpath:%s", fMgr->AdminPath());
1962 } else {
1963 // We add our PID to be able to identify processes coming from us
1964 sxpd = new char[10];
1965 snprintf(sxpd, 10, "%d", getppid());
1966 }
1967
1968 // Log level
1969 char slog[10] = {0};
1970 snprintf(slog, 10, "%d", loglevel);
1971
1972 // Server type
1973 char ssrv[10] = {0};
1974 snprintf(ssrv, 10, "%d", xps->SrvType());
1975
1976 // start server
1977 argvv[0] = (char *) xps->ROOT()->PrgmSrv();
1978 argvv[1] = (char *)((p->ConnType() == kXPD_MasterWorker) ? "proofslave"
1979 : "proofserv");
1980 argvv[2] = (char *)"xpd";
1981 argvv[3] = (char *)sxpd;
1982 argvv[4] = (char *)slog;
1983 argvv[5] = (char *)ssrv;
1984 argvv[6] = 0;
1985
1986 // Set environment for proofserv
1987 if (SetProofServEnv(p, (void *)&in) != 0) {
1988 emsg = "SetProofServEnv did not return OK - EXIT";
1989 TRACE(XERR, emsg);
1990 if (fcp.Post(0, emsg.c_str()) != 0)
1991 TRACE(XERR, "cannot write to internal pipe; errno: "<<errno);
1992 exit(1);
1993 }
1994 TRACE(FORK, (int)getpid() << ": proofserv env set up");
1995
1996 // Setup OK: now we go
1997 // Communicate the logfile path
1998 if (fcp.Post(1, xps->Fileout()) != 0) {
1999 TRACE(XERR, "cannot write log file path to internal pipe; errno: "<<errno);
2000 exit(1);
2001 }
2002 TRACE(FORK, (int)getpid()<< ": log file path communicated");
2003
2004 // Unblock SIGUSR1 and SIGUSR2
2005 sigset_t myset;
2006 sigemptyset(&myset);
2007 sigaddset(&myset, SIGUSR1);
2008 sigaddset(&myset, SIGUSR2);
2009 pthread_sigmask(SIG_UNBLOCK, &myset, 0);
2010
2011 // Close pipes
2012 fpc.Close();
2013 fcp.Close();
2014
2015 TRACE(FORK, (int)getpid()<<": user: "<<p->Client()->User()<<
2016 ", uid: "<<getuid()<<", euid:"<<geteuid()<<
2017 ", psrv: "<<xps->ROOT()->PrgmSrv()<<", argvv[1]: "<<argvv[1]);
2018 // Run the program
2019 execv(xps->ROOT()->PrgmSrv(), argvv);
2020
2021 // We should not be here!!!
2022 TRACE(XERR, "returned from execv: bad, bad sign !!! errno:" << (int)errno);
2023 exit(1);
2024 }
2025
2026 // Wakeup colleagues
2027 fForkSem.Post();
2028
2029 // parent process
2030 if (pid < 0) {
2031 xps->Reset();
2032 // Failure in forking
2033 response->Send(kXP_ServerError, "could not fork agent");
2034 return 0;
2035 }
2036
2037 TRACEP(p, FORK,"Parent process: child is "<<pid);
2038 XrdOucString emsg;
2039
2040 // Finalize unique tag and relevant dirs for this session
2041 GetTagDirs((int)pid, p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
2042
2043 // Create log file path
2044 FormFileNameInSessionDir(p, xps, in.fSessionDir.c_str(), "log", in.fLogFile);
2045
2046 TRACEP(p, FORK, "log file: "<<in.fLogFile);
2047
2048 // Log prefix
2049 XrdOucString npfx;
2050 XPDFORM(npfx, "%s-%s:", (p->ConnType() == kXPD_MasterWorker) ? "wrk" : "mst", xps->Ordinal());
2051
2052 // Cleanup current socket, if any
2053 if (xps->UNIXSock()) {
2054 TRACEP(p, FORK,"current UNIX sock: "<<xps->UNIXSock() <<", path: "<<xps->UNIXSockPath());
2055 xps->DeleteUNIXSock();
2056 }
2057
2058 // Admin and UNIX Socket Path (set path and create the socket); we need to
2059 // set and create them in here, otherwise the cleaning may remove the socket
2060 XrdOucString path, sockpath;
2061 XPDFORM(path, "%s/%s.%s.%d", fActiAdminPath.c_str(),
2062 p->Client()->User(), p->Client()->Group(), pid);
2063 // Sock path under dedicated directory to avoid problems related to its length
2064 XPDFORM(sockpath, "%s/xpd.%d.%d", fMgr->SockPathDir(), fMgr->Port(), pid);
2065 struct sockaddr_un unserver;
2066 if (sockpath.length() > (int)(sizeof(unserver.sun_path) - 1)) {
2067 emsg = "socket path very long (";
2068 emsg += sockpath.length();
2069 emsg += "): this may lead to stack corruption!";
2070 emsg += " Use xpd.sockpathdir to change it";
2071 TRACEP(p, XERR, emsg.c_str());
2072 }
2073 int pathrc = 0;
2074 if (!pathrc && !(pathrc = xps->SetAdminPath(path.c_str(), 1, fMgr->ChangeOwn()))) {
2075 // Communicate the path to child
2076 if ((pathrc = fpc.Post(0, path.c_str())) != 0) {
2077 emsg = "failed to communicating path to child";
2078 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2079 TRACEP(p, XERR, emsg.c_str());
2080 }
2081 } else {
2082 emsg = "failed to setup child admin path";
2083 // Communicate failure to child
2084 if ((pathrc = fpc.Post(-1, path.c_str())) != 0) {
2085 emsg += ": failed communicating failure to child";
2086 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2087 TRACEP(p, XERR, emsg.c_str());
2088 }
2089 }
2090 // Now create the UNIX sock path
2091 if (!pathrc) {
2092 xps->SetUNIXSockPath(sockpath.c_str());
2093 if ((pathrc = xps->CreateUNIXSock(fEDest)) != 0) {
2094 // Failure
2095 emsg = "failure creating UNIX socket on " ;
2096 emsg += sockpath;
2097 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2098 TRACEP(p, XERR, emsg.c_str());
2099 }
2100 }
2101 if (!pathrc) {
2102 TRACEP(p, FORK,"UNIX sock: "<<xps->UNIXSockPath());
2103 if ((pathrc = chown(sockpath.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid)) != 0) {
2104 emsg = "failure changing ownership of the UNIX socket on " ;
2105 emsg += sockpath;
2106 emsg += "; errno: " ;
2107 emsg += errno;
2108 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2109 TRACEP(p, XERR, emsg.c_str());
2110 }
2111 }
2112 // Communicate sockpath or failure, if any
2113 if (!pathrc) {
2114 // Communicate the path to child
2115 if ((pathrc = fpc.Post(0, sockpath.c_str())) != 0) {
2116 emsg = "failed to communicating path to child";
2117 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2118 TRACEP(p, XERR, emsg.c_str());
2119 }
2120 } else {
2121 emsg = "failed to setup child admin path";
2122 // Communicate failure to child
2123 if ((pathrc = fpc.Post(-1, sockpath.c_str())) != 0) {
2124 emsg += ": failed communicating failure to child";
2125 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2126 TRACEP(p, XERR, emsg.c_str());
2127 }
2128 }
2129
2130 if (pathrc != 0) {
2131 // Failure
2132 xps->Reset();
2133 XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
2134 // Make sure that the log file path reaches the caller
2135 emsg += "|log:";
2136 emsg += in.fLogFile;
2137 emsg.insert(npfx, 0);
2138 response->Send(kXP_ServerError, emsg.c_str());
2139 return 0;
2140 }
2141
2142 TRACEP(p, FORK, "waiting for client setup status ...");
2143
2144 emsg = "proofserv setup";
2145 // Wait for the setup process on the pipe, 20 secs max (10 x 2000 millisecs): this
2146 // is enough to cover possible delays due to heavy load; the client will anyhow
2147 // retry a few times
2148 int ntry = 10, prc = 0, rst = -1;
2149 while (prc == 0 && ntry--) {
2150 // Poll for 2 secs
2151 if ((prc = fcp.Poll(2)) > 0) {
2152 // Got something: read the message out
2153 XpdMsg xmsg;
2154 if (fcp.Recv(xmsg) != 0) {
2155 emsg = "error receiving message from pipe";
2156 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2157 TRACEP(p, XERR, emsg.c_str());
2158 prc = -1;
2159 break;
2160 }
2161 // Status is the message type
2162 rst = xmsg.Type();
2163 // Read string, if any
2164 XrdOucString xbuf = xmsg.Buf();
2165 if (xbuf.length() <= 0) {
2166 emsg = "error reading buffer {logfile, error message} from message received on the pipe";
2167 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2168 TRACEP(p, XERR, emsg.c_str());
2169 prc = -1;
2170 break;
2171 }
2172 if (rst > 0) {
2173 // Set the log file
2174 xps->SetFileout(xbuf.c_str());
2175 // Set also the session tag
2176 XrdOucString stag(xbuf);
2177 stag.erase(stag.rfind('/'));
2178 stag.erase(0, stag.find("session-") + strlen("session-"));
2179 xps->SetTag(stag.c_str());
2180
2181 } else {
2182 // Setup failed: save the error
2183 prc = -1;
2184 emsg = "failed: ";
2185 emsg += xbuf;
2186 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2187 TRACEP(p, XERR, emsg.c_str());
2188 break;
2189 }
2190
2191 } else if (prc < 0) {
2192 emsg = "error receive status-of-setup from pipe";
2193 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2194 TRACEP(p, XERR, emsg.c_str());
2195 break;
2196 } else {
2197 TRACEP(p, FORK, "receiving status-of-setup from pipe: waiting 2 s ..."<<pid);
2198 }
2199 }
2200
2201 // Close pipes
2202 fpc.Close();
2203 fcp.Close();
2204
2205 TRACEP(p, FORK, "tags: tag:"<<in.fSessionTag<<" top:"<<in.fTopSessionTag<<" xps:"<<xps->Tag());
2206
2207 // Notify the user
2208 if (prc <= 0) {
2209 // Timed-out or failed: we are done; if timed-out finalize the notification message
2210 emsg = "failure setting up proofserv" ;
2211 if (prc == 0) emsg += ": timed-out receiving status-of-setup from pipe";
2212 // Dump to the log file
2213 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2214 // Recycle the session object
2215 xps->Reset();
2216 XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
2217 // Make sure that the log file path reaches the caller
2218 emsg += "|log:";
2219 emsg += in.fLogFile;
2220 TRACEP(p, XERR, emsg.c_str());
2221 emsg.insert(npfx, 0);
2222 response->Send(kXP_ServerError, emsg.c_str());
2223 return 0;
2224
2225 } else {
2226 // Setup was successful
2227 XrdOucString info;
2228 // The log file path (so we do it independently of a successful session startup)
2229 info += "|log:";
2230 info += xps->Fileout();
2231 // Send it back
2232 response->SendI(psid, xps->ROOT()->SrvProtVers(), (kXR_int16)XPROOFD_VERSBIN,
2233 (void *) info.c_str(), info.length());
2234 }
2235
2236 // now we wait for the callback to be (successfully) established
2237 TRACEP(p, FORK, "server launched: wait for callback ");
2238
2239 // Set ID
2240 xps->SetSrvPID(pid);
2241
2242 // Wait for the call back
2243 if (AcceptPeer(xps, intwait, emsg) != 0) {
2244 emsg = "problems accepting callback: ";
2245 // Failure: kill the child process
2246 if (XrdProofdAux::KillProcess(pid, 0, p->Client()->UI(), fMgr->ChangeOwn()) != 0)
2247 emsg += "process could not be killed - pid: ";
2248 else
2249 emsg += "process killed - pid: ";
2250 emsg += (int)pid;
2251 // Dump to the log file
2252 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2253 // Reset the instance
2254 xps->Reset();
2255 // Notify
2256 TRACEP(p, XERR, emsg.c_str());
2257 emsg.insert(npfx, 0);
2258 response->Send(kXR_attn, kXPD_errmsg, (char *) emsg.c_str(), emsg.length());
2259 return 0;
2260 }
2261 // Set the group, if any
2262 xps->SetGroup(p->Client()->Group());
2263
2264 // Change child process priority, if required
2265 int dp = 0;
2267 p->Client()->User(), dp) != 0) {
2268 TRACEP(p, XERR, "problems changing child process priority");
2269 } else if (dp > 0) {
2270 TRACEP(p, DBG, "priority of the child process changed by " << dp << " units");
2271 }
2272
2273 XrdClientID *cid = xps->Parent();
2274 TRACEP(p, FORK, "xps: "<<xps<<", ClientID: "<<(int *)cid<<" (sid: "<<sid<<")"<<" NClients: "<<xps->GetNClients(1));
2275
2276 // Record this session in the client sandbox
2277 if (p->Client()->Sandbox()->AddSession(xps->Tag()) == -1)
2278 TRACEP(p, REQ, "problems recording session in sandbox");
2279
2280 // Success; avoid that the global counter is decreased
2281 mcGuard.Set(0);
2282
2283 // Update the global session handlers
2284 XrdOucString key; key += pid;
2285 { XrdSysMutexHelper mh(fMutex);
2286 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2287 fActiveSessions.push_back(xps);
2288 }
2289 AddSession(p, xps);
2290
2291 // Check session validity
2292 if (!xps->IsValid()) {
2293 // Notify
2294 TRACEP(p, XERR, "PROOF session is invalid: protocol error? " <<emsg);
2295 }
2296
2297 // Over
2298 return 0;
2299}
2300////////////////////////////////////////////////////////////////////////////////
2301/// Create the admin path for the starting session
2302/// Return 0 on success, -1 on error (error message in 'emsg')
2303
2305 XrdProofdProtocol *p, int pid,
2306 XrdOucString &emsg)
2307{
2308 XrdOucString path;
2309 bool assert = (pid > 0) ? 1 : 0;
2310 XPDFORM(path, "%s/%s.%s.", fActiAdminPath.c_str(),
2311 p->Client()->User(), p->Client()->Group());
2312 if (pid > 0) path += pid;
2313 if (xps->SetAdminPath(path.c_str(), assert, fMgr->ChangeOwn()) != 0) {
2314 XPDFORM(emsg, "failure setting admin path '%s'", path.c_str());
2315 return -1;
2316 }
2317 // Done
2318 return 0;
2319}
2320
2321////////////////////////////////////////////////////////////////////////////////
2322/// Create the socket path for the starting session
2323/// Return 0 on success, -1 on error (error message in 'emsg')
2324
2327 unsigned int seq, XrdOucString &emsg)
2328{
2329 XPDLOC(SMGR, "ProofServMgr::CreateSockPath")
2330
2331 XrdOucString sockpath;
2332 // Sock path under dedicated directory to avoid problems related to its length
2333 XPDFORM(sockpath, "%s/xpd.%d.%d.%u", fMgr->SockPathDir(), fMgr->Port(), getpid(), seq);
2334 TRACEP(p, ALL, "socket path: " << sockpath);
2335 struct sockaddr_un unserver;
2336 if (sockpath.length() > (int)(sizeof(unserver.sun_path) - 1)) {
2337 XPDFORM(emsg, "socket path very long (%d): this may lead to stack corruption! ", sockpath.length());
2338 return -1;
2339 }
2340 // Now create the UNIX sock path and set its permissions
2341 xps->SetUNIXSockPath(sockpath.c_str());
2342 if (xps->CreateUNIXSock(fEDest) != 0) {
2343 // Failure
2344 XPDFORM(emsg, "failure creating UNIX socket on '%s'", sockpath.c_str());
2345 return -1;
2346 }
2347 if (chmod(sockpath.c_str(), 0755) != 0) {
2348 XPDFORM(emsg, "failure changing permissions of the UNIX socket on '%s'; errno: %d",
2349 sockpath.c_str(), (int)errno);
2350 return -1;
2351 }
2352
2353 // Done
2354 return 0;
2355}
2356
2357////////////////////////////////////////////////////////////////////////////////
2358/// Send content of errlog upstream asynchronously
2359
2361{
2362 XPDLOC(SMGR, "ProofServMgr::SendErrLog")
2363
2364 XrdOucString emsg("An error occured: the content of errlog follows:");
2365 r->Send(kXR_attn, kXPD_srvmsg, (char *) emsg.c_str(), emsg.length());
2366 emsg = "------------------------------------------------\n";
2367 r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) emsg.c_str(), emsg.length());
2368
2369 int ierr = open(errlog, O_RDONLY);
2370 if (ierr < 0) {
2371 XPDFORM(emsg, "cannot open '%s' (errno: %d)", errlog, errno);
2372 r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) emsg.c_str(), emsg.length());
2373 return;
2374 }
2375 struct stat st;
2376 if (fstat(ierr, &st) != 0) {
2377 XPDFORM(emsg, "cannot stat '%s' (errno: %d)", errlog, errno);
2378 r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) emsg.c_str(), emsg.length());
2379 close (ierr);
2380 return;
2381 }
2382 off_t len = st.st_size;
2383 TRACE(ALL, " reading "<<len<<" bytes from "<<errlog);
2384 ssize_t chunk = 2048, nb, nr;
2385 char buf[2048];
2386 ssize_t left = len;
2387 while (left > 0) {
2388 nb = (left > chunk) ? chunk : left;
2389 if ((nr = read(ierr, buf, nb)) < 0) {
2390 XPDFORM(emsg, "problems reading from '%s' (errno: %d)", errlog, errno);
2391 r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) emsg.c_str(), emsg.length());
2392 close(ierr);
2393 return;
2394 }
2395 TRACE(ALL, buf);
2396 r->Send(kXR_attn, kXPD_srvmsg, 2, buf, nr);
2397 left -= nr;
2398 }
2399 close(ierr);
2400 emsg = "------------------------------------------------";
2401 r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) emsg.c_str(), emsg.length());
2402
2403 // Done
2404 return;
2405}
2406
2407////////////////////////////////////////////////////////////////////////////////
2408/// Handle a request to recover a session after stop&restart
2409
2411{
2412 XPDLOC(SMGR, "ProofServMgr::ResolveSession")
2413
2414 TRACE(REQ, "resolving "<< (fpid ? fpid : "<nul>")<<" ...");
2415
2416 // Check inputs
2417 if (!fpid || strlen(fpid)<= 0 || !(fMgr->ClientMgr()) || !fRecoverClients) {
2418 TRACE(XERR, "invalid inputs: "<<(fpid ? fpid : "<nul>")<<", "<<fMgr->ClientMgr()<<
2419 ", "<<fRecoverClients);
2420 return -1;
2421 }
2422
2423 // Path to the session file
2424 XrdOucString path;
2425 XPDFORM(path, "%s/%s", fActiAdminPath.c_str(), fpid);
2426
2427 // Read info
2428 XrdProofSessionInfo si(path.c_str());
2429
2430 // Check if recovering is supported
2431 if (si.fSrvProtVers < 18) {
2432 TRACE(DBG, "session does not support recovering: protocol "
2433 <<si.fSrvProtVers<<" < 18");
2434 return -1;
2435 }
2436
2437 // Create client instance
2438 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(si.fUser.c_str(), si.fGroup.c_str(),
2439 si.fUnixPath.c_str());
2440 if (!c) {
2441 TRACE(DBG, "client instance not initialized");
2442 return -1;
2443 }
2444
2445 // Allocate the server object
2446 int psid = si.fID;
2447 XrdProofdProofServ *xps = c->GetServObj(psid);
2448 if (!xps) {
2449 TRACE(DBG, "server object not initialized");
2450 return -1;
2451 }
2452
2453 // Fill info for this session
2454 si.FillProofServ(*xps, fMgr->ROOTMgr());
2455 if (xps->CreateUNIXSock(fEDest) != 0) {
2456 // Failure
2457 TRACE(XERR,"failure creating UNIX socket on " << xps->UNIXSockPath());
2458 xps->Reset();
2459 return -1;
2460 }
2461
2462 // Set invalid as we are not yet connected
2463 xps->SetValid(0);
2464
2465 // Add to the lists
2466 XrdSysMutexHelper mhp(fRecoverMutex);
2467 std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
2468 while (ii != fRecoverClients->end()) {
2469 if ((*ii)->fClient == c)
2470 break;
2471 ++ii;
2472 }
2473 if (ii != fRecoverClients->end()) {
2474 (*ii)->fProofServs.push_back(xps);
2475 } else {
2477 cl->fProofServs.push_back(xps);
2478 fRecoverClients->push_back(cl);
2479 }
2480
2481 // Done
2482 return 0;
2483}
2484
2485////////////////////////////////////////////////////////////////////////////////
2486/// Handle a request to recover a session after stop&restart for a specific client
2487
2489{
2490 XPDLOC(SMGR, "ProofServMgr::Recover")
2491
2492 if (!cl) {
2493 TRACE(XERR, "invalid input!");
2494 return 0;
2495 }
2496
2497 TRACE(DBG, "client: "<< cl->fClient->User());
2498
2499 int nr = 0;
2500 XrdOucString emsg;
2501 XrdProofdProofServ *xps = 0;
2502 int nps = 0;
2503 { XrdSysMutexHelper mhp(cl->fMutex); nps = cl->fProofServs.size(); }
2504 while (nps--) {
2505
2506 { XrdSysMutexHelper mhp(cl->fMutex); xps = cl->fProofServs.front();
2507 cl->fProofServs.remove(xps); cl->fProofServs.push_back(xps); }
2508
2509 // Short steps of 1 sec
2510 if (AcceptPeer(xps, 1, emsg) != 0) {
2511 if (emsg == "timeout") {
2512 TRACE(DBG, "timeout while accepting callback");
2513 } else {
2514 TRACE(XERR, "problems accepting callback: "<<emsg);
2515 }
2516 } else {
2517 // Update the global session handlers
2518 XrdOucString key; key += xps->SrvPID();
2519 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2520 fActiveSessions.push_back(xps);
2521 xps->Protocol()->SetAdminPath(xps->AdminPath());
2522 // Remove from the temp list
2523 { XrdSysMutexHelper mhp(cl->fMutex); cl->fProofServs.remove(xps); }
2524 // Count
2525 nr++;
2526 // Notify
2527 if (TRACING(REQ)) {
2528 int pid = xps->SrvPID();
2529 int left = -1;
2530 { XrdSysMutexHelper mhp(cl->fMutex); left = cl->fProofServs.size(); }
2531 XPDPRT("session for "<<cl->fClient->User()<<"."<<cl->fClient->Group()<<
2532 " successfully recovered ("<<left<<" left); pid: "<<pid);
2533 }
2534 }
2535 }
2536
2537 // Over
2538 return nr;
2539}
2540
2541#ifndef ROOT_XrdFour
2542////////////////////////////////////////////////////////////////////////////////
2543/// Accept a callback from a starting-up server and setup the related protocol
2544/// object. Used for old servers.
2545/// Return 0 if successful or -1 in case of failure.
2546
2548 int to, XrdOucString &msg)
2549{
2550 XPDLOC(SMGR, "ProofServMgr::AcceptPeer")
2551
2552 // We will get back a peer to initialize a link
2553 XrdNetPeer peerpsrv;
2554
2555 // Check inputs
2556 if (!xps || !xps->UNIXSock()) {
2557 XPDFORM(msg, "session pointer undefined or socket invalid: %p", xps);
2558 return -1;
2559 }
2560 TRACE(REQ, "waiting for server callback for "<<to<<" secs ... on "<<xps->UNIXSockPath());
2561
2562 // Perform regular accept
2563 if (!(xps->UNIXSock()->Accept(peerpsrv, XRDNET_NODNTRIM, to))) {
2564 msg = "timeout";
2565 return -1;
2566 }
2567
2568 // Setup the protocol serving this peer
2569 if (SetupProtocol(peerpsrv, xps, msg) != 0) {
2570 msg = "could not assert connected peer: ";
2571 return -1;
2572 }
2573
2574 // Done
2575 return 0;
2576}
2577
2578////////////////////////////////////////////////////////////////////////////////
2579/// Setup the protocol object serving the peer described by 'peerpsrv'
2580
2582 XrdProofdProofServ *xps, XrdOucString &msg)
2583{
2584 XPDLOC(SMGR, "ProofServMgr::SetupProtocol")
2585
2586 // We will get back a peer to initialize a link
2587 XrdLink *linkpsrv = 0;
2588 XrdProtocol *xp = 0;
2589 int lnkopts = 0;
2590 bool go = 1;
2591
2592 // Make sure we have the full host name
2593 if (peerpsrv.InetName) free(peerpsrv.InetName);
2594 peerpsrv.InetName = XrdSysDNS::getHostName("localhost");
2595
2596 // Allocate a new network object
2597 if (!(linkpsrv = XrdLink::Alloc(peerpsrv, lnkopts))) {
2598 msg = "could not allocate network object: ";
2599 go = 0;
2600 }
2601
2602 if (go) {
2603 // Keep buffer after object goes away
2604 peerpsrv.InetBuff = 0;
2605 TRACE(DBG, "connection accepted: matching protocol ... ");
2606 // Get a protocol object off the stack (if none, allocate a new one)
2608 if (!(xp = p->Match(linkpsrv))) {
2609 msg = "match failed: protocol error: ";
2610 go = 0;
2611 }
2612 delete p;
2613 }
2614
2615 if (go) {
2616 // Save path into the protocol instance: it may be needed during Process
2617 XrdOucString apath(xps->AdminPath());
2618 apath += ".status";
2619 ((XrdProofdProtocol *)xp)->SetAdminPath(apath.c_str());
2620 // Take a short-cut and process the initial request as a sticky request
2621 if (xp->Process(linkpsrv) != 0) {
2622 msg = "handshake with internal link failed: ";
2623 go = 0;
2624 }
2625 }
2626
2627 // Attach this link to the appropriate poller and enable it.
2628 if (go && !XrdPoll::Attach(linkpsrv)) {
2629 msg = "could not attach new internal link to poller: ";
2630 go = 0;
2631 }
2632
2633 if (!go) {
2634 // Close the link
2635 if (linkpsrv)
2636 linkpsrv->Close();
2637 return -1;
2638 }
2639
2640 // Tight this protocol instance to the link
2641 linkpsrv->setProtocol(xp);
2642
2643 TRACE(REQ, "Protocol "<<xp<<" attached to link "<<linkpsrv<<" ("<< peerpsrv.InetName <<")");
2644
2645 // Schedule it
2646 fMgr->Sched()->Schedule((XrdJob *)linkpsrv);
2647
2648 // Save the protocol in the session instance
2649 xps->SetProtocol((XrdProofdProtocol *)xp);
2650
2651 // Done
2652 return 0;
2653}
2654
2655#else
2656
2657////////////////////////////////////////////////////////////////////////////////
2658/// Accept a callback from a starting-up server and setup the related protocol
2659/// object. Used for old servers.
2660/// Return 0 if successful or -1 in case of failure.
2661
2663 int to, XrdOucString &msg)
2664{
2665 XPDLOC(SMGR, "ProofServMgr::AcceptPeer")
2666
2667 // We will get back a peer to initialize a link
2668 XrdNetAddr netaddr;
2669
2670 // Check inputs
2671 if (!xps || !xps->UNIXSock()) {
2672 XPDFORM(msg, "session pointer undefined or socket invalid: %p", xps);
2673 return -1;
2674 }
2675 TRACE(REQ, "waiting for server callback for "<<to<<" secs ... on "<<xps->UNIXSockPath());
2676
2677 // Perform regular accept
2678 if (!(xps->UNIXSock()->Accept(netaddr, 0, to))) {
2679 msg = "timeout";
2680 return -1;
2681 }
2682
2683 // Setup the protocol serving this peer
2684 if (SetupProtocol(netaddr, xps, msg) != 0) {
2685 msg = "could not assert connected peer: ";
2686 return -1;
2687 }
2688
2689 // Done
2690 return 0;
2691}
2692
2693////////////////////////////////////////////////////////////////////////////////
2694/// Setup the protocol object serving the peer described by 'peerpsrv'
2695
2696int XrdProofdProofServMgr::SetupProtocol(XrdNetAddr &netaddr,
2697 XrdProofdProofServ *xps, XrdOucString &msg)
2698{
2699 XPDLOC(SMGR, "ProofServMgr::SetupProtocol")
2700
2701 // We will get back a peer to initialize a link
2702 XrdLink *linkpsrv = 0;
2703 XrdProtocol *xp = 0;
2704 int lnkopts = 0;
2705 bool go = 1;
2706
2707 // Allocate a new network object
2708 if (!(linkpsrv = XrdLink::Alloc(netaddr, lnkopts))) {
2709 msg = "could not allocate network object: ";
2710 go = 0;
2711 }
2712
2713 if (go) {
2714 TRACE(DBG, "connection accepted: matching protocol ... ");
2715 // Get a protocol object off the stack (if none, allocate a new one)
2717 if (!(xp = p->Match(linkpsrv))) {
2718 msg = "match failed: protocol error: ";
2719 go = 0;
2720 }
2721 delete p;
2722 }
2723
2724 if (go) {
2725 // Save path into the protocol instance: it may be needed during Process
2726 XrdOucString apath(xps->AdminPath());
2727 apath += ".status";
2728 ((XrdProofdProtocol *)xp)->SetAdminPath(apath.c_str());
2729 // Take a short-cut and process the initial request as a sticky request
2730 if (xp->Process(linkpsrv) != 0) {
2731 msg = "handshake with internal link failed: ";
2732 go = 0;
2733 }
2734 }
2735
2736 // Attach this link to the appropriate poller and enable it.
2737 if (go && !XrdPoll::Attach(linkpsrv)) {
2738 msg = "could not attach new internal link to poller: ";
2739 go = 0;
2740 }
2741
2742 if (!go) {
2743 // Close the link
2744 if (linkpsrv)
2745 linkpsrv->Close();
2746 return -1;
2747 }
2748
2749 // Tight this protocol instance to the link
2750 linkpsrv->setProtocol(xp);
2751
2752 TRACE(REQ, "Protocol "<<xp<<" attached to link "<<linkpsrv<<" ("<< netaddr.Name() <<")");
2753
2754 // Schedule it
2755 fMgr->Sched()->Schedule((XrdJob *)linkpsrv);
2756
2757 // Save the protocol in the session instance
2758 xps->SetProtocol((XrdProofdProtocol *)xp);
2759
2760 // Done
2761 return 0;
2762}
2763
2764#endif
2765
2766////////////////////////////////////////////////////////////////////////////////
2767/// Handle a request to detach from an existing session
2768
2770{
2771 XPDLOC(SMGR, "ProofServMgr::Detach")
2772
2773 int psid = -1, rc = 0;
2774 XPD_SETRESP(p, "Detach");
2775
2776 // Unmarshall the data
2777 psid = ntohl(p->Request()->proof.sid);
2778 TRACEP(p, REQ, "psid: "<<psid);
2779
2780 // Find server session
2781 XrdProofdProofServ *xps = 0;
2782 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
2783 TRACEP(p, XERR, "session ID not found: "<<psid);
2784 response->Send(kXR_InvalidRequest,"session ID not found");
2785 return 0;
2786 }
2787 xps->FreeClientID(p->Pid());
2788
2789 // Notify to user
2790 response->Send();
2791
2792 return 0;
2793}
2794
2795////////////////////////////////////////////////////////////////////////////////
2796/// Handle a request to shutdown an existing session
2797
2799{
2800 XPDLOC(SMGR, "ProofServMgr::Destroy")
2801
// Handle a destroy request from the connection 'p': the 'sid' field of the
// request selects one session (value > -1) or all sessions (otherwise).
// An unknown session ID is reported to the peer as kXR_InvalidRequest.
// Every visible return statement returns 0.
2802 int psid = -1, rc = 0;
// 'response', used below, is not declared in this function's visible lines;
// presumably XPD_SETRESP introduces it - confirm against the macro definition.
2803 XPD_SETRESP(p, "Destroy");
2804
2805 // Unmarshall the data
2806 psid = ntohl(p->Request()->proof.sid);
2807 TRACEP(p, REQ, "psid: "<<psid);
2808
// Text describing who requested the termination; it is passed to the
// terminate call below
2809 XrdOucString msg;
2810
2811 // Find server session
2812 XrdProofdProofServ *xpsref = 0;
2813 if (psid > -1) {
2814 // Request for a specific session
2815 if (!p->Client() || !(xpsref = p->Client()->GetServer(psid))) {
2816 TRACEP(p, XERR, "reference session ID not found");
2817 response->Send(kXR_InvalidRequest,"reference session ID not found");
2818 return 0;
2819 }
2820 XPDFORM(msg, "session %d destroyed by %s", xpsref->SrvPID(), p->Link()->ID);
2821 } else {
// All sessions: xpsref stays null. Note that p->Client() is not checked
// for null on this path.
2822 XPDFORM(msg, "all sessions destroyed by %s", p->Link()->ID);
2823 }
2824
2825 // Terminate the servers
// NOTE(review): the first line of this call (callee and leading arguments)
// is not visible here; only its trailing arguments follow - restore it from
// the original sources before building.
2827 msg.c_str(), Pipe(), fMgr->ChangeOwn());
2828
2829 // Add to destroyed list
// Records the time of this destroy request, keyed by the protocol instance
2830 fDestroyTimes[p] = time(0);
2831
2832 // Notify to user
2833 response->Send();
2834
2835 // Over
2836 return 0;
2837}
2838
2839////////////////////////////////////////////////////////////////////////////////
2840/// Run thorugh entries to broadcast the relevant priority
2841
2842static int WriteSessEnvs(const char *, XpdEnv *env, void *s)
2843{
2844 XPDLOC(SMGR, "WriteSessEnvs")
2845
2846 XrdOucString emsg;
2847
2848 XpdWriteEnv_t *xwe = (XpdWriteEnv_t *)s;
2849
2850 if (env && xwe && xwe->fMgr && xwe->fClient && xwe->fEnv) {
2851 if (env->fEnv.length() > 0) {
2852 // Resolve keywords
2853 xwe->fMgr->ResolveKeywords(env->fEnv, xwe->fClient);
2854 // Set the env now
2855 char *ev = new char[env->fEnv.length()+1];
2856 strncpy(ev, env->fEnv.c_str(), env->fEnv.length());
2857 ev[env->fEnv.length()] = 0;
2858 fprintf(xwe->fEnv, "%s\n", ev);
2859 TRACE(DBG, ev);
2860 PutEnv(ev, xwe->fExport);
2861 }
2862 // Go to next
2863 return 0;
2864 } else {
2865 emsg = "some input undefined";
2866 }
2867
2868 // Some problem
2869 TRACE(XERR,"protocol error: "<<emsg);
2870 return 1;
2871}
2872
2873////////////////////////////////////////////////////////////////////////////////
2874/// Set environment for proofserv; old version preparing the environment for
2875/// proofserv protocol version <= 13. Needed for backward compatibility.
2876
2878{
2879 XPDLOC(SMGR, "ProofServMgr::SetProofServEnvOld")
2880
2881 char *ev = 0;
2882
2883 // Check inputs
2884 if (!p || !p->Client() || !input) {
2885 TRACE(XERR, "at leat one input is invalid - cannot continue");
2886 return -1;
2887 }
2888
2889 // Set basic environment for proofserv
2890 if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
2891 TRACE(XERR, "problems setting basic environment - exit");
2892 return -1;
2893 }
2894
2895 ProofServEnv_t *in = (ProofServEnv_t *)input;
2896
2897 // Session proxy
2898 XrdProofdProofServ *xps = in->fPS;
2899 if (!xps) {
2900 TRACE(XERR, "unable to get instance of proofserv proxy");
2901 return -1;
2902 }
2903 int psid = xps->ID();
2904 TRACE(REQ, "psid: "<<psid<<", log: "<<in->fLogLevel);
2905
2906 // Work directory
2907 XrdOucString udir = p->Client()->Sandbox()->Dir();
2908 TRACE(DBG, "working dir for "<<p->Client()->User()<<" is: "<<udir);
2909
2910 size_t len = strlen("ROOTPROOFSESSDIR=") + in->fWrkDir.length() + 2;
2911 ev = new char[len];
2912 snprintf(ev, len, "ROOTPROOFSESSDIR=%s", in->fWrkDir.c_str());
2913 putenv(ev);
2914 TRACE(DBG, ev);
2915
2916 // Log level
2917 len = strlen("ROOTPROOFLOGLEVEL=") + 5;
2918 ev = new char[len];
2919 snprintf(ev, len, "ROOTPROOFLOGLEVEL=%d", in->fLogLevel);
2920 putenv(ev);
2921 TRACE(DBG, ev);
2922
2923 // Ordinal number
2924 len = strlen("ROOTPROOFORDINAL=")+strlen(xps->Ordinal()) + 2;
2925 ev = new char[len];
2926 snprintf(ev, len, "ROOTPROOFORDINAL=%s", xps->Ordinal());
2927 putenv(ev);
2928 TRACE(DBG, ev);
2929
2930 // ROOT Version tag if not the default one
2931 len = strlen("ROOTVERSIONTAG=")+strlen(p->Client()->ROOT()->Tag())+2;
2932 ev = new char[len];
2933 snprintf(ev, len, "ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
2934 putenv(ev);
2935 TRACE(DBG, ev);
2936
2937 // Create the env file
2938 TRACE(DBG, "creating env file");
2939 XrdOucString envfile = in->fWrkDir;
2940 envfile += ".env";
2941 FILE *fenv = fopen(envfile.c_str(), "w");
2942 if (!fenv) {
2943 TRACE(XERR,
2944 "unable to open env file: "<<envfile);
2945 return -1;
2946 }
2947 TRACE(DBG, "environment file: "<< envfile);
2948
2949 // Forwarded sec credentials, if any
2950 if (p->AuthProt()) {
2951
2952 // Additional envs possibly set by the protocol for next application
2953 XrdOucString secenvs(getenv("XrdSecENVS"));
2954 if (secenvs.length() > 0) {
2955 // Go through the list
2956 XrdOucString env;
2957 int from = 0;
2958 while ((from = secenvs.tokenize(env, from, ',')) != -1) {
2959 if (env.length() > 0) {
2960 // Set the env now
2961 ev = new char[env.length()+1];
2962 strncpy(ev, env.c_str(), env.length());
2963 ev[env.length()] = 0;
2964 putenv(ev);
2965 fprintf(fenv, "%s\n", ev);
2966 TRACE(DBG, ev);
2967 }
2968 }
2969 }
2970
2971 // The credential buffer, if any
2972 XrdSecCredentials *creds = p->AuthProt()->getCredentials();
2973 if (creds) {
2974 len = strlen("XrdSecCREDS=")+creds->size;
2975 ev = new char[len + 1];
2976 strcpy(ev, "XrdSecCREDS=");
2977 memcpy(ev + strlen("XrdSecCREDS="), creds->buffer, creds->size);
2978 ev[len] = 0;
2979 putenv(ev);
2980 TRACE(DBG, "XrdSecCREDS set");
2981 if (fCredsSaver) {
2982 XrdOucString credsdir = udir;
2983 credsdir += "/.creds";
2984 // Make sure the directory exists
2985 if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
2986 if ((*fCredsSaver)(creds, credsdir.c_str(), p->Client()->UI()) != 0) {
2987 TRACE(DBG, "problems in saving authentication creds under "<<credsdir);
2988 }
2989 } else {
2990 TRACE(XERR, "unable to create creds dir: "<<credsdir);
2991 fclose(fenv);
2992 return -1;
2993 }
2994 }
2995 }
2996 }
2997
2998 // Set ROOTSYS
2999 fprintf(fenv, "ROOTSYS=%s\n", xps->ROOT()->Dir());
3000
3001 // Set conf dir
3002 fprintf(fenv, "ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
3003
3004 // Set TMPDIR
3005 fprintf(fenv, "ROOTTMPDIR=%s\n", fMgr->TMPdir());
3006
3007 // Port (really needed?)
3008 fprintf(fenv, "ROOTXPDPORT=%d\n", fMgr->Port());
3009
3010 // Work dir
3011 fprintf(fenv, "ROOTPROOFWORKDIR=%s\n", udir.c_str());
3012
3013 // Session tag
3014 fprintf(fenv, "ROOTPROOFSESSIONTAG=%s\n", in->fSessionTag.c_str());
3015
3016 // Whether user specific config files are enabled
3017 if (fMgr->NetMgr()->WorkerUsrCfg())
3018 fprintf(fenv, "ROOTUSEUSERCFG=1\n");
3019
3020 // Set Open socket
3021 fprintf(fenv, "ROOTOPENSOCK=%s\n", xps->UNIXSockPath());
3022
3023 // Entity
3024 fprintf(fenv, "ROOTENTITY=%s@%s\n", p->Client()->User(), p->Link()->Host());
3025
3026 // Session ID
3027 fprintf(fenv, "ROOTSESSIONID=%d\n", psid);
3028
3029 // Client ID
3030 fprintf(fenv, "ROOTCLIENTID=%d\n", p->CID());
3031
3032 // Client Protocol
3033 fprintf(fenv, "ROOTPROOFCLNTVERS=%d\n", p->ProofProtocol());
3034
3035 // Ordinal number
3036 fprintf(fenv, "ROOTPROOFORDINAL=%s\n", xps->Ordinal());
3037
3038 // ROOT version tag if different from the default one
3039 if (getenv("ROOTVERSIONTAG"))
3040 fprintf(fenv, "ROOTVERSIONTAG=%s\n", getenv("ROOTVERSIONTAG"));
3041
3042 // Config file
3043 if (in->fCfg.length() > 0)
3044 fprintf(fenv, "ROOTPROOFCFGFILE=%s\n", in->fCfg.c_str());
3045
3046 // Log file in the log dir
3047 fprintf(fenv, "ROOTPROOFLOGFILE=%s\n", in->fLogFile.c_str());
3048 xps->SetFileout(in->fLogFile.c_str());
3049
3050 // Additional envs (xpd.putenv directive)
3051 { XrdSysMutexHelper mhp(fEnvsMutex);
3052 if (fProofServEnvs.size() > 0) {
3053 // Hash list of the directives applying to this {user, group, svn, version}
3054 XrdOucHash<XpdEnv> sessenvs;
3055 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
3056 for ( ; ienvs != fProofServEnvs.end(); ++ienvs) {
3057 int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
3058 p->Client()->ROOT()->VersionCode());
3059 if (envmatch >= 0) {
3060 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3061 if (env) {
3062 int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
3063 p->Client()->ROOT()->VersionCode());
3064 if (envmatch > envmtcex) {
3065 // Replace the entry
3066 env = &(*ienvs);
3067 sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
3068 }
3069 } else {
3070 // Add an entry
3071 env = &(*ienvs);
3072 sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
3073 }
3074 TRACE(HDBG, "Adding: "<<(*ienvs).fEnv);
3075 }
3076 }
3077 XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv, in->fOld};
3078 sessenvs.Apply(WriteSessEnvs, (void *)&xpwe);
3079 }
3080 }
3081
3082 // Set the user envs
3083 if (xps->UserEnvs() &&
3084 strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),"=")) {
3085 // The single components
3086 XrdOucString ue = xps->UserEnvs();
3087 XrdOucString env, namelist;
3088 int from = 0, ieq = -1;
3089 while ((from = ue.tokenize(env, from, ',')) != -1) {
3090 if (env.length() > 0 && (ieq = env.find('=')) != -1) {
3091 // Resolve keywords
3092 ResolveKeywords(env, in);
3093 ev = new char[env.length()+1];
3094 strncpy(ev, env.c_str(), env.length());
3095 ev[env.length()] = 0;
3096 putenv(ev);
3097 fprintf(fenv, "%s\n", ev);
3098 TRACE(DBG, ev);
3099 env.erase(ieq);
3100 if (namelist.length() > 0)
3101 namelist += ',';
3102 namelist += env;
3103 }
3104 }
3105 // The list of names, ','-separated
3106 len = strlen("PROOF_ALLVARS=") + namelist.length() + 2;
3107 ev = new char[len];
3108 snprintf(ev, len, "PROOF_ALLVARS=%s", namelist.c_str());
3109 putenv(ev);
3110 fprintf(fenv, "%s\n", ev);
3111 TRACE(DBG, ev);
3112 }
3113
3114 // Close file
3115 fclose(fenv);
3116
3117 // Create or Update symlink to last session
3118 TRACE(DBG, "creating symlink");
3119 XrdOucString syml = udir;
3120 if (p->ConnType() == kXPD_MasterWorker)
3121 syml += "/last-worker-session";
3122 else
3123 syml += "/last-master-session";
3124 if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
3125 TRACE(XERR, "problems creating symlink to last session (errno: "<<errno<<")");
3126 }
3127
3128 // We are done
3129 TRACE(DBG, "done");
3130 return 0;
3131}
3132
3133////////////////////////////////////////////////////////////////////////////////
3134/// Set basic environment accordingly to 'r'
3135
3137{
3138 XPDLOC(SMGR, "ProofServMgr::SetProofServEnv")
3139
3140 char *ev = 0;
3141 size_t len = 0;
3142
3143 TRACE(REQ, "ROOT dir: "<< (r ? r->Dir() : "*** undef ***"));
3144
3145 if (r) {
3146 char *libdir = (char *) r->LibDir();
3147 char *ldpath = 0;
3148 if (mgr->BareLibPath() && strlen(mgr->BareLibPath()) > 0) {
3149 len = 32 + strlen(libdir) + strlen(mgr->BareLibPath());
3150 ldpath = new char[len];
3151 snprintf(ldpath, len, "%s=%s:%s", XPD_LIBPATH, libdir, mgr->BareLibPath());
3152 } else {
3153 len = 32 + strlen(libdir);
3154 ldpath = new char[len];
3155 snprintf(ldpath, len, "%s=%s", XPD_LIBPATH, libdir);
3156 }
3157 putenv(ldpath);
3158 // Set ROOTSYS
3159 char *rootsys = (char *) r->Dir();
3160 len = 15 + strlen(rootsys);
3161 ev = new char[len];
3162 snprintf(ev, len, "ROOTSYS=%s", rootsys);
3163 putenv(ev);
3164
3165 // Set bin directory
3166 char *bindir = (char *) r->BinDir();
3167 len = 15 + strlen(bindir);
3168 ev = new char[len];
3169 snprintf(ev, len, "ROOTBINDIR=%s", bindir);
3170 putenv(ev);
3171
3172 // Set conf dir
3173 char *confdir = (char *) r->DataDir();
3174 len = 20 + strlen(confdir);
3175 ev = new char[len];
3176 snprintf(ev, len, "ROOTCONFDIR=%s", confdir);
3177 putenv(ev);
3178
3179 // Set TMPDIR
3180 len = 20 + strlen(mgr->TMPdir());
3181 ev = new char[len];
3182 snprintf(ev, len, "TMPDIR=%s", mgr->TMPdir());
3183 putenv(ev);
3184
3185 // Done
3186 return 0;
3187 }
3188
3189 // Bad input
3190 TRACE(XERR, "XrdROOT instance undefined!");
3191 return -1;
3192}
3193
3194////////////////////////////////////////////////////////////////////////////////
3195
3197 XrdProofdProofServ *xps,
3198 const char *sessiondir,
3199 const char *extension,
3200 XrdOucString &outfn)
3201{
3202 XrdOucString host = fMgr->Host();
3203 XrdOucString ord = xps->Ordinal();
3204 XrdOucString role;
3205
3206 // Shorten host name
3207 if (host.find(".") != STR_NPOS)
3208 host.erase(host.find("."));
3209
3210 if (p->ConnType() == kXPD_MasterWorker) role = "worker";
3211 else role = "master";
3212
3213 // File name format:
3214 // <sessiondir>/[master|worker]-<ordinal>-<host>.<ext>
3215 // No PID is contained
3216 XPDFORM(outfn, "%s/%s-%s-%s.%s",
3217 sessiondir,
3218 role.c_str(),
3219 ord.c_str(),
3220 host.c_str(),
3221 extension
3222 );
3223}
3224
3225////////////////////////////////////////////////////////////////////////////////
3226/// Determine the unique tag and relevant dirs for this session
3227
3230 XrdOucString &sesstag, XrdOucString &topsesstag,
3231 XrdOucString &sessiondir, XrdOucString &sesswrkdir)
3232{
3233 XPDLOC(SMGR, "GetTagDirs")
3234
3235 // Client sandbox
3236 XrdOucString udir = p->Client()->Sandbox()->Dir();
3237
3238 if (pid == 0) {
3239
3240 // Create the unique tag identify this session
3241 XrdOucString host = fMgr->Host();
3242 if (host.find(".") != STR_NPOS)
3243 host.erase(host.find("."));
3244 XPDFORM(sesstag, "%s-%d-", host.c_str(), (int)time(0));
3245
3246 // Session dir
3247 sessiondir = udir;
3248 if (p->ConnType() == kXPD_ClientMaster) {
3249 sessiondir += "/session-";
3250 sessiondir += sesstag;
3251 topsesstag = sesstag;
3252 } else {
3253 sessiondir += "/";
3254 sessiondir += xps->Tag();
3255 topsesstag = xps->Tag();
3256 topsesstag.replace("session-","");
3257 // If the child, make sure the directory exists ...
3258 if (XrdProofdAux::AssertDir(sessiondir.c_str(), p->Client()->UI(),
3259 fMgr->ChangeOwn()) == -1) {
3260 TRACE(XERR, "problems asserting dir '"<<sessiondir<<"' - errno: "<<errno);
3261 return;
3262 }
3263 }
3264
3265 } else if (pid > 0) {
3266
3267 // Finalize unique tag identifying this session
3268 sesstag += pid;
3269
3270 // Session dir
3271 if (p->ConnType() == kXPD_ClientMaster) {
3272 topsesstag = sesstag;
3273 sessiondir += pid;
3274 xps->SetTag(sesstag.c_str());
3275 }
3276
3277 // If the child, make sure the directory exists ...
3278 if (pid == (int) getpid()) {
3279 if (XrdProofdAux::AssertDir(sessiondir.c_str(), p->Client()->UI(),
3280 fMgr->ChangeOwn()) == -1) {
3281 return;
3282 }
3283 }
3284
3285 // The session working dir depends on the role
3286 sesswrkdir = sessiondir;
3287 if (p->ConnType() == kXPD_MasterWorker) {
3288 XPDFORM(sesswrkdir, "%s/worker-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
3289 } else {
3290 XPDFORM(sesswrkdir, "%s/master-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
3291 }
3292 } else {
3293 TRACE(XERR, "negative pid ("<<pid<<"): should not have got here!");
3294 }
3295
3296 // Done
3297 return;
3298}
3299
3300////////////////////////////////////////////////////////////////////////////////
3301/// Run thorugh entries to broadcast the relevant priority
3302
3303static int WriteSessRCs(const char *, XpdEnv *erc, void *f)
3304{
3305 XPDLOC(SMGR, "WriteSessRCs")
3306
3307 XrdOucString emsg;
3308 FILE *frc = (FILE *)f;
3309 if (frc && erc) {
3310 XrdOucString rc = erc->fEnv;
3311 if (rc.length() > 0) {
3312 if (rc.find("Proof.DataSetManager") != STR_NPOS) {
3313 TRACE(ALL,"Proof.DataSetManager ignored: use xpd.datasetsrc to define dataset managers");
3314 } else {
3315 fprintf(frc, "%s\n", rc.c_str());
3316 }
3317 }
3318 // Go to next
3319 return 0;
3320 } else {
3321 emsg = "file or input entry undefined";
3322 }
3323
3324 // Some problem
3325 TRACE(XERR,"protocol error: "<<emsg);
3326 return 1;
3327}
3328
3329////////////////////////////////////////////////////////////////////////////////
3330/// Set environment for proofserv
3331
3333{
3334 XPDLOC(SMGR, "ProofServMgr::SetProofServEnv")
3335
3336 // Check inputs
3337 if (!p || !p->Client() || !input) {
3338 TRACE(XERR, "at leat one input is invalid - cannot continue");
3339 return -1;
3340 }
3341
3342 // Old proofservs expect different settings
3343 int rootvers = p->Client()->ROOT() ? p->Client()->ROOT()->SrvProtVers() : -1;
3344 TRACE(DBG, "rootvers: "<< rootvers);
3345 if (rootvers < 14 && rootvers > -1)
3346 return SetProofServEnvOld(p, input);
3347
3348 ProofServEnv_t *in = (ProofServEnv_t *)input;
3349
3350 // Session proxy
3351 XrdProofdProofServ *xps = in->fPS;
3352 if (!xps) {
3353 TRACE(XERR, "unable to get instance of proofserv proxy");
3354 return -1;
3355 }
3356 int psid = xps->ID();
3357 TRACE(REQ, "psid: "<<psid<<", log: "<<in->fLogLevel);
3358
3359 // Client sandbox
3360 XrdOucString udir = p->Client()->Sandbox()->Dir();
3361 TRACE(DBG, "sandbox for "<<p->Client()->User()<<" is: "<<udir);
3362 TRACE(DBG, "session unique tag "<<in->fSessionTag);
3363 TRACE(DBG, "session dir " << in->fSessionDir);
3364 TRACE(DBG, "session working dir:" << in->fWrkDir);
3365
3366 // Log into the session it
3367 if (XrdProofdAux::ChangeToDir(in->fSessionDir.c_str(), p->Client()->UI(),
3368 fMgr->ChangeOwn()) != 0) {
3369 TRACE(XERR, "couldn't change directory to " << in->fSessionDir);
3370 return -1;
3371 }
3372
3373 // Set basic environment for proofserv
3374 if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
3375 TRACE(XERR, "problems setting basic environment - exit");
3376 return -1;
3377 }
3378
3379 // Create .rootrc and .env files
3380 TRACE(DBG, "creating rc and env files");
3381 XrdOucString rcfile, envfile;
3382 FormFileNameInSessionDir(p, xps, in->fSessionDir.c_str(), "rootrc", rcfile);
3383 if (CreateProofServRootRc(p, in, rcfile.c_str()) != 0) {
3384 TRACE(XERR, "problems creating RC file "<<rcfile.c_str());
3385 return -1;
3386 }
3387
3388 FormFileNameInSessionDir(p, xps, in->fSessionDir.c_str(), "env", envfile);
3389 if (CreateProofServEnvFile(p, in, envfile.c_str(), rcfile.c_str()) != 0) {
3390 TRACE(XERR, "problems creating environment file "<<envfile.c_str());
3391 return -1;
3392 }
3393
3394 // Create or Update symlink to last session
3395 if (in->fOld) {
3396 TRACE(REQ, "creating symlink");
3397 XrdOucString syml = udir;
3398 if (p->ConnType() == kXPD_MasterWorker)
3399 syml += "/last-worker-session";
3400 else
3401 syml += "/last-master-session";
3402 if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
3403 TRACE(XERR, "problems creating symlink to "
3404 " last session (errno: "<<errno<<")");
3405 }
3406 }
3407
3408 // We are done
3409 TRACE(REQ, "done");
3410 return 0;
3411}
3412
3413////////////////////////////////////////////////////////////////////////////////
3414/// Create in 'rcfn' the rootrc file for the proofserv being created
3415/// return 0 on success, -1 on error
3416
3418 const char *envfn, const char *rcfn)
3419{
3420 XPDLOC(SMGR, "ProofServMgr::CreateProofServEnvFile")
3421
3422 // Check inputs
3423 if (!p || !input || (!envfn ||
3424 (envfn && strlen(envfn) <= 0)) || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3425 TRACE(XERR, "invalid inputs!");
3426 return -1;
3427 }
3428
3429 // Attach the structure
3430 ProofServEnv_t *in = (ProofServEnv_t *)input;
3431
3432 // Session proxy
3433 XrdProofdProofServ *xps = in->fPS;
3434 if (!xps) {
3435 TRACE(XERR, "unable to get instance of proofserv proxy");
3436 return -1;
3437 }
3438
3439 FILE *fenv = fopen(envfn, "w");
3440 if (!fenv) {
3441 TRACE(XERR, "unable to open env file: "<<envfn);
3442 return -1;
3443 }
3444 TRACE(REQ, "environment file: "<< envfn);
3445
3446 char *ev = 0;
3447 size_t len = 0;
3448 // Forwarded sec credentials, if any
3449 if (p->AuthProt()) {
3450
3451 // Additional envs possibly set by the protocol for next application
3452 XrdOucString secenvs(getenv("XrdSecENVS"));
3453 if (secenvs.length() > 0) {
3454 // Go through the list
3455 XrdOucString env;
3456 int from = 0;
3457 while ((from = secenvs.tokenize(env, from, ',')) != -1) {
3458 if (env.length() > 0) {
3459 // Set the env now
3460 ev = new char[env.length()+1];
3461 strncpy(ev, env.c_str(), env.length());
3462 ev[env.length()] = 0;
3463 fprintf(fenv, "%s\n", ev);
3464 TRACE(DBG, ev);
3465 PutEnv(ev, in->fOld);
3466 }
3467 }
3468 }
3469
3470 // The credential buffer, if any
3471 XrdSecCredentials *creds = p->AuthProt()->getCredentials();
3472 if (creds) {
3473 int lev = strlen("XrdSecCREDS=") + creds->size;
3474 ev = new char[lev+1];
3475 strncpy(ev, "XrdSecCREDS=", lev);
3476 memcpy(ev+strlen("XrdSecCREDS="), creds->buffer, creds->size);
3477 ev[lev] = 0;
3478 PutEnv(ev, in->fOld);
3479 TRACE(DBG, "XrdSecCREDS set");
3480
3481 if (fCredsSaver) {
3482 XrdOucString credsdir = p->Client()->Sandbox()->Dir();
3483 credsdir += "/.creds";
3484 // Make sure the directory exists
3485 if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
3486 if ((*fCredsSaver)(creds, credsdir.c_str(), p->Client()->UI()) != 0) {
3487 TRACE(DBG, "problems in saving authentication creds under "<<credsdir);
3488 }
3489 } else {
3490 TRACE(XERR, "unable to create creds dir: "<<credsdir);
3491 fclose(fenv);
3492 return -1;
3493 }
3494 }
3495 }
3496 }
3497
3498 // Library path
3499 fprintf(fenv, "%s=%s\n", XPD_LIBPATH, getenv(XPD_LIBPATH));
3500
3501 // ROOTSYS
3502 fprintf(fenv, "ROOTSYS=%s\n", xps->ROOT()->Dir());
3503
3504 // Conf dir
3505 fprintf(fenv, "ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
3506
3507 // TMPDIR
3508 fprintf(fenv, "TMPDIR=%s\n", fMgr->TMPdir());
3509
3510 // RC file
3511 if (in->fOld) {
3512 len = strlen("ROOTRCFILE=") + strlen(rcfn) + 2;
3513 ev = new char[len];
3514 snprintf(ev, len, "ROOTRCFILE=%s", rcfn);
3515 fprintf(fenv, "%s\n", ev);
3516 TRACE(DBG, ev);
3517 PutEnv(ev, in->fOld);
3518 }
3519
3520 // ROOT version tag (needed in building packages)
3521 len = strlen("ROOTVERSIONTAG=") + strlen(p->Client()->ROOT()->Tag()) + 2;
3522 ev = new char[len];
3523 snprintf(ev, len, "ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
3524 fprintf(fenv, "%s\n", ev);
3525 TRACE(DBG, ev);
3526 PutEnv(ev, in->fOld);
3527
3528 // Log file in the log dir
3529 if (in->fOld) {
3530 len = strlen("ROOTPROOFLOGFILE=") + in->fLogFile.length() + 2;
3531 ev = new char[len];
3532 snprintf(ev, len, "ROOTPROOFLOGFILE=%s", in->fLogFile.c_str());
3533 fprintf(fenv, "%s\n", ev);
3534 xps->SetFileout(in->fLogFile.c_str());
3535 TRACE(DBG, ev);
3536 PutEnv(ev, in->fOld);
3537 }
3538
3539 // Local data server
3540 XrdOucString locdatasrv;
3541 XPDFORM(locdatasrv, "root://%s", fMgr->Host());
3542
3543 int nrk = fMgr->ResolveKeywords(locdatasrv, p->Client());
3544 TRACE(HDBG, nrk << " placeholders resolved for LOCALDATASERVER");
3545 len = strlen("LOCALDATASERVER=") + locdatasrv.length() + 2;
3546 ev = new char[len];
3547 snprintf(ev, len, "LOCALDATASERVER=%s", locdatasrv.c_str());
3548 fprintf(fenv, "%s\n", ev);
3549 TRACE(DBG, ev);
3550 PutEnv(ev, in->fOld);
3551
3552 // Xrootd config file
3553 if (CfgFile()) {
3554 len = strlen("XRDCF=") + strlen(CfgFile()) + 2;
3555 ev = new char[len];
3556 snprintf(ev, len, "XRDCF=%s", CfgFile());
3557 fprintf(fenv, "%s\n", ev);
3558 TRACE(DBG, ev);
3559 PutEnv(ev, in->fOld);
3560 }
3561
3562 // Additional envs (xpd.putenv directive)
3563 { XrdSysMutexHelper mhp(fEnvsMutex);
3564 if (fProofServEnvs.size() > 0) {
3565 // Hash list of the directives applying to this {user, group, svn, version}
3566 XrdOucHash<XpdEnv> sessenvs;
3567 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
3568 for ( ; ienvs != fProofServEnvs.end(); ++ienvs) {
3569 int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
3570 p->Client()->ROOT()->VersionCode());
3571 if (envmatch >= 0) {
3572 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3573 if (env) {
3574 int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
3575 p->Client()->ROOT()->VersionCode());
3576 if (envmatch > envmtcex) {
3577 // Replace the entry
3578 env = &(*ienvs);
3579 sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
3580 }
3581 } else {
3582 // Add an entry
3583 env = &(*ienvs);
3584 sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
3585 }
3586 TRACE(HDBG, "Adding: "<<(*ienvs).fEnv);
3587 }
3588 }
3589 XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv, in->fOld};
3590 sessenvs.Apply(WriteSessEnvs, (void *)&xpwe);
3591 }
3592 }
3593 // Set the user envs
3594 if (xps->UserEnvs() &&
3595 strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),"=")) {
3596 // The single components
3597 XrdOucString ue = xps->UserEnvs();
3598 XrdOucString env, namelist;
3599 int from = 0, ieq = -1;
3600 while ((from = ue.tokenize(env, from, ',')) != -1) {
3601 if (env.length() > 0 && (ieq = env.find('=')) != -1) {
3602 // Resolve keywords
3603 ResolveKeywords(env, in);
3604 ev = new char[env.length()+1];
3605 strncpy(ev, env.c_str(), env.length());
3606 ev[env.length()] = 0;
3607 if (env.find("WRAPPERCMD") == STR_NPOS || !xps->IsPLite())
3608 fprintf(fenv, "%s\n", ev);
3609 TRACE(DBG, ev);
3610 PutEnv(ev, in->fOld);
3611 if (env.find("WRAPPERCMD") == STR_NPOS || !xps->IsPLite()) {
3612 env.erase(ieq);
3613 if (namelist.length() > 0)
3614 namelist += ',';
3615 namelist += env;
3616 }
3617 }
3618 }
3619 // The list of names, ','-separated
3620 len = strlen("PROOF_ALLVARS=") + namelist.length() + 2;
3621 ev = new char[len];
3622 snprintf(ev, len, "PROOF_ALLVARS=%s", namelist.c_str());
3623 fprintf(fenv, "%s\n", ev);
3624 TRACE(DBG, ev);
3625 PutEnv(ev, in->fOld);
3626 }
3627
3628 // Close file
3629 fclose(fenv);
3630
3631 // We are done
3632 return 0;
3633}
3634
3635////////////////////////////////////////////////////////////////////////////////
3636/// Create in 'rcfn' the rootrc file for the proofserv being created
3637/// return 0 on success, -1 on error
3638
3640 void *input, const char *rcfn)
3641{
3642 XPDLOC(SMGR, "ProofServMgr::CreateProofServRootRc")
3643
3644 // Check inputs
3645 if (!p || !input || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3646 TRACE(XERR, "invalid inputs!");
3647 return -1;
3648 }
3649
3650 // Attach the structure
3651 ProofServEnv_t *in = (ProofServEnv_t *)input;
3652
3653 // Session proxy
3654 XrdProofdProofServ *xps = in->fPS;
3655 if (!xps) {
3656 TRACE(XERR, "unable to get instance of proofserv proxy");
3657 return -1;
3658 }
3659 int psid = xps->ID();
3660
3661 FILE *frc = fopen(rcfn, "w");
3662 if (!frc) {
3663 TRACE(XERR, "unable to open rootrc file: "<<rcfn);
3664 return -1;
3665 }
3666 // Symlink to session.rootrc
3667 if (in->fOld) {
3668 if (XrdProofdAux::SymLink(rcfn, "session.rootrc") != 0) {
3669 TRACE(XERR, "problems creating symlink to 'session.rootrc' (errno: "<<errno<<")");
3670 }
3671 }
3672 TRACE(REQ, "session rootrc file: "<< rcfn);
3673
3674 // Port
3675 fprintf(frc, "# XrdProofdProtocol listening port\n");
3676 fprintf(frc, "ProofServ.XpdPort: %d\n", fMgr->Port());
3677
3678 // Local root prefix
3679 if (fMgr->LocalROOT() && strlen(fMgr->LocalROOT()) > 0) {
3680 fprintf(frc, "# Prefix to be prepended to local paths\n");
3681 fprintf(frc, "Path.Localroot: %s\n", fMgr->LocalROOT());
3682 }
3683
3684 // Data pool entry-point URL
3685 if (fMgr->PoolURL() && strlen(fMgr->PoolURL()) > 0) {
3686 XrdOucString purl(fMgr->PoolURL());
3687 if (!purl.endswith("/"))
3688 purl += "/";
3689 fprintf(frc, "# URL for the data pool entry-point\n");
3690 fprintf(frc, "ProofServ.PoolUrl: %s\n", purl.c_str());
3691 }
3692
3693 // The session working dir depends on the role
3694 if (in->fOld) {
3695 fprintf(frc, "# The session working dir\n");
3696 fprintf(frc, "ProofServ.SessionDir: %s\n", in->fWrkDir.c_str());
3697 }
3698
3699 // Log / Debug level
3700 fprintf(frc, "# Proof Log/Debug level\n");
3701 fprintf(frc, "Proof.DebugLevel: %d\n", in->fLogLevel);
3702
3703 // Ordinal number
3704 fprintf(frc, "# Ordinal number\n");
3705 fprintf(frc, "ProofServ.Ordinal: %s\n", xps->Ordinal());
3706
3707 // ROOT Version tag
3708 if (p->Client()->ROOT()) {
3709 fprintf(frc, "# ROOT Version tag\n");
3710 fprintf(frc, "ProofServ.RootVersionTag: %s\n", p->Client()->ROOT()->Tag());
3711 }
3712 // Proof group
3713 if (p->Client()->Group()) {
3714 fprintf(frc, "# Proof group\n");
3715 fprintf(frc, "ProofServ.ProofGroup: %s\n", p->Client()->Group());
3716 }
3717
3718 // Path to file with group information
3719 if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->GetCfgFile()) {
3720 fprintf(frc, "# File with group information\n");
3721 fprintf(frc, "Proof.GroupFile: %s\n", fMgr->GroupsMgr()->GetCfgFile());
3722 }
3723
3724 // Work dir
3725 XrdOucString udir = p->Client()->Sandbox()->Dir();
3726 fprintf(frc, "# Users sandbox\n");
3727 fprintf(frc, "ProofServ.Sandbox: %s\n", udir.c_str());
3728
3729 // Image
3730 if (fMgr->Image() && strlen(fMgr->Image()) > 0) {
3731 fprintf(frc, "# Server image\n");
3732 fprintf(frc, "ProofServ.Image: %s\n", fMgr->Image());
3733 }
3734
3735 // Session tags
3736 if (in->fOld) {
3737 fprintf(frc, "# Session tag\n");
3738 fprintf(frc, "ProofServ.SessionTag: %s\n", in->fSessionTag.c_str());
3739 fprintf(frc, "# Top Session tag\n");
3740 fprintf(frc, "ProofServ.TopSessionTag: %s\n", in->fTopSessionTag.c_str());
3741 }
3742
3743 // Session admin path
3744 fprintf(frc, "# Session admin path\n");
3745 int proofvrs = (p->Client()->ROOT()) ? p->Client()->ROOT()->SrvProtVers() : -1;
3746 if (proofvrs < 0 || proofvrs < 27) {
3747 // Use the first version of the session status file
3748 fprintf(frc, "ProofServ.AdminPath: %s\n", xps->AdminPath());
3749 } else {
3750 if (in->fOld) {
3751 // New version with updated status
3752 fprintf(frc, "ProofServ.AdminPath: %s.status\n", xps->AdminPath());
3753 }
3754 }
3755
3756 // Whether user specific config files are enabled
3757 if (fMgr->NetMgr()->WorkerUsrCfg()) {
3758 fprintf(frc, "# Whether user specific config files are enabled\n");
3759 fprintf(frc, "ProofServ.UseUserCfg: 1\n");
3760 }
3761 // Set Open socket
3762 fprintf(frc, "# Open socket\n");
3763 fprintf(frc, "ProofServ.OpenSock: %s\n", xps->UNIXSockPath());
3764 // Entity
3765 fprintf(frc, "# Entity\n");
3766 if (p->Client()->UI().fGroup.length() > 0)
3767 fprintf(frc, "ProofServ.Entity: %s:%s@%s\n",
3768 p->Client()->User(), p->Client()->UI().fGroup.c_str(), p->Link()->Host());
3769 else
3770 fprintf(frc, "ProofServ.Entity: %s@%s\n", p->Client()->User(), p->Link()->Host());
3771
3772
3773 // Session ID
3774 fprintf(frc, "# Session ID\n");
3775 fprintf(frc, "ProofServ.SessionID: %d\n", psid);
3776
3777 // Client ID
3778 fprintf(frc, "# Client ID\n");
3779 fprintf(frc, "ProofServ.ClientID: %d\n", p->CID());
3780
3781 // Client Protocol
3782 fprintf(frc, "# Client Protocol\n");
3783 fprintf(frc, "ProofServ.ClientVersion: %d\n", p->ProofProtocol());
3784
3785 // Config file
3786 if (in->fCfg.length() > 0) {
3787 if (in->fCfg == "masteronly") {
3788 fprintf(frc, "# MasterOnly option\n");
3789 // Master Only setup
3790 fprintf(frc, "Proof.MasterOnly: 1\n");
3791 } else {
3792 fprintf(frc, "# Config file\n");
3793 // User defined
3794 fprintf(frc, "ProofServ.ProofConfFile: %s\n", in->fCfg.c_str());
3795 }
3796 } else {
3797 fprintf(frc, "# Config file\n");
3798 if (fMgr->IsSuperMst()) {
3799 fprintf(frc, "ProofServ.ProofConfFile: sm:\n");
3800 } else if (xps->IsPLite()) {
3801 fprintf(frc, "ProofServ.ProofConfFile: lite:\n");
3802 fprintf(frc, "# Number of ProofLite workers\n");
3803 fprintf(frc, "ProofLite.Workers: %d\n", xps->PLiteNWrks());
3804 fprintf(frc, "# Users sandbox\n");
3805 fprintf(frc, "ProofLite.Sandbox: %s\n", udir.c_str());
3806 fprintf(frc, "# No subpaths\n");
3807 fprintf(frc, "ProofLite.SubPath: 0\n");
3808 } else if (fProofPlugin.length() > 0) {
3809 fprintf(frc, "ProofServ.ProofConfFile: %s\n", fProofPlugin.c_str());
3810 }
3811 }
3812
3813 // We set this to avoid blocking to much on xrdclient actions; they can be
3814 // oevrwritten with explicit putrc directives
3815 fprintf(frc, "# Default settings for XrdClient\n");
3816 fprintf(frc, "XNet.FirstConnectMaxCnt 3\n");
3817 fprintf(frc, "XNet.ConnectTimeout 5\n");
3818
3819 // This is a workaround for a problem fixed in 5.24/00
3820 int vrscode = (p->Client()->ROOT()) ? p->Client()->ROOT()->VersionCode() : -1;
3821 if (vrscode > 0 && vrscode < XrdROOT::GetVersionCode(5,24,0)) {
3822 fprintf(frc, "# Force remote reading also for local files to avoid a wrong TTreeCache initialization\n");
3823 fprintf(frc, "Path.ForceRemote 1\n");
3824 }
3825
3826 // Additional rootrcs (xpd.putrc directive)
3827 { XrdSysMutexHelper mhp(fEnvsMutex);
3828 if (fProofServRCs.size() > 0) {
3829 fprintf(frc, "# Additional rootrcs (xpd.putrc directives)\n");
3830 // Hash list of the directives applying to this {user, group, svn, version}
3831 XrdOucHash<XpdEnv> sessrcs;
3832 std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
3833 for ( ; ircs != fProofServRCs.end(); ++ircs) {
3834 int rcmatch = (*ircs).Matches(p->Client()->User(), p->Client()->Group(),
3835 p->Client()->ROOT()->VersionCode());
3836 if (rcmatch >= 0) {
3837 XpdEnv *rcenv = sessrcs.Find((*ircs).fName.c_str());
3838 if (rcenv) {
3839 int rcmtcex = rcenv->Matches(p->Client()->User(), p->Client()->Group(),
3840 p->Client()->ROOT()->VersionCode());
3841 if (rcmatch > rcmtcex) {
3842 // Replace the entry
3843 rcenv = &(*ircs);
3844 sessrcs.Rep(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
3845 }
3846 } else {
3847 // Add an entry
3848 rcenv = &(*ircs);
3849 sessrcs.Add(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
3850 }
3851 TRACE(HDBG, "Adding: "<<(*ircs).fEnv);
3852 }
3853 }
3854 sessrcs.Apply(WriteSessRCs, (void *)frc);
3855 }
3856 }
3857 // If applicable, add dataset managers initiators
3858 if (fMgr->DataSetSrcs()->size() > 0) {
3859 fprintf(frc, "# Dataset sources\n");
3860 XrdOucString rc("Proof.DataSetManager: ");
3861 std::list<XrdProofdDSInfo *>::iterator ii;
3862 for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ++ii) {
3863 if (ii != fMgr->DataSetSrcs()->begin()) rc += ", ";
3864 rc += (*ii)->fType;
3865 rc += " dir:";
3866 rc += (*ii)->fUrl;
3867 rc += " opt:";
3868 rc += (*ii)->fOpts;
3869 rc += " ";
3870 rc += (*ii)->fObscure;
3871 }
3872 fprintf(frc, "%s\n", rc.c_str());
3873 }
3874
3875 // If applicable, add staging requests repository directive initiator
3876 if (strlen(fMgr->StageReqRepo()) > 0) {
3877 fprintf(frc, "# Dataset staging requests repository\n");
3878 fprintf(frc, "Proof.DataSetStagingRequests: %s\n", fMgr->StageReqRepo());
3879 }
3880
3881 // If applicable, add datadir location
3882 if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0) {
3883 fprintf(frc, "# Data directory\n");
3884 XrdOucString rc;
3885 XPDFORM(rc, "ProofServ.DataDir: %s/%s/%s/%s/%s", fMgr->DataDir(),
3886 p->Client()->Group(), p->Client()->User(), xps->Ordinal(),
3887 in->fSessionTag.c_str());
3888 if (fMgr->DataDirUrlOpts() && strlen(fMgr->DataDirUrlOpts()) > 0) {
3889 fprintf(frc, "%s %s\n", rc.c_str(), fMgr->DataDirUrlOpts());
3890 } else {
3891 fprintf(frc, "%s\n", rc.c_str());
3892 }
3893 }
3894
3895 // Done with this
3896 fclose(frc);
3897
3898 // Done
3899 return 0;
3900}
3901
3902////////////////////////////////////////////////////////////////////////////////
3903/// Cleanup (kill) all 'proofserv' processes which lost control from their
3904/// creator or controller daemon. We rely here on the information in the admin
3905/// path(s) (<xrd_admin>/.xproof.<port>).
3906/// This is called regurarly by the cron job to avoid having proofservs around.
3907/// Return number of process killed or -1 in case of any error.
3908
3910{
3911 XPDLOC(SMGR, "ProofServMgr::CleanupLostProofServ")
3912
3913 if (!fCheckLost) {
3914 TRACE(REQ, "disabled ...");
3915 return 0;
3916 }
3917
3918 TRACE(REQ, "checking for orphalin proofserv processes ...");
3919 int nk = 0;
3920
3921 // Get the list of existing proofserv processes from the process table
3922 std::map<int,XrdOucString> procs;
3923 if (XrdProofdAux::GetProcesses("proofserv", &procs) <= 0) {
3924 TRACE(DBG, " no proofservs around: nothing to do");
3925 return 0;
3926 }
3927
3928 XrdProofUI ui;
3929 if (XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui) != 0) {
3930 TRACE(DBG, "problems getting info for user " << fMgr->EffectiveUser());
3931 return -1;
3932 }
3933
3934 // Hash list of controlled and xrootd process
3935 XrdOucRash<int, int> controlled, xrdproc;
3936
3937 // Hash list of sessions files loaded
3938 XrdOucHash<XrdOucString> sessionspaths;
3939
3940 // For each process extract the information about the daemon supposed to be in control
3941 int pid, ia, a;
3942 XrdOucString cmd, apath, pidpath, sessiondir, emsg, rest, after;
3943 std::map<int,XrdOucString>::iterator ip;
3944 for (ip = procs.begin(); ip != procs.end(); ++ip) {
3945 pid = ip->first;
3946 cmd = ip->second;
3947 if ((ia = cmd.find("xpdpath:")) != STR_NPOS) {
3948 cmd.tokenize(apath, ia, ' ');
3949 apath.replace("xpdpath:", "");
3950 if (apath.length() <= 0) {
3951 TRACE(ALL, "admin path not found; initial cmd line: "<<cmd);
3952 continue;
3953 }
3954 // Extract daemon PID and check that it is alive
3955 XPDFORM(pidpath, "%s/xrootd.pid", apath.c_str());
3956 TRACE(ALL, "pidpath: "<<pidpath);
3957 int xpid = XrdProofdAux::GetIDFromPath(pidpath.c_str(), emsg);
3958 int *alive = xrdproc.Find(xpid);
3959 if (!alive) {
3960 a = (XrdProofdAux::VerifyProcessByID(xpid, fParentExecs.c_str())) ? 1 : 0;
3961 xrdproc.Add(xpid, a);
3962 } else {
3963 a = *alive;
3964 }
3965 // If the daemon is still there check that the process has its entry in the
3966 // session path(s);
3967 bool ok = 0;
3968 if (a == 1) {
3969 const char *subdir[2] = {"activesessions", "terminatedsessions"};
3970 for (int i = 0; i < 2; i++) {
3971 XPDFORM(sessiondir, "%s/%s", apath.c_str(), subdir[i]);
3972 if (!sessionspaths.Find(sessiondir.c_str())) {
3973 DIR *sdir = opendir(sessiondir.c_str());
3974 if (!sdir) {
3975 XPDFORM(emsg, "cannot open '%s' - errno: %d", apath.c_str(), errno);
3976 TRACE(XERR, emsg.c_str());
3977 continue;
3978 }
3979 struct dirent *sent = 0;
3980 while ((sent = readdir(sdir))) {
3981 if (!strncmp(sent->d_name, ".", 1) || !strncmp(sent->d_name, "..", 2))
3982 continue;
3983 // Get the pid
3984 int ppid = XrdProofdAux::ParsePidPath(sent->d_name, rest, after);
3985 // Add to the list
3986 controlled.Add(ppid, ppid);
3987 }
3988 closedir(sdir);
3989 sessionspaths.Add(sessiondir.c_str(), 0, 0, Hash_data_is_key);
3990 }
3991 ok = (controlled.Find(pid)) ? 1 : ok;
3992 // We are done, if the process is controlled
3993 if (ok) break;
3994 }
3995 }
3996 // If the process is not controlled we have to kill it
3997 if (!ok) {
3998 TRACE(ALL,"process: "<<pid<<" lost its controller: killing");
3999 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
4000 nk++;
4001 }
4002 }
4003
4004 }
4005
4006 // Done
4007 return nk;
4008}
4009
4010////////////////////////////////////////////////////////////////////////////////
4011/// Cleanup (kill) all 'proofserv' processes from the process table.
4012/// Only the processes associated with 'usr' are killed,
4013/// unless 'all' is TRUE, in which case all 'proofserv' instances are
4014/// terminated (this requires superuser privileges).
4015 /// Super users can also terminate all processes of another user (specified
4016 /// via usr).
4017/// Return number of process notified for termination on success, -1 otherwise
4018
4019int XrdProofdProofServMgr::CleanupProofServ(bool all, const char *usr)
4020{
4021 XPDLOC(SMGR, "ProofServMgr::CleanupProofServ")
4022
4023 TRACE(REQ, "all: "<<all<<", usr: " << (usr ? usr : "undef"));
4024 int nk = 0;
4025
4026 // Name
4027 const char *pn = "proofserv";
4028
4029 // Uid
4030 XrdProofUI ui;
4031 int refuid = -1;
4032 if (!all) {
4033 if (!usr) {
4034 TRACE(DBG, "usr must be defined for all = FALSE");
4035 return -1;
4036 }
4037 if (XrdProofdAux::GetUserInfo(usr, ui) != 0) {
4038 TRACE(DBG, "problems getting info for user " << usr);
4039 return -1;
4040 }
4041 refuid = ui.fUid;
4042 }
4043
4044#if defined(linux)
4045 // Loop over the "/proc" dir
4046 DIR *dir = opendir("/proc");
4047 if (!dir) {
4048 XrdOucString emsg("cannot open /proc - errno: ");
4049 emsg += errno;
4050 TRACE(DBG, emsg.c_str());
4051 return -1;
4052 }
4053
4054 struct dirent *ent = 0;
4055 while ((ent = readdir(dir))) {
4056 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
4057 if (DIGIT(ent->d_name[0])) {
4058 XrdOucString fn("/proc/", 256);
4059 fn += ent->d_name;
4060 fn += "/status";
4061 // Open file
4062 FILE *ffn = fopen(fn.c_str(), "r");
4063 if (!ffn) {
4064 XrdOucString emsg("cannot open file ");
4065 emsg += fn; emsg += " - errno: "; emsg += errno;
4066 TRACE(HDBG, emsg);
4067 continue;
4068 }
4069 // Read info
4070 bool xname = 1, xpid = 1, xppid = 1;
4071 bool xuid = (all) ? 0 : 1;
4072 int pid = -1;
4073 int ppid = -1;
4074 char line[2048] = { 0 };
4075 while (fgets(line, sizeof(line), ffn) &&
4076 (xname || xpid || xppid || xuid)) {
4077 // Check name
4078 if (xname && strstr(line, "Name:")) {
4079 if (!strstr(line, pn))
4080 break;
4081 xname = 0;
4082 }
4083 if (xpid && strstr(line, "Pid:")) {
4084 pid = (int) XrdProofdAux::GetLong(&line[strlen("Pid:")]);
4085 xpid = 0;
4086 }
4087 if (xppid && strstr(line, "PPid:")) {
4088 ppid = (int) XrdProofdAux::GetLong(&line[strlen("PPid:")]);
4089 // Parent process must be us or be dead
4090 if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
4091 // Process created by another running xrootd
4092 break;
4093 xppid = 0;
4094 }
4095 if (xuid && strstr(line, "Uid:")) {
4096 int uid = (int) XrdProofdAux::GetLong(&line[strlen("Uid:")]);
4097 if (refuid == uid)
4098 xuid = 0;
4099 }
4100 }
4101 // Close the file
4102 fclose(ffn);
4103 // If this is a good candidate, kill it
4104 if (!xname && !xpid && !xppid && !xuid) {
4105
4106 bool muok = 1;
4107 if (fMgr->MultiUser() && !all) {
4108 // We need to check the user name: we may be the owner of somebody
4109 // else process; if not session is attached, we kill it
4110 muok = 0;
4112 if (!srv || (srv && !strcmp(usr, srv->Client())))
4113 muok = 1;
4114 }
4115 if (muok)
4116 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
4117 nk++;
4118 }
4119 }
4120 }
4121 // Close the directory
4122 closedir(dir);
4123
4124#elif defined(__sun)
4125
4126 // Loop over the "/proc" dir
4127 DIR *dir = opendir("/proc");
4128 if (!dir) {
4129 XrdOucString emsg("cannot open /proc - errno: ");
4130 emsg += errno;
4131 TRACE(DBG, emsg);
4132 return -1;
4133 }
4134
4135 struct dirent *ent = 0;
4136 while ((ent = readdir(dir))) {
4137 if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2)) continue;
4138 if (DIGIT(ent->d_name[0])) {
4139 XrdOucString fn("/proc/", 256);
4140 fn += ent->d_name;
4141 fn += "/psinfo";
4142 // Open file
4143 int ffd = open(fn.c_str(), O_RDONLY);
4144 if (ffd <= 0) {
4145 XrdOucString emsg("cannot open file ");
4146 emsg += fn; emsg += " - errno: "; emsg += errno;
4147 TRACE(HDBG, emsg);
4148 continue;
4149 }
4150 // Read info
4151 bool xname = 1;
4152 bool xuid = (all) ? 0 : 1;
4153 bool xppid = 1;
4154 // Get the information
4155 psinfo_t psi;
4156 if (read(ffd, &psi, sizeof(psinfo_t)) != sizeof(psinfo_t)) {
4157 XrdOucString emsg("cannot read ");
4158 emsg += fn; emsg += ": errno: "; emsg += errno;
4159 TRACE(XERR, emsg);
4160 close(ffd);
4161 continue;
4162 }
4163 // Close the file
4164 close(ffd);
4165
4166 // Check name
4167 if (xname) {
4168 if (!strstr(psi.pr_fname, pn))
4169 continue;
4170 xname = 0;
4171 }
4172 // Check uid, if required
4173 if (xuid) {
4174 if (refuid == psi.pr_uid)
4175 xuid = 0;
4176 }
4177 // Parent process must be us or be dead
4178 int ppid = psi.pr_ppid;
4179 if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str())) {
4180 // Process created by another running xrootd
4181 continue;
4182 xppid = 0;
4183 }
4184
4185 // If this is a good candidate, kill it
4186 if (!xname && !xppid && !xuid) {
4187 bool muok = 1;
4188 if (fMgr->MultiUser() && !all) {
4189 // We need to check the user name: we may be the owner of somebody
4190 // else process; if no session is attached , we kill it
4191 muok = 0;
4192 XrdProofdProofServ *srv = GetActiveSession(psi.pr_pid);
4193 if (!srv || (srv && !strcmp(usr, srv->Client())))
4194 muok = 1;
4195 }
4196 if (muok)
4197 if (XrdProofdAux::KillProcess(psi.pr_pid, 1, ui, fMgr->ChangeOwn()) == 0)
4198 nk++;
4199 }
4200 }
4201 }
4202 // Close the directory
4203 closedir(dir);
4204
4205#elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
4206
4207 // Get the proclist
4208 kinfo_proc *pl = 0;
4209 int np;
4210 int ern = 0;
4211 if ((ern = XrdProofdAux::GetMacProcList(&pl, np)) != 0) {
4212 XrdOucString emsg("cannot get the process list: errno: ");
4213 emsg += ern;
4214 TRACE(XERR, emsg);
4215 return -1;
4216 }
4217
4218 // Loop over the list
4219 int ii = np;
4220 while (ii--) {
4221 if (strstr(pl[ii].kp_proc.p_comm, pn)) {
4222 if (all || (int)(pl[ii].kp_eproc.e_ucred.cr_uid) == refuid) {
4223 // Parent process must be us or be dead
4224 int ppid = pl[ii].kp_eproc.e_ppid;
4225 bool xppid = 0;
4226 if (ppid != getpid()) {
4227 int jj = np;
4228 while (jj--) {
4229 if (strstr(pl[jj].kp_proc.p_comm, "xrootd") &&
4230 pl[jj].kp_proc.p_pid == ppid) {
4231 xppid = 1;
4232 break;
4233 }
4234 }
4235 }
4236 if (!xppid) {
4237 bool muok = 1;
4238 if (fMgr->MultiUser() && !all) {
4239 // We need to check the user name: we may be the owner of somebody
4240 // else process; if no session is attached, we kill it
4241 muok = 0;
4242 XrdProofdProofServ *srv = GetActiveSession(pl[np].kp_proc.p_pid);
4243 if (!srv || (srv && !strcmp(usr, srv->Client())))
4244 muok = 1;
4245 }
4246 if (muok)
4247 // Good candidate to be shot
4248 if (XrdProofdAux::KillProcess(pl[np].kp_proc.p_pid, 1, ui, fMgr->ChangeOwn()))
4249 nk++;
4250 }
4251 }
4252 }
4253 }
4254 // Cleanup
4255 free(pl);
4256#else
4257 // For the remaining cases we use 'ps' via popen to localize the processes
4258
4259 // Build command
4260 XrdOucString cmd = "ps ";
4261 bool busr = 0;
4262#if 0
4263 // Left over of some previous implementation; but here fSuperUser is not defined: to be checked
4264 const char *cusr = (usr && strlen(usr) && fSuperUser) ? usr : fPClient->ID();
4265#else
4266 const char *cusr = (usr && strlen(usr)) ? usr : 0;
4267#endif
4268 if (all) {
4269 cmd += "ax";
4270 } else {
4271 if (cusr) {
4272 cmd += "-U ";
4273 cmd += cusr;
4274 cmd += " -u ";
4275 cmd += cusr;
4276 }
4277 cmd += " -f";
4278 busr = 1;
4279 }
4280 cmd += " | grep proofserv 2>/dev/null";
4281
4282 // Our parent ID as a string
4283 char cpid[10];
4284 snprintf(cpid, 10, "%d", getpid());
4285
4286 // Run it ...
4287 XrdOucString pids = ":";
4288 FILE *fp = popen(cmd.c_str(), "r");
4289 if (fp != 0) {
4290 char line[2048] = { 0 };
4291 while (fgets(line, sizeof(line), fp)) {
4292 // Parse line: make sure that we are the parent
4293 char *px = strstr(line, "xpd");
4294 if (!px)
4295 // Not xpd: old proofd ?
4296 continue;
4297 char *pi = strstr(px+3, cpid);
4298 if (!pi) {
4299 // Not started by us: check if the parent is still running
4300 pi = px + 3;
4301 int ppid = (int) XrdProofdAux::GetLong(pi);
4302 TRACE(HDBG, "found alternative parent ID: "<< ppid);
4303 // If still running then skip
4305 continue;
4306 }
4307 // Get pid now
4308 int from = 0;
4309 if (busr)
4310 from += strlen(cusr);
4311 int pid = (int) XrdProofdAux::GetLong(&line[from]);
4312 bool muok = 1;
4313 if (fMgr->MultiUser() && !all) {
4314 // We need to check the user name: we may be the owner of somebody
4315 // else process; if no session is attached, we kill it
4316 muok = 0;
4318 if (!srv || (srv && !strcmp(usr, srv->Client())))
4319 muok = 1;
4320 }
4321 if (muok)
4322 // Kill it
4323 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
4324 nk++;
4325 }
4326 pclose(fp);
4327 } else {
4328 // Error executing the command
4329 return -1;
4330 }
4331#endif
4332
4333 // Done
4334 return nk;
4335}
4336
4337////////////////////////////////////////////////////////////////////////////////
4338/// Set user ownerships on some critical files or directories.
4339 /// Return 0 on success, -1 if anything goes wrong.
4340
4342 const char *ord, const char *stag)
4343{
4344 XPDLOC(SMGR, "ProofServMgr::SetUserOwnerships")
4345
4346 TRACE(REQ, "enter");
4347
4348 // If applicable, make sure that the private dataset dir for this user exists
4349 // and has the right permissions
4350 if (fMgr->DataSetSrcs()->size() > 0) {
4351 XrdProofUI ui;
4353 std::list<XrdProofdDSInfo *>::iterator ii;
4354 for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ++ii) {
4355 TRACE(ALL, "Checking dataset source: url:"<<(*ii)->fUrl<<", local:"
4356 <<(*ii)->fLocal<<", rw:"<<(*ii)->fRW);
4357 if ((*ii)->fLocal && (*ii)->fRW) {
4358 XrdOucString d;
4359 XPDFORM(d, "%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str());
4360 if (XrdProofdAux::AssertDir(d.c_str(), ui, fMgr->ChangeOwn()) == 0) {
4361 if (XrdProofdAux::ChangeMod(d.c_str(), 0777) == 0) {
4362 XPDFORM(d, "%s/%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str(),
4363 p->Client()->UI().fUser.c_str());
4364 if (XrdProofdAux::AssertDir(d.c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
4365 if (XrdProofdAux::ChangeMod(d.c_str(), 0755) != 0) {
4366 TRACE(XERR, "problems setting permissions 0755 on: "<<d);
4367 }
4368 } else {
4369 TRACE(XERR, "problems asserting: "<<d);
4370 }
4371 } else {
4372 TRACE(XERR, "problems setting permissions 0777 on: "<<d);
4373 }
4374 } else {
4375 TRACE(XERR, "problems asserting: "<<d);
4376 }
4377 }
4378 }
4379 }
4380
4381 // If applicable, make sure that the private data dir for this user exists
4382 // and has the right permissions
4383 if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0 &&
4384 fMgr->DataDirOpts() && strlen(fMgr->DataDirOpts()) > 0 && ord && stag) {
4385 XrdProofUI ui;
4387 XrdOucString dgr, dus[3];
4388 XPDFORM(dgr, "%s/%s", fMgr->DataDir(), p->Client()->UI().fGroup.c_str());
4389 if (XrdProofdAux::AssertDir(dgr.c_str(), ui, fMgr->ChangeOwn()) == 0) {
4390 if (XrdProofdAux::ChangeMod(dgr.c_str(), 0777) == 0) {
4391 unsigned int mode = 0755;
4392 if (strchr(fMgr->DataDirOpts(), 'g')) mode = 0775;
4393 if (strchr(fMgr->DataDirOpts(), 'a') || strchr(fMgr->DataDirOpts(), 'o')) mode = 0777;
4394 XPDFORM(dus[0], "%s/%s", dgr.c_str(), p->Client()->UI().fUser.c_str());
4395 XPDFORM(dus[1], "%s/%s", dus[0].c_str(), ord);
4396 XPDFORM(dus[2], "%s/%s", dus[1].c_str(), stag);
4397 for (int i = 0; i < 3; i++) {
4398 if (XrdProofdAux::AssertDir(dus[i].c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
4399 if (XrdProofdAux::ChangeMod(dus[i].c_str(), mode) != 0) {
4400 std::ios_base::fmtflags oflags = std::cerr.flags();
4401 TRACE(XERR, "problems setting permissions "<< oct << mode<<" on: "<<dus[i]);
4402 std::cerr.flags(oflags);
4403 }
4404 } else {
4405 TRACE(XERR, "problems asserting: "<<dus[i]);
4406 break;
4407 }
4408 }
4409 } else {
4410 TRACE(XERR, "problems setting permissions 0777 on: "<<dgr);
4411 }
4412 } else {
4413 TRACE(XERR, "problems asserting: "<<dgr);
4414 }
4415 }
4416
4417 if (fMgr->ChangeOwn()) {
4418 // Change ownership of '.creds'
4419 XrdOucString creds(p->Client()->Sandbox()->Dir());
4420 creds += "/.creds";
4421 if (XrdProofdAux::ChangeOwn(creds.c_str(), p->Client()->UI()) != 0) {
4422 TRACE(XERR, "can't change ownership of "<<creds);
4423 return -1;
4424 }
4425 }
4426
4427 // We are done
4428 TRACE(REQ, "done");
4429 return 0;
4430}
4431
4432////////////////////////////////////////////////////////////////////////////////
4433/// Set user environment: set effective user and group ID of the process
4434 /// to the ones of the owner of this protocol instance and change working
4435 /// dir to the sandbox.
4436 /// Return 0 on success, -1 if anything goes wrong.
4437
4439{
4440 XPDLOC(SMGR, "ProofServMgr::SetUserEnvironment")
4441
4442 TRACE(REQ, "enter");
4443
4445 p->Client()->UI(), fMgr->ChangeOwn()) != 0) {
4446 TRACE(XERR, "couldn't change directory to "<< p->Client()->Sandbox()->Dir());
4447 return -1;
4448 }
4449
4450 size_t len = 0;
4451 // set HOME env
4452 len = 8 + strlen(p->Client()->Sandbox()->Dir());
4453 char *h = new char[len];
4454 snprintf(h, len, "HOME=%s", p->Client()->Sandbox()->Dir());
4455 putenv(h);
4456 TRACE(DBG, "set "<<h);
4457
4458 // set USER env
4459 len = 8 + strlen(p->Client()->User());
4460 char *u = new char[len];
4461 snprintf(u, len, "USER=%s", p->Client()->User());
4462 putenv(u);
4463 TRACE(DBG, "set "<<u);
4464
4465 // Set access control list from /etc/initgroup
4466 // (super-user privileges required)
4467 TRACE(DBG, "setting ACLs");
4468 if (fMgr->ChangeOwn() && (int) geteuid() != p->Client()->UI().fUid) {
4469
4470 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
4471 if (XpdBadPGuard(pGuard, p->Client()->UI().fUid)) {
4472 TRACE(XERR, "could not get privileges");
4473 return -1;
4474 }
4475
4476 initgroups(p->Client()->UI().fUser.c_str(), p->Client()->UI().fGid);
4477 }
4478
4479 if (fMgr->ChangeOwn()) {
4480 // acquire permanently target user privileges
4481 TRACE(DBG, "acquiring target user identity: "<<(uid_t)p->Client()->UI().fUid<<
4482 ", "<<(gid_t)p->Client()->UI().fGid);
4483 if (XrdSysPriv::ChangePerm((uid_t)p->Client()->UI().fUid,
4484 (gid_t)p->Client()->UI().fGid) != 0) {
4485 TRACE(XERR, "can't acquire "<< p->Client()->UI().fUser <<" identity");
4486 return -1;
4487 }
4488 }
4489
4490 // We are done
4491 TRACE(REQ, "done");
4492 return 0;
4493}
4494
4495////////////////////////////////////////////////////////////////////////////////
4496/// Return active session with process ID pid, if any.
4497
4499{
4500 XrdOucString key; key += pid;
4501 return fSessions.Find(key.c_str());
4502}
4503
4504////////////////////////////////////////////////////////////////////////////////
4505/// Run thorugh entries to broadcast the relevant priority
4506
4507static int BroadcastPriority(const char *, XrdProofdProofServ *ps, void *s)
4508{
4509 XPDLOC(SMGR, "BroadcastPriority")
4510
4511 XpdBroadcastPriority_t *bp = (XpdBroadcastPriority_t *)s;
4512
4513 int nb = *(bp->fNBroadcast);
4514
4515 XrdOucString emsg;
4516 if (ps) {
4517 if (ps->IsValid() && (ps->Status() == kXPD_running) &&
4518 !(ps->SrvType() == kXPD_Master)) {
4519 XrdProofGroup *g = (ps->Group() && bp->fGroupMgr)
4520 ? bp->fGroupMgr->GetGroup(ps->Group()) : 0;
4521 TRACE(DBG, "group: "<< g<<", client: "<<ps->Client());
4522 if (g && g->Active() > 0) {
4523 TRACE(DBG, "priority: "<< g->Priority()<<" active: "<<g->Active());
4524 int prio = (int) (g->Priority() * 100);
4525 ps->BroadcastPriority(prio);
4526 nb++;
4527 }
4528 }
4529 // Go to next
4530 return 0;
4531 } else {
4532 emsg = "input entry undefined";
4533 }
4534
4535 // Some problem
4536 TRACE(XERR,"protocol error: "<<emsg);
4537 return 1;
4538}
4539
4540////////////////////////////////////////////////////////////////////////////////
4541/// Broadcast cluster info to the active sessions
4542
4544{
4545 XPDLOC(SMGR, "ProofServMgr::BroadcastClusterInfo")
4546
4547 TRACE(REQ, "enter");
4548
4549 int tot = 0, act = 0;
4550 std::list<XrdProofdProofServ *>::iterator si = fActiveSessions.begin();
4551 while (si != fActiveSessions.end()) {
4552 if ((*si)->SrvType() != kXPD_Worker) {
4553 tot++;
4554 if ((*si)->Status() == kXPD_running) act++;
4555 }
4556 ++si;
4557 }
4558 if (tot > 0) {
4559 XPDPRT("tot: "<<tot<<", act: "<<act);
4560 // Now propagate to master or sub-masters
4561 si = fActiveSessions.begin();
4562 while (si != fActiveSessions.end()) {
4563 if ((*si)->Status() == kXPD_running &&
4564 (*si)->SrvType() != kXPD_Worker) (*si)->SendClusterInfo(tot, act);
4565 ++si;
4566 }
4567 } else {
4568 TRACE(DBG, "No master or submaster controlled by this manager");
4569 }
4570}
4571
4572////////////////////////////////////////////////////////////////////////////////
4573/// Broadcast priorities to the active sessions.
4574/// Returns the number of sessions contacted.
4575
4577{
4578 XPDLOC(SMGR, "ProofServMgr::BroadcastPriorities")
4579
4580 TRACE(REQ, "enter");
4581
4582 int nb = 0;
4583 XpdBroadcastPriority_t bp = { (fMgr ? fMgr->GroupsMgr() : 0), &nb };
4584 fSessions.Apply(BroadcastPriority, (void *)&bp);
4585 // Done
4586 return nb;
4587}
4588
4589////////////////////////////////////////////////////////////////////////////////
4590/// Return true if in reconnection state, i.e. during
4591/// that period during which clients are expected to reconnect.
4592/// Return false if the session is fully effective
4593
4595{
4596 int rect = -1;
4597 if (fReconnectTime >= 0) {
4598 rect = time(0) - fReconnectTime;
4599 if (rect < fReconnectTimeOut)
4600 return true;
4601 }
4602 // Not reconnecting
4603 return false;
4604}
4605
4606////////////////////////////////////////////////////////////////////////////////
4607/// Change reconnecting status
4608///
4609
4611{
4612 XrdSysMutexHelper mhp(fMutex);
4613
4614 if (on) {
4615 fReconnectTime = time(0);
4616 } else {
4617 fReconnectTime = -1;
4618 }
4619}
4620
4621////////////////////////////////////////////////////////////////////////////////
4622/// Check destroyed status
4623///
4624
4626{
4627 XrdSysMutexHelper mhp(fMutex);
4628
4629 bool alive = true;
4630 int now = time(0);
4631 std::map<XrdProofdProtocol*,int>::iterator iter = fDestroyTimes.begin();
4632 while (iter != fDestroyTimes.end()) {
4633 int rect = now - iter->second;
4634 if (rect < fReconnectTimeOut) {
4635 if (p == iter->first) alive = false;
4636 ++iter;
4637 } else {
4638 iter = fDestroyTimes.erase(iter);
4639 }
4640 }
4641
4642 return alive;
4643}
4644
4645////////////////////////////////////////////////////////////////////////////////
4646/// Run through entries to reset the disconnecting client slots
4647
4648static int FreeClientID(const char *, XrdProofdProofServ *ps, void *s)
4649{
4650 XPDLOC(SMGR, "FreeClientID")
4651
4652 int pid = *((int *)s);
4653
4654 if (ps) {
4655 ps->FreeClientID(pid);
4656 // Go to next
4657 return 0;
4658 }
4659
4660 // Some problem
4661 TRACE(XERR, "protocol error: undefined session!");
4662 return 1;
4663}
4664
4665////////////////////////////////////////////////////////////////////////////////
4666 /// Ask all the sessions to free the client slots associated with 'pid'
4667///
4668
4670{
4671 XrdSysMutexHelper mhp(fMutex);
4672
4673 fSessions.Apply(FreeClientID, (void *)&pid);
4674}
4675
4676////////////////////////////////////////////////////////////////////////////////
4677/// Run thorugh entries to count top-masters
4678
4679static int CountTopMasters(const char *, XrdProofdProofServ *ps, void *s)
4680{
4681 XPDLOC(SMGR, "CountTopMasters")
4682
4683 int *ntm = (int *)s;
4684
4685 XrdOucString emsg;
4686 if (ps) {
4687 if (ps->SrvType() == kXPD_TopMaster) (*ntm)++;
4688 // Go to next
4689 return 0;
4690 } else {
4691 emsg = "input entry undefined";
4692 }
4693
4694 // Some problem
4695 TRACE(XERR,"protocol error: "<<emsg);
4696 return 1;
4697}
4698
4699////////////////////////////////////////////////////////////////////////////////
4700/// Return the number of current sessions (top masters)
4701
4703{
4704 XPDLOC(SMGR, "ProofServMgr::CurrentSessions")
4705
4706 TRACE(REQ, "enter");
4707
4708 XrdSysMutexHelper mhp(fMutex);
4709 if (recalculate) {
4710 fCurrentSessions = 0;
4712 }
4713
4714 // Done
4715 return fCurrentSessions;
4716}
4717
4718////////////////////////////////////////////////////////////////////////////////
4719/// Resolve some keywords in 's'
4720 /// <logfilemst> (non-worker sessions), <logfilewrk> (worker sessions), <user>, <rootsys>
4721
4723{
4724 if (!in) return;
4725
4726 bool isWorker = 0;
4727 if (in->fPS->SrvType() == kXPD_Worker) isWorker = 1;
4728
4729 // Log file
4730 if (!isWorker && s.find("<logfilemst>") != STR_NPOS) {
4731 XrdOucString lfr(in->fLogFile);
4732 if (lfr.endswith(".log")) lfr.erase(lfr.rfind(".log"));
4733 s.replace("<logfilemst>", lfr);
4734 } else if (isWorker && s.find("<logfilewrk>") != STR_NPOS) {
4735 XrdOucString lfr(in->fLogFile);
4736 if (lfr.endswith(".log")) lfr.erase(lfr.rfind(".log"));
4737 s.replace("<logfilewrk>", lfr);
4738 }
4739
4740 // user
4741 if (getenv("USER") && s.find("<user>") != STR_NPOS) {
4742 XrdOucString usr(getenv("USER"));
4743 s.replace("<user>", usr);
4744 }
4745
4746 // rootsys
4747 if (getenv("ROOTSYS") && s.find("<rootsys>") != STR_NPOS) {
4748 XrdOucString rootsys(getenv("ROOTSYS"));
4749 s.replace("<rootsys>", rootsys);
4750 }
4751}
4752
4753//
4754 // Auxiliary class to handle session pid files
4755//
4756
4757////////////////////////////////////////////////////////////////////////////////
4758/// Construct from 'c' and 's'
4759
4761{
4762 fLastAccess = 0;
4763
4764 // Fill from the client instance
4765 fUser = c ? c->User() : "";
4766 fGroup = c ? c->Group() : "";
4767
4768 // Fill from the server instance
4769 fPid = s ? s->SrvPID() : -1;
4770 fID = s ? s->ID() : -1;
4771 fSrvType = s ? s->SrvType() : -1;
4772 fPLiteNWrks = s ? s->PLiteNWrks() : -1;
4773 fStatus = s ? s->Status() : kXPD_unknown;
4774 fOrdinal = s ? s->Ordinal() : "";
4775 fTag = s ? s->Tag() : "";
4776 fAlias = s ? s->Alias() : "";
4777 fLogFile = s ? s->Fileout() : "";
4778 fROOTTag = (s && s->ROOT())? s->ROOT()->Tag() : "";
4779 fSrvProtVers = (s && s->ROOT()) ? s->ROOT()->SrvProtVers() : -1;
4780 fUserEnvs = s ? s->UserEnvs() : "";
4781 fAdminPath = s ? s->AdminPath() : "";
4782 fUnixPath = s ? s->UNIXSockPath() : "";
4783}
4784
4785////////////////////////////////////////////////////////////////////////////////
4786/// Fill 's' fields using the stored info
4787
4789{
4790 XPDLOC(SMGR, "SessionInfo::FillProofServ")
4791
4792 s.SetClient(fUser.c_str());
4793 s.SetGroup(fGroup.c_str());
4794 if (fPid > 0)
4795 s.SetSrvPID(fPid);
4796 if (fID >= 0)
4797 s.SetID(fID);
4800 s.SetStatus(fStatus);
4801 s.SetOrdinal(fOrdinal.c_str());
4802 s.SetTag(fTag.c_str());
4803 s.SetAlias(fAlias.c_str());
4804 s.SetFileout(fLogFile.c_str());
4805 if (rmgr) {
4806 if (rmgr->GetVersion(fROOTTag.c_str())) {
4807 s.SetROOT(rmgr->GetVersion(fROOTTag.c_str()));
4808 } else {
4809 TRACE(ALL, "ROOT version '"<< fROOTTag <<
4810 "' not availabe anymore: setting the default");
4811 s.SetROOT(rmgr->DefaultVersion());
4812 }
4813 }
4814 s.SetUserEnvs(fUserEnvs.c_str());
4815 s.SetAdminPath(fAdminPath.c_str(), 0, 0);
4816 s.SetUNIXSockPath(fUnixPath.c_str());
4817}
4818
4819////////////////////////////////////////////////////////////////////////////////
4820/// Save content to 'file'
4821
4823{
4824 XPDLOC(SMGR, "SessionInfo::SaveToFile")
4825
4826 // Check inputs
4827 if (!file || strlen(file) <= 0) {
4828 TRACE(XERR,"invalid input: "<< (file ? file : "<nul>"));
4829 return -1;
4830 }
4831 TRACE(HDBG,"session saved to file: "<<file);
4832
4833 // Create the file
4834 FILE *fpid = fopen(file, "w");
4835 if (fpid) {
4836 fprintf(fpid, "%s %s\n", fUser.c_str(), fGroup.c_str());
4837 fprintf(fpid, "%s\n", fUnixPath.c_str());
4838 fprintf(fpid, "%d %d %d %d\n", fPid, fID, fSrvType, fPLiteNWrks);
4839 fprintf(fpid, "%s %s %s\n", fOrdinal.c_str(), fTag.c_str(), fAlias.c_str());
4840 fprintf(fpid, "%s\n", fLogFile.c_str());
4841 fprintf(fpid, "%d %s\n", fSrvProtVers, fROOTTag.c_str());
4842 if (fUserEnvs.length() > 0)
4843 fprintf(fpid, "\n%s", fUserEnvs.c_str());
4844 fclose(fpid);
4845
4846 // Make it writable by anyone (to allow the corresponding proofserv
4847 // to touch it for the asynchronous ping request)
4848 if (chmod(file, 0666) != 0) {
4849 TRACE(XERR, "could not change mode to 0666 on file "<<
4850 file<<"; error: "<<errno);
4851 }
4852
4853 return 0;
4854 }
4855
4856 TRACE(XERR,"session pid file cannot be (re-)created: "<<
4857 file<<"; error: "<<errno);
4858 return -1;
4859}
4860
4861////////////////////////////////////////////////////////////////////////////////
4862/// Reset the content
4863
4865{
4866 fLastAccess = 0;
4867 fUser = "";
4868 fGroup = "";
4869 fAdminPath = "";
4870 fUnixPath = "";
4871 fPid = -1;
4873 fID = -1;
4874 fSrvType = -1;
4875 fPLiteNWrks = -1;
4876 fOrdinal = "";
4877 fTag = "";
4878 fAlias = "";
4879 fLogFile = "";
4880 fROOTTag = "";
4881 fSrvProtVers = -1;
4882 fUserEnvs = "";
4883}
4884
4885////////////////////////////////////////////////////////////////////////////////
4886/// Read content from 'file'
4887
4889{
4890 XPDLOC(SMGR, "SessionInfo::ReadFromFile")
4891
4892 Reset();
4893
4894 // Check inputs
4895 if (!file || strlen(file) <= 0) {
4896 TRACE(XERR,"invalid input: "<<(file ? file : "<nul>"));
4897 return -1;
4898 }
4899
4900 // Open the session file
4901 FILE *fpid = fopen(file,"r");
4902 if (fpid) {
4903 char line[4096];
4904 XrdOucString sline, t;
4905 int from = 0;
4906 if (fgets(line, sizeof(line), fpid)) {
4907 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = '\0';
4908 sline = line;
4909 if ((from = sline.tokenize(fUser, from, ' ')) == -1)
4910 TRACE(XERR,"warning: fUser: corrupted line? "<<line<<" (file: "<<file<<")");
4911 if ((from = sline.tokenize(fGroup, from, ' ')) == -1)
4912 TRACE(XERR,"warning: fGroup: corrupted line? "<<line<<" (file: "<<file<<")");
4913 }
4914 if (fgets(line, sizeof(line), fpid)) {
4915 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = '\0';
4916 fUnixPath = line;
4917 }
4918 if (fgets(line, sizeof(line), fpid)) {
4919 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = '\0';
4920 sline = line;
4921 from = 0;
4922 if ((from = sline.tokenize(t, from, ' ')) == -1)
4923 TRACE(XERR,"warning: fPid: corrupted line? "<<line<<" (file: "<<file<<")");
4924 fPid = t.atoi();
4925 if ((from = sline.tokenize(t, from, ' ')) == -1)
4926 TRACE(XERR,"warning: fID: corrupted line? "<<line<<" (file: "<<file<<")");
4927 fID = t.atoi();
4928 if ((from = sline.tokenize(t, from, ' ')) == -1)
4929 TRACE(XERR,"warning: fSrvType: corrupted line? "<<line<<" (file: "<<file<<")");
4930 fSrvType = t.atoi();
4931 }
4932 if (fgets(line, sizeof(line), fpid)) {
4933 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = '\0';
4934 sline = line;
4935 from = 0;
4936 if ((from = sline.tokenize(fOrdinal, from, ' ')) == -1)
4937 TRACE(XERR,"warning: fOrdinal: corrupted line? "<<line<<" (file: "<<file<<")");
4938 if ((from = sline.tokenize(fTag, from, ' ')) == -1)
4939 TRACE(XERR,"warning: fTag: corrupted line? "<<line<<" (file: "<<file<<")");
4940 if ((from = sline.tokenize(fAlias, from, ' ')) == -1)
4941 TRACE(HDBG,"fAlias undefined "<<line);
4942 }
4943 if (fgets(line, sizeof(line), fpid)) {
4944 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = '\0';
4945 fLogFile = line;
4946 }
4947 if (fgets(line, sizeof(line), fpid)) {
4948 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = '\0';
4949 sline = line;
4950 from = 0;
4951 if ((from = sline.tokenize(t, from, ' ')) == -1)
4952 TRACE(XERR,"warning: fSrvProtVers: corrupted line? "<<line<<" (file: "<<file<<")");
4953 fSrvProtVers = t.atoi();
4954 if ((from = sline.tokenize(fROOTTag, from, ' ')) == -1)
4955 TRACE(XERR,"warning: fROOTTag: corrupted line? "<<line<<" (file: "<<file<<")");
4956 }
4957 // All the remaining into fUserEnvs
4958 fUserEnvs = "";
4959 off_t lnow = lseek(fileno(fpid), (off_t) 0, SEEK_CUR);
4960 off_t ltot = lseek(fileno(fpid), (off_t) 0, SEEK_END);
4961 int left = (int)(ltot - lnow);
4962 int len = -1;
4963 do {
4964 int wanted = (left > 4095) ? 4095 : left;
4965 while ((len = read(fileno(fpid), line, wanted)) < 0 &&
4966 errno == EINTR)
4967 errno = 0;
4968 if (len < 0 || len < wanted) {
4969 break;
4970 } else {
4971 line[len] = '\0';
4972 fUserEnvs += line;
4973 }
4974 // Update counters
4975 left -= len;
4976 } while (len > 0 && left > 0);
4977 // Done
4978 fclose(fpid);
4979 // The file name is the admin path
4980 fAdminPath = file;
4981 // Fill access time
4982 struct stat st;
4983 if (!stat(file, &st))
4984 fLastAccess = st.st_atime;
4985 } else {
4986 TRACE(XERR,"session file cannot be open: "<< file<<"; error: "<<errno);
4987 return -1;
4988 }
4989
4990 // Read the last status now if the session is active
4991 XrdOucString fs(file);
4992 fs += ".status";
4993 fpid = fopen(fs.c_str(),"r");
4994 if (fpid) {
4995 char line[64];
4996 if (fgets(line, sizeof(line), fpid)) {
4997 if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = 0;
4998 fStatus = atoi(line);
4999 }
5000 // Done
5001 fclose(fpid);
5002 } else {
5003 TRACE(DBG,"no session status file for: "<< fs<<"; session was probably terminated");
5004 }
5005
5006 // Done
5007 return 0;
5008}
5009
5010////////////////////////////////////////////////////////////////////////////////
5011/// Check if this env applies to 'usr', 'grp, 'ver'.
5012/// Returns -1 if it does not match, >=0 if it matches. The value is a linear
5013/// combination of matching lengths for user and group, with a weight of 1000 for
5014/// the users one, so that an exact user match will always win.
5015
5016int XpdEnv::Matches(const char *usr, const char *grp, int ver)
5017{
5018 XPDLOC(SMGR, "XpdEnv::Matches")
5019
5020 int nmtc = -1;
5021 // Check the user
5022 if (fUsers.length() > 0) {
5023 XrdOucString u(usr);
5024 if ((nmtc = u.matches(fUsers.c_str())) == 0) return -1;
5025 } else {
5026 nmtc = strlen(usr);
5027 }
5028 nmtc += 1000; // Weigth of user name match
5029 // Check the group
5030 int nmtcg = -1;
5031 if (fGroups.length() > 0) {
5032 XrdOucString g(grp);
5033 if ((nmtcg = g.matches(fGroups.c_str())) == 0) return -1;
5034 } else {
5035 nmtcg = strlen(grp);
5036 }
5037 nmtc += nmtcg;
5038
5039 TRACE(HDBG, fEnv <<", u:"<<usr<<", g:"<<grp<<" --> nmtc: "<<nmtc);
5040
5041 // Check the version code
5042 TRACE(HDBG, fEnv <<", ver:"<<ver);
5043 if (fVerMin > 0 && ver < fVerMin) return -1;
5044 if (fVerMax > 0 && ver > fVerMax) return -1;
5045
5046 // If we are here then it matches
5047 return nmtc;
5048}
5049
5050////////////////////////////////////////////////////////////////////////////////
5051/// Transform version number ver (format patch + 100*minor + 10000*maj, e.g. 52706)
5052/// If 'hex' is true, the components are decoded as hex numbers
5053
5054int XpdEnv::ToVersCode(int ver, bool hex)
5055{
5056 int maj = -1, min = -1, ptc = -1, xv = ver;
5057 if (hex) {
5058 maj = xv / 65536;
5059 xv -= maj * 65536;
5060 min = xv / 256;
5061 ptc = xv - min * 256;
5062 } else {
5063 maj = xv / 10000;
5064 xv -= maj * 10000;
5065 min = xv / 100;
5066 ptc = xv - min * 100;
5067 }
5068 // Get the version code now
5069 int vc = (maj << 16) + (min << 8) + ptc;
5070 return vc;
5071}
5072
5073////////////////////////////////////////////////////////////////////////////////
5074/// Print the content of this env
5075
5076void XpdEnv::Print(const char *what)
5077{
5078 XPDLOC(SMGR, what)
5079
5080 XrdOucString vmi("-1"), vmx("-1");
5081 if (fVerMin > 0) {
5082 int maj = (fVerMin >> 16);
5083 int min = ((fVerMin - maj * 65536) >> 8);
5084 int ptc = fVerMin - maj * 65536 - min * 256;
5085 XPDFORM(vmi, "%d%d%d", maj, min, ptc);
5086 }
5087 if (fVerMax > 0) {
5088 int maj = (fVerMax >> 16);
5089 int min = ((fVerMax - maj * 65536) >> 8);
5090 int ptc = fVerMax - maj * 65536 - min * 256;
5091 XPDFORM(vmx, "%d%d%d", maj, min, ptc);
5092 }
5093 XrdOucString u("allusers"), g("allgroups");
5094 if (fUsers.length() > 0) u = fUsers;
5095 if (fGroups.length() > 0) u = fGroups;
5096
5097 TRACE(ALL, "'"<<fEnv<<"' {"<<u<<"|"<<g<<
5098 "} svn:["<<fSvnMin<<","<<fSvnMax<<"] vers:["<<vmi<<","<<vmx<<"]");
5099}
ROOT::R::TRInterface & r
Definition Object.C:4
#define d(i)
Definition RSha256.hxx:102
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
#define g(i)
Definition RSha256.hxx:105
#define a(i)
Definition RSha256.hxx:99
#define h(i)
Definition RSha256.hxx:106
#define e(i)
Definition RSha256.hxx:103
#define TRACE(Flag, Args)
Definition TGHtml.h:120
R__EXTERN C unsigned int sleep(unsigned int seconds)
@ kXP_create
@ kXP_destroy
@ kXP_detach
@ kXP_attach
#define kXPD_Master
#define kXPD_ClientMaster
@ kXPD_running
@ kXPD_unknown
#define kXPD_TopMaster
#define kXPD_AnyServer
@ kXP_TooManySess
@ kXP_ServerError
#define kXPD_MasterMaster
#define kXPD_Worker
#define kXPD_MasterWorker
@ kXPD_msg
@ kXPD_wrkmortem
@ kXPD_errmsg
@ kXPD_srvmsg
#define DIGIT(x)
int DoDirectiveString(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for a string.
#define XPDFORM
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
int DoDirectiveInt(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Process directive for an integer.
#define XpdBadPGuard(g, u)
static XpdManagerCron_t fManagerCron
#define XPD_LONGOK(x)
#define XPD_LIBPATH
static int WriteSessRCs(const char *, XpdEnv *erc, void *f)
Run thorugh entries to broadcast the relevant priority.
static int FreeClientID(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to reset the disconnecting client slots.
#define PutEnv(x, e)
static int WriteSessEnvs(const char *, XpdEnv *env, void *s)
Run through entries to broadcast the relevant priority.
static int CountTopMasters(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to count top-masters.
static int BroadcastPriority(const char *, XrdProofdProofServ *ps, void *s)
Run through entries to broadcast the relevant priority.
void * XrdProofdProofServRecover(void *p)
Waiting for session to recover after an abrupt shutdown.
void * XrdProofdProofServCron(void *p)
This is an endless loop to check the system periodically or when triggered via a message in a dedicat...
static XpdManagerCron_t fManagerCron
#define PSMMAXCNTS
#define XPROOFD_VERSBIN
#define XPD_SETRESP(p, x)
#define XPDPRT(x)
#define XPDLOC(d, x)
#define TRACEP(p, act, x)
#define TRACING(x)
#define free
Definition civetweb.c:1539
const char * extension
Definition civetweb.c:7793
#define snprintf
Definition civetweb.c:1540
XrdProofdClient * fClient
std::list< XrdProofdProofServ * > fProofServs
XrdOucString fGroups
int Matches(const char *usr, const char *grp, int ver=-1)
Check if this env applies to 'usr', 'grp', 'ver'.
void Print(const char *what)
Print the content of this env.
XrdOucString fName
XrdOucString fUsers
void Reset(const char *n, const char *env, const char *usr=0, const char *grp=0, int smi=-1, int smx=-1, int vmi=-1, int vmx=-1)
static int ToVersCode(int ver, bool hex=0)
Transform version number ver (format patch + 100*minor + 10000*maj, e.g.
XrdOucString fEnv
int Type() const
const char * Buf() const
int Get(int &i)
Get next token and interpret it as an int.
void Reset()
void SetP(XrdProofdProtocol *p)
void SetSid(unsigned short sid)
static int Attach(XrdLink *lp)
const char * GetCfgFile() const
virtual int MaxSessions() const
XrdProofdPipe * Pipe()
void FillProofServ(XrdProofdProofServ &s, XrdROOTMgr *rmgr)
Fill 's' fields using the stored info.
int SaveToFile(const char *file)
Save content to 'file'.
void Reset()
Reset the content.
XrdProofSessionInfo(XrdProofdClient *c, XrdProofdProofServ *s)
Construct from 'c' and 's'.
int ReadFromFile(const char *file)
Read content from 'file'.
XrdOucString fUser
XrdOucString fGroup
static int ChangeOwn(const char *path, XrdProofUI ui)
Change the ownership of 'path' to the entity described by 'ui'.
static int GetUserInfo(const char *usr, XrdProofUI &ui)
Get information about user 'usr' in a thread safe way.
static int AssertDir(const char *path, XrdProofUI ui, bool changeown)
Make sure that 'path' exists and is owned by the entity described by 'ui'.
static int VerifyProcessByID(int pid, const char *pname="proofserv")
Check if a process named 'pname' and process 'pid' is still in the process table.
static long int GetLong(char *str)
Extract first integer from string at 'str', if any.
static const char * ProofRequestTypes(int type)
Translates the proof request type in a human readable string.
static int KillProcess(int pid, bool forcekill, XrdProofUI ui, bool changeown)
Kill the process 'pid'.
static int ChangeToDir(const char *dir, XrdProofUI ui, bool changeown)
Change current directory to 'dir'.
static int ChangeMod(const char *path, unsigned int mode)
Change the permission mode of 'path' to 'mode'.
static int GetProcesses(const char *pn, std::map< int, XrdOucString > *plist)
Get from the process table the list of PIDs for processes named 'proofserv'. For {linux,...
static int GetIDFromPath(const char *path, XrdOucString &emsg)
Extract an integer from a file.
static void LogEmsgToFile(const char *flog, const char *emsg, const char *pfx=0)
Logs error message 'emsg' to file 'flog' using standard technology.
static int SymLink(const char *path, const char *link)
Create a symlink 'link' to 'path'. Return 0 in case of success, -1 in case of error.
static int CheckIf(XrdOucStream *s, const char *h)
Check existence and match condition of an 'if' directive If none (valid) is found,...
static int ParsePidPath(const char *path, XrdOucString &before, XrdOucString &after)
Parse a path in the form of "<before>[.<pid>][.<after>]", filling 'before' and 'after' and returning the pid.
XrdProofdClient * GetClient(const char *usr, const char *grp=0, bool create=1)
Handle request for localizing a client instance for {usr, grp} from the list.
XrdProofUI UI() const
XrdSysRecMutex * Mutex() const
void TerminateSessions(int srvtype, XrdProofdProofServ *ref, const char *msg, XrdProofdPipe *pipe, bool changeown)
Terminate client sessions; IDs of signalled processes are added to sigpid.
XrdProofdProofServ * GetFreeServObj()
Get next free server ID.
XrdProofdSandbox * Sandbox() const
XrdROOT * ROOT() const
const char * Group() const
const char * User() const
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
bool ReadFile(bool update=true)
Return true if the file has never been read or did change since last reading, false otherwise.
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
XrdSysError * fEDest
const char * CfgFile() const
XrdProofdPriorityMgr * PriorityMgr() const
const char * PoolURL() const
XrdProofSched * ProofSched() const
bool ChangeOwn() const
const char * SockPathDir() const
const char * DataDirUrlOpts() const
XrdROOTMgr * ROOTMgr() const
std::list< XrdProofdDSInfo * > * DataSetSrcs()
const char * DataDirOpts() const
XrdProofdNetMgr * NetMgr() const
XrdProofGroupMgr * GroupsMgr() const
XrdScheduler * Sched() const
bool IsSuperMst() const
bool MultiUser() const
XrdProofdClientMgr * ClientMgr() const
const char * LocalROOT() const
const char * Host() const
const char * StageReqRepo() const
const char * BareLibPath() const
const char * Image() const
const char * DataDir() const
const char * NameSpace() const
const char * TMPdir() const
const char * EffectiveUser() const
const char * AdminPath() const
int ResolveKeywords(XrdOucString &s, XrdProofdClient *pcl)
Resolve special keywords in 's' for client 'pcl'.
bool WorkerUsrCfg() const
bool IsValid() const
int Recv(XpdMsg &msg)
Recv message from the pipe.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
void Close()
If open, close and invalidate the pipe descriptors.
int Post(int type, const char *msg)
Post message on the pipe.
int SetProcessPriority(int pid, const char *usr, int &dp)
Change priority of process pid belonging to user, if needed.
XrdProofdProofServ * GetActiveSession(int pid)
Return active session with process ID pid, if any.
int Attach(XrdProofdProtocol *p)
Handle a request to attach to an existing session.
void UpdateCounter(int t, int n)
int Create(XrdProofdProtocol *p)
Handle a request to create a new session.
void BroadcastClusterInfo()
Broadcast cluster info to the active sessions.
XrdProofdProofServ * PrepareProofServ(XrdProofdProtocol *p, XrdProofdResponse *r, unsigned short &sid)
Allocate and prepare the XrdProofdProofServ object describing this session.
int SetUserEnvironment(XrdProofdProtocol *p)
Set user environment: set effective user and group ID of the process to the ones of the owner of this...
int Destroy(XrdProofdProtocol *p)
Handle a request to shutdown an existing session.
int CurrentSessions(bool recalculate=0)
Return the number of current sessions (top masters)
int DoDirectiveProofServMgr(char *, XrdOucStream *, bool)
Process 'proofservmgr' directive, e.g.: xpd.proofservmgr checkfq:120 termto:100 verifyto:5 recoverto:20.
void ExtractEnv(char *, XrdOucStream *, XrdOucString &users, XrdOucString &groups, XrdOucString &rcval, XrdOucString &rcnam, int &smi, int &smx, int &vmi, int &vmx, bool &hex)
Extract env information from the stream 'cfg'.
int CheckTerminatedSessions()
Go through the terminated sessions admin path and make sure the sessions are gone.
int Detach(XrdProofdProtocol *p)
Handle a request to detach from an existing session.
int PrepareSessionRecovering()
Go through the active sessions admin path and prepare reconnection of those still alive.
int CleanupLostProofServ()
Cleanup (kill) all 'proofserv' processes which lost control from their creator or controller daemon.
int BroadcastPriorities()
Broadcast priorities to the active sessions.
XrdProofdProofServMgr(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
int DoDirectivePutRc(char *, XrdOucStream *, bool)
Process 'putrc' directives.
int CleanupProofServ(bool all=0, const char *usr=0)
Cleanup (kill) all 'proofserv' processes from the process table.
void FillEnvList(std::list< XpdEnv > *el, const char *nam, const char *val, const char *usrs=0, const char *grps=0, int smi=-1, int smx=-1, int vmi=-1, int vmx=-1, bool hex=0)
Fill env entry(ies) in the relevant list.
int AcceptPeer(XrdProofdProofServ *xps, int to, XrdOucString &e)
Accept a callback from a starting-up server and setup the related protocol object.
void ParseCreateBuffer(XrdProofdProtocol *p, XrdProofdProofServ *xps, XrdOucString &tag, XrdOucString &ord, XrdOucString &cffile, XrdOucString &uenvs, int &intwait)
Extract relevant quantities from the buffer received during a create request.
bool IsClientRecovering(const char *usr, const char *grp, int &deadline)
Returns true (and the recovering deadline) if the client has sessions in recovering state; returns fal...
int MvSession(const char *fpid)
Move session file from the active to the terminated areas.
void FormFileNameInSessionDir(XrdProofdProtocol *p, XrdProofdProofServ *xps, const char *sessiondir, const char *extension, XrdOucString &outfn)
void DisconnectFromProofServ(int pid)
Change reconnecting status.
XrdOucHash< XrdProofdProofServ > fSessions
int SetProofServEnv(XrdProofdProtocol *p, void *in)
Set environment for proofserv.
int CreateSockPath(XrdProofdProofServ *xps, XrdProofdProtocol *p, unsigned int seq, XrdOucString &emsg)
Create the socket path for the starting session. Return 0 on success, -1 on error (error message in 'e...
bool IsReconnecting()
Return true if in reconnection state, i.e.
int DoDirectiveShutdown(char *, XrdOucStream *, bool)
Process 'shutdown' directive.
XrdSecCredsSaver_t fCredsSaver
int RmSession(const char *fpid)
Remove session file from the terminated sessions area.
int SetProofServEnvOld(XrdProofdProtocol *p, void *in)
Set environment for proofserv; old version preparing the environment for proofserv protocol version <...
int CreateAdminPath(XrdProofdProofServ *xps, XrdProofdProtocol *p, int pid, XrdOucString &emsg)
Create the admin path for the starting session. Return 0 on success, -1 on error (error message in 'em...
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
int CreateProofServEnvFile(XrdProofdProtocol *p, void *input, const char *envfn, const char *rcfn)
Create in 'rcfn' the rootrc file for the proofserv being created; return 0 on success,...
std::list< XrdProofdProofServ * > fActiveSessions
int DoDirectivePutEnv(char *, XrdOucStream *, bool)
Process 'putenv' directives.
int RecoverActiveSessions()
Accept connections from sessions still alive.
int AddSession(XrdProofdProtocol *p, XrdProofdProofServ *s)
Add new active session.
std::map< XrdProofdProtocol *, int > fDestroyTimes
int CheckActiveSessions(bool verify=1)
Go through the active sessions admin path and make sure sessions are alive.
bool IsSessionSocket(const char *fpid)
Checks if fpid is the path of a session UNIX socket. Returns TRUE if yes; cleans the socket if the ses...
int SetupProtocol(XrdNetPeer &peerpsrv, XrdProofdProofServ *xps, XrdOucString &e)
Setup the protocol object serving the peer described by 'peerpsrv'.
int SetUserOwnerships(XrdProofdProtocol *p, const char *ord, const char *stag)
Set user ownerships on some critical files or directories.
void GetTagDirs(int opt, XrdProofdProtocol *p, XrdProofdProofServ *xps, XrdOucString &sesstag, XrdOucString &topsesstag, XrdOucString &sessiondir, XrdOucString &sesswrkdir)
Determine the unique tag and relevant dirs for this session.
std::list< XpdClientSessions * > * fRecoverClients
void RegisterDirectives()
Register directives for configuration.
bool Alive(XrdProofdProtocol *p)
Check destroyed status.
int DeleteFromSessions(const char *pid)
Delete from the hash list the session with ID pid.
int Recover(XpdClientSessions *cl)
Handle a request to recover a session after stop&restart for a specific client.
std::list< XpdEnv > fProofServRCs
int TouchSession(const char *fpid, const char *path=0)
Update the access time for the session pid file to the current time.
void SendErrLog(const char *errlog, XrdProofdResponse *r)
Send content of errlog upstream asynchronously.
int Process(XrdProofdProtocol *p)
Process manager request.
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
void SetReconnectTime(bool on=1)
Change reconnecting status.
int CreateProofServRootRc(XrdProofdProtocol *p, void *input, const char *rcfn)
Create in 'rcfn' the rootrc file for the proofserv being created; return 0 on success,...
void ResolveKeywords(XrdOucString &s, ProofServEnv_t *in)
Resolve some keywords in 's': <logfileroot>, <user>, <rootsys>.
int VerifySession(const char *fpid, int to=-1, const char *path=0)
Check if the session is alive, i.e.
int ResolveSession(const char *fpid)
Handle a request to recover a session after stop&restart.
std::list< XpdEnv > fProofServEnvs
int CleanClientSessions(const char *usr, int srvtype)
Go through the sessions admin path and clean all sessions belonging to 'usr'.
XrdSrvBuffer * StartMsg() const
int BroadcastPriority(int priority)
Broadcast a new group priority value to the worker servers.
const char * Client() const
void SetOrdinal(const char *o)
void SetGroup(const char *g)
void DeleteUNIXSock()
Delete the current UNIX socket.
void SetClient(const char *c)
void SetROOT(XrdROOT *r)
void SetValid(bool valid=1)
void SetTag(const char *t)
const char * Tag() const
const char * AdminPath() const
void SetID(short int id)
void SetProtocol(XrdProofdProtocol *p)
const char * Ordinal() const
XrdClientID * GetClientID(int cid)
Get instance corresponding to cid.
const char * UserEnvs() const
int CreateUNIXSock(XrdSysError *edest)
Create UNIX socket for internal connections.
const char * Alias() const
int VerifyProofServ(bool fw)
Check if the associated proofserv process is alive.
const char * UNIXSockPath() const
int FreeClientID(int pid)
Free instance corresponding to protocol connecting process 'pid'.
int SetAdminPath(const char *a, bool assert, bool setown)
Set the admin path and make sure the file exists.
void SetFileout(const char *f)
void SetUserEnvs(const char *t)
const char * Group() const
XrdROOT * ROOT() const
void SetAlias(const char *a)
int GetNClients(bool check)
Get the number of connected clients.
void Reset()
Reset this instance.
void SetUNIXSockPath(const char *s)
XrdNet * UNIXSock() const
short int ID() const
XrdClientID * Parent() const
void SetParent(XrdClientID *cid)
const char * Fileout() const
int CheckSession(bool oldvers, bool isrec, int shutopt, int shutdel, bool changeown, int &nc)
Calculate the effective number of users on this session's nodes and communicate it to the master togeth...
XrdProofdProtocol * Protocol() const
XrdLink * Link() const
short int ProofProtocol() const
static int EUidAtStartup()
XrdProtocol * Match(XrdLink *lp)
Check whether the request matches this protocol.
XrdProofdClient * Client() const
XrdSecProtocol * AuthProt() const
kXR_int32 CID() const
XrdBuffer * Argp() const
XPClientRequest * Request() const
void SetAdminPath(const char *p)
const char * Dir() const
int AddSession(const char *tag)
Record entry for new proofserv session tagged 'tag' in the active sessions file (<SandBox>/....
XrdROOT * GetVersion(const char *tag)
Return pointer to the ROOT version corresponding to 'tag' or 0 if not found.
Definition XrdROOT.cxx:738
XrdROOT * DefaultVersion() const
Definition XrdROOT.h:118
static int GetVersionCode(const char *release)
Translate 'release' into a version code integer following the rules in $ROOTSYS/include/RVersion....
Definition XrdROOT.cxx:275
const char * Export() const
Definition XrdROOT.h:70
const char * Tag() const
Definition XrdROOT.h:83
int VersionCode() const
Definition XrdROOT.h:84
const char * PrgmSrv() const
Definition XrdROOT.h:79
kXR_int16 SrvProtVers() const
Definition XrdROOT.h:82
const char * Dir() const
Definition XrdROOT.h:65
static int ChangePerm(uid_t uid, gid_t gid)
TLine * line
Definition file.py:1
static const char * what
Definition stlLoader.cc:6
XrdOucString fTopSessionTag
XrdProofdProofServ * fPS
XrdProofdProofServMgr * fSessionMgr
XrdProofdClientMgr * fClientMgr
XrdProofSched * fProofSched
auto * l
Definition textangle.C:4
struct ClientRequestHdr header
struct XPClientProofRequest proof