ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
parallelMergeServer.C
Go to the documentation of this file.
1 #include "TMessage.h"
2 #include "TBenchmark.h"
3 #include "TSocket.h"
4 #include "TH2.h"
5 #include "TTree.h"
6 #include "TMemFile.h"
7 #include "TRandom.h"
8 #include "TError.h"
9 #include "TFileMerger.h"
10 
11 #include "TServerSocket.h"
12 #include "TPad.h"
13 #include "TCanvas.h"
14 #include "TMonitor.h"
15 
16 #include "TFileCacheWrite.h"
17 #include "TSystem.h"
18 #include "THashTable.h"
19 
20 #include "TMath.h"
21 #include "TTimeStamp.h"
22 
// Merge-mode constants. Nothing else in this macro refers to them: the merges
// below use TFileMerger's own flags (TFileMerger::kIncremental, kAllIncremental, ...).
static const int kIncremental        = 0;
static const int kReplaceImmediately = 1;
static const int kReplaceWait        = 2;
26 
27 #include "TKey.h"
29 {
30 
31  if (dir==0) return kFALSE;
32 
33  TIter nextkey(dir->GetListOfKeys());
34  TKey *key;
35  while( (key = (TKey*)nextkey()) ) {
36  TClass *cl = TClass::GetClass(key->GetClassName());
37  if (cl->InheritsFrom(TDirectory::Class())) {
38  TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
39  if (!subdir) {
40  subdir = (TDirectory *)key->ReadObj();
41  }
42  if (R__NeedInitialMerge(subdir)) {
43  return kTRUE;
44  }
45  } else {
46  if (0 != cl->GetResetAfterMerge()) {
47  return kTRUE;
48  }
49  }
50  }
51  return kFALSE;
52 }
53 
54 static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
55 {
56  if (dir==0) return;
57 
58  TIter nextkey(dir->GetListOfKeys());
59  TKey *key;
60  while( (key = (TKey*)nextkey()) ) {
61  TClass *cl = TClass::GetClass(key->GetClassName());
62  if (cl->InheritsFrom(TDirectory::Class())) {
63  TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
64  if (!subdir) {
65  subdir = (TDirectory *)key->ReadObj();
66  }
67  R__DeleteObject(subdir,withReset);
68  } else {
69  Bool_t todelete = kFALSE;
70  if (withReset) {
71  todelete = (0 != cl->GetResetAfterMerge());
72  } else {
73  todelete = (0 == cl->GetResetAfterMerge());
74  }
75  if (todelete) {
76  key->Delete();
77  dir->GetListOfKeys()->Remove(key);
78  delete key;
79  }
80  }
81  }
82 }
83 
84 static void R__MigrateKey(TDirectory *destination, TDirectory *source)
85 {
86  if (destination==0 || source==0) return;
87 
88  TIter nextkey(source->GetListOfKeys());
89  TKey *key;
90  while( (key = (TKey*)nextkey()) ) {
91  TClass *cl = TClass::GetClass(key->GetClassName());
92  if (cl->InheritsFrom(TDirectory::Class())) {
93  TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
94  if (!source_subdir) {
95  source_subdir = (TDirectory *)key->ReadObj();
96  }
97  TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
98  if (!destination_subdir) {
99  destination_subdir = destination->mkdir(key->GetName());
100  }
101  R__MigrateKey(destination,source);
102  } else {
103  TKey *oldkey = destination->GetKey(key->GetName());
104  if (oldkey) {
105  oldkey->Delete();
106  delete oldkey;
107  }
108  TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
109  destination->GetFile()->SumBuffer(newkey->GetObjlen());
110  newkey->WriteFile(0);
111  if (destination->GetFile()->TestBit(TFile::kWriteError)) {
112  return;
113  }
114  }
115  }
116  destination->SaveSelf();
117 }
118 
119 struct ClientInfo
120 {
121  TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
122  TString fLocalName;
123  UInt_t fContactsCount;
124  TTimeStamp fLastContact;
125  Double_t fTimeSincePrevContact;
126 
127  ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
128  ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
129  fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
130  }
131 
132  void Set(TFile *file)
133  {
134  // Register the new file as coming from this client.
135  if (file != fFile) {
136  // We need to keep any of the keys from the previous file that
137  // are not in the new file.
138  if (fFile) {
139  R__MigrateKey(fFile,file);
140  // delete the previous memory file (if any)
141  delete file;
142  } else {
143  fFile = file;
144  }
145  }
146  TTimeStamp now;
147  fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
148  fLastContact = now;
149  ++fContactsCount;
150  }
151 };
152 
153 struct ParallelFileMerger : public TObject
154 {
155  typedef std::vector<ClientInfo> ClientColl_t;
156 
157  TString fFilename;
158  TBits fClientsContact; //
159  UInt_t fNClientsContact; //
160  ClientColl_t fClients;
161  TTimeStamp fLastMerge;
162  TFileMerger fMerger;
163 
164  ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
165  {
166  // Default constructor.
167 
168  fMerger.SetPrintLevel(0);
169  fMerger.OutputFile(filename,"RECREATE");
170  if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
171  }
172 
173  ~ParallelFileMerger()
174  {
175  // Destructor.
176 
177  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
178  fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
179  }
180  for( ClientColl_t::iterator iter = fClients.begin();
181  iter != fClients.end();
182  ++iter)
183  {
184  delete iter->fFile;
185  }
186  }
187 
188  ULong_t Hash() const
189  {
190  // Return hash value for this object.
191  return fFilename.Hash();
192  }
193 
194  const char *GetName() const
195  {
196  // Return the name of the object which is the name of the output file.
197  return fFilename;
198  }
199 
200  Bool_t InitialMerge(TFile *input)
201  {
202  // Initial merge of the input to copy the resetable object (TTree) into the output
203  // and remove them from the input file.
204 
205  fMerger.AddFile(input);
206 
207  Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable);
208 
209  R__DeleteObject(input,kTRUE);
210  return result;
211  }
212 
213  Bool_t Merge()
214  {
215  // Merge the current inputs into the output file.
216 
217  R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
218  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
219  fMerger.AddFile(fClients[f].fFile);
220  }
221  Bool_t result = fMerger.PartialMerge(TFileMerger::kAllIncremental);
222 
223  // Remove any 'resetable' object (like TTree) from the input file so that they will not
224  // be re-merged. Keep only the object that always need to be re-merged (Histograms).
225  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
226  if (fClients[f].fFile) {
227  R__DeleteObject(fClients[f].fFile,kTRUE);
228  } else {
229  // We back up the file (probably due to memory constraint)
230  TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
231  R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
232  file->Write();
233  delete file;
234  }
235  }
236  fLastMerge = TTimeStamp();
237  fNClientsContact = 0;
238  fClientsContact.Clear();
239 
240  return result;
241  }
242 
243  Bool_t NeedFinalMerge()
244  {
245  // Return true, if there is any data that has not been merged.
246 
247  return fClientsContact.CountBits() > 0;
248  }
249 
250  Bool_t NeedMerge(Float_t clientThreshold)
251  {
252  // Return true, if enough client have reported
253 
254  if (fClients.size()==0) {
255  return kFALSE;
256  }
257 
258  // Calculate average and rms of the time between the last 2 contacts.
259  Double_t sum = 0;
260  Double_t sum2 = 0;
261  for(unsigned int c = 0 ; c < fClients.size(); ++c) {
262  sum += fClients[c].fTimeSincePrevContact;
263  sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
264  }
265  Double_t avg = sum / fClients.size();
266  Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
267  Double_t target = avg + 2*sigma;
268  TTimeStamp now;
269  if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
270 // Float_t cut = clientThreshold * fClients.size();
271 // if (!(fClientsContact.CountBits() > cut )) {
272 // for(unsigned int c = 0 ; c < fClients.size(); ++c) {
273 // fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
274 // }
275 // fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
276 // }
277  return kTRUE;
278  }
279  Float_t cut = clientThreshold * fClients.size();
280  return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
281  }
282 
283  void RegisterClient(UInt_t clientId, TFile *file)
284  {
285  // Register that a client has sent a file.
286 
287  ++fNClientsContact;
288  fClientsContact.SetBitNumber(clientId);
289  if (fClients.size() < clientId+1) {
290  fClients.push_back( ClientInfo(fFilename,clientId) );
291  }
292  fClients[clientId].Set(file);
293  }
294 
295  ClassDef(ParallelFileMerger,0);
296 };
297 
298 void parallelMergeServer(bool cache = false) {
299  // This script shows how to make a simple iterative server that
300  // can accept connections while handling currently open connections.
301  // Compare this script to hserv.C that blocks on accept.
302  // In this script a server socket is created and added to a monitor.
303  // A monitor object is used to monitor connection requests on
304  // the server socket. After accepting the connection
305  // the new socket is added to the monitor and immediately ready
306  // for use. Once two connections are accepted the server socket
307  // is removed from the monitor and closed. The monitor continues
308  // monitoring the sockets.
309  //
310  // To run this demo do the following:
311  // - Open three windows
312  // - Start ROOT in all three windows
313  // - Execute in the first window: .x hserv2.C
314  // - Execute in the second and third windows: .x hclient.C
315  //Author: Fons Rademakers
316 
317  // Open a server socket looking for connections on a named service or
318  // on a specified port.
319  //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
320  TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
321  if (!ss->IsValid()) {
322  return;
323  }
324 
325  TMonitor *mon = new TMonitor;
326 
327  mon->Add(ss);
328 
329  UInt_t clientCount = 0;
330  UInt_t clientIndex = 0;
331 
332  THashTable mergers;
333 
334  enum StatusKind {
335  kStartConnection = 0,
336  kProtocol = 1,
337 
338  kProtocolVersion = 1
339  };
340 
341  printf("fastMergeServerHist ready to accept connections\n");
342  while (1) {
343  TMessage *mess;
344  TSocket *s;
345 
346  // NOTE: this needs to be update to handle the case where the client
347  // dies.
348  s = mon->Select();
349 
350  if (s->IsA() == TServerSocket::Class()) {
351  if (clientCount > 100) {
352  printf("only accept 100 clients connections\n");
353  mon->Remove(ss);
354  ss->Close();
355  } else {
356  TSocket *client = ((TServerSocket *)s)->Accept();
357  client->Send(clientIndex, kStartConnection);
358  client->Send(kProtocolVersion, kProtocol);
359  ++clientCount;
360  ++clientIndex;
361  mon->Add(client);
362  printf("Accept %d connections\n",clientCount);
363  }
364  continue;
365  }
366 
367  s->Recv(mess);
368 
369  if (mess==0) {
370  Error("fastMergeServer","The client did not send a message\n");
371  } else if (mess->What() == kMESS_STRING) {
372  char str[64];
373  mess->ReadString(str, 64);
374  printf("Client %d: %s\n", clientCount, str);
375  mon->Remove(s);
376  printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
377  s->GetBytesSent());
378  s->Close();
379  --clientCount;
380  if (mon->GetActive() == 0 || clientCount == 0) {
381  printf("No more active clients... stopping\n");
382  break;
383  }
384  } else if (mess->What() == kMESS_ANY) {
385 
388  Int_t clientId;
389  mess->ReadInt(clientId);
390  mess->ReadTString(filename);
391  mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
392 
393  // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
394 
395  TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
396  mess->SetBufferOffset(mess->Length()+length);
397 
398  const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
399 
400  ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
401  if (!info) {
402  info = new ParallelFileMerger(filename,cache);
403  mergers.Add(info);
404  }
405 
406  if (R__NeedInitialMerge(transient)) {
407  info->InitialMerge(transient);
408  }
409  info->RegisterClient(clientId,transient);
410  if (info->NeedMerge(clientThreshold)) {
411  // Enough clients reported.
412  Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
413  info->Merge();
414  }
415  transient = 0;
416  } else if (mess->What() == kMESS_OBJECT) {
417  printf("got object of class: %s\n", mess->GetClass()->GetName());
418  } else {
419  printf("*** Unexpected message ***\n");
420  }
421 
422  delete mess;
423  }
424 
425  TIter next(&mergers);
426  ParallelFileMerger *info;
427  while ( (info = (ParallelFileMerger*)next()) ) {
428  if (info->NeedFinalMerge())
429  {
430  info->Merge();
431  }
432  }
433 
434  mergers.Delete();
435  delete mon;
436  delete ss;
437 }
Merge incrementally all type of objects.
Definition: TFileMerger.h:67
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:90
void parallelMergeServer(bool cache=false)
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:711
TServerSocket * ss
Definition: hserv2.C:30
const int kReplaceWait
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
long long Long64_t
Definition: RtypesCore.h:69
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:158
ClassImp(TSeqCollection) Int_t TSeqCollection TIter next(this)
Return index of object in collection.
float Float_t
Definition: RtypesCore.h:53
TMonitor * mon
Definition: hserv2.C:32
virtual ULong_t Hash() const
Return hash value for this object.
Definition: TObject.cxx:477
return c
virtual Bool_t IsValid() const
Definition: TSocket.h:162
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual void ReadTString(TString &s)
Read TString from TBuffer.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
virtual TList * GetList() const
Definition: TDirectory.h:157
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:45
static const char * filename()
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
Definition: TDirectory.cxx:955
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
TFile * f
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition: TFile.cxx:2229
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3851
UInt_t GetBytesSent() const
Definition: TSocket.h:149
UInt_t GetBytesRecv() const
Definition: TSocket.h:150
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:156
virtual void ReadLong64(Long64_t &l)
Definition: TBufferFile.h:483
const int kReplaceImmediately
THashTable implements a hash table to store TObject's.
Definition: THashTable.h:39
virtual void ReadInt(Int_t &i)
Definition: TBufferFile.h:456
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
#define ClassDef(name, id)
Definition: Rtypes.h:254
void Class()
Definition: Class.C:29
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
std::map< std::string, std::string >::const_iterator iter
Definition: TAlienJob.cxx:54
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=0)
Write the encoded object supported by this key.
Definition: TKey.cxx:1440
void Info(const char *location, const char *msgfmt,...)
const Double_t sigma
Merge the input file with the content of the output file (if already existing).
Definition: TFileMerger.h:62
char * Buffer() const
Definition: TBuffer.h:93
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:30
virtual void Delete(Option_t *option="")
Delete an object from the file.
Definition: TKey.cxx:531
void Error(const char *location, const char *msgfmt,...)
static Bool_t R__NeedInitialMerge(TDirectory *dir)
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual TFile * GetFile() const
Definition: TDirectory.h:155
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:388
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
R__EXTERN TSystem * gSystem
Definition: TSystem.h:545
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file.
Definition: TFile.cxx:2248
const int kIncremental
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:675
unsigned int UInt_t
Definition: RtypesCore.h:42
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:173
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:81
virtual void SaveSelf(Bool_t=kFALSE)
Definition: TDirectory.h:189
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
tuple file
Definition: fildir.py:20
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:415
double Double_t
Definition: RtypesCore.h:55
Describe directory structure in memory.
Definition: TDirectory.h:44
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
void dir(char *path=0)
Definition: rootalias.C:30
unsigned long ULong_t
Definition: RtypesCore.h:51
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:76
Int_t GetObjlen() const
Definition: TKey.h:89
void Add(TObject *obj)
Add object to the hash table.
Definition: THashTable.cxx:76
UInt_t What() const
Definition: TMessage.h:80
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2801
Mother of all ROOT objects.
Definition: TObject.h:58
Double_t AsDouble() const
Definition: TTimeStamp.h:143
Container of bits.
Definition: TBits.h:33
void Delete(Option_t *option="")
Remove all objects from the table AND delete all heap based objects.
Definition: THashTable.cxx:194
Int_t Length() const
Definition: TBuffer.h:96
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
Definition: TDirectory.cxx:336
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashTable.cxx:210
double result[121]
Bool_t InheritsFrom(const char *cl) const
Return kTRUE if this class inherits from a class with name "classname".
Definition: TClass.cxx:4498
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition: TClass.cxx:6785
Double_t Sqrt(Double_t x)
Definition: TMath.h:464
TClass * GetClass() const
Definition: TMessage.h:76
Only the objects with a MergeAfterReset member function.
Definition: TFileMerger.h:63
const Bool_t kTRUE
Definition: Rtypes.h:91
A cache when writing files over the network.