Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TPacketizerAdaptive.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Jan Iwaszkiewicz 11/12/06
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10*************************************************************************/
11
12/** \class TPacketizerAdaptive
13\ingroup proofkernel
14
15This packetizer is based on TPacketizer but uses different
16load-balancing algorithms and data structures.
17Two main improvements in the load-balancing strategy:
18 - First one was to change the order in which the files are assigned
19 to the computing nodes in such a way that network transfers are
20 evenly distributed in the query time. Transfer of the remote files
21 was often becoming a bottleneck at the end of a query.
22 - The other improvement is the use of time-based packet size. We
23 measure the processing rate of all the nodes and calculate the
24 packet size, so that it takes certain amount of time. In this way
25 packetizer prevents the situation where the query can't finish
26 because of one slow node.
27
28The data structures: TFileStat, TFileNode and TSlaveStat are
29enriched + changed and TFileNode::Compare method is changed.
30
31*/
32
33#include "TPacketizerAdaptive.h"
34
35#include "Riostream.h"
36#include "TDSet.h"
37#include "TError.h"
38#include "TEnv.h"
39#include "TEntryList.h"
40#include "TEventList.h"
41#include "TMap.h"
42#include "TMessage.h"
43#include "TMonitor.h"
44#include "TNtupleD.h"
45#include "TObject.h"
46#include "TParameter.h"
47#include "TPerfStats.h"
48#include "TProofDebug.h"
49#include "TProof.h"
50#include "TProofServ.h"
51#include "TSlave.h"
52#include "TSocket.h"
53#include "TSortedList.h"
54#include "TUrl.h"
55#include "TClass.h"
56#include "TRandom.h"
57#include "TMath.h"
58#include "TObjString.h"
59#include "TList.h"
60
61//
62// The following three utility classes manage the state of the
63// work to be performed and the slaves involved in the process.
64// A list of TFileNode(s) describes the hosts with files, each
65// has a list of TFileStat(s) keeping the state for each TDSet
66// element (file).
67//
68// The list of TSlaveStat(s) keep track of the work (being) done
69// by each slave
70//
71
72
73//------------------------------------------------------------------------------
74
76
77private:
78 Bool_t fIsDone; // is this element processed
79 TFileNode *fNode; // my FileNode
80 TDSetElement *fElement; // location of the file and its range
81 Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing
82
83public:
85
86 Bool_t IsDone() const {return fIsDone;}
87 Bool_t IsSortable() const { return kTRUE; }
88 void SetDone() {fIsDone = kTRUE;}
89 TFileNode *GetNode() const {return fNode;}
90 TDSetElement *GetElement() const {return fElement;}
92 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
93
94 // This method is used to keep a sorted list of remaining files to be processed
95 Int_t Compare(const TObject* obj) const
96 {
97 // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
98 // and 1 if elem.entries < obj.elem.entries.
99 const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
100 if (fst && GetElement() && fst->GetElement()) {
101 Long64_t ent = GetElement()->GetNum();
102 Long64_t entfst = fst->GetElement()->GetNum();
103 if (ent > 0 && entfst > 0) {
104 if (ent > entfst) {
105 return 1;
106 } else if (ent < entfst) {
107 return -1;
108 } else {
109 return 0;
110 }
111 }
112 }
113 // No info: assume equal (no change in order)
114 return 0;
115 }
116 void Print(Option_t * = 0) const
117 { // Notify file name and entries
118 Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
119 fElement ? fElement->GetNum() : -1);
120 }
121};
122
124 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
125{
126 // Constructor: add to the global list
127 if (files) files->Add(this);
128}
129
130//------------------------------------------------------------------------------
131
132// a class describing a file node as a part of a session
134
135private:
136 TString fNodeName; // FQDN of the node
137 TList *fFiles; // TDSetElements (files) stored on this node
138 TObject *fUnAllocFileNext; // cursor in fFiles
139 TList *fActFiles; // files with work remaining
140 TObject *fActFileNext; // cursor in fActFiles
141 Int_t fMySlaveCnt; // number of slaves running on this node
142 // (which can process remote files)
143 Int_t fExtSlaveCnt; // number of external slaves processing
144 // files on this node
145 Int_t fRunSlaveCnt; // total number of slaves processing files
146 // on this node
147 Long64_t fProcessed; // number of events processed on this node
148 Long64_t fEvents; // number of entries in files on this node
149
150 Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
151
152 TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer)
153
154public:
155 TFileNode(const char *name, Int_t strategy, TSortedList *files);
156 ~TFileNode() { delete fFiles; delete fActFiles; }
157
159 Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
160 void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
161 void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
168 Bool_t IsSortable() const { return kTRUE; }
170 void IncProcessed(Long64_t nEvents)
171 { fProcessed += nEvents; }
172 Long64_t GetProcessed() const { return fProcessed; }
173 void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
174 // this method is used by Compare() it adds 1, so it returns a number that
175 // would be true if one more slave is added.
177 { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178 void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
179 const char *GetName() const { return fNodeName.Data(); }
180 Long64_t GetNEvents() const { return fEvents; }
181
182 void Print(Option_t * = 0) const
183 {
184 TFileStat *fs = 0;
185 TDSetElement *e = 0;
186 Int_t nn = 0;
187 Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188 Printf("+++ TFileNode: %s +++", fNodeName.Data());
189 Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190 Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
191 Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
192 if (fFiles && fFiles->GetSize() > 0) {
193 TIter nxf(fFiles);
194 while ((fs = (TFileStat *) nxf())) {
195 if ((e = fs->GetElement())) {
196 Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
197 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
198 } else {
199 Printf("+++ #%d: no element! ", ++nn);
200 }
201 }
202 }
203 Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
204 if (fActFiles && fActFiles->GetSize() > 0) {
205 TIter nxaf(fActFiles);
206 while ((fs = (TFileStat *) nxaf())) {
207 if ((e = fs->GetElement())) {
208 Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
209 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
210 } else {
211 Printf("+++ #%d: no element! ", ++nn);
212 }
213 }
214 }
215 Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
216 }
217
218 void Add(TDSetElement *elem, Bool_t tolist)
219 {
220 TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
221 TFileStat *f = new TFileStat(this, elem, files);
222 fFiles->Add(f);
224 }
225
227 {
229
230 if (next != 0) {
231 // make file active
232 fActFiles->Add(next);
234
235 // move cursor
237 }
238 return (TFileStat *) next;
239 }
240
242 {
243 TObject *next = fActFileNext;
244
245 if (fActFileNext != 0) {
248 }
249
250 return (TFileStat *) next;
251 }
252
254 {
259 }
260
261 Int_t Compare(const TObject *other) const
262 {
263 // Must return -1 if this is smaller than obj, 0 if objects are equal
264 // and 1 if this is larger than obj.
265 // smaller means more needing a new worker.
266 // Two cases are considered depending on
267 // relation between harddrive speed and network bandwidth.
268
269 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
270 if (!obj) {
271 Error("Compare", "input is not a TPacketizer::TFileNode object");
272 return 0;
273 }
274
275 // how many more events it has than obj
276
277 if (fStrategy == 1) {
278 // The default adaptive strategy.
279 Int_t myVal = GetRunSlaveCnt();
280 Int_t otherVal = obj->GetRunSlaveCnt();
281 if (myVal < otherVal) {
282 return -1;
283 } else if (myVal > otherVal) {
284 return 1;
285 } else {
286 // if this has more events to process than obj
287 if ((fEvents - fProcessed) >
288 (obj->GetNEvents() - obj->GetProcessed())) {
289 return -1;
290 } else {
291 return 1;
292 }
293 }
294 } else {
295 Int_t myVal = GetSlaveCnt();
296 Int_t otherVal = obj->GetSlaveCnt();
297 if (myVal < otherVal) {
298 return -1;
299 } else if (myVal > otherVal) {
300 return 1;
301 } else {
302 return 0;
303 }
304 }
305 }
306
307 void Reset()
308 {
310 fActFiles->Clear();
311 fActFileNext = 0;
312 fExtSlaveCnt = 0;
313 fMySlaveCnt = 0;
314 fRunSlaveCnt = 0;
315 }
316};
317
318
320 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
321 fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323 fStrategy(strategy), fFilesToProcess(files)
324{
325 // Constructor
326
327 fFiles->SetOwner();
329}
330
331//------------------------------------------------------------------------------
332
334
336
337private:
338 TFileNode *fFileNode; // corresponding node or 0
339 TFileStat *fCurFile; // file currently being processed
340 TDSetElement *fCurElem; // TDSetElement currently being processed
341 Long64_t fCurProcessed; // events processed in the current file
342 Float_t fCurProcTime; // proc time spent on the current file
343 TList *fDSubSet; // packets processed by this worker
344
345public:
346 TSlaveStat(TSlave *slave);
347 ~TSlaveStat();
348 TFileNode *GetFileNode() const { return fFileNode; }
352 void SetFileNode(TFileNode *node) { fFileNode = node; }
362};
363
364////////////////////////////////////////////////////////////////////////////////
365/// Constructor
366
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
370{
371 fDSubSet = new TList();
373 fSlave = slave;
375 // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
376 // for non existing names
377 fWrkFQDN = slave->GetName();
378 if (strcmp(slave->ClassName(), "TSlaveLite")) {
380 // Get full name for local hosts
381 if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
383 }
384 PDB(kPacketizer, 2)
385 Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
386}
387
388////////////////////////////////////////////////////////////////////////////////
389/// Cleanup
390
392{
393 SafeDelete(fDSubSet);
394 SafeDelete(fStatus);
395}
396
397////////////////////////////////////////////////////////////////////////////////
398/// Update packetizer rates
399
401{
402 if (!st) {
403 Error("UpdateRates", "no status object!");
404 return;
405 }
406 if (fCurFile->IsDone()) {
407 fCurProcTime = 0;
408 fCurProcessed = 0;
409 } else {
410 fCurProcTime += st->GetProcTime() - GetProcTime();
411 fCurProcessed += st->GetEntries() - GetEntriesProcessed();
412 }
413 fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
414 st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
415 SafeDelete(fStatus);
416 fStatus = st;
417}
418
419////////////////////////////////////////////////////////////////////////////////
420/// Add the current element to the fDSubSet (subset processed by this worker)
421/// and if the status arg is given, then change the size of the packet.
422/// return the difference (*st - *fStatus)
423
425{
426 if (st && fDSubSet && fCurElem) {
427 if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
428 fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
429 fDSubSet->Add(fCurElem);
430 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
431 return diff;
432 } else {
433 Error("AddProcessed", "processed subset of current elem undefined");
434 return 0;
435 }
436}
437
438//------------------------------------------------------------------------------
439
441
442////////////////////////////////////////////////////////////////////////////////
443/// Constructor
444
447 TList *input, TProofProgressStatus *st)
448 : TVirtualPacketizer(input, st)
449{
450 PDB(kPacketizer,1) Info("TPacketizerAdaptive",
451 "enter (first %lld, num %lld)", first, num);
452
453 // Init pointer members
454 fSlaveStats = 0;
455 fUnAllocated = 0;
456 fActive = 0;
457 fFileNodes = 0;
458 fMaxPerfIdx = 1;
460 fMaxEntriesRatio = 2.;
461
462 fMaxSlaveCnt = -1;
464 fStrategy = 1;
467
468 if (!fProgressStatus) {
469 Error("TPacketizerAdaptive", "No progress status");
470 return;
471 }
472
473 // Attempt to synchronize the packet size with the tree cache size
474 Int_t cpsync = -1;
475 if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
476 // Check if there is a global cache-packet sync setting
477 cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
478 }
479 if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
480
481 // Max file entries to avg allowed ratio for cache-to-packet synchronization
482 // (applies only if fCachePacketSync is true; -1. disables the bound)
483 if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
484 // Check if there is a global ratio setting
485 fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
486 }
487
488 // The possibility to change packetizer strategy to the basic TPacketizer's
489 // one (in which workers always process their local data first).
490 Int_t strategy = -1;
491 if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
492 // Check if there is a global strategy setting
493 strategy = gEnv->GetValue("Packetizer.Strategy", 1);
494 }
495 if (strategy == 0) {
496 fStrategy = 0;
497 Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
498 } else if (strategy != 1) {
499 Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
500 }
501
502 Long_t maxSlaveCnt = 0;
503 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
504 if (maxSlaveCnt < 0) {
505 Info("TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
507 maxSlaveCnt = 0;
508 }
509 } else {
510 // Try also with Int_t (recently supported in TProof::SetParameter)
511 Int_t mxslcnt = -1;
512 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
513 if (mxslcnt < 0) {
514 Info("TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
516 mxslcnt = 0;
517 }
518 maxSlaveCnt = (Long_t) mxslcnt;
519 }
520 }
521
522 if (!maxSlaveCnt)
523 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
524 if (maxSlaveCnt > 0) {
525 fMaxSlaveCnt = maxSlaveCnt;
526 Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
528 }
529
530 // if forceLocal parameter is set to 1 then eliminate the cross-worker
531 // processing;
532 // This minimizes the network usage on the PROOF cluser at the expense of
533 // longer jobs processing times.
534 // To process successfully the session must have workers with all the data!
536 Int_t forceLocal = 0;
537 if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
538 if (forceLocal == 1)
540 else
541 Info("TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
543 }
544
545 // Below we provide a possibility to change the way packet size is
546 // calculated or define the packet time directly.
547 // fPacketAsAFraction can be interpreted as follows:
548 // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
549 // It substitutes 20 in the old formula to calculate the fPacketSize:
550 // fPacketSize = fTotalEntries / (20 * nslaves)
551 Int_t packetAsAFraction = 0;
552 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
553 if (packetAsAFraction > 0) {
554 fPacketAsAFraction = packetAsAFraction;
555 Info("TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
557 packetAsAFraction);
558 } else
559 Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
560 }
561
562 // Packet re-assignement
563 fTryReassign = 0;
564 Int_t tryReassign = 0;
565 if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0)
566 tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0);
567 fTryReassign = tryReassign;
568 if (fTryReassign != 0)
569 Info("TPacketizerAdaptive", "failed packets will be re-assigned");
570
571 // Save the config parameters in the dedicated list so that they will be saved
572 // in the outputlist and therefore in the relevant TQueryResult
573 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
574 fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
575 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
576 fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
577 fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
578 fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
579
580 Double_t baseLocalPreference = 1.2;
581 fBaseLocalPreference = (Float_t)baseLocalPreference;
582 if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0)
583 fBaseLocalPreference = (Float_t)baseLocalPreference;
584
585 fFileNodes = new TList;
587 fUnAllocated = new TList;
589 fActive = new TList;
591
592 fValid = kTRUE;
593
594 // Resolve end-point urls to optmize distribution
595 // dset->Lookup(); // moved to TProofPlayerRemote::Process
596
597 // Read list of mounted disks
598 TObjArray *partitions = 0;
599 TString partitionsStr;
600 if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
601 partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
602 if (!partitionsStr.IsNull()) {
603 Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
604 partitions = partitionsStr.Tokenize(",");
605 }
606
607 // Split into per host and disk entries
608 dset->Reset();
610 while ((e = (TDSetElement*)dset->Next())) {
611
612 if (e->GetValid()) continue;
613
614 // The dataset name, if any
615 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
616 fDataSet = e->GetDataSet();
617
618 TUrl url = e->GetFileName();
619 PDB(kPacketizer,2)
620 Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
621
622 // Map non URL filenames to dummy host
623 TString host;
624 if ( !url.IsValid() ||
625 (strncmp(url.GetProtocol(),"root", 4) &&
626 strncmp(url.GetProtocol(),"file", 4)) ) {
627 host = "no-host";
628 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
629 host = "localhost";
630 url.SetProtocol("root");
631 } else {
632 host = url.GetHostFQDN();
633 }
634 // Get full name for local hosts
635 if (host.Contains("localhost") || host == "127.0.0.1") {
636 url.SetHost(gSystem->HostName());
637 host = url.GetHostFQDN();
638 }
639
640 // Find on which disk is the file, if any
641 TString disk;
642 if (partitions) {
643 TIter iString(partitions);
644 TObjString* os = 0;
645 while ((os = (TObjString *)iString())) {
646 // Compare begining of the url with disk mountpoint
647 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
648 disk = os->GetName();
649 break;
650 }
651 }
652 }
653 // Node's url
654 TString nodeStr;
655 if (disk.IsNull())
656 nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
657 else
658 nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
659 TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
660
661 if (node == 0) {
662 node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
663 fFileNodes->Add(node);
664 PDB(kPacketizer,2)
665 Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
666 } else {
667 PDB(kPacketizer,2)
668 Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
669 }
670
671 node->Add(e, kFALSE);
672 }
673
674 fSlaveStats = new TMap;
676
677 TSlave *slave;
678 TIter si(slaves);
679 while ((slave = (TSlave*) si.Next())) {
680 fSlaveStats->Add( slave, new TSlaveStat(slave) );
681 fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
682 slave->GetPerfIdx() : fMaxPerfIdx;
683 }
684
685 // Setup file & filenode structure
686 Reset();
687 // Optimize the number of files to be open when running on subsample
688 Int_t validateMode = 0;
689 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
690 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
691 if (num > -1)
692 PDB(kPacketizer,2)
693 Info("TPacketizerAdaptive",
694 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
695 ValidateFiles(dset, slaves, num, byfile);
696
697
698 if (!fValid) return;
699
700 // apply global range (first,num) to dset and rebuild structure
701 // ommitting TDSet elements that are not needed
702
703 Int_t files = 0;
704 fTotalEntries = 0;
705 fUnAllocated->Clear(); // avoid dangling pointers
706 fActive->Clear();
707 fFileNodes->Clear(); // then delete all objects
708 PDB(kPacketizer,2)
709 Info("TPacketizerAdaptive",
710 "processing range: first %lld, num %lld", first, num);
711
712 dset->Reset();
713 Long64_t cur = 0;
714 while (( e = (TDSetElement*)dset->Next())) {
715
716 // Skip invalid or missing file; It will be moved
717 // from the dset to the 'MissingFiles' list in the player.
718 if (!e->GetValid()) continue;
719
720 TUrl url = e->GetFileName();
721 Long64_t eFirst = e->GetFirst();
722 Long64_t eNum = e->GetNum();
723 PDB(kPacketizer,2)
724 Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
725 PDB(kPacketizer,2)
726 Info("TPacketizerAdaptive",
727 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
728
729 if (!e->GetEntryList()) {
730 // This element is before the start of the global range, skip it
731 if (cur + eNum < first) {
732 cur += eNum;
733 PDB(kPacketizer,2)
734 Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
735 continue;
736 }
737
738 // This element is after the end of the global range, skip it
739 if (num != -1 && (first+num <= cur)) {
740 cur += eNum;
741 PDB(kPacketizer,2)
742 Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
743 continue; // break ??
744 }
745
746 Bool_t inRange = kFALSE;
747 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
748
749 if (cur <= first) {
750 // If this element contains the start of the global range
751 // adjust its start and number of entries
752 e->SetFirst( eFirst + (first - cur) );
753 e->SetNum( e->GetNum() - (first - cur) );
754 PDB(kPacketizer,2)
755 Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
756 eFirst + (first - cur), first + num - cur);
757 inRange = kTRUE;
758 }
759 if (num != -1 && (first+num <= cur+eNum)) {
760 // If this element contains the end of the global range
761 // adjust its number of entries
762 e->SetNum( first + num - e->GetFirst() - cur );
763 PDB(kPacketizer,2)
764 Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
765 inRange = kTRUE;
766 }
767
768 } else {
769 // Increment the counter ...
770 PDB(kPacketizer,2)
771 Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
772 cur += eNum;
773 }
774 // Re-adjust eNum and cur, if needed
775 if (inRange) {
776 cur += eNum;
777 eNum = e->GetNum();
778 }
779
780 } else {
781 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
782 if (enl) {
783 eNum = enl->GetN();
784 PDB(kPacketizer,2)
785 Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
786 } else {
787 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
788 eNum = evl ? evl->GetN() : eNum;
789 PDB(kPacketizer,2)
790 Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
791 }
792 if (!eNum) {
793 PDB(kPacketizer,2)
794 Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
795 continue;
796 }
797 }
798 PDB(kPacketizer,2)
799 Info("TPacketizerAdaptive", " --> next cur %lld", cur);
800
801 // Map non URL filenames to dummy host
802 TString host;
803 if ( !url.IsValid() ||
804 (strncmp(url.GetProtocol(),"root", 4) &&
805 strncmp(url.GetProtocol(),"file", 4)) ) {
806 host = "no-host";
807 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
808 host = "localhost";
809 url.SetProtocol("root");
810 } else {
811 host = url.GetHostFQDN();
812 }
813 // Get full name for local hosts
814 if (host.Contains("localhost") || host == "127.0.0.1") {
815 url.SetHost(gSystem->HostName());
816 host = url.GetHostFQDN();
817 }
818
819 // Find, on which disk is the file
820 TString disk;
821 if (partitions) {
822 TIter iString(partitions);
823 TObjString* os = 0;
824 while ((os = (TObjString *)iString())) {
825 // Compare begining of the url with disk mountpoint
826 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
827 disk = os->GetName();
828 break;
829 }
830 }
831 }
832 // Node's url
833 TString nodeStr;
834 if (disk.IsNull())
835 nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
836 else
837 nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
838 TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
839
840
841 if (node == 0) {
842 node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
843 fFileNodes->Add( node );
844 PDB(kPacketizer, 2)
845 Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
846 } else {
847 PDB(kPacketizer, 2)
848 Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
849 }
850
851 ++files;
852 fTotalEntries += eNum;
853 node->Add(e, kTRUE);
854 node->IncEvents(eNum);
855 PDB(kPacketizer,2) e->Print("a");
856 }
857 PDB(kPacketizer,1)
858 Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
859 fTotalEntries, files, fFileNodes->GetSize());
860
861 // Set the total number for monitoring
862 if (gPerfStats)
863 gPerfStats->SetNumEvents(fTotalEntries);
864
865 Reset();
866
867 InitStats();
868
869 if (!fValid)
871
872 PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
873}
874
875////////////////////////////////////////////////////////////////////////////////
876/// Destructor.
877
879{
880 if (fSlaveStats) {
882 }
883
889}
890
891////////////////////////////////////////////////////////////////////////////////
892/// (re)initialise the statistics
893/// called at the begining or after a worker dies.
894
896{
897 // calculating how many files from TDSet are not cached on
898 // any slave
899 Int_t noRemoteFiles = 0;
901 Int_t totalNumberOfFiles = 0;
902 TIter next(fFileNodes);
903 while (TFileNode *fn = (TFileNode*)next()) {
904 totalNumberOfFiles += fn->GetNumberOfFiles();
905 if (fn->GetMySlaveCnt() == 0) {
906 noRemoteFiles += fn->GetNumberOfFiles();
907 fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
908 }
909 }
910
911 if (totalNumberOfFiles == 0) {
912 Info("InitStats", "no valid or non-empty file found: setting invalid");
913 // No valid files: set invalid and return
914 fValid = kFALSE;
915 return;
916 }
917
918 fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
919 Info("InitStats",
920 "fraction of remote files %f", fFractionOfRemoteFiles);
921
922 if (!fValid)
924
925 PDB(kPacketizer,1) Info("InitStats", "return");
926}
927
928////////////////////////////////////////////////////////////////////////////////
929/// Get next unallocated file from 'node' or other nodes:
930/// First try 'node'. If there is no more files, keep trying to
931/// find an unallocated file on other nodes.
932
934{
935 TFileStat *file = 0;
936
937 if (node != 0) {
938 PDB(kPacketizer, 2)
939 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
940 file = node->GetNextUnAlloc();
941 if (file == 0) RemoveUnAllocNode(node);
942 } else {
943 if (nodeHostName && strlen(nodeHostName) > 0) {
944
945 TFileNode *fn;
946 // Make sure that they are in the corrected order
948 PDB(kPacketizer,2) fUnAllocated->Print();
949
950 // Loop over unallocated fileNode list
951 for (int i = 0; i < fUnAllocated->GetSize(); i++) {
952
953 if ((fn = (TFileNode *) fUnAllocated->At(i))) {
954 TUrl uu(fn->GetName());
955 PDB(kPacketizer, 2)
956 Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
957
958 // Check, whether node's hostname is matching with current fileNode (fn)
959 if (!strcmp(nodeHostName, uu.GetHost())) {
960 node = fn;
961
962 // Fetch next unallocated file from this node
963 if ((file = node->GetNextUnAlloc()) == 0) {
964 RemoveUnAllocNode(node);
965 node = 0;
966 } else {
967 PDB(kPacketizer, 2)
968 Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
969 break;
970 }
971 }
972 } else {
973 Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
974 }
975 }
976
977 if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
978 // Unlike in TPacketizer we look at the number of ext slaves only.
979 PDB(kPacketizer,1)
980 Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
981 node = 0;
982 }
983 }
984
985 if (node == 0) {
986 while (file == 0 && ((node = NextNode()) != 0)) {
987 PDB(kPacketizer, 2)
988 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
989 if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
990 }
991 }
992 }
993
994 if (file != 0) {
995 // if needed make node active
996 if (fActive->FindObject(node) == 0) {
997 fActive->Add(node);
998 }
999 }
1000
1001 PDB(kPacketizer, 2) {
1002 if (!file) {
1003 Info("GetNextUnAlloc", "no file found!");
1004 } else {
1005 file->Print();
1006 }
1007 }
1008
1009 return file;
1010}
1011
1012////////////////////////////////////////////////////////////////////////////////
1013/// Get next node which has unallocated files.
1014/// the order is determined by TFileNode::Compare
1015
1017{
1018 fUnAllocated->Sort();
1019 PDB(kPacketizer,2) {
1021 }
1022
1024 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1025 // unlike in TPacketizer we look at the number of ext slaves only.
1026 PDB(kPacketizer,1)
1027 Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1028 fn = 0;
1029 }
1030
1031 return fn;
1032}
1033
1034////////////////////////////////////////////////////////////////////////////////
1035/// Remove unallocated node.
1036
1038{
1039 fUnAllocated->Remove(node);
1040}
1041
1042////////////////////////////////////////////////////////////////////////////////
1043/// Get next active file.
1044
1046{
1047 TFileNode *node;
1048 TFileStat *file = 0;
1049
1050 while (file == 0 && ((node = NextActiveNode()) != 0)) {
1051 file = node->GetNextActive();
1052 if (file == 0) RemoveActiveNode(node);
1053 }
1054
1055 return file;
1056}
1057
1058
1059////////////////////////////////////////////////////////////////////////////////
1060/// Get next active node.
1061
1063{
1064 fActive->Sort();
1065 PDB(kPacketizer,2) {
1066 Info("NextActiveNode", "enter");
1067 fActive->Print();
1068 }
1069
1070 TFileNode *fn = (TFileNode*) fActive->First();
1071 // look at only ext slaves
1072 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1073 PDB(kPacketizer,1)
1074 Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1075 fn = 0;
1076 }
1077
1078 return fn;
1079}
1080
1081////////////////////////////////////////////////////////////////////////////////
1082/// Remove file from the list of actives.
1083
1085{
1086 TFileNode *node = file->GetNode();
1087
1088 node->RemoveActive(file);
1089 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1090}
1091
1092////////////////////////////////////////////////////////////////////////////////
1093/// Remove node from the list of actives.
1094
1096{
1097 fActive->Remove(node);
1098}
1099
1100////////////////////////////////////////////////////////////////////////////////
1101/// Reset the internal data structure for packet distribution.
1102
1104{
1107
1108 fActive->Clear();
1109
1110 TIter files(fFileNodes);
1111 TFileNode *fn;
1112 while ((fn = (TFileNode*) files.Next()) != 0) {
1113 fn->Reset();
1114 }
1115
1116 TIter slaves(fSlaveStats);
1117 TObject *key;
1118 while ((key = slaves.Next()) != 0) {
1119 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1120 if (!slstat) {
1121 Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
1122 continue;
1123 }
1124 // Find out which file nodes are on the worker machine and assign the
1125 // one with less workers assigned
1126 TFileNode *fnmin = 0;
1127 Int_t fncnt = fSlaveStats->GetSize();
1128 files.Reset();
1129 while ((fn = (TFileNode*) files.Next()) != 0) {
1130 if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1131 if (fn->GetMySlaveCnt() < fncnt) {
1132 fnmin = fn;
1133 fncnt = fn->GetMySlaveCnt();
1134 }
1135 }
1136 }
1137 if (fnmin != 0 ) {
1138 slstat->SetFileNode(fnmin);
1139 fnmin->IncMySlaveCnt();
1140 PDB(kPacketizer, 2)
1141 Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1142 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1143 }
1144 slstat->fCurFile = 0;
1145 }
1146}
1147
1148////////////////////////////////////////////////////////////////////////////////
1149/// Check existence of file/dir/tree an get number of entries.
1150/// Assumes the files have been setup.
1151
1153 Long64_t maxent, Bool_t byfile)
1154{
1155 TMap slaves_by_sock;
1156 TMonitor mon;
1157 TList workers;
1158
1159
1160 // Setup the communication infrastructure
1161
1162 workers.AddAll(slaves);
1163 TIter si(slaves);
1164 TSlave *slm;
1165 while ((slm = (TSlave*)si.Next()) != 0) {
1166 PDB(kPacketizer,3)
1167 Info("ValidateFiles","socket added to monitor: %p (%s)",
1168 slm->GetSocket(), slm->GetName());
1169 mon.Add(slm->GetSocket());
1170 slaves_by_sock.Add(slm->GetSocket(), slm);
1171 }
1172
1173 mon.DeActivateAll();
1174
1175 ((TProof*)gProof)->DeActivateAsyncInput();
1176
1177 // Some monitoring systems (TXSocketHandler) need to know this
1178 ((TProof*)gProof)->fCurrentMonitor = &mon;
1179
1180 // Identify the type
1181 if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1182
1183 // Preparing for client notification
1184 TString msg("Validating files");
1185 UInt_t n = 0;
1186 UInt_t tot = dset->GetListOfElements()->GetSize();
1187 Bool_t st = kTRUE;
1188
1189 Long64_t totent = 0, nopenf = 0;
1190 while (kTRUE) {
1191
1192 // send work
1193 while (TSlave *s = (TSlave *)workers.First()) {
1194
1195 workers.Remove(s);
1196
1197 // find a file
1198
1199 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1200 if (!slstat) {
1201 Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
1202 continue;
1203 }
1204
1205 TFileNode *node = 0;
1206 TFileStat *file = 0;
1207
1208 // try its own node first
1209 if ((node = slstat->GetFileNode()) != 0) {
1210 PDB(kPacketizer,3) node->Print();
1211 file = GetNextUnAlloc(node);
1212 if (file == 0)
1213 slstat->SetFileNode(0);
1214 }
1215
1216 // look for a file on any other node if necessary
1217 if (file == 0)
1218 file = GetNextUnAlloc();
1219
1220 if (file != 0) {
1221 // files are done right away
1223
1224 slstat->fCurFile = file;
1225 TDSetElement *elem = file->GetElement();
1226 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1227 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1228 // This is decremented when we get the reply
1229 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1231 m << dset->IsTree()
1232 << TString(elem->GetFileName())
1233 << TString(elem->GetDirectory())
1234 << TString(elem->GetObjName());
1235
1236 s->GetSocket()->Send( m );
1237 mon.Activate(s->GetSocket());
1238 PDB(kPacketizer,2)
1239 Info("ValidateFiles",
1240 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1241 s->GetOrdinal(), s->GetName(), s->GetSocket(),
1242 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1243 elem->GetDirectory(), elem->GetObjName());
1244 } else {
1245 // Fill the info
1246 elem->SetTDSetOffset(entries);
1247 if (entries > 0) {
1248 // Most likely valid
1249 elem->SetValid();
1250 if (!elem->GetEntryList()) {
1251 if (elem->GetFirst() > entries) {
1252 Error("ValidateFiles",
1253 "first (%lld) higher then number of entries (%lld) in %s",
1254 elem->GetFirst(), entries, elem->GetFileName());
1255 // disable element
1256 slstat->fCurFile->SetDone();
1257 elem->Invalidate();
1259 }
1260 if (elem->GetNum() == -1) {
1261 elem->SetNum(entries - elem->GetFirst());
1262 } else if (elem->GetFirst() + elem->GetNum() > entries) {
1263 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
1264 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1265 entries, elem->GetFileName());
1266 elem->SetNum(entries - elem->GetFirst());
1267 }
1268 PDB(kPacketizer,2)
1269 Info("ValidateFiles",
1270 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1271 }
1272 }
1273 // Count
1274 totent += entries;
1275 nopenf++;
1276 // Notify the client
1277 n++;
1278 gProof->SendDataSetStatus(msg, n, tot, st);
1279
1280 // This worker is ready for the next validation
1281 workers.Add(s);
1282 }
1283 }
1284 }
1285
1286 // Check if there is anything to wait for
1287 if (mon.GetActive() == 0) {
1288 if (byfile && maxent > 0) {
1289 // How many files do we still need ?
1290 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1291 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1292 if (nrestf > 0) {
1293 PDB(kPacketizer,3)
1294 Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
1295 maxent, totent, nopenf, nrestf);
1296 si.Reset();
1297 while ((slm = (TSlave *) si.Next()) && nrestf--) {
1298 workers.Add(slm);
1299 }
1300 continue;
1301 } else {
1302 PDB(kPacketizer,3)
1303 Info("ValidateFiles", "no need to validate more files");
1304 break;
1305 }
1306 } else {
1307 break;
1308 }
1309 }
1310
1311 PDB(kPacketizer,3) {
1312 Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
1313 TList *act = mon.GetListOfActives();
1314 TIter next(act);
1315 while (TSocket *s = (TSocket*) next()) {
1316 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1317 if (sl)
1318 Info("ValidateFiles", " worker-%s (%s)",
1319 sl->GetOrdinal(), sl->GetName());
1320 }
1321 delete act;
1322 }
1323
1324 TSocket *sock = mon.Select();
1325 // If we have been interrupted break
1326 if (!sock) {
1327 Error("ValidateFiles", "selection has been interrupted - STOP");
1328 mon.DeActivateAll();
1329 fValid = kFALSE;
1330 break;
1331 }
1332 mon.DeActivate(sock);
1333
1334 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
1335
1336 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1337 if (!sock->IsValid()) {
1338 // A socket got invalid during validation
1339 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
1340 slave->GetOrdinal(), slave->GetName());
1341 ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1342 fValid = kFALSE;
1343 break;
1344 }
1345
1346 TMessage *reply;
1347
1348 if (sock->Recv(reply) <= 0) {
1349 // Notify
1350 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
1351 slave->GetOrdinal(), slave->GetName());
1352 // Help! lost a slave? ('slave' is deleted inside here ...)
1353 ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
1354 fValid = kFALSE;
1355 continue;
1356 }
1357
1358 if (reply->What() != kPROOF_GETENTRIES) {
1359 // Not what we want: handover processing to the central machinery
1360 Int_t what = reply->What();
1361 ((TProof*)gProof)->HandleInputMessage(slave, reply);
1362 if (what == kPROOF_FATAL) {
1363 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
1364 slave->GetOrdinal(), slave->GetName());
1365 fValid = kFALSE;
1366 } else {
1367 // Reactivate the socket
1368 mon.Activate(sock);
1369 }
1370 // Get next message
1371 continue;
1372 }
1373
1374 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1375 TDSetElement *e = slavestat->fCurFile->GetElement();
1376 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1377 Long64_t entries;
1378
1379 (*reply) >> entries;
1380
1381 // Extract object name, if there
1382 if ((reply->BufferSize() > reply->Length())) {
1383 TString objname;
1384 (*reply) >> objname;
1385 e->SetTitle(objname);
1386 }
1387
1388 e->SetTDSetOffset(entries);
1389 if (entries > 0) {
1390
1391 // This dataset element is most likely valid
1392 e->SetValid();
1393
1394 if (!e->GetEntryList()) {
1395 if (e->GetFirst() > entries) {
1396 Error("ValidateFiles",
1397 "first (%lld) higher then number of entries (%lld) in %s",
1398 e->GetFirst(), entries, e->GetFileName());
1399
1400 // Invalidate the element
1401 slavestat->fCurFile->SetDone();
1402 e->Invalidate();
1404 }
1405
1406 if (e->GetNum() == -1) {
1407 e->SetNum(entries - e->GetFirst());
1408 } else if (e->GetFirst() + e->GetNum() > entries) {
1409 Error("ValidateFiles",
1410 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1411 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1412 e->SetNum(entries - e->GetFirst());
1413 }
1414 }
1415
1416 // Count
1417 totent += entries;
1418 nopenf++;
1419
1420 // Notify the client
1421 n++;
1422 gProof->SendDataSetStatus(msg, n, tot, st);
1423
1424 } else {
1425
1426 Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
1427 //
1428 // Need to fix this with a user option to allow incomplete file sets (rdm)
1429 //
1430 //fValid = kFALSE; // all element must be readable!
1431 if (gProofServ) {
1433 m << TString(Form("Cannot get entries for file: %s - skipping",
1434 e->GetFileName()));
1436 }
1437
1438 // invalidate element
1439 e->Invalidate();
1441 }
1442 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1443
1444 // Ready for the next job, unless we have enough files
1445 if (maxent < 0 || ((totent < maxent) && !byfile))
1446 workers.Add(slave);
1447 }
1448
1449 // report std. output from slaves??
1450
1451 ((TProof*)gProof)->ActivateAsyncInput();
1452
1453 // This needs to be reset
1454 ((TProof*)gProof)->fCurrentMonitor = 0;
1455
1456 // No reason to continue if invalid
1457 if (!fValid)
1458 return;
1459
1460 // compute the offset for each file element
1461 Long64_t offset = 0;
1462 Long64_t newOffset = 0;
1463 TIter next(dset->GetListOfElements());
1464 TDSetElement *el;
1465 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1466 if (el->GetValid()) {
1467 newOffset = offset + el->GetTDSetOffset();
1468 el->SetTDSetOffset(offset);
1469 offset = newOffset;
1470 }
1471 }
1472}
1473
1474////////////////////////////////////////////////////////////////////////////////
1475/// The result depends on the fStrategy
1476
1478{
1479 Long64_t num;
1480 if (fStrategy == 0) {
1481 // TPacketizer's heuristic for starting packet size
1482 // Constant packet size;
1483 Int_t nslaves = fSlaveStats->GetSize();
1484 if (nslaves > 0) {
1485 num = fTotalEntries / (fPacketAsAFraction * nslaves);
1486 } else {
1487 num = 1;
1488 }
1489 } else {
1490 // The dynamic heuristic for setting the packet size (default)
1491 // Calculates the packet size based on performance of this slave
1492 // and estimated time left until the end of the query.
1493 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1494 Float_t rate = slstat->GetCurRate();
1495 if (!rate)
1496 rate = slstat->GetAvgRate();
1497 if (rate) {
1498
1499 // Global average rate
1501 Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1502
1503 // Bytes-to-Event conversion
1504 Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1505
1506 // Make sure it is not smaller then the cache, if the info is available and the size
1507 // synchronization is required. But apply the cache-packet size synchronization only if there
1508 // are enough left files to process and the files are all of similar sizes. Otherwise we risk
1509 // to not exploit optimally all potentially active workers.
1510 Bool_t cpsync = fCachePacketSync;
1511 if (fMaxEntriesRatio > 0. && cpsync) {
1514 Long64_t maxEntries = -1;
1515 if (fFilesToProcess->Last()) {
1517 if (elem) maxEntries = elem->GetNum();
1518 }
1519 if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1520 PDB(kPacketizer,3) {
1521 Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1522 Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1523 slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1524 (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1525 }
1526 cpsync = kFALSE;
1527 }
1528 }
1529 }
1530 if (bevt > 0. && cachesz > 0 && cpsync) {
1531 if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1532 packetTime = cachesz / bevt / rate;
1533 }
1534
1535 // Apply min-max again, if required
1536 if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1537 if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1538
1539 // Translate the packet length in number of entries
1540 num = (Long64_t)(rate * packetTime);
1541
1542 // Notify
1543 PDB(kPacketizer,2)
1544 Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1545 slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1546 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1547
1548 } else {
1549 // First packet for this worker in this query
1550 // Twice the learning phase
1551 num = (learnent > 0) ? 5 * learnent : 1000;
1552
1553 // Notify
1554 PDB(kPacketizer,2)
1555 Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1556 }
1557 }
1558 if (num < 1) num = 1;
1559 return num;
1560}
1561
1562////////////////////////////////////////////////////////////////////////////////
1563/// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
1564/// message (when the worker was asked to stop processing during a packet).
1565/// returns the #entries intended in the last packet - #processed entries
1566
1568 TProofProgressStatus *status,
1569 Double_t latency,
1570 TList **listOfMissingFiles)
1571{
1572 // find slave
1573 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1574 if (!slstat) {
1575 Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!",
1576 (sl ? sl->GetOrdinal() : "x.x"),
1577 (sl ? sl->GetName() : "**undef**"));
1578 return -1;
1579 }
1580
1581 // update stats & free old element
1582
1583 if ( slstat->fCurElem != 0 ) {
1584 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1585 // Calculate the number of events processed in the last packet
1586 Long64_t numev;
1587 if (status && status->GetEntries() > 0)
1588 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1589 else
1590 numev = 0;
1591
1592 // Calculate the progress made in the last packet
1593 TProofProgressStatus *progress = 0;
1594 if (numev > 0) {
1595 // This also moves the pointer in the corrsponding TFileInfo
1596 progress = slstat->AddProcessed(status);
1597 if (progress) {
1598 (*fProgressStatus) += *progress;
1599 // update processing rate
1600 slstat->UpdateRates(status);
1601 }
1602 } else {
1603 progress = new TProofProgressStatus();
1604 }
1605 if (progress) {
1606 PDB(kPacketizer,2)
1607 Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1608 sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1609 progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1610
1611 if (gPerfStats)
1612 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1613 slstat->fCurElem->GetFileName(),
1614 progress->GetEntries(),
1615 latency,
1616 progress->GetProcTime(),
1617 progress->GetCPUTime(),
1618 progress->GetBytesRead());
1619 delete progress;
1620 }
1621 if (numev != expectedNumEv) {
1622 // The last packet was not fully processed
1623 // and will be split in two:
1624 // - The completed part was marked as done.
1625 // - Create a new packet with the part to be resubmitted.
1626 TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
1627 if (newPacket && numev < expectedNumEv) {
1628 Long64_t first = newPacket->GetFirst();
1629 newPacket->SetFirst(first + numev);
1630 if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1631 SafeDelete(newPacket);
1632 } else
1633 Error("AddProcessed", "%s: processed too much? (%lld, %lld)",
1634 sl->GetOrdinal(), numev, expectedNumEv);
1635
1636 // TODO: a signal handler which will send info from the worker
1637 // after a packet fails.
1638 /* Add it to the failed packets list.
1639 if (!fFailedPackets) {
1640 fFailedPackets = new TList();
1641 }
1642 fFailedPackets->Add(slstat->fCurElem);
1643 */
1644 }
1645
1646 slstat->fCurElem = 0;
1647 return (expectedNumEv - numev);
1648 } else {
1649 // the kPROOF_STOPPRPOCESS message is send after the worker receives zero
1650 // as the reply to kPROOF_GETNEXTPACKET
1651 return -1;
1652 }
1653}
1654
1655////////////////////////////////////////////////////////////////////////////////
1656/// Get next packet;
1657/// A meaningfull difference to TPacketizer is the fact that this
1658/// packetizer, for each worker, tries to predict whether the worker
1659/// will finish processing it's local files before the end of the query.
1660/// If yes, it allocates, to those workers, files from non-slave filenodes
1661/// or from slaves that are overloaded. The check is done every time a new
1662/// file needs to be assigned.
1663
1665{
1666 if ( !fValid ) {
1667 return 0;
1668 }
1669
1670 // find slave
1671
1672 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1673 if (!slstat) {
1674 Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
1675 (sl ? sl->GetName() : "**undef**"));
1676 return 0;
1677 }
1678
1679 // Attach to current file
1680 TFileStat *file = slstat->fCurFile;
1681
1682 // Update stats & free old element
1683
1684 Bool_t firstPacket = kFALSE;
1685 Long64_t cachesz = -1;
1686 Int_t learnent = -1;
1687 if ( slstat->fCurElem != 0 ) {
1688
1689 Long64_t restEntries = 0;
1690 Double_t latency, proctime, proccpu;
1691 TProofProgressStatus *status = 0;
1692 Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
1693
1694 if (sl->GetProtocol() > 18) {
1695
1696 (*r) >> latency;
1697 (*r) >> status;
1698
1699 if (sl->GetProtocol() > 25) {
1700 (*r) >> cachesz >> learnent;
1701 if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1702 }
1703 fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1704 fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
1705
1706 } else {
1707
1708 Long64_t bytesRead = -1;
1709
1710 (*r) >> latency >> proctime >> proccpu;
1711 // only read new info if available
1712 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1713 if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1714 Long64_t totev = 0;
1715 if (r->BufferSize() > r->Length()) (*r) >> totev;
1716
1717 status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
1718 fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
1719 }
1720
1721 if (!fileNotOpen && !fileCorrupted) {
1722 if (AddProcessed(sl, status, latency) != 0)
1723 Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal());
1726 Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)",
1728 // Send last timer message and stop the timer
1729 HandleTimer(0);
1731 }
1732 } else {
1733 if (file) {
1734 if (file->GetElement()) {
1735 if (fileCorrupted) {
1736 Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)",
1737 sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1738 Int_t nunproc = AddProcessed(sl, status, latency);
1739 PDB(kPacketizer,1)
1740 Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
1741 // Remaining to be processed
1742 Long64_t num = 0;
1743 if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1744 // Add the remainign entries in the packet to the ones already registered
1745 num = file->GetElement()->GetEntries() + restEntries;
1746 } else {
1747 // First call: add the remaining entries in the packet and those of the file
1748 // not yet assigned
1749 Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1750 num = restEntries + rest;
1751 }
1752 file->GetElement()->SetEntries(num);
1753 PDB(kPacketizer,1)
1754 Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1755 file->GetElement()->GetName(), file->GetElement()->GetEntries());
1756 // Flag as corrupted
1757 file->GetElement()->SetBit(TDSetElement::kCorrupted);
1758 } else {
1759 Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element",
1760 sl->GetOrdinal(), file->GetElement()->GetName());
1761 }
1762 // Invalidate the element
1763 file->GetElement()->Invalidate();
1764 // Add it to the failed packets list
1765 if (!fFailedPackets) fFailedPackets = new TList();
1766 if (!fFailedPackets->FindObject(file->GetElement()))
1767 fFailedPackets->Add(file->GetElement());
1768 }
1769 // Deactivate this TFileStat
1770 file->SetDone();
1772 } else {
1773 Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:"
1774 " protocol error?", sl->GetOrdinal());
1775 }
1776 }
1777 } else {
1778 firstPacket = kTRUE;
1779 }
1780
1781 if ( fStop ) {
1782 HandleTimer(0);
1783 return 0;
1784 }
1785
1786 TString nodeName;
1787 if (file != 0) nodeName = file->GetNode()->GetName();
1788 TString nodeHostName(slstat->GetName());
1789
1790 PDB(kPacketizer,3)
1791 Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1792 sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
1793
1794 // If current file is just finished
1795 if ( file != 0 && file->IsDone() ) {
1796 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1797 file->GetNode()->DecRunSlaveCnt();
1798 if (gPerfStats)
1799 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1800 file->GetElement()->GetFileName(), kFALSE);
1801 file = 0;
1802 }
1803 // Reset the current file field
1804 slstat->fCurFile = file;
1805
1806 Long64_t avgEventsLeftPerSlave =
1809 return 0;
1810 // Get a file if needed
1811 if ( file == 0) {
1812 // Needs a new file
1813 Bool_t openLocal;
1814 // Aiming for localPreference == 1 when #local == #remote events left
1815 Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1817 if ( slstat->GetFileNode() != 0 ) {
1818 // Local file node exists and has more events to process.
1819 fUnAllocated->Sort();
1820 TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1821 Bool_t nonLocalNodePossible;
1822 if (fForceLocal)
1823 nonLocalNodePossible = 0;
1824 else
1825 nonLocalNodePossible = firstNonLocalNode ?
1826 (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1827 : 0;
1828 openLocal = !nonLocalNodePossible;
1829 Float_t slaveRate = slstat->GetAvgRate();
1830 if ( nonLocalNodePossible && fStrategy == 1) {
1831 // OpenLocal is set to kFALSE
1832 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1833 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1834 // External slaves help slstat -> don't open nonlocal files
1835 // -1 because, at this point slstat is not running
1836 openLocal = kTRUE;
1837 else if ( slaveRate == 0 ) { // first file for this slave
1838 // GetLocalEventsLeft() counts the potential slave
1839 // as running on its fileNode.
1840 if ( slstat->GetLocalEventsLeft() * localPreference
1841 > (avgEventsLeftPerSlave))
1842 openLocal = kTRUE;
1843 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1844 < slstat->GetLocalEventsLeft() * localPreference )
1845 openLocal = kTRUE;
1846 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1847 openLocal = kTRUE;
1848 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1849 openLocal = kTRUE;
1850 } else {
1851 // At this point slstat has a non zero avg rate > 0
1852 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1853 // And thus fCumProcTime, fProcessed > 0
1854 Float_t avgTime = avgEventsLeftPerSlave
1856 if (slaveTime * localPreference > avgTime)
1857 openLocal = kTRUE;
1858 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1859 < slstat->GetLocalEventsLeft() * localPreference)
1860 openLocal = kTRUE;
1861 }
1862 }
1863 if (openLocal || fStrategy == 0) {
1864 // Try its own node
1865 file = slstat->GetFileNode()->GetNextUnAlloc();
1866 if (!file)
1867 file = slstat->GetFileNode()->GetNextActive();
1868 if ( file == 0 ) {
1869 // No more files on this worker
1870 slstat->SetFileNode(0);
1871 }
1872 }
1873 }
1874
1875 // Try to find an unused filenode first
1876 if(file == 0 && !fForceLocal)
1877 file = GetNextUnAlloc(0, nodeHostName);
1878
1879 // Then look at the active filenodes
1880 if(file == 0 && !fForceLocal)
1881 file = GetNextActive();
1882
1883 if (file == 0) return 0;
1884
1885 PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1886
1887 slstat->fCurFile = file;
1888 // if remote and unallocated file
1889 if (file->GetNode()->GetMySlaveCnt() == 0 &&
1890 file->GetElement()->GetFirst() == file->GetNextEntry()) {
1891 fNEventsOnRemLoc -= file->GetElement()->GetNum();
1892 if (fNEventsOnRemLoc < 0) {
1893 Error("GetNextPacket",
1894 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1896 return 0;
1897 }
1898 }
1899 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1900 file->GetNode()->IncRunSlaveCnt();
1901 if (gPerfStats)
1902 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1903 file->GetNode()->GetName(),
1904 file->GetElement()->GetFileName(), kTRUE);
1905 }
1906
1907 Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1908
1909 // Get a packet
1910
1911 TDSetElement *base = file->GetElement();
1912 Long64_t first = file->GetNextEntry();
1913 Long64_t last = base->GetFirst() + base->GetNum();
1914
1915 // If the remaining part is smaller than the (packetsize * 1.5)
1916 // then increase the packetsize
1917
1918 if ( first + num * 1.5 >= last ) {
1919 num = last - first;
1920 file->SetDone(); // done
1921 // Delete file from active list (unalloc list is single pass, no delete needed)
1923 }
1924
1925 // Update NextEntry in the file object
1926 file->MoveNextEntry(num);
1927
1928 slstat->fCurElem = CreateNewPacket(base, first, num);
1929 if (base->GetEntryList())
1930 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1931
1932 // Flag the first packet of a new run (dataset)
1933 if (firstPacket)
1935 else
1937
1938 PDB(kPacketizer,2)
1939 Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1940
1941 return slstat->fCurElem;
1942}
1943
1944////////////////////////////////////////////////////////////////////////////////
1945/// Return the number of workers still processing
1946
1948{
1949 Int_t actw = 0;
1950 TIter nxw(fSlaveStats);
1951 TObject *key;
1952 while ((key = nxw())) {
1953 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1954 if (wrkstat && wrkstat->fCurFile) actw++;
1955 }
1956 // Done
1957 return actw;
1958}
1959
1960////////////////////////////////////////////////////////////////////////////////
1961/// Get Estimation of the current rate; just summing the current rates of
1962/// the active workers
1963
1965{
1966 all = kTRUE;
1967 // Loop over the workers
1968 Float_t currate = 0.;
1969 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1970 TIter nxw(fSlaveStats);
1971 TObject *key;
1972 while ((key = nxw()) != 0) {
1973 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1974 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1975 // Sum-up the current rates
1976 currate += slstat->GetProgressStatus()->GetCurrentRate();
1977 } else {
1978 all = kFALSE;
1979 }
1980 }
1981 }
1982 // Done
1983 return currate;
1984}
1985
1986////////////////////////////////////////////////////////////////////////////////
1987/// Get estimation for the number of processed entries and bytes read at time t,
1988/// based on the numbers already processed and the latests worker measured speeds.
1989/// If t <= 0 the current time is used.
1990/// Only the estimation for the entries is currently implemented.
1991/// This is needed to smooth the instantaneous rate plot.
1992
1994 Long64_t &bytes, Long64_t &calls)
1995{
1996 // Default value
1997 ent = GetEntriesProcessed();
1998 bytes = GetBytesRead();
1999 calls = GetReadCalls();
2000
2001 // Parse option
2002 if (fUseEstOpt == kEstOff)
2003 // Do not use estimation
2004 return 0;
2005 Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
2006
2007 TTime tnow = gSystem->Now();
2008 Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2009 Double_t dt = -1;
2010
2011 // Loop over the workers
2012 Bool_t all = kTRUE;
2013 Float_t trate = 0.;
2014 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2015 ent = 0;
2016 TIter nxw(fSlaveStats);
2017 TObject *key;
2018 while ((key = nxw()) != 0) {
2019 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2020 if (slstat) {
2021 // Those surely processed
2022 Long64_t e = slstat->GetEntriesProcessed();
2023 if (e <= 0) all = kFALSE;
2024 // Time elapsed since last update
2025 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2026 // Add estimated entries processed since last update
2027 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2028 : slstat->GetAvgRate();
2029 trate += rate;
2030 // Add estimated entries processed since last update
2031 e += (Long64_t) (dt * rate);
2032 // Add to the total
2033 ent += e;
2034 // Notify
2035 PDB(kPacketizer,3)
2036 Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2037 slstat->fSlave->GetOrdinal(),
2038 slstat->GetEntriesProcessed(), rate, dt, e);
2039 }
2040 }
2041 }
2042 // Notify
2043 dt = now - fProgressStatus->GetLastUpdate();
2044 PDB(kPacketizer,2)
2045 Info("GetEstEntriesProcessed",
2046 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2047 dt, ent, GetEntriesProcessed(), bytes, trate, all);
2048
2049 // Check values
2050 ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2051 ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2052 bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2053
2054 // Done
2055 return ((all) ? 0 : 1);
2056}
2057
2058////////////////////////////////////////////////////////////////////////////////
2059/// This method can be called at any time during processing
2060/// as an effect of handling kPROOF_STOPPROCESS
2061/// If the output list from this worker is going to be sent back to the master,
2062/// the 'status' includes the number of entries processed by the slave.
2063/// From this we calculate the remaining part of the packet.
2064/// 0 indicates that the results from that worker were lost completely.
2065/// Assume that the filenodes for which we have a TFileNode object
2066/// are still up and running.
2067
2069 TList **listOfMissingFiles)
2070{
2071 TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2072 if (!slaveStat) {
2073 Error("MarkBad", "Worker does not exist");
2074 return;
2075 }
2076 // Update worker counters
2077 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2078 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2079 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2080 }
2081
2082 // If status is defined, the remaining part of the last packet is
2083 // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
2084 if (!status) {
2085 // Get the subset processed by the bad worker.
2086 TList *subSet = slaveStat->GetProcessedSubSet();
2087 if (subSet) {
2088 // Take care of the current packet
2089 if (slaveStat->fCurElem) {
2090 subSet->Add(slaveStat->fCurElem);
2091 }
2092 // Merge overlapping or subsequent elements
2093 Int_t nmg = 0, ntries = 100;
2094 TDSetElement *e = 0, *enxt = 0;
2095 do {
2096 nmg = 0;
2097 e = (TDSetElement *) subSet->First();
2098 while ((enxt = (TDSetElement *) subSet->After(e))) {
2099 if (e->MergeElement(enxt) >= 0) {
2100 nmg++;
2101 subSet->Remove(enxt);
2102 delete enxt;
2103 } else {
2104 e = enxt;
2105 }
2106 }
2107 } while (nmg > 0 && --ntries > 0);
2108 // reassign the packets assigned to the bad slave and save the size;
2109 SplitPerHost(subSet, listOfMissingFiles);
2110 // the elements were reassigned so should not be deleted
2111 subSet->SetOwner(0);
2112 } else {
2113 Warning("MarkBad", "subset processed by bad worker not found!");
2114 }
2115 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2116 }
2117 // remove slavestat from the map
2118 fSlaveStats->Remove(s);
2119 delete slaveStat;
2120 // recalculate fNEventsOnRemLoc and others
2121 InitStats();
2122}
2123
2124////////////////////////////////////////////////////////////////////////////////
2125/// The file in the listOfMissingFiles can appear several times;
2126/// in order to fix that, a TDSetElement::Merge method is needed.
2127
2129 TList **listOfMissingFiles)
2130{
2131 if (!e) {
2132 Error("ReassignPacket", "empty packet!");
2133 return -1;
2134 }
2135 // Check the old filenode
2136 TUrl url = e->GetFileName();
2137 // Check the host from which 'e' was previously read.
2138 // Map non URL filenames to dummy host
2139 TString host;
2140 if (!url.IsValid() || strncmp(url.GetProtocol(),"root", 4)) {
2141 host = "no-host";
2142 } else {
2143 host = url.GetHost();
2144 }
2145
2146 // If accessible add it back to the old node
2147 // and do DecProcessed
2148 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2149 if (node && fTryReassign) {
2150 // The packet 'e' was processing data from this node.
2151 node->DecreaseProcessed(e->GetNum());
2152 // The file should be already in fFilesToProcess ...
2153 node->Add(e, kFALSE);
2154 if (!fUnAllocated->FindObject(node))
2155 fUnAllocated->Add(node);
2156 return 0;
2157 } else {
2158 // Add to the list of missing files
2159 TFileInfo *fi = e->GetFileInfo();
2160 if (listOfMissingFiles && *listOfMissingFiles)
2161 (*listOfMissingFiles)->Add((TObject *)fi);
2162 return -1;
2163 }
2164}
2165
2166////////////////////////////////////////////////////////////////////////////////
2167/// Split into per host entries
2168/// The files in the listOfMissingFiles can appear several times;
2169/// in order to fix that, a TDSetElement::Merge method is needed.
2170
2172 TList **listOfMissingFiles)
2173{
2174 if (!elements) {
2175 Error("SplitPerHost", "Empty list of packets!");
2176 return;
2177 }
2178 if (elements->GetSize() <= 0) {
2179 Error("SplitPerHost", "The input list contains no elements");
2180 return;
2181 }
2182 TIter subSetIter(elements);
2183 TDSetElement *e;
2184 while ((e = (TDSetElement*) subSetIter.Next())) {
2185 if (ReassignPacket(e, listOfMissingFiles) == -1) {
2186 // Remove from the list in order to delete it.
2187 if (elements->Remove(e))
2188 Error("SplitPerHost", "Error removing a missing file");
2189 delete e;
2190 }
2191
2192 }
2193}
@ kPROOF_FATAL
@ kPROOF_GETENTRIES
@ kPROOF_MESSAGE
ROOT::R::TRInterface & r
Definition Object.C:4
#define SafeDelete(p)
Definition RConfig.hxx:537
#define f(i)
Definition RSha256.hxx:104
#define e(i)
Definition RSha256.hxx:103
const Bool_t kFALSE
Definition RtypesCore.h:101
long Long_t
Definition RtypesCore.h:54
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
float Float_t
Definition RtypesCore.h:57
const Bool_t kTRUE
Definition RtypesCore.h:100
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:364
R__EXTERN TEnv * gEnv
Definition TEnv.h:170
#define R__ASSERT(e)
Definition TError.h:118
char name[80]
Definition TGX11.cxx:110
#define PDB(mask, level)
Definition TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition TProofServ.h:347
R__EXTERN TProof * gProof
Definition TProof.h:1077
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
#define gPerfStats
Int_t BufferSize() const
Definition TBuffer.h:98
Int_t Length() const
Definition TBuffer.h:100
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Definition TDSet.h:66
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition TDSet.cxx:429
const char * GetObjName() const
Definition TDSet.h:120
Long64_t GetNum() const
Definition TDSet.h:114
@ kCorrupted
Definition TDSet.h:74
TObject * GetEntryList() const
Definition TDSet.h:131
void SetFirst(Long64_t first)
Definition TDSet.h:113
void Invalidate()
Definition TDSet.h:134
void SetTDSetOffset(Long64_t offset)
Definition TDSet.h:129
void SetNum(Long64_t num)
Definition TDSet.h:118
const char * GetDirectory() const
Return directory where to look for object.
Definition TDSet.cxx:253
void SetValid()
Definition TDSet.h:135
void SetEntryList(TObject *aList, Long64_t first=-1, Long64_t num=-1)
Set entry (or event) list for this element.
Definition TDSet.cxx:599
Long64_t GetTDSetOffset() const
Definition TDSet.h:128
Bool_t GetValid() const
Definition TDSet.h:119
const char * GetFileName() const
Definition TDSet.h:111
Long64_t GetFirst() const
Definition TDSet.h:112
This class implements a data set to be used for PROOF processing.
Definition TDSet.h:153
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition TDSet.cxx:413
virtual void Reset()
Reset or initialize access to the elements.
Definition TDSet.cxx:1369
Bool_t IsTree() const
Definition TDSet.h:225
const char * GetType() const
Definition TDSet.h:228
TList * GetListOfElements() const
Definition TDSet.h:231
@ kSomeInvalid
Definition TDSet.h:161
A List of entry numbers in a TTree or TChain.
Definition TEntryList.h:26
virtual Long64_t GetN() const
Definition TEntryList.h:77
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition TEventList.h:31
virtual Int_t GetN() const
Definition TEventList.h:56
Class describing a generic file including meta information.
Definition TFileInfo.h:39
TObject * Next()
void Reset()
A doubly linked list.
Definition TList.h:38
virtual void Add(TObject *obj)
Definition TList.h:81
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition TList.cxx:330
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition TList.cxx:822
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition TList.cxx:578
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Definition TList.cxx:357
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition TList.cxx:693
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition TList.cxx:659
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition TList.cxx:402
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition TList.cxx:937
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition TMap.cxx:54
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition TMap.cxx:151
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition TMap.cxx:236
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
Definition TMap.cxx:296
UInt_t What() const
Definition TMessage.h:75
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition TMonitor.cxx:250
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
virtual void DeActivateAll()
De-activate all activated sockets.
Definition TMonitor.cxx:302
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition TMonitor.cxx:498
virtual const char * GetTitle() const
Returns title of object.
Definition TNamed.h:48
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
An array of TObjects.
Definition TObjArray.h:31
Collectable string class.
Definition TObjString.h:28
const char * GetName() const
Returns name of object.
Definition TObjString.h:38
const TString & GetString() const
Definition TObjString.h:46
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:429
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:200
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition TObject.cxx:949
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:766
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:963
void ResetBit(UInt_t f)
Definition TObject.h:200
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:937
void DecreaseProcessed(Long64_t nEvents)
void DecExtSlaveCnt(const char *slave)
Int_t Compare(const TObject *other) const
Compare abstract method.
void IncExtSlaveCnt(const char *slave)
TFileNode(const char *name, Int_t strategy, TSortedList *files)
const char * GetName() const
Returns name of object.
void Add(TDSetElement *elem, Bool_t tolist)
void Print(Option_t *=0) const
This method must be overridden when a class wants to print itself.
Int_t Compare(const TObject *obj) const
Compare abstract method.
TFileStat(TFileNode *node, TDSetElement *elem, TList *file)
void Print(Option_t *=0) const
This method must be overridden when a class wants to print itself.
TSlaveStat(TSlave *slave)
Constructor.
TProofProgressStatus * GetProgressStatus()
void UpdateRates(TProofProgressStatus *st)
Update packetizer rates.
TProofProgressStatus * AddProcessed(TProofProgressStatus *st=0)
Add the current element to the fDSubSet (subset processed by this worker) and if the status arg is gi...
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
virtual ~TPacketizerAdaptive()
Destructor.
TFileNode * NextNode()
Get next node which has unallocated files.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; A meaningfull difference to TPacketizer is the fact that this packetizer,...
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree an get number of entries.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetActiveWorkers()
Return the number of workers still processing.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(re)initialise the statistics called at the begining or after a worker dies.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Named parameter, streamable and storable.
Definition TParameter.h:35
Container class for processing statistics.
Double_t GetProcTime() const
Double_t GetLastUpdate() const
Long64_t GetEntries() const
Double_t GetCurrentRate() const
Get current rate. Rteunr the average rate if the current is not defined.
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition TProofServ.h:257
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition TProof.h:316
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition TProof.cxx:9918
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition TProof.cxx:9336
Class describing a PROOF worker server.
Definition TSlave.h:46
TSocket * GetSocket() const
Definition TSlave.h:134
Int_t GetProtocol() const
Definition TSlave.h:133
const char * GetName() const
Returns name of object.
Definition TSlave.h:124
const char * GetOrdinal() const
Definition TSlave.h:131
Int_t GetPerfIdx() const
Definition TSlave.h:132
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
A sorted doubly linked list.
Definition TSortedList.h:28
Basic string class.
Definition TString.h:136
Ssiz_t Length() const
Definition TString.h:410
const char * Data() const
Definition TString.h:369
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
Definition TString.cxx:2222
Bool_t IsNull() const
Definition TString.h:407
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2314
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:624
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
Definition TSystem.cxx:466
virtual const char * HostName()
Return the system's host name.
Definition TSystem.cxx:306
Basic time type with millisecond precision.
Definition TTime.h:27
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
Definition TUrl.cxx:389
const char * GetFile() const
Definition TUrl.h:69
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition TUrl.cxx:522
Bool_t IsValid() const
Definition TUrl.h:79
const char * GetHost() const
Definition TUrl.h:67
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition TUrl.cxx:471
void SetHost(const char *host)
Definition TUrl.h:84
const char * GetProtocol() const
Definition TUrl.h:64
const char * GetName() const
Returns name of object.
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
Long64_t GetEntriesProcessed() const
Double_t GetCumProcTime() const
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from from base packet starting from the first entry with num entries.
const Int_t n
Definition legend1.C:16
Definition file.py:1
Definition first.py:1
static const char * what
Definition stlLoader.cc:6
auto * m
Definition textangle.C:8