1// @(#)root/proof:$Id$
2// Author: Fons Rademakers 13/02/97
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
13// //
14// TProofCondor //
15// //
16// This class controls a Parallel ROOT Facility, PROOF, cluster. //
17// It fires the slave servers, it keeps track of how many slaves are //
18// running, it keeps track of the slaves running status, it broadcasts //
19// messages to all slaves, it collects results, etc. //
20// //
23#include "TProofCondor.h"
25#include "TCondor.h"
26#include "TList.h"
27#include "TMap.h"
28#include "TMessage.h"
29#include "TMonitor.h"
30#include "TProofNodeInfo.h"
32#include "TProofServ.h"
33#include "TSlave.h"
34#include "TSocket.h"
35#include "TString.h"
36#include "TTimer.h"
// Constructor: records the manager and the master URL, normalises the
// configuration file / directory arguments and then delegates to Init().
//   masterurl  - wrapped in a TUrl and stored in fUrl; also forwarded to Init()
//   conffile   - null or empty selects kPROOF_ConfFile; a leading "condor:"
//                prefix (compared case-insensitively) is skipped
//   confdir    - null or empty selects kPROOF_ConfDir
//   loglevel   - forwarded unchanged to Init()
//   (unnamed const char *) - not used in the lines visible here
//   mgr        - stored in fManager
// NOTE(review): the numbers embedded in this listing skip some lines inside
// this block (e.g. 47, 49-50, 67), so the braces and possibly further
// statements of the constructor are not visible here.
41/// Start proof using condor
43TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
44 const char *confdir, Int_t loglevel,
45 const char *, TProofMgr *mgr)
46 : fCondor(0), fTimer(0)
48 // Default initializations
51 // This may be needed during init
52 fManager = mgr;
54 fUrl = TUrl(masterurl);
// Fall back to the default configuration file name, or strip the 7-character
// "condor:" prefix so that only the remainder of the string is used.
56 if (!conffile || !conffile[0]) {
57 conffile = kPROOF_ConfFile;
58 } else if (!strncasecmp(conffile, "condor:", 7)) {
59 conffile+=7;
60 }
// Same fallback for the configuration directory.
62 if (!confdir || !confdir[0]) {
63 confdir = kPROOF_ConfDir;
64 }
66 Init(masterurl, conffile, confdir, loglevel);
70/// Clean up Condor PROOF environment.
// Body of the worker start-up routine (it reports itself as "StartSlaves" in
// its diagnostics). NOTE(review): the signature line, the braces and the
// declarations of `m` and `resources` are not visible in this listing.
//
// Phases visible here:
//   1. Create the TCondor helper and build the job ad with GetJobAd().
//   2. Return kFALSE if fImage is empty.
//   3. Collect claims in the local list `claims`:
//        - no config file: one bulk Claim(9999, jobad);
//        - otherwise: one Claim() per worker listed in the parsed resources,
//          each tagged with perf index, image, work dir and an ordinal.
//   4. Turn every claim into a TSlave via CreateSlave(), making up to `ntries`
//      passes over the claims that did not yield a valid slave.
//   5. Walk fSlaves: sockets of valid slaves go to fAllMonitor, invalid
//      slaves go to fBadSlaves.
// Progress tuples (text, total, done, status) are streamed into `m` after
// each step.
// Returns kTRUE when the end is reached, kFALSE on the early error exits.
79/// Setup Condor workers using dynamic information
83 fCondor = new TCondor;
84 TString jobad = GetJobAd();
// NOTE(review): fImage is presumably assigned on a line not visible here
// (listing numbers 85-86 are skipped) - confirm.
87 if (fImage.Length() == 0) {
88 Error("StartSlaves", "Empty Condor image found for system %s",
89 gSystem->HostName());
90 return kFALSE;
91 }
// `claims` holds TCondorSlave objects at first; entries that fail on the
// first pass are later replaced by TPair(TCondorSlave, TTimer) (see below).
93 TList claims;
94 if (fConfFile.IsNull()) {
95 // startup all slaves if no config file given
// NOTE(review): the TList returned by Claim() is not released in the lines
// visible here - confirm who owns it.
96 TList *condorclaims = fCondor->Claim(9999, jobad);
97 TIter nextclaim(condorclaims);
98 while (TObject *o = nextclaim()) claims.Add(o);
99 } else {
100 // parse config file
// NOTE(review): the declaration/creation of `resources` is not visible here
// (listing number 101 is skipped).
102 fConfFile = resources->GetFileName(); // Update the global file name (with path)
103 PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
105 // Get all workers
106 TList *workerList = resources->GetWorkers();
107 if (workerList->GetSize() == 0) {
108 Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
109 return kFALSE;
110 }
112 // check for valid slave lines and claim condor nodes
// `ord` counts successful claims only; it becomes the last component of the
// worker ordinal built below.
113 Int_t ord = 0;
115 // Loop over all workers and start them
116 TListIter next(workerList);
117 TObject *to;
118 TProofNodeInfo *worker;
// Counts every processed worker (claimed or not) for the progress report.
119 int nSlavesDone = 0;
120 while ((to = next())) {
121 // Get the next worker from the list
122 worker = (TProofNodeInfo *)to;
124 // Read back worker node info
125 const Char_t *image = worker->GetImage().Data();
126 const Char_t *workdir = worker->GetWorkDir().Data();
127 Int_t perfidx = worker->GetPerfIndex();
// Short pause before each claim request.
129 gSystem->Sleep(10 /* ms */);
130 TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
131 if (csl) {
132 csl->fPerfIdx = perfidx;
133 csl->fImage = image;
134 csl->fWorkDir = workdir;
// Ordinal is "<ordinal of gProofServ>.<ord>".
136 TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
137 csl->fOrdinal = fullord.Data();
138 claims.Add(csl);
139 ord++;
140 }
142 // Notify claim creation
143 nSlavesDone++;
// Progress tuple: text, total workers, workers processed, claim succeeded.
// NOTE(review): the declaration of `m` and whatever is done with it afterwards
// are not visible here (listing numbers 144 and 147 are skipped).
145 m << TString("Creating COD Claim") << workerList->GetSize()
146 << nSlavesDone << (csl != 0);
149 } // end while (worker loop)
151 // Cleanup
152 delete resources;
153 resources = 0;
154 } // end else (parse config file)
// Retry parameters for the connection loop below.
156 Long_t delay = 500; // timer delay 0.5s
157 Int_t ntries = 20; // allow 20 tries (must be > 1 for algorithm to work)
// `trial` is the current pass number, `idx` the position inside `claims`.
158 Int_t trial = 1;
159 Int_t idx = 0;
161 int nClaims = claims.GetSize();
162 int nClaimsDone = 0;
// Keep going until every claim has been removed from the list.
163 while (claims.GetSize() > 0) {
164 TCondorSlave* c = 0;
166 // Get Condor Slave
// First pass: entries are plain TCondorSlave objects.
// Later passes: entries are TPair(TCondorSlave, TTimer); sleep for whatever
// is left of the timer before retrying the claim stored as the pair's key.
167 if (trial == 1) {
168 c = dynamic_cast<TCondorSlave*>(claims.At(idx));
169 } else {
170 TPair *p = dynamic_cast<TPair*>(claims.At(idx));
171 if (p) {
172 TTimer *t = dynamic_cast<TTimer*>(p->Value());
173 if (t) {
174 // wait remaining time
175 Long64_t wait = t->GetAbsTime()-gSystem->Now();
176 if (wait > 0) gSystem->Sleep((UInt_t)wait);
177 c = dynamic_cast<TCondorSlave*>(p->Key());
178 }
179 }
180 }
182 // create slave
183 TSlave *slave = 0;
184 if (c) slave = CreateSlave(Form("%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
185 c->fPerfIdx, c->fImage, c->fWorkDir);
187 // add slave to appropriate list
// Passes before the last one: keep only valid slaves, queue the rest for retry.
188 if (trial < ntries) {
189 if (slave && slave->IsValid()) {
190 fSlaves->Add(slave);
// The claim is done: drop it from the list. idx is left unchanged here,
// presumably because the next entry moves into this position - confirm.
191 if (trial == 1) {
192 claims.Remove(c);
193 } else {
// NOTE(review): Remove(c) is expected to hand back the TPair wrapping `c`;
// this relies on the list/pair lookup semantics - confirm.
194 TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
195 if (p) {
196 TTimer *xt = dynamic_cast<TTimer*>(p->Value());
197 if (xt) delete xt;
198 delete p;
199 }
200 }
201 nClaimsDone++;
// Progress tuple: text, total claims, claims completed, success flag.
203 m << TString("Opening connections to workers") << nClaims
204 << nClaimsDone << kTRUE;
206 } else if (slave) {
// Slave object exists but is not valid: schedule a retry.
207 if (trial == 1) {
// First failure: replace the entry at idx by a (claim, timer) pair.
208 TTimer* timer = new TTimer(delay);
209 TPair *p = new TPair(c, timer);
210 claims.RemoveAt(idx);
211 claims.AddAt(p, idx);
212 } else {
// Repeated failure: restart the timer of the existing pair.
213 TPair *p = dynamic_cast<TPair*>(claims.At(idx));
214 if (p && p->Value()) {
215 TTimer *xt = dynamic_cast<TTimer*>(p->Value());
216 if (xt) xt->Reset();
217 }
218 }
// Discard the invalid slave and move on to the next claim.
219 delete slave;
220 idx++;
221 } else {
// NOTE(review): this branch neither advances idx nor removes the claim, so
// the same entry is processed again on the next iteration; if no slave can
// ever be created for it the loop looks like it would not terminate - confirm.
222 Warning("StartSlaves", "could not create TSlave object!");
223 }
224 } else {
// Last pass: accept the slave whether or not it is valid and report its
// validity; the claim (and its timer, if any) is dropped.
225 if (slave) {
226 fSlaves->Add(slave);
227 TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
228 if (p && p->Value()) {
229 TTimer *xt = dynamic_cast<TTimer*>(p->Value());
230 delete xt;
231 }
232 if (p) delete p;
234 nClaimsDone++;
236 m << TString("Opening connections to workers") << nClaims
237 << nClaimsDone << slave->IsValid();
239 } else {
// NOTE(review): as above, nothing is removed or advanced in this branch.
240 Warning("StartSlaves", "could not create TSlave object!");
241 }
242 }
// End of a pass over the remaining claims: start the next pass from the top.
244 if (idx>=claims.GetSize()) {
245 trial++;
246 idx = 0;
247 }
248 }
250 // Here we finalize the server startup: in this way the bulk
251 // of remote operations are almost parallelized
252 TIter nxsl(fSlaves);
253 TSlave *sl = 0;
254 int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
255 while ((sl = (TSlave *) nxsl())) {
257 // Finalize setup of the server
// NOTE(review): the statement inside this `if` is not visible here
// (listing number 259 is skipped).
258 if (sl->IsValid()) {
260 }
// Validity is checked again after the step above: valid slaves are monitored
// through their socket, the others are recorded as bad.
262 if (sl->IsValid()) {
263 fAllMonitor->Add(sl->GetSocket());
264 } else {
265 fBadSlaves->Add(sl);
266 }
268 // Notify end of startup operations
269 nSlavesDone++;
271 Bool_t wrkvalid = sl->IsValid() ? kTRUE : kFALSE;
272 m << TString("Setting up worker servers") << nSlavesTotal
273 << nSlavesDone << wrkvalid;
275 }
277 return kTRUE;
// Body of the suspend/resume handler (it logs itself as "SetActive").
// NOTE(review): the signature line, the braces and the #else/#endif that
// pair with `#if 1` are not visible in this listing.
//
// Creates fTimer on first use. When `active` is true the timer is stopped
// and fCondor->Resume() is called. Otherwise the `#if 1` branch returns
// immediately, so the delayed-suspend lines that follow it are presumably
// the disabled alternative - confirm against the full file.
281/// Suspend or resume PROOF via Condor.
285 if (fTimer == 0) {
286 fTimer = new TTimer();
287 }
288 if (active) {
289 PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
290 fTimer->Stop();
292 fCondor->Resume();
293 } else {
294#if 1
295 return; // don't suspend for the moment
// Delayed suspend: log the planned delay, connect the timer's Timeout()
// signal to TCondor::Suspend() on fCondor and start a single-shot timer.
// NOTE(review): `delay` is 60000 but the timer is started with 10000 -
// confirm which value is intended.
297 Int_t delay = 60000; // milli seconds
298 PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
299 delay, delay + Long64_t(gSystem->Now()));
300 fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
301 fTimer->Start(10000, kTRUE); // single shot
303 }
// Body of the job-ad builder (called as GetJobAd() from the start-up code).
// NOTE(review): the signature line and the braces are not visible in this
// listing.
//
// Assembles the ad as newline-terminated `Key = value` lines and returns it
// as a TString:
//   JobUniverse - 5
//   Cmd         - "<GetConfDir()>/bin/proofd"
//   Iwd         - the system temporary directory
//   In          - "/dev/null"
//   Out / Err   - "proofd.out.$(Port)" / "proofd.err.$(Port)" in the
//                 temporary directory
//   Args        - "-f -p $(Port) -d <GetLogLevel()> <GetConfDir()>"
// The "$(Port)" text is emitted literally; it is presumably expanded by
// Condor - confirm.
307/// Get job Ad
311 TString ad;
313 ad = "JobUniverse = 5\n"; // vanilla
314 ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
315 ad += Form("Iwd = \"%s\"\n", gSystem->TempDirectory());
316 ad += "In = \"/dev/null\"\n";
317 ad += Form("Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
318 ad += Form("Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
319 ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
321 return ad;
