Logo ROOT   6.08/07
Reference Guide
parallelMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can accept connections while handling currently open connections.

Compare this script to hserv.C that blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After accepting the connection the new socket is added to the monitor and immediately ready for use. Once two connections are accepted the server socket is removed from the monitor and closed. The monitor continues monitoring the sockets.

To run this demo do the following:

#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "THashTable.h"
#include "TMath.h"
#include "TTimeStamp.h"
const int kIncremental = 0;
const int kReplaceImmediately = 1;
const int kReplaceWait = 2;
#include "TKey.h"
static Bool_t R__NeedInitialMerge(TDirectory *dir)
{
if (dir==0) return kFALSE;
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TClass *cl = TClass::GetClass(key->GetClassName());
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
}
if (R__NeedInitialMerge(subdir)) {
return kTRUE;
}
} else {
if (0 != cl->GetResetAfterMerge()) {
return kTRUE;
}
}
}
return kFALSE;
}
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
{
if (dir==0) return;
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TClass *cl = TClass::GetClass(key->GetClassName());
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
}
R__DeleteObject(subdir,withReset);
} else {
Bool_t todelete = kFALSE;
if (withReset) {
todelete = (0 != cl->GetResetAfterMerge());
} else {
todelete = (0 == cl->GetResetAfterMerge());
}
if (todelete) {
key->Delete();
dir->GetListOfKeys()->Remove(key);
delete key;
}
}
}
}
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
{
if (destination==0 || source==0) return;
TIter nextkey(source->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TClass *cl = TClass::GetClass(key->GetClassName());
TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
if (!source_subdir) {
source_subdir = (TDirectory *)key->ReadObj();
}
TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
if (!destination_subdir) {
destination_subdir = destination->mkdir(key->GetName());
}
R__MigrateKey(destination,source);
} else {
TKey *oldkey = destination->GetKey(key->GetName());
if (oldkey) {
oldkey->Delete();
delete oldkey;
}
TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
destination->GetFile()->SumBuffer(newkey->GetObjlen());
newkey->WriteFile(0);
if (destination->GetFile()->TestBit(TFile::kWriteError)) {
return;
}
}
}
destination->SaveSelf();
}
struct ClientInfo
{
TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
TString fLocalName;
UInt_t fContactsCount;
TTimeStamp fLastContact;
Double_t fTimeSincePrevContact;
ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
}
void Set(TFile *file)
{
// Register the new file as coming from this client.
if (file != fFile) {
// We need to keep any of the keys from the previous file that
// are not in the new file.
if (fFile) {
R__MigrateKey(fFile,file);
// delete the previous memory file (if any)
delete file;
} else {
fFile = file;
}
}
fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
fLastContact = now;
++fContactsCount;
}
};
struct ParallelFileMerger : public TObject
{
typedef std::vector<ClientInfo> ClientColl_t;
TString fFilename;
TBits fClientsContact; //
UInt_t fNClientsContact; //
ClientColl_t fClients;
TTimeStamp fLastMerge;
TFileMerger fMerger;
ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
{
// Default constructor.
fMerger.SetPrintLevel(0);
fMerger.OutputFile(filename,"RECREATE");
if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
}
~ParallelFileMerger()
{
// Destructor.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
}
for( ClientColl_t::iterator iter = fClients.begin();
iter != fClients.end();
++iter)
{
delete iter->fFile;
}
}
ULong_t Hash() const
{
// Return hash value for this object.
return fFilename.Hash();
}
const char *GetName() const
{
// Return the name of the object which is the name of the output file.
return fFilename;
}
Bool_t InitialMerge(TFile *input)
{
// Initial merge of the input to copy the resetable object (TTree) into the output
// and remove them from the input file.
fMerger.AddFile(input);
R__DeleteObject(input,kTRUE);
return result;
}
Bool_t Merge()
{
// Merge the current inputs into the output file.
R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fMerger.AddFile(fClients[f].fFile);
}
// Remove any 'resetable' object (like TTree) from the input file so that they will not
// be re-merged. Keep only the object that always need to be re-merged (Histograms).
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
if (fClients[f].fFile) {
R__DeleteObject(fClients[f].fFile,kTRUE);
} else {
// We back up the file (probably due to memory constraint)
TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
file->Write();
delete file;
}
}
fLastMerge = TTimeStamp();
fNClientsContact = 0;
fClientsContact.Clear();
return result;
}
Bool_t NeedFinalMerge()
{
// Return true, if there is any data that has not been merged.
return fClientsContact.CountBits() > 0;
}
Bool_t NeedMerge(Float_t clientThreshold)
{
// Return true, if enough client have reported
if (fClients.size()==0) {
return kFALSE;
}
// Calculate average and rms of the time between the last 2 contacts.
Double_t sum = 0;
Double_t sum2 = 0;
for(unsigned int c = 0 ; c < fClients.size(); ++c) {
sum += fClients[c].fTimeSincePrevContact;
sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
}
Double_t avg = sum / fClients.size();
Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
Double_t target = avg + 2*sigma;
if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
// Float_t cut = clientThreshold * fClients.size();
// if (!(fClientsContact.CountBits() > cut )) {
// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
// }
// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
// }
return kTRUE;
}
Float_t cut = clientThreshold * fClients.size();
return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
}
void RegisterClient(UInt_t clientId, TFile *file)
{
// Register that a client has sent a file.
++fNClientsContact;
fClientsContact.SetBitNumber(clientId);
if (fClients.size() < clientId+1) {
fClients.push_back( ClientInfo(fFilename,clientId) );
}
fClients[clientId].Set(file);
}
ClassDef(ParallelFileMerger,0);
};
void parallelMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
UInt_t clientIndex = 0;
THashTable mergers;
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections\n");
while (1) {
TMessage *mess;
TSocket *s;
// NOTE: this needs to be update to handle the case where the client
// dies.
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientIndex, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
Long64_t length;
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
// Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
mess->SetBufferOffset(mess->Length()+length);
const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
mergers.Add(info);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
// Enough clients reported.
Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
info->Merge();
}
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
TIter next(&mergers);
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
mergers.Delete();
delete mon;
delete ss;
}
Author
Fons Rademakers

Definition in file parallelMergeServer.C.