Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
parallelMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can accept connections while handling currently open connections.

Compare this script to hserv.C, which blocks on accept. In this script a server socket is created and added to a monitor. The monitor object is used to watch for connection requests on the server socket. After a connection is accepted, the new socket is added to the monitor and is immediately ready for use. Once the hard-coded limit on client connections is reached, the server socket is removed from the monitor and closed. The monitor continues monitoring the remaining client sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x parallelMergeServer.C
  • Execute in the second and third windows: .x parallelMergeClient.C
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "THashTable.h"
#include "TMath.h"
#include "TTimeStamp.h"
// Numeric codes for the merge modes. None of them is referenced by the code
// visible in this file. NOTE(review): presumably kept in sync with the client
// macro -- confirm before removing.
const int kIncremental        = 0;
const int kReplaceImmediately = 1;
const int kReplaceWait        = 2;
#include "TKey.h"
// Return kTRUE if 'dir', or any directory nested below it, holds at least one
// key whose class declares a ResetAfterMerge function; kFALSE otherwise
// (including when 'dir' is null).
static Bool_t R__NeedInitialMerge(TDirectory *dir)
{
   if (dir==0) return kFALSE;

   TIter nextkey(dir->GetListOfKeys());
   TKey *key;
   while( (key = (TKey*)nextkey()) ) {
      // Look up the class of the stored object from the name recorded in the key.
      TClass *cl = TClass::GetClass(key->GetClassName());
      if (cl == 0) {
         // No dictionary for this class: nothing can be said about it, skip the key.
         continue;
      }
      if (cl->InheritsFrom(TDirectory::Class())) {
         // Prefer the in-memory directory; read it from the key only if it
         // has not been loaded yet.
         TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
         if (!subdir) {
            subdir = (TDirectory *)key->ReadObj();
         }
         if (R__NeedInitialMerge(subdir)) {
            return kTRUE;
         }
      } else {
         if (0 != cl->GetResetAfterMerge()) {
            return kTRUE;
         }
      }
   }
   return kFALSE;
}
// Recursively delete keys from 'dir' and its sub-directories.
//   withReset == kTRUE  : delete the keys whose class declares a
//                         ResetAfterMerge function.
//   withReset == kFALSE : delete the keys whose class does NOT declare one.
// Directory keys themselves are never deleted, only descended into.
// Does nothing when 'dir' is null.
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
{
   if (dir==0) return;

   TIter nextkey(dir->GetListOfKeys());
   TKey *key;
   while( (key = (TKey*)nextkey()) ) {
      // Look up the class of the stored object from the name recorded in the key.
      TClass *cl = TClass::GetClass(key->GetClassName());
      if (cl == 0) {
         // No dictionary for this class: leave the key untouched.
         continue;
      }
      if (cl->InheritsFrom(TDirectory::Class())) {
         // Prefer the in-memory directory; read it from the key only if it
         // has not been loaded yet.
         TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
         if (!subdir) {
            subdir = (TDirectory *)key->ReadObj();
         }
         R__DeleteObject(subdir,withReset);
      } else {
         Bool_t todelete = kFALSE;
         if (withReset) {
            todelete = (0 != cl->GetResetAfterMerge());
         } else {
            todelete = (0 == cl->GetResetAfterMerge());
         }
         if (todelete) {
            // Delete the object from the file, drop the key from the
            // directory's key list, then free the key object itself.
            key->Delete();
            dir->GetListOfKeys()->Remove(key);
            delete key;
         }
      }
   }
}
// Copy every key of 'source' into 'destination', replacing any key of the
// same name already present there, and recurse into sub-directories
// (creating them in 'destination' when missing).
// Does nothing when either directory is null. If the destination file reports
// a write error the function returns immediately, without calling SaveSelf().
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
{
   if (destination==0 || source==0) return;

   TIter nextkey(source->GetListOfKeys());
   TKey *key;
   while( (key = (TKey*)nextkey()) ) {
      // Look up the class of the stored object from the name recorded in the key.
      TClass *cl = TClass::GetClass(key->GetClassName());
      if (cl == 0) {
         // No dictionary for this class: cannot classify the key, skip it.
         continue;
      }
      if (cl->InheritsFrom(TDirectory::Class())) {
         TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
         if (!source_subdir) {
            source_subdir = (TDirectory *)key->ReadObj();
         }
         TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
         if (!destination_subdir) {
            destination_subdir = destination->mkdir(key->GetName());
         }
         // Descend into the matching pair of sub-directories. Recursing on
         // (destination, source) again would never terminate and would never
         // migrate the content of the sub-directory.
         R__MigrateKey(destination_subdir,source_subdir);
      } else {
         // Replace rather than accumulate: drop the stale key first.
         TKey *oldkey = destination->GetKey(key->GetName());
         if (oldkey) {
            oldkey->Delete();
            delete oldkey;
         }
         TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the files are from the same client ..
         destination->GetFile()->SumBuffer(newkey->GetObjlen());
         newkey->WriteFile(0);
         if (destination->GetFile()->TestBit(TFile::kWriteError)) {
            return;
         }
      }
   }
   destination->SaveSelf();
}
struct ClientInfo
{
TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
TString fLocalName;
UInt_t fContactsCount;
TTimeStamp fLastContact;
Double_t fTimeSincePrevContact;
ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
}
void Set(TFile *file)
{
// Register the new file as coming from this client.
if (file != fFile) {
// We need to keep any of the keys from the previous file that
// are not in the new file.
if (fFile) {
R__MigrateKey(fFile,file);
// delete the previous memory file (if any)
delete file;
} else {
fFile = file;
}
}
fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
fLastContact = now;
++fContactsCount;
}
};
// Merging state for one output file: the TFileMerger writing that file, the
// per-client input files, and the counters used to decide when to merge.
// The object is stored in a THashTable keyed by the output file name, hence
// the Hash() and GetName() overrides.
struct ParallelFileMerger : public TObject
{
   typedef std::vector<ClientInfo> ClientColl_t;

