97 MPI_Finalized(&finalized);
128 std::vector<char> buffer(0);
136 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG,
fSubComm, &status);
140 MPI_Get_count(&status, MPI_CHAR, &number_bytes);
141 buffer.resize(number_bytes);
142 char *buf = buffer.data();
144 Int_t source = status.MPI_SOURCE;
145 Int_t tag = status.MPI_TAG;
148 MPI_Recv(buf, number_bytes, MPI_CHAR, source, tag,
fSubComm, MPI_STATUS_IGNORE);
151 if (number_bytes == 0) {
157 Error(
"RunCollector",
"Failed to create TMemFile from buffer");
172 if (
info->NeedInitialMerge(transient)) {
173 info->InitialMerge(transient);
177 info->RegisterClient(client_Id, transient);
200 Error(
"ParallelFileMerger",
"Cannot recreate the output file");
202 fMerger.GetOutputFile()->SetCompressionSettings(compression_settings);
216 delete client.GetFile();
224 TIter nextkey(
dir->GetListOfKeys());
226 while ((key = (
TKey *)nextkey())) {
237 todelete = (0 != cl->GetResetAfterMerge());
239 todelete = (0 == cl->GetResetAfterMerge());
243 dir->GetListOfKeys()->Remove(key);
254 TIter nextkey(
dir->GetListOfKeys());
256 while ((key = (
TKey *)nextkey())) {
267 if (0 != cl->GetResetAfterMerge()) {
315 if (
file->IsZombie()) {
316 Error(
"Merge",
"output file unavailable");
341 if (
fClients.size() < clientID + 1) {
367 sum2 +=
fClients[
c].GetTimeSincePrevContact() *
fClients[
c].GetTimeSincePrevContact();
398 Error(
"CreateBufferAndSend",
" should not be called by a collector");
456 MPI_Finalized(&finalized);
470 std::string _filename = this->
GetName();
472 ULong_t found = _filename.rfind(
".root");
473 if (found != std::string::npos) {
474 _filename.resize(found);
489 Error(
"CheckSplitLevel",
"At least one collector is required instead of %d",
fSplitLevel);
502 MPI_Initialized(&flag);
504 MPI_Init(
nullptr,
nullptr);
512 "Number of Output File is larger than number of Processors Allocated."
513 " Number of processors should be two times larger than outpts. For %d outputs at least %d "
514 "should be allocated instead of %d",
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
int Int_t
Signed integer 4 bytes (int).
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
unsigned long ULong_t
Unsigned long integer (unsigned long). Size depends on architecture (typically 4 or 8 bytes).
bool Bool_t
Boolean (0=false, 1=true) (bool).
double Double_t
Double 8 bytes.
long long Long64_t
Portable signed long integer 8 bytes.
float Float_t
Float 4 bytes (float).
const char Option_t
Option string (const char).
Error("WriteTObject","The current directory (%s) is not associated with a file. The object (%s) has not been written.", GetName(), objname)
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
TClass instances represent classes, structs and namespaces in the ROOT type system.
static TClass * GetClass(const char *name, Bool_t load=kTRUE, Bool_t silent=kFALSE)
Static method returning pointer to TClass of the specified class name.
TFile * GetFile() const override
Describe directory structure in memory.
virtual void Close(Option_t *option="")
Delete all objects from memory and directory structure itself.
A cache when writing files over the network.
A class to pass information from the TFileMerger to the objects being merged.
@ kIncremental
Merge the input file with the content of the output file (if already existing).
@ kKeepCompression
Keep the compression level unchanged for each input file.
@ kResetable
Only the objects with a MergeAfterReset member function.
@ kAllIncremental
Incrementally merge all types of objects.
Int_t GetCompressionSettings() const
virtual Bool_t IsOpen() const
Returns kTRUE in case file is open and kFALSE if file is not open.
virtual void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Used to specify the compression level and algorithm.
TFile(const TFile &)=delete
virtual Long64_t GetEND() const
static TFile * Open(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Int_t netopt=0)
Create / open a file.
THashTable implements a hash table to store TObject's.
void Add(TObject *obj) override
Add object to the hash table.
TObject * FindObject(const char *name) const override
Find object using its name.
void Delete(Option_t *option="") override
Remove all objects from the table AND delete all heap based objects.
Book space in a file, create I/O buffers, to fill them, (un)compress them.
void Delete(Option_t *option="") override
Delete an object from the file.
virtual const char * GetClassName() const
virtual TObject * ReadObj()
To read a TObject* from the file.
void CheckSplitLevel()
Checks that the split level is more than one.
void CreateEmptyBufferAndSend()
For Workers: Creates an empty buffer and sends it to the Collector.
~TMPIFile() override
TMPIFile destructor.
void CreateBufferAndSend()
Called by the Workers only: Copies the current content in memory and sends it asynchronously to the C...
Bool_t IsReceived()
Checks the member MPI_Request object to see if a message has been received.
void UpdateEndProcess()
As worker ranks exit, they send the collector empty messages.
Bool_t IsCollector()
Returns true if this is the Collector rank, otherwise false.
void SetOutputName()
Called by the Collector only: edits the input filename from the user to append the rank ID of the Col...
void RunCollector(Bool_t cache=kFALSE)
This is the core of the Collector rank which listens for incoming messages from Worker ranks.
void Close(Option_t *option="") final
Closes the file.
TMPIFile(const char *name, char *buffer, Long64_t size=0, Option_t *option="", Int_t split=1, const char *ftitle="", Int_t compress=4)
TMPIFile constructor.
void Sync()
Called by the Workers only: Called periodically by workers and triggers the sending of data to the Co...
void SplitMPIComm()
Called by all ranks to create the sub communicators (if more than one rank).
TMemFile(const char *name, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault, Long64_t defBlockSize=0LL)
Usual Constructor.
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
void ResetAfterMerge(TFileMergeInfo *) override
Wipe all the data from the permanent buffer, but keep the in-memory object alive.
const char * GetName() const override
Returns name of object.
The TTimeStamp encapsulates seconds and ns since EPOCH.
Double_t AsDouble() const
Double_t result(Double_t *x, Double_t *par)
Double_t Sqrt(Double_t x)
Returns the square root of x.
Bool_t NeedMerge(Float_t clientThreshold)
Returns true if enough clients have reported.
void RegisterClient(UInt_t clientID, TFile *file)
Register that a client has sent a file.
Bool_t Merge()
Merge the current inputs into the output file.
Bool_t InitialMerge(TFile *input)
Initial merge of the input to copy the resetable object (TTree) into the output and remove them from ...
~ParallelFileMerger() override
Destructor for the ParallelFileMerger class.
ParallelFileMerger(const char *filename, Int_t compression_settings, Bool_t writeCache=kFALSE)
Constructor for ParallelFileMerger class.
static void DeleteObject(TDirectory *dir, Bool_t withReset)
static Bool_t NeedInitialMerge(TDirectory *dir)
static uint64_t sum(uint64_t i)