// @(#)root/proofplayer:$Id$
// Author: Maarten Ballintijn    9/7/2002

/*************************************************************************
 * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TVirtualPacketizer
#define ROOT_TVirtualPacketizer

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TVirtualPacketizer                                                   //
//                                                                      //
// Packetizer is a load balancing object created for each query.        //
// It generates packets to be processed on PROOF worker servers.        //
// A packet is an event range (begin entry and number of entries) or    //
// object range (first object and number of objects) in a TTree         //
// (entries) or a directory (objects) in a file.                        //
// Packets are generated taking into account the performance of the     //
// remote machine, the time it took to process a previous packet on     //
// the remote machine, the locality of the database files, etc.         //
//                                                                      //
// TVirtualPacketizer includes common parts of PROOF packetizers.       //
// Look in subclasses for details.                                      //
// The default packetizer is TPacketizerAdaptive.                       //
// To use an alternative one, for instance - the TPacketizer, call:     //
// proof->SetParameter("PROOF_Packetizer", "TPacketizer");              //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#ifndef ROOT_TObject
#include "TObject.h"
#endif
#ifndef ROOT_TSlave
#include "TSlave.h"
#endif
#ifndef ROOT_TProofProgressStatus
#include "TProofProgressStatus.h"
#endif
#ifndef ROOT_TTime
#include "TTime.h"
#endif


class TDSet;
class TDSetElement;
class TList;
class TMap;
class TMessage;
class TNtuple;
class TNtupleD;
class TProofProgressInfo;
class TSlave;


class TVirtualPacketizer : public TObject {

public:              // public because of Sun CC bug
   class TVirtualSlaveStat;

protected:
   enum EUseEstOpt {        // Option for usage of estimated values
      kEstOff     = 0,
      kEstCurrent = 1,
      kEstAverage = 2
   };

   // General configuration parameters
   Double_t  fMinPacketTime; // minimum packet time
   Double_t  fMaxPacketTime; // maximum packet time
   TList    *fConfigParams;  // List of configuration parameters

   TMap     *fSlaveStats;   // slave status, keyed by correspondig TSlave

   TProofProgressStatus *fProgressStatus; // pointer to status in the player.
   TTimer   *fProgress;     // progress updates timer

   Long64_t  fTotalEntries; // total number of entries to be distributed;
                            // not used in the progressive packetizer
   TList    *fFailedPackets;// a list of packets that failed while processing

   // Members for progress info
   TTime     fStartTime;    // time offset
   Float_t   fInitTime;     // time before processing
   Float_t   fProcTime;     // time since start of processing
   Float_t   fTimeUpdt;     // time between updates
   TNtupleD *fCircProg;     // Keeps circular info for "instantenous"
                            // rate calculations
   Long_t    fCircN;        // Circularity

   TNtuple  *fProgressPerf; // {Active workers, evt rate, MBs read} as a function of processing time
   Float_t   fProcTimeLast; // Time of the last measurement
   Int_t     fActWrksLast;  // Active workers at fProcTimeLast
   Float_t   fEvtRateLast;  // Evt rate at fProcTimeLast
   Float_t   fMBsReadLast;  // MBs read at fProcTimeLast
   Float_t   fEffSessLast;  // Number of effective sessions at fProcTimeLast
   Bool_t    fAWLastFill;   // Whether to fill the last measurement
   Float_t   fReportPeriod; // Time between reports if nothing changes (estimated proc time / 100)

   EUseEstOpt fUseEstOpt;   // Control usage of estimated values for the progress info

   Bool_t   fValid;           // Constructed properly?
   Bool_t   fStop;            // Termination of Process() requested?

   TString  fDataSet;         // Name of the dataset being processed (for dataset-driven runs)

   TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0);
   TVirtualPacketizer(const TVirtualPacketizer &);  // no implementation, will generate
   void operator=(const TVirtualPacketizer &);      // error on accidental usage

   TDSetElement  *CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num);
   Long64_t       GetEntries(Bool_t tree, TDSetElement *e); // Num of entries or objects
   virtual Bool_t HandleTimer(TTimer *timer);

public:
   enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
   virtual ~TVirtualPacketizer();

   virtual Int_t           AssignWork(TDSet* /*dset*/, Long64_t /*first*/, Long64_t /*num*/) { return -1; }
   Bool_t                  IsValid() const { return fValid; }
   Long64_t                GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
   virtual Int_t           GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
                           { ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; }
   virtual Float_t         GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
   Long64_t                GetTotalEntries() const { return fTotalEntries; }
   virtual TDSetElement   *GetNextPacket(TSlave *sl, TMessage *r);
   virtual void            SetInitTime();
   virtual void            StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE);
   TList                  *GetFailedPackets() { return fFailedPackets; }
   void                    SetFailedPackets(TList *list) { fFailedPackets = list; }
   virtual Int_t           AddWorkers(TList *workers);

   Long64_t      GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
   Long64_t      GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
   Double_t      GetCumProcTime() const { return fProgressStatus->GetProcTime(); }
   Float_t       GetInitTime() const { return fInitTime; }
   Float_t       GetProcTime() const { return fProcTime; }
   TNtuple      *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
                                                                } else { return fProgressPerf;} }
   TList        *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
                                                                } else { return fConfigParams;} }
   virtual void  MarkBad(TSlave * /*s*/, TProofProgressStatus * /*status*/, TList ** /*missingFiles*/) { return; }
   virtual Int_t AddProcessed(TSlave * /*sl*/, TProofProgressStatus * /*st*/,
                    Double_t /*lat*/, TList ** /*missingFiles*/) { return 0; }
   TProofProgressStatus *GetStatus() { return fProgressStatus; }
   void          SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
   void          SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }

   TMap         *GetSlaveStats() const { return fSlaveStats; }

   virtual Int_t GetActiveWorkers() { return -1; }

   ClassDef(TVirtualPacketizer,0)  //Generate work packets for parallel processing
};

