Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
fastMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can receive TMemFile from multiple clients and merge them into a single file without block.

Note: This server assumes that the client will reset the histogram after each upload to simplify the merging.

This server can accept connections while handling currently open connections. Compare this script to hserv.C that blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After accepting the connection the new socket is added to the monitor and immediately ready for use. Once two connections are accepted the server socket is removed from the monitor and closed. The monitor continues monitoring the sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x fastMergerServer.C
  • Execute in the second and third windows: .x treeClient.C
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
void fastMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(9090, kTRUE);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
TMemFile *transient = nullptr;
merger.SetPrintLevel(0);
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
if (cache) new TFileCacheWrite(merger.GetOutputFile(),32*1024*1024);
while (true) {
TMessage *mess;
TSocket *s;
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientCount, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==nullptr) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
Info("fastMergeServer","Receive input from client %d for %s",clientId,filename.Data());
delete transient;
transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length);
mess->SetBufferOffset(mess->Length()+length);
merger.OutputFile(filename,"UPDATE");
merger.AddAdoptFile(transient);
merger.PartialMerge(TFileMerger::kAllIncremental);
transient = nullptr;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
int Int_t
Signed integer 4 bytes (int).
Definition RtypesCore.h:59
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
Definition RtypesCore.h:60
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:241
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:88
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:27
UInt_t What() const
Definition TMessage.h:77
TClass * GetClass() const
Definition TMessage.h:73
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:321
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:167
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:437
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition TMonitor.cxx:213
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
This class implements server sockets.
static TClass * Class()
This class implements client sockets.
Definition TSocket.h:39
UInt_t GetBytesRecv() const
Definition TSocket.h:121
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:807
UInt_t GetBytesSent() const
Definition TSocket.h:120
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:378
TClass * IsA() const override
Definition TSocket.h:164
virtual Bool_t IsValid() const
Definition TSocket.h:131
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:511
Basic string class.
Definition TString.h:138
const char * Data() const
Definition TString.h:384
BVH_ALWAYS_INLINE T length(const Vec< T, N > &v)
Definition vec.h:122
Author
Fons Rademakers

Definition in file fastMergeServer.C.