75class TPacketizerAdaptive::TFileStat :
public TObject {
86 Bool_t IsDone()
const {
return fIsDone;}
88 void SetDone() {fIsDone =
kTRUE;}
89 TFileNode *GetNode()
const {
return fNode;}
91 Long64_t GetNextEntry()
const {
return fNextEntry;}
92 void MoveNextEntry(
Long64_t step) {fNextEntry += step;}
99 const TFileStat *fst =
dynamic_cast<const TFileStat*
>(obj);
100 if (fst && GetElement() && fst->GetElement()) {
101 Long64_t ent = GetElement()->GetNum();
102 Long64_t entfst = fst->GetElement()->GetNum();
103 if (ent > 0 && entfst > 0) {
106 }
else if (ent < entfst) {
118 Printf(
"TFileStat: %s %lld", fElement ? fElement->
GetName() :
"---",
119 fElement ? fElement->
GetNum() : -1);
123TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node,
TDSetElement *elem,
TList *files)
124 : fIsDone(
kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
127 if (files) files->
Add(
this);
133class TPacketizerAdaptive::TFileNode :
public TObject {
156 ~TFileNode() {
delete fFiles;
delete fActFiles; }
158 void IncMySlaveCnt() { fMySlaveCnt++; }
159 Int_t GetMySlaveCnt()
const {
return fMySlaveCnt; }
160 void IncExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt++; }
161 void DecExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt--;
R__ASSERT(fExtSlaveCnt >= 0); }
162 Int_t GetSlaveCnt()
const {
return fMySlaveCnt + fExtSlaveCnt; }
163 void IncRunSlaveCnt() { fRunSlaveCnt++; }
164 void DecRunSlaveCnt() { fRunSlaveCnt--;
R__ASSERT(fRunSlaveCnt >= 0); }
165 Int_t GetRunSlaveCnt()
const {
return fRunSlaveCnt; }
166 Int_t GetExtSlaveCnt()
const {
return fExtSlaveCnt; }
167 Int_t GetNumberOfActiveFiles()
const {
return fActFiles->
GetSize(); }
171 { fProcessed += nEvents; }
172 Long64_t GetProcessed()
const {
return fProcessed; }
173 void DecreaseProcessed(
Long64_t nEvents) { fProcessed -= nEvents; }
176 Long64_t GetEventsLeftPerSlave()
const
177 {
return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178 void IncEvents(
Long64_t nEvents) { fEvents += nEvents; }
179 const char *
GetName()
const {
return fNodeName.
Data(); }
180 Long64_t GetNEvents()
const {
return fEvents; }
187 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188 Printf(
"+++ TFileNode: %s +++", fNodeName.
Data());
189 Printf(
"+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190 Printf(
"+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
192 if (fFiles && fFiles->
GetSize() > 0) {
194 while ((fs = (TFileStat *) nxf())) {
195 if ((
e = fs->GetElement())) {
196 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn,
e->GetName(),
197 e->GetFirst(),
e->GetFirst() +
e->GetNum() - 1,
e->GetNum(), fs->GetNextEntry());
199 Printf(
"+++ #%d: no element! ", ++nn);
203 Printf(
"+++ Active files: %d ", fActFiles ? fActFiles->
GetSize() : 0);
204 if (fActFiles && fActFiles->
GetSize() > 0) {
205 TIter nxaf(fActFiles);
206 while ((fs = (TFileStat *) nxaf())) {
207 if ((
e = fs->GetElement())) {
208 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn,
e->GetName(),
209 e->GetFirst(),
e->GetFirst() +
e->GetNum() - 1,
e->GetNum(), fs->GetNextEntry());
211 Printf(
"+++ #%d: no element! ", ++nn);
215 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
221 TFileStat *
f =
new TFileStat(
this, elem, files);
223 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->
First();
228 TObject *next = fUnAllocFileNext;
232 fActFiles->
Add(next);
233 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
236 fUnAllocFileNext = fFiles->
After(fUnAllocFileNext);
238 return (TFileStat *) next;
245 if (fActFileNext != 0) {
246 fActFileNext = fActFiles->
After(fActFileNext);
247 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
250 return (TFileStat *) next;
255 if (fActFileNext ==
file) fActFileNext = fActFiles->
After(
file);
258 if (fActFileNext == 0) fActFileNext = fActFiles->
First();
269 const TFileNode *obj =
dynamic_cast<const TFileNode*
>(other);
271 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
279 Int_t myVal = GetRunSlaveCnt();
280 Int_t otherVal = obj->GetRunSlaveCnt();
281 if (myVal < otherVal) {
283 }
else if (myVal > otherVal) {
287 if ((fEvents - fProcessed) >
288 (obj->GetNEvents() - obj->GetProcessed())) {
295 Int_t myVal = GetSlaveCnt();
296 Int_t otherVal = obj->GetSlaveCnt();
297 if (myVal < otherVal) {
299 }
else if (myVal > otherVal) {
309 fUnAllocFileNext = fFiles->
First();
320 : fNodeName(
name), fFiles(new
TList), fUnAllocFileNext(0),
321 fActFiles(new
TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323 fStrategy(strategy), fFilesToProcess(files)
338 TFileNode *fFileNode;
346 TSlaveStat(
TSlave *slave);
348 TFileNode *GetFileNode()
const {
return fFileNode; }
351 TFileStat *GetCurFile() {
return fCurFile; }
352 void SetFileNode(TFileNode *node) { fFileNode = node; }
356 return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
357 Int_t GetLocalEventsLeft() {
358 return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
359 TList *GetProcessedSubSet() {
return fDSubSet; }
367TPacketizerAdaptive::TSlaveStat::TSlaveStat(
TSlave *slave)
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
371 fDSubSet =
new TList();
378 if (strcmp(slave->
ClassName(),
"TSlaveLite")) {
381 if (fWrkFQDN.Contains(
"localhost") || fWrkFQDN ==
"127.0.0.1")
385 Info("TSlaveStat", "wrk FQDN: %
s", fWrkFQDN.Data());
403 Error(
"UpdateRates",
"no status object!");
406 if (fCurFile->IsDone()) {
426 if (st && fDSubSet && fCurElem) {
429 fDSubSet->
Add(fCurElem);
433 Error(
"AddProcessed",
"processed subset of current elem undefined");
450 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
451 "enter (first %lld, num %lld)",
first, num);
469 Error(
"TPacketizerAdaptive",
"No progress status");
477 cpsync =
gEnv->
GetValue(
"Packetizer.CachePacketSync", 1);
497 Info(
"TPacketizerAdaptive",
"using the basic strategy of TPacketizer");
498 }
else if (strategy != 1) {
499 Warning(
"TPacketizerAdaptive",
"unsupported strategy index (%d): ignore", strategy);
504 if (maxSlaveCnt < 0) {
505 Info(
"TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
514 Info(
"TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
518 maxSlaveCnt = (
Long_t) mxslcnt;
523 maxSlaveCnt =
gEnv->
GetValue(
"Packetizer.MaxWorkersPerNode", 0);
524 if (maxSlaveCnt > 0) {
526 Info(
"TPacketizerAdaptive",
"Setting max number of workers per node to %ld",
536 Int_t forceLocal = 0;
541 Info(
"TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
551 Int_t packetAsAFraction = 0;
553 if (packetAsAFraction > 0) {
555 Info(
"TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
559 Info(
"TPacketizerAdaptive",
"packetAsAFraction parameter must be higher than 0");
564 Int_t tryReassign = 0;
566 tryReassign =
gEnv->
GetValue(
"Packetizer.TryReassign", 0);
569 Info(
"TPacketizerAdaptive",
"failed packets will be re-assigned");
601 partitionsStr =
gEnv->
GetValue(
"Packetizer.Partitions",
"");
602 if (!partitionsStr.
IsNull()) {
603 Info(
"TPacketizerAdaptive",
"Partitions: %s", partitionsStr.
Data());
604 partitions = partitionsStr.
Tokenize(
",");
612 if (
e->GetValid())
continue;
618 TUrl url =
e->GetFileName();
620 Info(
"TPacketizerAdaptive",
"element name: %s (url: %s)",
e->GetFileName(), url.
GetUrl());
636 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
644 TIter iString(partitions);
666 Info(
"TPacketizerAdaptive",
"creating new node '%s' or the element", nodeStr.
Data());
669 Info(
"TPacketizerAdaptive",
"adding element to existing node '%s'", nodeStr.
Data());
689 Int_t validateMode = 0;
694 Info(
"TPacketizerAdaptive",
695 "processing subset of entries: validating by file? %s", byfile ?
"yes":
"no");
710 Info(
"TPacketizerAdaptive",
711 "processing range: first %lld, num %lld",
first, num);
719 if (!
e->GetValid())
continue;
721 TUrl url =
e->GetFileName();
725 Info(
"TPacketizerAdaptive",
"processing element '%s'",
e->GetFileName());
727 Info(
"TPacketizerAdaptive",
728 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur,
e->GetEntryList());
730 if (!
e->GetEntryList()) {
732 if (cur + eNum <
first) {
735 Info(
"TPacketizerAdaptive",
" --> skip element cur %lld", cur);
740 if (num != -1 && (
first+num <= cur)) {
743 Info(
"TPacketizerAdaptive",
" --> drop element cur %lld", cur);
748 if (cur <=
first || (num != -1 && (
first+num <= cur+eNum))) {
753 e->SetFirst( eFirst + (
first - cur) );
754 e->SetNum(
e->GetNum() - (
first - cur) );
756 Info(
"TPacketizerAdaptive",
" --> adjust start %lld and end %lld",
760 if (num != -1 && (
first+num <= cur+eNum)) {
763 e->SetNum(
first + num -
e->GetFirst() - cur );
765 Info(
"TPacketizerAdaptive",
" --> adjust end %lld",
first + num - cur);
772 Info(
"TPacketizerAdaptive",
" --> increment 'cur' by %lld", eNum);
786 Info(
"TPacketizerAdaptive",
" --> entry-list element: %lld entries", eNum);
789 eNum = evl ? evl->
GetN() : eNum;
791 Info(
"TPacketizerAdaptive",
" --> event-list element: %lld entries (evl:%p)", eNum, evl);
795 Info(
"TPacketizerAdaptive",
" --> empty entry- or event-list element!");
800 Info(
"TPacketizerAdaptive",
" --> next cur %lld", cur);
816 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
824 TIter iString(partitions);
847 Info(
"TPacketizerAdaptive",
" --> creating new node '%s' for element", nodeStr.
Data());
850 Info(
"TPacketizerAdaptive",
" --> adding element to exiting node '%s'", nodeStr.
Data());
856 node->IncEvents(eNum);
857 PDB(kPacketizer,2)
e->Print(
"a");
860 Info(
"TPacketizerAdaptive",
"processing %lld entries in %d files on %d hosts",
874 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
"return");
901 Int_t noRemoteFiles = 0;
903 Int_t totalNumberOfFiles = 0;
905 while (TFileNode *fn = (TFileNode*)next()) {
906 totalNumberOfFiles += fn->GetNumberOfFiles();
907 if (fn->GetMySlaveCnt() == 0) {
908 noRemoteFiles += fn->GetNumberOfFiles();
913 if (totalNumberOfFiles == 0) {
914 Info(
"InitStats",
"no valid or non-empty file found: setting invalid");
927 PDB(kPacketizer,1)
Info(
"InitStats",
"return");
941 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->GetName());
942 file = node->GetNextUnAlloc();
945 if (nodeHostName && strlen(nodeHostName) > 0) {
956 TUrl uu(fn->GetName());
958 Info(
"GetNextUnAlloc",
"comparing %s with %s...", nodeHostName, uu.
GetHost());
961 if (!strcmp(nodeHostName, uu.
GetHost())) {
965 if ((
file = node->GetNextUnAlloc()) == 0) {
970 Info(
"GetNextUnAlloc",
"found! (host: %s)", uu.
GetHost());
975 Warning(
"GetNextUnAlloc",
"unallocate entry %d is empty!", i);
982 Info(
"GetNextUnAlloc",
"reached Workers-per-Node Limit (%ld)",
fMaxSlaveCnt);
990 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->GetName());
1003 PDB(kPacketizer, 2) {
1005 Info(
"GetNextUnAlloc",
"no file found!");
1021 PDB(kPacketizer,2) {
1050 TFileStat *
file = 0;
1053 file = node->GetNextActive();
1067 PDB(kPacketizer,2) {
1068 Info(
"NextActiveNode",
"enter");
1076 Info(
"NextActiveNode",
"reached Workers-per-Node limit (%ld)",
fMaxSlaveCnt);
1088 TFileNode *node =
file->GetNode();
1090 node->RemoveActive(
file);
1114 while ((fn = (TFileNode*) files.
Next()) != 0) {
1120 while ((key = slaves.
Next()) != 0) {
1123 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
1128 TFileNode *fnmin = 0;
1131 while ((fn = (TFileNode*) files.
Next()) != 0) {
1132 if (!strcmp(slstat->GetName(),
TUrl(fn->GetName()).
GetHost())) {
1133 if (fn->GetMySlaveCnt() < fncnt) {
1135 fncnt = fn->GetMySlaveCnt();
1140 slstat->SetFileNode(fnmin);
1141 fnmin->IncMySlaveCnt();
1143 Info(
"Reset",
"assigning node '%s' to '%s' (cnt: %d)",
1144 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1146 slstat->fCurFile = 0;
1157 TMap slaves_by_sock;
1169 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
1186 TString msg(
"Validating files");
1203 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL",
s->GetName());
1207 TFileNode *node = 0;
1208 TFileStat *
file = 0;
1211 if ((node = slstat->GetFileNode()) != 0) {
1212 PDB(kPacketizer,3) node->Print();
1215 slstat->SetFileNode(0);
1226 slstat->fCurFile =
file;
1229 if (entries < 0 || strlen(elem->
GetTitle()) <= 0) {
1231 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1238 s->GetSocket()->Send(
m );
1241 Info(
"ValidateFiles",
1242 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1243 s->GetOrdinal(),
s->GetName(),
s->GetSocket(),
1254 Error(
"ValidateFiles",
1255 "first (%lld) higher then number of entries (%lld) in %s",
1258 slstat->fCurFile->SetDone();
1262 if (elem->
GetNum() == -1) {
1265 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
1271 Info(
"ValidateFiles",
1272 "found elem '%s' with %lld entries", elem->
GetFileName(), entries);
1290 if (byfile && maxent > 0) {
1292 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1293 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1296 Info(
"ValidateFiles",
"{%lld, %lld, %lld}: needs to validate %lld more files",
1297 maxent, totent, nopenf, nrestf);
1299 while ((slm = (
TSlave *) si.
Next()) && nrestf--) {
1305 Info(
"ValidateFiles",
"no need to validate more files");
1313 PDB(kPacketizer,3) {
1314 Info(
"ValidateFiles",
"waiting for %d slaves:", mon.
GetActive());
1320 Info(
"ValidateFiles",
" worker-%s (%s)",
1329 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
1336 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
1341 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
1343 ((
TProof*)
gProof)->MarkBad(slave,
"socket got invalid during validation");
1350 if (sock->
Recv(reply) <= 0) {
1352 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
1355 ((
TProof*)
gProof)->MarkBad(slave,
"receive failed during validation");
1365 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1378 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1381 (*reply) >> entries;
1386 (*reply) >> objname;
1387 e->SetTitle(objname);
1390 e->SetTDSetOffset(entries);
1396 if (!
e->GetEntryList()) {
1397 if (
e->GetFirst() > entries) {
1398 Error(
"ValidateFiles",
1399 "first (%lld) higher then number of entries (%lld) in %s",
1400 e->GetFirst(), entries,
e->GetFileName());
1403 slavestat->fCurFile->SetDone();
1408 if (
e->GetNum() == -1) {
1409 e->SetNum(entries -
e->GetFirst());
1410 }
else if (
e->GetFirst() +
e->GetNum() > entries) {
1411 Error(
"ValidateFiles",
1412 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1413 e->GetNum(),
e->GetFirst(), entries,
e->GetFileName());
1414 e->SetNum(entries -
e->GetFirst());
1428 Error(
"ValidateFiles",
"cannot get entries for file: %s - skipping",
e->GetFileName() );
1435 m <<
TString(
Form(
"Cannot get entries for file: %s - skipping",
1444 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated", totent);
1447 if (maxent < 0 || ((totent < maxent) && !byfile))
1467 while ( (el =
dynamic_cast<TDSetElement*
> (next())) ) {
1495 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1496 Float_t rate = slstat->GetCurRate();
1498 rate = slstat->GetAvgRate();
1519 if (elem) maxEntries = elem->
GetNum();
1522 PDB(kPacketizer,3) {
1523 Info(
"CalculatePacketSize",
"%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1524 Info(
"CalculatePacketSize",
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1532 if (bevt > 0. && cachesz > 0 && cpsync) {
1533 if ((
Long64_t)(rate * packetTime * bevt) < cachesz)
1534 packetTime = cachesz / bevt / rate;
1542 num = (
Long64_t)(rate * packetTime);
1546 Info(
"CalculatePacketSize",
"%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1548 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1553 num = (learnent > 0) ? 5 * learnent : 1000;
1557 Info(
"CalculatePacketSize",
"%s: num: %lld", slstat->GetOrdinal(), num);
1560 if (num < 1) num = 1;
1572 TList **listOfMissingFiles)
1577 Error(
"AddProcessed",
"%s: TSlaveStat instance for worker %s not found!",
1579 (sl ? sl->
GetName() :
"**undef**"));
1585 if ( slstat->fCurElem != 0 ) {
1586 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1590 numev = status->
GetEntries() - slstat->GetEntriesProcessed();
1598 progress = slstat->AddProcessed(status);
1600 (*fProgressStatus) += *progress;
1602 slstat->UpdateRates(status);
1609 Info(
"AddProcessed",
"%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1615 slstat->fCurElem->GetFileName(),
1623 if (numev != expectedNumEv) {
1629 if (newPacket && numev < expectedNumEv) {
1635 Error(
"AddProcessed",
"%s: processed too much? (%lld, %lld)",
1648 slstat->fCurElem = 0;
1649 return (expectedNumEv - numev);
1676 Error(
"GetNextPacket",
"TSlaveStat instance for worker %s not found!",
1677 (sl ? sl->
GetName() :
"**undef**"));
1682 TFileStat *
file = slstat->fCurFile;
1688 Int_t learnent = -1;
1689 if ( slstat->fCurElem != 0 ) {
1692 Double_t latency, proctime, proccpu;
1702 (*r) >> cachesz >> learnent;
1703 if (
r->BufferSize() >
r->Length()) (*r) >> restEntries;
1712 (*r) >> latency >> proctime >> proccpu;
1714 if (
r->BufferSize() >
r->Length()) (*r) >> bytesRead;
1715 if (
r->BufferSize() >
r->Length()) (*r) >> restEntries;
1717 if (
r->BufferSize() >
r->Length()) (*r) >> totev;
1723 if (!fileNotOpen && !fileCorrupted) {
1725 Error(
"GetNextPacket",
"%s: the worker processed a different # of entries", sl->
GetOrdinal());
1728 Error(
"GetNextPacket",
"%s: processed too many entries! (%lld, %lld)",
1736 if (
file->GetElement()) {
1737 if (fileCorrupted) {
1738 Info(
"GetNextPacket",
"%s: file '%s' turned corrupted: invalidating file (%lld)",
1742 Info(
"GetNextPacket",
"%s: %d entries un-processed", sl->
GetOrdinal(), nunproc);
1747 num =
file->GetElement()->GetEntries() + restEntries;
1752 num = restEntries + rest;
1754 file->GetElement()->SetEntries(num);
1756 Info(
"GetNextPacket",
"%s: removed file: %s, entries left: %lld", sl->
GetOrdinal(),
1757 file->GetElement()->GetName(),
file->GetElement()->GetEntries());
1761 Info(
"GetNextPacket",
"%s: file '%s' could not be open: invalidating related element",
1765 file->GetElement()->Invalidate();
1775 Info(
"GetNextPacket",
"%s: error raised by worker, but TFileStat object invalid:"
1780 firstPacket =
kTRUE;
1789 if (
file != 0) nodeName =
file->GetNode()->GetName();
1790 TString nodeHostName(slstat->GetName());
1793 Info(
"GetNextPacket",
"%s: entries processed: %lld - looking for a packet from node '%s'",
1797 if (
file != 0 &&
file->IsDone() ) {
1798 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1799 file->GetNode()->DecRunSlaveCnt();
1806 slstat->fCurFile =
file;
1819 if ( slstat->GetFileNode() != 0 ) {
1823 Bool_t nonLocalNodePossible;
1825 nonLocalNodePossible = 0;
1827 nonLocalNodePossible = firstNonLocalNode ?
1830 openLocal = !nonLocalNodePossible;
1831 Float_t slaveRate = slstat->GetAvgRate();
1832 if ( nonLocalNodePossible &&
fStrategy == 1) {
1834 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1835 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1839 else if ( slaveRate == 0 ) {
1842 if ( slstat->GetLocalEventsLeft() * localPreference
1843 > (avgEventsLeftPerSlave))
1845 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1846 < slstat->GetLocalEventsLeft() * localPreference )
1848 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1850 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1854 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1856 Float_t avgTime = avgEventsLeftPerSlave
1858 if (slaveTime * localPreference > avgTime)
1860 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1861 < slstat->GetLocalEventsLeft() * localPreference)
1867 file = slstat->GetFileNode()->GetNextUnAlloc();
1869 file = slstat->GetFileNode()->GetNextActive();
1872 slstat->SetFileNode(0);
1885 if (
file == 0)
return 0;
1889 slstat->fCurFile =
file;
1891 if (
file->GetNode()->GetMySlaveCnt() == 0 &&
1892 file->GetElement()->GetFirst() ==
file->GetNextEntry()) {
1895 Error(
"GetNextPacket",
1896 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1901 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1902 file->GetNode()->IncRunSlaveCnt();
1905 file->GetNode()->GetName(),
1906 file->GetElement()->GetFileName(),
kTRUE);
1920 if (
first + num * 1.5 >= last ) {
1928 file->MoveNextEntry(num);
1943 return slstat->fCurElem;
1954 while ((key = nxw())) {
1956 if (wrkstat && wrkstat->fCurFile) actw++;
1974 while ((key = nxw()) != 0) {
1976 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1978 currate += slstat->GetProgressStatus()->GetCurrentRate();
2020 while ((key = nxw()) != 0) {
2024 Long64_t e = slstat->GetEntriesProcessed();
2027 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2029 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2030 : slstat->GetAvgRate();
2038 Info(
"GetEstEntriesProcessed",
"%s: e:%lld rate:%f dt:%f e:%lld",
2039 slstat->fSlave->GetOrdinal(),
2040 slstat->GetEntriesProcessed(), rate, dt,
e);
2047 Info(
"GetEstEntriesProcessed",
2048 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2057 return ((all) ? 0 : 1);
2071 TList **listOfMissingFiles)
2075 Error(
"MarkBad",
"Worker does not exist");
2079 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2080 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2081 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2088 TList *subSet = slaveStat->GetProcessedSubSet();
2091 if (slaveStat->fCurElem) {
2092 subSet->
Add(slaveStat->fCurElem);
2095 Int_t nmg = 0, ntries = 100;
2101 if (
e->MergeElement(enxt) >= 0) {
2109 }
while (nmg > 0 && --ntries > 0);
2115 Warning(
"MarkBad",
"subset processed by bad worker not found!");
2117 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2131 TList **listOfMissingFiles)
2134 Error(
"ReassignPacket",
"empty packet!");
2138 TUrl url =
e->GetFileName();
2155 node->DecreaseProcessed(
e->GetNum());
2164 if (listOfMissingFiles && *listOfMissingFiles)
2165 (*listOfMissingFiles)->Add((
TObject *)fi);
2176 TList **listOfMissingFiles)
2179 Error(
"SplitPerHost",
"Empty list of packets!");
2182 if (elements->
GetSize() <= 0) {
2183 Error(
"SplitPerHost",
"The input list contains no elements");
2186 TIter subSetIter(elements);
2192 Error(
"SplitPerHost",
"Error removing a missing file");
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TProofServ * gProofServ
R__EXTERN TProof * gProof
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
const char * GetObjName() const
TObject * GetEntryList() const
void SetFirst(Long64_t first)
void SetTDSetOffset(Long64_t offset)
void SetNum(Long64_t num)
const char * GetDirectory() const
Return directory where to look for object.
Long64_t GetTDSetOffset() const
const char * GetFileName() const
Long64_t GetFirst() const
This class implements a data set to be used for PROOF processing.
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
virtual void Reset()
Reset or initialize access to the elements.
const char * GetType() const
TList * GetListOfElements() const
A List of entry numbers in a TTree or TChain.
virtual Long64_t GetN() const
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
A TEventList object is a list of selected events (entries) in a TTree.
virtual Int_t GetN() const
Class describing a generic file including meta information.
virtual void Add(TObject *obj)
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
virtual TObject * Remove(TObject *obj)
Remove object from the list.
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
virtual TObject * At(Int_t idx) const
Returns the object at position idx. Returns 0 if idx is out of range.
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
virtual void Clear(Option_t *option="")
Remove all objects from the list.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
TObject * Remove(TObject *key)
Remove the (key,value) pair with key from the map.
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void DeActivateAll()
De-activate all activated sockets.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
virtual const char * GetTitle() const
Returns title of object.
virtual const char * GetName() const
Returns name of object.
Collectable string class.
const char * GetName() const
Returns name of object.
const TString & GetString() const
Mother of all ROOT objects.
virtual const char * GetName() const
Returns name of object.
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
virtual Bool_t IsSortable() const
virtual const char * ClassName() const
Returns name of class to which the object belongs.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries The files in the listOfMissingFiles can appear several times; in order to...
virtual ~TPacketizerAdaptive()
Destructor.
Double_t fMaxEntriesRatio
TFileNode * NextNode()
Get next node which has unallocated files.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; A meaningfull difference to TPacketizer is the fact that this packetizer,...
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree an get number of entries.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetActiveWorkers()
Return the number of workers still processing.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
Float_t fBaseLocalPreference
Float_t fFractionOfRemoteFiles
Long64_t fNEventsOnRemLoc
TFileNode * NextActiveNode()
Get next active node.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(re)initialise the statistics called at the begining or after a worker dies.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Named parameter, streamable and storable.
Container class for processing statistics.
Double_t GetProcTime() const
Double_t GetLastUpdate() const
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
This class controls a Parallel ROOT Facility, PROOF, cluster.
TObject * GetParameter(const char *par) const
Get specified parameter.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Class describing a PROOF worker server.
TSocket * GetSocket() const
Int_t GetProtocol() const
const char * GetName() const
Returns name of object.
const char * GetOrdinal() const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
A sorted doubly linked list.
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
virtual const char * HostName()
Return the system's host name.
Basic time type with millisecond precision.
This class represents a WWW compatible URL.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
const char * GetFile() const
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
const char * GetHost() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
void SetHost(const char *host)
const char * GetProtocol() const
Long64_t GetEntriesProcessed() const
Double_t GetProcTime() const
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
TProofProgressStatus * fStatus
TProofProgressStatus * GetProgressStatus()
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
Long64_t GetEntriesProcessed() const
Double_t GetCumProcTime() const
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from from base packet starting from the first entry with num entries.
void Add(RHist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
static constexpr double s