   TString       fFilename;        // Name of the output file
   TBits         fClientsContact;  // One bit per client id that reported since the last Merge()
   UInt_t        fNClientsContact; // Number of RegisterClient() calls since the last Merge()
   ClientColl_t  fClients;         // Indexed by client id
   TTimeStamp    fLastMerge;       // Time of the last Merge()
   TFileMerger   fMerger;

   ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
   {
      // Open 'filename' (recreating it) as the merger output. When
      // 'writeCache' is set, attach a 32 MB write cache to the output file.
      fMerger.SetPrintLevel(0);
      fMerger.OutputFile(filename,"RECREATE");
      // NOTE(review): the cache is presumably adopted by the file it is
      // attached to -- confirm against TFileCacheWrite.
      if (writeCache && fMerger.GetOutputFile()) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
   }

   ~ParallelFileMerger()
   {
      // Destructor: report the per-client contact counts and delete the
      // client files.
      for(unsigned int f = 0 ; f < fClients.size(); ++f) {
         fprintf(stderr,"Client %u reported %u times\n",f,fClients[f].fContactsCount);
      }
      for( ClientColl_t::iterator iter = fClients.begin();
           iter != fClients.end();
           ++iter)
      {
         delete iter->fFile;
      }
   }

   ULong_t  Hash() const
   {
      // Return hash value for this object.
      return fFilename.Hash();
   }

   const char *GetName() const
   {
      // Return the name of the object which is the name of the output file.
      return fFilename;
   }

   Bool_t InitialMerge(TFile *input)
   {
      // Initial merge of the input to copy the resetable objects (TTree) into
      // the output and remove them from the input file.
      // Returns the result of the partial merge.
      fMerger.AddFile(input);

      Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable);

