Logo ROOT   6.18/05
Reference Guide
TPacketizer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 18/03/02
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TPacketizer
13\ingroup proofkernel
14
15This class generates packets to be processed on PROOF worker servers.
16A packet is an event range (begin entry and number of entries) or
17object range (first object and number of objects) in a TTree
18(entries) or a directory (objects) in a file.
19Packets are generated taking into account the performance of the
20remote machine, the time it took to process a previous packet on
21the remote machine, the locality of the database files, etc.
22
23*/
24
25#include "TPacketizer.h"
26
27#include "Riostream.h"
28#include "TDSet.h"
29#include "TEnv.h"
30#include "TError.h"
31#include "TEventList.h"
32#include "TEntryList.h"
33#include "TMap.h"
34#include "TMessage.h"
35#include "TMonitor.h"
36#include "TNtupleD.h"
37#include "TObject.h"
38#include "TParameter.h"
39#include "TPerfStats.h"
40#include "TProofDebug.h"
41#include "TProof.h"
42#include "TProofPlayer.h"
43#include "TProofServ.h"
44#include "TSlave.h"
45#include "TSocket.h"
46#include "TTimer.h"
47#include "TUrl.h"
48#include "TClass.h"
49#include "TMath.h"
50#include "TObjString.h"
51
52//
53// The following three utility classes manage the state of the
54// work to be performed and the slaves involved in the process.
55// A list of TFileNode(s) describes the hosts with files, each
56// has a list of TFileStat(s) keeping the state for each TDSet
57// element (file).
58//
59// The list of TSlaveStat(s) keep track of the work (being) done
60// by each slave
61//
62
63
64//------------------------------------------------------------------------------
65
66class TPacketizer::TFileStat : public TObject {
67
68private:
69 Bool_t fIsDone; // is this element processed
70 TFileNode *fNode; // my FileNode
71 TDSetElement *fElement; // location of the file and its range
72 Long64_t fNextEntry; // cursor in the range, -1 when done
73
74public:
75 TFileStat(TFileNode *node, TDSetElement *elem);
76
77 Bool_t IsDone() const {return fIsDone;}
78 void SetDone() {fIsDone = kTRUE;}
79 TFileNode *GetNode() const {return fNode;}
80 TDSetElement *GetElement() const {return fElement;}
81 Long64_t GetNextEntry() const {return fNextEntry;}
82 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
83};
84
85
86TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
87 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
88{
89}
90
91
92//------------------------------------------------------------------------------
93
94class TPacketizer::TFileNode : public TObject {
95
96private:
97 TString fNodeName; // FQDN of the node
98 TList *fFiles; // TDSetElements (files) stored on this node
99 TObject *fUnAllocFileNext; // cursor in fFiles
100 TList *fActFiles; // files with work remaining
101 TObject *fActFileNext; // cursor in fActFiles
102 Int_t fMySlaveCnt; // number of slaves running on this node
103 Int_t fSlaveCnt; // number of external slaves processing files on this node
104
105public:
106 TFileNode(const char *name);
107 ~TFileNode() { delete fFiles; delete fActFiles; }
108
109 void IncMySlaveCnt() { fMySlaveCnt++; }
110 void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
111 void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
112 Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
113 Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
114 Bool_t IsSortable() const { return kTRUE; }
115
116 const char *GetName() const { return fNodeName.Data(); }
117
118 void Add(TDSetElement *elem)
119 {
120 TFileStat *f = new TFileStat(this,elem);
121 fFiles->Add(f);
122 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
123 }
124
125 TFileStat *GetNextUnAlloc()
126 {
127 TObject *next = fUnAllocFileNext;
128
129 if (next != 0) {
130 // make file active
131 fActFiles->Add(next);
132 if (fActFileNext == 0) fActFileNext = fActFiles->First();
133
134 // move cursor
135 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
136 }
137
138 return (TFileStat *) next;
139 }
140
141 TFileStat *GetNextActive()
142 {
143 TObject *next = fActFileNext;
144
145 if (fActFileNext != 0) {
146 fActFileNext = fActFiles->After(fActFileNext);
147 if (fActFileNext == 0) fActFileNext = fActFiles->First();
148 }
149
150 return (TFileStat *) next;
151 }
152
153 void RemoveActive(TFileStat *file)
154 {
155 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
156 fActFiles->Remove(file);
157 if (fActFileNext == 0) fActFileNext = fActFiles->First();
158 }
159
160 Int_t Compare(const TObject *other) const
161 {
162 // Must return -1 if this is smaller than obj, 0 if objects are equal
163 // and 1 if this is larger than obj.
164 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
165 if (!obj) {
166 Error("Compare", "input is not a TPacketizer::TFileNode object");
167 return 0;
168 }
169
170 Int_t myVal = GetSlaveCnt();
171 Int_t otherVal = obj->GetSlaveCnt();
172 if (myVal < otherVal) {
173 return -1;
174 } else if (myVal > otherVal) {
175 return 1;
176 } else {
177 return 0;
178 }
179 }
180
181 void Print(Option_t *) const
182 {
183 std::cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
184 << "\tMySlaveCount " << fMySlaveCnt
185 << "\tSlaveCount " << fSlaveCnt << std::endl;
186 }
187
188 void Reset()
189 {
190 fUnAllocFileNext = fFiles->First();
191 fActFiles->Clear();
192 fActFileNext = 0;
193 fSlaveCnt = 0;
194 fMySlaveCnt = 0;
195 }
196};
197
198
199TPacketizer::TFileNode::TFileNode(const char *name)
200 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
201 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
202{
203 // Constructor
204
205 fFiles->SetOwner();
206 fActFiles->SetOwner(kFALSE);
207}
208
209
210//------------------------------------------------------------------------------
211
212class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
213
214friend class TPacketizer;
215
216private:
217 TFileNode *fFileNode; // corresponding node or 0
218 TFileStat *fCurFile; // file currently being processed
219 TDSetElement *fCurElem; // TDSetElement currently being processed
221public:
222 TSlaveStat(TSlave *slave);
223 ~TSlaveStat();
224
225 TFileNode *GetFileNode() const { return fFileNode; }
226
227 void SetFileNode(TFileNode *node) { fFileNode = node; }
228};
229
230
231TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
232 : fFileNode(0), fCurFile(0), fCurElem(0)
233{
234 fSlave = slave;
235 fStatus = new TProofProgressStatus();
236}
237
238////////////////////////////////////////////////////////////////////////////////
239/// Cleanup
240
241TPacketizer::TSlaveStat::~TSlaveStat()
242{
243 SafeDelete(fStatus);
244}
245
246TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
247{
248 // Update the status info to the 'st'.
249 // return the difference (*st - *fStatus)
250
251 if (st) {
252 // The entriesis not correct in 'st'
253 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
254 // The last proc time should not be added
255 fStatus->SetLastProcTime(0.);
256 // Get the diff
257 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
258 *fStatus += *diff;
259 // Set the correct value
260 fStatus->SetLastEntries(lastEntries);
261 return diff;
262 } else {
263 Error("AddProcessed", "status arg undefined");
264 return 0;
265 }
266}
267
268//------------------------------------------------------------------------------
269
271
272////////////////////////////////////////////////////////////////////////////////
273/// Constructor
274
276 Long64_t num, TList *input, TProofProgressStatus *st)
277 : TVirtualPacketizer(input, st)
278{
279 PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
280
281 // Init pointer members
282 fPackets = 0;
283 fUnAllocated = 0;
284 fActive = 0;
285 fFileNodes = 0;
286 fMaxPerfIdx = 1;
287 fMaxSlaveCnt = 0;
290
291 if (!fProgressStatus) {
292 Error("TPacketizer", "No progress status");
293 return;
294 }
295
296 Long_t maxSlaveCnt = 0;
297 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
298 if (maxSlaveCnt < 0) {
299 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
300 maxSlaveCnt = 0;
301 }
302 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
303 } else {
304 // Try also with Int_t (recently supported in TProof::SetParameter)
305 Int_t mxslcnt = -1;
306 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
307 if (mxslcnt < 0) {
308 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
309 mxslcnt = 0;
310 }
311 maxSlaveCnt = (Long_t) mxslcnt;
312 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
313 }
314 }
315 if (!maxSlaveCnt) {
316 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize());
317 if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
318 }
319 if (maxSlaveCnt > 0) {
320 fMaxSlaveCnt = maxSlaveCnt;
321 PDB(kPacketizer,1)
322 Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
323 }
324
325 fPackets = new TList;
327
328 fFileNodes = new TList;
330 fUnAllocated = new TList;
332 fActive = new TList;
334
335
336 fValid = kTRUE;
337
338 // Resolve end-point urls to optmize distribution
339 // dset->Lookup(); // moved to TProofPlayerRemote::Process
340
341 // Split into per host entries
342 dset->Reset();
344 while ((e = (TDSetElement*)dset->Next())) {
345 if (e->GetValid()) continue;
346
347 TUrl url = e->GetFileName();
348
349 // Map non URL filenames to dummy host
350 TString host;
351 if ( !url.IsValid() ||
352 (strncmp(url.GetProtocol(),"root", 4) &&
353 strncmp(url.GetProtocol(),"file", 4)) ) {
354 host = "no-host";
355 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
356 host = "localhost";
357 url.SetProtocol("root");
358 } else {
359 host = url.GetHost();
360 }
361 // Get full name for local hosts
362 if (host.Contains("localhost") || host == "127.0.0.1") {
363 url.SetHost(gSystem->HostName());
364 host = url.GetHostFQDN();
365 }
366
367 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
368
369 if (node == 0) {
370 node = new TFileNode(host);
371 fFileNodes->Add(node);
372 }
373
374 node->Add( e );
375 }
376
377 fSlaveStats = new TMap;
379
380 // Record initial available workers
381 Int_t nwrks = AddWorkers(slaves);
382 Info("TPacketizer", "Initial number of workers: %d", nwrks);
383
384 // Setup file & filenode structure
385 Reset();
386 // Optimize the number of files to be open when running on subsample
387 Int_t validateMode = 0;
388 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
389 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
390 if (num > -1)
391 PDB(kPacketizer,2)
392 Info("TPacketizer",
393 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
394 ValidateFiles(dset, slaves, num, byfile);
395
396 if (!fValid) return;
397
398 // apply global range (first,num) to dset and rebuild structure
399 // ommitting TDSet elements that are not needed
400
401 Int_t files = 0;
402 fTotalEntries = 0;
403 fUnAllocated->Clear(); // avoid dangling pointers
404 fActive->Clear();
405 fFileNodes->Clear(); // then delete all objects
406 PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
407
408 dset->Reset();
409 Long64_t cur = 0;
410 while (( e = (TDSetElement*)dset->Next())) {
411
412 // Skip invalid or missing file; It will be moved
413 // from the dset to the 'MissingFiles' list in the player.
414 if (!e->GetValid()) continue;
415
416 // The dataset name, if any
417 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
418 fDataSet = e->GetDataSet();
419
420 TUrl url = e->GetFileName();
421 Long64_t eFirst = e->GetFirst();
422 Long64_t eNum = e->GetNum();
423 PDB(kPacketizer,2)
424 Info("TPacketizer", " --> '%s'", e->GetFileName());
425 PDB(kPacketizer,2)
426 Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
427
428 if (!e->GetEntryList()){
429 // this element is before the start of the global range, skip it
430 if (cur + eNum < first) {
431 cur += eNum;
432 PDB(kPacketizer,2)
433 Info("TPacketizer", " --> skip element cur %lld", cur);
434 continue;
435 }
436
437 // this element is after the end of the global range, skip it
438 if (num != -1 && (first+num <= cur)) {
439 cur += eNum;
440 PDB(kPacketizer,2)
441 Info("TPacketizer", " --> drop element cur %lld", cur);
442 continue; // break ??
443 }
444
445 Bool_t inRange = kFALSE;
446 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
447
448 if (cur <= first) {
449 // If this element contains the start of the global range
450 // adjust its start and number of entries
451 e->SetFirst( eFirst + (first - cur) );
452 e->SetNum( e->GetNum() - (first - cur) );
453 PDB(kPacketizer,2)
454 Info("TPacketizer", " --> adjust start %lld and end %lld",
455 eFirst + (first - cur), first + num - cur);
456 inRange = kTRUE;
457 }
458 if (num != -1 && (first+num <= cur+eNum)) {
459 // If this element contains the end of the global range
460 // adjust its number of entries
461 e->SetNum( first + num - e->GetFirst() - cur );
462 PDB(kPacketizer,2)
463 Info("TPacketizer", " --> adjust end %lld", first + num - cur);
464 inRange = kTRUE;
465 }
466
467 } else {
468 // Increment the counter ...
469 PDB(kPacketizer,2)
470 Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
471 cur += eNum;
472 }
473 // Re-adjust eNum and cur, if needed
474 if (inRange) {
475 cur += eNum;
476 eNum = e->GetNum();
477 }
478
479 } else {
480 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
481 if (enl) {
482 eNum = enl->GetN();
483 } else {
484 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
485 eNum = evl ? evl->GetN() : eNum;
486 }
487 if (!eNum)
488 continue;
489 }
490 PDB(kPacketizer,2)
491 Info("TPacketizer", " --> next cur %lld", cur);
492
493 // Map non URL filenames to dummy host
494 TString host;
495 if ( !url.IsValid() ||
496 (strncmp(url.GetProtocol(),"root", 4) &&
497 strncmp(url.GetProtocol(),"file", 4)) ) {
498 host = "no-host";
499 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
500 host = "localhost";
501 url.SetProtocol("root");
502 } else {
503 host = url.GetHostFQDN();
504 }
505 // Get full name for local hosts
506 if (host.Contains("localhost") || host == "127.0.0.1") {
507 url.SetHost(gSystem->HostName());
508 host = url.GetHostFQDN();
509 }
510
511 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
512
513 if ( node == 0 ) {
514 node = new TFileNode( host );
515 fFileNodes->Add( node );
516 }
517
518 ++files;
519 fTotalEntries += eNum;
520 node->Add(e);
521 PDB(kPacketizer,2) e->Print("a");
522 }
523
524 PDB(kPacketizer,1)
525 Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
526 fTotalEntries, files, fFileNodes->GetSize());
527
528 // Set the total number for monitoring
529 if (gPerfStats)
530 gPerfStats->SetNumEvents(fTotalEntries);
531
532 Reset();
533
534 if (fFileNodes->GetSize() == 0) {
535 Info("TPacketizer", "no valid or non-empty file found: setting invalid");
536 // No valid files: set invalid and return
537 fValid = kFALSE;
538 return;
539 }
540
541 // Below we provide a possibility to change the way packet size is
542 // calculated or define the packet size directly.
543 // fPacketAsAFraction can be interpreted as follows:
544 // assuming all slaves have equal processing rate,
545 // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
546 // It substitutes 20 in the old formula to calculate the fPacketSize:
547 // fPacketSize = fTotalEntries / (20 * nslaves)
548 Long_t packetAsAFraction = 20;
549 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
550 Info("Process", "using alternate fraction of query time as a packet Size: %ld",
551 packetAsAFraction);
552 fPacketAsAFraction = (Int_t)packetAsAFraction;
553
554 fPacketSize = 1;
555 if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
556 Info("Process","using alternate packet size: %lld", fPacketSize);
557 } else {
558 // Heuristic for starting packet size
560 Int_t nslaves = fSlaveStats->GetSize();
561 if (nslaves > 0) {
563 if (fPacketSize < 1) fPacketSize = 1;
564 } else {
565 fPacketSize = 1;
566 }
567 }
568
569 PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
570
571 if (!fValid)
573
574 PDB(kPacketizer,1) Info("TPacketizer", "Return");
575}
576
577////////////////////////////////////////////////////////////////////////////////
578/// Destructor.
579
581{
582 if (fSlaveStats) {
584 }
585
591}
592
593////////////////////////////////////////////////////////////////////////////////
594/// Adds new workers. Returns the number of workers added, or -1 on failure.
595
597{
598 if (!workers) {
599 Error("AddWorkers", "Null list of new workers!");
600 return -1;
601 }
602
603 Int_t curNumOfWrks = fSlaveStats->GetEntries();
604
605 TSlave *sl;
606 TIter next(workers);
607 while (( sl = dynamic_cast<TSlave*>(next()) ))
608 if (!fSlaveStats->FindObject(sl)) {
609 fSlaveStats->Add(sl, new TSlaveStat(sl));
611 }
612
613 // If heuristic (and new workers) set the packet size
614 Int_t nwrks = fSlaveStats->GetSize();
615 if (fHeuristicPSiz && nwrks > curNumOfWrks) {
616 if (nwrks > 0) {
618 if (fPacketSize < 1) fPacketSize = 1;
619 } else {
620 fPacketSize = 1;
621 }
622 }
623
624 // Update the max number that can access one file node if the default is used
625 if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
626
627 // Done
628 return nwrks;
629}
630
631////////////////////////////////////////////////////////////////////////////////
632/// Get next unallocated file.
633
634TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
635{
636 TFileStat *file = 0;
637
638 if (node != 0) {
639 file = node->GetNextUnAlloc();
640 if (file == 0) RemoveUnAllocNode(node);
641 } else {
642 while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
643 file = node->GetNextUnAlloc();
644 if (file == 0) RemoveUnAllocNode(node);
645 }
646 }
647
648 if (file != 0) {
649 // if needed make node active
650 if (fActive->FindObject(node) == 0) {
651 fActive->Add(node);
652 }
653 }
654
655 return file;
656}
657
658////////////////////////////////////////////////////////////////////////////////
659/// Get next unallocated node.
660
661TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
662{
664 PDB(kPacketizer,2) {
665 std::cout << "TPacketizer::NextUnAllocNode()" << std::endl;
667 }
668
669 TFileNode *fn = (TFileNode*) fUnAllocated->First();
670 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
671 PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
673 fn = 0;
674 }
675
676 return fn;
677}
678
679////////////////////////////////////////////////////////////////////////////////
680/// Remove unallocated node.
681
682void TPacketizer::RemoveUnAllocNode(TFileNode * node)
683{
684 fUnAllocated->Remove(node);
685}
686
687////////////////////////////////////////////////////////////////////////////////
688/// Get next active file.
689
690TPacketizer::TFileStat *TPacketizer::GetNextActive()
691{
692 TFileNode *node;
693 TFileStat *file = 0;
694
695 while (file == 0 && ((node = NextActiveNode()) != 0)) {
696 file = node->GetNextActive();
697 if (file == 0) RemoveActiveNode(node);
698 }
699
700 return file;
701}
702
703////////////////////////////////////////////////////////////////////////////////
704/// Get next active node.
705
706TPacketizer::TFileNode *TPacketizer::NextActiveNode()
707{
708 fActive->Sort();
709 PDB(kPacketizer,2) {
710 Printf("TPacketizer::NextActiveNode : ----------------------");
711 fActive->Print();
712 }
713
714 TFileNode *fn = (TFileNode*) fActive->First();
715 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
716 PDB(kPacketizer,1)
717 Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
718 fn = 0;
719 }
720
721 return fn;
722}
723
724////////////////////////////////////////////////////////////////////////////////
725/// Remove file from the list of actives.
726
728{
729 TFileNode *node = file->GetNode();
730
731 node->RemoveActive(file);
732 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
733}
734
735////////////////////////////////////////////////////////////////////////////////
736/// Remove node from the list of actives.
737
738void TPacketizer::RemoveActiveNode(TFileNode *node)
739{
740 fActive->Remove(node);
741}
742
743////////////////////////////////////////////////////////////////////////////////
744/// Reset the internal datastructure for packet distribution.
745
747{
750
751 fActive->Clear();
752
753 TIter files(fFileNodes);
754 TFileNode *fn;
755 while ((fn = (TFileNode*) files.Next()) != 0) {
756 fn->Reset();
757 }
758
759 TIter slaves(fSlaveStats);
760 TObject *key;
761 while ((key = slaves.Next()) != 0) {
762 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
763 if (slstat) {
764 fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
765 if (fn != 0 ) {
766 slstat->SetFileNode(fn);
767 fn->IncMySlaveCnt();
768 }
769 slstat->fCurFile = 0;
770 } else {
771 Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
772 }
773 }
774}
775
776////////////////////////////////////////////////////////////////////////////////
777/// Check existence of file/dir/tree an get number of entries.
778/// Assumes the files have been setup.
779
780void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
781{
782 TMap slaves_by_sock;
783 TMonitor mon;
784 TList workers;
785
786
787 // Setup the communication infrastructure
788
789 workers.AddAll(slaves);
790 TIter si(slaves);
791 TSlave *slm = 0;
792 while ((slm = (TSlave*)si.Next()) != 0) {
793 PDB(kPacketizer,3)
794 Info("ValidateFiles","socket added to monitor: %p (%s)",
795 slm->GetSocket(), slm->GetName());
796 mon.Add(slm->GetSocket());
797 slaves_by_sock.Add(slm->GetSocket(), slm);
798 PDB(kPacketizer,1)
799 Info("ValidateFiles",
800 "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
801 }
802
803 mon.DeActivateAll();
804
805 ((TProof*)gProof)->DeActivateAsyncInput();
806
807 // Some monitoring systems (TXSocketHandler) need to know this
808 ((TProof*)gProof)->fCurrentMonitor = &mon;
809
810 // Preparing for client notification
811 TString msg("Validating files");
812 UInt_t n = 0;
813 UInt_t tot = dset->GetListOfElements()->GetSize();
814 Bool_t st = kTRUE;
815
816 Long64_t totent = 0, nopenf = 0;
817 while (kTRUE) {
818
819 // send work
820 while( TSlave *s = (TSlave*)workers.First() ) {
821
822 workers.Remove(s);
823
824 // find a file
825
826 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
827 if (!slstat) {
828 Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
829 continue;
830 }
831 TFileNode *node = 0;
832 TFileStat *file = 0;
833
834 // try its own node first
835 if ( (node = slstat->GetFileNode()) != 0 ) {
836 file = GetNextUnAlloc(node);
837 if ( file == 0 ) {
838 slstat->SetFileNode(0);
839 }
840 }
841
842 // look for a file on any other node if necessary
843 if (file == 0) {
845 }
846
847 if ( file != 0 ) {
848 // files are done right away
850
851 slstat->fCurFile = file;
852 TDSetElement *elem = file->GetElement();
853 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
854 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
855 // This is decremented when we get the reply
856 file->GetNode()->IncSlaveCnt(slstat->GetName());
858 m << dset->IsTree()
859 << TString(elem->GetFileName())
860 << TString(elem->GetDirectory())
861 << TString(elem->GetObjName());
862
863 s->GetSocket()->Send( m );
864 mon.Activate(s->GetSocket());
865 PDB(kPacketizer,2)
866 Info("ValidateFiles",
867 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
868 s->GetOrdinal(), s->GetName(), s->GetSocket(),
869 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
870 elem->GetDirectory(), elem->GetObjName());
871 } else {
872 // Fill the info
873 elem->SetTDSetOffset(entries);
874 if (entries > 0) {
875 // Most likely valid
876 elem->SetValid();
877 if (!elem->GetEntryList()) {
878 if (elem->GetFirst() > entries) {
879 Error("ValidateFiles",
880 "first (%lld) higher then number of entries (%lld) in %s",
881 elem->GetFirst(), entries, elem->GetFileName());
882 // disable element
883 slstat->fCurFile->SetDone();
884 elem->Invalidate();
886 }
887 if (elem->GetNum() == -1) {
888 elem->SetNum(entries - elem->GetFirst());
889 } else if (elem->GetFirst() + elem->GetNum() > entries) {
890 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
891 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
892 entries, elem->GetFileName());
893 elem->SetNum(entries - elem->GetFirst());
894 }
895 PDB(kPacketizer,2)
896 Info("ValidateFiles",
897 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
898 }
899 }
900 // Notify the client
901 n++;
902 gProof->SendDataSetStatus(msg, n, tot, st);
903
904 // This worker is ready for the next validation
905 workers.Add(s);
906 }
907 }
908 }
909
910 // Check if there is anything to wait for
911 if (mon.GetActive() == 0) {
912 if (byfile && maxent > 0 && totent > 0) {
913 // How many files do we still need ?
914 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
915 if (nrestf <= 0 && maxent > totent) nrestf = 1;
916 if (nrestf > 0) {
917 PDB(kPacketizer,3)
918 Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
919 maxent, totent, nopenf, nrestf);
920 si.Reset();
921 while ((slm = (TSlave *) si.Next()) && nrestf--) {
922 workers.Add(slm);
923 }
924 continue;
925 } else {
926 PDB(kPacketizer,3)
927 Info("ValidateFiles", "no need to validate more files");
928 break;
929 }
930 } else {
931 break;
932 }
933 }
934
935 PDB(kPacketizer,3) {
936 Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
937 TList *act = mon.GetListOfActives();
938 TIter next(act);
939 TSocket *s = 0;
940 while ((s = (TSocket*) next())) {
941 Info("ValidateFiles", "found sck: %p", s);
942 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
943 if (sl)
944 Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
945 }
946 delete act;
947 }
948
949 TSocket *sock = mon.Select();
950 // If we have been interrupted break
951 if (!sock) {
952 Error("ValidateFiles", "selection has been interrupted - STOP");
953 mon.DeActivateAll();
954 fValid = kFALSE;
955 break;
956 }
957 mon.DeActivate(sock);
958
959 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
960
961 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
962 if (!sock->IsValid()) {
963 // A socket got invalid during validation
964 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
965 slave->GetOrdinal(), slave->GetName());
966 ((TProof*)gProof)->MarkBad(slave);
967 fValid = kFALSE;
968 break;
969 }
970
971 TMessage *reply;
972
973 if ( sock->Recv(reply) <= 0 ) {
974 // Help! lost a slave?
975 ((TProof*)gProof)->MarkBad(slave);
976 fValid = kFALSE;
977 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
978 slave->GetOrdinal(), slave->GetName());
979 continue;
980 }
981
982 if (reply->What() != kPROOF_GETENTRIES) {
983 // Not what we want: handover processing to the central machinery
984 Int_t what = reply->What();
985 ((TProof*)gProof)->HandleInputMessage(slave, reply);
986 if (what == kPROOF_FATAL) {
987 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
988 slave->GetOrdinal(), slave->GetName());
989 fValid = kFALSE;
990 } else {
991 // Reactivate the socket
992 mon.Activate(sock);
993 }
994 // Get next message
995 continue;
996 }
997
998 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
999 TDSetElement *e = slavestat->fCurFile->GetElement();
1000 slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1001 Long64_t entries;
1002
1003 (*reply) >> entries;
1004
1005 // Extract object name, if there
1006 if ((reply->BufferSize() > reply->Length())) {
1007 TString objname;
1008 (*reply) >> objname;
1009 e->SetTitle(objname);
1010 }
1011
1012 e->SetTDSetOffset(entries);
1013 if ( entries > 0 ) {
1014
1015 // This dataset element is most likely valid
1016 e->SetValid();
1017
1018 //if (!e->GetEventList()) {
1019 if (!e->GetEntryList()){
1020 if ( e->GetFirst() > entries ) {
1021 Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
1022 e->GetFirst(), entries, e->GetFileName());
1023
1024 // Invalidate the element
1025 slavestat->fCurFile->SetDone();
1026 e->Invalidate();
1028 }
1029
1030 if ( e->GetNum() == -1 ) {
1031 e->SetNum( entries - e->GetFirst() );
1032 } else if ( e->GetFirst() + e->GetNum() > entries ) {
1033 Error("ValidateFiles",
1034 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1035 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1036 e->SetNum(entries - e->GetFirst());
1037 }
1038 }
1039
1040 // Count
1041 totent += entries;
1042 nopenf++;
1043
1044 // Notify the client
1045 n++;
1046 gProof->SendDataSetStatus(msg, n, tot, st);
1047
1048 } else {
1049
1050 Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
1051 //
1052 // Need to fix this with a user option to allow incomplete file sets (rdm)
1053 //
1054 //fValid = kFALSE; // all element must be readable!
1055 if (gProofServ) {
1057 m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
1059 }
1060
1061 // Invalidate the element
1062 e->Invalidate();
1064 }
1065 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1066
1067 // Ready for the next job, unless we have enough files
1068 if (maxent < 0 || ((totent < maxent) && !byfile))
1069 workers.Add(slave);
1070 }
1071
1072 // report std. output from slaves??
1073
1074 ((TProof*)gProof)->ActivateAsyncInput();
1075
1076 // This needs to be reset
1077 ((TProof*)gProof)->fCurrentMonitor = 0;
1078
1079 // No reason to continue if invalid
1080 if (!fValid)
1081 return;
1082
1083
1084 // compute the offset for each file element
1085 Long64_t offset = 0;
1086 Long64_t newOffset = 0;
1087 TIter next(dset->GetListOfElements());
1088 TDSetElement *el;
1089 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1090 newOffset = offset + el->GetTDSetOffset();
1091 el->SetTDSetOffset(offset);
1092 offset = newOffset;
1093 }
1094}
1095
1096////////////////////////////////////////////////////////////////////////////////
1097/// Get entries processed by the specified slave.
1098
1100{
1101 if ( fSlaveStats == 0 ) return 0;
1102
1103 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1104
1105 if ( slstat == 0 ) return 0;
1106
1107 return slstat->GetEntriesProcessed();
1108}
1109
1110////////////////////////////////////////////////////////////////////////////////
1111/// Get Estimation of the current rate; just summing the current rates of
1112/// the active workers
1113
1115{
1116 all = kTRUE;
1117 // Loop over the workers
1118 Float_t currate = 0.;
1119 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1120 TIter nxw(fSlaveStats);
1121 TObject *key;
1122 while ((key = nxw()) != 0) {
1123 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1124 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1125 // Sum-up the current rates
1126 currate += slstat->GetProgressStatus()->GetCurrentRate();
1127 } else {
1128 all = kFALSE;
1129 }
1130 }
1131 }
1132 // Done
1133 return currate;
1134}
1135
1136////////////////////////////////////////////////////////////////////////////////
1137/// Get next packet
1138
1140{
1141 if ( !fValid ) {
1142 return 0;
1143 }
1144
1145 // Find worker
1146
1147 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1148
1149 R__ASSERT( slstat != 0 );
1150
1151 PDB(kPacketizer,1)
1152 Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1153 // update stats & free old element
1154
1155 Bool_t firstPacket = kFALSE;
1156 if ( slstat->fCurElem != 0 ) {
1157 Double_t latency = 0., proctime = 0., proccpu = 0.;
1158 Long64_t bytesRead = -1;
1159 Long64_t totalEntries = -1;
1160 Long64_t totev = 0;
1161 Long64_t numev = slstat->fCurElem->GetNum();
1162
1163 fPackets->Add(slstat->fCurElem);
1164
1165 if (sl->GetProtocol() > 18) {
1166 TProofProgressStatus *status = 0;
1167 (*r) >> latency;
1168 (*r) >> status;
1169
1170 // Calculate the progress made in the last packet
1171 TProofProgressStatus *progress = 0;
1172 if (status) {
1173 // upadte the worker status
1174 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1175 progress = slstat->AddProcessed(status);
1176 if (progress) {
1177 // (*fProgressStatus) += *progress;
1178 proctime = progress->GetProcTime();
1179 proccpu = progress->GetCPUTime();
1180 totev = status->GetEntries(); // for backward compatibility
1181 bytesRead = progress->GetBytesRead();
1182 delete progress;
1183 }
1184 delete status;
1185 } else
1186 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
1187 } else {
1188
1189 (*r) >> latency >> proctime >> proccpu;
1190
1191 // only read new info if available
1192 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1193 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1194 if (r->BufferSize() > r->Length()) (*r) >> totev;
1195
1196 numev = totev - slstat->GetEntriesProcessed();
1197 if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1198 if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1199 if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1200 }
1201
1202 if (fProgressStatus) {
1203 if (numev > 0) fProgressStatus->IncEntries(numev);
1204 if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1205 if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1206 }
1207 PDB(kPacketizer,2)
1208 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1209 sl->GetOrdinal(), sl->GetName(),
1210 numev, latency, proctime, proccpu, bytesRead);
1211
1212 if (gPerfStats)
1213 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1214 numev, latency, proctime, proccpu, bytesRead);
1215
1216 slstat->fCurElem = 0;
1218 HandleTimer(0); // Send last timer message
1219 delete fProgress; fProgress = 0;
1220 }
1221 } else {
1222 firstPacket = kTRUE;
1223 }
1224
1225 if ( fStop ) {
1226 HandleTimer(0);
1227 return 0;
1228 }
1229
1230 // get a file if needed
1231
1232 TFileStat *file = slstat->fCurFile;
1233
1234 if ( file != 0 && file->IsDone() ) {
1235 file->GetNode()->DecSlaveCnt(slstat->GetName());
1236 if (gPerfStats)
1237 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1238 file->GetElement()->GetFileName(), kFALSE);
1239 file = 0;
1240 }
1241 // Reset the current file field
1242 slstat->fCurFile = file;
1243
1244 if (!file) {
1245
1246 // Try its own node first
1247 if (slstat->GetFileNode() != 0) {
1248 file = GetNextUnAlloc(slstat->GetFileNode());
1249 if (!file) {
1250 slstat->SetFileNode(0);
1251 }
1252 }
1253
1254 // try to find an unused filenode first
1255 if (!file) {
1256 file = GetNextUnAlloc();
1257 }
1258
1259 // then look at the active filenodes
1260 if (!file) {
1261 file = GetNextActive();
1262 }
1263
1264 if (!file) return 0;
1265
1266 slstat->fCurFile = file;
1267 file->GetNode()->IncSlaveCnt(slstat->GetName());
1268 if (gPerfStats)
1269 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1270 file->GetNode()->GetName(),
1271 file->GetElement()->GetFileName(), kTRUE);
1272 }
1273
1274 // get a packet
1275
1276 TDSetElement *base = file->GetElement();
1277 Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
1278 if (num < 1) num = 1;
1279
1280 Long64_t first = file->GetNextEntry();
1281 Long64_t last = base->GetFirst() + base->GetNum();
1282
1283 if ( first + num >= last ) {
1284 num = last - first;
1285 file->SetDone(); // done
1286
1287 // delete file from active list (unalloc list is single pass, no delete needed)
1289
1290 } else {
1291 file->MoveNextEntry(num);
1292 }
1293
1294
1295 slstat->fCurElem = CreateNewPacket(base, first, num);
1296 if (base->GetEntryList())
1297 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1298
1299 // Flag the first packet of a new run (dataset)
1300 if (firstPacket)
1301 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1302 else
1303 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1304
1305 PDB(kPacketizer,2)
1306 Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1307
1308 return slstat->fCurElem;
1309}
1310
1311////////////////////////////////////////////////////////////////////////////////
1312/// Return the number of workers still processing
1313
1315{
1316 Int_t actw = 0;
1317 TIter nxw(fSlaveStats);
1318 TObject *key;
1319 while ((key = nxw())) {
1320 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1321 if (wrkstat && wrkstat->fCurFile) actw++;
1322 }
1323 // Done
1324 return actw;
1325}
@ kPROOF_FATAL
Definition: MessageTypes.h:43
@ kPROOF_GETENTRIES
Definition: MessageTypes.h:60
@ kPROOF_MESSAGE
Definition: MessageTypes.h:85
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:543
#define f(i)
Definition: RSha256.hxx:104
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:41
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
const char Option_t
Definition: RtypesCore.h:62
#define ClassImp(name)
Definition: Rtypes.h:365
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
#define R__ASSERT(e)
Definition: TError.h:96
void Error(const char *location, const char *msgfmt,...)
char name[80]
Definition: TGX11.cxx:109
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
R__EXTERN TProof * gProof
Definition: TProof.h:1077
char * Form(const char *fmt,...)
void Printf(const char *fmt,...)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:560
#define gPerfStats
Int_t BufferSize() const
Definition: TBuffer.h:97
Int_t Length() const
Definition: TBuffer.h:99
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual Int_t GetEntries() const
Definition: TCollection.h:177
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:429
const char * GetObjName() const
Definition: TDSet.h:120
Long64_t GetNum() const
Definition: TDSet.h:114
@ kNewRun
Definition: TDSet.h:75
TObject * GetEntryList() const
Definition: TDSet.h:131
void Invalidate()
Definition: TDSet.h:134
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:129
void SetNum(Long64_t num)
Definition: TDSet.h:118
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:253
void SetValid()
Definition: TDSet.h:135
Long64_t GetTDSetOffset() const
Definition: TDSet.h:128
const char * GetFileName() const
Definition: TDSet.h:111
Long64_t GetFirst() const
Definition: TDSet.h:112
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:413
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1369
Bool_t IsTree() const
Definition: TDSet.h:225
TList * GetListOfElements() const
Definition: TDSet.h:231
@ kSomeInvalid
Definition: TDSet.h:161
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual Long64_t GetN() const
Definition: TEntryList.h:75
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Int_t GetN() const
Definition: TEventList.h:56
TObject * Next()
Definition: TCollection.h:249
void Reset()
Definition: TCollection.h:252
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:327
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:819
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:575
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:656
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:934
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:53
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition: TMap.cxx:150
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:214
UInt_t What() const
Definition: TMessage.h:75
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
Mother of all ROOT objects.
Definition: TObject.h:37
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
virtual Bool_t IsSortable() const
Definition: TObject.h:131
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:159
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:550
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
This class generates packets to be processed on PROOF worker servers.
Definition: TPacketizer.h:39
TFileStat * GetNextActive()
Get next active file.
Long_t fMaxSlaveCnt
Definition: TPacketizer.h:58
TFileNode * NextActiveNode()
Get next active node.
virtual ~TPacketizer()
Destructor.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Bool_t fDefMaxWrkNode
Definition: TPacketizer.h:69
Int_t fMaxPerfIdx
Definition: TPacketizer.h:56
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
TFileNode * NextUnAllocNode()
Get next unallocated node.
TList * fPackets
Definition: TPacketizer.h:44
Int_t GetActiveWorkers()
Return the number of workers still processing.
TList * fUnAllocated
Definition: TPacketizer.h:50
Bool_t fHeuristicPSiz
Definition: TPacketizer.h:68
void Reset()
Reset the internal datastructure for packet distribution.
Int_t fPacketAsAFraction
Definition: TPacketizer.h:60
TList * fActive
Definition: TPacketizer.h:51
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TList * fFileNodes
Definition: TPacketizer.h:49
Long64_t fPacketSize
Definition: TPacketizer.h:53
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree an get number of entries.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
void IncBytesRead(Long64_t bytesRead)
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition: TProofServ.h:257
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:316
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9894
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9312
Class describing a PROOF worker server.
Definition: TSlave.h:46
TSocket * GetSocket() const
Definition: TSlave.h:134
Int_t GetProtocol() const
Definition: TSlave.h:133
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
const char * GetOrdinal() const
Definition: TSlave.h:131
Int_t GetPerfIdx() const
Definition: TSlave.h:132
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:816
virtual Bool_t IsValid() const
Definition: TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:521
Basic string class.
Definition: TString.h:131
const char * Data() const
Definition: TString.h:364
Bool_t IsNull() const
Definition: TString.h:402
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:619
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:312
This class represents a WWW compatible URL.
Definition: TUrl.h:35
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Bool_t IsValid() const
Definition: TUrl.h:82
const char * GetHost() const
Definition: TUrl.h:70
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
void SetHost(const char *host)
Definition: TUrl.h:87
const char * GetProtocol() const
Definition: TUrl.h:67
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from from base packet starting from the first entry with num entries.
const Int_t n
Definition: legend1.C:16
void Add(RHist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
Definition: RHist.hxx:309
static constexpr double s
Definition: file.py:1
Definition: first.py:1
auto * m
Definition: textangle.C:8