Logo ROOT   6.10/09
Reference Guide
fastMergeServer.C
Go to the documentation of this file.
1 /// \file
2 /// \ingroup tutorial_net
3 /// This script shows how to make a simple iterative server that
4 /// can receive TMemFile from multiple clients and merge them into
5 /// a single file without blocking.
6 ///
7 /// Note: This server assumes that the client will reset the histogram
8 /// after each upload to simplify the merging.
9 ///
10 /// This server can accept connections while handling currently open connections.
11 /// Compare this script to hserv.C that blocks on accept.
12 /// In this script a server socket is created and added to a monitor.
13 /// A monitor object is used to monitor connection requests on
14 /// the server socket. After accepting the connection
15 /// the new socket is added to the monitor and immediately ready
16 /// for use. Once the limit on connected clients is reached the server socket
17 /// is removed from the monitor and closed. The monitor continues
18 /// monitoring the sockets.
19 ///
20 /// To run this demo do the following:
21 /// - Open three windows
22 /// - Start ROOT in all three windows
23 /// - Execute in the first window: .x fastMergeServer.C
24 /// - Execute in the second and third windows: .x treeClient.C
25 ///
26 /// \macro_code
27 ///
28 /// \author Fons Rademakers
29 
30 #include "TMessage.h"
31 #include "TBenchmark.h"
32 #include "TSocket.h"
33 #include "TH2.h"
34 #include "TTree.h"
35 #include "TMemFile.h"
36 #include "TRandom.h"
37 #include "TError.h"
38 #include "TFileMerger.h"
39 
40 #include "TServerSocket.h"
41 #include "TPad.h"
42 #include "TCanvas.h"
43 #include "TMonitor.h"
44 
45 #include "TFileCacheWrite.h"
46 
47 void fastMergeServer(bool cache = false) {
48  // Open a server socket looking for connections on a named service or
49  // on a specified port.
50  //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
51  TServerSocket *ss = new TServerSocket(9090, kTRUE);
52  if (!ss->IsValid()) {
53  return;
54  }
55 
56  TMonitor *mon = new TMonitor;
57 
58  mon->Add(ss);
59 
60  UInt_t clientCount = 0;
61  TMemFile *transient = 0;
62 
63  TFileMerger merger(kFALSE,kFALSE);
64  merger.SetPrintLevel(0);
65 
66  enum StatusKind {
67  kStartConnection = 0,
68  kProtocol = 1,
69 
70  kProtocolVersion = 1
71  };
72  if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
73  while (1) {
74  TMessage *mess;
75  TSocket *s;
76 
77  s = mon->Select();
78 
79  if (s->IsA() == TServerSocket::Class()) {
80  if (clientCount > 100) {
81  printf("only accept 100 clients connections\n");
82  mon->Remove(ss);
83  ss->Close();
84  } else {
85  TSocket *client = ((TServerSocket *)s)->Accept();
86  client->Send(clientCount, kStartConnection);
87  client->Send(kProtocolVersion, kProtocol);
88  ++clientCount;
89  mon->Add(client);
90  printf("Accept %d connections\n",clientCount);
91  }
92  continue;
93  }
94 
95  s->Recv(mess);
96 
97  if (mess==0) {
98  Error("fastMergeServer","The client did not send a message\n");
99  } else if (mess->What() == kMESS_STRING) {
100  char str[64];
101  mess->ReadString(str, 64);
102  printf("Client %d: %s\n", clientCount, str);
103  mon->Remove(s);
104  printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
105  s->GetBytesSent());
106  s->Close();
107  --clientCount;
108  if (mon->GetActive() == 0 || clientCount == 0) {
109  printf("No more active clients... stopping\n");
110  break;
111  }
112  } else if (mess->What() == kMESS_ANY) {
113 
114  Long64_t length;
115  TString filename;
116  Int_t clientId;
117  mess->ReadInt(clientId);
118  mess->ReadTString(filename);
119  mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
120 
121  Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
122 
123  delete transient;
124  transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
125  mess->SetBufferOffset(mess->Length()+length);
126  merger.OutputFile(filename,"UPDATE");
127  merger.AddAdoptFile(transient);
128 
129  merger.PartialMerge(TFileMerger::kAllIncremental);
130  transient = 0;
131  } else if (mess->What() == kMESS_OBJECT) {
132  printf("got object of class: %s\n", mess->GetClass()->GetName());
133  } else {
134  printf("*** Unexpected message ***\n");
135  }
136 
137  delete mess;
138  }
139 }
Merge incrementally all type of objects.
Definition: TFileMerger.h:62
TClass * GetClass() const
Definition: TMessage.h:70
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:47
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:88
virtual Bool_t IsValid() const
Definition: TSocket.h:146
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
long long Long64_t
Definition: RtypesCore.h:69
UInt_t GetBytesRecv() const
Definition: TSocket.h:134
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual void ReadTString(TString &s)
Read TString from TBuffer.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Basic string class.
Definition: TString.h:129
int Int_t
Definition: RtypesCore.h:41
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:17
Int_t Length() const
Definition: TBuffer.h:94
virtual void ReadLong64(Long64_t &l)
Definition: TBufferFile.h:483
virtual void ReadInt(Int_t &i)
Definition: TBufferFile.h:456
void Class()
Definition: Class.C:29
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
char * Buffer() const
Definition: TBuffer.h:91
void Info(const char *location, const char *msgfmt,...)
void Error(const char *location, const char *msgfmt,...)
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:388
UInt_t GetBytesSent() const
Definition: TSocket.h:133
This class provides file copy and merging services.
Definition: TFileMerger.h:24
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:92
UInt_t What() const
Definition: TMessage.h:74
const Bool_t kTRUE
Definition: RtypesCore.h:91
A cache when writing files over the network.
const char * Data() const
Definition: TString.h:347