103 if (ent > 0 && entfst > 0) {
106 }
else if (ent < entfst) {
124 : fIsDone(
kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
127 if (files) files->
Add(
this);
187 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
195 if ((
e =
fs->GetElement())) {
196 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn,
e->GetName(),
197 e->GetFirst(),
e->GetFirst() +
e->GetNum() - 1,
e->GetNum(),
fs->GetNextEntry());
199 Printf(
"+++ #%d: no element! ", ++nn);
207 if ((
e =
fs->GetElement())) {
208 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn,
e->GetName(),
209 e->GetFirst(),
e->GetFirst() +
e->GetNum() - 1,
e->GetNum(),
fs->GetNextEntry());
211 Printf(
"+++ #%d: no element! ", ++nn);
215 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
271 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
281 if (myVal < otherVal) {
283 }
else if (myVal > otherVal) {
297 if (myVal < otherVal) {
299 }
else if (myVal > otherVal) {
320 : fNodeName(
name), fFiles(new
TList), fUnAllocFileNext(0),
321 fActFiles(new
TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
378 if (strcmp(slave->
ClassName(),
"TSlaveLite")) {
403 Error(
"UpdateRates",
"no status object!");
406 if (fCurFile->IsDone()) {
426 if (st && fDSubSet && fCurElem) {
429 fDSubSet->Add(fCurElem);
433 Error(
"AddProcessed",
"processed subset of current elem undefined");
450 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
451 "enter (first %lld, num %lld)",
first, num);
469 Error(
"TPacketizerAdaptive",
"No progress status");
477 cpsync =
gEnv->
GetValue(
"Packetizer.CachePacketSync", 1);
497 Info(
"TPacketizerAdaptive",
"using the basic strategy of TPacketizer");
498 }
else if (strategy != 1) {
499 Warning(
"TPacketizerAdaptive",
"unsupported strategy index (%d): ignore", strategy);
504 if (maxSlaveCnt < 0) {
505 Info(
"TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
514 Info(
"TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
518 maxSlaveCnt = (
Long_t) mxslcnt;
523 maxSlaveCnt =
gEnv->
GetValue(
"Packetizer.MaxWorkersPerNode", 0);
524 if (maxSlaveCnt > 0) {
526 Info(
"TPacketizerAdaptive",
"Setting max number of workers per node to %ld",
536 Int_t forceLocal = 0;
541 Info(
"TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
551 Int_t packetAsAFraction = 0;
553 if (packetAsAFraction > 0) {
555 Info(
"TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
559 Info(
"TPacketizerAdaptive",
"packetAsAFraction parameter must be higher than 0");
564 Int_t tryReassign = 0;
566 tryReassign =
gEnv->
GetValue(
"Packetizer.TryReassign", 0);
569 Info(
"TPacketizerAdaptive",
"failed packets will be re-assigned");
601 partitionsStr =
gEnv->
GetValue(
"Packetizer.Partitions",
"");
602 if (!partitionsStr.
IsNull()) {
603 Info(
"TPacketizerAdaptive",
"Partitions: %s", partitionsStr.
Data());
604 partitions = partitionsStr.
Tokenize(
",");
612 if (
e->GetValid())
continue;
618 TUrl url =
e->GetFileName();
620 Info(
"TPacketizerAdaptive",
"element name: %s (url: %s)",
e->GetFileName(), url.
GetUrl());
635 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
643 TIter iString(partitions);
665 Info(
"TPacketizerAdaptive",
"creating new node '%s' or the element", nodeStr.
Data());
668 Info(
"TPacketizerAdaptive",
"adding element to existing node '%s'", nodeStr.
Data());
688 Int_t validateMode = 0;
693 Info(
"TPacketizerAdaptive",
694 "processing subset of entries: validating by file? %s", byfile ?
"yes":
"no");
709 Info(
"TPacketizerAdaptive",
710 "processing range: first %lld, num %lld",
first, num);
718 if (!
e->GetValid())
continue;
720 TUrl url =
e->GetFileName();
724 Info(
"TPacketizerAdaptive",
"processing element '%s'",
e->GetFileName());
726 Info(
"TPacketizerAdaptive",
727 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur,
e->GetEntryList());
729 if (!
e->GetEntryList()) {
731 if (cur + eNum <
first) {
734 Info(
"TPacketizerAdaptive",
" --> skip element cur %lld", cur);
739 if (num != -1 && (
first+num <= cur)) {
742 Info(
"TPacketizerAdaptive",
" --> drop element cur %lld", cur);
747 if (cur <=
first || (num != -1 && (
first+num <= cur+eNum))) {
752 e->SetFirst( eFirst + (
first - cur) );
753 e->SetNum(
e->GetNum() - (
first - cur) );
755 Info(
"TPacketizerAdaptive",
" --> adjust start %lld and end %lld",
759 if (num != -1 && (
first+num <= cur+eNum)) {
762 e->SetNum(
first + num -
e->GetFirst() - cur );
764 Info(
"TPacketizerAdaptive",
" --> adjust end %lld",
first + num - cur);
771 Info(
"TPacketizerAdaptive",
" --> increment 'cur' by %lld", eNum);
785 Info(
"TPacketizerAdaptive",
" --> entry-list element: %lld entries", eNum);
788 eNum = evl ? evl->
GetN() : eNum;
790 Info(
"TPacketizerAdaptive",
" --> event-list element: %lld entries (evl:%p)", eNum, evl);
794 Info(
"TPacketizerAdaptive",
" --> empty entry- or event-list element!");
799 Info(
"TPacketizerAdaptive",
" --> next cur %lld", cur);
814 if (host.
Contains(
"localhost") || host ==
"127.0.0.1") {
822 TIter iString(partitions);
845 Info(
"TPacketizerAdaptive",
" --> creating new node '%s' for element", nodeStr.
Data());
848 Info(
"TPacketizerAdaptive",
" --> adding element to exiting node '%s'", nodeStr.
Data());
855 PDB(kPacketizer,2)
e->Print(
"a");
858 Info(
"TPacketizerAdaptive",
"processing %lld entries in %d files on %d hosts",
872 PDB(kPacketizer,1)
Info(
"TPacketizerAdaptive",
"return");
899 Int_t noRemoteFiles = 0;
901 Int_t totalNumberOfFiles = 0;
904 totalNumberOfFiles += fn->GetNumberOfFiles();
905 if (fn->GetMySlaveCnt() == 0) {
906 noRemoteFiles += fn->GetNumberOfFiles();
911 if (totalNumberOfFiles == 0) {
912 Info(
"InitStats",
"no valid or non-empty file found: setting invalid");
925 PDB(kPacketizer,1)
Info(
"InitStats",
"return");
939 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->
GetName());
943 if (nodeHostName && strlen(nodeHostName) > 0) {
956 Info(
"GetNextUnAlloc",
"comparing %s with %s...", nodeHostName, uu.
GetHost());
959 if (!strcmp(nodeHostName, uu.
GetHost())) {
968 Info(
"GetNextUnAlloc",
"found! (host: %s)", uu.
GetHost());
973 Warning(
"GetNextUnAlloc",
"unallocate entry %d is empty!", i);
980 Info(
"GetNextUnAlloc",
"reached Workers-per-Node Limit (%ld)",
fMaxSlaveCnt);
988 Info(
"GetNextUnAlloc",
"looking for file on node %s", node->
GetName());
1001 PDB(kPacketizer, 2) {
1003 Info(
"GetNextUnAlloc",
"no file found!");
1019 PDB(kPacketizer,2) {
1065 PDB(kPacketizer,2) {
1066 Info(
"NextActiveNode",
"enter");
1074 Info(
"NextActiveNode",
"reached Workers-per-Node limit (%ld)",
fMaxSlaveCnt);
1118 while ((key = slaves.
Next()) != 0) {
1121 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->
GetName());
1141 Info(
"Reset",
"assigning node '%s' to '%s' (cnt: %d)",
1155 TMap slaves_by_sock;
1167 Info(
"ValidateFiles",
"socket added to monitor: %p (%s)",
1184 TString msg(
"Validating files");
1201 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
1227 if (entries < 0 || strlen(elem->
GetTitle()) <= 0) {
1229 file->GetNode()->IncExtSlaveCnt(slstat->
GetName());
1236 s->GetSocket()->Send(
m );
1239 Info(
"ValidateFiles",
1240 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1241 s->GetOrdinal(), s->GetName(), s->GetSocket(),
1252 Error(
"ValidateFiles",
1253 "first (%lld) higher then number of entries (%lld) in %s",
1260 if (elem->
GetNum() == -1) {
1263 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
1269 Info(
"ValidateFiles",
1270 "found elem '%s' with %lld entries", elem->
GetFileName(), entries);
1288 if (byfile && maxent > 0) {
1290 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1291 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1294 Info(
"ValidateFiles",
"{%lld, %lld, %lld}: needs to validate %lld more files",
1295 maxent, totent, nopenf, nrestf);
1297 while ((slm = (
TSlave *) si.
Next()) && nrestf--) {
1303 Info(
"ValidateFiles",
"no need to validate more files");
1311 PDB(kPacketizer,3) {
1312 Info(
"ValidateFiles",
"waiting for %d slaves:", mon.
GetActive());
1318 Info(
"ValidateFiles",
" worker-%s (%s)",
1327 Error(
"ValidateFiles",
"selection has been interrupted - STOP");
1334 PDB(kPacketizer,3)
Info(
"ValidateFiles",
"select returned: %p", sock);
1339 Error(
"ValidateFiles",
"worker-%s (%s) got invalid - STOP",
1341 ((
TProof*)
gProof)->MarkBad(slave,
"socket got invalid during validation");
1348 if (sock->
Recv(reply) <= 0) {
1350 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
1353 ((
TProof*)
gProof)->MarkBad(slave,
"receive failed during validation");
1363 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1379 (*reply) >> entries;
1384 (*reply) >> objname;
1385 e->SetTitle(objname);
1388 e->SetTDSetOffset(entries);
1394 if (!
e->GetEntryList()) {
1395 if (
e->GetFirst() > entries) {
1396 Error(
"ValidateFiles",
1397 "first (%lld) higher then number of entries (%lld) in %s",
1398 e->GetFirst(), entries,
e->GetFileName());
1406 if (
e->GetNum() == -1) {
1407 e->SetNum(entries -
e->GetFirst());
1408 }
else if (
e->GetFirst() +
e->GetNum() > entries) {
1409 Error(
"ValidateFiles",
1410 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1411 e->GetNum(),
e->GetFirst(), entries,
e->GetFileName());
1412 e->SetNum(entries -
e->GetFirst());
1426 Error(
"ValidateFiles",
"cannot get entries for file: %s - skipping",
e->GetFileName() );
1433 m <<
TString(
Form(
"Cannot get entries for file: %s - skipping",
1442 PDB(kPacketizer,3)
Info(
"ValidateFiles",
" %lld events validated", totent);
1445 if (maxent < 0 || ((totent < maxent) && !byfile))
1465 while ( (el =
dynamic_cast<TDSetElement*
> (next())) ) {
1517 if (elem) maxEntries = elem->
GetNum();
1520 PDB(kPacketizer,3) {
1521 Info(
"CalculatePacketSize",
"%s: switching off synchronization of packet and cache sizes:", slstat->
GetOrdinal());
1522 Info(
"CalculatePacketSize",
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1530 if (bevt > 0. && cachesz > 0 && cpsync) {
1531 if ((
Long64_t)(rate * packetTime * bevt) < cachesz)
1532 packetTime = cachesz / bevt / rate;
1540 num = (
Long64_t)(rate * packetTime);
1544 Info(
"CalculatePacketSize",
"%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1546 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1551 num = (learnent > 0) ? 5 * learnent : 1000;
1555 Info(
"CalculatePacketSize",
"%s: num: %lld", slstat->
GetOrdinal(), num);
1558 if (num < 1) num = 1;
1570 TList **listOfMissingFiles)
1575 Error(
"AddProcessed",
"%s: TSlaveStat instance for worker %s not found!",
1577 (sl ? sl->
GetName() :
"**undef**"));
1598 (*fProgressStatus) += *progress;
1607 Info(
"AddProcessed",
"%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1621 if (numev != expectedNumEv) {
1627 if (newPacket && numev < expectedNumEv) {
1633 Error(
"AddProcessed",
"%s: processed too much? (%lld, %lld)",
1647 return (expectedNumEv - numev);
1674 Error(
"GetNextPacket",
"TSlaveStat instance for worker %s not found!",
1675 (sl ? sl->
GetName() :
"**undef**"));
1686 Int_t learnent = -1;
1690 Double_t latency, proctime, proccpu;
1700 (*r) >> cachesz >> learnent;
1701 if (
r->BufferSize() >
r->Length()) (*r) >> restEntries;
1710 (*r) >> latency >> proctime >> proccpu;
1712 if (
r->BufferSize() >
r->Length()) (*r) >> bytesRead;
1713 if (
r->BufferSize() >
r->Length()) (*r) >> restEntries;
1715 if (
r->BufferSize() >
r->Length()) (*r) >> totev;
1721 if (!fileNotOpen && !fileCorrupted) {
1723 Error(
"GetNextPacket",
"%s: the worker processed a different # of entries", sl->
GetOrdinal());
1726 Error(
"GetNextPacket",
"%s: processed too many entries! (%lld, %lld)",
1734 if (
file->GetElement()) {
1735 if (fileCorrupted) {
1736 Info(
"GetNextPacket",
"%s: file '%s' turned corrupted: invalidating file (%lld)",
1740 Info(
"GetNextPacket",
"%s: %d entries un-processed", sl->
GetOrdinal(), nunproc);
1745 num =
file->GetElement()->GetEntries() + restEntries;
1750 num = restEntries + rest;
1752 file->GetElement()->SetEntries(num);
1754 Info(
"GetNextPacket",
"%s: removed file: %s, entries left: %lld", sl->
GetOrdinal(),
1755 file->GetElement()->GetName(),
file->GetElement()->GetEntries());
1759 Info(
"GetNextPacket",
"%s: file '%s' could not be open: invalidating related element",
1763 file->GetElement()->Invalidate();
1773 Info(
"GetNextPacket",
"%s: error raised by worker, but TFileStat object invalid:"
1778 firstPacket =
kTRUE;
1787 if (
file != 0) nodeName =
file->GetNode()->GetName();
1791 Info(
"GetNextPacket",
"%s: entries processed: %lld - looking for a packet from node '%s'",
1795 if (
file != 0 &&
file->IsDone() ) {
1796 file->GetNode()->DecExtSlaveCnt(slstat->
GetName());
1797 file->GetNode()->DecRunSlaveCnt();
1821 Bool_t nonLocalNodePossible;
1823 nonLocalNodePossible = 0;
1825 nonLocalNodePossible = firstNonLocalNode ?
1828 openLocal = !nonLocalNodePossible;
1830 if ( nonLocalNodePossible &&
fStrategy == 1) {
1837 else if ( slaveRate == 0 ) {
1841 > (avgEventsLeftPerSlave))
1854 Float_t avgTime = avgEventsLeftPerSlave
1856 if (slaveTime * localPreference > avgTime)
1883 if (
file == 0)
return 0;
1889 if (
file->GetNode()->GetMySlaveCnt() == 0 &&
1890 file->GetElement()->GetFirst() ==
file->GetNextEntry()) {
1893 Error(
"GetNextPacket",
1894 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1899 file->GetNode()->IncExtSlaveCnt(slstat->
GetName());
1900 file->GetNode()->IncRunSlaveCnt();
1903 file->GetNode()->GetName(),
1904 file->GetElement()->GetFileName(),
kTRUE);
1918 if (
first + num * 1.5 >= last ) {
1926 file->MoveNextEntry(num);
1952 while ((key = nxw())) {
1954 if (wrkstat && wrkstat->
fCurFile) actw++;
1972 while ((key = nxw()) != 0) {
2018 while ((key = nxw()) != 0) {
2036 Info(
"GetEstEntriesProcessed",
"%s: e:%lld rate:%f dt:%f e:%lld",
2045 Info(
"GetEstEntriesProcessed",
2046 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2055 return ((all) ? 0 : 1);
2069 TList **listOfMissingFiles)
2073 Error(
"MarkBad",
"Worker does not exist");
2093 Int_t nmg = 0, ntries = 100;
2099 if (
e->MergeElement(enxt) >= 0) {
2107 }
while (nmg > 0 && --ntries > 0);
2113 Warning(
"MarkBad",
"subset processed by bad worker not found!");
2129 TList **listOfMissingFiles)
2132 Error(
"ReassignPacket",
"empty packet!");
2136 TUrl url =
e->GetFileName();
2160 if (listOfMissingFiles && *listOfMissingFiles)
2161 (*listOfMissingFiles)->Add((
TObject *)fi);
2172 TList **listOfMissingFiles)
2175 Error(
"SplitPerHost",
"Empty list of packets!");
2178 if (elements->
GetSize() <= 0) {
2179 Error(
"SplitPerHost",
"The input list contains no elements");
2182 TIter subSetIter(elements);
2188 Error(
"SplitPerHost",
"Error removing a missing file");
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h offset
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize fs
R__EXTERN TProofServ * gProofServ
R__EXTERN TProof * gProof
char * Form(const char *fmt,...)
Formats a string in a circular formatting buffer.
void Printf(const char *fmt,...)
Formats a string in a circular formatting buffer and prints the string.
R__EXTERN TSystem * gSystem
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
void Print(Option_t *option="") const override
Default print for collections, calls Print(option, 1).
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Manages an element of a TDSet.
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
const char * GetObjName() const
TObject * GetEntryList() const
void SetFirst(Long64_t first)
void SetTDSetOffset(Long64_t offset)
void SetNum(Long64_t num)
const char * GetDirectory() const
Return directory where to look for object.
void SetEntryList(TObject *aList, Long64_t first=-1, Long64_t num=-1)
Set entry (or event) list for this element.
Long64_t GetTDSetOffset() const
const char * GetFileName() const
Long64_t GetFirst() const
This class implements a data set to be used for PROOF processing.
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
virtual void Reset()
Reset or initialize access to the elements.
const char * GetType() const
TList * GetListOfElements() const
A List of entry numbers in a TTree or TChain.
virtual Long64_t GetN() const
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
A TEventList object is a list of selected events (entries) in a TTree.
virtual Int_t GetN() const
Class describing a generic file including meta information.
TObject * After(const TObject *obj) const override
Returns the object after object obj.
void Clear(Option_t *option="") override
Remove all objects from the list.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
void Add(TObject *obj) override
TObject * Remove(TObject *obj) override
Remove object from the list.
TObject * Last() const override
Return the last object in the list. Returns 0 when list is empty.
TObject * First() const override
Return the first object in the list. Returns 0 when list is empty.
TObject * At(Int_t idx) const override
Returns the object at position idx. Returns 0 if idx is out of range.
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
void Add(TObject *obj) override
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
TObject * Remove(TObject *key) override
Remove the (key,value) pair with key from the map.
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void DeActivateAll()
De-activate all activated sockets.
virtual void DeActivate(TSocket *sock)
De-activate a socket.
TList * GetListOfActives() const
Returns a list with all active sockets.
const char * GetName() const override
Returns name of object.
const char * GetTitle() const override
Returns title of object.
Collectable string class.
const TString & GetString() const
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
virtual const char * GetName() const
Returns name of object.
virtual const char * ClassName() const
Returns name of class to which the object belongs.
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Int_t GetRunSlaveCnt() const
Int_t GetMySlaveCnt() const
Int_t GetNumberOfActiveFiles() const
Int_t GetSlaveCnt() const
void IncEvents(Long64_t nEvents)
Long64_t GetNEvents() const
void IncProcessed(Long64_t nEvents)
void DecreaseProcessed(Long64_t nEvents)
TFileStat * GetNextActive()
TSortedList * fFilesToProcess
void DecExtSlaveCnt(const char *slave)
Long64_t GetEventsLeftPerSlave() const
Int_t Compare(const TObject *other) const
Compare abstract method.
void IncExtSlaveCnt(const char *slave)
TFileStat * GetNextUnAlloc()
Bool_t IsSortable() const
void RemoveActive(TFileStat *file)
TFileNode(const char *name, Int_t strategy, TSortedList *files)
Long64_t GetProcessed() const
TObject * fUnAllocFileNext
Int_t GetExtSlaveCnt() const
const char * GetName() const
Returns name of object.
void Add(TDSetElement *elem, Bool_t tolist)
void Print(Option_t *=0) const
This method must be overridden when a class wants to print itself.
void MoveNextEntry(Long64_t step)
Bool_t IsSortable() const
TDSetElement * GetElement() const
Int_t Compare(const TObject *obj) const
Compare abstract method.
Long64_t GetNextEntry() const
TFileStat(TFileNode *node, TDSetElement *elem, TList *file)
TFileNode * GetNode() const
void Print(Option_t *=0) const
This method must be overridden when a class wants to print itself.
TFileNode * GetFileNode() const
Long64_t GetEntriesProcessed() const
Int_t GetLocalEventsLeft()
TSlaveStat(TSlave *slave)
Constructor.
TProofProgressStatus * GetProgressStatus()
void UpdateRates(TProofProgressStatus *st)
Update packetizer rates.
TList * GetProcessedSubSet()
void SetFileNode(TFileNode *node)
TProofProgressStatus * AddProcessed(TProofProgressStatus *st=0)
Add the current element to the fDSubSet (subset processed by this worker) and if the status arg is gi...
Double_t GetProcTime() const
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0)
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per-host entries. The files in the listOfMissingFiles can appear several times; in order to...
virtual ~TPacketizerAdaptive()
Destructor.
Double_t fMaxEntriesRatio
TFileNode * NextNode()
Get next node which has unallocated files.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet; a meaningful difference to TPacketizer is the fact that this packetizer,...
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetActiveWorkers()
Return the number of workers still processing.
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles)
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
Float_t fBaseLocalPreference
Float_t fFractionOfRemoteFiles
Long64_t fNEventsOnRemLoc
TFileNode * NextActiveNode()
Get next active node.
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(Re)initialise the statistics; called at the beginning or after a worker dies.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Named parameter, streamable and storable.
Container class for processing statistics.
Double_t GetProcTime() const
Double_t GetLastUpdate() const
Long64_t GetEntries() const
Double_t GetCurrentRate() const
Get current rate. Return the average rate if the current is not defined.
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
Long64_t GetBytesRead() const
TSocket * GetSocket() const
This class controls a Parallel ROOT Facility, PROOF, cluster.
TObject * GetParameter(const char *par) const
Get specified parameter.
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Class describing a PROOF worker server.
TSocket * GetSocket() const
Int_t GetProtocol() const
const char * GetName() const
Returns name of object.
const char * GetOrdinal() const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
A sorted doubly linked list.
const char * Data() const
TObjArray * Tokenize(const TString &delim) const
This function is used to isolate sequential tokens in a TString.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
virtual TTime Now()
Get current time in milliseconds since 0:00 Jan 1 1995.
virtual const char * HostName()
Return the system's host name.
Basic time type with millisecond precision.
This class represents a WWW compatible URL.
const char * GetUrl(Bool_t withDeflt=kFALSE) const
Return full URL.
const char * GetFile() const
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
const char * GetHost() const
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
void SetHost(const char *host)
const char * GetProtocol() const
const char * GetOrdinal() const
const char * GetName() const
Returns name of object.
TProofProgressStatus * fStatus
The packetizer is a load balancing object created for each query.
Float_t GetProcTime() const
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetReadCalls() const
Long64_t GetEntriesProcessed() const
Double_t GetCumProcTime() const
Long64_t GetBytesRead() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from the base packet, starting from the first entry, with num entries.