46const int kIncremental = 0;
47const int kReplaceImmediately = 1;
48const int kReplaceWait = 2;
54 if (dir==
nullptr)
return kFALSE;
58 while( (key = (
TKey*)nextkey()) ) {
65 if (R__NeedInitialMerge(subdir)) {
79 if (dir==
nullptr)
return;
83 while( (key = (
TKey*)nextkey()) ) {
90 R__DeleteObject(subdir,withReset);
109 if (destination==
nullptr || source==
nullptr)
return;
113 while( (key = (
TKey*)nextkey()) ) {
117 if (!source_subdir) {
121 if (!destination_subdir) {
122 destination_subdir = destination->
mkdir(key->
GetName());
124 R__MigrateKey(destination,source);
131 TKey *newkey =
new TKey(destination,*key,0 );
147 TTimeStamp fLastContact;
150 ClientInfo() : fFile(nullptr), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
151 ClientInfo(
const char *filename,
UInt_t clientId) : fFile(nullptr), fContactsCount(0), fTimeSincePrevContact(0) {
152 fLocalName.
Form(
"%s-%d-%d",filename,clientId,
gSystem->GetPid());
155 void Set(TFile *file)
162 R__MigrateKey(fFile,file);
176struct ParallelFileMerger :
public TObject
178 typedef std::vector<ClientInfo> ClientColl_t;
181 TBits fClientsContact;
183 ClientColl_t fClients;
184 TTimeStamp fLastMerge;
187 ParallelFileMerger(
const char *filename,
Bool_t writeCache =
kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(
kFALSE,
kTRUE)
193 if (writeCache)
new TFileCacheWrite(fMerger.
GetOutputFile(),32*1024*1024);
196 ~ParallelFileMerger()
override
200 for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
201 fprintf(stderr,
"Client %d reported %u times\n",
f,fClients[
f].fContactsCount);
203 for( ClientColl_t::iterator iter = fClients.begin();
204 iter != fClients.end();
214 return fFilename.
Hash();
217 const char *
GetName()
const override
223 Bool_t InitialMerge(TFile *input)
232 R__DeleteObject(input,
kTRUE);
241 for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
248 for(
unsigned int f = 0 ;
f < fClients.size(); ++
f) {
249 if (fClients[
f].fFile) {
250 R__DeleteObject(fClients[
f].fFile,
kTRUE);
253 TFile *file =
TFile::Open(fClients[
f].fLocalName,
"UPDATE");
254 R__DeleteObject(file,
kTRUE);
259 fLastMerge = TTimeStamp();
260 fNClientsContact = 0;
261 fClientsContact.
Clear();
277 if (fClients.empty()) {
284 for(
unsigned int c = 0 ;
c < fClients.size(); ++
c) {
285 sum += fClients[
c].fTimeSincePrevContact;
286 sum2 += fClients[
c].fTimeSincePrevContact*fClients[
c].fTimeSincePrevContact;
302 Float_t cut = clientThreshold * fClients.size();
303 return fClientsContact.
CountBits() > cut || fNClientsContact > 2*cut;
306 void RegisterClient(
UInt_t clientId, TFile *file)
312 if (fClients.size() < clientId+1) {
313 fClients.push_back( ClientInfo(fFilename,clientId) );
315 fClients[clientId].Set(file);
321void parallelMergeServer(
bool cache =
false) {
323 TString socketPath =
"rootserv.";
329 FILE *dummy =
gSystem->TempFileName(socketPath);
331 Error(
"fastMergeServer",
"Cannot create temporary file for socket\n");
335 std::string strSocketPath(socketPath.
View());
336 remove(strSocketPath.c_str());
353 kStartConnection = 0,
359 printf(
"fastMergeServerHist ready to accept connections on %s\n", strSocketPath.c_str());
369 if (clientCount > 100) {
370 printf(
"only accept 100 clients connections\n");
375 client->
Send(clientIndex, kStartConnection);
376 client->
Send(kProtocolVersion, kProtocol);
380 printf(
"Accept %d connections\n",clientCount);
388 Error(
"fastMergeServer",
"The client did not send a message\n");
392 printf(
"Client %d: %s\n", clientCount, str);
394 printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->
GetBytesRecv(),
398 if (mon->
GetActive() == 0 || clientCount == 0) {
399 printf(
"No more active clients... stopping\n");
416 const Float_t clientThreshold = 0.75;
418 ParallelFileMerger *info = (ParallelFileMerger*)mergers.
FindObject(filename);
420 info =
new ParallelFileMerger(filename,cache);
424 if (R__NeedInitialMerge(transient)) {
425 info->InitialMerge(transient);
427 info->RegisterClient(clientId,transient);
428 if (info->NeedMerge(clientThreshold)) {
430 Info(
"fastMergeServerHist",
"Merging input from %ld clients (%d)",info->fClients.size(),clientId);
437 printf(
"*** Unexpected message ***\n");
443 TIter next(&mergers);
444 ParallelFileMerger *info;
445 while ( (info = (ParallelFileMerger*)next()) ) {
446 if (info->NeedFinalMerge())
454 remove(strSocketPath.c_str());
int Int_t
Signed integer 4 bytes (int).
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
unsigned long ULong_t
Unsigned long integer 4 bytes (unsigned long). Size depends on architecture.
bool Bool_t
Boolean (0=false, 1=true) (bool).
double Double_t
Double 8 bytes.
long long Long64_t
Portable signed long integer 8 bytes.
float Float_t
Float 4 bytes (float).
#define ClassDefOverride(name, id)
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
void Clear(Option_t *option="") override
Clear the value.
UInt_t CountBits(UInt_t startBit=0) const
Return number of bits set to 1 starting at bit startBit.
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
void ReadTString(TString &s) override
Read TString from TBuffer.
char * ReadString(char *s, Int_t max) override
Read string from I/O buffer.
void ReadInt(Int_t &i) override
void ReadLong64(Long64_t &l) override
void SetBufferOffset(Int_t offset=0)
TClass instances represent classes, structs and namespaces in the ROOT type system.
ROOT::ResetAfterMergeFunc_t GetResetAfterMerge() const
Return the wrapper around Merge.
Bool_t InheritsFrom(const char *cl) const override
Return kTRUE if this class inherits from a class with name "classname".
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
Describe directory structure in memory.
virtual TList * GetList() const
virtual TDirectory * GetDirectory(const char *namecycle, Bool_t printError=false, const char *funcname="GetDirectory")
Find a directory using apath.
virtual TFile * GetFile() const
virtual TKey * GetKey(const char *, Short_t=9999) const
virtual void SaveSelf(Bool_t=kFALSE)
virtual TDirectory * mkdir(const char *name, const char *title="", Bool_t returnExistingDirectory=kFALSE)
Create a sub-directory "a" or a hierarchy of sub-directories "a/b/c/...".
virtual TList * GetListOfKeys() const
virtual Bool_t OutputFile(const char *url, Bool_t force)
Open merger output file.
TFile * GetOutputFile() const
virtual Bool_t AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
Add the TFile to this file merger and give ownership of the TFile to this object (unless kFALSE is re...
void SetPrintLevel(Int_t level)
@ kIncremental
Merge the input file with the content of the output file (if already existing).
@ kResetable
Only the objects with a MergeAfterReset member function.
@ kAllIncremental
Merge incrementally all type of objects.
virtual Bool_t PartialMerge(Int_t type=kAll|kIncremental)
Merge the files.
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsize=0) override
Write memory objects to this file.
void SumBuffer(Int_t bufsize)
Increment statistics for buffer sizes of objects in this file.
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
THashTable implements a hash table to store TObject's.
void Add(TObject *obj) override
Add object to the hash table.
TObject * FindObject(const char *name) const override
Find object using its name.
void Delete(Option_t *option="") override
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
void Delete(Option_t *option="") override
Delete an object from the file.
virtual const char * GetClassName() const
virtual TObject * ReadObj()
To read a TObject* from the file.
virtual Int_t WriteFile(Int_t cycle=1, TFile *f=nullptr)
Write the encoded object supported by this key.
TObject * FindObject(const char *name) const override
Find an object in this list using its name.
TObject * Remove(TObject *obj) override
Remove object from the list.
A TMemFile is like a normal TFile except that it reads and writes only from memory.
TClass * GetClass() const
TSocket * Select()
Return pointer to socket for which an event is waiting.
virtual void Add(TSocket *sock, Int_t interest=kRead)
Add socket to the monitor's active list.
Int_t GetActive(Long_t timeout=-1) const
Return number of sockets in the active list.
virtual void Remove(TSocket *sock)
Remove a socket from the monitor.
const char * GetName() const override
Returns name of object.
Mother of all ROOT objects.
Bool_t TestBit(UInt_t f) const
virtual const char * GetName() const
Returns name of object.
virtual ULong_t Hash() const
Return hash value for this object.
This class implements server sockets.
This class implements client sockets.
UInt_t GetBytesRecv() const
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
UInt_t GetBytesSent() const
virtual void Close(Option_t *opt="")
Close the socket.
TClass * IsA() const override
virtual Bool_t IsValid() const
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
std::string_view View() const
UInt_t Hash(ECaseCompare cmp=kExact) const
Return hash value.
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Double_t AsDouble() const
Double_t Sqrt(Double_t x)
Returns the square root of x.
BVH_ALWAYS_INLINE T length(const Vec< T, N > &v)
static uint64_t sum(uint64_t i)