// This script shows how to build a simple iterative server that keeps accepting new connections while it services the connections that are already open.
// Integer mode codes. None of them is referenced in the part of the file
// visible here, so their consumers are not shown.
// NOTE(review): the names suggest three update policies (incremental,
// replace-immediately, replace-after-wait) - confirm against the callers.
constexpr int kIncremental = 0;
constexpr int kReplaceImmediately = 1;
constexpr int kReplaceWait = 2;
// Body of a helper whose signature is not visible in this excerpt.
// NOTE(review): the self-call R__NeedInitialMerge(subdir) below suggests this
// is the body of R__NeedInitialMerge itself - confirm against the full file.
// What is visible: it walks the keys produced by `nextkey` and queries
// R__NeedInitialMerge for the `subdir` associated with an entry. The
// declarations, the branch bodies and the return statements are missing here.
{
// Visit each key until the iterator yields a null pointer.
while( (key = (
TKey*)nextkey()) ) {
// Branch taken when `subdir` is null (body not shown).
if (!subdir) {
}
// Ask the same question of the sub-directory; what happens on a true
// result is not shown.
if (R__NeedInitialMerge(subdir)) {
}
// NOTE(review): the `if` that this `else` pairs with is not visible.
} else {
}
}
}
}
// Body of a helper whose signature is not visible in this excerpt.
// NOTE(review): the self-call R__DeleteObject(subdir,withReset) below suggests
// this is the body of R__DeleteObject, taking `dir` and `withReset` - confirm.
// Visible behaviour: returns at once for a null `dir`; otherwise walks the keys
// from `nextkey`, forwards `subdir` to R__DeleteObject and, for entries flagged
// `todelete`, calls Delete() on the key and then frees the key object.
{
// Null directory: nothing to do.
if (dir==0) return;
// Visit each key until the iterator yields a null pointer.
while( (key = (
TKey*)nextkey()) ) {
// Branch taken when `subdir` is null (body not shown).
if (!subdir) {
}
// Apply the same treatment to the sub-directory, forwarding withReset unchanged.
R__DeleteObject(subdir,withReset);
// NOTE(review): the `if` that this `else` pairs with is not visible.
} else {
// Both withReset branches are empty here; presumably they decide
// `todelete` - TODO confirm against the full file.
if (withReset) {
} else {
}
// Call Delete() on the key, then release the TKey object itself.
if (todelete) {
key->Delete();
delete key;
}
}
}
}
// Body of a helper whose signature is not visible in this excerpt.
// NOTE(review): the self-call below suggests this is the body of
// R__MigrateKey(destination, source) - confirm against the full file.
// Visible behaviour: does nothing when either directory is null; otherwise
// walks the keys from `nextkey`. For a sub-directory it makes sure a directory
// of the same name exists under `destination` and descends into the pair of
// sub-directories. For other entries it frees any key object of the same name
// obtained from `destination` and allocates a new TKey built from the current key.
{
// Both directories are required.
if (destination==0 || source==0) return;
// Visit each key until the iterator yields a null pointer.
while( (key = (
TKey*)nextkey()) ) {
// Branch taken when `source_subdir` is null (body not shown).
if (!source_subdir) {
}
// Create the matching sub-directory on the destination side when it is missing.
if (!destination_subdir) {
destination_subdir = destination->
mkdir(key->GetName());
}
// Descend into the sub-directory pair. Recursing on (destination, source)
// again would restart on the very same key list and never terminate.
R__MigrateKey(destination_subdir,source_subdir);
// NOTE(review): the `if` that this `else` pairs with is not visible.
} else {
// Drop the key object already registered under this name, if any.
TKey *oldkey = destination->
GetKey(key->GetName());
if (oldkey) {
delete oldkey;
}
// New key in `destination` built from the current key; the third argument is 0.
TKey *newkey =
new TKey(destination,*key,0 );
// NOTE(review): the statements between the allocation and this return are
// not visible; as shown, the function returns after the first such key.
return;
}
}
}
}
// Per-client bookkeeping record. The code below reads or writes fFile,
// fLocalName, fContactsCount, fTimeSincePrevContact and fLastContact; the
// member declarations themselves are not visible in this excerpt.
struct ClientInfo
{
// Default state: null file, default-constructed local name, zero contacts,
// zero time since the previous contact.
ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
// Construct from a file name and a client id. Neither parameter is used in
// the body visible here; fLocalName is not listed in the initialisers.
ClientInfo(
const char *filename,
UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
}
// NOTE(review): the header of the following method body is not visible.
// Presumably this is Set(file), which is the method invoked on ClientInfo
// elements elsewhere in this file - confirm.
{
// Only act when a different file object is handed in.
if (file != fFile) {
if (fFile) {
// A file is already held: pass both to R__MigrateKey(fFile, file),
// then free the incoming object.
R__MigrateKey(fFile,file);
delete file;
} else {
// No file held yet: keep the incoming pointer.
fFile = file;
}
}
// Record the contact: store `now` (defined outside the visible lines) and
// bump the contact counter.
fLastContact = now;
++fContactsCount;
}
};
// Per-output-file merging state: the list of known clients (fClients), the
// output file name (fFilename), a contact counter (fNClientsContact), a bit
// set of clients (fClientsContact) and the merger object (fMerger).
// NOTE(review): several member declarations and most method headers are not
// visible in this excerpt; the method names given in the comments below are
// inferred from the calls made on this type elsewhere in the file - confirm.
struct ParallelFileMerger :
public TObject {
typedef std::vector<ClientInfo> ClientColl_t;
// One entry per client, indexed by client id (see the last method body).
ClientColl_t fClients;
// Store the output file name, reset the contact counter and construct the
// merger with (kFALSE, kTRUE). `writeCache` is unused in the visible body.
ParallelFileMerger(
const char *filename,
Bool_t writeCache =
kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(
kFALSE,
kTRUE)
{
}
// Report on stderr how many times each client made contact, then free the
// file object held for each client.
~ParallelFileMerger()
{
for(
unsigned int f = 0 ; f < fClients.size(); ++
f) {
// `f` is unsigned, so it is printed with %u (it was %d).
fprintf(stderr,"Client %u reported %u times\n",f,fClients[f].fContactsCount);
}
for( ClientColl_t::iterator iter = fClients.begin();
iter != fClients.end();
++iter)
{
delete iter->fFile;
}
}
// Method body whose header and statements are not visible.
{
}
// Method body whose header is not visible; returns the output file name.
{
return fFilename;
}
// Presumably InitialMerge(input): hands `input` to R__DeleteObject with
// the reset flag set to kTRUE. Other statements, if any, are not visible.
{
R__DeleteObject(input,
kTRUE);
}
// Presumably Merge(): queue every client's file with the merger, clean each
// input with R__DeleteObject(..., kTRUE), then reset the contact counter.
// NOTE(review): the call that actually performs the merge is not visible.
{
for(
unsigned int f = 0 ; f < fClients.size(); ++
f) {
fMerger.
AddFile(fClients[f].fFile);
}
for(
unsigned int f = 0 ; f < fClients.size(); ++
f) {
if (fClients[f].fFile) {
R__DeleteObject(fClients[f].fFile,
kTRUE);
} else {
// `file` is defined on a line that is not visible here.
R__DeleteObject(file,
kTRUE);
delete file;
}
}
fNClientsContact = 0;
}
// Method body whose header and statements are not visible.
{
}
// Presumably NeedMerge(clientThreshold): accumulates the sum and the sum of
// squares of fTimeSincePrevContact over all clients, then decides from the
// contact statistics whether a merge is due.
{
// Branch for the no-client case (body not shown).
if (fClients.size()==0) {
}
for(
unsigned int c = 0 ; c < fClients.size(); ++
c) {
sum += fClients[
c].fTimeSincePrevContact;
sum2 += fClients[
c].fTimeSincePrevContact*fClients[
c].fTimeSincePrevContact;
}
}
// True when the number of bits set in fClientsContact exceeds
// clientThreshold * (number of clients), or when the contact counter exceeds
// twice that value.
Float_t cut = clientThreshold * fClients.size();
return fClientsContact.
CountBits() > cut || fNClientsContact > 2*cut;
}
// Presumably RegisterClient(clientId, file): count the contact, append an
// entry when clientId is past the end of fClients, then hand `file` to that
// client's entry.
// NOTE(review): only one entry is appended, so a clientId more than one past
// the end would index out of range below - confirm ids arrive densely.
{
++fNClientsContact;
if (fClients.size() < clientId+1) {
fClients.push_back( ClientInfo(fFilename,clientId) );
}
fClients[clientId].Set(file);
}
};
// Entry point of the merge server; `cache` is forwarded to each new
// ParallelFileMerger.
// NOTE(review): this excerpt is visibly incomplete. As shown, the function
// body is a single `return;` and the statements that follow (the enum and the
// event loop) sit after its closing brace; they read like the remainder of the
// same function. The declarations of clientCount, clientIndex, client, mon,
// ss, s, mess, mergers, next, filename, clientId and transient are not visible.
void parallelMergeServer(bool cache = false) {
return;
}
// Message kinds passed as the second argument of Send() in the handshake
// below. kProtocol and kProtocolVersion deliberately share the value 1 as
// written; kProtocolVersion is sent as the payload, kProtocol as the kind.
enum StatusKind {
kStartConnection = 0,
kProtocol = 1,
kProtocolVersion = 1
};
printf("fastMergeServerHist ready to accept connections\n");
// Event loop: runs until the `break` below fires.
while (1) {
// Refuse new clients once 100 are connected, as the message states; the
// previous `> 100` test let a 101st client in.
if (clientCount >= 100) {
printf("only accept 100 clients connections\n");
} else {
// Handshake: send the client its index, then the protocol version.
client->
Send(clientIndex, kStartConnection);
client->
Send(kProtocolVersion, kProtocol);
++clientCount;
++clientIndex;
printf("Accept %d connections\n",clientCount);
}
continue;
}
// NOTE(review): from here on several branches appear fused by the damaged
// extraction; the nesting below is kept exactly as found.
if (mess==0) {
Error(
"fastMergeServer",
"The client did not send a message\n");
// NOTE(review): the statement that fills `str` is not visible here.
char str[64];
printf("Client %d: %s\n", clientCount, str);
printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->
GetBytesRecv(),
--clientCount;
// Stop serving once no client remains.
if (mon->
GetActive() == 0 || clientCount == 0) {
printf("No more active clients... stopping\n");
break;
}
// Threshold handed to NeedMerge() below.
const Float_t clientThreshold = 0.75;
// Look up the merger for this output file name, creating it on first use.
ParallelFileMerger *info = (ParallelFileMerger*)mergers.
FindObject(filename);
if (!info) {
info = new ParallelFileMerger(filename,cache);
}
if (R__NeedInitialMerge(transient)) {
info->InitialMerge(transient);
}
info->RegisterClient(clientId,transient);
if (info->NeedMerge(clientThreshold)) {
// size() returns size_t; cast so the argument matches the %ld conversion.
Info(
"fastMergeServerHist",
"Merging input from %ld clients (%d)",static_cast<long>(info->fClients.size()),clientId);
info->Merge();
}
transient = 0;
} else {
printf("*** Unexpected message ***\n");
}
delete mess;
}
// After the loop: run a last merge for every merger that asks for one.
ParallelFileMerger *info;
while ( (info = (ParallelFileMerger*)next()) ) {
if (info->NeedFinalMerge())
{
info->Merge();
}
}
// Release the monitor and the server socket objects.
delete mon;
delete ss;
}