Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
parallelMergeServer.C File Reference

Detailed Description

This script shows how to make a simple iterative server that can accept connections while handling currently open connections.

Compare this script to hserv.C that blocks on accept. In this script a server socket is created and added to a monitor. A monitor object is used to monitor connection requests on the server socket. After accepting the connection the new socket is added to the monitor and immediately ready for use. Once two connections are accepted the server socket is removed from the monitor and closed. The monitor continues monitoring the sockets.

To run this demo do the following:

  • Open three windows
  • Start ROOT in all three windows
  • Execute in the first window: .x parallelMergeServer.C
  • Execute in the second and third windows: .x parallelMergeClient.C("<socket path printed by the server>")
#include "TMessage.h"
#include "TBenchmark.h"
#include "TSocket.h"
#include "TH2.h"
#include "TTree.h"
#include "TMemFile.h"
#include "TRandom.h"
#include "TError.h"
#include "TFileMerger.h"
#include "TServerSocket.h"
#include "TPad.h"
#include "TCanvas.h"
#include "TMonitor.h"
#include "TSystem.h"
#include "THashTable.h"
#include "TMath.h"
#include "TTimeStamp.h"
const int kIncremental = 0;
const int kReplaceImmediately = 1;
const int kReplaceWait = 2;
#include "TKey.h"
static Bool_t R__NeedInitialMerge(TDirectory *dir)
{
if (dir==nullptr) return kFALSE;
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
}
if (R__NeedInitialMerge(subdir)) {
return kTRUE;
}
} else {
if (nullptr != cl->GetResetAfterMerge()) {
return kTRUE;
}
}
}
return kFALSE;
}
static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
{
if (dir==nullptr) return;
TIter nextkey(dir->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
if (!subdir) {
subdir = (TDirectory *)key->ReadObj();
}
R__DeleteObject(subdir,withReset);
} else {
Bool_t todelete = kFALSE;
if (withReset) {
todelete = (nullptr != cl->GetResetAfterMerge());
} else {
todelete = (nullptr == cl->GetResetAfterMerge());
}
if (todelete) {
key->Delete();
dir->GetListOfKeys()->Remove(key);
delete key;
}
}
}
}
static void R__MigrateKey(TDirectory *destination, TDirectory *source)
{
if (destination==nullptr || source==nullptr) return;
TIter nextkey(source->GetListOfKeys());
TKey *key;
while( (key = (TKey*)nextkey()) ) {
TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
if (!source_subdir) {
source_subdir = (TDirectory *)key->ReadObj();
}
TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
if (!destination_subdir) {
destination_subdir = destination->mkdir(key->GetName());
}
R__MigrateKey(destination,source);
} else {
TKey *oldkey = destination->GetKey(key->GetName());
if (oldkey) {
oldkey->Delete();
delete oldkey;
}
TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
destination->GetFile()->SumBuffer(newkey->GetObjlen());
newkey->WriteFile(0);
if (destination->GetFile()->TestBit(TFile::kWriteError)) {
return;
}
}
}
destination->SaveSelf();
}
struct ClientInfo
{
TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
TString fLocalName;
UInt_t fContactsCount;
TTimeStamp fLastContact;
Double_t fTimeSincePrevContact;
ClientInfo() : fFile(nullptr), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
ClientInfo(const char *filename, UInt_t clientId) : fFile(nullptr), fContactsCount(0), fTimeSincePrevContact(0) {
fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
}
void Set(TFile *file)
{
// Register the new file as coming from this client.
if (file != fFile) {
// We need to keep any of the keys from the previous file that
// are not in the new file.
if (fFile) {
R__MigrateKey(fFile,file);
// delete the previous memory file (if any)
delete file;
} else {
fFile = file;
}
}
TTimeStamp now;
fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
fLastContact = now;
++fContactsCount;
}
};
struct ParallelFileMerger : public TObject
{
typedef std::vector<ClientInfo> ClientColl_t;
TString fFilename;
TBits fClientsContact; //
UInt_t fNClientsContact; //
ClientColl_t fClients;
TTimeStamp fLastMerge;
TFileMerger fMerger;
ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
{
// Default constructor.
fMerger.SetPrintLevel(0);
fMerger.OutputFile(filename,"RECREATE");
if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
}
~ParallelFileMerger() override
{
// Destructor.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
}
for( ClientColl_t::iterator iter = fClients.begin();
iter != fClients.end();
++iter)
{
delete iter->fFile;
}
}
ULong_t Hash() const override
{
// Return hash value for this object.
return fFilename.Hash();
}
const char *GetName() const override
{
// Return the name of the object which is the name of the output file.
return fFilename;
}
Bool_t InitialMerge(TFile *input)
{
// Initial merge of the input to copy the resetable object (TTree) into the output
// and remove them from the input file.
fMerger.AddFile(input);
R__DeleteObject(input,kTRUE);
return result;
}
Bool_t Merge()
{
// Merge the current inputs into the output file.
R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
fMerger.AddFile(fClients[f].fFile);
}
// Remove any 'resetable' object (like TTree) from the input file so that they will not
// be re-merged. Keep only the object that always need to be re-merged (Histograms).
for(unsigned int f = 0 ; f < fClients.size(); ++f) {
if (fClients[f].fFile) {
R__DeleteObject(fClients[f].fFile,kTRUE);
} else {
// We back up the file (probably due to memory constraint)
TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
file->Write();
delete file;
}
}
fLastMerge = TTimeStamp();
fNClientsContact = 0;
fClientsContact.Clear();
return result;
}
Bool_t NeedFinalMerge()
{
// Return true, if there is any data that has not been merged.
return fClientsContact.CountBits() > 0;
}
Bool_t NeedMerge(Float_t clientThreshold)
{
// Return true, if enough client have reported
if (fClients.empty()) {
return kFALSE;
}
// Calculate average and rms of the time between the last 2 contacts.
Double_t sum2 = 0;
for(unsigned int c = 0 ; c < fClients.size(); ++c) {
sum += fClients[c].fTimeSincePrevContact;
sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
}
Double_t avg = sum / fClients.size();
Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
Double_t target = avg + 2*sigma;
TTimeStamp now;
if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
// Float_t cut = clientThreshold * fClients.size();
// if (!(fClientsContact.CountBits() > cut )) {
// for(unsigned int c = 0 ; c < fClients.size(); ++c) {
// fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
// }
// fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
// }
return kTRUE;
}
Float_t cut = clientThreshold * fClients.size();
return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
}
void RegisterClient(UInt_t clientId, TFile *file)
{
// Register that a client has sent a file.
++fNClientsContact;
fClientsContact.SetBitNumber(clientId);
if (fClients.size() < clientId+1) {
fClients.push_back( ClientInfo(fFilename,clientId) );
}
fClients[clientId].Set(file);
}
ClassDefOverride(ParallelFileMerger,0);
};
void parallelMergeServer(bool cache = false) {
// Open a server socket looking for connections on a named service
TString socketPath = "rootserv."; // prefix for temporary file in the temp folder
// Get a unique, temporary file name for the socket. We remove and close the file
// immediatly in order to reopen it as a socket. There is a race here: between
// the removal and the creation of the socket, the file could have been recreated.
// But it is unlikely (due to the random letters in the name) and harmless: the socket
// cannot be created in this case.
FILE *dummy = gSystem->TempFileName(socketPath);
if (!dummy) {
Error("fastMergeServer", "Cannot create temporary file for socket\n");
return;
}
std::string strSocketPath(socketPath.View());
remove(strSocketPath.c_str());
fclose(dummy);
TServerSocket *ss = new TServerSocket(socketPath);
if (!ss->IsValid()) {
return;
}
TMonitor *mon = new TMonitor;
mon->Add(ss);
UInt_t clientCount = 0;
UInt_t clientIndex = 0;
THashTable mergers;
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str());
while (true) {
TMessage *mess;
TSocket *s;
// NOTE: this needs to be update to handle the case where the client
// dies.
s = mon->Select();
if (s->IsA() == TServerSocket::Class()) {
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
mon->Remove(ss);
ss->Close();
} else {
TSocket *client = ((TServerSocket *)s)->Accept();
client->Send(clientIndex, kStartConnection);
client->Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
mon->Add(client);
printf("Accept %d connections\n",clientCount);
}
continue;
}
s->Recv(mess);
if (mess==nullptr) {
Error("fastMergeServer","The client did not send a message\n");
} else if (mess->What() == kMESS_STRING) {
char str[64];
mess->ReadString(str, 64);
printf("Client %d: %s\n", clientCount, str);
mon->Remove(s);
printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
s->GetBytesSent());
s->Close();
--clientCount;
if (mon->GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
} else if (mess->What() == kMESS_ANY) {
TString filename;
Int_t clientId;
mess->ReadInt(clientId);
mess->ReadTString(filename);
mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
// Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
mess->SetBufferOffset(mess->Length()+length);
const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
mergers.Add(info);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
// Enough clients reported.
Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
info->Merge();
}
transient = nullptr;
} else if (mess->What() == kMESS_OBJECT) {
printf("got object of class: %s\n", mess->GetClass()->GetName());
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
TIter next(&mergers);
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
mergers.Delete();
delete mon;
remove(strSocketPath.c_str());
delete ss;
}
@ kMESS_STRING
@ kMESS_ANY
@ kMESS_OBJECT
#define f(i)
Definition RSha256.hxx:104
#define c(i)
Definition RSha256.hxx:101
int Int_t
Signed integer 4 bytes (int).
Definition RtypesCore.h:59
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
Definition RtypesCore.h:60
unsigned long ULong_t
Unsigned long integer 4 bytes (unsigned long). Size depends on architecture.
Definition RtypesCore.h:69
bool Bool_t
Boolean (0=false, 1=true) (bool).
Definition RtypesCore.h:77
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
double Double_t
Double 8 bytes.
Definition RtypesCore.h:73
long long Long64_t
Portable signed long integer 8 bytes.
Definition RtypesCore.h:83
float Float_t
Float 4 bytes (float).
Definition RtypesCore.h:71
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
#define ClassDefOverride(name, id)
Definition Rtypes.h:348
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:241
externTSystem * gSystem
Definition TSystem.h:582
void Clear(Option_t *option="") override
Clear the value.
Definition TBits.cxx:83
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
Definition TBits.cxx:117
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
TClass instances represent classes, structs and namespaces in the ROOT type system.
Definition TClass.h:84
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Definition TClass.cxx:7604
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
Definition TClass.cxx:4932
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Definition TClass.cxx:2994
Describe directory structure in memory.
Definition TDirectory.h:45
static TClass * Class()
virtual TList * GetList() const
Definition TDirectory.h:223
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
virtual TFile * GetFile() const
Definition TDirectory.h:221
virtual TKey * GetKey(const char *, Short_t=9999) const
Definition TDirectory.h:222
virtual void SaveSelf(Bool_t=kFALSE)
Definition TDirectory.h:256
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
Definition TDirectory.h:224
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
Definition TFileMerger.h:99
@ kIncremental
Merge the input file with the content of the output file (if already existing).
Definition TFileMerger.h:82
@ kResetable
Only the objects with a MergeAfterReset member function.
Definition TFileMerger.h:83
@ kAllIncremental
Merge incrementally all type of objects.
Definition TFileMerger.h:88
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsize=0) override
Write memory objects to this file.
Definition TFile.cxx:2488
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
Definition TFile.cxx:2469
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
Definition TFile.cxx:3787
@ kWriteError
Definition TFile.h:268
THashTable implements a hash table to store TObject's.
Definition THashTable.h:35
void Add(TObject *obj) override
Add object to the hash table.
TObject * FindObject(const char *name) const override
Find object using its name.
void Delete(Option_t *option="") override
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
Definition TKey.h:28
void Delete(Option_t *option="") override
Delete an object from the file.
Definition TKey.cxx:572
Int_t GetObjlen() const
Definition TKey.h:89
virtual const char * GetClassName() const
Definition TKey.h:77
virtual TObject * ReadObj()
To read a TObject* from the file.
Definition TKey.cxx:792
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=nullptr)
Write the encoded object supported by this key.
Definition TKey.cxx:1472
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
Definition TList.cxx:708
TObject * Remove(TObject *obj) override
Remove object from the list.
Definition TList.cxx:952
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:27
UInt_t What() const
Definition TMessage.h:77
TClass * GetClass() const
Definition TMessage.h:73
TSocket * Select()
Return pointer to socket for which an event is waiting.
Definition TMonitor.cxx:321
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Definition TMonitor.cxx:167
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
Definition TMonitor.cxx:437
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
Definition TMonitor.cxx:213
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
Mother of all ROOT objects.
Definition TObject.h:42
Bool_t TestBit(UInt_t f) const
Definition TObject.h:204
virtual const char * GetName() const
Returns name of object.
Definition TObject.cxx:462
virtual ULong_t Hash() const
Return hash value for this object.
Definition TObject.cxx:539
This class implements server sockets.
static TClass * Class()
This class implements client sockets.
Definition TSocket.h:39
UInt_t GetBytesRecv() const
Definition TSocket.h:121
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:807
UInt_t GetBytesSent() const
Definition TSocket.h:120
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:378
TClass * IsA() const override
Definition TSocket.h:164
virtual Bool_t IsValid() const
Definition TSocket.h:131
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:511
Basic string class.
Definition TString.h:138
std::string_view View() const
Definition TString.h:461
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
Definition TString.cxx:684
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition TString.cxx:2363
Double_t AsDouble() const
Definition TTimeStamp.h:112
const Double_t sigma
Double_t Sqrt(Double_t x)
Returns the square root of x.
Definition TMath.h:673
BVH_ALWAYS_INLINE T length(const Vec< T, N > &v)
Definition vec.h:122
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2338
Author
Fons Rademakers

Definition in file parallelMergeServer.C.