Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TPacketizerAdaptive.h
Go to the documentation of this file.
1// @(#)root/proofplayer:$Id$
2// Author: Jan Iwaszkiewicz 11/12/06
3
4/*************************************************************************
5 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#ifndef ROOT_TPacketizerAdaptive
13#define ROOT_TPacketizerAdaptive
14
15//////////////////////////////////////////////////////////////////////////
16// //
17// TPacketizerAdaptive //
18// //
19// This packetizer is based on TPacketizer but uses different //
20// load-balancing algorithms and data structures. //
21// Two main improvements in the load-balancing strategy: //
22// - First one was to change the order in which the files are assigned //
23// to the computing nodes in such a way that network transfers are //
24// evenly distributed in the query time. Transfer of the remote files //
25// was often becoming a bottleneck at the end of a query. //
26// - The other improvement is the use of time-based packet size. We //
27// measure the processing rate of all the nodes and calculate the //
28// packet size, so that it takes a certain amount of time. In this way //
29// the packetizer prevents the situation where the query can't finish //
30// because of one slow node. //
31// //
32// The data structures: TFileStat, TFileNode and TSlaveStat are //
33// enriched + changed and TFileNode::Compare method is changed. //
34// //
35//////////////////////////////////////////////////////////////////////////
36
37#include "TVirtualPacketizer.h"
38
39
40class TMessage;
41class TTree;
42class TMap;
43class TNtupleD;
44class TProofStats;
45class TRandom;
46class TSortedList;
47
49
50public: // public because of Sun CC bug
51 class TFileNode;
52 class TFileStat;
53 class TSlaveStat;
54
55private:
56 TList *fFileNodes; // nodes with files
57 TList *fUnAllocated; // nodes with unallocated files
58 TList *fActive; // nodes with unfinished files
59 Int_t fMaxPerfIdx; // maximum of our slaves' performance index
60 TList *fPartitions; // list of partitions on nodes
61
62 TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed
63
64 Bool_t fCachePacketSync; // control synchronization of cache and packet sizes
65 Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync
66
67 Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
68 Long64_t fNEventsOnRemLoc; // number of events in currently
69 // unalloc files on non-worker loc.
70 Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be
71 // to open their local files (1 means indifferent)
72 Bool_t fForceLocal; // if 1 - eliminate the remote processing
73
74 Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid
75 // warnings from backward compatibility support)
76 Int_t fPacketAsAFraction; // used to calculate the packet size
77 // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
78 // fPacketAsAFraction can be interpreted as follows:
79 // assuming all slaves have equal processing rate, packet size
80 // is (#events processed by 1 slave) / fPacketAsAFraction.
81 // It can be set with PROOF_PacketAsAFraction in input list.
82 Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
83 Int_t fTryReassign; // Controls attempts to reassign packets (0 == no reassignment)
84
86 TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation, will generate
87 void InitStats(); // initialise the stats
88 void operator=(const TPacketizerAdaptive&); // error on accidental usage
89
92
95
96 TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0); // get next unallocated file from 'node' or other nodes; 'node' is tried first
99
100 void Reset(); // reset the internal data structure for packet distribution
101 void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE); // check existence of file/dir/tree and get number of entries
102 Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles); // NOTE(review): presumably reassigns packet 'e'; a file can appear several times in listOfMissingFiles
103 void SplitPerHost(TList *elements, TList **listOfMissingFiles); // split 'elements' into per host entries; files can appear several times in listOfMissingFiles
104
105public:
108 ~TPacketizerAdaptive() override;
109
111 Double_t latency, TList **listOfMissingFiles = 0) override;
113 Float_t GetCurrentRate(Bool_t &all) override; // estimate of the current rate: sum of the current rates of the active workers
114 Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent); // packet size; the result depends on fStrategy
115 TDSetElement *GetNextPacket(TSlave *sl, TMessage *r) override; // get next packet
116 void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles) override; // can be called at any time during processing (e.g. as an effect of handling kPROOF_STOPPROCESS)
117
118 Int_t GetActiveWorkers() override; // return the number of workers still processing
119
120 ClassDefOverride(TPacketizerAdaptive,0) //Generate work packets for parallel processing
121};
122
123#endif
#define e(i)
Definition RSha256.hxx:103
long Long_t
Definition RtypesCore.h:54
float Float_t
Definition RtypesCore.h:57
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
long long Long64_t
Definition RtypesCore.h:80
#define ClassDefOverride(name, id)
Definition Rtypes.h:341
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t r
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t bytes
Manages an element of a TDSet.
Definition TDSet.h:66
This class implements a data set to be used for PROOF processing.
Definition TDSet.h:153
A doubly linked list.
Definition TList.h:38
TMap implements an associative array of (key,value) pairs using a THashTable for efficient retrieval ...
Definition TMap.h:40
A simple TTree restricted to a list of double variables only.
Definition TNtupleD.h:28
Mother of all ROOT objects.
Definition TObject.h:41
This packetizer is based on TPacketizer but uses different load-balancing algorithms and data structu...
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles)
The file in the listOfMissingFiles can appear several times; in order to fix that,...
void SplitPerHost(TList *elements, TList **listOfMissingFiles)
Split into per host entries. The files in the listOfMissingFiles can appear several times; in order to...
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles) override
This method can be called at any time during processing as an effect of handling kPROOF_STOPPROCESS I...
void operator=(const TPacketizerAdaptive &)
TFileNode * NextNode()
Get next node which has unallocated files.
Float_t GetCurrentRate(Bool_t &all) override
Get an estimate of the current rate, obtained by summing the current rates of the active workers.
void RemoveActive(TFileStat *file)
Remove file from the list of actives.
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st, Double_t latency, TList **listOfMissingFiles=0) override
To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS message (when the worker was a...
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent=-1, Bool_t byfile=kFALSE)
Check existence of file/dir/tree and get number of entries.
TPacketizerAdaptive(const TPacketizerAdaptive &)
Int_t GetActiveWorkers() override
Return the number of workers still processing.
void RemoveActiveNode(TFileNode *)
Remove node from the list of actives.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls) override
Get estimation for the number of processed entries and bytes read at time t, based on the numbers alr...
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent)
The result depends on the fStrategy.
~TPacketizerAdaptive() override
Destructor.
TFileNode * NextActiveNode()
Get next active node.
TDSetElement * GetNextPacket(TSlave *sl, TMessage *r) override
Get next packet; A meaningful difference to TPacketizer is the fact that this packetizer,...
TFileStat * GetNextUnAlloc(TFileNode *node=0, const char *nodeHostName=0)
Get next unallocated file from 'node' or other nodes: First try 'node'.
void RemoveUnAllocNode(TFileNode *)
Remove unallocated node.
void Reset()
Reset the internal data structure for packet distribution.
void InitStats()
(re)initialise the statistics; called at the beginning or after a worker dies.
TSortedList * fFilesToProcess
TFileStat * GetNextActive()
Get next active file.
Container class for processing statistics.
This is the base class for the ROOT Random number generators.
Definition TRandom.h:27
Class describing a PROOF worker server.
Definition TSlave.h:46
A sorted doubly linked list.
Definition TSortedList.h:28
A TTree represents a columnar dataset.
Definition TTree.h:79
The packetizer is a load balancing object created for each query.
Definition file.py:1
Definition first.py:1