      R__DeleteObject(input,kTRUE);
      return result;
   }

   Bool_t Merge()
   {
      // Merge the current inputs into the output file.
      // Returns the result of the partial merge.

      R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove objects that can *not* be incrementally merged and will *not* be reset by the client code.
      for(unsigned int f = 0 ; f < fClients.size(); ++f) {
         if (fClients[f].fFile) {
            fMerger.AddFile(fClients[f].fFile);
         }
      }
      Bool_t result = fMerger.PartialMerge(TFileMerger::kAllIncremental);

      // Remove any 'resetable' object (like TTree) from the input files so that they will not
      // be re-merged.  Keep only the objects that always need to be re-merged (Histograms).
      for(unsigned int f = 0 ; f < fClients.size(); ++f) {
         if (fClients[f].fFile) {
            R__DeleteObject(fClients[f].fFile,kTRUE);
         } else if (fClients[f].fContactsCount > 0) {
            // The file was backed up on disk (probably due to memory constraints).
            // Slots that never reported have no such file and are skipped;
            // opening them in UPDATE mode would only create empty files.
            TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
            if (file) {
               R__DeleteObject(file,kTRUE); // Remove objects that can be incrementally merged and will be reset by the client code.
               file->Write();
               delete file;
            }
         }
      }
      fLastMerge = TTimeStamp();
      fNClientsContact = 0;
      fClientsContact.Clear();

      return result;
   }

   Bool_t NeedFinalMerge()
   {
      // Return true, if there is any data that has not been merged.
      return fClientsContact.CountBits() > 0;
   }

   Bool_t NeedMerge(Float_t clientThreshold)
   {
      // Return true if a merge should be done now, i.e. if either
      //  - the time since the last merge exceeds the average interval
      //    between the last two contacts of the clients by two sigma, or
      //  - more than clientThreshold (a fraction) of the clients reported
      //    since the last merge, or more than twice that many uploads came in.
      // Only clients that reported at least once enter the statistics.

      UInt_t   nReporting = 0;
      Double_t sum = 0;
      Double_t sum2 = 0;
      for(unsigned int c = 0 ; c < fClients.size(); ++c) {
         if (fClients[c].fContactsCount == 0) continue;
         ++nReporting;
         sum += fClients[c].fTimeSincePrevContact;
         sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
      }
      if (nReporting == 0) {
         return kFALSE;
      }

      // Average and rms of the time between the last 2 contacts.
      Double_t avg = sum / nReporting;
      // Rounding can make the variance slightly negative; clamp it so that
      // sigma never becomes NaN.
      Double_t variance = sum2 / nReporting - avg*avg;
      Double_t sigma = variance > 0 ? TMath::Sqrt(variance) : 0;
      Double_t target = avg + 2*sigma;
      TTimeStamp now;
      if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
         return kTRUE;
      }
      Float_t cut = clientThreshold * nReporting;
      return fClientsContact.CountBits() > cut  || fNClientsContact > 2*cut;
   }

   void RegisterClient(UInt_t clientId, TFile *file)
   {
      // Register that a client has sent a file.

      ++fNClientsContact;
      fClientsContact.SetBitNumber(clientId);
      // Client ids need not be contiguous for a given output file: grow the
      // collection until 'clientId' is a valid index.
      while (fClients.size() <= clientId) {
         fClients.push_back( ClientInfo(fFilename,(UInt_t)fClients.size()) );
      }
      fClients[clientId].Set(file);
   }

   ClassDef(ParallelFileMerger,0);
};
void parallelMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
UInt_t clientIndex = 0;
THashTable mergers;
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections\n");
while (1) {
TMessage *mess;
TSocket *s;
// NOTE: this needs to be update to handle the case where the client
// dies.
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientIndex, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
Long64_t length;
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
// Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
mess->SetBufferOffset(mess->Length()+length);
const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
mergers.Add(info);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
// Enough clients reported.
Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
info->Merge();
}
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
TIter next(&mergers);
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
mergers.Delete();
delete mon;
delete ss;
}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
int Int_t
Definition RtypesCore.h:45
unsigned int UInt_t
Definition RtypesCore.h:46
const Bool_t kFALSE
Definition RtypesCore.h:101
unsigned long ULong_t
Definition RtypesCore.h:55
bool Bool_t
Definition RtypesCore.h:63
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
float Float_t
Definition RtypesCore.h:57
const Bool_t kTRUE
Definition RtypesCore.h:100
#define ClassDef(name, id)
Definition Rtypes.h:325
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:220
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:187
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
Container of bits.
Definition TBits.h:26
void Clear(Option_t *option="")
Clear the value.
Definition TBits.cxx:84
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
Definition TBits.cxx:118
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:80
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition TClass.cxx:7425
Bool_t InheritsFrom(const char *cl) const
Return kTRUE if this class inherits from a class with name "classname".
Definition TClass.cxx:4860
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition TClass.cxx:2966
Describe directory structure in memory.
Definition TDirectory.h:45
virtual TList * GetList() const
Definition TDirectory.h:222
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
virtual TFile * GetFile() const
Definition TDirectory.h:220
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition TDirectory.h:221
virtual void SaveSelf(Bool_t=kFALSE)
Definition TDirectory.h:255
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
Definition TDirectory.h:223
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
Definition TFileMerger.h:92
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition TFileMerger.h:88
@ kIncremental
Merge the input file with the content of the output file (if already existing).
Definition TFileMerger.h:71
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition TFileMerger.h:72
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:77
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition TFile.cxx:2355
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4025
@ kWriteError
Definition TFile.h:188
THashTable implements a hash table to store TObject's.
Definition THashTable.h:35
void Add(TObject *obj)
Add object to the hash table.
TObject * FindObject(const char *name) const
Find object using its name.
void Delete(Option_t *option="")
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
virtual void Delete(Option_t *option="")
Delete an object from the file.
Definition TKey.cxx:538
Int_t GetObjlen() const
Definition TKey.h:88
virtual const char * GetClassName() const
Definition TKey.h:76
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=0)
Write the encoded object supported by this key.
Definition TKey.cxx:1450
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition TKey.cxx:750
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition TList.cxx:822
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition TList.cxx:578
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
UInt_t What() const
Definition TMessage.h:75
TClass * GetClass() const
Definition TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition TMonitor.cxx:214
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:429
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition TObject.h:201
virtual ULong_t Hash() const
Return hash value for this object.
Definition TObject.cxx:505
UInt_t GetBytesRecv() const
Definition TSocket.h:120
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
UInt_t GetBytesSent() const
Definition TSocket.h:119
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:136
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition TString.cxx:662
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2314
virtual int GetPid()
Get process id.
Definition TSystem.cxx:710
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:71
Double_t AsDouble() const
Definition TTimeStamp.h:138
const Double_t sigma
Double_t Sqrt(Double_t x)
Definition TMath.h:641
Definition file.py:1
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2345
Author
Fons Rademakers

Definition in file parallelMergeServer.C.