This script shows how to make a simple iterative server that can accept connections while handling currently open connections.
Compare this script to hserv.C, which blocks on accept. In this script a server socket is created and added to a monitor. The monitor object is used to watch for connection requests on the server socket. After a connection is accepted, the new socket is added to the monitor and is immediately ready for use. Once the connection limit is reached (the code below stops accepting new clients after 100 connections), the server socket is removed from the monitor and closed. The monitor continues monitoring the remaining sockets.
// Integer mode constants. The names suggest merge/replace policies (TODO confirm);
// no code in the visible part of this file reads them.
constexpr int kIncremental = 0;
constexpr int kReplaceImmediately = 1;
constexpr int kReplaceWait = 2;
// NOTE(review): the signature line of this function is not present in this text;
// the recursive call below suggests it is R__NeedInitialMerge(dir) -- confirm.
// The declarations of `key`, `nextkey` and `subdir` are not visible either, and
// the braces do not pair up, so statements appear to be missing inside the loop.
{
// Without a directory there is nothing to inspect: report kFALSE.
if (dir==
nullptr)
return kFALSE;
// Visit every key handed back by the `nextkey` iterator.
while( (key = (
TKey*)nextkey()) ) {
if (!subdir) {
}
// Recurse into the sub-directory.
if (R__NeedInitialMerge(subdir)) {
}
} else {
}
}
}
}
// NOTE(review): the signature line is not present in this text; the recursive
// call below suggests it is R__DeleteObject(dir, withReset) -- confirm.
// The declarations of `key`, `nextkey`, `subdir` and `todelete` are not visible
// and several branch bodies are empty, so statements appear to be missing.
{
// Nothing to do without a directory.
if (dir==nullptr) return;
// Visit every key handed back by the `nextkey` iterator.
while( (key = (
TKey*)nextkey()) ) {
if (!subdir) {
}
// Recurse into the sub-directory, forwarding the withReset flag unchanged.
R__DeleteObject(subdir,withReset);
} else {
// Two selection paths depending on withReset; their bodies are not visible here.
if (withReset) {
} else {
}
// Keys flagged by `todelete` are destroyed.
if (todelete) {
delete key;
}
}
}
}
// NOTE(review): the signature line is not present in this text; the recursive
// call below suggests it is R__MigrateKey(destination, source) -- confirm.
// The declarations of `key`, `nextkey`, `source_subdir`, `destination_subdir`
// and `oldkey` are not visible, so statements appear to be missing.
{
// Both directories are required; otherwise do nothing.
if (destination==nullptr || source==nullptr) return;
// Visit every key handed back by the `nextkey` iterator.
while( (key = (
TKey*)nextkey()) ) {
if (!source_subdir) {
}
if (!destination_subdir) {
}
// NOTE(review): this recursion passes the same (destination, source) it received
// rather than the sub-directories tested just above -- it looks like it should be
// R__MigrateKey(destination_subdir, source_subdir); confirm.
R__MigrateKey(destination,source);
} else {
// A previously found key (`oldkey`) is destroyed before the new one is created.
if (oldkey) {
delete oldkey;
}
// Create a key in `destination` from the source key (third argument 0).
TKey *newkey =
new TKey(destination,*key,0 );
// NOTE(review): as shown, this return is unconditional and ends the function after
// the first key; the condition guarding it appears to be missing.
return;
}
}
}
}
struct ClientInfo
{
TFile *fFile;
TString fLocalName;
TTimeStamp fLastContact;
ClientInfo() : fFile(nullptr), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
ClientInfo(
const char *filename,
UInt_t clientId) : fFile(nullptr), fContactsCount(0), fTimeSincePrevContact(0) {
fLocalName.
Form(
"%s-%d-%d",filename,clientId,
gSystem->GetPid());
}
void Set(TFile *file)
{
if (file != fFile) {
if (fFile) {
R__MigrateKey(fFile,file);
delete file;
} else {
fFile = file;
}
}
TTimeStamp now;
fLastContact = now;
++fContactsCount;
}
};
// Per-output-file state of the merge server: the clients registered for that
// file and the TFileMerger used for them.
// NOTE(review): fNClientsContact is initialised by the constructor and used by
// the methods, but its declaration is not visible in this text.
struct ParallelFileMerger :
public TObject
{
typedef std::vector<ClientInfo> ClientColl_t;
// Output file name given to the constructor; also what GetName() returns.
TString fFilename;
// Bit set whose CountBits() is compared against the client threshold.
TBits fClientsContact;
// Per-client records, indexed by client id in RegisterClient().
ClientColl_t fClients;
// Assigned a fresh TTimeStamp() at the end of a merge pass.
TTimeStamp fLastMerge;
// Underlying merger, constructed as fMerger(kFALSE, kTRUE).
TFileMerger fMerger;
// Create the merger for output file `filename`.
// When `writeCache` is true, a TFileCacheWrite of 32 MiB is created on the
// merger's output file. The returned pointer is not kept here
// (presumably the cache attaches itself to the file -- TODO confirm).
ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE)
   : fFilename(filename), fNClientsContact(0), fMerger(kFALSE, kTRUE)
{
   const int writeCacheBytes = 32 * 1024 * 1024;
   if (writeCache) {
      new TFileCacheWrite(fMerger.GetOutputFile(), writeCacheBytes);
   }
}
// Report, on stderr, how many times each client made contact, then delete
// every client's registered file.
~ParallelFileMerger() override
{
   // First pass: print the per-client contact counts.
   unsigned int clientIndex = 0;
   for (const auto &client : fClients) {
      fprintf(stderr, "Client %d reported %u times\n", clientIndex, client.fContactsCount);
      ++clientIndex;
   }

   // Second pass: release the files only after all counts have been printed.
   for (auto &client : fClients) {
      delete client.fFile;
   }
}
// NOTE(review): orphaned braces -- the signature and body of a member function
// appear to be missing from this text here.
{
}
// Name of this object: the output file name stored in fFilename.
const char *GetName() const override
{
   return fFilename.Data();
}
// NOTE(review): `result` is returned but never declared in the visible text;
// the statement(s) that pass `input` to fMerger and produce `result` appear to
// be missing -- restore them before relying on this method.
Bool_t InitialMerge(TFile *input)
{
// Hand `input` to R__DeleteObject with its second argument (withReset) set to kTRUE.
R__DeleteObject(input,
kTRUE);
return result;
}
// NOTE(review): the signature line is not present in this text; callers invoke
// info->Merge(), so this is presumably Merge() -- confirm. `result` and `file`
// are used without a visible declaration, the first loop body is empty and an
// `else` has no visible `if`, so statements appear to be missing.
{
// First pass over the clients (body not visible here).
for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
}
// Second pass over the clients: R__DeleteObject is called with withReset = kTRUE.
for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
R__DeleteObject(fClients[
f].fFile,
kTRUE);
} else {
// Alternative path: the same call on `file`, which is then deleted.
R__DeleteObject(file,
kTRUE);
delete file;
}
}
// Record the time of this merge and restart the contact counter.
fLastMerge = TTimeStamp();
fNClientsContact = 0;
return result;
}
// NOTE(review): orphaned braces -- the signature and body of a member function
// appear to be missing from this text here.
{
}
// NOTE(review): the signature line is not present in this text; the use of
// `clientThreshold` and the call info->NeedMerge(clientThreshold) suggest this
// is NeedMerge(Float_t clientThreshold) -- confirm. `sum` and `sum2` are used
// without a visible declaration and the braces do not pair up.
{
if (fClients.empty()) {
}
// Accumulate the sum and the sum of squares of each client's
// fTimeSincePrevContact.
for(
unsigned int c = 0 ;
c < fClients.size(); ++
c) {
sum += fClients[
c].fTimeSincePrevContact;
sum2 += fClients[
c].fTimeSincePrevContact*fClients[
c].fTimeSincePrevContact;
}
TTimeStamp now;
}
// `cut` is the requested fraction of the known clients.
Float_t cut = clientThreshold * fClients.size();
// True when more than `cut` bits are set in fClientsContact, or when the
// number of contacts since the last merge exceeds twice `cut`.
return fClientsContact.
CountBits() > cut || fNClientsContact > 2*cut;
}
// Record that client `clientId` has delivered `file`.
//
// clientId : index of the client in fClients; the collection is grown as needed.
// file     : file received from the client, forwarded to ClientInfo::Set().
//
// NOTE(review): nothing in the visible text writes to fClientsContact, although
// its CountBits() is read when deciding whether to merge -- confirm that the bit
// for `clientId` is set somewhere.
void RegisterClient(UInt_t clientId, TFile *file)
{
   ++fNClientsContact;

   // Grow until fClients[clientId] exists. A single push_back is only enough
   // when clientId == fClients.size(); a larger id would index past the end.
   while (fClients.size() <= clientId) {
      const UInt_t newId = static_cast<UInt_t>(fClients.size());
      fClients.push_back(ClientInfo(fFilename, newId));
   }

   fClients[clientId].Set(file);
}
};
// Entry point of the merge server. `cache` is forwarded as the second
// constructor argument (writeCache) of each ParallelFileMerger created below.
//
// NOTE(review): this text is missing many statements of the function (the
// declarations of socketPath, ss, mon, mergers, clientCount, clientIndex, s,
// client, mess, filename, clientId, transient and next are not visible, and
// the braces do not pair up). The comments below describe only what is shown.
// NOTE(review): the Error/Info/printf messages name "fastMergeServer" and
// "fastMergeServerHist" while the function is parallelMergeServer -- confirm
// whether that is intended.
void parallelMergeServer(bool cache = false) {
// Obtain a temporary file name in socketPath; a null FILE* means failure.
FILE *dummy =
gSystem->TempFileName(socketPath);
if (!dummy) {
Error(
"fastMergeServer",
"Cannot create temporary file for socket\n");
return;
}
// Keep the path as a std::string, then remove the temporary file and close
// its handle; the path is later printed as the address that accepts connections.
std::string strSocketPath(socketPath.
View());
remove(strSocketPath.c_str());
fclose(dummy);
return;
}
// Values used with Send() below: Send(clientIndex, kStartConnection) and
// Send(kProtocolVersion, kProtocol). kProtocol and kProtocolVersion share the
// value 1.
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str());
while (true) {
// New connections are refused once clientCount exceeds 100.
if (clientCount > 100) {
printf("only accept 100 clients connections\n");
} else {
// Tell the new client its index and the protocol version, then count it.
client->
Send(clientIndex, kStartConnection);
client->
Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
printf("Accept %d connections\n",clientCount);
}
continue;
}
if (mess==nullptr) {
Error(
"fastMergeServer",
"The client did not send a message\n");
// NOTE(review): `str` is printed below but no visible statement fills it.
char str[64];
printf("Client %d: %s\n", clientCount, str);
// NOTE(review): this printf call is truncated in this text (its remaining
// arguments and closing parenthesis are missing).
printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->
GetBytesRecv(),
--clientCount;
// Stop serving when the monitor has no active socket or no client is left.
if (mon->
GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
// Threshold passed to NeedMerge() below.
const Float_t clientThreshold = 0.75;
// Look up the merger registered under `filename`, creating it on first use.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.
FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
// NOTE(review): "%ld" is used with fClients.size(), which is a size_t.
Info(
"fastMergeServerHist",
"Merging input from %ld clients (%d)",info->fClients.size(),clientId);
info->Merge();
}
// `transient` was handed to RegisterClient above; drop the local pointer.
transient = nullptr;
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
// After the serving loop: run Merge() on every merger that reports
// NeedFinalMerge().
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
// Release the monitor, remove the socket path from the file system and
// delete the server socket.
delete mon;
remove(strSocketPath.c_str());
delete ss;
}
int Int_t
Signed integer 4 bytes (int).
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
unsigned long ULong_t
Unsigned long integer (unsigned long). Size depends on architecture: 4 bytes on some platforms, 8 bytes on others.
bool Bool_t
Boolean (0=false, 1=true) (bool).
double Double_t
Double 8 bytes.
long long Long64_t
Portable signed long integer 8 bytes.
float Float_t
Float 4 bytes (float).
#define ClassDefOverride(name, id)
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
void Clear(Option_t *option="") override
Clear the value.
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
TClass instances represent classes, structs and namespaces in the ROOT type system.
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around the ResetAfterMerge function.
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Describe directory structure in memory.
virtual TList * GetList() const
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using a path (given in namecycle).
virtual TFile * GetFile() const
virtual TKey * GetKey(const char *, Short_t=9999) const
virtual void SaveSelf(Bool_t=kFALSE)
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is returned).
void SetPrintLevel(Int_t level)
@ kIncremental
Merge the input file with the content of the output file (if already existing).
@ kResetable
Only the objects with a MergeAfterReset member function.
@ kAllIncremental
Merge incrementally all type of objects.
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsize=0) override
Write memory objects to this file.
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
THashTable implements a hash table to store TObject's.
void Add(TObject *obj) override
Add object to the hash table.
TObject * FindObject(const char *name) const override
Find object using its name.
void Delete(Option_t *option="") override
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
void Delete(Option_t *option="") override
Delete an object from the file.
virtual const char * GetClassName() const
virtual TObject * ReadObj()
To read a TObject* from the file.
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=nullptr)
Write the encoded object supported by this key.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
TObject * Remove(TObject *obj) override
Remove object from the list.
A TMemFile is like a normal TFile except that it reads and writes only from memory.
TClass * GetClass() const
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
Bool_t TestBit(UInt_t f) const
virtual const char * GetName() const
Returns name of object.
virtual ULong_t Hash() const
Return hash value for this object.
This class implements server sockets.
This class implements client sockets.
UInt_t GetBytesRecv() const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
UInt_t GetBytesSent() const
virtual void Close(Option_t *opt="")
Close the socket.
TClass * IsA() const override
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
std::string_view View() const
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Double_t AsDouble() const
Double_t Sqrt(Double_t x)
Returns the square root of x.
BVH_ALWAYS_INLINE T length(const Vec< T, N > &v)
static uint64_t sum(uint64_t i)