Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
XrdProofdAdmin.cxx
Go to the documentation of this file.
1// @(#)root/proofd:$Id$
2// Author: G. Ganis Feb 2008
3
4/*************************************************************************
5 * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// XrdProofdAdmin //
15// //
16// Author: G. Ganis, CERN, 2008 //
17// //
18// Envelop class for admin services. //
19// Loaded as service by XrdProofdManager. //
20// //
21//////////////////////////////////////////////////////////////////////////
22#include "XrdProofdPlatform.h"
23
24
25#include "XpdSysError.h"
26
27#include "Xrd/XrdBuffer.hh"
28#include "Xrd/XrdScheduler.hh"
30#include "XrdOuc/XrdOucStream.hh"
31
32#include "XrdProofdAdmin.h"
33#include "XrdProofdClient.h"
34#include "XrdProofdClientMgr.h"
35#include "XrdProofdManager.h"
36#include "XrdProofdNetMgr.h"
39#include "XrdProofdProtocol.h"
40#include "XrdProofGroup.h"
41#include "XrdProofSched.h"
42#include "XrdProofdProofServ.h"
43#include "XrdROOT.h"
44
45// Tracing utilities
46#include "XrdProofdTrace.h"
47
48////////////////////////////////////////////////////////////////////////////////
49/// Decrease active session counters on worker w
50
51static int ExportCpCmd(const char *k, XpdAdminCpCmd *cc, void *s)
52{
53 XPDLOC(PMGR, "ExportCpCmd")
54
55 XrdOucString *ccs = (XrdOucString *)s;
56 if (cc && ccs) {
57 if (ccs->length() > 0) *ccs += ",";
58 *ccs += k;
59 *ccs += ":";
60 *ccs += cc->fCmd;
61 TRACE(DBG, k <<" : "<<cc->fCmd<<" fmt: '"<<cc->fFmt<<"'");
62 // Check next
63 return 0;
64 }
65
66 // Not enough info: stop
67 return 1;
68}
69
70////////////////////////////////////////////////////////////////////////////////
71/// Constructor
72
74 XrdProtocol_Config *pi, XrdSysError *e)
75 : XrdProofdConfig(pi->ConfigFN, e)
76{
77 fMgr = mgr;
78 fExportPaths.clear();
79 // Map of default copy commands supported / allowed, keyed by the protocol
80 fAllowedCpCmds.Add("file", new XpdAdminCpCmd("cp","cp -rp %s %s",1));
81 fAllowedCpCmds.Add("root", new XpdAdminCpCmd("xrdcp","xrdcp %s %s",1));
82 fAllowedCpCmds.Add("xrd", new XpdAdminCpCmd("xrdcp","xrdcp %s %s",1));
83#if !defined(__APPLE__)
84 fAllowedCpCmds.Add("http", new XpdAdminCpCmd("wget","wget %s -O %s",0));
85 fAllowedCpCmds.Add("https", new XpdAdminCpCmd("wget","wget %s -O %s",0));
86#else
87 fAllowedCpCmds.Add("http", new XpdAdminCpCmd("curl","curl %s -o %s",0));
88 fAllowedCpCmds.Add("https", new XpdAdminCpCmd("curl","curl %s -o %s",0));
89#endif
90 fCpCmds = "";
91 fAllowedCpCmds.Apply(ExportCpCmd, (void *)&fCpCmds);
92
93 // Configuration directives
95}
96
97////////////////////////////////////////////////////////////////////////////////
98/// Register directives for configuration
99
101{
102 Register("exportpath", new XrdProofdDirective("exportpath", this, &DoDirectiveClass));
103 Register("cpcmd", new XrdProofdDirective("cpcmd", this, &DoDirectiveClass));
104}
105
106////////////////////////////////////////////////////////////////////////////////
107/// Process admin request
108
110{
111 XPDLOC(ALL, "Admin::Process")
112
113 int rc = 0;
114 XPD_SETRESP(p, "Process");
115
116 TRACEP(p, REQ, "req id: " << type << " ("<<
118
119 XrdOucString emsg;
120 switch (type) {
121 case kQueryMssUrl:
122 return QueryMssUrl(p);
123 case kQuerySessions:
124 return QuerySessions(p);
125 case kQueryLogPaths:
126 return QueryLogPaths(p);
127 case kCleanupSessions:
128 return CleanupSessions(p);
129 case kSendMsgToUser:
130 return SendMsgToUser(p);
131 case kGroupProperties:
132 return SetGroupProperties(p);
133 case kGetWorkers:
134 return GetWorkers(p);
135 case kQueryWorkers:
136 return QueryWorkers(p);
138 return QueryROOTVersions(p);
139 case kROOTVersion:
140 return SetROOTVersion(p);
141 case kSessionAlias:
142 return SetSessionAlias(p);
143 case kSessionTag:
144 return SetSessionTag(p);
145 case kReleaseWorker:
146 return ReleaseWorker(p);
147 case kExec:
148 return Exec(p);
149 case kGetFile:
150 return GetFile(p);
151 case kPutFile:
152 return PutFile(p);
153 case kCpFile:
154 return CpFile(p);
155 default:
156 emsg += "Invalid type: ";
157 emsg += type;
158 break;
159 }
160
161 // Notify invalid request
162 response->Send(kXR_InvalidRequest, emsg.c_str());
163
164 // Done
165 return 0;
166}
167
168////////////////////////////////////////////////////////////////////////////////
169/// Run configuration and parse the entered config directives.
170/// Return 0 on success, -1 on error
171
173{
174 XPDLOC(ALL, "Admin::Config")
175
176 // Run first the configurator
177 if (XrdProofdConfig::Config(rcf) != 0) {
178 XPDERR("problems parsing file ");
179 return -1;
180 }
181
182 XrdOucString msg;
183 msg = (rcf) ? "re-configuring" : "configuring";
184 TRACE(ALL, msg.c_str());
185
186 // Exported paths
187 if (fExportPaths.size() > 0) {
188 TRACE(ALL, "additional paths which can be browsed by all users: ");
189 std::list<XrdOucString>::iterator is = fExportPaths.begin();
190 while (is != fExportPaths.end()) { TRACE(ALL, " "<<*is); ++is; }
191 }
192 // Allowed / supported copy commands
193 TRACE(ALL, "allowed/supported copy commands: "<<fCpCmds);
194
195 // Done
196 return 0;
197}
198
199////////////////////////////////////////////////////////////////////////////////
200/// Update the priorities of the active sessions.
201
203 char *val, XrdOucStream *cfg, bool rcf)
204{
205 XPDLOC(SMGR, "Admin::DoDirective")
206
207 if (!d)
208 // undefined inputs
209 return -1;
210
211 if (d->fName == "exportpath") {
212 return DoDirectiveExportPath(val, cfg, rcf);
213 } else if (d->fName == "cpcmd") {
214 return DoDirectiveCpCmd(val, cfg, rcf);
215 }
216 TRACE(XERR,"unknown directive: "<<d->fName);
217 return -1;
218}
219
220////////////////////////////////////////////////////////////////////////////////
221/// Process 'exportpath' directives
222/// eg: xpd.exportpath /tmp/data /data2/data
223
224int XrdProofdAdmin::DoDirectiveExportPath(char *val, XrdOucStream *cfg, bool)
225{
226 XPDLOC(SMGR, "Admin::DoDirectiveExportPath")
227
228 if (!val || !cfg)
229 // undefined inputs
230 return -1;
231
232 TRACE(ALL,"val: "<<val);
233
234 while (val) {
235 XrdOucString tkns(val), tkn;
236 int from = 0;
237 while ((from = tkns.tokenize(tkn, from, ' ')) != STR_NPOS) {
238 fExportPaths.push_back(tkn);
239 }
240 // Get next
241 val = cfg->GetWord();
242 }
243
244 return 0;
245}
246
247////////////////////////////////////////////////////////////////////////////////
248/// Process 'cpcmd' directives
249/// eg: xpd.cpcmd alien aliencp fmt:"%s %s" put:0
250
251int XrdProofdAdmin::DoDirectiveCpCmd(char *val, XrdOucStream *cfg, bool)
252{
253 XPDLOC(SMGR, "Admin::DoDirectiveCpCmd")
254
255 if (!val || !cfg)
256 // undefined inputs
257 return -1;
258
259 XrdOucString proto, cpcmd, fmt;
260 bool canput = 0, isfmt = 0, rm = 0;
261
262 while (val) {
263 XrdOucString tkn(val);
264 if (proto.length() <= 0) {
265 proto = tkn;
266 if (proto.beginswith('-')) {
267 rm = 1;
268 proto.erase(0, 1);
269 break;
270 }
271 } else if (cpcmd.length() <= 0) {
272 cpcmd = tkn;
273 } else if (tkn.beginswith("put:")) {
274 isfmt = 0;
275 if (tkn == "put:1") canput = 1;
276 } else if (tkn.beginswith("fmt:")) {
277 fmt.assign(tkn, 4, -1);
278 isfmt = 1;
279 } else {
280 if (isfmt) {
281 fmt += " ";
282 fmt += tkn;
283 }
284 }
285 // Get next
286 val = cfg->GetWord();
287 }
288
289 if (rm) {
290 // Remove the related entry
291 fAllowedCpCmds.Del(proto.c_str());
292 } else if (cpcmd.length() > 0 && fmt.length() > 0) {
293 // Add or replace
294 fmt.insert(" ", 0);
295 fmt.insert(cpcmd, 0);
296 fAllowedCpCmds.Rep(proto.c_str(), new XpdAdminCpCmd(cpcmd.c_str(),fmt.c_str(),canput));
297 } else {
298 TRACE(ALL, "incomplete information: ignoring!");
299 }
300
301 // Fill again the export string
302 fCpCmds = "";
303 fAllowedCpCmds.Apply(ExportCpCmd, (void *)&fCpCmds);
304
305 return 0;
306}
307
308////////////////////////////////////////////////////////////////////////////////
309/// Handle request for the URL to the MSS attached to the cluster.
310/// The reply contains also the namespace, i.e. proto://host:port//namespace
311
313{
314 XPDLOC(ALL, "Admin::QueryMssUrl")
315
316 int rc = 0;
317 XPD_SETRESP(p, "QueryMssUrl");
318
319 XrdOucString msg = fMgr->PoolURL();
320 msg += "/";
321 msg += fMgr->NameSpace();
322
323 TRACEP(p, DBG, "sending: "<<msg);
324
325 // Send back to user
326 response->Send((void *)msg.c_str(), msg.length()+1);
327
328 // Over
329 return 0;
330}
331
332////////////////////////////////////////////////////////////////////////////////
333/// Handle request for list of ROOT versions
334
336{
337 XPDLOC(ALL, "Admin::QueryROOTVersions")
338
339 int rc = 0;
340 XPD_SETRESP(p, "QueryROOTVersions");
341
342 XrdOucString msg = fMgr->ROOTMgr()->ExportVersions(p->Client()->ROOT());
343
344 TRACEP(p, DBG, "sending: "<<msg);
345
346 // Send back to user
347 response->Send((void *)msg.c_str(), msg.length()+1);
348
349 // Over
350 return 0;
351}
352
353////////////////////////////////////////////////////////////////////////////////
354/// Handle request for changing the default ROOT version
355
357{
358 XPDLOC(ALL, "Admin::SetROOTVersion")
359
360 int rc = 0;
361 XPD_SETRESP(p, "SetROOTVersion");
362
363 // Change default ROOT version
364 const char *t = p->Argp() ? (const char *) p->Argp()->buff : "default";
365 int len = p->Argp() ? p->Request()->header.dlen : strlen("default");
366 XrdOucString tag(t,len);
367
368 // If a user name is given separate it out and check if
369 // we can do the operation
370 XrdOucString usr;
371 if (tag.beginswith("u:")) {
372 usr = tag;
373 usr.erase(usr.rfind(' '));
374 usr.replace("u:","");
375 // Isolate the tag
376 tag.erase(0,tag.find(' ') + 1);
377 }
378 TRACEP(p, REQ, "usr: "<<usr<<", version tag: "<< tag);
379
380 // If the action is requested for a user different from us we
381 // must be 'superuser'
382 XrdProofdClient *c = p->Client();
383 XrdOucString grp;
384 if (usr.length() > 0) {
385 // Separate group info, if any
386 if (usr.find(':') != STR_NPOS) {
387 grp = usr;
388 grp.erase(grp.rfind(':'));
389 usr.erase(0,usr.find(':') + 1);
390 } else {
392 (fMgr->GroupsMgr()) ? fMgr->GroupsMgr()->GetUserGroup(usr.c_str()) : 0;
393 grp = g ? g->Name() : "default";
394 }
395 if (usr != p->Client()->User()) {
396 if (!p->SuperUser()) {
397 usr.insert("not allowed to change settings for usr '", 0);
398 usr += "'";
399 TRACEP(p, XERR, usr.c_str());
400 response->Send(kXR_InvalidRequest, usr.c_str());
401 return 0;
402 }
403 // Lookup the list
404 if (!(c = fMgr->ClientMgr()->GetClient(usr.c_str(), grp.c_str()))) {
405 // No: fail
406 XrdOucString emsg("user not found or not allowed: ");
407 emsg += usr;
408 TRACEP(p, XERR, emsg.c_str());
409 response->Send(kXR_InvalidRequest, emsg.c_str());
410 return 0;
411 }
412 }
413 }
414
415 // Search in the list
416 XrdROOT *r = fMgr->ROOTMgr()->GetVersion(tag.c_str());
417 bool ok = r ? 1 : 0;
418 if (!r && tag == "default") {
419 // If not found we may have been requested to set the default version
421 ok = r ? 1 : 0;
422 }
423
424 if (ok) {
425 // Save the version in the client instance
426 c->SetROOT(r);
427 // Notify
428 TRACEP(p, DBG, "default changed to "<<c->ROOT()->Tag()<<
429 " for {client, group} = {"<<usr<<", "<<grp<<"} ("<<c<<")");
430 // Forward down the tree, if not leaf
431 int brc = 0;
432 if (fMgr->SrvType() != kXPD_Worker) {
433 XrdOucString buf("u:");
434 buf += c->UI().fUser;
435 buf += " ";
436 buf += tag;
437 int type = ntohl(p->Request()->proof.int1);
438 brc = fMgr->NetMgr()->Broadcast(type, buf.c_str(), p->Client()->User(), response);
439 }
440 if (brc == 0) {
441 // Acknowledge user
442 response->Send();
443 } else {
444 // Notify something wrong
445 tag.insert("tag '", 0);
446 tag += "' not found in the list of available ROOT versions on some worker nodes";
447 TRACEP(p, XERR, tag.c_str());
448 response->Send(kXR_InvalidRequest, tag.c_str());
449 }
450 } else {
451 tag.insert("tag '", 0);
452 tag += "' not found in the list of available ROOT versions";
453 TRACEP(p, XERR, tag.c_str());
454 response->Send(kXR_InvalidRequest, tag.c_str());
455 }
456
457 // Over
458 return 0;
459}
460
461////////////////////////////////////////////////////////////////////////////////
462/// Handle request for getting the list of potential workers
463
465{
466 XPDLOC(ALL, "Admin::QueryWorkers")
467
468 int rc = 0;
469 XPD_SETRESP(p, "QueryWorkers");
470
471 // Send back a list of potentially available workers
472 XrdOucString sbuf(1024);
473 fMgr->ProofSched()->ExportInfo(sbuf);
474
475 // Send buffer
476 char *buf = (char *) sbuf.c_str();
477 int len = sbuf.length() + 1;
478 TRACEP(p, DBG, "sending: "<<buf);
479
480 // Send back to user
481 response->Send(buf, len);
482
483 // Over
484 return 0;
485}
486
487////////////////////////////////////////////////////////////////////////////////
488/// Handle request for getting the best set of workers
489
491{
492 XPDLOC(ALL, "Admin::GetWorkers")
493
494 int rc = 0;
495 XPD_SETRESP(p, "GetWorkers");
496
497 // Unmarshall the data
498 int psid = ntohl(p->Request()->proof.sid);
499
500 // Find server session
501 XrdProofdProofServ *xps = 0;
502 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
503 TRACEP(p, XERR, "session ID not found: "<<psid);
504 response->Send(kXR_InvalidRequest,"session ID not found");
505 return 0;
506 }
507 int pid = xps->SrvPID();
508 TRACEP(p, REQ, "request from session "<<pid);
509
510 // We should query the chosen resource provider
511 XrdOucString wrks("");
512
513 // Read the message associated with the request; needs to do like this because
514 // of a bug in the XrdOucString constructor when length is 0
515 XrdOucString msg;
516 if (p->Request()->header.dlen > 0)
517 msg.assign((const char *) p->Argp()->buff, 0, p->Request()->header.dlen);
518 if (fMgr->GetWorkers(wrks, xps, msg.c_str()) < 0 ) {
519 // Something wrong
520 response->Send(kXR_InvalidRequest, "GetWorkers failed");
521 return 0;
522 }
523
524 // Send buffer
525 // In case the session was enqueued, pass an empty list.
526 char *buf = (char *) wrks.c_str();
527 int len = wrks.length() + 1;
528 TRACEP(p, DBG, "sending: "<<buf);
529
530 // Send back to user
531 if (buf) {
532 response->Send(buf, len);
533 } else {
534 // Something wrong
535 response->Send(kXR_InvalidRequest, "GetWorkers failed");
536 return 0;
537 }
538
539 // Over
540 return 0;
541}
542
543////////////////////////////////////////////////////////////////////////////////
544/// Handle request for setting group properties
545
547{
548 XPDLOC(ALL, "Admin::SetGroupProperties")
549
550 int rc = 1;
551 XPD_SETRESP(p, "SetGroupProperties");
552
553 // User's group
554 int len = p->Request()->header.dlen;
555 char *grp = new char[len+1];
556 memcpy(grp, p->Argp()->buff, len);
557 grp[len] = 0;
558 TRACEP(p, DBG, "request to change priority for group '"<< grp<<"'");
559
560 // Make sure is the current one of the user
561 if (strcmp(grp, p->Client()->UI().fGroup.c_str())) {
562 TRACEP(p, XERR, "received group does not match the user's one");
563 response->Send(kXR_InvalidRequest,
564 "SetGroupProperties: received group does not match the user's one");
565 SafeDelArray(grp);
566 return 0;
567 }
568
569 // The priority value
570 int priority = ntohl(p->Request()->proof.int2);
571
572 // Tell the priority manager
573 if (fMgr && fMgr->PriorityMgr()) {
574 XrdOucString buf;
575 XPDFORM(buf, "%s %d", grp, priority);
577 buf.c_str()) != 0) {
578 TRACEP(p, XERR, "problem sending message on the pipe");
579 response->Send(kXR_ServerError,
580 "SetGroupProperties: problem sending message on the pipe");
581 SafeDelArray(grp);
582 return 0;
583 }
584 }
585
586 // Notify
587 TRACEP(p, REQ, "priority for group '"<< grp<<"' has been set to "<<priority);
588
589 SafeDelArray(grp);
590
591 // Acknowledge user
592 response->Send();
593
594 // Over
595 return 0;
596}
597
598////////////////////////////////////////////////////////////////////////////////
599/// Handle request for sending a message to a user
600
602{
603 XPDLOC(ALL, "Admin::SendMsgToUser")
604
605 int rc = 0;
606 XPD_SETRESP(p, "SendMsgToUser");
607
608 // Target client (default us)
609 XrdProofdClient *tgtclnt = p->Client();
610 XrdProofdClient *c = 0;
611 std::list<XrdProofdClient *>::iterator i;
612
613 // Extract the user name, if any
614 int len = p->Request()->header.dlen;
615 if (len <= 0) {
616 // No message: protocol error?
617 TRACEP(p, XERR, "no message");
618 response->Send(kXR_InvalidRequest,"SendMsgToUser: no message");
619 return 0;
620 }
621
622 XrdOucString cmsg((const char *)p->Argp()->buff, len);
623 XrdOucString usr;
624 if (cmsg.beginswith("u:")) {
625 // Extract user
626 int isp = cmsg.find(' ');
627 if (isp != STR_NPOS) {
628 usr.assign(cmsg, 2, isp-1);
629 cmsg.erase(0, isp+1);
630 }
631 if (usr.length() > 0) {
632 TRACEP(p, REQ, "request for user: '"<<usr<<"'");
633 // Find the client instance
634 bool clntfound = 0;
635 if ((c = fMgr->ClientMgr()->GetClient(usr.c_str(), 0))) {
636 tgtclnt = c;
637 clntfound = 1;
638 }
639 if (!clntfound) {
640 // No user: protocol error?
641 TRACEP(p, XERR, "target client not found");
642 response->Send(kXR_InvalidRequest,
643 "SendMsgToUser: target client not found");
644 return 0;
645 }
646 }
647 }
648 // Recheck message length
649 if (cmsg.length() <= 0) {
650 // No message: protocol error?
651 TRACEP(p, XERR, "no message after user specification");
652 response->Send(kXR_InvalidRequest,
653 "SendMsgToUser: no message after user specification");
654 return 0;
655 }
656
657 // Check if allowed
658 if (!p->SuperUser()) {
659 if (usr.length() > 0) {
660 if (tgtclnt != p->Client()) {
661 TRACEP(p, XERR, "not allowed to send messages to usr '"<<usr<<"'");
662 response->Send(kXR_InvalidRequest,
663 "SendMsgToUser: not allowed to send messages to specified usr");
664 return 0;
665 }
666 } else {
667 TRACEP(p, XERR, "not allowed to send messages to connected users");
668 response->Send(kXR_InvalidRequest,
669 "SendMsgToUser: not allowed to send messages to connected users");
670 return 0;
671 }
672 } else {
673 if (usr.length() <= 0) tgtclnt = 0;
674 }
675
676 // The clients to notified
677 fMgr->ClientMgr()->Broadcast(tgtclnt, cmsg.c_str());
678
679 // Acknowledge user
680 response->Send();
681
682 // Over
683 return 0;
684}
685
686////////////////////////////////////////////////////////////////////////////////
687/// Handle request for list of sessions
688
690{
691 XPDLOC(ALL, "Admin::QuerySessions")
692
693 int rc = 0;
694 XPD_SETRESP(p, "QuerySessions");
695
696 XrdOucString notmsg, msg;
697 { // This is needed to block the session checks
699 msg = p->Client()->ExportSessions(notmsg, response);
700 }
701
702 if (notmsg.length() > 0) {
703 // Some sessions seem non-responding: notify the client
704 response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notmsg.c_str(), notmsg.length());
705 }
706
707 TRACEP(p, DBG, "sending: "<<msg);
708
709 // Send back to user
710 response->Send((void *)msg.c_str(), msg.length()+1);
711
712 // Over
713 return 0;
714}
715
716////////////////////////////////////////////////////////////////////////////////
717/// Handle request for log paths
718
720{
721 XPDLOC(ALL, "Admin::QueryLogPaths")
722
723 int rc = 0;
724 XPD_SETRESP(p, "QueryLogPaths");
725
726 int ridx = ntohl(p->Request()->proof.int2);
727 bool broadcast = (ntohl(p->Request()->proof.int3) == 1) ? 1 : 0;
728
729 // Find out for which session is this request
730 XrdOucString stag, master, user, ord, buf;
731 int len = p->Request()->header.dlen;
732 if (len > 0) {
733 buf.assign(p->Argp()->buff,0,len-1);
734 int im = buf.find("|master:");
735 int iu = buf.find("|user:");
736 int io = buf.find("|ord:");
737 stag = buf;
738 stag.erase(stag.find("|"));
739 if (im != STR_NPOS) {
740 master.assign(buf, im + strlen("|master:"));
741 master.erase(master.find("|"));
742 }
743 if (iu != STR_NPOS) {
744 user.assign(buf, iu + strlen("|user:"));
745 user.erase(user.find("|"));
746 }
747 if (io != STR_NPOS) {
748 ord.assign(buf, io + strlen("|ord:"));
749 ord.erase(ord.find("|"));
750 }
751 if (stag.beginswith('*'))
752 stag = "";
753 }
754 TRACEP(p, DBG, "master: "<<master<<", user: "<<user<<", ord: "<<ord<<", stag: "<<stag);
755
756 XrdProofdClient *client = (user.length() > 0) ? 0 : p->Client();
757 if (!client)
758 // Find the client instance
759 client = fMgr->ClientMgr()->GetClient(user.c_str(), 0);
760 if (!client) {
761 TRACEP(p, XERR, "query sess logs: client for '"<<user<<"' not found");
762 response->Send(kXR_InvalidRequest,"QueryLogPaths: query log: client not found");
763 return 0;
764 }
765
766 XrdOucString tag = (stag == "" && ridx >= 0) ? "last" : stag;
767 if (stag == "" && client->Sandbox()->GuessTag(tag, ridx) != 0) {
768 TRACEP(p, XERR, "query sess logs: session tag not found");
769 response->Send(kXR_InvalidRequest,"QueryLogPaths: query log: session tag not found");
770 return 0;
771 }
772
773 // Return message
774 XrdOucString rmsg;
775
776 if (master.length() <= 0) {
777 // The session tag first
778 rmsg += tag; rmsg += "|";
779 // The pool URL second
780 rmsg += fMgr->PoolURL(); rmsg += "|";
781 }
782
783 // Locate the local log file
784 XrdOucString sdir(client->Sandbox()->Dir());
785 sdir += "/session-";
786 sdir += tag;
787
788 // Open dir
789 DIR *dir = opendir(sdir.c_str());
790 if (!dir) {
791 XrdOucString msg("cannot open dir ");
792 msg += sdir; msg += " (errno: "; msg += errno; msg += ")";
793 TRACEP(p, XERR, msg.c_str());
794 response->Send(kXR_InvalidRequest, msg.c_str());
795 return 0;
796 }
797
798 // Masters have the .workers file
799 XrdOucString wfile(sdir);
800 wfile += "/.workers";
801 bool ismaster = (access(wfile.c_str(), F_OK) == 0) ? 1 : 0;
802
803 // Scan the directory to add the top master (only if top master)
804 XrdOucString xo, logtag, xf;
805 int ilog, idas, iund1, iund2;
806 struct dirent *ent = 0;
807 while ((ent = (struct dirent *)readdir(dir))) {
808 if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")) continue;
809 XPDFORM(xf, "%s/%s", sdir.c_str(), (const char *) ent->d_name);
810 struct stat st;
811 if (stat(xf.c_str(), &st) != 0) continue;
812 if (!S_ISREG(st.st_mode)) continue;
813 xo = ent->d_name;
814 if (xo.matches("*-*-*-*-*.log") <= 0 && xo.matches("*-*-*-*-*.valgrind.log") <= 0) continue;
815 TRACEP(p, ALL, "xf: "<<xf<<"; st_mode: "<<st.st_mode);
816 bool recordinfo = 0;
817 if ((ilog = xo.find(".log")) != STR_NPOS) {
818 xo.replace(".log", "");
819
820 // If it is an "additional" logfile, extract a "tag" identifying it
821 // from the filename. Tag is in format: __<tag>__
822 iund1 = xo.find("__");
823 if (iund1 != STR_NPOS) {
824 iund2 = xo.rfind("__");
825 if ((iund2 != STR_NPOS) && (iund2 != iund1)) {
826 logtag = xo;
827 logtag.erase(iund2);
828 logtag.erase(0, iund1+2);
829 }
830 }
831
832 if ((idas = xo.find('-')) != STR_NPOS) xo.erase(0, idas + 1);
833 if ((idas = xo.find('-')) != STR_NPOS) xo.erase(idas);
834 if (ord.length() > 0 && (ord == xo)) {
835 recordinfo = 1;
836 } else {
837 if (ismaster && !broadcast) {
838 if (!strncmp(ent->d_name, "master-", 7)) recordinfo = 1;
839 } else {
840 recordinfo = 1;
841 }
842 }
843 if (recordinfo) {
844 rmsg += "|"; rmsg += xo;
845 if (logtag != "") { rmsg += '('; rmsg += logtag; rmsg += ')'; }
846 rmsg += " proof://"; rmsg += fMgr->Host(); rmsg += ':';
847 rmsg += fMgr->Port(); rmsg += '/';
848 rmsg += sdir; rmsg += '/'; rmsg += ent->d_name;
849 }
850 }
851 }
852 // Close dir
853 closedir(dir);
854
855 // If required and it makes sense, ask the underlying nodes
856 if (broadcast && ismaster) {
857 XrdOucString msg(tag);
858 msg += "|master:";
859 msg += fMgr->Host();
860 msg += "|user:";
861 msg += client->User();
862 char *bmst = fMgr->NetMgr()->ReadLogPaths(msg.c_str(), ridx);
863 if (bmst) {
864 rmsg += bmst;
865 free(bmst);
866 }
867 } else if (ismaster) {
868 // Get info from the .workers file
869 // Now open the workers file
870 FILE *f = fopen(wfile.c_str(), "r");
871 if (f) {
872 char ln[2048];
873 while (fgets(ln, sizeof(ln), f)) {
874 if (ln[strlen(ln)-1] == '\n')
875 ln[strlen(ln)-1] = 0;
876 // Locate status and url
877 char *ps = strchr(ln, ' ');
878 if (ps) {
879 *ps = 0;
880 ps++;
881 // Locate ordinal
882 char *po = strchr(ps, ' ');
883 if (po) {
884 po++;
885 // Locate path
886 char *pp = strchr(po, ' ');
887 if (pp) {
888 *pp = 0;
889 pp++;
890 // Record now
891 rmsg += "|"; rmsg += po; rmsg += " ";
892 if (master.length() > 0) {
893 rmsg += master;
894 rmsg += ",";
895 }
896 rmsg += ln; rmsg += '/';
897 rmsg += pp;
898 // Reposition on the file name
899 char *ppl = strrchr(pp, '/');
900 pp = (ppl) ? ppl : pp;
901 // If the line is for a submaster, we have to get the info
902 // about its workers
903 bool ismst = (strstr(pp, "master-")) ? 1 : 0;
904 if (ismst) {
905 XrdClientUrlInfo u((const char *)&ln[0]);
906 XrdOucString msg(stag);
907 msg += "|master:";
908 msg += ln;
909 msg += "|user:";
910 msg += u.User;
911 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
912 char *bmst = fMgr->NetMgr()->ReadLogPaths(u.GetUrl().c_str(), msg.c_str(), ridx);
913 if (bmst) {
914 rmsg += bmst;
915 free(bmst);
916 }
917 }
918 }
919 }
920 }
921 }
922 fclose(f);
923 }
924 }
925
926 // Send back to user
927 response->Send((void *) rmsg.c_str(), rmsg.length()+1);
928
929 // Over
930 return 0;
931}
932
933////////////////////////////////////////////////////////////////////////////////
934/// Handle request of
935
937{
938 XPDLOC(ALL, "Admin::CleanupSessions")
939
940 int rc = 0;
941 XPD_SETRESP(p, "CleanupSessions");
942
943 XrdOucString cmsg;
944
945 // Target client (default us)
946 XrdProofdClient *tgtclnt = p->Client();
947
948 // If super user we may be requested to cleanup everything
949 bool all = 0;
950 char *usr = 0;
951 bool clntfound = 1;
952 if (p->SuperUser()) {
953 int what = ntohl(p->Request()->proof.int2);
954 all = (what == 1) ? 1 : 0;
955
956 if (!all) {
957 // Get a user name, if any.
958 // A super user can ask cleaning for clients different from itself
959 char *buf = 0;
960 int len = p->Request()->header.dlen;
961 if (len > 0) {
962 clntfound = 0;
963 buf = p->Argp()->buff;
964 len = (len < 9) ? len : 8;
965 } else {
966 buf = (char *) p->Client()->User();
967 len = strlen(p->Client()->User());
968 }
969 if (len > 0) {
970 usr = new char[len+1];
971 memcpy(usr, buf, len);
972 usr[len] = '\0';
973 // Group info, if any
974 char *grp = strstr(usr, ":");
975 if (grp)
976 *grp++ = 0;
977 // Find the client instance
978 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(usr, grp);
979 if (c) {
980 tgtclnt = c;
981 clntfound = 1;
982 }
983 TRACEP(p, REQ, "superuser, cleaning usr: "<< usr);
984 }
985 } else {
986 tgtclnt = 0;
987 TRACEP(p, REQ, "superuser, all sessions cleaned");
988 }
989 } else {
990 // Define the user name for later transactions (their executed under
991 // the admin name)
992 int len = strlen(tgtclnt->User()) + 1;
993 usr = new char[len+1];
994 memcpy(usr, tgtclnt->User(), len);
995 usr[len] = '\0';
996 }
997
998 // We cannot continue if we do not have anything to clean
999 if (!clntfound) {
1000 TRACEP(p, DBG, "client '"<<usr<<"' has no sessions - do nothing");
1001 }
1002
1003 // hard or soft (always hard for old clients)
1004 bool hard = (ntohl(p->Request()->proof.int3) == 1 || p->ProofProtocol() < 18) ? 1 : 0;
1005 const char *lab = hard ? "hard-reset" : "soft-reset";
1006
1007 // Asynchronous notification to requester
1008 if (fMgr->SrvType() != kXPD_Worker) {
1009 XPDFORM(cmsg, "CleanupSessions: %s: signalling active sessions for termination", lab);
1010 response->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
1011 }
1012
1013 // Send a termination request to client sessions
1014 XPDFORM(cmsg, "CleanupSessions: %s: cleaning up client: requested by: %s", lab, p->Link()->ID);
1015 int srvtype = ntohl(p->Request()->proof.int2);
1016 fMgr->ClientMgr()->TerminateSessions(tgtclnt, cmsg.c_str(), srvtype);
1017
1018 // Forward down the tree only if not leaf
1019 if (hard && fMgr->SrvType() != kXPD_Worker) {
1020
1021 // Asynchronous notification to requester
1022 XPDFORM(cmsg, "CleanupSessions: %s: forwarding the reset request to next tier(s) ", lab);
1023 response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) cmsg.c_str(), cmsg.length());
1024
1025 int type = ntohl(p->Request()->proof.int1);
1026 fMgr->NetMgr()->Broadcast(type, usr, p->Client()->User(), response, 1);
1027 }
1028
1029 // Wait just a bit before testing the activity of the session manager
1030 sleep(1);
1031
1032 // Additional waiting (max 10 secs) depends on the activity of the session manager
1033 int twait = 10;
1034 while (twait-- > 0 &&
1036 if (twait < 7) {
1037 XPDFORM(cmsg, "CleanupSessions: %s: wait %d more seconds for completion ...", lab, twait);
1038 response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) cmsg.c_str(), cmsg.length());
1039 }
1040 sleep(1);
1041 }
1042
1043 // Cleanup usr
1044 SafeDelArray(usr);
1045
1046 // Acknowledge user
1047 response->Send();
1048
1049 // Over
1050 return 0;
1051}
1052
1053////////////////////////////////////////////////////////////////////////////////
1054/// Handle request for setting the session alias
1055
1057{
1058 XPDLOC(ALL, "Admin::SetSessionAlias")
1059
1060 int rc = 0;
1061 XPD_SETRESP(p, "SetSessionAlias");
1062
1063 //
1064 // Specific info about a session
1065 int psid = ntohl(p->Request()->proof.sid);
1066 XrdProofdProofServ *xps = 0;
1067 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
1068 TRACEP(p, XERR, "session ID not found: "<<psid);
1069 response->Send(kXR_InvalidRequest,"SetSessionAlias: session ID not found");
1070 return 0;
1071 }
1072
1073 // Set session alias
1074 const char *msg = (const char *) p->Argp()->buff;
1075 int len = p->Request()->header.dlen;
1076 if (len > kXPROOFSRVALIASMAX - 1)
1077 len = kXPROOFSRVALIASMAX - 1;
1078
1079 // Save tag
1080 if (len > 0 && msg) {
1081 xps->SetAlias(msg);
1082 if (TRACING(DBG)) {
1083 XrdOucString alias(xps->Alias());
1084 TRACEP(p, DBG, "session alias set to: "<<alias);
1085 }
1086 }
1087
1088 // Acknowledge user
1089 response->Send();
1090
1091 // Over
1092 return 0;
1093}
1094
1095////////////////////////////////////////////////////////////////////////////////
1096/// Handle request for setting the session tag
1097
1099{
1100 XPDLOC(ALL, "Admin::SetSessionTag")
1101
1102 int rc = 0;
1103 XPD_SETRESP(p, "SetSessionTag");
1104 //
1105 // Specific info about a session
1106 int psid = ntohl(p->Request()->proof.sid);
1107 XrdProofdProofServ *xps = 0;
1108 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
1109 TRACEP(p, XERR, "session ID not found: "<<psid);
1110 response->Send(kXR_InvalidRequest,"SetSessionTag: session ID not found");
1111 return 0;
1112 }
1113
1114 // Set session tag
1115 const char *msg = (const char *) p->Argp()->buff;
1116 int len = p->Request()->header.dlen;
1117 if (len > kXPROOFSRVTAGMAX - 1)
1118 len = kXPROOFSRVTAGMAX - 1;
1119
1120 // Save tag
1121 if (len > 0 && msg) {
1122 xps->SetTag(msg);
1123 if (TRACING(DBG)) {
1124 XrdOucString tag(xps->Tag());
1125 TRACEP(p, DBG, "session tag set to: "<<tag);
1126 }
1127 }
1128
1129 // Acknowledge user
1130 response->Send();
1131
1132 // Over
1133 return 0;
1134}
1135
1136////////////////////////////////////////////////////////////////////////////////
1137/// Handle request for releasing a worker
1138
1140{
1141 XPDLOC(ALL, "Admin::ReleaseWorker")
1142
1143 int rc = 0;
1144 XPD_SETRESP(p, "ReleaseWorker");
1145 //
1146 // Specific info about a session
1147 int psid = ntohl(p->Request()->proof.sid);
1148 XrdProofdProofServ *xps = 0;
1149 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
1150 TRACEP(p, XERR, "session ID not found: "<<psid);
1151 response->Send(kXR_InvalidRequest,"ReleaseWorker: session ID not found");
1152 return 0;
1153 }
1154
1155 // Set session tag
1156 const char *msg = (const char *) p->Argp()->buff;
1157 int len = p->Request()->header.dlen;
1158 if (len > kXPROOFSRVTAGMAX - 1)
1159 len = kXPROOFSRVTAGMAX - 1;
1160
1161 // Save tag
1162 if (len > 0 && msg) {
1163 xps->RemoveWorker(msg);
1164 TRACEP(p, DBG, "worker \""<<msg<<"\" released");
1165 if (TRACING(HDBG)) fMgr->NetMgr()->Dump();
1166 }
1167
1168 // Acknowledge user
1169 response->Send();
1170
1171 // Over
1172 return 0;
1173}
1174
1175////////////////////////////////////////////////////////////////////////////////
1176/// Check is 's' contains any of the forbidden chars '(){};'
1177/// Return 0 if OK (no forbidden chars), -1 in not OK
1178
1180{
1181 int len = 0;
1182 if (!s || (len = strlen(s)) <= 0) return 0;
1183
1184 int j = len;
1185 while (j--) {
1186 char c = s[j];
1187 if (c == '(' || c == ')' || c == '{' || c == '}' || c == ';') {
1188 return -1;
1189 }
1190 }
1191 // Done
1192 return 0;
1193}
1194
1195////////////////////////////////////////////////////////////////////////////////
1196/// Handle request of cleaning parts of the sandbox
1197
1199{
1200 XPDLOC(ALL, "Admin::Exec")
1201
1202 // Commands; must be synchronized with EAdminExecType in XProofProtocol.h
1203#if !defined(__APPLE__)
1204 const char *cmds[] = { "rm", "ls", "more", "grep", "tail", "md5sum", "stat", "find" };
1205#else
1206 const char *cmds[] = { "rm", "ls", "more", "grep", "tail", "md5", "stat", "find" };
1207#endif
1208 const char *actcmds[] = { "remove", "access", "open", "open", "open", "open", "stat", "find"};
1209
1210 int rc = 0;
1211 XPD_SETRESP(p, "Exec");
1212
1213 XrdOucString emsg;
1214
1215 // Target client (default us)
1216 XrdProofdClient *tgtclnt = p->Client();
1217 if (!tgtclnt) {
1218 emsg = "client instance not found";
1219 TRACEP(p, XERR, emsg);
1220 response->Send(kXR_InvalidRequest, emsg.c_str());
1221 return 0;
1222 }
1223
1224 // Action type
1225 int action = ntohl(p->Request()->proof.int2);
1226 if (action < kRm || action > kFind) {
1227 emsg = "unknown action type: ";
1228 emsg += action;
1229 TRACEP(p, XERR, emsg);
1230 response->Send(kXR_InvalidRequest, emsg.c_str());
1231 return 0;
1232 }
1233
1234 // Parse the string
1235 int dlen = p->Request()->header.dlen;
1236 XrdOucString msg, node, path, opt;
1237 if (dlen > 0 && p->Argp()->buff) {
1238 msg.assign((const char *)p->Argp()->buff, 0, dlen);
1239 // Parse
1240 emsg = "";
1241 int from = 0;
1242 if ((from = msg.tokenize(node, from, '|')) != -1) {
1243 if ((from = msg.tokenize(path, from, '|')) != -1) {
1244 from = msg.tokenize(opt, from, '|');
1245 } else {
1246 emsg = "'path' not found in message";
1247 }
1248 } else {
1249 emsg = "'node' not found in message";
1250 }
1251 if (emsg.length() > 0) {
1252 TRACEP(p, XERR, emsg);
1253 response->Send(kXR_InvalidRequest, emsg.c_str());
1254 return 0;
1255 }
1256 }
1257
1258 // Path and opt cannot contain multiple commands (e.g. file; rm *)
1259 if (CheckForbiddenChars(path.c_str()) != 0) {
1260 emsg = "none of the characters '(){};' are allowed in path string ("; emsg += path; emsg += ")";
1261 TRACEP(p, XERR, emsg);
1262 response->Send(kXR_InvalidRequest, emsg.c_str());
1263 return 0;
1264 }
1265 if (CheckForbiddenChars(opt.c_str()) != 0) {
1266 emsg = "none of the characters '(){};' are allowed in opt string ("; emsg += opt; emsg += ")";
1267 TRACEP(p, XERR, emsg);
1268 response->Send(kXR_InvalidRequest, emsg.c_str());
1269 return 0;
1270 }
1271
1272 // Check if we have to forward this request
1273 XrdOucString result;
1274 bool islocal = fMgr->NetMgr()->IsLocal(node.c_str(), 1);
1275 if (fMgr->SrvType() != kXPD_Worker) {
1276 int type = ntohl(p->Request()->proof.int1);
1277 if (node == "all") {
1278 if (action == kStat || action == kMd5sum) {
1279 emsg = "action cannot be run in mode 'all' - running on master only";
1280 response->Send(kXR_attn, kXPD_srvmsg, 2, (char *)emsg.c_str(), emsg.length());
1281 } else {
1282 fMgr->NetMgr()->Broadcast(type, msg.c_str(), p->Client()->User(), response, 0, action);
1283 }
1284 } else if (!islocal) {
1285 // Create 'url'
1286 XrdOucString u = (p->Client()->User()) ? p->Client()->User() : fMgr->EffectiveUser();
1287 u += '@';
1288 u += node;
1289 TRACEP(p, HDBG, "sending request to "<<u);
1290 // Send request
1291 XrdClientMessage *xrsp;
1292 if (!(xrsp = fMgr->NetMgr()->Send(u.c_str(), type, msg.c_str(), 0, response, 0, action))) {
1293 TRACEP(p, XERR, "problems sending request to "<<u);
1294 } else {
1295 if (action == kStat || action == kMd5sum) {
1296 // Extract the result
1297 result.assign((const char *) xrsp->GetData(), 0, xrsp->DataLen());
1298 } else if (action == kRm) {
1299 // Send 'OK'
1300 result = "OK";
1301 }
1302 }
1303 // Cleanup answer
1304 SafeDel(xrsp);
1305 }
1306 }
1307
1308 // We may not have been requested to execute the command
1309 if (node != "all" && !islocal) {
1310 // We are done: acknowledge user ...
1311 if (result.length() > 0) {
1312 response->Send(result.c_str());
1313 } else {
1314 response->Send();
1315 }
1316 // ... and go
1317 return 0;
1318 }
1319
1320 // Here we execute the request
1321 XrdOucString cmd, pfx(fMgr->Host());
1322 pfx += ":"; pfx += fMgr->Port();
1323
1324 // Notify the client
1325 if (node != "all") {
1326 if (action != kStat && action != kMd5sum && action != kRm) {
1327 emsg = "Node: "; emsg += pfx;
1328 emsg += "\n-----";
1329 response->Send(kXR_attn, kXPD_srvmsg, 2, (char *)emsg.c_str(), emsg.length());
1330 }
1331 pfx = "";
1332 } else {
1333 pfx += "| ";
1334 }
1335
1336 // Get the full path, check if in sandbox and if the user is allowed
1337 // to access it
1338 XrdOucString fullpath(path);
1339 bool sandbox = 0;
1340 bool haswild = (fullpath.find('*') != STR_NPOS) ? 1 : 0;
1341 int check = (action == kMore || action == kTail ||
1342 action == kGrep || action == kMd5sum) ? 2 : 1;
1343 if ((action == kRm || action == kLs) && haswild) check = 0;
1344 int rccp = 0;
1345 struct stat st;
1346 if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
1347 fullpath, check, sandbox, &st, emsg)) != 0) {
1348 if (rccp == -2) {
1349 emsg = cmds[action];
1350 emsg += ": cannot ";
1351 emsg += actcmds[action];
1352 emsg += " `";
1353 emsg += fullpath;
1354 emsg += "': No such file or directory";
1355 } else if (rccp == -3) {
1356 emsg = cmds[action];
1357 emsg += ": cannot stat ";
1358 emsg += fullpath;
1359 emsg += ": errno: ";
1360 emsg += (int) errno;
1361 } else if (rccp == -4) {
1362 emsg = cmds[action];
1363 emsg += ": ";
1364 emsg += fullpath;
1365 emsg += ": Is not a regular file";
1366 }
1367 TRACEP(p, XERR, emsg);
1368 response->Send(kXR_InvalidRequest, emsg.c_str());
1369 return 0;
1370 }
1371
1372 // Additional checks for remove requests
1373 if (action == kRm) {
1374 // Ownership required and no support for wild cards for absolute paths
1375 if (!sandbox) {
1376 if (haswild) {
1377 emsg = "not allowed to rm with wild cards on path: ";
1378 emsg += fullpath;
1379 TRACEP(p, XERR, emsg);
1380 response->Send(kXR_InvalidRequest, emsg.c_str());
1381 return 0;
1382 }
1383 if ((int) st.st_uid != tgtclnt->UI().fUid || (int) st.st_gid != tgtclnt->UI().fGid) {
1384 emsg = "rm on path: ";
1385 emsg += fullpath;
1386 emsg += " requires ownership; path owned by: (";
1387 emsg += (int) st.st_uid; emsg += ",";
1388 emsg += (int) st.st_gid; emsg += ")";
1389 TRACEP(p, XERR, emsg);
1390 response->Send(kXR_InvalidRequest, emsg.c_str());
1391 return 0;
1392 }
1393 } else {
1394 // Will not allow to remove basic sandbox sub-dirs
1395 const char *sbdir[5] = {"queries", "packages", "cache", "datasets", "data"};
1396 while (fullpath.endswith('/'))
1397 fullpath.erasefromend(1);
1398 XrdOucString sball(tgtclnt->Sandbox()->Dir()), sball1 = sball;
1399 sball += "/*"; sball1 += "/*/";
1400 if (fullpath == sball || fullpath == sball1) {
1401 emsg = "removing all sandbox directory is not allowed: ";
1402 emsg += fullpath;
1403 TRACEP(p, XERR, emsg);
1404 response->Send(kXR_InvalidRequest, emsg.c_str());
1405 return 0;
1406 }
1407 int kk = 5;
1408 while (kk--) {
1409 if (fullpath.endswith(sbdir[kk])) {
1410 emsg = "removing a basic sandbox directory is not allowed: ";
1411 emsg += fullpath;
1412 TRACEP(p, XERR, emsg);
1413 response->Send(kXR_InvalidRequest, emsg.c_str());
1414 return 0;
1415 }
1416 }
1417 }
1418
1419 // Prepare the command
1420 cmd = cmds[action];
1421 if (opt.length() <= 0) opt = "-f";
1422 cmd += " "; cmd += opt;
1423 cmd += " "; cmd += fullpath;
1424 cmd += " 2>&1";
1425
1426 } else {
1427
1428 XrdOucString rederr;
1429 cmd = cmds[action];
1430 switch (action) {
1431 case kLs:
1432 if (opt.length() <= 0) opt = "-C";
1433 rederr = " 2>&1";
1434 break;
1435 case kMore:
1436 case kGrep:
1437 case kTail:
1438 case kFind:
1439 rederr = " 2>&1";
1440 break;
1441 case kStat:
1442 cmd = "";
1443 opt = "";
1444 break;
1445 case kMd5sum:
1446 opt = "";
1447 rederr = " 2>&1";
1448 break;
1449 }
1450 if (action != kFind) {
1451 if (cmd.length() > 0) cmd += " ";
1452 if (opt.length() > 0) { cmd += opt; cmd += " ";}
1453 cmd += fullpath;
1454 } else {
1455 cmd += " "; cmd += fullpath;
1456 if (opt.length() > 0) { cmd += " "; cmd += opt; }
1457 }
1458 if (rederr.length() > 0) cmd += rederr;
1459 }
1460
1461 // Run the command now
1462 emsg = pfx;
1463 if (ExecCmd(p, response, action, cmd.c_str(), emsg) != 0) {
1464 TRACEP(p, XERR, emsg);
1465 response->Send(kXR_ServerError, emsg.c_str());
1466 } else {
1467 // Done
1468 switch (action) {
1469 case kStat:
1470 case kMd5sum:
1471 response->Send(emsg.c_str());
1472 break;
1473 case kRm:
1474 response->Send("OK");
1475 break;
1476 default:
1477 response->Send();
1478 break;
1479 }
1480 }
1481
1482 // Over
1483 return 0;
1484}
1485
1486////////////////////////////////////////////////////////////////////////////////
1487/// Low-level execution handler. The commands must be executed in user space.
1488/// We do that by forking and logging as user in the forked instance. The
1489/// parent will just send over te messages received from the user-child via
1490/// the pipe.
1491/// Return 0 on success, -1 on error
1492
1494 int action, const char *cmd, XrdOucString &emsg)
1495{
1496 XPDLOC(ALL, "Admin::ExecCmd")
1497
1498 int rc = 0;
1499 XrdOucString pfx = emsg;
1500 emsg = "";
1501
1502 // We do it via the shell
1503 if (!cmd || strlen(cmd) <= 0) {
1504 emsg = "undefined command!";
1505 return -1;
1506 }
1507
1508 // Pipe for child-to-parent communications
1509 XrdProofdPipe pp;
1510 if (!pp.IsValid()) {
1511 emsg = "cannot create the pipe";
1512 return -1;
1513 }
1514
1515 // Fork a test agent process to handle this session
1516 TRACEP(p, DBG, "forking to execute in the private sandbox");
1517 int pid = -1;
1518 if (!(pid = fMgr->Sched()->Fork("adminexeccmd"))) {
1519 // Child process
1520 // We set to the user environment as we must to run the command
1521 // in the user space
1522 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
1523 emsg = "SetUserEnvironment did not return OK";
1524 rc = 1;
1525 } else {
1526 // Execute the command
1527 if (action == kStat) {
1528 struct stat st;
1529 if ((stat(cmd, &st)) != 0) {
1530 if (errno == ENOENT) {
1531 emsg += "stat: cannot stat `";
1532 emsg += cmd;
1533 emsg += "': No such file or directory";
1534 } else {
1535 emsg += "stat: cannot stat ";
1536 emsg += cmd;
1537 emsg += ": errno: ";
1538 emsg += (int) errno;
1539 }
1540 } else {
1541 // Fill the buffer and go
1542 char msg[256];
1543 int islink = S_ISLNK(st.st_mode);
1544 snprintf(msg, 256, "%ld %ld %d %d %d %lld %ld %d", (long)st.st_dev,
1545 (long)st.st_ino, st.st_mode, (int)(st.st_uid),
1546 (int)(st.st_gid), (kXR_int64)st.st_size, st.st_mtime, islink);
1547 emsg = msg;
1548 }
1549 } else {
1550 // Execute the command in a pipe
1551 FILE *fp = popen(cmd, "r");
1552 if (!fp) {
1553 emsg = "could not run '"; emsg += cmd; emsg += "'";
1554 rc = 1;
1555 } else {
1556 // Read line by line
1557 int pfxlen = pfx.length();
1558 int len = 0;
1559 char line[2048];
1560 char buf[1024];
1561 int bufsiz = 1024, left = bufsiz - 1, lines = 0;
1562 while (fgets(line, sizeof(line), fp)) {
1563 // Parse the line
1564 int llen = strlen(line);
1565 lines++;
1566 // If md5sum, we need to parse only the first line
1567 if (lines == 1 && action == kMd5sum) {
1568 if (line[llen-1] == '\n') {
1569 line[llen-1] = '\0';
1570 llen--;
1571 }
1572#if !defined(__APPLE__)
1573 // The first token
1574 XrdOucString sl(line);
1575 sl.tokenize(emsg, 0, ' ');
1576#else
1577 // The last token
1578 XrdOucString sl(line), tkn;
1579 int from = 0;
1580 while ((from = sl.tokenize(tkn, from, ' ')) != STR_NPOS) {
1581 emsg = tkn;
1582 }
1583#endif
1584 break;
1585 }
1586 // Send over this part, if no more space
1587 if ((llen + pfxlen) > left) {
1588 buf[len] = '\0';
1589 if (buf[len-1] == '\n') buf[len-1] = '\0';
1590 if (r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) &buf[0], len) != 0) {
1591 emsg = "error sending message to requester";
1592 rc = 1;
1593 break;
1594 }
1595 buf[0] = 0;
1596 len = 0;
1597 left = bufsiz -1;
1598 }
1599 // Add prefix to the buffer, if any
1600 if (pfxlen > 0) {
1601 memcpy(buf+len, pfx.c_str(), pfxlen);
1602 len += pfxlen;
1603 left -= pfxlen;
1604 }
1605 // Add line to the buffer
1606 memcpy(buf+len, line, llen);
1607 len += llen;
1608 left -= llen;
1609 // Check if we have been interrupted
1610 if (lines > 0 && !(lines % 10)) {
1611 char b[1];
1612 if (p->Link()->Peek(&b[0], 1, 0) == 1) {
1613 p->Process(p->Link());
1614 if (p->IsCtrlC()) break;
1615 }
1616 }
1617 }
1618 // Send the last bunch
1619 if (len > 0) {
1620 buf[len] = '\0';
1621 if (buf[len-1] == '\n') buf[len-1] = '\0';
1622 if (r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) &buf[0], len) != 0) {
1623 emsg = "error sending message to requester";
1624 rc = 1;
1625 }
1626 }
1627 // Close the pipe
1628 int rcpc = 0;
1629 if ((rcpc = pclose(fp)) == -1) {
1630 emsg = "could not close the command pipe";
1631 rc = 1;
1632 }
1633 if (WEXITSTATUS(rcpc) != 0) {
1634 emsg = "failure: return code: ";
1635 emsg += (int) WEXITSTATUS(rcpc);
1636 rc = 1;
1637 }
1638 }
1639 }
1640 }
1641 // Send error, if any
1642 if (rc == 1) {
1643 // Post Error
1644 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
1645 }
1646
1647 // End-Of-Transmission
1648 if (pp.Post(0, emsg.c_str()) != 0) rc = 1;
1649
1650 // Done
1651 exit(rc);
1652 }
1653
1654 // Parent process
1655 if (pid < 0) {
1656 emsg = "forking failed - errno: "; emsg += (int) errno;
1657 return -1;
1658 }
1659
1660 // now we wait for the callback to be (successfully) established
1661 TRACEP(p, DBG, "forking OK: wait for information");
1662
1663 // Read status-of-setup from pipe
1664 int prc = 0, rst = -1;
1665 // We wait for 60 secs max among transfers
1666 while (rst < 0 && rc >= 0) {
1667 while ((prc = pp.Poll(60)) > 0) {
1668 XpdMsg msg;
1669 if (pp.Recv(msg) != 0) {
1670 emsg = "error receiving message from pipe";
1671 return -1;
1672 }
1673 // Status is the message type
1674 rst = msg.Type();
1675 // Read string, if any
1676 XrdOucString buf;
1677 if (rst < 0) {
1678 buf = msg.Buf();
1679 if (buf.length() <= 0) {
1680 emsg = "error reading string from received message";
1681 return -1;
1682 }
1683 // Store error message
1684 emsg = buf;
1685 } else {
1686 if (action == kMd5sum || action == kStat) {
1687 buf = msg.Buf();
1688 if (buf.length() <= 0) {
1689 emsg = "error reading string from received message";
1690 return -1;
1691 }
1692 // Store md5sum
1693 emsg = buf;
1694 }
1695 // Done
1696 break;
1697 }
1698 }
1699 if (prc == 0) {
1700 emsg = "timeout from poll";
1701 return -1;
1702 } else if (prc < 0) {
1703 emsg = "error from poll - errno: "; emsg += -prc;
1704 return -1;
1705 }
1706 }
1707
1708 // Done
1709 return rc;
1710}
1711
1712////////////////////////////////////////////////////////////////////////////////
1713/// Handle request for sending a file
1714
1715int XrdProofdAdmin::CheckPath(bool superuser, const char *sbdir,
1716 XrdOucString &fullpath, int check, bool &sandbox,
1717 struct stat *st, XrdOucString &emsg)
1718{
1719 if (!sbdir || strlen(sbdir) <= 0) {
1720 emsg = "CheckPath: sandbox dir undefined!";
1721 return -1;
1722 }
1723
1724 // Get the full path and check if in sandbox
1725 XrdOucString path(fullpath);
1726 sandbox = 0;
1727 if (path.beginswith('/')) {
1728 fullpath = path;
1729 if (fullpath.beginswith(sbdir)) sandbox = 1;
1730 } else {
1731 if (path.beginswith("../")) path.erase(0,2);
1732 if (path.beginswith("./") || path.beginswith("~/")) path.erase(0,1);
1733 if (!path.beginswith("/")) path.insert('/',0);
1734 fullpath = sbdir;
1735 fullpath += path;
1736 sandbox = 1;
1737 }
1738 fullpath.replace("//","/");
1739
1740 // If the path is absolute, we must check a normal user is allowed to browse
1741 if (!sandbox && !superuser) {
1742 bool notfound = 1;
1743 std::list<XrdOucString>::iterator si = fExportPaths.begin();
1744 while (si != fExportPaths.end()) {
1745 if (path.beginswith((*si).c_str())) {
1746 notfound = 0;
1747 break;
1748 }
1749 ++si;
1750 }
1751 if (notfound) {
1752 emsg = "CheckPath: not allowed to run the requested action on ";
1753 emsg += path;
1754 return -1;
1755 }
1756 }
1757
1758 if (check > 0 && st) {
1759 // Check if the file exists
1760 if (stat(fullpath.c_str(), st) != 0) {
1761 if (errno == ENOENT) {
1762 return -2;
1763 } else {
1764 return -3;
1765 }
1766 }
1767
1768 // Certain actions require a regular file
1769 if ((check == 2) && !S_ISREG(st->st_mode)) return -4;
1770 }
1771
1772 // Done
1773 return 0;
1774}
1775
1776////////////////////////////////////////////////////////////////////////////////
1777/// Handle request for sending a file
1778
1780{
1781 XPDLOC(ALL, "Admin::GetFile")
1782
1783 int rc = 0;
1784 XPD_SETRESP(p, "GetFile");
1785
1786 XrdOucString emsg;
1787
1788 // Target client (default us)
1789 XrdProofdClient *tgtclnt = p->Client();
1790 if (!tgtclnt) {
1791 emsg = "client instance not found";
1792 TRACEP(p, XERR, emsg);
1793 response->Send(kXR_InvalidRequest, emsg.c_str());
1794 return 0;
1795 }
1796
1797 // Parse the string
1798 int dlen = p->Request()->header.dlen;
1799 XrdOucString path;
1800 if (dlen > 0 && p->Argp()->buff) {
1801 path.assign((const char *)p->Argp()->buff, 0, dlen);
1802 if (path.length() <= 0) {
1803 TRACEP(p, XERR, "path missing!");
1804 response->Send(kXR_InvalidRequest, "path missing!");
1805 return 0;
1806 }
1807 }
1808
1809 // Get the full path, check if in sandbox and if the user is allowed
1810 // to access it
1811 XrdOucString fullpath(path);
1812 bool sandbox = 0, check = 2;
1813 int rccp = 0;
1814 struct stat st;
1815 if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
1816 fullpath, check, sandbox, &st, emsg)) != 0) {
1817 if (rccp == -2) {
1818 emsg = "Cannot open `";
1819 emsg += fullpath;
1820 emsg += "': No such file or directory";
1821 } else if (rccp == -3) {
1822 emsg = "Cannot stat `";
1823 emsg += fullpath;
1824 emsg += "': errno: ";
1825 emsg += (int) errno;
1826 } else if (rccp == -4) {
1827 emsg = fullpath;
1828 emsg += " is not a regular file";
1829 }
1830 TRACEP(p, XERR, emsg);
1831 response->Send(kXR_InvalidRequest, emsg.c_str());
1832 return 0;
1833 }
1834
1835 // Pipe for child-to-parent communications
1836 XrdProofdPipe pp;
1837 if (!pp.IsValid()) {
1838 emsg = "cannot create the pipe for internal communications";
1839 TRACEP(p, XERR, emsg);
1840 response->Send(kXR_InvalidRequest, emsg.c_str());
1841 }
1842
1843 // Fork a test agent process to handle this session
1844 TRACEP(p, DBG, "forking to execute in the private sandbox");
1845 int pid = -1;
1846 if (!(pid = fMgr->Sched()->Fork("admingetfile"))) {
1847
1848 // Child process
1849 // We set to the user environment as we must to run the command
1850 // in the user space
1851 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
1852 emsg = "SetUserEnvironment did not return OK";
1853 rc = 1;
1854 } else {
1855
1856 // Open the file
1857 int fd = open(fullpath.c_str(), O_RDONLY);
1858 if (fd < 0) {
1859 emsg = "cannot open file: ";
1860 emsg += fullpath;
1861 emsg += " - errno:";
1862 emsg += (int) errno;
1863 TRACEP(p, XERR, emsg);
1864 response->Send(kXR_ServerError, emsg.c_str());
1865 rc = 1;
1866
1867 } else {
1868 // Send the size as OK message
1869 char sizmsg[64];
1870 snprintf(sizmsg, 64, "%lld", (kXR_int64) st.st_size);
1871 response->Send((const char *) &sizmsg[0]);
1872 TRACEP(p, XERR, "size is "<<sizmsg<<" bytes");
1873
1874 // Now we send the content
1875 const int kMAXBUF = 16384;
1876 char buf[kMAXBUF];
1877 off_t pos = 0;
1878 lseek(fd, pos, SEEK_SET);
1879
1880 while (rc == 0 && pos < st.st_size) {
1881 off_t left = st.st_size - pos;
1882 if (left > kMAXBUF) left = kMAXBUF;
1883
1884 int siz;
1885 while ((siz = read(fd, &buf[0], left)) < 0 && errno == EINTR)
1886 errno = 0;
1887 if (siz < 0 || siz != left) {
1888 emsg = "error reading from file: errno: ";
1889 emsg += (int) errno;
1890 rc = 1;
1891 break;
1892 }
1893
1894 int src = 0;
1895 if ((src = response->Send(kXR_attn, kXPD_msg, (void *)&buf[0], left)) != 0) {
1896 emsg = "error reading from file: errno: ";
1897 emsg += src;
1898 rc = 1;
1899 break;
1900 }
1901 // Re-position
1902 pos += left;
1903 // Reset the timeout
1904 if (pp.Post(0, "") != 0) {
1905 rc = 1;
1906 break;
1907 }
1908 }
1909 // Close the file
1910 close(fd);
1911 // Send error, if any
1912 if (rc != 0) {
1913 TRACEP(p, XERR, emsg);
1914 response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) emsg.c_str(), emsg.length());
1915 }
1916 }
1917 }
1918
1919 // Send error, if any
1920 if (rc == 1) {
1921 // Post Error
1922 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
1923 } else {
1924 // End-Of-Transmission
1925 if (pp.Post(1, "") != 0) rc = 1;
1926 }
1927
1928 // Done
1929 exit(rc);
1930 }
1931
1932 // Parent process
1933 if (pid < 0) {
1934 emsg = "forking failed - errno: "; emsg += (int) errno;
1935 TRACEP(p, XERR, emsg);
1936 response->Send(kXR_ServerError, emsg.c_str());
1937 return 0;
1938 }
1939
1940 // The parent is done: wait for the child
1941 TRACEP(p, DBG, "forking OK: execution will continue in the child process");
1942
1943 // Wait for end-of-operations from pipe
1944 int prc = 0, rst = 0;
1945 // We wait for 60 secs max among transfers
1946 while (rst == 0 && rc >= 0) {
1947 while ((prc = pp.Poll(60)) > 0) {
1948 XpdMsg msg;
1949 if (pp.Recv(msg) != 0) {
1950 emsg = "error receiving message from pipe";
1951 return -1;
1952 }
1953 // Status is the message type
1954 rst = msg.Type();
1955 // Read string, if any
1956 if (rst < 0) {
1957 // Error
1958 rc = -1;
1959 // Store error message
1960 emsg = msg.Buf();
1961 if (emsg.length() <= 0) {
1962 emsg = "error reading string from received message";
1963 }
1964 // We stop here
1965 break;
1966 } else if (rst > 0) {
1967 // We are done
1968 break;
1969 }
1970 }
1971 if (prc == 0) {
1972 emsg = "timeout from poll";
1973 rc = -1;
1974 } else if (prc < 0) {
1975 emsg = "error from poll - errno: "; emsg += -prc;
1976 rc = -1;
1977 }
1978 }
1979
1980 // The parent is done
1981 TRACEP(p, DBG, "execution over: "<< ((rc == 0) ? "ok" : "failed"));
1982
1983 // Done
1984 return 0;
1985}
1986
1987////////////////////////////////////////////////////////////////////////////////
1988/// Handle request for recieving a file
1989
1991{
1992 XPDLOC(ALL, "Admin::PutFile")
1993
1994 int rc = 0;
1995 XPD_SETRESP(p, "PutFile");
1996
1997 XrdOucString emsg;
1998
1999 // Target client (default us)
2000 XrdProofdClient *tgtclnt = p->Client();
2001 if (!tgtclnt) {
2002 emsg = "client instance not found";
2003 TRACEP(p, XERR, emsg);
2004 response->Send(kXR_InvalidRequest, emsg.c_str());
2005 return 0;
2006 }
2007
2008 // Parse the string
2009 kXR_int64 size = -1;
2010 int dlen = p->Request()->header.dlen;
2011 XrdOucString cmd, path, ssiz, opt;
2012 if (dlen > 0 && p->Argp()->buff) {
2013 cmd.assign((const char *)p->Argp()->buff, 0, dlen);
2014 if (cmd.length() <= 0) {
2015 TRACEP(p, XERR, "input buffer missing!");
2016 response->Send(kXR_InvalidRequest, "input buffer missing!");
2017 return 0;
2018 }
2019 int from = 0;
2020 if ((from = cmd.tokenize(path, from, ' ')) < 0) {
2021 TRACEP(p, XERR, "cannot resolve path!");
2022 response->Send(kXR_InvalidRequest, "cannot resolve path!");
2023 return 0;
2024 }
2025 if ((from = cmd.tokenize(ssiz, from, ' ')) < 0) {
2026 TRACEP(p, XERR, "cannot resolve word with size!");
2027 response->Send(kXR_InvalidRequest, "cannot resolve word with size!");
2028 return 0;
2029 }
2030 // Extract size
2031 size = atoll(ssiz.c_str());
2032 if (size < 0) {
2033 TRACEP(p, XERR, "cannot resolve size!");
2034 response->Send(kXR_InvalidRequest, "cannot resolve size!");
2035 return 0;
2036 }
2037 // Any option?
2038 cmd.tokenize(opt, from, ' ');
2039 }
2040 TRACEP(p, DBG, "path: '"<<path<<"'; size: "<<size<<" bytes; opt: '"<<opt<<"'");
2041
2042 // Default open and mode flags
2043 kXR_unt32 openflags = O_WRONLY | O_TRUNC | O_CREAT;
2044 kXR_unt32 modeflags = 0600;
2045
2046 // Get the full path and check if in sandbox and if the user is allowed
2047 // to create/access it
2048 XrdOucString fullpath(path);
2049 bool sandbox = 0, check = 1;
2050 struct stat st;
2051 int rccp = 0;
2052 if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
2053 fullpath, check, sandbox, &st, emsg)) != 0) {
2054 if (rccp == -3) {
2055 emsg = "File `";
2056 emsg += fullpath;
2057 emsg += "' exists but cannot be stat: errno: ";
2058 emsg += (int) errno;
2059 }
2060 if (rccp != -2) {
2061 TRACEP(p, XERR, emsg);
2062 response->Send(kXR_InvalidRequest, emsg.c_str());
2063 return 0;
2064 }
2065 } else {
2066 // File exists: either force deletion or fail
2067 if (opt == "force") {
2068 openflags = O_WRONLY | O_TRUNC;
2069 } else {
2070 emsg = "file'";
2071 emsg += fullpath;
2072 emsg += "' exists; user option 'force' to override it";
2073 TRACEP(p, XERR, emsg);
2074 response->Send(kXR_InvalidRequest, emsg.c_str());
2075 return 0;
2076 }
2077 }
2078
2079 // Pipe for child-to-parent communications
2080 XrdProofdPipe pp;
2081 if (!pp.IsValid()) {
2082 emsg = "cannot create the pipe for internal communications";
2083 TRACEP(p, XERR, emsg);
2084 response->Send(kXR_InvalidRequest, emsg.c_str());
2085 }
2086
2087 // Fork a test agent process to handle this session
2088 TRACEP(p, DBG, "forking to execute in the private sandbox");
2089 int pid = -1;
2090 if (!(pid = fMgr->Sched()->Fork("adminputfile"))) {
2091 // Child process
2092 // We set to the user environment as we must to run the command
2093 // in the user space
2094 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
2095 emsg = "SetUserEnvironment did not return OK";
2096 rc = 1;
2097 } else {
2098 // Open the file
2099 int fd = open(fullpath.c_str(), openflags, modeflags);
2100 if (fd < 0) {
2101 emsg = "cannot open file: ";
2102 emsg += fullpath;
2103 emsg += " - errno: ";
2104 emsg += (int) errno;
2105 TRACEP(p, XERR, emsg);
2106 response->Send(kXR_ServerError, emsg.c_str());
2107 rc = 1;
2108 } else {
2109 // We read in the content sent by the client
2110 rc = 0;
2111 response->Send("OK");
2112 // Receive the file
2113 const int kMAXBUF = XrdProofdProtocol::MaxBuffsz();
2114 // Get a buffer
2115 XrdBuffer *argp = XrdProofdProtocol::GetBuff(kMAXBUF);
2116 if (!argp) {
2117 emsg = "cannot get buffer to read data out";
2118 rc = 1;
2119 }
2120 int r;
2121 kXR_int64 filesize = 0, left = 0;
2122 while (rc == 0 && filesize < size) {
2123 left = size - filesize;
2124 if (left > kMAXBUF) left = kMAXBUF;
2125 // Read a bunch of data
2126 TRACEP(p, ALL, "receiving "<<left<<" ...");
2127 if ((rc = p->GetData("data", argp->buff, left))) {
2129 emsg = "cannot read data out";
2130 rc = 1;
2131 break;
2132 }
2133 // Update counters
2134 filesize += left;
2135 // Write to local file
2136 char *b = argp->buff;
2137 r = left;
2138 while (r) {
2139 int w = 0;
2140 while ((w = write(fd, b, r)) < 0 && errno == EINTR)
2141 errno = 0;
2142 if (w < 0) {
2143 emsg = "error writing to unit: ";
2144 emsg += fd;
2145 rc = 1;
2146 break;
2147 }
2148 r -= w;
2149 b += w;
2150 }
2151 // Reset the timeout
2152 if (pp.Post(0, "") != 0) {
2153 rc = 1;
2154 break;
2155 }
2156 }
2157 // Close the file
2158 close(fd);
2159 // Release the buffer
2161 // Send error, if any
2162 if (rc != 0) {
2163 TRACEP(p, XERR, emsg);
2164 response->Send(kXR_attn, kXPD_srvmsg, 0, (char *) emsg.c_str(), emsg.length());
2165 }
2166 }
2167 }
2168 // Send error, if any
2169 if (rc == 1) {
2170 // Post Error
2171 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
2172 } else {
2173 // End-Of-Transmission
2174 if (pp.Post(1, "") != 0) rc = 1;
2175 }
2176 // Done
2177 exit(rc);
2178 }
2179
2180 // Parent process
2181 if (pid < 0) {
2182 emsg = "forking failed - errno: "; emsg += (int) errno;
2183 TRACEP(p, XERR, emsg);
2184 response->Send(kXR_ServerError, emsg.c_str());
2185 return 0;
2186 }
2187
2188 // The parent is done: wait for the child
2189 TRACEP(p, DBG, "forking OK: execution will continue in the child process");
2190
2191 // Wait for end-of-operations from pipe
2192 int prc = 0, rst = 0;
2193 // We wait for 60 secs max among transfers
2194 while (rst == 0 && rc >= 0) {
2195 while ((prc = pp.Poll(60)) > 0) {
2196 XpdMsg msg;
2197 if (pp.Recv(msg) != 0) {
2198 emsg = "error receiving message from pipe";
2199 return -1;
2200 }
2201 // Status is the message type
2202 rst = msg.Type();
2203 // Read string, if any
2204 if (rst < 0) {
2205 // Error
2206 rc = -1;
2207 // Store error message
2208 emsg = msg.Buf();
2209 if (emsg.length() <= 0) {
2210 emsg = "error reading string from received message";
2211 }
2212 // We stop here
2213 break;
2214 } else if (rst > 0) {
2215 // We are done
2216 break;
2217 }
2218 }
2219 if (prc == 0) {
2220 emsg = "timeout from poll";
2221 rc = -1;
2222 } else if (prc < 0) {
2223 emsg = "error from poll - errno: "; emsg += -prc;
2224 rc = -1;
2225 }
2226 }
2227
2228 // The parent is done
2229 TRACEP(p, DBG, "execution over: "<< ((rc == 0) ? "ok" : "failed"));
2230
2231 // Done
2232 return 0;
2233}
2234
2235////////////////////////////////////////////////////////////////////////////////
2236/// Handle request for copy files from / to the sandbox
2237
2239{
2240 XPDLOC(ALL, "Admin::CpFile")
2241
2242 int rc = 0;
2243 XPD_SETRESP(p, "CpFile");
2244
2245 XrdOucString emsg;
2246
2247 // Target client (default us)
2248 XrdProofdClient *tgtclnt = p->Client();
2249 if (!tgtclnt) {
2250 emsg = "client instance not found";
2251 TRACEP(p, XERR, emsg);
2252 response->Send(kXR_InvalidRequest, emsg.c_str());
2253 return 0;
2254 }
2255
2256 // Parse the string
2257 int dlen = p->Request()->header.dlen;
2258 XrdOucString buf, src, dst, fmt;
2259 if (dlen > 0 && p->Argp()->buff) {
2260 buf.assign((const char *)p->Argp()->buff, 0, dlen);
2261 if (buf.length() <= 0) {
2262 TRACEP(p, XERR, "input buffer missing!");
2263 response->Send(kXR_InvalidRequest, "input buffer missing!");
2264 return 0;
2265 }
2266 int from = 0;
2267 if ((from = buf.tokenize(src, from, ' ')) < 0) {
2268 TRACEP(p, XERR, "cannot resolve src path!");
2269 response->Send(kXR_InvalidRequest, "cannot resolve src path!");
2270 return 0;
2271 }
2272 if ((from = buf.tokenize(dst, from, ' ')) < 0) {
2273 TRACEP(p, XERR, "cannot resolve dst path!");
2274 response->Send(kXR_InvalidRequest, "cannot resolve dst path!");
2275 return 0;
2276 }
2277 // The rest, if any, is the format string (including options)
2278 fmt.assign(buf, from);
2279 }
2280 TRACEP(p, DBG, "src: '"<<src<<"'; dst: '"<<dst<<"'; fmt: '"<<fmt<<"'");
2281
2282 // Check paths
2283 bool locsrc = 1;
2284 XrdClientUrlInfo usrc(src.c_str());
2285 if (usrc.Proto.length() > 0 && usrc.Proto != "file") {
2286 locsrc = 0;
2287 if (!fAllowedCpCmds.Find(usrc.Proto.c_str())) {
2288 TRACEP(p, XERR, "protocol for source file not supported");
2289 response->Send(kXR_InvalidRequest, "protocol for source file not supported");
2290 return 0;
2291 }
2292 }
2293 if (usrc.Proto == "file") src = usrc.File;
2294 bool locdst = 1;
2295 XrdClientUrlInfo udst(dst.c_str());
2296 if (udst.Proto.length() > 0 && udst.Proto != "file") {
2297 locdst = 0;
2298 if (!fAllowedCpCmds.Find(udst.Proto.c_str())) {
2299 TRACEP(p, XERR, "protocol for destination file not supported");
2300 response->Send(kXR_InvalidRequest, "protocol for destination file not supported");
2301 return 0;
2302 }
2303 }
2304 if (udst.Proto == "file") dst = udst.File;
2305
2306 // Locate the remote protocol, if any
2307 bool loc2loc = 1;
2308 bool loc2rem = 0;
2309 bool rem2loc = 0;
2310 XpdAdminCpCmd *xc = 0;
2311 if (!locsrc && !locdst) {
2312 // Files cannot be both remote
2313 TRACEP(p, XERR, "At least destination or source must be local");
2314 response->Send(kXR_InvalidRequest, "At least destination or source must be local");
2315 return 0;
2316 } else if (!locdst) {
2317 // Find the requested protocol and check if we can put
2318 xc = fAllowedCpCmds.Find(udst.Proto.c_str());
2319 if (!xc->fCanPut) {
2320 TRACEP(p, XERR, "not allowed to create destination file with the chosen protocol");
2321 response->Send(kXR_InvalidRequest, "not allowed to create destination file with the chosen protocol");
2322 return 0;
2323 }
2324 loc2loc = 0;
2325 loc2rem = 1;
2326 } else if (!locsrc) {
2327 // Find the requested protocol
2328 xc = fAllowedCpCmds.Find(usrc.Proto.c_str());
2329 loc2loc = 0;
2330 rem2loc = 1;
2331 } else {
2332 // Default local protocol
2333 xc = fAllowedCpCmds.Find("file");
2334 }
2335
2336 // Check the local paths
2337 XrdOucString srcpath(src), dstpath(dst);
2338 bool sbsrc = 0, sbdst = 0;
2339 struct stat stsrc, stdst;
2340 int rccpsrc = 0, rccpdst = 0;
2341 if (loc2loc || loc2rem) {
2342 if ((rccpsrc = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
2343 srcpath, 2, sbsrc, &stsrc, emsg)) != 0) {
2344 if (rccpsrc == -2) {
2345 emsg = xc->fCmd;
2346 emsg += ": cannot open `";
2347 emsg += srcpath;
2348 emsg += "': No such file or directory";
2349 } else if (rccpsrc == -3) {
2350 emsg = xc->fCmd;
2351 emsg += ": cannot stat ";
2352 emsg += srcpath;
2353 emsg += ": errno: ";
2354 emsg += (int) errno;
2355 } else if (rccpsrc == -4) {
2356 emsg = xc->fCmd;
2357 emsg += ": ";
2358 emsg += srcpath;
2359 emsg += ": Is not a regular file";
2360 }
2361 TRACEP(p, XERR, emsg);
2362 response->Send(kXR_InvalidRequest, emsg.c_str());
2363 return 0;
2364 }
2365 }
2366 if (loc2loc || rem2loc) {
2367 if ((rccpdst = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
2368 dstpath, 0, sbdst, &stdst, emsg)) != 0) {
2369 if (rccpdst == -2) {
2370 emsg = xc->fCmd;
2371 emsg += ": cannot open `";
2372 emsg += dstpath;
2373 emsg += "': No such file or directory";
2374 } else if (rccpdst == -3) {
2375 emsg = xc->fCmd;
2376 emsg += ": cannot stat ";
2377 emsg += dstpath;
2378 emsg += ": errno: ";
2379 emsg += (int) errno;
2380 } else if (rccpdst == -4) {
2381 emsg = xc->fCmd;
2382 emsg += ": ";
2383 emsg += dstpath;
2384 emsg += ": Is not a regular file";
2385 }
2386 TRACEP(p, XERR, emsg);
2387 response->Send(kXR_InvalidRequest, emsg.c_str());
2388 return 0;
2389 }
2390 }
2391
2392 // Check the format string
2393 if (fmt.length() <= 0) {
2394 fmt = xc->fFmt;
2395 } else {
2396 if (!fmt.beginswith(xc->fCmd)) {
2397 fmt.insert(" ", 0);
2398 fmt.insert(xc->fCmd, 0);
2399 }
2400 if (fmt.find("%s") == STR_NPOS) {
2401 fmt.insert(" %s %s", -1);
2402 }
2403 }
2404
2405 // Create the command now
2406 XrdOucString cmd;
2407 XrdProofdAux::Form(cmd, fmt.c_str(), srcpath.c_str(), dstpath.c_str());
2408 cmd += " 2>&1";
2409 TRACEP(p, DBG, "Executing command: " << cmd);
2410
2411 // Pipe for child-to-parent communications
2412 XrdProofdPipe pp;
2413 if (!pp.IsValid()) {
2414 emsg = "cannot create the pipe";
2415 TRACEP(p, XERR, emsg);
2416 response->Send(kXR_ServerError, emsg.c_str());
2417 return 0;
2418 }
2419
2420 // Fork a test agent process to handle this session
2421 TRACEP(p, DBG, "forking to execute in the private sandbox");
2422 int pid = -1;
2423 if (!(pid = fMgr->Sched()->Fork("admincpfile"))) {
2424 // Child process
2425 // We set to the user environment as we must to run the command
2426 // in the user space
2427 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
2428 emsg = "SetUserEnvironment did not return OK";
2429 rc = 1;
2430 } else {
2431 // Execute the command in a pipe
2432 FILE *fp = popen(cmd.c_str(), "r");
2433 if (!fp) {
2434 emsg = "could not run '"; emsg += cmd; emsg += "'";
2435 rc = 1;
2436 } else {
2437 // Read line by line
2438 char line[2048];
2439 while (fgets(line, sizeof(line), fp)) {
2440 // Parse the line
2441 int llen = strlen(line);
2442 if (llen > 0 && line[llen-1] == '\n') {
2443 line[llen-1] = '\0';
2444 llen--;
2445 }
2446 // Real-time sending (line-by-line)
2447 if (llen > 0 &&
2448 response->Send(kXR_attn, kXPD_srvmsg, 4, (char *) &line[0], llen) != 0) {
2449 emsg = "error sending message to requester";
2450 rc = 1;
2451 break;
2452 }
2453 // Check if we have been interrupted
2454 char b[1];
2455 if (p->Link()->Peek(&b[0], 1, 0) == 1) {
2456 p->Process(p->Link());
2457 if (p->IsCtrlC()) break;
2458 }
2459 // Reset timeout
2460 if (pp.Post(0, "") != 0) {
2461 rc = 1;
2462 break;
2463 }
2464 }
2465 // Close the pipe if not in error state (otherwise we may block here)
2466 int rcpc = 0;
2467 if ((rcpc = pclose(fp)) == -1) {
2468 emsg = "error while trying to close the command pipe";
2469 rc = 1;
2470 }
2471 if (WEXITSTATUS(rcpc) != 0) {
2472 emsg = "return code: ";
2473 emsg += (int) WEXITSTATUS(rcpc);
2474 rc = 1;
2475 }
2476 // Close the notification messages
2477 char cp[1] = {'\n'};
2478 if (response->Send(kXR_attn, kXPD_srvmsg, 3, (char *) &cp[0], 1) != 0) {
2479 emsg = "error sending progress notification to requester";
2480 rc = 1;
2481 }
2482 }
2483 }
2484 // Send error, if any
2485 if (rc == 1) {
2486 // Post Error
2487 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
2488 }
2489
2490 // End-Of-Transmission
2491 if (pp.Post(1, "") != 0) rc = 1;
2492
2493 // Done
2494 exit(rc);
2495 }
2496
2497 // Parent process
2498 if (pid < 0) {
2499 emsg = "forking failed - errno: "; emsg += (int) errno;
2500 return -1;
2501 }
2502
2503 // now we wait for the callback to be (successfully) established
2504 TRACEP(p, DBG, "forking OK: wait for execution");
2505
2506 // Read status-of-setup from pipe
2507 int prc = 0, rst = 0;
2508 // We wait for 60 secs max among transfers
2509 while (rst == 0 && rc >= 0) {
2510 while ((prc = pp.Poll(60)) > 0) {
2511 XpdMsg msg;
2512 if (pp.Recv(msg) != 0) {
2513 emsg = "error receiving message from pipe";;
2514 rc = -1;
2515 }
2516 // Status is the message type
2517 rst = msg.Type();
2518 // Read string, if any
2519 if (rst < 0) {
2520 // Error
2521 rc = -1;
2522 // Store error message
2523 emsg = msg.Buf();
2524 if (emsg.length() <= 0)
2525 emsg = "error reading string from received message";
2526 } else if (rst == 1) {
2527 // Done
2528 break;
2529 }
2530 }
2531 if (prc == 0) {
2532 emsg = "timeout from poll";
2533 rc = -1;
2534 } else if (prc < 0) {
2535 emsg = "error from poll - errno: "; emsg += -prc;
2536 rc = -1;
2537 }
2538 }
2539
2540 // The parent is done
2541 TRACEP(p, DBG, "execution over: "<< ((rc == 0) ? "ok" : "failed"));
2542
2543 if (rc != 0) {
2544 emsg.insert("failure: ", 0);
2545 TRACEP(p, XERR, emsg);
2546 response->Send(kXR_ServerError, emsg.c_str());
2547 } else {
2548 response->Send("OK");
2549 }
2550
2551 // Done
2552 return 0;
2553}
ROOT::R::TRInterface & r
Definition Object.C:4
#define d(i)
Definition RSha256.hxx:102
#define b(i)
Definition RSha256.hxx:100
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
#define g(i)
Definition RSha256.hxx:105
#define e(i)
Definition RSha256.hxx:103
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
#define TRACE(Flag, Args)
Definition TGHtml.h:121
int type
Definition TGX11.cxx:121
R__EXTERN C unsigned int sleep(unsigned int seconds)
@ kTail
@ kFind
@ kMore
@ kMd5sum
@ kGrep
@ kStat
@ kRm
@ kLs
@ kCleanupSessions
@ kPutFile
@ kGroupProperties
@ kExec
@ kQueryLogPaths
@ kCpFile
@ kQuerySessions
@ kSendMsgToUser
@ kROOTVersion
@ kReleaseWorker
@ kSessionAlias
@ kQueryWorkers
@ kGetFile
@ kGetWorkers
@ kQueryMssUrl
@ kQueryROOTVersions
@ kSessionTag
#define kXPD_Worker
@ kXPD_msg
@ kXPD_srvmsg
static int ExportCpCmd(const char *k, XpdAdminCpCmd *cc, void *s)
Decrease active session counters on worker w.
#define XPDFORM
#define SafeDel(x)
int DoDirectiveClass(XrdProofdDirective *, char *val, XrdOucStream *cfg, bool rcf)
Generic class directive processor.
#define SafeDelArray(x)
#define kXPROOFSRVTAGMAX
#define kXPROOFSRVALIASMAX
#define XPD_SETRESP(p, x)
#define XPDLOC(d, x)
#define TRACEP(p, act, x)
#define TRACING(x)
#define XPDERR(x)
const char * proto
Definition civetweb.c:16613
#define free
Definition civetweb.c:1539
#define snprintf
Definition civetweb.c:1540
XrdOucString fCmd
XrdOucString fFmt
int Type() const
const char * Buf() const
XrdOucString GetUrl()
XrdProofGroup * GetUserGroup(const char *usr, const char *grp=0)
Returns the instance of the first group to which this user belongs; if grp != 0, return the instance ...
virtual int ExportInfo(XrdOucString &)
Fill sbuf with some info about our current status.
XrdOucString fGroup
int CleanupSessions(XrdProofdProtocol *p)
Handle request of.
int ExecCmd(XrdProofdProtocol *p, XrdProofdResponse *r, int action, const char *cmd, XrdOucString &emsg)
Low-level execution handler.
int GetWorkers(XrdProofdProtocol *p)
Handle request for getting the best set of workers.
int SetSessionTag(XrdProofdProtocol *p)
Handle request for setting the session tag.
int SetSessionAlias(XrdProofdProtocol *p)
Handle request for setting the session alias.
int SetROOTVersion(XrdProofdProtocol *p)
Handle request for changing the default ROOT version.
XrdProofdAdmin(XrdProofdManager *mgr, XrdProtocol_Config *pi, XrdSysError *e)
Constructor.
int GetFile(XrdProofdProtocol *p)
Handle request for sending a file.
int PutFile(XrdProofdProtocol *p)
Handle request for recieving a file.
int SendMsgToUser(XrdProofdProtocol *p)
Handle request for sending a message to a user.
XrdProofdManager * fMgr
int QuerySessions(XrdProofdProtocol *p)
Handle request for list of sessions.
int CheckForbiddenChars(const char *s)
Check is 's' contains any of the forbidden chars '(){};' Return 0 if OK (no forbidden chars),...
int Exec(XrdProofdProtocol *p)
Handle request of cleaning parts of the sandbox.
int DoDirectiveCpCmd(char *, XrdOucStream *, bool)
Process 'cpcmd' directives eg: xpd.cpcmd alien aliencp fmt:"%s %s" put:0.
int CheckPath(bool superuser, const char *sbdir, XrdOucString &fullpath, int check, bool &sandbox, struct stat *st, XrdOucString &emsg)
Handle request for sending a file.
int DoDirective(XrdProofdDirective *d, char *val, XrdOucStream *cfg, bool rcf)
Update the priorities of the active sessions.
XrdOucHash< XpdAdminCpCmd > fAllowedCpCmds
int Config(bool rcf=0)
Run configuration and parse the entered config directives.
int Process(XrdProofdProtocol *p, int type)
Process admin request.
int SetGroupProperties(XrdProofdProtocol *p)
Handle request for setting group properties.
void RegisterDirectives()
Register directives for configuration.
int CpFile(XrdProofdProtocol *p)
Handle request for copy files from / to the sandbox.
std::list< XrdOucString > fExportPaths
int QueryMssUrl(XrdProofdProtocol *p)
Handle request for the URL to the MSS attached to the cluster.
XrdOucString fCpCmds
int QueryLogPaths(XrdProofdProtocol *p)
Handle request for log paths.
int QueryROOTVersions(XrdProofdProtocol *p)
Handle request for list of ROOT versions.
int DoDirectiveExportPath(char *, XrdOucStream *, bool)
Process 'exportpath' directives eg: xpd.exportpath /tmp/data /data2/data.
int ReleaseWorker(XrdProofdProtocol *p)
Handle request for releasing a worker.
int QueryWorkers(XrdProofdProtocol *p)
Handle request for getting the list of potential workers.
static void Form(XrdOucString &s, const char *fmt, int ns, const char *ss[5], int ni, int ii[6], int np, void *pp[5], int nu=0, unsigned int ui=0)
Recreate the string according to 'fmt', the up to 5 'const char *', up to 6 'int' arguments,...
static const char * AdminMsgType(int type)
Translates the admin message type in a human readable string.
XrdProofdClient * GetClient(const char *usr, const char *grp=0, bool create=1)
Handle request for localizing a client instance for {usr, grp} from the list.
void Broadcast(XrdProofdClient *c, const char *msg)
Broadcast message 'msg' to the connected instances of client 'clnt' or to all connected instances if ...
void TerminateSessions(XrdProofdClient *c, const char *msg, int srvtype)
Terminate sessions of client 'clnt' or to of all clients if clnt == 0.
XrdProofUI UI() const
XrdOucString ExportSessions(XrdOucString &emsg, XrdProofdResponse *r=0)
Return a string describing the existing sessions.
XrdProofdSandbox * Sandbox() const
XrdROOT * ROOT() const
const char * User() const
XrdProofdProofServ * GetServer(int psid)
Get from the vector server instance with ID psid.
virtual int Config(bool rcf=0)
void Register(const char *dname, XrdProofdDirective *d)
int GetWorkers(XrdOucString &workers, XrdProofdProofServ *, const char *)
Get a list of workers from the available resource broker.
XrdProofdPriorityMgr * PriorityMgr() const
const char * PoolURL() const
XrdProofSched * ProofSched() const
XrdROOTMgr * ROOTMgr() const
XrdProofdNetMgr * NetMgr() const
XrdProofGroupMgr * GroupsMgr() const
XrdScheduler * Sched() const
XrdProofdClientMgr * ClientMgr() const
const char * Host() const
const char * NameSpace() const
const char * EffectiveUser() const
XrdProofdProofServMgr * SessionMgr() const
int Broadcast(int type, const char *msg, const char *usr=0, XrdProofdResponse *r=0, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
bool IsLocal(const char *host, bool checkport=0)
Check if 'host' is this local host.
XrdClientMessage * Send(const char *url, int type, const char *msg, int srvtype, XrdProofdResponse *r, bool notify=0, int subtype=-1)
Broadcast request to known potential sub-nodes.
void Dump()
Dump status.
char * ReadLogPaths(const char *url, const char *stag, int isess)
Get log paths from next tier; used in multi-master setups Returns 0 in case of error.
bool IsValid() const
int Recv(XpdMsg &msg)
Recv message from the pipe.
int Poll(int to=-1)
Poll over the read pipe for to secs; return whatever poll returns.
int Post(int type, const char *msg)
Post message on the pipe.
int SetUserEnvironment(XrdProofdProtocol *p)
Set user environment: set effective user and group ID of the process to the ones of the owner of this...
void SetTag(const char *t)
const char * Tag() const
const char * Alias() const
void SetAlias(const char *a)
void RemoveWorker(const char *o)
Release worker assigned to this session with label 'o'.
XrdLink * Link() const
short int ProofProtocol() const
XrdProofdClient * Client() const
XrdBuffer * Argp() const
int Process(XrdLink *lp)
Process the information received on the active link.
static XrdBuffer * GetBuff(int quantum, XrdBuffer *argp=0)
Allocate a buffer to handle quantum bytes; if argp points to an existing buffer, its size is checked ...
XPClientRequest * Request() const
int GetData(const char *dtype, char *buff, int blen)
Get data from the open link.
static void ReleaseBuff(XrdBuffer *argp)
Release a buffer previously allocated via GetBuff.
const char * Dir() const
int GuessTag(XrdOucString &tag, int ridx=1)
Guess session tag completing 'tag' (typically "-<pid>") by scanning the active session file or the se...
XrdROOT * GetVersion(const char *tag)
Return pointer to the ROOT version corresponding to 'tag' or 0 if not found.
Definition XrdROOT.cxx:738
XrdROOT * DefaultVersion() const
Definition XrdROOT.h:118
XrdOucString ExportVersions(XrdROOT *def)
Return a string describing the available versions, with the default version 'def' markde with a '*'.
Definition XrdROOT.cxx:714
TLine * line
static const char * what
Definition stlLoader.cc:6
struct ClientRequestHdr header
struct XPClientProofRequest proof