Logo ROOT  
Reference Guide
parallelMergeServer.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_net
3/// This script shows how to make a simple iterative server that
4/// can accept connections while handling currently open connections.
5/// Compare this script to hserv.C that blocks on accept.
6/// In this script a server socket is created and added to a monitor.
7/// A monitor object is used to monitor connection requests on
8/// the server socket. After accepting the connection
9/// the new socket is added to the monitor and immediately ready
10/// for use. Once two connections are accepted the server socket
11/// is removed from the monitor and closed. The monitor continues
12/// monitoring the sockets.
13///
14/// To run this demo do the following:
15/// - Open three windows
16/// - Start ROOT in all three windows
17/// - Execute in the first window: .x hserv2.C
18/// - Execute in the second and third windows: .x hclient.C
19///
20/// \macro_code
21///
22/// \author Fons Rademakers
23
24#include "TMessage.h"
25#include "TBenchmark.h"
26#include "TSocket.h"
27#include "TH2.h"
28#include "TTree.h"
29#include "TMemFile.h"
30#include "TRandom.h"
31#include "TError.h"
32#include "TFileMerger.h"
33
34#include "TServerSocket.h"
35#include "TPad.h"
36#include "TCanvas.h"
37#include "TMonitor.h"
38
39#include "TFileCacheWrite.h"
40#include "TSystem.h"
41#include "THashTable.h"
42
43#include "TMath.h"
44#include "TTimeStamp.h"
45
46const int kIncremental = 0;
47const int kReplaceImmediately = 1;
48const int kReplaceWait = 2;
49
50#include "TKey.h"
51static Bool_t R__NeedInitialMerge(TDirectory *dir)
52{
53
54 if (dir==0) return kFALSE;
55
56 TIter nextkey(dir->GetListOfKeys());
57 TKey *key;
58 while( (key = (TKey*)nextkey()) ) {
61 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
62 if (!subdir) {
63 subdir = (TDirectory *)key->ReadObj();
64 }
65 if (R__NeedInitialMerge(subdir)) {
66 return kTRUE;
67 }
68 } else {
69 if (0 != cl->GetResetAfterMerge()) {
70 return kTRUE;
71 }
72 }
73 }
74 return kFALSE;
75}
76
77static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
78{
79 if (dir==0) return;
80
81 TIter nextkey(dir->GetListOfKeys());
82 TKey *key;
83 while( (key = (TKey*)nextkey()) ) {
86 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
87 if (!subdir) {
88 subdir = (TDirectory *)key->ReadObj();
89 }
90 R__DeleteObject(subdir,withReset);
91 } else {
92 Bool_t todelete = kFALSE;
93 if (withReset) {
94 todelete = (0 != cl->GetResetAfterMerge());
95 } else {
96 todelete = (0 == cl->GetResetAfterMerge());
97 }
98 if (todelete) {
99 key->Delete();
100 dir->GetListOfKeys()->Remove(key);
101 delete key;
102 }
103 }
104 }
105}
106
107static void R__MigrateKey(TDirectory *destination, TDirectory *source)
108{
109 if (destination==0 || source==0) return;
110
111 TIter nextkey(source->GetListOfKeys());
112 TKey *key;
113 while( (key = (TKey*)nextkey()) ) {
115 if (cl->InheritsFrom(TDirectory::Class())) {
116 TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
117 if (!source_subdir) {
118 source_subdir = (TDirectory *)key->ReadObj();
119 }
120 TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
121 if (!destination_subdir) {
122 destination_subdir = destination->mkdir(key->GetName());
123 }
124 R__MigrateKey(destination,source);
125 } else {
126 TKey *oldkey = destination->GetKey(key->GetName());
127 if (oldkey) {
128 oldkey->Delete();
129 delete oldkey;
130 }
131 TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
132 destination->GetFile()->SumBuffer(newkey->GetObjlen());
133 newkey->WriteFile(0);
134 if (destination->GetFile()->TestBit(TFile::kWriteError)) {
135 return;
136 }
137 }
138 }
139 destination->SaveSelf();
140}
141
142struct ClientInfo
143{
144 TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
145 TString fLocalName;
146 UInt_t fContactsCount;
147 TTimeStamp fLastContact;
148 Double_t fTimeSincePrevContact;
149
150 ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
151 ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
152 fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
153 }
154
155 void Set(TFile *file)
156 {
157 // Register the new file as coming from this client.
158 if (file != fFile) {
159 // We need to keep any of the keys from the previous file that
160 // are not in the new file.
161 if (fFile) {
162 R__MigrateKey(fFile,file);
163 // delete the previous memory file (if any)
164 delete file;
165 } else {
166 fFile = file;
167 }
168 }
169 TTimeStamp now;
170 fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
171 fLastContact = now;
172 ++fContactsCount;
173 }
174};
175
176struct ParallelFileMerger : public TObject
177{
178 typedef std::vector<ClientInfo> ClientColl_t;
179
180 TString fFilename;
181 TBits fClientsContact; //
182 UInt_t fNClientsContact; //
183 ClientColl_t fClients;
184 TTimeStamp fLastMerge;
185 TFileMerger fMerger;
186
187 ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
188 {
189 // Default constructor.
190
191 fMerger.SetPrintLevel(0);
192 fMerger.OutputFile(filename,"RECREATE");
193 if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
194 }
195
196 ~ParallelFileMerger()
197 {
198 // Destructor.
199
200 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
201 fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
202 }
203 for( ClientColl_t::iterator iter = fClients.begin();
204 iter != fClients.end();
205 ++iter)
206 {
207 delete iter->fFile;
208 }
209 }
210
211 ULong_t Hash() const
212 {
213 // Return hash value for this object.
214 return fFilename.Hash();
215 }
216
217 const char *GetName() const
218 {
219 // Return the name of the object which is the name of the output file.
220 return fFilename;
221 }
222
223 Bool_t InitialMerge(TFile *input)
224 {
225 // Initial merge of the input to copy the resetable object (TTree) into the output
226 // and remove them from the input file.
227
228 fMerger.AddFile(input);
229
231
232 R__DeleteObject(input,kTRUE);
233 return result;
234 }
235
236 Bool_t Merge()
237 {
238 // Merge the current inputs into the output file.
239
240 R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
241 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
242 fMerger.AddFile(fClients[f].fFile);
243 }
245
246 // Remove any 'resetable' object (like TTree) from the input file so that they will not
247 // be re-merged. Keep only the object that always need to be re-merged (Histograms).
248 for(unsigned int f = 0 ; f < fClients.size(); ++f) {
249 if (fClients[f].fFile) {
250 R__DeleteObject(fClients[f].fFile,kTRUE);
251 } else {
252 // We back up the file (probably due to memory constraint)
253 TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
254 R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
255 file->Write();
256 delete file;
257 }
258 }
259 fLastMerge = TTimeStamp();
260 fNClientsContact = 0;
261 fClientsContact.Clear();
262
263 return result;
264 }
265
266 Bool_t NeedFinalMerge()
267 {
268 // Return true, if there is any data that has not been merged.
269
270 return fClientsContact.CountBits() > 0;
271 }
272
273 Bool_t NeedMerge(Float_t clientThreshold)
274 {
275 // Return true, if enough client have reported
276
277 if (fClients.size()==0) {
278 return kFALSE;
279 }
280
281 // Calculate average and rms of the time between the last 2 contacts.
282 Double_t sum = 0;
283 Double_t sum2 = 0;
284 for(unsigned int c = 0 ; c < fClients.size(); ++c) {
285 sum += fClients[c].fTimeSincePrevContact;
286 sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
287 }
288 Double_t avg = sum / fClients.size();
289 Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
290 Double_t target = avg + 2*sigma;
291 TTimeStamp now;
292 if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
293// Float_t cut = clientThreshold * fClients.size();
294// if (!(fClientsContact.CountBits() > cut )) {
295// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
296// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
297// }
298// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
299// }
300 return kTRUE;
301 }
302 Float_t cut = clientThreshold * fClients.size();
303 return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
304 }
305
306 void RegisterClient(UInt_t clientId, TFile *file)
307 {
308 // Register that a client has sent a file.
309
310 ++fNClientsContact;
311 fClientsContact.SetBitNumber(clientId);
312 if (fClients.size() < clientId+1) {
313 fClients.push_back( ClientInfo(fFilename,clientId) );
314 }
315 fClients[clientId].Set(file);
316 }
317
318 ClassDef(ParallelFileMerger,0);
319};
320
321void parallelMergeServer(bool cache = false) {
322 // Open a server socket looking for connections on a named service or
323 // on a specified port.
324 //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
325 TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
326 if (!ss->IsValid()) {
327 return;
328 }
329
330 TMonitor *mon = new TMonitor;
331
332 mon->Add(ss);
333
334 UInt_t clientCount = 0;
335 UInt_t clientIndex = 0;
336
337 THashTable mergers;
338
339 enum StatusKind {
340 kStartConnection = 0,
341 kProtocol = 1,
342
343 kProtocolVersion = 1
344 };
345
346 printf("fastMergeServerHist ready to accept connections\n");
347 while (1) {
348 TMessage *mess;
349 TSocket *s;
350
351 // NOTE: this needs to be update to handle the case where the client
352 // dies.
353 s = mon->Select();
354
355 if (s->IsA() == TServerSocket::Class()) {
356 if (clientCount > 100) {
357 printf("only accept 100 clients connections\n");
358 mon->Remove(ss);
359 ss->Close();
360 } else {
361 TSocket *client = ((TServerSocket *)s)->Accept();
362 client->Send(clientIndex, kStartConnection);
363 client->Send(kProtocolVersion, kProtocol);
364 ++clientCount;
365 ++clientIndex;
366 mon->Add(client);
367 printf("Accept %d connections\n",clientCount);
368 }
369 continue;
370 }
371
372 s->Recv(mess);
373
374 if (mess==0) {
375 Error("fastMergeServer","The client did not send a message\n");
376 } else if (mess->What() == kMESS_STRING) {
377 char str[64];
378 mess->ReadString(str, 64);
379 printf("Client %d: %s\n", clientCount, str);
380 mon->Remove(s);
381 printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
382 s->GetBytesSent());
383 s->Close();
384 --clientCount;
385 if (mon->GetActive() == 0 || clientCount == 0) {
386 printf("No more active clients... stopping\n");
387 break;
388 }
389 } else if (mess->What() == kMESS_ANY) {
390
391 Long64_t length;
392 TString filename;
393 Int_t clientId;
394 mess->ReadInt(clientId);
395 mess->ReadTString(filename);
396 mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
397
398 // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
399
400 TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
401 mess->SetBufferOffset(mess->Length()+length);
402
403 const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
404
405 ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
406 if (!info) {
407 info = new ParallelFileMerger(filename,cache);
408 mergers.Add(info);
409 }
410
411 if (R__NeedInitialMerge(transient)) {
412 info->InitialMerge(transient);
413 }
414 info->RegisterClient(clientId,transient);
415 if (info->NeedMerge(clientThreshold)) {
416 // Enough clients reported.
417 Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
418 info->Merge();
419 }
420 transient = 0;
421 } else if (mess->What() == kMESS_OBJECT) {
422 printf("got object of class: %s\n", mess->GetClass()->GetName());
423 } else {
424 printf("*** Unexpected message ***\n");
425 }
426
427 delete mess;
428 }
429
430 TIter next(&mergers);
431 ParallelFileMerger *info;
432 while ( (info = (ParallelFileMerger*)next()) ) {
433 if (info->NeedFinalMerge())
434 {
435 info->Merge();
436 }
437 }
438
439 mergers.Delete();
440 delete mon;
441 delete ss;
442}
void Class()
Definition: Class.C:29
@ kMESS_STRING
Definition: MessageTypes.h:34
@ kMESS_ANY
Definition: MessageTypes.h:31
@ kMESS_OBJECT
Definition: MessageTypes.h:35
#define f(i)
Definition: RSha256.hxx:104
#define c(i)
Definition: RSha256.hxx:101
int Int_t
Definition: RtypesCore.h:43
unsigned int UInt_t
Definition: RtypesCore.h:44
const Bool_t kFALSE
Definition: RtypesCore.h:90
unsigned long ULong_t
Definition: RtypesCore.h:53
bool Bool_t
Definition: RtypesCore.h:61
double Double_t
Definition: RtypesCore.h:57
long long Long64_t
Definition: RtypesCore.h:71
float Float_t
Definition: RtypesCore.h:55
const Bool_t kTRUE
Definition: RtypesCore.h:89
#define ClassDef(name, id)
Definition: Rtypes.h:322
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
R__EXTERN TSystem * gSystem
Definition: TSystem.h:556
Container of bits.
Definition: TBits.h:27
void Clear(Option_t *option="")
Clear the value.
Definition: TBits.cxx:84
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
Definition: TBits.cxx:118
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:206
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
Definition: TBufferFile.h:398
void ReadLong64(Long64_t &l) override
Definition: TBufferFile.h:425
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:92
Int_t Length() const
Definition: TBuffer.h:99
char * Buffer() const
Definition: TBuffer.h:95
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition: TClass.h:80
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Bool_t InheritsFrom(const char *cl) const
Return kTRUE if this class inherits from a class with name "classname".
Definition: TClass.cxx:4837
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition: TClass.cxx:2948
Describe directory structure in memory.
Definition: TDirectory.h:40
virtual TList * GetList() const
Definition: TDirectory.h:165
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
Definition: TDirectory.cxx:401
virtual TFile * GetFile() const
Definition: TDirectory.h:163
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition: TDirectory.h:164
virtual void SaveSelf(Bool_t=kFALSE)
Definition: TDirectory.h:197
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
Definition: TDirectory.h:166
A cache when writing files over the network.
This class provides file copy and merging services.
Definition: TFileMerger.h:32
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
Definition: TFileMerger.h:94
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition: TFileMerger.h:90
@ kIncremental
Merge the input file with the content of the output file (if already exising).
Definition: TFileMerger.h:73
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition: TFileMerger.h:74
@ kAllIncremental
Merge incrementally all type of objects.
Definition: TFileMerger.h:79
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition: TFile.h:53
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition: TFile.cxx:2278
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition: TFile.cxx:3942
@ kWriteError
Definition: TFile.h:185
THashTable implements a hash table to store TObject's.
Definition: THashTable.h:35
void Add(TObject *obj)
Add object to the hash table.
Definition: THashTable.cxx:92
TObject * FindObject(const char *name) const
Find object using its name.
Definition: THashTable.cxx:238
void Delete(Option_t *option="")
Remove all objects from the table AND delete all heap based objects.
Definition: THashTable.cxx:220
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition: TKey.h:28
virtual void Delete(Option_t *option="")
Delete an object from the file.
Definition: TKey.cxx:538
Int_t GetObjlen() const
Definition: TKey.h:88
virtual const char * GetClassName() const
Definition: TKey.h:76
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=0)
Write the encoded object supported by this key.
Definition: TKey.cxx:1438
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition: TKey.cxx:738
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:821
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:577
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
UInt_t What() const
Definition: TMessage.h:75
TClass * GetClass() const
Definition: TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
Mother of all ROOT objects.
Definition: TObject.h:37
virtual const char * GetName() const
Returns name of object.
Definition: TObject.cxx:357
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition: TObject.h:187
virtual ULong_t Hash() const
Return hash value for this object.
Definition: TObject.cxx:433
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:389
virtual Bool_t IsValid() const
Definition: TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:522
Basic string class.
Definition: TString.h:131
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition: TString.cxx:638
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2289
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:705
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition: TTimeStamp.h:71
Double_t AsDouble() const
Definition: TTimeStamp.h:138
const Double_t sigma
static const std::string transient("transient")
static constexpr double s
Double_t Sqrt(Double_t x)
Definition: TMath.h:681
Definition: file.py:1
static long int sum(long int i)
Definition: Factory.cxx:2275