ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
fastMergeServer.C
Go to the documentation of this file.
1 #include "TMessage.h"
2 #include "TBenchmark.h"
3 #include "TSocket.h"
4 #include "TH2.h"
5 #include "TTree.h"
6 #include "TMemFile.h"
7 #include "TRandom.h"
8 #include "TError.h"
9 #include "TFileMerger.h"
10 
11 #include "TServerSocket.h"
12 #include "TPad.h"
13 #include "TCanvas.h"
14 #include "TMonitor.h"
15 
16 #include "TFileCacheWrite.h"
17 
18 void fastMergeServer(bool cache = false) {
19  // This script shows how to make a simple iterative server that
20  // can receive TMemFile from multiple clients and merge them into
21  // a single file without block.
22  //
23  // Note: This server assumes that the client will reset the histogram
24  // after each upload to simplify the merging.
25  //
26  // This server can accept connections while handling currently open connections.
27  // Compare this script to hserv.C that blocks on accept.
28  // In this script a server socket is created and added to a monitor.
29  // A monitor object is used to monitor connection requests on
30  // the server socket. After accepting the connection
31  // the new socket is added to the monitor and immediately ready
32  // for use. Once two connections are accepted the server socket
33  // is removed from the monitor and closed. The monitor continues
34  // monitoring the sockets.
35  //
36  // To run this demo do the following:
37  // - Open three windows
38  // - Start ROOT in all three windows
39  // - Execute in the first window: .x fastMergerServer.C
40  // - Execute in the second and third windows: .x treeClient.C
41  //Author: Fons Rademakers
42 
43  // Open a server socket looking for connections on a named service or
44  // on a specified port.
45  //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
46  TServerSocket *ss = new TServerSocket(9090, kTRUE);
47  if (!ss->IsValid()) {
48  return;
49  }
50 
51  TMonitor *mon = new TMonitor;
52 
53  mon->Add(ss);
54 
55  UInt_t clientCount = 0;
56  TMemFile *transient = 0;
57 
58  TFileMerger merger(kFALSE,kFALSE);
59  merger.SetPrintLevel(0);
60 
61  enum StatusKind {
62  kStartConnection = 0,
63  kProtocol = 1,
64 
65  kProtocolVersion = 1
66  };
67  if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
68  while (1) {
69  TMessage *mess;
70  TSocket *s;
71 
72  s = mon->Select();
73 
74  if (s->IsA() == TServerSocket::Class()) {
75  if (clientCount > 100) {
76  printf("only accept 100 clients connections\n");
77  mon->Remove(ss);
78  ss->Close();
79  } else {
80  TSocket *client = ((TServerSocket *)s)->Accept();
81  client->Send(clientCount, kStartConnection);
82  client->Send(kProtocolVersion, kProtocol);
83  ++clientCount;
84  mon->Add(client);
85  printf("Accept %d connections\n",clientCount);
86  }
87  continue;
88  }
89 
90  s->Recv(mess);
91 
92  if (mess==0) {
93  Error("fastMergeServer","The client did not send a message\n");
94  } else if (mess->What() == kMESS_STRING) {
95  char str[64];
96  mess->ReadString(str, 64);
97  printf("Client %d: %s\n", clientCount, str);
98  mon->Remove(s);
99  printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
100  s->GetBytesSent());
101  s->Close();
102  --clientCount;
103  if (mon->GetActive() == 0 || clientCount == 0) {
104  printf("No more active clients... stopping\n");
105  break;
106  }
107  } else if (mess->What() == kMESS_ANY) {
108 
111  Int_t clientId;
112  mess->ReadInt(clientId);
113  mess->ReadTString(filename);
114  mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
115 
116  Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
117 
118  delete transient;
119  transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
120  mess->SetBufferOffset(mess->Length()+length);
121  merger.OutputFile(filename,"UPDATE");
122  merger.AddAdoptFile(transient);
123 
125  transient = 0;
126  } else if (mess->What() == kMESS_OBJECT) {
127  printf("got object of class: %s\n", mess->GetClass()->GetName());
128  } else {
129  printf("*** Unexpected message ***\n");
130  }
131 
132  delete mess;
133  }
134 }
Merge incrementally all types of objects.
Definition: TFileMerger.h:67
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:90
TServerSocket * ss
Definition: hserv2.C:30
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition: TMonitor.cxx:214
long long Long64_t
Definition: RtypesCore.h:69
TMonitor * mon
Definition: hserv2.C:32
virtual Bool_t IsValid() const
Definition: TSocket.h:162
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
virtual void ReadTString(TString &s)
Read TString from TBuffer.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
void fastMergeServer(bool cache=false)
static const char * filename()
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition: TMonitor.cxx:168
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
const Bool_t kFALSE
Definition: Rtypes.h:92
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
UInt_t GetBytesSent() const
Definition: TSocket.h:149
UInt_t GetBytesRecv() const
Definition: TSocket.h:150
const char * Data() const
Definition: TString.h:349
virtual void ReadLong64(Long64_t &l)
Definition: TBufferFile.h:483
virtual void ReadInt(Int_t &i)
Definition: TBufferFile.h:456
void Class()
Definition: Class.C:29
virtual char * ReadString(char *s, Int_t max)
Read string from I/O buffer.
void Info(const char *location, const char *msgfmt,...)
char * Buffer() const
Definition: TBuffer.h:93
virtual Bool_t AddAdoptFile(TFile *source, Bool_t cpProgress=kTRUE)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is returned).
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
void Error(const char *location, const char *msgfmt,...)
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition: TMonitor.cxx:322
TFile * GetOutputFile() const
Definition: TFileMerger.h:81
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:388
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
This class provides file copy and merging services.
Definition: TFileMerger.h:30
unsigned int UInt_t
Definition: RtypesCore.h:42
void SetPrintLevel(Int_t level)
Definition: TFileMerger.h:77
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition: TMonitor.cxx:438
ClassImp(TMCParticle) void TMCParticle printf(": p=(%7.3f,%7.3f,%9.3f) ;", fPx, fPy, fPz)
UInt_t What() const
Definition: TMessage.h:80
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Int_t Length() const
Definition: TBuffer.h:96
TClass * GetClass() const
Definition: TMessage.h:76
const Bool_t kTRUE
Definition: Rtypes.h:91
A cache when writing files over the network.