// ROOT logo
// @(#)root/proofplayer:$Id$
// Author: Jan Iwaszkiewicz   11/12/06

/*************************************************************************
 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TPacketizerAdaptive
#define ROOT_TPacketizerAdaptive

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TPacketizerAdaptive                                                  //
//                                                                      //
// This packetizer is based on TPacketizer but uses different           //
// load-balancing algorithms and data structures.                       //
// Two main improvements in the load-balancing strategy:                //
// - First one was to change the order in which the files are assigned  //
//   to the computing nodes in such a way that network transfers are    //
//   evenly distributed in the query time. Transfer of the remote files //
//   was often becoming a bottleneck at the end of a query.             //
// - The other improvement is the use of time-based packet size. We     //
//   measure the processing rate of all the nodes and calculate the     //
//   packet size so that each packet takes a certain amount of time.    //
//   In this way the packetizer prevents the situation where the query  //
//   can't finish because of one slow node.                             //
//                                                                      //
// The data structures: TFileStat, TFileNode and TSlaveStat are         //
// enriched + changed and TFileNode::Compare method is changed.         //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#ifndef ROOT_TVirtualPacketizer
#include "TVirtualPacketizer.h"
#endif


class TMessage;
class TTree;
class TMap;
class TNtupleD;
class TProofStats;
class TRandom;
class TSortedList;

// TPacketizerAdaptive: generates work packets for parallel processing.
// Derives from TVirtualPacketizer; the load-balancing approach is summarised
// in the comment block at the top of this header.
// The default constructor, copy constructor and assignment operator are all
// declared private, so instances are created through the public constructor
// taking a data set and a list of slaves, and are not meant to be copied.
class TPacketizerAdaptive : public TVirtualPacketizer {

public:              // public because of Sun CC bug
   // Nested helper types; only forward-declared in this header.
   class TFileNode;
   class TFileStat;
   class TSlaveStat;

private:
   // --- Bookkeeping lists ---
   TList         *fFileNodes;    // nodes with files
   TList         *fUnAllocated;  // nodes with unallocated files
   TList         *fActive;       // nodes with unfinished files
   Int_t          fMaxPerfIdx;   // maximum of our slaves' performance index
   TList         *fPartitions;   // list of partitions on nodes

   TSortedList   *fFilesToProcess; // global list of files (TFileStat) to be processed

   // --- Cache / packet size synchronization ---
   Bool_t         fCachePacketSync; // control synchronization of cache and packet sizes
   Double_t       fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync

   // --- Local vs. remote file handling ---
   Float_t        fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
   Long64_t       fNEventsOnRemLoc;       // number of events in currently
                                          // unallocated files on non-worker locations
   Float_t        fBaseLocalPreference;   // indicates how much more likely the nodes will be
                                          // to open their local files (1 means indifferent)
   Bool_t         fForceLocal;            // if 1 - eliminate the remote processing

   // --- Tuning parameters ---
   Long_t         fMaxSlaveCnt;        // maximum number of workers per filenode (Long_t to avoid
                                       // warnings from backward compatibility support)
   Int_t          fPacketAsAFraction;  // used to calculate the packet size:
                                       // fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
                                       // fPacketAsAFraction can be interpreted as follows:
                                       // assuming all slaves have equal processing rate, packet size
                                       // is (#events processed by 1 slave) / fPacketAsAFraction.
                                       // It can be set with PROOF_PacketAsAFraction in input list.
   Int_t          fStrategy;           // 0 means the classic and 1 (default) - the adaptive strategy
   Int_t          fTryReassign;        // controls attempts to reassign packets (0 == no reassignment)

   TPacketizerAdaptive();
   TPacketizerAdaptive(const TPacketizerAdaptive&);    // not implemented: accidental copying is an error
   void           InitStats();                         // initialise the stats
   void operator=(const TPacketizerAdaptive&);         // not implemented: accidental assignment is an error

   // Access to / removal from the list of nodes with unallocated files.
   TFileNode     *NextNode();
   void           RemoveUnAllocNode(TFileNode *);

   // Access to / removal from the list of nodes with unfinished files.
   TFileNode     *NextActiveNode();
   void           RemoveActiveNode(TFileNode *);

   // File-level counterparts; both arguments of GetNextUnAlloc default to 0 (null).
   TFileStat     *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0);
   TFileStat     *GetNextActive();
   void           RemoveActive(TFileStat *file);

   void           Reset();
   // maxent defaults to -1 and byfile to kFALSE.
   void           ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
   Int_t          ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
   void           SplitPerHost(TList *elements, TList **listOfMissingFiles);

public:
   // The only publicly accessible constructor.
   TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num,
                       TList *input, TProofProgressStatus *st);
   virtual ~TPacketizerAdaptive();

   // listOfMissingFiles is optional and defaults to 0 (null).
   Int_t         AddProcessed(TSlave *sl, TProofProgressStatus *st,
                               Double_t latency, TList **listOfMissingFiles = 0);
   // ent, bytes and calls are passed by non-const reference (output arguments).
   Int_t         GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls);
   // 'all' is passed by non-const reference.
   Float_t       GetCurrentRate(Bool_t &all);
   Int_t         CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
   TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
   void          MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);

   Int_t         GetActiveWorkers();

   // Class version 0 is passed to ClassDef.
   ClassDef(TPacketizerAdaptive,0)  //Generate work packets for parallel processing
};

#endif
// (Source-viewer line anchors "TPacketizerAdaptive.h:1" through
//  "TPacketizerAdaptive.h:125" were listed here; they are not part of the header.)