Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
fastMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can receive TMemFile objects from multiple clients and merge them into a single file without blocking.

Note: This server assumes that the client will reset the histogram after each upload to simplify the merging.

This server can accept new connections while handling currently open connections. Compare this script to hserv.C, which blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After a connection is accepted, the new socket is added to the monitor and is immediately ready for use. Once the maximum number of client connections has been accepted, the server socket is removed from the monitor and closed. The monitor continues monitoring the remaining client sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x fastMergeServer.C
  • Execute in the second and third windows: .x treeClient.C
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TFileCacheWrite.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
void fastMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(9090, kTRUE);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
TMemFile *transient = 0;
merger.SetPrintLevel(0);
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
while (1) {
TMessage *mess;
TSocket *s;
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientCount, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
Long64_t length;
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
delete transient;
transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
mess->SetBufferOffset(mess->Length()+length);
merger.OutputFile(filename,"UPDATE");
merger.AddAdoptFile(transient);
merger.PartialMerge(TFileMerger::kAllIncremental);
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
int Int_t
Definition RtypesCore.h:45
unsigned int UInt_t
Definition RtypesCore.h:46
const Bool_t kFALSE
Definition RtypesCore.h:101
long long Long64_t
Definition RtypesCore.h:80
const Bool_t kTRUE
Definition RtypesCore.h:100
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:220
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:187
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:77
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
UInt_t What() const
Definition TMessage.h:75
TClass * GetClass() const
Definition TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition TMonitor.cxx:214
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
UInt_t GetBytesRecv() const
Definition TSocket.h:120
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
UInt_t GetBytesSent() const
Definition TSocket.h:119
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:136
const char * Data() const
Definition TString.h:369
Author
Fons Rademakers

Definition in file fastMergeServer.C.