//------------------------------------------------------------------------------

class TVirtualPacketizer::TVirtualSlaveStat : public TObject {

friend class TPacketizerAdaptive;
friend class TPacketizer;

protected:
   TString        fWrkFQDN;       // Worker FQDN
   TSlave        *fSlave;         // corresponding TSlave record
   TProofProgressStatus *fStatus; // status as of the last finished packet

public:
   const char *GetName() const { return fWrkFQDN.Data(); }
   const char *GetOrdinal() const { return fSlave->GetOrdinal(); }
   Long64_t    GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
   Double_t    GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
   Float_t     GetAvgRate() { return fStatus->GetRate(); }
   TProofProgressStatus *GetProgressStatus() { return fStatus; }
   virtual TProofProgressStatus *AddProcessed(TProofProgressStatus *st) = 0;
};

#endif
 TVirtualPacketizer.h:1
 TVirtualPacketizer.h:2
 TVirtualPacketizer.h:3
 TVirtualPacketizer.h:4
 TVirtualPacketizer.h:5
 TVirtualPacketizer.h:6
 TVirtualPacketizer.h:7
 TVirtualPacketizer.h:8
 TVirtualPacketizer.h:9
 TVirtualPacketizer.h:10
 TVirtualPacketizer.h:11
 TVirtualPacketizer.h:12
 TVirtualPacketizer.h:13
 TVirtualPacketizer.h:14
 TVirtualPacketizer.h:15
 TVirtualPacketizer.h:16
 TVirtualPacketizer.h:17
 TVirtualPacketizer.h:18
 TVirtualPacketizer.h:19
 TVirtualPacketizer.h:20
 TVirtualPacketizer.h:21
 TVirtualPacketizer.h:22
 TVirtualPacketizer.h:23
 TVirtualPacketizer.h:24
 TVirtualPacketizer.h:25
 TVirtualPacketizer.h:26
 TVirtualPacketizer.h:27
 TVirtualPacketizer.h:28
 TVirtualPacketizer.h:29
 TVirtualPacketizer.h:30
 TVirtualPacketizer.h:31
 TVirtualPacketizer.h:32
 TVirtualPacketizer.h:33
 TVirtualPacketizer.h:34
 TVirtualPacketizer.h:35
 TVirtualPacketizer.h:36
 TVirtualPacketizer.h:37
 TVirtualPacketizer.h:38
 TVirtualPacketizer.h:39
 TVirtualPacketizer.h:40
 TVirtualPacketizer.h:41
 TVirtualPacketizer.h:42
 TVirtualPacketizer.h:43
 TVirtualPacketizer.h:44
 TVirtualPacketizer.h:45
 TVirtualPacketizer.h:46
 TVirtualPacketizer.h:47
 TVirtualPacketizer.h:48
 TVirtualPacketizer.h:49
 TVirtualPacketizer.h:50
 TVirtualPacketizer.h:51
 TVirtualPacketizer.h:52
 TVirtualPacketizer.h:53
 TVirtualPacketizer.h:54
 TVirtualPacketizer.h:55
 TVirtualPacketizer.h:56
 TVirtualPacketizer.h:57
 TVirtualPacketizer.h:58
 TVirtualPacketizer.h:59
 TVirtualPacketizer.h:60
 TVirtualPacketizer.h:61
 TVirtualPacketizer.h:62
 TVirtualPacketizer.h:63
 TVirtualPacketizer.h:64
 TVirtualPacketizer.h:65
 TVirtualPacketizer.h:66
 TVirtualPacketizer.h:67
 TVirtualPacketizer.h:68
 TVirtualPacketizer.h:69
 TVirtualPacketizer.h:70
 TVirtualPacketizer.h:71
 TVirtualPacketizer.h:72
 TVirtualPacketizer.h:73
 TVirtualPacketizer.h:74
 TVirtualPacketizer.h:75
 TVirtualPacketizer.h:76
 TVirtualPacketizer.h:77
 TVirtualPacketizer.h:78
 TVirtualPacketizer.h:79
 TVirtualPacketizer.h:80
 TVirtualPacketizer.h:81
 TVirtualPacketizer.h:82
 TVirtualPacketizer.h:83
 TVirtualPacketizer.h:84
 TVirtualPacketizer.h:85
 TVirtualPacketizer.h:86
 TVirtualPacketizer.h:87
 TVirtualPacketizer.h:88
 TVirtualPacketizer.h:89
 TVirtualPacketizer.h:90
 TVirtualPacketizer.h:91
 TVirtualPacketizer.h:92
 TVirtualPacketizer.h:93
 TVirtualPacketizer.h:94
 TVirtualPacketizer.h:95
 TVirtualPacketizer.h:96
 TVirtualPacketizer.h:97
 TVirtualPacketizer.h:98
 TVirtualPacketizer.h:99
 TVirtualPacketizer.h:100
 TVirtualPacketizer.h:101
 TVirtualPacketizer.h:102
 TVirtualPacketizer.h:103
 TVirtualPacketizer.h:104
 TVirtualPacketizer.h:105
 TVirtualPacketizer.h:106
 TVirtualPacketizer.h:107
 TVirtualPacketizer.h:108
 TVirtualPacketizer.h:109
 TVirtualPacketizer.h:110
 TVirtualPacketizer.h:111
 TVirtualPacketizer.h:112
 TVirtualPacketizer.h:113
 TVirtualPacketizer.h:114
 TVirtualPacketizer.h:115
 TVirtualPacketizer.h:116
 TVirtualPacketizer.h:117
 TVirtualPacketizer.h:118
 TVirtualPacketizer.h:119
 TVirtualPacketizer.h:120
 TVirtualPacketizer.h:121
 TVirtualPacketizer.h:122
 TVirtualPacketizer.h:123
 TVirtualPacketizer.h:124
 TVirtualPacketizer.h:125
 TVirtualPacketizer.h:126
 TVirtualPacketizer.h:127
 TVirtualPacketizer.h:128
 TVirtualPacketizer.h:129
 TVirtualPacketizer.h:130
 TVirtualPacketizer.h:131
 TVirtualPacketizer.h:132
 TVirtualPacketizer.h:133
 TVirtualPacketizer.h:134
 TVirtualPacketizer.h:135
 TVirtualPacketizer.h:136
 TVirtualPacketizer.h:137
 TVirtualPacketizer.h:138
 TVirtualPacketizer.h:139
 TVirtualPacketizer.h:140
 TVirtualPacketizer.h:141
 TVirtualPacketizer.h:142
 TVirtualPacketizer.h:143
 TVirtualPacketizer.h:144
 TVirtualPacketizer.h:145
 TVirtualPacketizer.h:146
 TVirtualPacketizer.h:147
 TVirtualPacketizer.h:148
 TVirtualPacketizer.h:149
 TVirtualPacketizer.h:150
 TVirtualPacketizer.h:151
 TVirtualPacketizer.h:152
 TVirtualPacketizer.h:153
 TVirtualPacketizer.h:154
 TVirtualPacketizer.h:155
 TVirtualPacketizer.h:156
 TVirtualPacketizer.h:157
 TVirtualPacketizer.h:158
 TVirtualPacketizer.h:159
 TVirtualPacketizer.h:160
 TVirtualPacketizer.h:161
 TVirtualPacketizer.h:162
 TVirtualPacketizer.h:163
 TVirtualPacketizer.h:164
 TVirtualPacketizer.h:165
 TVirtualPacketizer.h:166
 TVirtualPacketizer.h:167
 TVirtualPacketizer.h:168
 TVirtualPacketizer.h:169
 TVirtualPacketizer.h:170
 TVirtualPacketizer.h:171
 TVirtualPacketizer.h:172
 TVirtualPacketizer.h:173
 TVirtualPacketizer.h:174
 TVirtualPacketizer.h:175
 TVirtualPacketizer.h:176
 TVirtualPacketizer.h:177
 TVirtualPacketizer.h:178
 TVirtualPacketizer.h:179
 TVirtualPacketizer.h:180
 TVirtualPacketizer.h:181
 TVirtualPacketizer.h:182
 TVirtualPacketizer.h:183