Logo ROOT   6.07/09
Reference Guide
parallelMergeServer.C
Go to the documentation of this file.
1 /// \file
2 /// \ingroup tutorial_net
3 /// This script shows how to make a simple iterative server that
4 /// can accept connections while handling currently open connections.
5 /// Compare this script to hserv.C that blocks on accept.
6 /// In this script a server socket is created and added to a monitor.
7 /// A monitor object is used to monitor connection requests on
8 /// the server socket. After accepting the connection
9 /// the new socket is added to the monitor and immediately ready
10 /// for use. Once two connections are accepted the server socket
11 /// is removed from the monitor and closed. The monitor continues
12 /// monitoring the sockets.
13 ///
14 /// To run this demo do the following:
15 /// - Open three windows
16 /// - Start ROOT in all three windows
17 /// - Execute in the first window: .x hserv2.C
18 /// - Execute in the second and third windows: .x hclient.C
19 ///
20 /// \macro_code
21 ///
22 /// \author Fons Rademakers
23 
24 #include "TMessage.h"
25 #include "TBenchmark.h"
26 #include "TSocket.h"
27 #include "TH2.h"
28 #include "TTree.h"
29 #include "TMemFile.h"
30 #include "TRandom.h"
31 #include "TError.h"
32 #include "TFileMerger.h"
33 
34 #include "TServerSocket.h"
35 #include "TPad.h"
36 #include "TCanvas.h"
37 #include "TMonitor.h"
38 
39 #include "TFileCacheWrite.h"
40 #include "TSystem.h"
41 #include "THashTable.h"
42 
43 #include "TMath.h"
44 #include "TTimeStamp.h"
45 
46 const int kIncremental = 0;
47 const int kReplaceImmediately = 1;
48 const int kReplaceWait = 2;
49 
50 #include "TKey.h"
51 static Bool_t R__NeedInitialMerge(TDirectory *dir)
52 {
53 
54  if (dir==0) return kFALSE;
55 
56  TIter nextkey(dir->GetListOfKeys());
57  TKey *key;
58  while( (key = (TKey*)nextkey()) ) {
59  TClass *cl = TClass::GetClass(key->GetClassName());
60  if (cl->InheritsFrom(TDirectory::Class())) {
61  TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
62  if (!subdir) {
63  subdir = (TDirectory *)key->ReadObj();
64  }
65  if (R__NeedInitialMerge(subdir)) {
66  return kTRUE;
67  }
68  } else {
69  if (0 != cl->GetResetAfterMerge()) {
70  return kTRUE;
71  }
72  }
73  }
74  return kFALSE;
75 }
76 
77 static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
78 {
79  if (dir==0) return;
80 
81  TIter nextkey(dir->GetListOfKeys());
82  TKey *key;
83  while( (key = (TKey*)nextkey()) ) {
84  TClass *cl = TClass::GetClass(key->GetClassName());
85  if (cl->InheritsFrom(TDirectory::Class())) {
86  TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
87  if (!subdir) {
88  subdir = (TDirectory *)key->ReadObj();
89  }
90  R__DeleteObject(subdir,withReset);
91  } else {
92  Bool_t todelete = kFALSE;
93  if (withReset) {
94  todelete = (0 != cl->GetResetAfterMerge());
95  } else {
96  todelete = (0 == cl->GetResetAfterMerge());
97  }
98  if (todelete) {
99  key->Delete();
100  dir->GetListOfKeys()->Remove(key);
101  delete key;
102  }
103  }
104  }
105 }
106 
107 static void R__MigrateKey(TDirectory *destination, TDirectory *source)
108 {
109  if (destination==0 || source==0) return;
110 
111  TIter nextkey(source->GetListOfKeys());
112  TKey *key;
113  while( (key = (TKey*)nextkey()) ) {
114  TClass *cl = TClass::GetClass(key->GetClassName());
115  if (cl->InheritsFrom(TDirectory::Class())) {
116  TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
117  if (!source_subdir) {
118  source_subdir = (TDirectory *)key->ReadObj();
119  }
120  TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
121  if (!destination_subdir) {
122  destination_subdir = destination->mkdir(key->GetName());
123  }
124  R__MigrateKey(destination,source);
125  } else {
126  TKey *oldkey = destination->GetKey(key->GetName());
127  if (oldkey) {
128  oldkey->Delete();
129  delete oldkey;
130  }
131  TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
132  destination->GetFile()->SumBuffer(newkey->GetObjlen());
133  newkey->WriteFile(0);
134  if (destination->GetFile()->TestBit(TFile::kWriteError)) {
135  return;
136  }
137  }
138  }
139  destination->SaveSelf();
140 }
141 
142 struct ClientInfo
143 {
144  TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
145  TString fLocalName;
146  UInt_t fContactsCount;
147  TTimeStamp fLastContact;
148  Double_t fTimeSincePrevContact;
149 
150  ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
151  ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
152  fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
153  }
154 
155  void Set(TFile *file)
156  {
157  // Register the new file as coming from this client.
158  if (file != fFile) {
159  // We need to keep any of the keys from the previous file that
160  // are not in the new file.
161  if (fFile) {
162  R__MigrateKey(fFile,file);
163  // delete the previous memory file (if any)
164  delete file;
165  } else {
166  fFile = file;
167  }
168  }
169  TTimeStamp now;
170  fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
171  fLastContact = now;
172  ++fContactsCount;
173  }
174 };
175 
176 struct ParallelFileMerger : public TObject
177 {
178  typedef std::vector<ClientInfo> ClientColl_t;
179 
180  TString fFilename;
181  TBits fClientsContact; //
182  UInt_t fNClientsContact; //
183  ClientColl_t fClients;
184  TTimeStamp fLastMerge;
185  TFileMerger fMerger;
186 
187  ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
188  {
189  // Default constructor.
190 
191  fMerger.SetPrintLevel(0);
192  fMerger.OutputFile(filename,"RECREATE");
193  if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
194  }
195 
196  ~ParallelFileMerger()
197  {
198  // Destructor.
199 
200  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
201  fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
202  }
203  for( ClientColl_t::iterator iter = fClients.begin();
204  iter != fClients.end();
205  ++iter)
206  {
207  delete iter->fFile;
208  }
209  }
210 
211  ULong_t Hash() const
212  {
213  // Return hash value for this object.
214  return fFilename.Hash();
215  }
216 
217  const char *GetName() const
218  {
219  // Return the name of the object which is the name of the output file.
220  return fFilename;
221  }
222 
223  Bool_t InitialMerge(TFile *input)
224  {
225  // Initial merge of the input to copy the resetable object (TTree) into the output
226  // and remove them from the input file.
227 
228  fMerger.AddFile(input);
229 
231 
232  R__DeleteObject(input,kTRUE);
233  return result;
234  }
235 
236  Bool_t Merge()
237  {
238  // Merge the current inputs into the output file.
239 
240  R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
241  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
242  fMerger.AddFile(fClients[f].fFile);
243  }
245 
246  // Remove any 'resetable' object (like TTree) from the input file so that they will not
247  // be re-merged. Keep only the object that always need to be re-merged (Histograms).
248  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
249  if (fClients[f].fFile) {
250  R__DeleteObject(fClients[f].fFile,kTRUE);
251  } else {
252  // We back up the file (probably due to memory constraint)
253  TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
254  R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
255  file->Write();
256  delete file;
257  }
258  }
259  fLastMerge = TTimeStamp();
260  fNClientsContact = 0;
261  fClientsContact.Clear();
262 
263  return result;
264  }
265 
266  Bool_t NeedFinalMerge()
267  {
268  // Return true, if there is any data that has not been merged.
269 
270  return fClientsContact.CountBits() > 0;
271  }
272 
273  Bool_t NeedMerge(Float_t clientThreshold)
274  {
275  // Return true, if enough client have reported
276 
277  if (fClients.size()==0) {
278  return kFALSE;
279  }
280 
281  // Calculate average and rms of the time between the last 2 contacts.
282  Double_t sum = 0;
283  Double_t sum2 = 0;
284  for(unsigned int c = 0 ; c < fClients.size(); ++c) {
285  sum += fClients[c].fTimeSincePrevContact;
286  sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
287  }
288  Double_t avg = sum / fClients.size();
289  Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
290  Double_t target = avg + 2*sigma;
291  TTimeStamp now;
292  if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
293 // Float_t cut = clientThreshold * fClients.size();
294 // if (!(fClientsContact.CountBits() > cut )) {
295 // for(unsigned int c = 0 ; c < fClients.size(); ++c) {
296 // fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
297 // }
298 // fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
299 // }
300  return kTRUE;
301  }
302  Float_t cut = clientThreshold * fClients.size();
303  return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
304  }
305 
306  void RegisterClient(UInt_t clientId, TFile *file)
307  {
308  // Register that a client has sent a file.
309 
310  ++fNClientsContact;
311  fClientsContact.SetBitNumber(clientId);
312  if (fClients.size() < clientId+1) {
313  fClients.push_back( ClientInfo(fFilename,clientId) );
314  }
315  fClients[clientId].Set(file);
316  }
317 
318  ClassDef(ParallelFileMerger,0);
319 };
320 
321 void parallelMergeServer(bool cache = false) {
322  // Open a server socket looking for connections on a named service or
323  // on a specified port.
324  //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
325  TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
326  if (!ss->IsValid()) {
327  return;
328  }
329 
330  TMonitor *mon = new TMonitor;
331 
332  mon->Add(ss);
333 
334  UInt_t clientCount = 0;
335  UInt_t clientIndex = 0;
336 
337  THashTable mergers;
338 
339  enum StatusKind {
340  kStartConnection = 0,
341  kProtocol = 1,
342 
343  kProtocolVersion = 1
344  };
345 
346  printf("fastMergeServerHist ready to accept connections\n");
347  while (1) {
348  TMessage *mess;
349  TSocket *s;
350 
351  // NOTE: this needs to be update to handle the case where the client
352  // dies.
353  s = mon->Select();
354 
355  if (s->IsA() == TServerSocket::Class()) {
356  if (clientCount > 100) {
357  printf("only accept 100 clients connections\n");
358  mon->Remove(ss);
359  ss->Close();
360  } else {
361  TSocket *client = ((TServerSocket *)s)->Accept();
362  client->Send(clientIndex, kStartConnection);
363  client->Send(kProtocolVersion, kProtocol);
364  ++clientCount;
365  ++clientIndex;
366  mon->Add(client);
367  printf("Accept %d connections\n",clientCount);
368  }
369  continue;
370  }
371 
372  s->Recv(mess);
373 
374  if (mess==0) {
375  Error("fastMergeServer","The client did not send a message\n");
376  } else if (mess->What() == kMESS_STRING) {
377  char str[64];
378  mess->ReadString(str, 64);
379  printf("Client %d: %s\n", clientCount, str);
380  mon->Remove(s);
381  printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
382  s->GetBytesSent());
383  s->Close();
384  --clientCount;
385  if (mon->GetActive() == 0 || clientCount == 0) {
386  printf("No more active clients... stopping\n");
387  break;
388  }
389  } else if (mess->What() == kMESS_ANY) {
390 
391  Long64_t length;
392  TString filename;
393  Int_t clientId;
394  mess->ReadInt(clientId);
395  mess->ReadTString(filename);
396  mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
397 
398  // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
399 
400  TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
401  mess->SetBufferOffset(mess->Length()+length);
402 
403  const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
404 
405  ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
406  if (!info) {
407  info = new ParallelFileMerger(filename,cache);
408  mergers.Add(info);
409  }
410 
411  if (R__NeedInitialMerge(transient)) {
412  info->InitialMerge(transient);
413  }
414  info->RegisterClient(clientId,transient);
415  if (info->NeedMerge(clientThreshold)) {
416  // Enough clients reported.
417  Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
418  info->Merge();
419  }
420  transient = 0;
421  } else if (mess->What() == kMESS_OBJECT) {
422  printf("got object of class: %s\n", mess->GetClass()->GetName());
423  } else {
424  printf("*** Unexpected message ***\n");
425  }
426 
427  delete mess;
428  }
429 
430  TIter next(&mergers);
431  ParallelFileMerger *info;
432  while ( (info = (ParallelFileMerger*)next()) ) {
433  if (info->NeedFinalMerge())
434  {
435  info->Merge();
436  }
437  }
438 
439  mergers.Delete();
440  delete mon;
441  delete ss;
442 }
Merge incrementally all type of objects.
Definition: TFileMerger.h:68
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:90
static long int sum(long int i)
Definition: Factory.cxx:1785
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:712
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
void Clear(Option_t *option="")
Clear the value.
Definition: TBits.cxx:81
long long Long64_t
Definition: RtypesCore.h:69
UInt_t Hash(const TString &s)
Definition: TString.h:459
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:158
float Float_t
Definition: RtypesCore.h:53
return c
virtual Bool_t IsValid() const
Definition: TSocket.h:162
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual void ReadTString(TString &s)
Read TString from TBuffer.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
virtual TList * GetList() const
Definition: TDirectory.h:157
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format...
Definition: TFile.h:50
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor&#39;s active list.
Definition: TMonitor.cxx:168
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
virtual TDirectory * mkdir(const char *name, const char *title="")
Create a sub-directory and return a pointer to the created directory.
Definition: TDirectory.cxx:957
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:497
const char * Class
Definition: TXMLSetup.cxx:64
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition: TFile.cxx:2249
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=1, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3871
UInt_t GetBytesSent() const
Definition: TSocket.h:149
UInt_t GetBytesRecv() const
Definition: TSocket.h:150
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:156
virtual void ReadLong64(Long64_t &l)
Definition: TBufferFile.h:483
THashTable implements a hash table to store TObject&#39;s.
Definition: THashTable.h:39
virtual void ReadInt(Int_t &i)
Definition: TBufferFile.h:456
#define ClassDef(name, id)
Definition: Rtypes.h:254
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=0)
Write the encoded object supported by this key.
Definition: TKey.cxx:1440
void Info(const char *location, const char *msgfmt,...)
const Double_t sigma
Merge the input file with the content of the output file (if already exising).
Definition: TFileMerger.h:63
char * Buffer() const
Definition: TBuffer.h:93
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:30
virtual void Delete(Option_t *option="")
Delete an object from the file.
Definition: TKey.cxx:531
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
void Error(const char *location, const char *msgfmt,...)
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
Definition: TBits.cxx:115
virtual TFile * GetFile() const
Definition: TDirectory.h:155
TFile * GetOutputFile() const
Definition: TFileMerger.h:82
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:388
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file.
Definition: TFile.cxx:2268
This class provides file copy and merging services.
Definition: TFileMerger.h:30
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:675
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2322
unsigned int UInt_t
Definition: RtypesCore.h:42
Bool_t TestBit(UInt_t f) const
Definition: TObject.h:159
void SetPrintLevel(Int_t level)
Definition: TFileMerger.h:78
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
The ROOT global object gROOT contains a list of all defined classes.
Definition: TClass.h:81
virtual void SaveSelf(Bool_t=kFALSE)
Definition: TDirectory.h:189
double f(double x)
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
double Double_t
Definition: RtypesCore.h:55
Describe directory structure in memory.
Definition: TDirectory.h:44
unsigned long ULong_t
Definition: RtypesCore.h:51
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:76
Int_t GetObjlen() const
Definition: TKey.h:89
void Add(TObject *obj)
Add object to the hash table.
Definition: THashTable.cxx:76
UInt_t What() const
Definition: TMessage.h:80
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2882
Mother of all ROOT objects.
Definition: TObject.h:44
Double_t AsDouble() const
Definition: TTimeStamp.h:143
Container of bits.
Definition: TBits.h:33
void Delete(Option_t *option="")
Remove all objects from the table AND delete all heap based objects.
Definition: THashTable.cxx:194
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Int_t Length() const
Definition: TBuffer.h:96
Definition: file.py:1
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
Definition: TDirectory.cxx:338
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashTable.cxx:210
double result[121]
Bool_t InheritsFrom(const char *cl) const
Return kTRUE if this class inherits from a class with name "classname".
Definition: TClass.cxx:4598
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Double_t Sqrt(Double_t x)
Definition: TMath.h:464
TClass * GetClass() const
Definition: TMessage.h:76
Only the objects with a MergeAfterReset member function.
Definition: TFileMerger.h:64
const Bool_t kTRUE
Definition: Rtypes.h:91
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:198
A cache when writing files over the network.
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition: TString.cxx:606