Logo ROOT   6.16/01
Reference Guide
TPacketizer.cxx
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Maarten Ballintijn 18/03/02
3
4/*************************************************************************
5 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12/** \class TPacketizer
13\ingroup proofkernel
14
15This class generates packets to be processed on PROOF worker servers.
16A packet is an event range (begin entry and number of entries) or
17object range (first object and number of objects) in a TTree
18(entries) or a directory (objects) in a file.
19Packets are generated taking into account the performance of the
20remote machine, the time it took to process a previous packet on
21the remote machine, the locality of the database files, etc.
22
23*/
24
25#include "TPacketizer.h"
26
27#include "Riostream.h"
28#include "TDSet.h"
29#include "TEnv.h"
30#include "TError.h"
31#include "TEventList.h"
32#include "TEntryList.h"
33#include "TMap.h"
34#include "TMessage.h"
35#include "TMonitor.h"
36#include "TNtupleD.h"
37#include "TObject.h"
38#include "TParameter.h"
39#include "TPerfStats.h"
40#include "TProofDebug.h"
41#include "TProof.h"
42#include "TProofPlayer.h"
43#include "TProofServ.h"
44#include "TSlave.h"
45#include "TSocket.h"
46#include "TTimer.h"
47#include "TUrl.h"
48#include "TClass.h"
49#include "TMath.h"
50#include "TObjString.h"
51
52//
53// The following three utility classes manage the state of the
54// work to be performed and the slaves involved in the process.
55// A list of TFileNode(s) describes the hosts with files, each
56// has a list of TFileStat(s) keeping the state for each TDSet
57// element (file).
58//
59// The list of TSlaveStat(s) keep track of the work (being) done
60// by each slave
61//
62
63
64//------------------------------------------------------------------------------
65
66class TPacketizer::TFileStat : public TObject {
67
68private:
69 Bool_t fIsDone; // is this element processed
70 TFileNode *fNode; // my FileNode
71 TDSetElement *fElement; // location of the file and its range
72 Long64_t fNextEntry; // cursor in the range, -1 when done
73
74public:
75 TFileStat(TFileNode *node, TDSetElement *elem);
76
77 Bool_t IsDone() const {return fIsDone;}
78 void SetDone() {fIsDone = kTRUE;}
79 TFileNode *GetNode() const {return fNode;}
80 TDSetElement *GetElement() const {return fElement;}
81 Long64_t GetNextEntry() const {return fNextEntry;}
82 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
83};
84
85
86TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
87 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
88{
89}
90
91
92//------------------------------------------------------------------------------
93
94class TPacketizer::TFileNode : public TObject {
95
96private:
97 TString fNodeName; // FQDN of the node
98 TList *fFiles; // TDSetElements (files) stored on this node
99 TObject *fUnAllocFileNext; // cursor in fFiles
100 TList *fActFiles; // files with work remaining
101 TObject *fActFileNext; // cursor in fActFiles
102 Int_t fMySlaveCnt; // number of slaves running on this node
103 Int_t fSlaveCnt; // number of external slaves processing files on this node
104
105public:
106 TFileNode(const char *name);
107 ~TFileNode() { delete fFiles; delete fActFiles; }
108
109 void IncMySlaveCnt() { fMySlaveCnt++; }
110 void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
111 void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
112 Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
113 Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
114 Bool_t IsSortable() const { return kTRUE; }
115
116 const char *GetName() const { return fNodeName.Data(); }
117
118 void Add(TDSetElement *elem)
119 {
120 TFileStat *f = new TFileStat(this,elem);
121 fFiles->Add(f);
122 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
123 }
124
125 TFileStat *GetNextUnAlloc()
126 {
127 TObject *next = fUnAllocFileNext;
128
129 if (next != 0) {
130 // make file active
131 fActFiles->Add(next);
132 if (fActFileNext == 0) fActFileNext = fActFiles->First();
133
134 // move cursor
135 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
136 }
137
138 return (TFileStat *) next;
139 }
140
141 TFileStat *GetNextActive()
142 {
143 TObject *next = fActFileNext;
144
145 if (fActFileNext != 0) {
146 fActFileNext = fActFiles->After(fActFileNext);
147 if (fActFileNext == 0) fActFileNext = fActFiles->First();
148 }
149
150 return (TFileStat *) next;
151 }
152
153 void RemoveActive(TFileStat *file)
154 {
155 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
156 fActFiles->Remove(file);
157 if (fActFileNext == 0) fActFileNext = fActFiles->First();
158 }
159
160 Int_t Compare(const TObject *other) const
161 {
162 // Must return -1 if this is smaller than obj, 0 if objects are equal
163 // and 1 if this is larger than obj.
164 const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
165 if (!obj) {
166 Error("Compare", "input is not a TPacketizer::TFileNode object");
167 return 0;
168 }
169
170 Int_t myVal = GetSlaveCnt();
171 Int_t otherVal = obj->GetSlaveCnt();
172 if (myVal < otherVal) {
173 return -1;
174 } else if (myVal > otherVal) {
175 return 1;
176 } else {
177 return 0;
178 }
179 }
180
181 void Print(Option_t *) const
182 {
183 std::cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
184 << "\tMySlaveCount " << fMySlaveCnt
185 << "\tSlaveCount " << fSlaveCnt << std::endl;
186 }
187
188 void Reset()
189 {
190 fUnAllocFileNext = fFiles->First();
191 fActFiles->Clear();
192 fActFileNext = 0;
193 fSlaveCnt = 0;
194 fMySlaveCnt = 0;
195 }
196};
197
198
199TPacketizer::TFileNode::TFileNode(const char *name)
200 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
201 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
202{
203 // Constructor
204
205 fFiles->SetOwner();
206 fActFiles->SetOwner(kFALSE);
207}
208
209
210//------------------------------------------------------------------------------
211
212class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
213
214friend class TPacketizer;
215
216private:
217 TFileNode *fFileNode; // corresponding node or 0
218 TFileStat *fCurFile; // file currently being processed
219 TDSetElement *fCurElem; // TDSetElement currently being processed
221public:
222 TSlaveStat(TSlave *slave);
223 ~TSlaveStat();
224
225 TFileNode *GetFileNode() const { return fFileNode; }
226
227 void SetFileNode(TFileNode *node) { fFileNode = node; }
228};
229
230
231TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
232 : fFileNode(0), fCurFile(0), fCurElem(0)
233{
234 fSlave = slave;
235 fStatus = new TProofProgressStatus();
236}
237
238////////////////////////////////////////////////////////////////////////////////
239/// Cleanup
240
241TPacketizer::TSlaveStat::~TSlaveStat()
242{
243 SafeDelete(fStatus);
244}
245
246TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
247{
248 // Update the status info to the 'st'.
249 // return the difference (*st - *fStatus)
250
251 if (st) {
252 // The entriesis not correct in 'st'
253 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
254 // The last proc time should not be added
255 fStatus->SetLastProcTime(0.);
256 // Get the diff
257 TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
258 *fStatus += *diff;
259 // Set the correct value
260 fStatus->SetLastEntries(lastEntries);
261 return diff;
262 } else {
263 Error("AddProcessed", "status arg undefined");
264 return 0;
265 }
266}
267
268//------------------------------------------------------------------------------
269
271
272////////////////////////////////////////////////////////////////////////////////
273/// Constructor
274
276 Long64_t num, TList *input, TProofProgressStatus *st)
277 : TVirtualPacketizer(input, st)
278{
279 PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
280
281 // Init pointer members
282 fPackets = 0;
283 fUnAllocated = 0;
284 fActive = 0;
285 fFileNodes = 0;
286 fMaxPerfIdx = 1;
287 fMaxSlaveCnt = 0;
290
291 if (!fProgressStatus) {
292 Error("TPacketizer", "No progress status");
293 return;
294 }
295
296 Long_t maxSlaveCnt = 0;
297 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
298 if (maxSlaveCnt < 0) {
299 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
300 maxSlaveCnt = 0;
301 }
302 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
303 } else {
304 // Try also with Int_t (recently supported in TProof::SetParameter)
305 Int_t mxslcnt = -1;
306 if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
307 if (mxslcnt < 0) {
308 Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
309 mxslcnt = 0;
310 }
311 maxSlaveCnt = (Long_t) mxslcnt;
312 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
313 }
314 }
315 if (!maxSlaveCnt) {
316 maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize());
317 if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
318 }
319 if (maxSlaveCnt > 0) {
320 fMaxSlaveCnt = maxSlaveCnt;
321 PDB(kPacketizer,1)
322 Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
323 }
324
325 fPackets = new TList;
327
328 fFileNodes = new TList;
330 fUnAllocated = new TList;
332 fActive = new TList;
334
335
336 fValid = kTRUE;
337
338 // Resolve end-point urls to optmize distribution
339 // dset->Lookup(); // moved to TProofPlayerRemote::Process
340
341 // Split into per host entries
342 dset->Reset();
344 while ((e = (TDSetElement*)dset->Next())) {
345 if (e->GetValid()) continue;
346
347 TUrl url = e->GetFileName();
348
349 // Map non URL filenames to dummy host
350 TString host;
351 if ( !url.IsValid() ||
352 (strncmp(url.GetProtocol(),"root", 4) &&
353 strncmp(url.GetProtocol(),"rfio", 4) &&
354 strncmp(url.GetProtocol(),"file", 4)) ) {
355 host = "no-host";
356 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
357 host = "localhost";
358 url.SetProtocol("root");
359 } else {
360 host = url.GetHost();
361 }
362 // Get full name for local hosts
363 if (host.Contains("localhost") || host == "127.0.0.1") {
364 url.SetHost(gSystem->HostName());
365 host = url.GetHostFQDN();
366 }
367
368 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
369
370 if (node == 0) {
371 node = new TFileNode(host);
372 fFileNodes->Add(node);
373 }
374
375 node->Add( e );
376 }
377
378 fSlaveStats = new TMap;
380
381 // Record initial available workers
382 Int_t nwrks = AddWorkers(slaves);
383 Info("TPacketizer", "Initial number of workers: %d", nwrks);
384
385 // Setup file & filenode structure
386 Reset();
387 // Optimize the number of files to be open when running on subsample
388 Int_t validateMode = 0;
389 Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
390 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
391 if (num > -1)
392 PDB(kPacketizer,2)
393 Info("TPacketizer",
394 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
395 ValidateFiles(dset, slaves, num, byfile);
396
397 if (!fValid) return;
398
399 // apply global range (first,num) to dset and rebuild structure
400 // ommitting TDSet elements that are not needed
401
402 Int_t files = 0;
403 fTotalEntries = 0;
404 fUnAllocated->Clear(); // avoid dangling pointers
405 fActive->Clear();
406 fFileNodes->Clear(); // then delete all objects
407 PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
408
409 dset->Reset();
410 Long64_t cur = 0;
411 while (( e = (TDSetElement*)dset->Next())) {
412
413 // Skip invalid or missing file; It will be moved
414 // from the dset to the 'MissingFiles' list in the player.
415 if (!e->GetValid()) continue;
416
417 // The dataset name, if any
418 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
419 fDataSet = e->GetDataSet();
420
421 TUrl url = e->GetFileName();
422 Long64_t eFirst = e->GetFirst();
423 Long64_t eNum = e->GetNum();
424 PDB(kPacketizer,2)
425 Info("TPacketizer", " --> '%s'", e->GetFileName());
426 PDB(kPacketizer,2)
427 Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
428
429 if (!e->GetEntryList()){
430 // this element is before the start of the global range, skip it
431 if (cur + eNum < first) {
432 cur += eNum;
433 PDB(kPacketizer,2)
434 Info("TPacketizer", " --> skip element cur %lld", cur);
435 continue;
436 }
437
438 // this element is after the end of the global range, skip it
439 if (num != -1 && (first+num <= cur)) {
440 cur += eNum;
441 PDB(kPacketizer,2)
442 Info("TPacketizer", " --> drop element cur %lld", cur);
443 continue; // break ??
444 }
445
446 Bool_t inRange = kFALSE;
447 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
448
449 if (cur <= first) {
450 // If this element contains the start of the global range
451 // adjust its start and number of entries
452 e->SetFirst( eFirst + (first - cur) );
453 e->SetNum( e->GetNum() - (first - cur) );
454 PDB(kPacketizer,2)
455 Info("TPacketizer", " --> adjust start %lld and end %lld",
456 eFirst + (first - cur), first + num - cur);
457 inRange = kTRUE;
458 }
459 if (num != -1 && (first+num <= cur+eNum)) {
460 // If this element contains the end of the global range
461 // adjust its number of entries
462 e->SetNum( first + num - e->GetFirst() - cur );
463 PDB(kPacketizer,2)
464 Info("TPacketizer", " --> adjust end %lld", first + num - cur);
465 inRange = kTRUE;
466 }
467
468 } else {
469 // Increment the counter ...
470 PDB(kPacketizer,2)
471 Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
472 cur += eNum;
473 }
474 // Re-adjust eNum and cur, if needed
475 if (inRange) {
476 cur += eNum;
477 eNum = e->GetNum();
478 }
479
480 } else {
481 TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
482 if (enl) {
483 eNum = enl->GetN();
484 } else {
485 TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
486 eNum = evl ? evl->GetN() : eNum;
487 }
488 if (!eNum)
489 continue;
490 }
491 PDB(kPacketizer,2)
492 Info("TPacketizer", " --> next cur %lld", cur);
493
494 // Map non URL filenames to dummy host
495 TString host;
496 if ( !url.IsValid() ||
497 (strncmp(url.GetProtocol(),"root", 4) &&
498 strncmp(url.GetProtocol(),"rfio", 4) &&
499 strncmp(url.GetProtocol(),"file", 4)) ) {
500 host = "no-host";
501 } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
502 host = "localhost";
503 url.SetProtocol("root");
504 } else {
505 host = url.GetHostFQDN();
506 }
507 // Get full name for local hosts
508 if (host.Contains("localhost") || host == "127.0.0.1") {
509 url.SetHost(gSystem->HostName());
510 host = url.GetHostFQDN();
511 }
512
513 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
514
515 if ( node == 0 ) {
516 node = new TFileNode( host );
517 fFileNodes->Add( node );
518 }
519
520 ++files;
521 fTotalEntries += eNum;
522 node->Add(e);
523 PDB(kPacketizer,2) e->Print("a");
524 }
525
526 PDB(kPacketizer,1)
527 Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
528 fTotalEntries, files, fFileNodes->GetSize());
529
530 // Set the total number for monitoring
531 if (gPerfStats)
532 gPerfStats->SetNumEvents(fTotalEntries);
533
534 Reset();
535
536 if (fFileNodes->GetSize() == 0) {
537 Info("TPacketizer", "no valid or non-empty file found: setting invalid");
538 // No valid files: set invalid and return
539 fValid = kFALSE;
540 return;
541 }
542
543 // Below we provide a possibility to change the way packet size is
544 // calculated or define the packet size directly.
545 // fPacketAsAFraction can be interpreted as follows:
546 // assuming all slaves have equal processing rate,
547 // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
548 // It substitutes 20 in the old formula to calculate the fPacketSize:
549 // fPacketSize = fTotalEntries / (20 * nslaves)
550 Long_t packetAsAFraction = 20;
551 if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
552 Info("Process", "using alternate fraction of query time as a packet Size: %ld",
553 packetAsAFraction);
554 fPacketAsAFraction = (Int_t)packetAsAFraction;
555
556 fPacketSize = 1;
557 if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
558 Info("Process","using alternate packet size: %lld", fPacketSize);
559 } else {
560 // Heuristic for starting packet size
562 Int_t nslaves = fSlaveStats->GetSize();
563 if (nslaves > 0) {
565 if (fPacketSize < 1) fPacketSize = 1;
566 } else {
567 fPacketSize = 1;
568 }
569 }
570
571 PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
572
573 if (!fValid)
575
576 PDB(kPacketizer,1) Info("TPacketizer", "Return");
577}
578
579////////////////////////////////////////////////////////////////////////////////
580/// Destructor.
581
583{
584 if (fSlaveStats) {
586 }
587
593}
594
595////////////////////////////////////////////////////////////////////////////////
596/// Adds new workers. Returns the number of workers added, or -1 on failure.
597
599{
600 if (!workers) {
601 Error("AddWorkers", "Null list of new workers!");
602 return -1;
603 }
604
605 Int_t curNumOfWrks = fSlaveStats->GetEntries();
606
607 TSlave *sl;
608 TIter next(workers);
609 while (( sl = dynamic_cast<TSlave*>(next()) ))
610 if (!fSlaveStats->FindObject(sl)) {
611 fSlaveStats->Add(sl, new TSlaveStat(sl));
613 }
614
615 // If heuristic (and new workers) set the packet size
616 Int_t nwrks = fSlaveStats->GetSize();
617 if (fHeuristicPSiz && nwrks > curNumOfWrks) {
618 if (nwrks > 0) {
620 if (fPacketSize < 1) fPacketSize = 1;
621 } else {
622 fPacketSize = 1;
623 }
624 }
625
626 // Update the max number that can access one file node if the default is used
627 if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
628
629 // Done
630 return nwrks;
631}
632
633////////////////////////////////////////////////////////////////////////////////
634/// Get next unallocated file.
635
636TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
637{
638 TFileStat *file = 0;
639
640 if (node != 0) {
641 file = node->GetNextUnAlloc();
642 if (file == 0) RemoveUnAllocNode(node);
643 } else {
644 while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
645 file = node->GetNextUnAlloc();
646 if (file == 0) RemoveUnAllocNode(node);
647 }
648 }
649
650 if (file != 0) {
651 // if needed make node active
652 if (fActive->FindObject(node) == 0) {
653 fActive->Add(node);
654 }
655 }
656
657 return file;
658}
659
660////////////////////////////////////////////////////////////////////////////////
661/// Get next unallocated node.
662
663TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
664{
666 PDB(kPacketizer,2) {
667 std::cout << "TPacketizer::NextUnAllocNode()" << std::endl;
669 }
670
671 TFileNode *fn = (TFileNode*) fUnAllocated->First();
672 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
673 PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
675 fn = 0;
676 }
677
678 return fn;
679}
680
681////////////////////////////////////////////////////////////////////////////////
682/// Remove unallocated node.
683
684void TPacketizer::RemoveUnAllocNode(TFileNode * node)
685{
686 fUnAllocated->Remove(node);
687}
688
689////////////////////////////////////////////////////////////////////////////////
690/// Get next active file.
691
692TPacketizer::TFileStat *TPacketizer::GetNextActive()
693{
694 TFileNode *node;
695 TFileStat *file = 0;
696
697 while (file == 0 && ((node = NextActiveNode()) != 0)) {
698 file = node->GetNextActive();
699 if (file == 0) RemoveActiveNode(node);
700 }
701
702 return file;
703}
704
705////////////////////////////////////////////////////////////////////////////////
706/// Get next active node.
707
708TPacketizer::TFileNode *TPacketizer::NextActiveNode()
709{
710 fActive->Sort();
711 PDB(kPacketizer,2) {
712 Printf("TPacketizer::NextActiveNode : ----------------------");
713 fActive->Print();
714 }
715
716 TFileNode *fn = (TFileNode*) fActive->First();
717 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
718 PDB(kPacketizer,1)
719 Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
720 fn = 0;
721 }
722
723 return fn;
724}
725
726////////////////////////////////////////////////////////////////////////////////
727/// Remove file from the list of actives.
728
730{
731 TFileNode *node = file->GetNode();
732
733 node->RemoveActive(file);
734 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
735}
736
737////////////////////////////////////////////////////////////////////////////////
738/// Remove node from the list of actives.
739
740void TPacketizer::RemoveActiveNode(TFileNode *node)
741{
742 fActive->Remove(node);
743}
744
745////////////////////////////////////////////////////////////////////////////////
746/// Reset the internal datastructure for packet distribution.
747
749{
752
753 fActive->Clear();
754
755 TIter files(fFileNodes);
756 TFileNode *fn;
757 while ((fn = (TFileNode*) files.Next()) != 0) {
758 fn->Reset();
759 }
760
761 TIter slaves(fSlaveStats);
762 TObject *key;
763 while ((key = slaves.Next()) != 0) {
764 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
765 if (slstat) {
766 fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
767 if (fn != 0 ) {
768 slstat->SetFileNode(fn);
769 fn->IncMySlaveCnt();
770 }
771 slstat->fCurFile = 0;
772 } else {
773 Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
774 }
775 }
776}
777
778////////////////////////////////////////////////////////////////////////////////
779/// Check existence of file/dir/tree an get number of entries.
780/// Assumes the files have been setup.
781
782void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
783{
784 TMap slaves_by_sock;
785 TMonitor mon;
786 TList workers;
787
788
789 // Setup the communication infrastructure
790
791 workers.AddAll(slaves);
792 TIter si(slaves);
793 TSlave *slm = 0;
794 while ((slm = (TSlave*)si.Next()) != 0) {
795 PDB(kPacketizer,3)
796 Info("ValidateFiles","socket added to monitor: %p (%s)",
797 slm->GetSocket(), slm->GetName());
798 mon.Add(slm->GetSocket());
799 slaves_by_sock.Add(slm->GetSocket(), slm);
800 PDB(kPacketizer,1)
801 Info("ValidateFiles",
802 "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
803 }
804
805 mon.DeActivateAll();
806
807 ((TProof*)gProof)->DeActivateAsyncInput();
808
809 // Some monitoring systems (TXSocketHandler) need to know this
810 ((TProof*)gProof)->fCurrentMonitor = &mon;
811
812 // Preparing for client notification
813 TString msg("Validating files");
814 UInt_t n = 0;
815 UInt_t tot = dset->GetListOfElements()->GetSize();
816 Bool_t st = kTRUE;
817
818 Long64_t totent = 0, nopenf = 0;
819 while (kTRUE) {
820
821 // send work
822 while( TSlave *s = (TSlave*)workers.First() ) {
823
824 workers.Remove(s);
825
826 // find a file
827
828 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
829 if (!slstat) {
830 Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
831 continue;
832 }
833 TFileNode *node = 0;
834 TFileStat *file = 0;
835
836 // try its own node first
837 if ( (node = slstat->GetFileNode()) != 0 ) {
838 file = GetNextUnAlloc(node);
839 if ( file == 0 ) {
840 slstat->SetFileNode(0);
841 }
842 }
843
844 // look for a file on any other node if necessary
845 if (file == 0) {
847 }
848
849 if ( file != 0 ) {
850 // files are done right away
852
853 slstat->fCurFile = file;
854 TDSetElement *elem = file->GetElement();
855 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
856 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
857 // This is decremented when we get the reply
858 file->GetNode()->IncSlaveCnt(slstat->GetName());
860 m << dset->IsTree()
861 << TString(elem->GetFileName())
862 << TString(elem->GetDirectory())
863 << TString(elem->GetObjName());
864
865 s->GetSocket()->Send( m );
866 mon.Activate(s->GetSocket());
867 PDB(kPacketizer,2)
868 Info("ValidateFiles",
869 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
870 s->GetOrdinal(), s->GetName(), s->GetSocket(),
871 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
872 elem->GetDirectory(), elem->GetObjName());
873 } else {
874 // Fill the info
875 elem->SetTDSetOffset(entries);
876 if (entries > 0) {
877 // Most likely valid
878 elem->SetValid();
879 if (!elem->GetEntryList()) {
880 if (elem->GetFirst() > entries) {
881 Error("ValidateFiles",
882 "first (%lld) higher then number of entries (%lld) in %s",
883 elem->GetFirst(), entries, elem->GetFileName());
884 // disable element
885 slstat->fCurFile->SetDone();
886 elem->Invalidate();
888 }
889 if (elem->GetNum() == -1) {
890 elem->SetNum(entries - elem->GetFirst());
891 } else if (elem->GetFirst() + elem->GetNum() > entries) {
892 Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
893 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
894 entries, elem->GetFileName());
895 elem->SetNum(entries - elem->GetFirst());
896 }
897 PDB(kPacketizer,2)
898 Info("ValidateFiles",
899 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
900 }
901 }
902 // Notify the client
903 n++;
904 gProof->SendDataSetStatus(msg, n, tot, st);
905
906 // This worker is ready for the next validation
907 workers.Add(s);
908 }
909 }
910 }
911
912 // Check if there is anything to wait for
913 if (mon.GetActive() == 0) {
914 if (byfile && maxent > 0 && totent > 0) {
915 // How many files do we still need ?
916 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
917 if (nrestf <= 0 && maxent > totent) nrestf = 1;
918 if (nrestf > 0) {
919 PDB(kPacketizer,3)
920 Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
921 maxent, totent, nopenf, nrestf);
922 si.Reset();
923 while ((slm = (TSlave *) si.Next()) && nrestf--) {
924 workers.Add(slm);
925 }
926 continue;
927 } else {
928 PDB(kPacketizer,3)
929 Info("ValidateFiles", "no need to validate more files");
930 break;
931 }
932 } else {
933 break;
934 }
935 }
936
937 PDB(kPacketizer,3) {
938 Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
939 TList *act = mon.GetListOfActives();
940 TIter next(act);
941 TSocket *s = 0;
942 while ((s = (TSocket*) next())) {
943 Info("ValidateFiles", "found sck: %p", s);
944 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
945 if (sl)
946 Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
947 }
948 delete act;
949 }
950
951 TSocket *sock = mon.Select();
952 // If we have been interrupted break
953 if (!sock) {
954 Error("ValidateFiles", "selection has been interrupted - STOP");
955 mon.DeActivateAll();
956 fValid = kFALSE;
957 break;
958 }
959 mon.DeActivate(sock);
960
961 PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
962
963 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
964 if (!sock->IsValid()) {
965 // A socket got invalid during validation
966 Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
967 slave->GetOrdinal(), slave->GetName());
968 ((TProof*)gProof)->MarkBad(slave);
969 fValid = kFALSE;
970 break;
971 }
972
973 TMessage *reply;
974
975 if ( sock->Recv(reply) <= 0 ) {
976 // Help! lost a slave?
977 ((TProof*)gProof)->MarkBad(slave);
978 fValid = kFALSE;
979 Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
980 slave->GetOrdinal(), slave->GetName());
981 continue;
982 }
983
984 if (reply->What() != kPROOF_GETENTRIES) {
985 // Not what we want: handover processing to the central machinery
986 Int_t what = reply->What();
987 ((TProof*)gProof)->HandleInputMessage(slave, reply);
988 if (what == kPROOF_FATAL) {
989 Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
990 slave->GetOrdinal(), slave->GetName());
991 fValid = kFALSE;
992 } else {
993 // Reactivate the socket
994 mon.Activate(sock);
995 }
996 // Get next message
997 continue;
998 }
999
1000 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1001 TDSetElement *e = slavestat->fCurFile->GetElement();
1002 slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1003 Long64_t entries;
1004
1005 (*reply) >> entries;
1006
1007 // Extract object name, if there
1008 if ((reply->BufferSize() > reply->Length())) {
1009 TString objname;
1010 (*reply) >> objname;
1011 e->SetTitle(objname);
1012 }
1013
1014 e->SetTDSetOffset(entries);
1015 if ( entries > 0 ) {
1016
1017 // This dataset element is most likely valid
1018 e->SetValid();
1019
1020 //if (!e->GetEventList()) {
1021 if (!e->GetEntryList()){
1022 if ( e->GetFirst() > entries ) {
1023 Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
1024 e->GetFirst(), entries, e->GetFileName());
1025
1026 // Invalidate the element
1027 slavestat->fCurFile->SetDone();
1028 e->Invalidate();
1030 }
1031
1032 if ( e->GetNum() == -1 ) {
1033 e->SetNum( entries - e->GetFirst() );
1034 } else if ( e->GetFirst() + e->GetNum() > entries ) {
1035 Error("ValidateFiles",
1036 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1037 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1038 e->SetNum(entries - e->GetFirst());
1039 }
1040 }
1041
1042 // Count
1043 totent += entries;
1044 nopenf++;
1045
1046 // Notify the client
1047 n++;
1048 gProof->SendDataSetStatus(msg, n, tot, st);
1049
1050 } else {
1051
1052 Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
1053 //
1054 // Need to fix this with a user option to allow incomplete file sets (rdm)
1055 //
1056 //fValid = kFALSE; // all element must be readable!
1057 if (gProofServ) {
1059 m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
1061 }
1062
1063 // Invalidate the element
1064 e->Invalidate();
1066 }
1067 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1068
1069 // Ready for the next job, unless we have enough files
1070 if (maxent < 0 || ((totent < maxent) && !byfile))
1071 workers.Add(slave);
1072 }
1073
1074 // report std. output from slaves??
1075
1076 ((TProof*)gProof)->ActivateAsyncInput();
1077
1078 // This needs to be reset
1079 ((TProof*)gProof)->fCurrentMonitor = 0;
1080
1081 // No reason to continue if invalid
1082 if (!fValid)
1083 return;
1084
1085
1086 // compute the offset for each file element
1087 Long64_t offset = 0;
1088 Long64_t newOffset = 0;
1089 TIter next(dset->GetListOfElements());
1090 TDSetElement *el;
1091 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1092 newOffset = offset + el->GetTDSetOffset();
1093 el->SetTDSetOffset(offset);
1094 offset = newOffset;
1095 }
1096}
1097
1098////////////////////////////////////////////////////////////////////////////////
1099/// Get entries processed by the specified slave.
1100
1102{
1103 if ( fSlaveStats == 0 ) return 0;
1104
1105 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1106
1107 if ( slstat == 0 ) return 0;
1108
1109 return slstat->GetEntriesProcessed();
1110}
1111
1112////////////////////////////////////////////////////////////////////////////////
1113/// Get Estimation of the current rate; just summing the current rates of
1114/// the active workers
1115
1117{
1118 all = kTRUE;
1119 // Loop over the workers
1120 Float_t currate = 0.;
1121 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1122 TIter nxw(fSlaveStats);
1123 TObject *key;
1124 while ((key = nxw()) != 0) {
1125 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1126 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1127 // Sum-up the current rates
1128 currate += slstat->GetProgressStatus()->GetCurrentRate();
1129 } else {
1130 all = kFALSE;
1131 }
1132 }
1133 }
1134 // Done
1135 return currate;
1136}
1137
1138////////////////////////////////////////////////////////////////////////////////
1139/// Get next packet
1140
1142{
1143 if ( !fValid ) {
1144 return 0;
1145 }
1146
1147 // Find worker
1148
1149 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1150
1151 R__ASSERT( slstat != 0 );
1152
1153 PDB(kPacketizer,1)
1154 Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1155 // update stats & free old element
1156
1157 Bool_t firstPacket = kFALSE;
1158 if ( slstat->fCurElem != 0 ) {
1159 Double_t latency = 0., proctime = 0., proccpu = 0.;
1160 Long64_t bytesRead = -1;
1161 Long64_t totalEntries = -1;
1162 Long64_t totev = 0;
1163 Long64_t numev = slstat->fCurElem->GetNum();
1164
1165 fPackets->Add(slstat->fCurElem);
1166
1167 if (sl->GetProtocol() > 18) {
1168 TProofProgressStatus *status = 0;
1169 (*r) >> latency;
1170 (*r) >> status;
1171
1172 // Calculate the progress made in the last packet
1173 TProofProgressStatus *progress = 0;
1174 if (status) {
1175 // upadte the worker status
1176 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1177 progress = slstat->AddProcessed(status);
1178 if (progress) {
1179 // (*fProgressStatus) += *progress;
1180 proctime = progress->GetProcTime();
1181 proccpu = progress->GetCPUTime();
1182 totev = status->GetEntries(); // for backward compatibility
1183 bytesRead = progress->GetBytesRead();
1184 delete progress;
1185 }
1186 delete status;
1187 } else
1188 Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
1189 } else {
1190
1191 (*r) >> latency >> proctime >> proccpu;
1192
1193 // only read new info if available
1194 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1195 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1196 if (r->BufferSize() > r->Length()) (*r) >> totev;
1197
1198 numev = totev - slstat->GetEntriesProcessed();
1199 if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1200 if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1201 if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1202 }
1203
1204 if (fProgressStatus) {
1205 if (numev > 0) fProgressStatus->IncEntries(numev);
1206 if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1207 if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1208 }
1209 PDB(kPacketizer,2)
1210 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1211 sl->GetOrdinal(), sl->GetName(),
1212 numev, latency, proctime, proccpu, bytesRead);
1213
1214 if (gPerfStats)
1215 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1216 numev, latency, proctime, proccpu, bytesRead);
1217
1218 slstat->fCurElem = 0;
1220 HandleTimer(0); // Send last timer message
1221 delete fProgress; fProgress = 0;
1222 }
1223 } else {
1224 firstPacket = kTRUE;
1225 }
1226
1227 if ( fStop ) {
1228 HandleTimer(0);
1229 return 0;
1230 }
1231
1232 // get a file if needed
1233
1234 TFileStat *file = slstat->fCurFile;
1235
1236 if ( file != 0 && file->IsDone() ) {
1237 file->GetNode()->DecSlaveCnt(slstat->GetName());
1238 if (gPerfStats)
1239 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1240 file->GetElement()->GetFileName(), kFALSE);
1241 file = 0;
1242 }
1243 // Reset the current file field
1244 slstat->fCurFile = file;
1245
1246 if (!file) {
1247
1248 // Try its own node first
1249 if (slstat->GetFileNode() != 0) {
1250 file = GetNextUnAlloc(slstat->GetFileNode());
1251 if (!file) {
1252 slstat->SetFileNode(0);
1253 }
1254 }
1255
1256 // try to find an unused filenode first
1257 if (!file) {
1258 file = GetNextUnAlloc();
1259 }
1260
1261 // then look at the active filenodes
1262 if (!file) {
1263 file = GetNextActive();
1264 }
1265
1266 if (!file) return 0;
1267
1268 slstat->fCurFile = file;
1269 file->GetNode()->IncSlaveCnt(slstat->GetName());
1270 if (gPerfStats)
1271 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1272 file->GetNode()->GetName(),
1273 file->GetElement()->GetFileName(), kTRUE);
1274 }
1275
1276 // get a packet
1277
1278 TDSetElement *base = file->GetElement();
1279 Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
1280 if (num < 1) num = 1;
1281
1282 Long64_t first = file->GetNextEntry();
1283 Long64_t last = base->GetFirst() + base->GetNum();
1284
1285 if ( first + num >= last ) {
1286 num = last - first;
1287 file->SetDone(); // done
1288
1289 // delete file from active list (unalloc list is single pass, no delete needed)
1291
1292 } else {
1293 file->MoveNextEntry(num);
1294 }
1295
1296
1297 slstat->fCurElem = CreateNewPacket(base, first, num);
1298 if (base->GetEntryList())
1299 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1300
1301 // Flag the first packet of a new run (dataset)
1302 if (firstPacket)
1303 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1304 else
1305 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1306
1307 PDB(kPacketizer,2)
1308 Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1309
1310 return slstat->fCurElem;
1311}
1312
1313////////////////////////////////////////////////////////////////////////////////
1314/// Return the number of workers still processing
1315
1317{
1318 Int_t actw = 0;
1319 TIter nxw(fSlaveStats);
1320 TObject *key;
1321 while ((key = nxw())) {
1322 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1323 if (wrkstat && wrkstat->fCurFile) actw++;
1324 }
1325 // Done
1326 return actw;
1327}
@ kPROOF_FATAL
Definition: MessageTypes.h:43
@ kPROOF_GETENTRIES
Definition: MessageTypes.h:60
@ kPROOF_MESSAGE
Definition: MessageTypes.h:85
ROOT::R::TRInterface & r
Definition: Object.C:4
#define SafeDelete(p)
Definition: RConfig.hxx:529
#define f(i)
Definition: RSha256.hxx:104
#define e(i)
Definition: RSha256.hxx:103
int Int_t
Definition: RtypesCore.h:41
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
long Long_t
Definition: RtypesCore.h:50
bool Bool_t
Definition: RtypesCore.h:59
double Double_t
Definition: RtypesCore.h:55
long long Long64_t
Definition: RtypesCore.h:69
float Float_t
Definition: RtypesCore.h:53
const Bool_t kTRUE
Definition: RtypesCore.h:87
const char Option_t
Definition: RtypesCore.h:62
#define ClassImp(name)
Definition: Rtypes.h:363
R__EXTERN TEnv * gEnv
Definition: TEnv.h:171
#define R__ASSERT(e)
Definition: TError.h:96
void Error(const char *location, const char *msgfmt,...)
#define Printf
Definition: TGeoToOCC.h:18
#define PDB(mask, level)
Definition: TProofDebug.h:56
R__EXTERN TProofServ * gProofServ
Definition: TProofServ.h:347
R__EXTERN TProof * gProof
Definition: TProof.h:1077
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
#define gPerfStats
Int_t BufferSize() const
Definition: TBuffer.h:94
Int_t Length() const
Definition: TBuffer.h:96
virtual void Print(Option_t *option="") const
Default print for collections, calls Print(option, 1).
virtual void AddAll(const TCollection *col)
Add all objects from collection col to this collection.
virtual Int_t GetEntries() const
Definition: TCollection.h:177
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
Definition: TCollection.h:182
Manages an element of a TDSet.
Definition: TDSet.h:66
Long64_t GetEntries(Bool_t istree=kTRUE, Bool_t openfile=kTRUE)
Returns number of entries in tree or objects in file.
Definition: TDSet.cxx:429
const char * GetObjName() const
Definition: TDSet.h:120
Long64_t GetNum() const
Definition: TDSet.h:114
@ kNewRun
Definition: TDSet.h:75
TObject * GetEntryList() const
Definition: TDSet.h:131
void Invalidate()
Definition: TDSet.h:134
void SetTDSetOffset(Long64_t offset)
Definition: TDSet.h:129
void SetNum(Long64_t num)
Definition: TDSet.h:118
const char * GetDirectory() const
Return directory where to look for object.
Definition: TDSet.cxx:253
void SetValid()
Definition: TDSet.h:135
Long64_t GetTDSetOffset() const
Definition: TDSet.h:128
const char * GetFileName() const
Definition: TDSet.h:111
Long64_t GetFirst() const
Definition: TDSet.h:112
This class implements a data set to be used for PROOF processing.
Definition: TDSet.h:153
virtual TDSetElement * Next(Long64_t totalEntries=-1)
Returns next TDSetElement.
Definition: TDSet.cxx:413
virtual void Reset()
Reset or initialize access to the elements.
Definition: TDSet.cxx:1369
Bool_t IsTree() const
Definition: TDSet.h:225
TList * GetListOfElements() const
Definition: TDSet.h:231
@ kSomeInvalid
Definition: TDSet.h:161
A List of entry numbers in a TTree or TChain.
Definition: TEntryList.h:26
virtual Long64_t GetN() const
Definition: TEntryList.h:75
virtual Int_t GetValue(const char *name, Int_t dflt) const
Returns the integer value for a resource.
Definition: TEnv.cxx:491
A TEventList object is a list of selected events (entries) in a TTree.
Definition: TEventList.h:31
virtual Int_t GetN() const
Definition: TEventList.h:56
TObject * Next()
Definition: TCollection.h:249
void Reset()
Definition: TCollection.h:252
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual TObject * After(const TObject *obj) const
Returns the object after object obj.
Definition: TList.cxx:327
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:818
virtual TObject * FindObject(const char *name) const
Delete a TObjLink object.
Definition: TList.cxx:574
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition: TList.cxx:655
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
virtual void Sort(Bool_t order=kSortAscending)
Sort linked list.
Definition: TList.cxx:933
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition: TMap.h:40
void Add(TObject *obj)
This function may not be used (but we need to provide it since it is a pure virtual in TCollection).
Definition: TMap.cxx:53
void DeleteValues()
Remove all (key,value) pairs from the map AND delete the values when they are allocated on the heap.
Definition: TMap.cxx:150
TObject * GetValue(const char *keyname) const
Returns a pointer to the value associated with keyname as name of the key.
Definition: TMap.cxx:235
TObject * FindObject(const char *keyname) const
Check if a (key,value) pair exists with keyname as name of the key.
Definition: TMap.cxx:214
UInt_t What() const
Definition: TMessage.h:75
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Activate(TSocket *sock)
Activate a de-activated socket.
Definition: TMonitor.cxx:250
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual void DeActivateAll()
De-activate all activated sockets.
Definition: TMonitor.cxx:302
virtual void DeActivate(TSocket *sock)
De-activate a socket.
Definition: TMonitor.cxx:284
TList * GetListOfActives() const
Returns a list with all active sockets.
Definition: TMonitor.cxx:498
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:48
Mother of all ROOT objects.
Definition: TObject.h:37
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
virtual Bool_t IsSortable() const
Definition: TObject.h:131
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:866
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual Int_t Compare(const TObject *obj) const
Compare abstract method.
Definition: TObject.cxx:159
virtual void Print(Option_t *option="") const
This method must be overridden when a class wants to print itself.
Definition: TObject.cxx:550
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:854
This class generates packets to be processed on PROOF worker servers.
Definition: TPacketizer.h:39
TFileStat * GetNextActive()
Get next active file.
Long_t fMaxSlaveCnt
Definition: TPacketizer.h:58
TFileNode * NextActiveNode()
Get next active node.
virtual ~TPacketizer()
Destructor.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
Bool_t fDefMaxWrkNode
Definition: TPacketizer.h:69
Int_t fMaxPerfIdx
Definition: TPacketizer.h:56
Int_t AddWorkers(TList *workers)
Adds new workers. Returns the number of workers added, or -1 on failure.
TFileNode * NextUnAllocNode()
Get next unallocated node.
TList * fPackets
Definition: TPacketizer.h:44
Int_t GetActiveWorkers()
Return the number of workers still processing.
TList * fUnAllocated
Definition: TPacketizer.h:50
Bool_t fHeuristicPSiz
Definition: TPacketizer.h:68
void Reset()
Reset the internal datastructure for packet distribution.
Int_t fPacketAsAFraction
Definition: TPacketizer.h:60
TList * fActive
Definition: TPacketizer.h:51
TFileStat * GetNextUnAlloc(TFileNode *node=0)
Get next unallocated file.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r)
Get next packet.
Float_t GetCurrentRate(Bool_t &all)
Get Estimation of the current rate; just summing the current rates of the active workers.
TList * fFileNodes
Definition: TPacketizer.h:49
Long64_t fPacketSize
Definition: TPacketizer.h:53
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree an get number of entries.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Container class for processing statistics.
void SetLastUpdate(Double_t updtTime=0)
Update time stamp either with the passed value (if > 0) or with the current time.
Double_t GetProcTime() const
void IncBytesRead(Long64_t bytesRead)
Long64_t GetEntries() const
void SetLastEntries(Long64_t entries)
Double_t GetCPUTime() const
void IncEntries(Long64_t entries=1)
Long64_t GetBytesRead() const
TSocket * GetSocket() const
Definition: TProofServ.h:257
This class controls a Parallel ROOT Facility, PROOF, cluster.
Definition: TProof.h:316
TObject * GetParameter(const char *par) const
Get specified parameter.
Definition: TProof.cxx:9890
void SendDataSetStatus(const char *msg, UInt_t n, UInt_t tot, Bool_t st)
Send or notify data set status.
Definition: TProof.cxx:9308
Class describing a PROOF worker server.
Definition: TSlave.h:46
TSocket * GetSocket() const
Definition: TSlave.h:134
Int_t GetProtocol() const
Definition: TSlave.h:133
const char * GetName() const
Returns name of object.
Definition: TSlave.h:124
const char * GetOrdinal() const
Definition: TSlave.h:131
Int_t GetPerfIdx() const
Definition: TSlave.h:132
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:817
virtual Bool_t IsValid() const
Definition: TSocket.h:152
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:131
const char * Data() const
Definition: TString.h:364
Bool_t IsNull() const
Definition: TString.h:402
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition: TString.h:619
virtual const char * HostName()
Return the system's host name.
Definition: TSystem.cxx:312
This class represents a WWW compatible URL.
Definition: TUrl.h:35
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Bool_t IsValid() const
Definition: TUrl.h:82
const char * GetHost() const
Definition: TUrl.h:70
const char * GetHostFQDN() const
Return fully qualified domain name of url host.
Definition: TUrl.cxx:467
void SetHost(const char *host)
Definition: TUrl.h:87
const char * GetProtocol() const
Definition: TUrl.h:67
virtual TProofProgressStatus * AddProcessed(TProofProgressStatus *st)=0
The packetizer is a load balancing object created for each query.
TProofProgressStatus * fProgressStatus
virtual Bool_t HandleTimer(TTimer *timer)
Send progress message to client.
Long64_t GetEntriesProcessed() const
TDSetElement * CreateNewPacket(TDSetElement *base, Long64_t first, Long64_t num)
Creates a new TDSetElement from from base packet starting from the first entry with num entries.
const Int_t n
Definition: legend1.C:16
void Add(RHist< DIMENSIONS, PRECISION_TO, STAT_TO... > &to, const RHist< DIMENSIONS, PRECISION_FROM, STAT_FROM... > &from)
Add two histograms.
Definition: RHist.hxx:310
static constexpr double s
Definition: file.py:1
Definition: first.py:1
auto * m
Definition: textangle.C:8