Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
parallelMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can accept connections while handling currently open connections.

Compare this script to hserv.C that blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After accepting the connection the new socket is added to the monitor and immediately ready for use. Once two connections are accepted the server socket is removed from the monitor and closed. The monitor continues monitoring the sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x hserv2.C
  • Execute in the second and third windows: .x hclient.C
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "THashTable.h"
#include "TMath.h"
#include "TTimeStamp.h"
const int kIncremental = 0;
const int kReplaceImmediately = 1;
const int kReplaceWait = 2;
#include "TKey.h"
static Bool_t R__NeedInitialMerge(TDirectory *dir)
{
if (dir==0) return kFALSE;
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
}
if (R__NeedInitialMerge(subdir)) {
return kTRUE;
}
} else {
if (0 != cl->GetResetAfterMerge()) {
return kTRUE;
}
}
}
return kFALSE;
}
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
{
if (dir==0) return;
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
}
R__DeleteObject(subdir,withReset);
} else {
Bool_t todelete = kFALSE;
if (withReset) {
todelete = (0 != cl->GetResetAfterMerge());
} else {
todelete = (0 == cl->GetResetAfterMerge());
}
if (todelete) {
key->Delete();
dir->GetListOfKeys()->Remove(key);
delete key;
}
}
}
}
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
{
if (destination==0 || source==0) return;
TIter nextkey(source->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
if (!source_subdir) {
source_subdir = (TDirectory *)key->ReadObj();
}
TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
if (!destination_subdir) {
destination_subdir = destination->mkdir(key->GetName());
}
R__MigrateKey(destination,source);
} else {
TKey *oldkey = destination->GetKey(key->GetName());
if (oldkey) {
oldkey->Delete();
delete oldkey;
}
TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
destination->GetFile()->SumBuffer(newkey->GetObjlen());
newkey->WriteFile(0);
if (destination->GetFile()->TestBit(TFile::kWriteError)) {
return;
}
}
}
destination->SaveSelf();
}
struct ClientInfo
{
TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
TString fLocalName;
UInt_t fContactsCount;
TTimeStamp fLastContact;
Double_t fTimeSincePrevContact;
ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
}
void Set(TFile *file)
{
// Register the new file as coming from this client.
if (file != fFile) {
// We need to keep any of the keys from the previous file that
// are not in the new file.
if (fFile) {
R__MigrateKey(fFile,file);
// delete the previous memory file (if any)
delete file;
} else {
fFile = file;
}
}
fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
fLastContact = now;
++fContactsCount;
}
};
struct ParallelFileMerger : public TObject
{
typedef std::vector<ClientInfo> ClientColl_t;
TString fFilename;
TBits fClientsContact; //
UInt_t fNClientsContact; //
ClientColl_t fClients;
TTimeStamp fLastMerge;
TFileMerger fMerger;
ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
{
// Default constructor.
fMerger.SetPrintLevel(0);
fMerger.OutputFile(filename,"RECREATE");
if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
}
~ParallelFileMerger()
{
// Destructor.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
}
for( ClientColl_t::iterator iter = fClients.begin();
iter != fClients.end();
++iter)
{
delete iter->fFile;
}
}
ULong_t Hash() const
{
// Return hash value for this object.
return fFilename.Hash();
}
const char *GetName() const
{
// Return the name of the object which is the name of the output file.
return fFilename;
}
Bool_t InitialMerge(TFile *input)
{
// Initial merge of the input to copy the resetable object (TTree) into the output
// and remove them from the input file.
fMerger.AddFile(input);
R__DeleteObject(input,kTRUE);
return result;
}
Bool_t Merge()
{
// Merge the current inputs into the output file.
R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fMerger.AddFile(fClients[f].fFile);
}
// Remove any 'resetable' object (like TTree) from the input file so that they will not
// be re-merged. Keep only the object that always need to be re-merged (Histograms).
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
if (fClients[f].fFile) {
R__DeleteObject(fClients[f].fFile,kTRUE);
} else {
// We back up the file (probably due to memory constraint)
TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
file->Write();
delete file;
}
}
fLastMerge = TTimeStamp();
fNClientsContact = 0;
fClientsContact.Clear();
return result;
}
Bool_t NeedFinalMerge()
{
// Return true, if there is any data that has not been merged.
return fClientsContact.CountBits() > 0;
}
Bool_t NeedMerge(Float_t clientThreshold)
{
// Return true, if enough client have reported
if (fClients.size()==0) {
return kFALSE;
}
// Calculate average and rms of the time between the last 2 contacts.
Double_t sum2 = 0;
for(unsigned int c = 0 ; c < fClients.size(); ++c) {
sum += fClients[c].fTimeSincePrevContact;
sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
}
Double_t avg = sum / fClients.size();
Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
Double_t target = avg + 2*sigma;
if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
// Float_t cut = clientThreshold * fClients.size();
// if (!(fClientsContact.CountBits() > cut )) {
// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
// }
// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
// }
return kTRUE;
}
Float_t cut = clientThreshold * fClients.size();
return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
}
void RegisterClient(UInt_t clientId, TFile *file)
{
// Register that a client has sent a file.
++fNClientsContact;
fClientsContact.SetBitNumber(clientId);
if (fClients.size() < clientId+1) {
fClients.push_back( ClientInfo(fFilename,clientId) );
}
fClients[clientId].Set(file);
}
ClassDef(ParallelFileMerger,0);
};
void parallelMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service or
// on a specified port.
//TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
UInt_t clientIndex = 0;
THashTable mergers;
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections\n");
while (1) {
TMessage *mess;
TSocket *s;
// NOTE: this needs to be update to handle the case where the client
// dies.
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientIndex, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==0) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
// Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
mess->SetBufferOffset(mess->Length()+length);
const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
mergers.Add(info);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
// Enough clients reported.
Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
info->Merge();
}
transient = 0;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
TIter next(&mergers);
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
mergers.Delete();
delete mon;
delete ss;
}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
bool Bool_t
Definition RtypesCore.h:63
int Int_t
Definition RtypesCore.h:45
unsigned int UInt_t
Definition RtypesCore.h:46
float Float_t
Definition RtypesCore.h:57
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:80
constexpr Bool_t kTRUE
Definition RtypesCore.h:100
unsigned long ULong_t
Definition RtypesCore.h:55
#define ClassDef(name, id)
Definition Rtypes.h:337
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:230
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:197
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void input
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t target
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t result
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h length
R__EXTERN TSystem * gSystem
Definition TSystem.h:560
Container of bits.
Definition TBits.h:26
void Clear(Option_t *option="") override
Clear the value.
Definition TBits.cxx:84
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
Definition TBits.cxx:118
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:81
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition TClass.cxx:7439
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
Definition TClass.cxx:4874
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition TClass.cxx:2968
Describe directory structure in memory.
Definition TDirectory.h:45
static TClass * Class()
virtual TList * GetList() const
Definition TDirectory.h:222
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
virtual TFile * GetFile() const
Definition TDirectory.h:220
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition TDirectory.h:221
virtual void SaveSelf(Bool_t=kFALSE)
Definition TDirectory.h:255
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
Definition TDirectory.h:223
A cache when writing files over the network.
This class provides file copy and merging services.
Definition TFileMerger.h:30
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
Definition TFileMerger.h:92
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition TFileMerger.h:88
@ kIncremental
Merge the input file with the content of the output file (if already existing).
Definition TFileMerger.h:71
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition TFileMerger.h:72
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:77
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:51
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition TFile.cxx:2381
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:4053
@ kWriteError
Definition TFile.h:185
THashTable implements a hash table to store TObject's.
Definition THashTable.h:35
void Add(TObject *obj) override
Add object to the hash table.
TObject * FindObject(const char *name) const override
Find object using its name.
void Delete(Option_t *option="") override
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
void Delete(Option_t *option="") override
Delete an object from the file.
Definition TKey.cxx:538
Int_t GetObjlen() const
Definition TKey.h:87
virtual const char * GetClassName() const
Definition TKey.h:75
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition TKey.cxx:750
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=nullptr)
Write the encoded object supported by this key.
Definition TKey.cxx:1450
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:578
TObject * Remove(TObject *obj) override
Remove object from the list.
Definition TList.cxx:822
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
UInt_t What() const
Definition TMessage.h:75
TClass * GetClass() const
Definition TMessage.h:71
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:322
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:168
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:438
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition TMonitor.cxx:214
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:41
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:439
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition TObject.h:201
virtual ULong_t Hash() const
Return hash value for this object.
Definition TObject.cxx:515
static TClass * Class()
UInt_t GetBytesRecv() const
Definition TSocket.h:120
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
UInt_t GetBytesSent() const
Definition TSocket.h:119
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
TClass * IsA() const override
Definition TSocket.h:171
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:139
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition TString.cxx:670
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2334
virtual int GetPid()
Get process id.
Definition TSystem.cxx:710
The TTimeStamp encapsulates seconds and ns since EPOCH.
Definition TTimeStamp.h:45
Double_t AsDouble() const
Definition TTimeStamp.h:112
const Double_t sigma
Double_t Sqrt(Double_t x)
Returns the square root of x.
Definition TMath.h:660
Definition file.py:1
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2345
Author
Fons Rademakers

Definition in file parallelMergeServer.C.