Logo ROOT   6.16/01
Reference Guide
TPacketizerAdaptive.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Jan Iwaszkiewicz 11/12/06
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10*************************************************************************/
11
12/** \class TPacketizerAdaptive
13\ingroup proofkernel
14
15This packetizer is based on TPacketizer but uses different
16load-balancing algorithms and data structures.
17Two main improvements in the load-balancing strategy:
18 - First one was to change the order in which the files are assigned
19 to the computing nodes in such a way that network transfers are
20 evenly distributed in the query time. Transfer of the remote files
21 was often becoming a bottleneck at the end of a query.
 - The other improvement is the use of a time-based packet size. We
   measure the processing rate of all the nodes and calculate the
   packet size so that each packet takes a certain amount of time. In
   this way the packetizer prevents the situation where the query cannot
   finish because of one slow node.
27
The data structures TFileStat, TFileNode and TSlaveStat are extended
and modified, and the TFileNode::Compare method is changed.
30
31*/
32
33#include "TPacketizerAdaptive.h"
34
35#include "Riostream.h"
36#include "TDSet.h"
37#include "TError.h"
38#include "TEnv.h"
39#include "TEntryList.h"
40#include "TEventList.h"
41#include "TMap.h"
42#include "TMessage.h"
43#include "TMonitor.h"
44#include "TNtupleD.h"
45#include "TObject.h"
46#include "TParameter.h"
47#include "TPerfStats.h"
48#include "TProofDebug.h"
49#include "TProof.h"
50#include "TProofServ.h"
51#include "TSlave.h"
52#include "TSocket.h"
53#include "TSortedList.h"
54#include "TUrl.h"
55#include "TClass.h"
56#include "TRandom.h"
57#include "TMath.h"
58#include "TObjString.h"
59#include "TList.h"
60
61//
62// The following three utility classes manage the state of the
63// work to be performed and the slaves involved in the process.
64// A list of TFileNode(s) describes the hosts with files, each
65// has a list of TFileStat(s) keeping the state for each TDSet
66// element (file).
67//
// The list of TSlaveStat(s) keeps track of the work (being) done
// by each slave.
70//
71
72
73//------------------------------------------------------------------------------
74
75class TPacketizerAdaptive::TFileStat : public TObject {
76
77private:
78 Bool_t fIsDone; // is this element processed
79 TFileNode *fNode; // my FileNode
80 TDSetElement *fElement; // location of the file and its range
81 Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing
82
83public:
84 TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
85
86 Bool_t IsDone() const {return fIsDone;}
87 Bool_t IsSortable() const { return kTRUE; }
88 void SetDone() {fIsDone = kTRUE;}
89 TFileNode *GetNode() const {return fNode;}
90 TDSetElement *GetElement() const {return fElement;}
91 Long64_t GetNextEntry() const {return fNextEntry;}
92 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
93
94 // This method is used to keep a sorted list of remaining files to be processed
95 Int_t Compare(const TObject* obj) const
96 {
97 // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
98 // and 1 if elem.entries < obj.elem.entries.
99 const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
100 if (fst && GetElement() && fst->GetElement()) {
101 Long64_t ent = GetElement()->GetNum();
102 Long64_t entfst = fst->GetElement()->GetNum();
103 if (ent > 0 && entfst > 0) {
104 if (ent > entfst) {
105 return 1;
106 } else if (ent < entfst) {
107 return -1;
108 } else {
109 return 0;
110 }
111 }
112 }
113 // No info: assume equal (no change in order)
114 return 0;
115 }
116 void Print(Option_t * = 0) const
117 { // Notify file name and entries
118 Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
119 fElement ? fElement->GetNum() : -1);
120 }
121};
122
123TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
124 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
125{
126 // Constructor: add to the global list
127 if (files) files->Add(this);
128}
129
130//------------------------------------------------------------------------------
131
132// a class describing a file node as a part of a session
133class TPacketizerAdaptive::TFileNode : public TObject {
134
135private:
136 TString fNodeName; // FQDN of the node
137 TList *fFiles; // TDSetElements (files) stored on this node
138 TObject *fUnAllocFileNext; // cursor in fFiles
139 TList *fActFiles; // files with work remaining
140 TObject *fActFileNext; // cursor in fActFiles
141 Int_t fMySlaveCnt; // number of slaves running on this node
142 // (which can process remote files)
143 Int_t fExtSlaveCnt; // number of external slaves processing
144 // files on this node
145 Int_t fRunSlaveCnt; // total number of slaves processing files
146 // on this node
147 Long64_t fProcessed; // number of events processed on this node
148 Long64_t fEvents; // number of entries in files on this node
149
150 Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
151
152 TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer)
153
154public:
155 TFileNode(const char *name, Int_t strategy, TSortedList *files);
156 ~TFileNode() { delete fFiles; delete fActFiles; }
157
158 void IncMySlaveCnt() { fMySlaveCnt++; }
159 Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
160 void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
161 void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
162 Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
163 void IncRunSlaveCnt() { fRunSlaveCnt++; }
164 void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
165 Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
166 Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
167 Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
168 Bool_t IsSortable() const { return kTRUE; }
169 Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
170 void IncProcessed(Long64_t nEvents)
171 { fProcessed += nEvents; }
172 Long64_t GetProcessed() const { return fProcessed; }
173 void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
174 // this method is used by Compare() it adds 1, so it returns a number that
175 // would be true if one more slave is added.
176 Long64_t GetEventsLeftPerSlave() const
177 { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178 void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
179 const char *GetName() const { return fNodeName.Data(); }
180 Long64_t GetNEvents() const { return fEvents; }
181
182 void Print(Option_t * = 0) const
183 {
184 TFileStat *fs = 0;
185 TDSetElement *e = 0;
186 Int_t nn = 0;
187 Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188 Printf("+++ TFileNode: %s +++", fNodeName.Data());
189 Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190 Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
191 Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
192 if (fFiles && fFiles->GetSize() > 0) {
193 TIter nxf(fFiles);
194 while ((fs = (TFileStat *) nxf())) {
195 if ((e = fs->GetElement())) {
196 Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
197 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
198 } else {
199 Printf("+++ #%d: no element! ", ++nn);
200 }
201 }
202 }
203 Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
204 if (fActFiles && fActFiles->GetSize() > 0) {
205 TIter nxaf(fActFiles);
206 while ((fs = (TFileStat *) nxaf())) {
207 if ((e = fs->GetElement())) {
208 Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
209 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
210 } else {
211 Printf("+++ #%d: no element! ", ++nn);
212 }
213 }
214 }
215 Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
216 }
217
218 void Add(TDSetElement *elem, Bool_t tolist)
219 {
220 TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
221 TFileStat *f = new TFileStat(this, elem, files);
222 fFiles->Add(f);
223 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
224 }
225
226 TFileStat *GetNextUnAlloc()
227 {
228 TObject *next = fUnAllocFileNext;
229
230 if (next != 0) {
231 // make file active
232 fActFiles->Add(next);
233 if (fActFileNext == 0) fActFileNext = fActFiles->First();
234
235 // move cursor
236 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
237 }
238 return (TFileStat *) next;
239 }
240
241 TFileStat *GetNextActive()
242 {
243 TObject *next = fActFileNext;
244
245 if (fActFileNext != 0) {
246 fActFileNext = fActFiles->After(fActFileNext);
247 if (fActFileNext == 0) fActFileNext = fActFiles->First();
248 }
249
250 return (TFileStat *) next;
251 }
252
253 void RemoveActive(TFileStat *file)
254 {
255 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
256 fActFiles->Remove(file);
258 if (fActFileNext == 0) fActFileNext = fActFiles->First();
259 }
260
261 Int_t Compare(const TObject *other) const
262 {
263 // Must return -1 if this is smaller than obj, 0 if objects are equal
264 // and 1 if this is larger than obj.
265 // smaller means more needing a new worker.
266 // Two cases are considered depending on
267 // relation between harddrive speed and network bandwidth.
268
269 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
270 if (!obj) {
271 Error("Compare", "input is not a TPacketizer::TFileNode object");
272 return 0;
273 }
274
275 // how many more events it has than obj
276
277 if (fStrategy == 1) {
278 // The default adaptive strategy.
279 Int_t myVal = GetRunSlaveCnt();
280 Int_t otherVal = obj->GetRunSlaveCnt();
281 if (myVal < otherVal) {
282 return -1;
283 } else if (myVal > otherVal) {
284 return 1;
285 } else {
286 // if this has more events to process than obj
287 if ((fEvents - fProcessed) >
288 (obj->GetNEvents() - obj->GetProcessed())) {
289 return -1;
290 } else {
291 return 1;
292 }
293 }
294 } else {
295 Int_t myVal = GetSlaveCnt();
296 Int_t otherVal = obj->GetSlaveCnt();
297 if (myVal < otherVal) {
298 return -1;
299 } else if (myVal > otherVal) {
300 return 1;
301 } else {
302 return 0;
303 }
304 }
305 }
306
307 void Reset()
308 {
309 fUnAllocFileNext = fFiles->First();
310 fActFiles->Clear();
311 fActFileNext = 0;
312 fExtSlaveCnt = 0;
313 fMySlaveCnt = 0;
314 fRunSlaveCnt = 0;
315 }
316};
317
318
319TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
320 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
321 fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323 fStrategy(strategy), fFilesToProcess(files)
324{
325 // Constructor
326
327 fFiles->SetOwner();
328 fActFiles->SetOwner(kFALSE);
329}
330
331//------------------------------------------------------------------------------
332
333class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
334
335friend class TPacketizerAdaptive;
336
337private:
338 TFileNode *fFileNode; // corresponding node or 0
339 TFileStat *fCurFile; // file currently being processed
340 TDSetElement *fCurElem; // TDSetElement currently being processed
341 Long64_t fCurProcessed; // events processed in the current file
342 Float_t fCurProcTime; // proc time spent on the current file
343 TList *fDSubSet; // packets processed by this worker
344
345public:
346 TSlaveStat(TSlave *slave);
347 ~TSlaveStat();
348 TFileNode *GetFileNode() const { return fFileNode; }
349 Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
350 Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
351 TFileStat *GetCurFile() { return fCurFile; }
352 void SetFileNode(TFileNode *node) { fFileNode = node; }
353 void UpdateRates(TProofProgressStatus *st);
354 Float_t GetAvgRate() { return fStatus->GetRate(); }
355 Float_t GetCurRate() {
356 return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
357 Int_t GetLocalEventsLeft() {
358 return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
359 TList *GetProcessedSubSet() { return fDSubSet; }
362};
363
364////////////////////////////////////////////////////////////////////////////////
365/// Constructor
366
367TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
370{
371 fDSubSet = new TList();
372 fDSubSet->SetOwner();
373 fSlave = slave;
374 fStatus = new TProofProgressStatus();
375 // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
376 // for non existing names
377 fWrkFQDN = slave->GetName();
378 if (strcmp(slave->ClassName(), "TSlaveLite")) {
379 fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
380 // Get full name for local hosts
381 if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
382 fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
383 }
384 PDB(kPacketizer, 2)
385 Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
386}
387
388////////////////////////////////////////////////////////////////////////////////
389/// Cleanup
390
391TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
392{
393 SafeDelete(fDSubSet);
394 SafeDelete(fStatus);
395}
396
397////////////////////////////////////////////////////////////////////////////////
398/// Update packetizer rates
399
400void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
401{
402 if (!st) {
403 Error("UpdateRates", "no status object!");
404 return;
405 }
406 if (fCurFile->IsDone()) {
407 fCurProcTime = 0;
408 fCurProcessed = 0;
409 } else {
410 fCurProcTime += st->GetProcTime() - GetProcTime();
411 fCurProcessed += st->GetEntries() - GetEntriesProcessed();
412 }
413 fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
414 st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
415 SafeDelete(fStatus);
416 fStatus = st;
417}
418
419////////////////////////////////////////////////////////////////////////////////
420/// Add the current element to the fDSubSet (subset processed by this worker)
421/// and if the status arg is given, then change the size of the packet.
422/// return the difference (*st - *fStatus)
423
424TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
425{
426 if (st && fDSubSet && fCurElem) {
427 if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
428 fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
429 fDSubSet->Add(fCurElem);
430 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
431 return diff;
432 } else {
433 Error("AddProcessed", "processed subset of current elem undefined");
434 return 0;
435 }
436}
437
438//------------------------------------------------------------------------------
439
441
442////////////////////////////////////////////////////////////////////////////////
443/// Constructor
444
447 TList *input, TProofProgressStatus *st)
448 : TVirtualPacketizer(input, st)
449{
450 PDB(kPacketizer,1) Info("TPacketizerAdaptive",
451 "enter (first %lld, num %lld)", first, num);
452
453 // Init pointer members
454 fSlaveStats = 0;
455 fUnAllocated = 0;
456 fActive = 0;
457 fFileNodes = 0;
458 fMaxPerfIdx = 1;
460 fMaxEntriesRatio = 2.;
461
462 fMaxSlaveCnt = -1;
464 fStrategy = 1;
467
468 if (!fProgressStatus) {
469 Error("TPacketizerAdaptive", "No progress status");
470 return;
471 }
472
473 // Attempt to synchronize the packet size with the tree cache size
474 Int_t cpsync = -1;
475 if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
476 // Check if there is a global cache-packet sync setting
477 cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
478 }
479 if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
480
481 // Max file entries to avg allowed ratio for cache-to-packet synchronization
482 // (applies only if fCachePacketSync is true; -1. disables the bound)
483 if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
484 // Check if there is a global ratio setting
485 fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
486 }
487
488 // The possibility to change packetizer strategy to the basic TPacketizer's
489 // one (in which workers always process their local data first).
490 Int_t strategy = -1;
491 if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
492 // Check if there is a global strategy setting
493 strategy = gEnv->GetValue("Packetizer.Strategy", 1);
494 }
495 if (strategy == 0) {
496 fStrategy = 0;
497 Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
498 } else if (strategy != 1) {
499 Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
500 }
501
502 Long_t maxSlaveCnt = 0;
503 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
504 if (maxSlaveCnt < 0) {
505 Info("TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
507 maxSlaveCnt = 0;
508 }
509 } else {
510 // Try also with Int_t (recently supported in TProof::SetParameter)
511 Int_t mxslcnt = -1;
512 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
513 if (mxslcnt < 0) {
514 Info("TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
516 mxslcnt = 0;
517 }
518 maxSlaveCnt = (Long_t) mxslcnt;
519 }
520 }
521
522 if (!maxSlaveCnt)
523 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
524 if (maxSlaveCnt > 0) {
525 fMaxSlaveCnt = maxSlaveCnt;
526 Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
528 }
529
530 // if forceLocal parameter is set to 1 then eliminate the cross-worker
531 // processing;
532 // This minimizes the network usage on the PROOF cluser at the expense of
533 // longer jobs processing times.
534 // To process successfully the session must have workers with all the data!
536 Int_t forceLocal = 0;
537 if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
538 if (forceLocal == 1)
540 else
541 Info("TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
543 }
544
545 // Below we provide a possibility to change the way packet size is
546 // calculated or define the packet time directly.
547 // fPacketAsAFraction can be interpreted as follows:
548 // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
549 // It substitutes 20 in the old formula to calculate the fPacketSize:
550 // fPacketSize = fTotalEntries / (20 * nslaves)
551 Int_t packetAsAFraction = 0;
552 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
553 if (packetAsAFraction > 0) {
554 fPacketAsAFraction = packetAsAFraction;
555 Info("TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
557 packetAsAFraction);
558 } else
559 Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
560 }
561
562 // Packet re-assignement
563 fTryReassign = 0;
564 Int_t tryReassign = 0;
565 if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0)
566 tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0);
567 fTryReassign = tryReassign;
568 if (fTryReassign != 0)
569 Info("TPacketizerAdaptive", "failed packets will be re-assigned");
570
571 // Save the config parameters in the dedicated list so that they will be saved
572 // in the outputlist and therefore in the relevant TQueryResult
573 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
574 fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
575 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
576 fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
577 fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
578 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
579
580 Double_t baseLocalPreference = 1.2;
581 fBaseLocalPreference = (Float_t)baseLocalPreference;
582 if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0)
583 fBaseLocalPreference = (Float_t)baseLocalPreference;
584
585 fFileNodes = new TList;
587 fUnAllocated = new TList;
589 fActive = new TList;
591
592 fValid = kTRUE;
593
594 // Resolve end-point urls to optmize distribution
595 // dset->Lookup(); // moved to TProofPlayerRemote::Process
596
597 // Read list of mounted disks
598 TObjArray *partitions = 0;
599 TString partitionsStr;
600 if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
601 partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
602 if (!partitionsStr.IsNull()) {
603 Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
604 partitions = partitionsStr.Tokenize(",");
605 }
606
607 // Split into per host and disk entries
608 dset->Reset();
610 while ((e = (TDSetElement*)dset->Next())) {
611
612 if (e->GetValid()) continue;
613
614 // The dataset name, if any
615 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
616 fDataSet = e->GetDataSet();
617
618 TUrl url = e->GetFileName();
619 PDB(kPacketizer,2)
620 Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
621
622 // Map non URL filenames to dummy host
623 TString host;
624 if ( !url.IsValid() ||
625 (strncmp(url.GetProtocol(),"root", 4) &&
626 strncmp(url.GetProtocol(),"rfio", 4) &&
627 strncmp(url.GetProtocol(),"file", 4)) ) {
628 host = "no-host";
629 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
630 host = "localhost";
631 url.SetProtocol("root");
632 } else {
633 host = url.GetHostFQDN();
634 }
635 // Get full name for local hosts
636 if (host.Contains("localhost") || host == "127.0.0.1") {
637 url.SetHost(gSystem->HostName());
638 host = url.GetHostFQDN();
639 }
640
641 // Find on which disk is the file, if any
642 TString disk;
643 if (partitions) {
644 TIter iString(partitions);
645 TObjString* os = 0;
646 while ((os = (TObjString *)iString())) {
647 // Compare begining of the url with disk mountpoint
648 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
649 disk = os->GetName();
650 break;
651 }
652 }
653 }
654 // Node's url
655 TString nodeStr;
656 if (disk.IsNull())
657 nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
658 else
659 nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
660 TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
661
662 if (node == 0) {
663 node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
664 fFileNodes->Add(node);
665 PDB(kPacketizer,2)
666 Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
667 } else {
668 PDB(kPacketizer,2)
669 Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
670 }
671
672 node->Add(e, kFALSE);
673 }
674
675 fSlaveStats = new TMap;
677
678 TSlave *slave;
679 TIter si(slaves);
680 while ((slave = (TSlave*) si.Next())) {
681 fSlaveStats->Add( slave, new TSlaveStat(slave) );
682 fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
683 slave->GetPerfIdx() : fMaxPerfIdx;
684 }
685
686 // Setup file & filenode structure
687 Reset();
688 // Optimize the number of files to be open when running on subsample
689 Int_t validateMode = 0;
690 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
691 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
692 if (num > -1)
693 PDB(kPacketizer,2)
694 Info("TPacketizerAdaptive",
695 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
696 ValidateFiles(dset, slaves, num, byfile);
697
698
699 if (!fValid) return;
700
701 // apply global range (first,num) to dset and rebuild structure
702 // ommitting TDSet elements that are not needed
703
704 Int_t files = 0;
705 fTotalEntries = 0;
706 fUnAllocated->Clear(); // avoid dangling pointers
707 fActive->Clear();
708 fFileNodes->Clear(); // then delete all objects
709 PDB(kPacketizer,2)
710 Info("TPacketizerAdaptive",
711 "processing range: first %lld, num %lld", first, num);
712
713 dset->Reset();
714 Long64_t cur = 0;
715 while (( e = (TDSetElement*)dset->Next())) {
716
717 // Skip invalid or missing file; It will be moved
718 // from the dset to the 'MissingFiles' list in the player.
719 if (!e->GetValid()) continue;
720
721 TUrl url = e->GetFileName();
722 Long64_t eFirst = e->GetFirst();
723 Long64_t eNum = e->GetNum();
724 PDB(kPacketizer,2)
725 Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
726 PDB(kPacketizer,2)
727 Info("TPacketizerAdaptive",
728 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
729
730 if (!e->GetEntryList()) {
731 // This element is before the start of the global range, skip it
732 if (cur + eNum < first) {
733 cur += eNum;
734 PDB(kPacketizer,2)
735 Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
736 continue;
737 }
738
739 // This element is after the end of the global range, skip it
740 if (num != -1 && (first+num <= cur)) {
741 cur += eNum;
742 PDB(kPacketizer,2)
743 Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
744 continue; // break ??
745 }
746
747 Bool_t inRange = kFALSE;
748 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
749
750 if (cur <= first) {
751 // If this element contains the start of the global range
752 // adjust its start and number of entries
753 e->SetFirst( eFirst + (first - cur) );
754 e->SetNum( e->GetNum() - (first - cur) );
755 PDB(kPacketizer,2)
756 Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
757 eFirst + (first - cur), first + num - cur);
758 inRange = kTRUE;
759 }
760 if (num != -1 && (first+num <= cur+eNum)) {
761 // If this element contains the end of the global range
762 // adjust its number of entries
763 e->SetNum( first + num - e->GetFirst() - cur );
764 PDB(kPacketizer,2)
765 Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
766 inRange = kTRUE;
767 }
768
769 } else {
770 // Increment the counter ...
771 PDB(kPacketizer,2)
772 Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
773 cur += eNum;
774 }
775 // Re-adjust eNum and cur, if needed
776 if (inRange) {
777 cur += eNum;
778 eNum = e->GetNum();
779 }
780
781 } else {
782 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
783 if (enl) {
784 eNum = enl->GetN();
785 PDB(kPacketizer,2)
786 Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
787 } else {
788 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
789 eNum = evl ? evl->GetN() : eNum;
790 PDB(kPacketizer,2)
791 Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
792 }
793 if (!eNum) {
794 PDB(kPacketizer,2)
795 Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
796 continue;
797 }
798 }
799 PDB(kPacketizer,2)
800 Info("TPacketizerAdaptive", " --> next cur %lld", cur);
801
802 // Map non URL filenames to dummy host
803 TString host;
804 if ( !url.IsValid() ||
805 (strncmp(url.GetProtocol(),"root", 4) &&
806 strncmp(url.GetProtocol(),"rfio", 4) &&
807 strncmp(url.GetProtocol(),"file", 4)) ) {
808 host = "no-host";
809 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
810 host = "localhost";
811 url.SetProtocol("root");
812 } else {
813 host = url.GetHostFQDN();
814 }
815 // Get full name for local hosts
816 if (host.Contains("localhost") || host == "127.0.0.1") {
817 url.SetHost(gSystem->HostName());
818 host = url.GetHostFQDN();
819 }
820
821 // Find, on which disk is the file
822 TString disk;
823 if (partitions) {
824 TIter iString(partitions);
825 TObjString* os = 0;
826 while ((os = (TObjString *)iString())) {
827 // Compare begining of the url with disk mountpoint
828 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
829 disk = os->GetName();
830 break;
831 }
832 }
833 }
834 // Node's url
835 TString nodeStr;
836 if (disk.IsNull())
837 nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
838 else
839 nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
840 TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
841
842
843 if (node == 0) {
844 node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
845 fFileNodes->Add( node );
846 PDB(kPacketizer, 2)
847 Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
848 } else {
849 PDB(kPacketizer, 2)
850 Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
851 }
852
853 ++files;
854 fTotalEntries += eNum;
855 node->Add(e, kTRUE);
856 node->IncEvents(eNum);
857 PDB(kPacketizer,2) e->Print("a");
858 }
859 PDB(kPacketizer,1)
860 Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
861 fTotalEntries, files, fFileNodes->GetSize());
862
863 // Set the total number for monitoring
864 if (gPerfStats)
865 gPerfStats->SetNumEvents(fTotalEntries);
866
867 Reset();
868
869 InitStats();
870
871 if (!fValid)
873
874 PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
875}
876
877////////////////////////////////////////////////////////////////////////////////
878/// Destructor.
879
881{
882 if (fSlaveStats) {
884 }
885
891}
892
893////////////////////////////////////////////////////////////////////////////////
894/// (re)initialise the statistics
895/// called at the begining or after a worker dies.
896
898{
899 // calculating how many files from TDSet are not cached on
900 // any slave
901 Int_t noRemoteFiles = 0;
903 Int_t totalNumberOfFiles = 0;
904 TIter next(fFileNodes);
905 while (TFileNode *fn = (TFileNode*)next()) {
906 totalNumberOfFiles += fn->GetNumberOfFiles();
907 if (fn->GetMySlaveCnt() == 0) {
908 noRemoteFiles += fn->GetNumberOfFiles();
909 fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
910 }
911 }
912
913 if (totalNumberOfFiles == 0) {
914 Info("InitStats", "no valid or non-empty file found: setting invalid");
915 // No valid files: set invalid and return
916 fValid = kFALSE;
917 return;
918 }
919
920 fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
921 Info("InitStats",
922 "fraction of remote files %f", fFractionOfRemoteFiles);
923
924 if (!fValid)
926
927 PDB(kPacketizer,1) Info("InitStats", "return");
928}
929
930////////////////////////////////////////////////////////////////////////////////
931/// Get next unallocated file from 'node' or other nodes:
932/// First try 'node'. If there is no more files, keep trying to
933/// find an unallocated file on other nodes.
934
935TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
936{
937 TFileStat *file = 0;
938
939 if (node != 0) {
940 PDB(kPacketizer, 2)
941 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
942 file = node->GetNextUnAlloc();
943 if (file == 0) RemoveUnAllocNode(node);
944 } else {
945 if (nodeHostName && strlen(nodeHostName) > 0) {
946
947 TFileNode *fn;
948 // Make sure that they are in the corrected order
950 PDB(kPacketizer,2) fUnAllocated->Print();
951
952 // Loop over unallocated fileNode list
953 for (int i = 0; i < fUnAllocated->GetSize(); i++) {
954
955 if ((fn = (TFileNode *) fUnAllocated->At(i))) {
956 TUrl uu(fn->GetName());
957 PDB(kPacketizer, 2)
958 Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
959
960 // Check, whether node's hostname is matching with current fileNode (fn)
961 if (!strcmp(nodeHostName, uu.GetHost())) {
962 node = fn;
963
964 // Fetch next unallocated file from this node
965 if ((file = node->GetNextUnAlloc()) == 0) {
966 RemoveUnAllocNode(node);
967 node = 0;
968 } else {
969 PDB(kPacketizer, 2)
970 Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
971 break;
972 }
973 }
974 } else {
975 Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
976 }
977 }
978
979 if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
980 // Unlike in TPacketizer we look at the number of ext slaves only.
981 PDB(kPacketizer,1)
982 Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
983 node = 0;
984 }
985 }
986
987 if (node == 0) {
988 while (file == 0 && ((node = NextNode()) != 0)) {
989 PDB(kPacketizer, 2)
990 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
991 if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
992 }
993 }
994 }
995
996 if (file != 0) {
997 // if needed make node active
998 if (fActive->FindObject(node) == 0) {
999 fActive->Add(node);
1000 }
1001 }
1002
1003 PDB(kPacketizer, 2) {
1004 if (!file) {
1005 Info("GetNextUnAlloc", "no file found!");
1006 } else {
1007 file->Print();
1008 }
1009 }
1010
1011 return file;
1012}
1013
1014////////////////////////////////////////////////////////////////////////////////
1015/// Get next node which has unallocated files.
1016/// the order is determined by TFileNode::Compare
1017
1018TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
1019{
1020 fUnAllocated->Sort();
1021 PDB(kPacketizer,2) {
1023 }
1024
1025 TFileNode *fn = (TFileNode*) fUnAllocated->First();
1026 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1027 // unlike in TPacketizer we look at the number of ext slaves only.
1028 PDB(kPacketizer,1)
1029 Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1030 fn = 0;
1031 }
1032
1033 return fn;
1034}
1035
1036////////////////////////////////////////////////////////////////////////////////
1037/// Remove unallocated node.
1038
1040{
1041 fUnAllocated->Remove(node);
1042}
1043
1044////////////////////////////////////////////////////////////////////////////////
1045/// Get next active file.
1046
1047TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
1048{
1049 TFileNode *node;
1050 TFileStat *file = 0;
1051
1052 while (file == 0 && ((node = NextActiveNode()) != 0)) {
1053 file = node->GetNextActive();
1054 if (file == 0) RemoveActiveNode(node);
1055 }
1056
1057 return file;
1058}
1059
1060
1061////////////////////////////////////////////////////////////////////////////////
1062/// Get next active node.
1063
1064TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
1065{
1066 fActive->Sort();
1067 PDB(kPacketizer,2) {
1068 Info("NextActiveNode", "enter");
1069 fActive->Print();
1070 }
1071
1072 TFileNode *fn = (TFileNode*) fActive->First();
1073 // look at only ext slaves
1074 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1075 PDB(kPacketizer,1)
1076 Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1077 fn = 0;
1078 }
1079
1080 return fn;
1081}
1082
1083////////////////////////////////////////////////////////////////////////////////
1084/// Remove file from the list of actives.
1085
1087{
1088 TFileNode *node = file->GetNode();
1089
1090 node->RemoveActive(file);
1091 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1092}
1093
1094////////////////////////////////////////////////////////////////////////////////
1095/// Remove node from the list of actives.
1096
1098{
1099 fActive->Remove(node);
1100}
1101
1102////////////////////////////////////////////////////////////////////////////////
1103/// Reset the internal data structure for packet distribution.
1104
1106{
1109
1110 fActive->Clear();
1111
1112 TIter files(fFileNodes);
1113 TFileNode *fn;
1114 while ((fn = (TFileNode*) files.Next()) != 0) {
1115 fn->Reset();
1116 }
1117
1118 TIter slaves(fSlaveStats);
1119 TObject *key;
1120 while ((key = slaves.Next()) != 0) {
1121 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1122 if (!slstat) {
1123 Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
1124 continue;
1125 }
1126 // Find out which file nodes are on the worker machine and assign the
1127 // one with less workers assigned
1128 TFileNode *fnmin = 0;
1129 Int_t fncnt = fSlaveStats->GetSize();
1130 files.Reset();
1131 while ((fn = (TFileNode*) files.Next()) != 0) {
1132 if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1133 if (fn->GetMySlaveCnt() < fncnt) {
1134 fnmin = fn;
1135 fncnt = fn->GetMySlaveCnt();
1136 }
1137 }
1138 }
1139 if (fnmin != 0 ) {
1140 slstat->SetFileNode(fnmin);
1141 fnmin->IncMySlaveCnt();
1142 PDB(kPacketizer, 2)
1143 Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1144 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1145 }
1146 slstat->fCurFile = 0;
1147 }
1148}
1149
1150////////////////////////////////////////////////////////////////////////////////
1151/// Check existence of file/dir/tree an get number of entries.
1152/// Assumes the files have been setup.
1153
1155 Long64_t maxent, Bool_t byfile)
1156{
1157 TMap slaves_by_sock;
1158 TMonitor mon;
1159 TList workers;
1160
1161
1162 // Setup the communication infrastructure
1163
1164 workers.AddAll(slaves);
1165 TIter si(slaves);
1166 TSlave *slm;
1167 while ((slm = (TSlave*)si.Next()) != 0) {
1168 PDB(kPacketizer,3)
1169 Info("ValidateFiles","socket added to monitor: %p (%s)",
1170 slm->GetSocket(), slm->GetName());
1171 mon.Add(slm->GetSocket());
1172 slaves_by_sock.Add(slm->GetSocket(), slm);
1173 }
1174
1175 mon.DeActivateAll();
1176
1177 ((TProof*)gProof)->DeActivateAsyncInput();
1178
1179 // Some monitoring systems (TXSocketHandler) need to know this
1180 ((TProof*)gProof)->fCurrentMonitor = &mon;
1181
1182 // Identify the type
1183 if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1184
1185 // Preparing for client notification
1186 TString msg("Validating files");
1187 UInt_t n = 0;
1188 UInt_t tot = dset->GetListOfElements()->GetSize();
1189 Bool_t st = kTRUE;
1190
1191 Long64_t totent = 0, nopenf = 0;
1192 while (kTRUE) {
1193
1194 // send work
1195 while (TSlave *s = (TSlave *)workers.First()) {
1196
1197 workers.Remove(s);
1198
1199 // find a file
1200
1201 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1202 if (!slstat) {
1203 Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
1204 continue;
1205 }
1206
1207 TFileNode *node = 0;
1208 TFileStat *file = 0;
1209
1210 // try its own node first
1211 if ((node = slstat->GetFileNode()) != 0) {
1212 PDB(kPacketizer,3) node->Print();
1213 file = GetNextUnAlloc(node);
1214 if (file == 0)
1215 slstat->SetFileNode(0);
1216 }
1217
1218 // look for a file on any other node if necessary
1219 if (file == 0)
1220 file = GetNextUnAlloc();
1221
1222 if (file != 0) {
1223 // files are done right away
1225
1226 slstat->fCurFile = file;
1227 TDSetElement *elem = file->GetElement();
1228 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1229 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1230 // This is decremented when we get the reply
1231 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1233 m << dset->IsTree()
1234 << TString(elem->GetFileName())
1235 << TString(elem->GetDirectory())
1236 << TString(elem->GetObjName());
1237
1238 s->GetSocket()->Send( m );
1239 mon.Activate(s->GetSocket());
1240 PDB(kPacketizer,2)
1241 Info("ValidateFiles",
1242 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1243 s->GetOrdinal(), s->GetName(), s->GetSocket(),
1244 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1245 elem->GetDirectory(), elem->GetObjName());
1246 } else {
1247 // Fill the info
1248 elem->SetTDSetOffset(entries);
1249 if (entries > 0) {
1250 // Most likely valid
1251 elem->SetValid();
1252 if (!elem->GetEntryList()) {
1253 if (elem->GetFirst() > entries) {
1254 Error("ValidateFiles",
1255 "first (%lld) higher then number of entries (%lld) in %s",
1256 elem->GetFirst(), entries, elem->GetFileName());
1257 // disable element
1258 slstat->fCurFile->SetDone();
1259 elem->Invalidate();
1261 }
1262 if (elem->GetNum() == -1) {
1263 elem->SetNum(entries - elem->GetFirst());
1264 } else if (elem->GetFirst() + elem->GetNum() > entries) {
1265 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
1266 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1267 entries, elem->GetFileName());
1268 elem->SetNum(entries - elem->GetFirst());
1269 }
1270 PDB(kPacketizer,2)
1271 Info("ValidateFiles",
1272 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1273 }
1274 }
1275 // Count
1276 totent += entries;
1277 nopenf++;
1278 // Notify the client
1279 n++;
1280 gProof->SendDataSetStatus(msg, n, tot, st);
1281
1282 // This worker is ready for the next validation
1283 workers.Add(s);
1284 }
1285 }
1286 }
1287
1288 // Check if there is anything to wait for
1289 if (mon.GetActive() == 0) {
1290 if (byfile && maxent > 0) {
1291 // How many files do we still need ?
1292 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1293 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1294 if (nrestf > 0) {
1295 PDB(kPacketizer,3)
1296 Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
1297 maxent, totent, nopenf, nrestf);
1298 si.Reset();
1299 while ((slm = (TSlave *) si.Next()) && nrestf--) {
1300 workers.Add(slm);
1301 }
1302 continue;
1303 } else {
1304 PDB(kPacketizer,3)
1305 Info("ValidateFiles", "no need to validate more files");
1306 break;
1307 }
1308 } else {
1309 break;
1310 }
1311 }
1312
1313 PDB(kPacketizer,3) {
1314 Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
1315 TList *act = mon.GetListOfActives();
1316 TIter next(act);
1317 while (TSocket *s = (TSocket*) next()) {
1318 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1319 if (sl)
1320 Info("ValidateFiles", " worker-%s (%s)",
1321 sl->GetOrdinal(), sl->GetName());
1322 }
1323 delete act;
1324 }
1325
1326 TSocket *sock = mon.Select();
1327 // If we have been interrupted break
1328 if (!sock) {
1329 Error("ValidateFiles", "selection has been interrupted - STOP");
1330 mon.DeActivateAll();
1331 fValid = kFALSE;
1332 break;
1333 }
1334 mon.DeActivate(sock);
1335
1336 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
1337
1338 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1339 if (!sock->IsValid()) {
1340 // A socket got invalid during validation
1341 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
1342 slave->GetOrdinal(), slave->GetName());
1343 ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1344 fValid = kFALSE;
1345 break;
1346 }
1347
1348 TMessage *reply;
1349
1350 if (sock->Recv(reply) <= 0) {
1351 // Notify
1352 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
1353 slave->GetOrdinal(), slave->GetName());
1354 // Help! lost a slave? ('slave' is deleted inside here ...)
1355 ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
1356 fValid = kFALSE;
1357 continue;
1358 }
1359
1360 if (reply->What() != kPROOF_GETENTRIES) {
1361 // Not what we want: handover processing to the central machinery
1362 Int_t what = reply->What();
1363 ((TProof*)gProof)->HandleInputMessage(slave, reply);
1364 if (what == kPROOF_FATAL) {
1365 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
1366 slave->GetOrdinal(), slave->GetName());
1367 fValid = kFALSE;
1368 } else {
1369 // Reactivate the socket
1370 mon.Activate(sock);
1371 }
1372 // Get next message
1373 continue;
1374 }
1375
1376 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1377 TDSetElement *e = slavestat->fCurFile->GetElement();
1378 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1379 Long64_t entries;
1380
1381 (*reply) >> entries;
1382
1383 // Extract object name, if there
1384 if ((reply->BufferSize() > reply->Length())) {
1385 TString objname;
1386 (*reply) >> objname;
1387 e->SetTitle(objname);
1388 }
1389
1390 e->SetTDSetOffset(entries);
1391 if (entries > 0) {
1392
1393 // This dataset element is most likely valid
1394 e->SetValid();
1395
1396 if (!e->GetEntryList()) {
1397 if (e->GetFirst() > entries) {
1398 Error("ValidateFiles",
1399 "first (%lld) higher then number of entries (%lld) in %s",
1400 e->GetFirst(), entries, e->GetFileName());
1401
1402 // Invalidate the element
1403 slavestat->fCurFile->SetDone();
1404 e->Invalidate();
1406 }
1407
1408 if (e->GetNum() == -1) {
1409 e->SetNum(entries - e->GetFirst());
1410 } else if (e->GetFirst() + e->GetNum() > entries) {
1411 Error("ValidateFiles",
1412 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1413 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1414 e->SetNum(entries - e->GetFirst());
1415 }
1416 }
1417
1418 // Count
1419 totent += entries;
1420 nopenf++;
1421
1422 // Notify the client
1423 n++;
1424 gProof->SendDataSetStatus(msg, n, tot, st);
1425
1426 } else {
1427
1428 Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
1429 //
1430 // Need to fix this with a user option to allow incomplete file sets (rdm)
1431 //
1432 //fValid = kFALSE; // all element must be readable!
1433 if (gProofServ) {
1435 m << TString(Form("Cannot get entries for file: %s - skipping",
1436 e->GetFileName()));
1438 }
1439
1440 // invalidate element
1441 e->Invalidate();
1443 }
1444 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1445
1446 // Ready for the next job, unless we have enough files
1447 if (maxent < 0 || ((totent < maxent) && !byfile))
1448 workers.Add(slave);
1449 }
1450
1451 // report std. output from slaves??
1452
1453 ((TProof*)gProof)->ActivateAsyncInput();
1454
1455 // This needs to be reset
1456 ((TProof*)gProof)->fCurrentMonitor = 0;
1457
1458 // No reason to continue if invalid
1459 if (!fValid)
1460 return;
1461
1462 // compute the offset for each file element
1463 Long64_t offset = 0;
1464 Long64_t newOffset = 0;
1465 TIter next(dset->GetListOfElements());
1466 TDSetElement *el;
1467 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1468 if (el->GetValid()) {
1469 newOffset = offset + el->GetTDSetOffset();
1470 el->SetTDSetOffset(offset);
1471 offset = newOffset;
1472 }
1473 }
1474}
1475
1476////////////////////////////////////////////////////////////////////////////////
1477/// The result depends on the fStrategy
1478
1480{
1481 Long64_t num;
1482 if (fStrategy == 0) {
1483 // TPacketizer's heuristic for starting packet size
1484 // Constant packet size;
1485 Int_t nslaves = fSlaveStats->GetSize();
1486 if (nslaves > 0) {
1487 num = fTotalEntries / (fPacketAsAFraction * nslaves);
1488 } else {
1489 num = 1;
1490 }
1491 } else {
1492 // The dynamic heuristic for setting the packet size (default)
1493 // Calculates the packet size based on performance of this slave
1494 // and estimated time left until the end of the query.
1495 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1496 Float_t rate = slstat->GetCurRate();
1497 if (!rate)
1498 rate = slstat->GetAvgRate();
1499 if (rate) {
1500
1501 // Global average rate
1503 Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1504
1505 // Bytes-to-Event conversion
1506 Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1507
1508 // Make sure it is not smaller then the cache, if the info is available and the size
1509 // synchronization is required. But apply the cache-packet size synchronization only if there
1510 // are enough left files to process and the files are all of similar sizes. Otherwise we risk
1511 // to not exploit optimally all potentially active workers.
1512 Bool_t cpsync = fCachePacketSync;
1513 if (fMaxEntriesRatio > 0. && cpsync) {
1516 Long64_t maxEntries = -1;
1517 if (fFilesToProcess->Last()) {
1518 TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
1519 if (elem) maxEntries = elem->GetNum();
1520 }
1521 if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1522 PDB(kPacketizer,3) {
1523 Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1524 Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1525 slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1526 (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1527 }
1528 cpsync = kFALSE;
1529 }
1530 }
1531 }
1532 if (bevt > 0. && cachesz > 0 && cpsync) {
1533 if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1534 packetTime = cachesz / bevt / rate;
1535 }
1536
1537 // Apply min-max again, if required
1538 if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1539 if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1540
1541 // Translate the packet length in number of entries
1542 num = (Long64_t)(rate * packetTime);
1543
1544 // Notify
1545 PDB(kPacketizer,2)
1546 Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1547 slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1548 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1549
1550 } else {
1551 // First packet for this worker in this query
1552 // Twice the learning phase
1553 num = (learnent > 0) ? 5 * learnent : 1000;
1554
1555 // Notify
1556 PDB(kPacketizer,2)
1557 Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1558 }
1559 }
1560 if (num < 1) num = 1;
1561 return num;
1562}
1563
1564////////////////////////////////////////////////////////////////////////////////
1565/// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
1566/// message (when the worker was asked to stop processing during a packet).
1567/// returns the #entries intended in the last packet - #processed entries
1568
1570 TProofProgressStatus *status,
1571 Double_t latency,
1572 TList **listOfMissingFiles)
1573{
1574 // find slave
1575 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1576 if (!slstat) {
1577 Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!",
1578 (sl ? sl->GetOrdinal() : "x.x"),
1579 (sl ? sl->GetName() : "**undef**"));
1580 return -1;
1581 }
1582
1583 // update stats & free old element
1584
1585 if ( slstat->fCurElem != 0 ) {
1586 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1587 // Calculate the number of events processed in the last packet
1588 Long64_t numev;
1589 if (status && status->GetEntries() > 0)
1590 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1591 else
1592 numev = 0;
1593
1594 // Calculate the progress made in the last packet
1595 TProofProgressStatus *progress = 0;
1596 if (numev > 0) {
1597 // This also moves the pointer in the corrsponding TFileInfo
1598 progress = slstat->AddProcessed(status);
1599 if (progress) {
1600 (*fProgressStatus) += *progress;
1601 // update processing rate
1602 slstat->UpdateRates(status);
1603 }
1604 } else {
1605 progress = new TProofProgressStatus();
1606 }
1607 if (progress) {
1608 PDB(kPacketizer,2)
1609 Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1610 sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1611 progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1612
1613 if (gPerfStats)
1614 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1615 slstat->fCurElem->GetFileName(),
1616 progress->GetEntries(),
1617 latency,
1618 progress->GetProcTime(),
1619 progress->GetCPUTime(),
1620 progress->GetBytesRead());
1621 delete progress;
1622 }
1623 if (numev != expectedNumEv) {
1624 // The last packet was not fully processed
1625 // and will be split in two:
1626 // - The completed part was marked as done.
1627 // - Create a new packet with the part to be resubmitted.
1628 TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
1629 if (newPacket && numev < expectedNumEv) {
1630 Long64_t first = newPacket->GetFirst();
1631 newPacket->SetFirst(first + numev);
1632 if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1633 SafeDelete(newPacket);
1634 } else
1635 Error("AddProcessed", "%s: processed too much? (%lld, %lld)",
1636 sl->GetOrdinal(), numev, expectedNumEv);
1637
1638 // TODO: a signal handler which will send info from the worker
1639 // after a packet fails.
1640 /* Add it to the failed packets list.
1641 if (!fFailedPackets) {
1642 fFailedPackets = new TList();
1643 }
1644 fFailedPackets->Add(slstat->fCurElem);
1645 */
1646 }
1647
1648 slstat->fCurElem = 0;
1649 return (expectedNumEv - numev);
1650 } else {
1651 // the kPROOF_STOPPRPOCESS message is send after the worker receives zero
1652 // as the reply to kPROOF_GETNEXTPACKET
1653 return -1;
1654 }
1655}
1656
1657////////////////////////////////////////////////////////////////////////////////
1658/// Get next packet;
1659/// A meaningfull difference to TPacketizer is the fact that this
1660/// packetizer, for each worker, tries to predict whether the worker
1661/// will finish processing it's local files before the end of the query.
1662/// If yes, it allocates, to those workers, files from non-slave filenodes
1663/// or from slaves that are overloaded. The check is done every time a new
1664/// file needs to be assigned.
1665
1667{
1668 if ( !fValid ) {
1669 return 0;
1670 }
1671
1672 // find slave
1673
1674 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1675 if (!slstat) {
1676 Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
1677 (sl ? sl->GetName() : "**undef**"));
1678 return 0;
1679 }
1680
1681 // Attach to current file
1682 TFileStat *file = slstat->fCurFile;
1683
1684 // Update stats & free old element
1685
1686 Bool_t firstPacket = kFALSE;
1687 Long64_t cachesz = -1;
1688 Int_t learnent = -1;
1689 if ( slstat->fCurElem != 0 ) {
1690
1691 Long64_t restEntries = 0;
1692 Double_t latency, proctime, proccpu;
1693 TProofProgressStatus *status = 0;
1694 Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
1695
1696 if (sl->GetProtocol() > 18) {
1697
1698 (*r) >> latency;
1699 (*r) >> status;
1700
1701 if (sl->GetProtocol() > 25) {
1702 (*r) >> cachesz >> learnent;
1703 if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1704 }
1705 fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1706 fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
1707
1708 } else {
1709
1710 Long64_t bytesRead = -1;
1711
1712 (*r) >> latency >> proctime >> proccpu;
1713 // only read new info if available
1714 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1715 if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1716 Long64_t totev = 0;
1717 if (r->BufferSize() > r->Length()) (*r) >> totev;
1718
1719 status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
1720 fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
1721 }
1722
1723 if (!fileNotOpen && !fileCorrupted) {
1724 if (AddProcessed(sl, status, latency) != 0)
1725 Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal());
1728 Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)",
1730 // Send last timer message and stop the timer
1731 HandleTimer(0);
1733 }
1734 } else {
1735 if (file) {
1736 if (file->GetElement()) {
1737 if (fileCorrupted) {
1738 Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)",
1739 sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1740 Int_t nunproc = AddProcessed(sl, status, latency);
1741 PDB(kPacketizer,1)
1742 Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
1743 // Remaining to be processed
1744 Long64_t num = 0;
1745 if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1746 // Add the remainign entries in the packet to the ones already registered
1747 num = file->GetElement()->GetEntries() + restEntries;
1748 } else {
1749 // First call: add the remaining entries in the packet and those of the file
1750 // not yet assigned
1751 Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1752 num = restEntries + rest;
1753 }
1754 file->GetElement()->SetEntries(num);
1755 PDB(kPacketizer,1)
1756 Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1757 file->GetElement()->GetName(), file->GetElement()->GetEntries());
1758 // Flag as corrupted
1759 file->GetElement()->SetBit(TDSetElement::kCorrupted);
1760 } else {
1761 Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element",
1762 sl->GetOrdinal(), file->GetElement()->GetName());
1763 }
1764 // Invalidate the element
1765 file->GetElement()->Invalidate();
1766 // Add it to the failed packets list
1767 if (!fFailedPackets) fFailedPackets = new TList();
1768 if (!fFailedPackets->FindObject(file->GetElement()))
1769 fFailedPackets->Add(file->GetElement());
1770 }
1771 // Deactivate this TFileStat
1772 file->SetDone();
1774 } else {
1775 Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:"
1776 " protocol error?", sl->GetOrdinal());
1777 }
1778 }
1779 } else {
1780 firstPacket = kTRUE;
1781 }
1782
1783 if ( fStop ) {
1784 HandleTimer(0);
1785 return 0;
1786 }
1787
1788 TString nodeName;
1789 if (file != 0) nodeName = file->GetNode()->GetName();
1790 TString nodeHostName(slstat->GetName());
1791
1792 PDB(kPacketizer,3)
1793 Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1794 sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
1795
1796 // If current file is just finished
1797 if ( file != 0 && file->IsDone() ) {
1798 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1799 file->GetNode()->DecRunSlaveCnt();
1800 if (gPerfStats)
1801 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1802 file->GetElement()->GetFileName(), kFALSE);
1803 file = 0;
1804 }
1805 // Reset the current file field
1806 slstat->fCurFile = file;
1807
1808 Long64_t avgEventsLeftPerSlave =
1811 return 0;
1812 // Get a file if needed
1813 if ( file == 0) {
1814 // Needs a new file
1815 Bool_t openLocal;
1816 // Aiming for localPreference == 1 when #local == #remote events left
1817 Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1819 if ( slstat->GetFileNode() != 0 ) {
1820 // Local file node exists and has more events to process.
1821 fUnAllocated->Sort();
1822 TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1823 Bool_t nonLocalNodePossible;
1824 if (fForceLocal)
1825 nonLocalNodePossible = 0;
1826 else
1827 nonLocalNodePossible = firstNonLocalNode ?
1828 (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1829 : 0;
1830 openLocal = !nonLocalNodePossible;
1831 Float_t slaveRate = slstat->GetAvgRate();
1832 if ( nonLocalNodePossible && fStrategy == 1) {
1833 // OpenLocal is set to kFALSE
1834 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1835 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1836 // External slaves help slstat -> don't open nonlocal files
1837 // -1 because, at this point slstat is not running
1838 openLocal = kTRUE;
1839 else if ( slaveRate == 0 ) { // first file for this slave
1840 // GetLocalEventsLeft() counts the potential slave
1841 // as running on its fileNode.
1842 if ( slstat->GetLocalEventsLeft() * localPreference
1843 > (avgEventsLeftPerSlave))
1844 openLocal = kTRUE;
1845 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1846 < slstat->GetLocalEventsLeft() * localPreference )
1847 openLocal = kTRUE;
1848 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1849 openLocal = kTRUE;
1850 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1851 openLocal = kTRUE;
1852 } else {
1853 // At this point slstat has a non zero avg rate > 0
1854 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1855 // And thus fCumProcTime, fProcessed > 0
1856 Float_t avgTime = avgEventsLeftPerSlave
1858 if (slaveTime * localPreference > avgTime)
1859 openLocal = kTRUE;
1860 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1861 < slstat->GetLocalEventsLeft() * localPreference)
1862 openLocal = kTRUE;
1863 }
1864 }
1865 if (openLocal || fStrategy == 0) {
1866 // Try its own node
1867 file = slstat->GetFileNode()->GetNextUnAlloc();
1868 if (!file)
1869 file = slstat->GetFileNode()->GetNextActive();
1870 if ( file == 0 ) {
1871 // No more files on this worker
1872 slstat->SetFileNode(0);
1873 }
1874 }
1875 }
1876
1877 // Try to find an unused filenode first
1878 if(file == 0 && !fForceLocal)
1879 file = GetNextUnAlloc(0, nodeHostName);
1880
1881 // Then look at the active filenodes
1882 if(file == 0 && !fForceLocal)
1883 file = GetNextActive();
1884
1885 if (file == 0) return 0;
1886
1887 PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1888
1889 slstat->fCurFile = file;
1890 // if remote and unallocated file
1891 if (file->GetNode()->GetMySlaveCnt() == 0 &&
1892 file->GetElement()->GetFirst() == file->GetNextEntry()) {
1893 fNEventsOnRemLoc -= file->GetElement()->GetNum();
1894 if (fNEventsOnRemLoc < 0) {
1895 Error("GetNextPacket",
1896 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1898 return 0;
1899 }
1900 }
1901 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1902 file->GetNode()->IncRunSlaveCnt();
1903 if (gPerfStats)
1904 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1905 file->GetNode()->GetName(),
1906 file->GetElement()->GetFileName(), kTRUE);
1907 }
1908
1909 Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1910
1911 // Get a packet
1912
1913 TDSetElement *base = file->GetElement();
1914 Long64_t first = file->GetNextEntry();
1915 Long64_t last = base->GetFirst() + base->GetNum();
1916
1917 // If the remaining part is smaller than the (packetsize * 1.5)
1918 // then increase the packetsize
1919
1920 if ( first + num * 1.5 >= last ) {
1921 num = last - first;
1922 file->SetDone(); // done
1923 // Delete file from active list (unalloc list is single pass, no delete needed)
1925 }
1926
1927 // Update NextEntry in the file object
1928 file->MoveNextEntry(num);
1929
1930 slstat->fCurElem = CreateNewPacket(base, first, num);
1931 if (base->GetEntryList())
1932 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1933
1934 // Flag the first packet of a new run (dataset)
1935 if (firstPacket)
1936 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1937 else
1938 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1939
1940 PDB(kPacketizer,2)
1941 Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1942
1943 return slstat->fCurElem;
1944}
1945
1946////////////////////////////////////////////////////////////////////////////////
1947/// Return the number of workers still processing
1948
1950{
1951 Int_t actw = 0;
1952 TIter nxw(fSlaveStats);
1953 TObject *key;
1954 while ((key = nxw())) {
1955 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1956 if (wrkstat && wrkstat->fCurFile) actw++;
1957 }
1958 // Done
1959 return actw;
1960}
1961
1962////////////////////////////////////////////////////////////////////////////////
1963/// Get Estimation of the current rate; just summing the current rates of
1964/// the active workers
1965
1967{
1968 all = kTRUE;
1969 // Loop over the workers
1970 Float_t currate = 0.;
1971 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1972 TIter nxw(fSlaveStats);
1973 TObject *key;
1974 while ((key = nxw()) != 0) {
1975 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1976 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1977 // Sum-up the current rates
1978 currate += slstat->GetProgressStatus()->GetCurrentRate();
1979 } else {
1980 all = kFALSE;
1981 }
1982 }
1983 }
1984 // Done
1985 return currate;
1986}
1987
1988////////////////////////////////////////////////////////////////////////////////
1989/// Get estimation for the number of processed entries and bytes read at time t,
1990/// based on the numbers already processed and the latests worker measured speeds.
1991/// If t <= 0 the current time is used.
1992/// Only the estimation for the entries is currently implemented.
1993/// This is needed to smooth the instantaneous rate plot.
1994
1996 Long64_t &bytes, Long64_t &calls)
1997{
1998 // Default value
1999 ent = GetEntriesProcessed();
2000 bytes = GetBytesRead();
2001 calls = GetReadCalls();
2002
2003 // Parse option
2004 if (fUseEstOpt == kEstOff)
2005 // Do not use estimation
2006 return 0;
2007 Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
2008
2009 TTime tnow = gSystem->Now();
2010 Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2011 Double_t dt = -1;
2012
2013 // Loop over the workers
2014 Bool_t all = kTRUE;
2015 Float_t trate = 0.;
2016 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2017 ent = 0;
2018 TIter nxw(fSlaveStats);
2019 TObject *key;
2020 while ((key = nxw()) != 0) {
2021 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2022 if (slstat) {
2023 // Those surely processed
2024 Long64_t e = slstat->GetEntriesProcessed();
2025 if (e <= 0) all = kFALSE;
2026 // Time elapsed since last update
2027 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2028 // Add estimated entries processed since last update
2029 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2030 : slstat->GetAvgRate();
2031 trate += rate;
2032 // Add estimated entries processed since last update
2033 e += (Long64_t) (dt * rate);
2034 // Add to the total
2035 ent += e;
2036 // Notify
2037 PDB(kPacketizer,3)
2038 Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2039 slstat->fSlave->GetOrdinal(),
2040 slstat->GetEntriesProcessed(), rate, dt, e);
2041 }
2042 }
2043 }
2044 // Notify
2045 dt = now - fProgressStatus->GetLastUpdate();
2046 PDB(kPacketizer,2)
2047 Info("GetEstEntriesProcessed",
2048 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2049 dt, ent, GetEntriesProcessed(), bytes, trate, all);
2050
2051 // Check values
2052 ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2053 ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2054 bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2055
2056 // Done
2057 return ((all) ? 0 : 1);
2058}
2059
2060////////////////////////////////////////////////////////////////////////////////
2061/// This method can be called at any time during processing
2062/// as an effect of handling kPROOF_STOPPROCESS
2063/// If the output list from this worker is going to be sent back to the master,
2064/// the 'status' includes the number of entries processed by the slave.
2065/// From this we calculate the remaining part of the packet.
2066/// 0 indicates that the results from that worker were lost completely.
2067/// Assume that the filenodes for which we have a TFileNode object
2068/// are still up and running.
2069
2071 TList **listOfMissingFiles)
2072{
2073 TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2074 if (!slaveStat) {
2075 Error("MarkBad", "Worker does not exist");
2076 return;
2077 }
2078 // Update worker counters
2079 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2080 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2081 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2082 }
2083
2084 // If status is defined, the remaining part of the last packet is
2085 // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
2086 if (!status) {
2087 // Get the subset processed by the bad worker.
2088 TList *subSet = slaveStat->GetProcessedSubSet();
2089 if (subSet) {
2090 // Take care of the current packet
2091 if (slaveStat->fCurElem) {
2092 subSet->Add(slaveStat->fCurElem);
2093 }
2094 // Merge overlapping or subsequent elements
2095 Int_t nmg = 0, ntries = 100;
2096 TDSetElement *e = 0, *enxt = 0;
2097 do {
2098 nmg = 0;
2099 e = (TDSetElement *) subSet->First();
2100 while ((enxt = (TDSetElement *) subSet->After(e))) {
2101 if (e->MergeElement(enxt) >= 0) {
2102 nmg++;
2103 subSet->Remove(enxt);
2104 delete enxt;
2105 } else {
2106 e = enxt;
2107 }
2108 }
2109 } while (nmg > 0 && --ntries > 0);
2110 // reassign the packets assigned to the bad slave and save the size;
2111 SplitPerHost(subSet, listOfMissingFiles);
2112 // the elements were reassigned so should not be deleted
2113 subSet->SetOwner(0);
2114 } else {
2115 Warning("MarkBad", "subset processed by bad worker not found!");
2116 }
2117 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2118 }
2119 // remove slavestat from the map
2121 delete slaveStat;
2122 // recalculate fNEventsOnRemLoc and others
2123 InitStats();
2124}
2125
2126////////////////////////////////////////////////////////////////////////////////
2127/// The file in the listOfMissingFiles can appear several times;
2128/// in order to fix that, a TDSetElement::Merge method is needed.
2129
2131 TList **listOfMissingFiles)
2132{
2133 if (!e) {
2134 Error("ReassignPacket", "empty packet!");
2135 return -1;
2136 }
2137 // Check the old filenode
2138 TUrl url = e->GetFileName();
2139 // Check the host from which 'e' was previously read.
2140 // Map non URL filenames to dummy host
2141 TString host;
2142 if ( !url.IsValid() ||
2143 (strncmp(url.GetProtocol(),"root", 4) &&
2144 strncmp(url.GetProtocol(),"rfio", 4))) {
2145 host = "no-host";
2146 } else {
2147 host = url.GetHost();
2148 }
2149
2150 // If accessible add it back to the old node
2151 // and do DecProcessed
2152 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2153 if (node && fTryReassign) {
2154 // The packet 'e' was processing data from this node.
2155 node->DecreaseProcessed(e->GetNum());
2156 // The file should be already in fFilesToProcess ...
2157 node->Add(e, kFALSE);
2158 if (!fUnAllocated->FindObject(node))
2159 fUnAllocated->Add(node);
2160 return 0;
2161 } else {
2162 // Add to the list of missing files
2163 TFileInfo *fi = e->GetFileInfo();
2164 if (listOfMissingFiles && *listOfMissingFiles)
2165 (*listOfMissingFiles)->Add((TObject *)fi);
2166 return -1;
2167 }
2168}
2169
2170////////////////////////////////////////////////////////////////////////////////
2171/// Split into per host entries
2172/// The files in the listOfMissingFiles can appear several times;
2173/// in order to fix that, a TDSetElement::Merge method is needed.
2174
2176 TList **listOfMissingFiles)
2177{
2178 if (!elements) {
2179 Error("SplitPerHost", "Empty list of packets!");
2180 return;
2181 }
2182 if (elements->GetSize() <= 0) {
2183 Error("SplitPerHost", "The input list contains no elements");
2184 return;
2185 }
2186 TIter subSetIter(elements);
2187 TDSetElement *e;
2188 while ((e = (TDSetElement*) subSetIter.Next())) {
2189 if (ReassignPacket(e, listOfMissingFiles) == -1) {
2190 // Remove from the list in order to delete it.
2191 if (elements->Remove(e))
2192 Error("SplitPerHost", "Error removing a missing file");
2193 delete e;
2194 }
2195
2196 }
2197}
@ kPROOF_FATAL
Definition: MessageTypes.h:43
@ kPROOF_GETENTRIES
Definition: MessageTypes.h:60
@ kPROOF_MESSAGE
Definition: MessageTypes.h:85
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:529
#define f(i)
Definition: RSha256.hxx:104
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:41
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
const char Option_t
Definition: RtypesCore.h:62
#define ClassImp(name)
Definition: Rtypes.h:363
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
#define R__ASSERT(e)
Definition: TError.h:96
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
#define Printf
Definition: TGeoToOCC.h:18
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
R__EXTERN TProof * gProof
Definition: TProof.h:1077
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
#define gPerfStats
Int_t BufferSize() const
Definition: TBuffer.h:94
Int_t Length() const
Definition: TBuffer.h:96
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:429
const char * GetObjName() const
Definition: TDSet.h:120
Long64_t GetNum() const
Definition: TDSet.h:114
@ kCorrupted
Definition: TDSet.h:74
@ kNewRun
Definition: TDSet.h:75
TObject * GetEntryList() const
Definition: TDSet.h:131
void SetFirst(Long64_t first)
Definition: TDSet.h:113
void Invalidate()
Definition: TDSet.h:134
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:129
void SetNum(Long64_t num)
Definition: TDSet.h:118
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:253
void SetValid()
Definition: TDSet.h:135
Long64_t GetTDSetOffset() const
Definition: TDSet.h:128
Bool_t GetValid() const
Definition: TDSet.h:119
const char * GetFileName() const
Definition: TDSet.h:111
Long64_t GetFirst() const
Definition: TDSet.h:112
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:413
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1369
Bool_t IsTree() const
Definition: TDSet.h:225
const char * GetType() const
Definition: TDSet.h:228
TList * GetListOfElements() const
Definition: TDSet.h:231
@ kSomeInvalid
Definition: TDSet.h:161
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual Long64_t GetN() const
Definition: TEntryList.h:75
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Int_t GetN() const
Definition: TEventList.h:56
Class describing a generic file including meta information.
Definition: TFileInfo.h:38
TObject * Next()
Definition: TCollection.h:249
void Reset()
Definition: TCollection.h:252
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:327
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:818
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:574
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition: TList.cxx:354
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:689
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:655
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:933
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:53
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition: TMap.cxx:150
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
Definition: TMap.cxx:295
UInt_t What() const
Definition: TMessage.h:75
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
An array of TObjects.
Definition: TObjArray.h:37
Collectable string class.
Definition: TObjString.h:28
const char * GetName() const
Returns name of object.
Definition: TObjString.h:39
const TString & GetString() const
Definition: TObjString.h:47
Mother of all ROOT objects.
Definition: TObject.h:37
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:172
virtual Bool_t IsSortable() const
Definition: TObject.h:131
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition: TObject.cxx:128
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:159
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:550
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
virtual ~TPacketizerAdaptive()
Destructor.
TFileNode * NextNode()
Get next node which has unallocated files.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; a meaningful difference to TPacketizer is the fact that this packetizer,...
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree an get number of entries.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetActiveWorkers()
Return the number of workers still processing.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Named parameter, streamable and storable.
Definition: TParameter.h:37
Container class for processing statistics.
Double_t GetProcTime() const
Double_t GetLastUpdate() const
Double_t GetRate() const
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition: TProofServ.h:257
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:316
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9308
Class describing a PROOF worker server.
Definition: TSlave.h:46
TSocket * GetSocket() const
Definition: TSlave.h:134
Int_t GetProtocol() const
Definition: TSlave.h:133
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
const char * GetOrdinal() const
Definition: TSlave.h:131
Int_t GetPerfIdx() const
Definition: TSlave.h:132
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:817
virtual Bool_t IsValid() const
Definition: TSocket.h:152
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
A sorted doubly linked list.
Definition: TSortedList.h:28
Basic string class.
Definition: TString.h:131
Ssiz_t Length() const
Definition: TString.h:405
const char * Data() const
Definition: TString.h:364
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition: TString.cxx:2172
Bool_t IsNull() const
Definition: TString.h:402
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2264
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:619
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition: TSystem.cxx:472
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:312
Basic time type with millisecond precision.
Definition: TTime.h:27
This class represents a WWW compatible URL.
Definition: TUrl.h:35
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition: TUrl.cxx:385
const char * GetFile() const
Definition: TUrl.h:72
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Bool_t IsValid() const
Definition: TUrl.h:82
const char * GetHost() const
Definition: TUrl.h:70
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
void SetHost(const char *host)
Definition: TUrl.h:87
const char * GetProtocol() const
Definition: TUrl.h:67
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
TProofProgressStatus * GetProgressStatus()
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
Long64_t GetEntriesProcessed() const
Double_t GetCumProcTime() const
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry, with num entries.
const Int_t n
Definition: legend1.C:16
void Add(RHist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
Definition: RHist.hxx:310
static constexpr double s
Definition: file.py:1
Definition: first.py:1
auto * m
Definition: textangle.C:8