75 class TPacketizerAdaptive::TFileStat :
public TObject {
86 Bool_t IsDone()
const {
return fIsDone;}
88 void SetDone() {fIsDone =
kTRUE;}
89 TFileNode *GetNode()
const {
return fNode;}
91 Long64_t GetNextEntry()
const {
return fNextEntry;}
92 void MoveNextEntry(
Long64_t step) {fNextEntry += step;}
99 const TFileStat *fst =
dynamic_cast<const TFileStat*
>(obj);
100 if (fst && GetElement() && fst->GetElement()) {
101 Long64_t ent = GetElement()->GetNum();
102 Long64_t entfst = fst->GetElement()->GetNum();
103 if (ent > 0 && entfst > 0) {
106 }
else if (ent < entfst) {
118 Printf(
"TFileStat: %s %lld", fElement ? fElement->
GetName() :
"---",
119 fElement ? fElement->
GetNum() : -1);
123 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node,
TDSetElement *elem,
TList *files)
124 : fIsDone(
kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
127 if (files) files->
Add(
this);
133 class TPacketizerAdaptive::TFileNode :
public TObject {
156 ~TFileNode() {
delete fFiles;
delete fActFiles; }
158 void IncMySlaveCnt() { fMySlaveCnt++; }
159 Int_t GetMySlaveCnt()
const {
return fMySlaveCnt; }
160 void IncExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt++; }
161 void DecExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt--;
R__ASSERT(fExtSlaveCnt >= 0); }
162 Int_t GetSlaveCnt()
const {
return fMySlaveCnt + fExtSlaveCnt; }
163 void IncRunSlaveCnt() { fRunSlaveCnt++; }
164 void DecRunSlaveCnt() { fRunSlaveCnt--;
R__ASSERT(fRunSlaveCnt >= 0); }
165 Int_t GetRunSlaveCnt()
const {
return fRunSlaveCnt; }
166 Int_t GetExtSlaveCnt()
const {
return fExtSlaveCnt; }
167 Int_t GetNumberOfActiveFiles()
const {
return fActFiles->
GetSize(); }
171 { fProcessed += nEvents; }
172 Long64_t GetProcessed()
const {
return fProcessed; }
173 void DecreaseProcessed(
Long64_t nEvents) { fProcessed -= nEvents; }
176 Long64_t GetEventsLeftPerSlave()
const 177 {
return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178 void IncEvents(
Long64_t nEvents) { fEvents += nEvents; }
179 const char *
GetName()
const {
return fNodeName.
Data(); }
180 Long64_t GetNEvents()
const {
return fEvents; }
187 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188 Printf(
"+++ TFileNode: %s +++", fNodeName.
Data());
189 Printf(
"+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190 Printf(
"+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
192 if (fFiles && fFiles->
GetSize() > 0) {
194 while ((fs = (TFileStat *) nxf())) {
195 if ((e = fs->GetElement())) {
196 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->
GetName(),
199 Printf(
"+++ #%d: no element! ", ++nn);
203 Printf(
"+++ Active files: %d ", fActFiles ? fActFiles->
GetSize() : 0);
204 if (fActFiles && fActFiles->
GetSize() > 0) {
205 TIter nxaf(fActFiles);
206 while ((fs = (TFileStat *) nxaf())) {
207 if ((e = fs->GetElement())) {
208 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->
GetName(),
211 Printf(
"+++ #%d: no element! ", ++nn);
215 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
221 TFileStat *f =
new TFileStat(
this, elem, files);
223 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->
First();
228 TObject *next = fUnAllocFileNext;
232 fActFiles->
Add(next);
233 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
236 fUnAllocFileNext = fFiles->
After(fUnAllocFileNext);
238 return (TFileStat *) next;
245 if (fActFileNext != 0) {
246 fActFileNext = fActFiles->
After(fActFileNext);
247 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
250 return (TFileStat *) next;
255 if (fActFileNext == file) fActFileNext = fActFiles->
After(file);
257 if (fFilesToProcess) fFilesToProcess->
Remove(file);
258 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
269 const TFileNode *obj =
dynamic_cast<const TFileNode*
>(other);
271 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
277 if (fStrategy == 1) {
279 Int_t myVal = GetRunSlaveCnt();
280 Int_t otherVal = obj->GetRunSlaveCnt();
281 if (myVal < otherVal) {
283 }
else if (myVal > otherVal) {
287 if ((fEvents - fProcessed) >
288 (obj->GetNEvents() - obj->GetProcessed())) {
295 Int_t myVal = GetSlaveCnt();
296 Int_t otherVal = obj->GetSlaveCnt();
297 if (myVal < otherVal) {
299 }
else if (myVal > otherVal) {
309 fUnAllocFileNext = fFiles->
First();
320 : fNodeName(name), fFiles(new
TList), fUnAllocFileNext(0),
321 fActFiles(new
TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323 fStrategy(strategy), fFilesToProcess(files)
338 TFileNode *fFileNode;
346 TSlaveStat(
TSlave *slave);
348 TFileNode *GetFileNode()
const {
return fFileNode; }
351 TFileStat *GetCurFile() {
return fCurFile; }
352 void SetFileNode(TFileNode *node) { fFileNode = node; }
354 Float_t GetAvgRate() {
return fStatus->GetRate(); }
356 return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
357 Int_t GetLocalEventsLeft() {
358 return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
359 TList *GetProcessedSubSet() {
return fDSubSet; }
367 TPacketizerAdaptive::TSlaveStat::TSlaveStat(
TSlave *slave)
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
371 fDSubSet =
new TList();
378 if (strcmp(slave->
ClassName(),
"TSlaveLite")) {
381 if (fWrkFQDN.Contains(
"localhost") || fWrkFQDN ==
"127.0.0.1")
385 Info("TSlaveStat", "wrk FQDN: %
s", fWrkFQDN.Data());
403 Error(
"UpdateRates",
"no status object!");
406 if (fCurFile->IsDone()) {
426 if (st && fDSubSet && fCurElem) {
429 fDSubSet->
Add(fCurElem);
433 Error(
"AddProcessed",
"processed subset of current elem undefined");
450 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
451 "enter (first %lld, num %lld)", first, num);
469 Error(
"TPacketizerAdaptive",
"No progress status");
477 cpsync =
gEnv->
GetValue(
"Packetizer.CachePacketSync", 1);
497 Info(
"TPacketizerAdaptive",
"using the basic strategy of TPacketizer");
498 }
else if (strategy != 1) {
499 Warning(
"TPacketizerAdaptive",
"unsupported strategy index (%d): ignore", strategy);
504 if (maxSlaveCnt < 0) {
505 Info(
"TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
514 Info(
"TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
518 maxSlaveCnt = (
Long_t) mxslcnt;
523 maxSlaveCnt =
gEnv->
GetValue(
"Packetizer.MaxWorkersPerNode", 0);
524 if (maxSlaveCnt > 0) {
526 Info(
"TPacketizerAdaptive",
"Setting max number of workers per node to %ld",
536 Int_t forceLocal = 0;
541 Info(
"TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
551 Int_t packetAsAFraction = 0;
553 if (packetAsAFraction > 0) {
555 Info(
"TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
559 Info(
"TPacketizerAdaptive",
"packetAsAFraction parameter must be higher than 0");
564 Int_t tryReassign = 0;
566 tryReassign =
gEnv->
GetValue(
"Packetizer.TryReassign", 0);
569 Info(
"TPacketizerAdaptive",
"failed packets will be re-assigned");
601 partitionsStr =
gEnv->
GetValue(
"Packetizer.Partitions",
"");
602 if (!partitionsStr.
IsNull()) {
603 Info(
"TPacketizerAdaptive",
"Partitions: %s", partitionsStr.
Data());
604 partitions = partitionsStr.
Tokenize(
",");
636 if (host.Contains(
"localhost") || host ==
"127.0.0.1") {
644 TIter iString(partitions);
663 node =
new TFileNode(nodeStr, fStrategy, fFilesToProcess);
666 Info(
"TPacketizerAdaptive",
"creating new node '%s' or the element", nodeStr.
Data());
669 Info(
"TPacketizerAdaptive",
"adding element to existing node '%s'", nodeStr.
Data());
689 Int_t validateMode = 0;
694 Info(
"TPacketizerAdaptive",
695 "processing subset of entries: validating by file? %s", byfile ?
"yes":
"no");
710 Info(
"TPacketizerAdaptive",
711 "processing range: first %lld, num %lld", first, num);
725 Info(
"TPacketizerAdaptive",
"processing element '%s'", e->
GetFileName());
727 Info(
"TPacketizerAdaptive",
728 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->
GetEntryList());
732 if (cur + eNum < first) {
735 Info(
"TPacketizerAdaptive",
" --> skip element cur %lld", cur);
740 if (num != -1 && (first+num <= cur)) {
743 Info(
"TPacketizerAdaptive",
" --> drop element cur %lld", cur);
748 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
753 e->
SetFirst( eFirst + (first - cur) );
756 Info(
"TPacketizerAdaptive",
" --> adjust start %lld and end %lld",
757 eFirst + (first - cur), first + num - cur);
760 if (num != -1 && (first+num <= cur+eNum)) {
765 Info(
"TPacketizerAdaptive",
" --> adjust end %lld", first + num - cur);
772 Info(
"TPacketizerAdaptive",
" --> increment 'cur' by %lld", eNum);
786 Info(
"TPacketizerAdaptive",
" --> entry-list element: %lld entries", eNum);
789 eNum = evl ? evl->
GetN() : eNum;
791 Info(
"TPacketizerAdaptive",
" --> event-list element: %lld entries (evl:%p)", eNum, evl);
795 Info(
"TPacketizerAdaptive",
" --> empty entry- or event-list element!");
800 Info(
"TPacketizerAdaptive",
" --> next cur %lld", cur);
816 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
824 TIter iString(partitions);
844 node =
new TFileNode(nodeStr, fStrategy, fFilesToProcess);
847 Info(
"TPacketizerAdaptive",
" --> creating new node '%s' for element", nodeStr.
Data());
850 Info(
"TPacketizerAdaptive",
" --> adding element to exiting node '%s'", nodeStr.
Data());
856 node->IncEvents(eNum);
860 Info(
"TPacketizerAdaptive",
"processing %lld entries in %d files on %d hosts",
874 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
"return");
901 Int_t noRemoteFiles = 0;
903 Int_t totalNumberOfFiles = 0;
905 while (TFileNode *fn = (TFileNode*)next()) {
906 totalNumberOfFiles += fn->GetNumberOfFiles();
907 if (fn->GetMySlaveCnt() == 0) {
908 noRemoteFiles += fn->GetNumberOfFiles();
913 if (totalNumberOfFiles == 0) {
914 Info(
"InitStats",
"no valid or non-empty file found: setting invalid");
927 PDB(kPacketizer,1)
Info(
"InitStats",
"return");
941 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->GetName());
942 file = node->GetNextUnAlloc();
945 if (nodeHostName && strlen(nodeHostName) > 0) {
956 TUrl uu(fn->GetName());
958 Info(
"GetNextUnAlloc",
"comparing %s with %s...", nodeHostName, uu.GetHost());
961 if (!strcmp(nodeHostName, uu.GetHost())) {
965 if ((file = node->GetNextUnAlloc()) == 0) {
970 Info(
"GetNextUnAlloc",
"found! (host: %s)", uu.GetHost());
975 Warning(
"GetNextUnAlloc",
"unallocate entry %d is empty!", i);
982 Info(
"GetNextUnAlloc",
"reached Workers-per-Node Limit (%ld)",
fMaxSlaveCnt);
988 while (file == 0 && ((node =
NextNode()) != 0)) {
990 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->GetName());
1003 PDB(kPacketizer, 2) {
1005 Info(
"GetNextUnAlloc",
"no file found!");
1021 PDB(kPacketizer,2) {
1050 TFileStat *file = 0;
1053 file = node->GetNextActive();
1067 PDB(kPacketizer,2) {
1068 Info(
"NextActiveNode",
"enter");
1076 Info(
"NextActiveNode",
"reached Workers-per-Node limit (%ld)",
fMaxSlaveCnt);
1088 TFileNode *node = file->GetNode();
1090 node->RemoveActive(file);
1114 while ((fn = (TFileNode*) files.
Next()) != 0) {
1120 while ((key = slaves.
Next()) != 0) {
1123 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
1128 TFileNode *fnmin = 0;
1131 while ((fn = (TFileNode*) files.
Next()) != 0) {
1132 if (!strcmp(slstat->GetName(),
TUrl(fn->GetName()).GetHost())) {
1133 if (fn->GetMySlaveCnt() < fncnt) {
1135 fncnt = fn->GetMySlaveCnt();
1140 slstat->SetFileNode(fnmin);
1141 fnmin->IncMySlaveCnt();
1143 Info(
"Reset",
"assigning node '%s' to '%s' (cnt: %d)",
1144 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1146 slstat->fCurFile = 0;
1157 TMap slaves_by_sock;
1169 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
1186 TString msg(
"Validating files");
1203 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL",
s->GetName());
1207 TFileNode *node = 0;
1208 TFileStat *file = 0;
1211 if ((node = slstat->GetFileNode()) != 0) {
1212 PDB(kPacketizer,3) node->Print();
1215 slstat->SetFileNode(0);
1226 slstat->fCurFile = file;
1229 if (entries < 0 || strlen(elem->
GetTitle()) <= 0) {
1231 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1238 s->GetSocket()->Send( m );
1241 Info(
"ValidateFiles",
1242 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1243 s->GetOrdinal(),
s->GetName(),
s->GetSocket(),
1254 Error(
"ValidateFiles",
1255 "first (%lld) higher then number of entries (%lld) in %s",
1258 slstat->fCurFile->SetDone();
1262 if (elem->
GetNum() == -1) {
1265 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of" 1271 Info(
"ValidateFiles",
1272 "found elem '%s' with %lld entries", elem->
GetFileName(), entries);
1290 if (byfile && maxent > 0) {
1292 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1293 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1296 Info(
"ValidateFiles",
"{%lld, %lld, %lld}: needs to validate %lld more files",
1297 maxent, totent, nopenf, nrestf);
1299 while ((slm = (
TSlave *) si.
Next()) && nrestf--) {
1305 Info(
"ValidateFiles",
"no need to validate more files");
1313 PDB(kPacketizer,3) {
1314 Info(
"ValidateFiles",
"waiting for %d slaves:", mon.
GetActive());
1320 Info(
"ValidateFiles",
" worker-%s (%s)",
1329 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
1336 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
1341 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
1343 ((
TProof*)
gProof)->MarkBad(slave,
"socket got invalid during validation");
1350 if (sock->
Recv(reply) <= 0) {
1352 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
1355 ((
TProof*)
gProof)->MarkBad(slave,
"receive failed during validation");
1365 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1378 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1381 (*reply) >> entries;
1386 (*reply) >> objname;
1398 Error(
"ValidateFiles",
1399 "first (%lld) higher then number of entries (%lld) in %s",
1403 slavestat->fCurFile->SetDone();
1411 Error(
"ValidateFiles",
1412 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1428 Error(
"ValidateFiles",
"cannot get entries for file: %s - skipping", e->
GetFileName() );
1435 m <<
TString(
Form(
"Cannot get entries for file: %s - skipping",
1444 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated", totent);
1447 if (maxent < 0 || ((totent < maxent) && !byfile))
1467 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1468 if (el->GetValid()) {
1470 el->SetTDSetOffset(offset);
1482 if (fStrategy == 0) {
1495 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1496 Float_t rate = slstat->GetCurRate();
1498 rate = slstat->GetAvgRate();
1517 if (fFilesToProcess->
Last()) {
1519 if (elem) maxEntries = elem->
GetNum();
1522 PDB(kPacketizer,3) {
1523 Info(
"CalculatePacketSize",
"%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1524 Info(
"CalculatePacketSize",
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1525 slstat->GetOrdinal(), fFilesToProcess->
GetSize(),
1532 if (bevt > 0. && cachesz > 0 && cpsync) {
1533 if ((
Long64_t)(rate * packetTime * bevt) < cachesz)
1534 packetTime = cachesz / bevt / rate;
1542 num = (
Long64_t)(rate * packetTime);
1546 Info(
"CalculatePacketSize",
"%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1548 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1553 num = (learnent > 0) ? 5 * learnent : 1000;
1557 Info(
"CalculatePacketSize",
"%s: num: %lld", slstat->GetOrdinal(), num);
1560 if (num < 1) num = 1;
1572 TList **listOfMissingFiles)
1577 Error(
"AddProcessed",
"%s: TSlaveStat instance for worker %s not found!",
1579 (sl ? sl->
GetName() :
"**undef**"));
1585 if ( slstat->fCurElem != 0 ) {
1586 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1590 numev = status->
GetEntries() - slstat->GetEntriesProcessed();
1598 progress = slstat->AddProcessed(status);
1600 (*fProgressStatus) += *progress;
1602 slstat->UpdateRates(status);
1609 Info(
"AddProcessed",
"%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1615 slstat->fCurElem->GetFileName(),
1623 if (numev != expectedNumEv) {
1629 if (newPacket && numev < expectedNumEv) {
1631 newPacket->
SetFirst(first + numev);
1635 Error(
"AddProcessed",
"%s: processed too much? (%lld, %lld)",
1648 slstat->fCurElem = 0;
1649 return (expectedNumEv - numev);
1676 Error(
"GetNextPacket",
"TSlaveStat instance for worker %s not found!",
1677 (sl ? sl->
GetName() :
"**undef**"));
1682 TFileStat *file = slstat->fCurFile;
1688 Int_t learnent = -1;
1689 if ( slstat->fCurElem != 0 ) {
1692 Double_t latency, proctime, proccpu;
1702 (*r) >> cachesz >> learnent;
1712 (*r) >> latency >> proctime >> proccpu;
1723 if (!fileNotOpen && !fileCorrupted) {
1725 Error(
"GetNextPacket",
"%s: the worker processed a different # of entries", sl->
GetOrdinal());
1728 Error(
"GetNextPacket",
"%s: processed too many entries! (%lld, %lld)",
1736 if (file->GetElement()) {
1737 if (fileCorrupted) {
1738 Info(
"GetNextPacket",
"%s: file '%s' turned corrupted: invalidating file (%lld)",
1739 sl->
GetOrdinal(), file->GetElement()->GetName(), restEntries);
1742 Info(
"GetNextPacket",
"%s: %d entries un-processed", sl->
GetOrdinal(), nunproc);
1747 num = file->GetElement()->GetEntries() + restEntries;
1751 Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1752 num = restEntries + rest;
1754 file->GetElement()->SetEntries(num);
1756 Info(
"GetNextPacket",
"%s: removed file: %s, entries left: %lld", sl->
GetOrdinal(),
1757 file->GetElement()->GetName(), file->GetElement()->GetEntries());
1761 Info(
"GetNextPacket",
"%s: file '%s' could not be open: invalidating related element",
1762 sl->
GetOrdinal(), file->GetElement()->GetName());
1765 file->GetElement()->Invalidate();
1775 Info(
"GetNextPacket",
"%s: error raised by worker, but TFileStat object invalid:" 1780 firstPacket =
kTRUE;
1789 if (file != 0) nodeName = file->GetNode()->GetName();
1790 TString nodeHostName(slstat->GetName());
1793 Info(
"GetNextPacket",
"%s: entries processed: %lld - looking for a packet from node '%s'",
1797 if ( file != 0 && file->IsDone() ) {
1798 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1799 file->GetNode()->DecRunSlaveCnt();
1802 file->GetElement()->GetFileName(),
kFALSE);
1806 slstat->fCurFile = file;
1819 if ( slstat->GetFileNode() != 0 ) {
1823 Bool_t nonLocalNodePossible;
1825 nonLocalNodePossible = 0;
1827 nonLocalNodePossible = firstNonLocalNode ?
1828 (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() <
fMaxSlaveCnt))
1830 openLocal = !nonLocalNodePossible;
1831 Float_t slaveRate = slstat->GetAvgRate();
1832 if ( nonLocalNodePossible && fStrategy == 1) {
1834 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1835 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1839 else if ( slaveRate == 0 ) {
1842 if ( slstat->GetLocalEventsLeft() * localPreference
1843 > (avgEventsLeftPerSlave))
1845 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1846 < slstat->GetLocalEventsLeft() * localPreference )
1848 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1850 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1854 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1856 Float_t avgTime = avgEventsLeftPerSlave
1858 if (slaveTime * localPreference > avgTime)
1860 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1861 < slstat->GetLocalEventsLeft() * localPreference)
1865 if (openLocal || fStrategy == 0) {
1867 file = slstat->GetFileNode()->GetNextUnAlloc();
1869 file = slstat->GetFileNode()->GetNextActive();
1872 slstat->SetFileNode(0);
1885 if (file == 0)
return 0;
1887 PDB(kPacketizer,3)
if (fFilesToProcess) fFilesToProcess->
Print();
1889 slstat->fCurFile = file;
1891 if (file->GetNode()->GetMySlaveCnt() == 0 &&
1892 file->GetElement()->GetFirst() == file->GetNextEntry()) {
1895 Error(
"GetNextPacket",
1896 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1901 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1902 file->GetNode()->IncRunSlaveCnt();
1905 file->GetNode()->GetName(),
1906 file->GetElement()->GetFileName(),
kTRUE);
1920 if ( first + num * 1.5 >= last ) {
1928 file->MoveNextEntry(num);
1932 slstat->fCurElem->SetEntryList(base->
GetEntryList(), first, num);
1943 return slstat->fCurElem;
1954 while ((key = nxw())) {
1956 if (wrkstat && wrkstat->fCurFile) actw++;
1974 while ((key = nxw()) != 0) {
1976 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1978 currate += slstat->GetProgressStatus()->GetCurrentRate();
2020 while ((key = nxw()) != 0) {
2024 Long64_t e = slstat->GetEntriesProcessed();
2025 if (e <= 0) all =
kFALSE;
2027 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2029 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2030 : slstat->GetAvgRate();
2038 Info(
"GetEstEntriesProcessed",
"%s: e:%lld rate:%f dt:%f e:%lld",
2039 slstat->fSlave->GetOrdinal(),
2040 slstat->GetEntriesProcessed(), rate, dt,
e);
2047 Info(
"GetEstEntriesProcessed",
2048 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2057 return ((all) ? 0 : 1);
2071 TList **listOfMissingFiles)
2075 Error(
"MarkBad",
"Worker does not exist");
2079 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2080 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2081 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2088 TList *subSet = slaveStat->GetProcessedSubSet();
2091 if (slaveStat->fCurElem) {
2092 subSet->
Add(slaveStat->fCurElem);
2095 Int_t nmg = 0, ntries = 100;
2109 }
while (nmg > 0 && --ntries > 0);
2115 Warning(
"MarkBad",
"subset processed by bad worker not found!");
2117 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2131 TList **listOfMissingFiles)
2134 Error(
"ReassignPacket",
"empty packet!");
2155 node->DecreaseProcessed(e->
GetNum());
2164 if (listOfMissingFiles && *listOfMissingFiles)
2165 (*listOfMissingFiles)->Add((
TObject *)fi);
2176 TList **listOfMissingFiles)
2179 Error(
"SplitPerHost",
"Empty list of packets!");
2182 if (elements->
GetSize() <= 0) {
2183 Error(
"SplitPerHost",
"The input list contains no elements");
2186 TIter subSetIter(elements);
2192 Error(
"SplitPerHost",
"Error removing a missing file");
virtual const char * GetName() const
Returns name of object.
virtual Int_t AddProcessed(TSlave *, TProofProgressStatus *, Double_t, TList **)
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
virtual Bool_t IsValid() const
TFileInfo * GetFileInfo(const char *type="TTree")
Return the content of this element in the form of a TFileInfo.
Long64_t GetTDSetOffset() const
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
void Reset()
Reset the internal data structure for packet distribution.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual Long64_t GetN() const
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Collectable string class.
Float_t fBaseLocalPreference
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to...
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
void SetTDSetOffset(Long64_t offset)
This class represents a WWW compatible URL.
TObject * GetParameter(const char *par) const
Get specified parameter.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
const char * GetProtocol() const
This class implements a data set to be used for PROOF processing.
Long64_t GetReadCalls() const
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Double_t GetProcTime() const
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get the number of entries.
Float_t GetProcTime() const
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection)...
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Long64_t GetFirst() const
const char * GetOrdinal() const
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
const char * GetName() const
Returns name of object.
Basic time type with millisecond precision.
virtual void DeActivateAll()
De-activate all activated sockets.
Long64_t fNEventsOnRemLoc
Long64_t GetEntries() const
Int_t GetProtocol() const
TList * GetListOfElements() const
Double_t GetCumProcTime() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
virtual ~TPacketizerAdaptive()
Destructor.
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Int_t GetActiveWorkers()
Return the number of workers still processing.
Double_t GetLastUpdate() const
TFileNode * NextNode()
Get next node which has unallocated files.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
const char * GetFile() const
Manages an element of a TDSet.
const char * GetHost() const
virtual const char * ClassName() const
Returns name of class to which the object belongs.
TFileStat * GetNextActive()
Get next active file.
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry with num entries...
TSocket * GetSocket() const
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Double_t fMaxEntriesRatio
virtual Int_t GetN() const
const char * GetDirectory() const
Return directory where to look for object.
A sorted doubly linked list.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet. A meaningful difference from TPacketizer is that this packetizer, for each worker, tries to predict whether the worker will finish processing its local files before the end of the query.
void Reset()
Reset the internal data structure for packet distribution.
TSocket * Select()
Return pointer to socket for which an event is waiting.
TProofProgressStatus * fProgressStatus
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
const char * GetObjName() const
const char * GetName() const
Returns name of object.
void SetLastEntries(Long64_t entries)
Named parameter, streamable and storable.
const TString & GetString() const
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual Bool_t IsSortable() const
R__EXTERN TSystem * gSystem
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
TFileNode * NextActiveNode()
Get next active node.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
char * Form(const char *fmt,...)
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that, a TDSetElement::Merge method is needed.
A TEventList object is a list of selected events (entries) in a TTree.
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
Int_t MergeElement(TDSetElement *elem)
Check if 'elem' is overlapping or subsequent and, if the case, return a merged element.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
void Print(Option_t *options="") const
Print a TDSetElement. When option="a" print full data.
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap...
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
void SetHost(const char *host)
The packetizer is a load balancing object created for each query.
Float_t fFractionOfRemoteFiles
R__EXTERN TProof * gProof
void Add(THist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const THist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
TObject * GetEntryList() const
virtual const char * HostName()
Return the system's host name.
Long64_t GetEntriesProcessed() const
Double_t GetCPUTime() const
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
This class controls a Parallel ROOT Facility, PROOF, cluster.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
static constexpr double s
void SetNum(Long64_t num)
you should not use this method at all Int_t Int_t Double_t Double_t Double_t e
virtual void Reset()
Reset or initialize access to the elements.
const char * GetFileName() const
virtual void Clear(Option_t *option="")
Remove all objects from the list.
const char * GetType() const
Mother of all ROOT objects.
Long64_t GetBytesRead() const
const char * GetDataSet() const
R__EXTERN TProofServ * gProofServ
virtual void Add(TObject *obj)
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Class describing a generic file including meta information.
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual const char * GetName() const
Returns name of object.
virtual Int_t GetSize() const
Class describing a PROOF worker server.
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
void SetFirst(Long64_t first)
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
A List of entry numbers in a TTree or TChain.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
virtual const char * GetTitle() const
Returns title of object.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
const char * Data() const