Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TParallelMergingFile.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Philippe Canal October 2011.
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TParallelMergingFile //
15// //
16// Specialization of TMemFile to connect to a parallel file merger. //
17// Upon a call to UploadAndReset, the content already written to the //
18// file is uploaded to the server and the objects implementing the //
19// function ResetAfterMerge (like TTree) are reset. //
20// The parallel file merger will then collate the information coming //
21// from this client and any other client into the file described by //
22// the filename of this object. //
23// //
24//////////////////////////////////////////////////////////////////////////
25
27#include "TSocket.h"
28#include "TArrayC.h"
29
30////////////////////////////////////////////////////////////////////////////////
31/// Constructor.
32/// We do not yet open any connection to the server. This will be done at the
33/// time the first upload will be requested.
34
36 const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
37 TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
38{
39 TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40 if (serverurl.Length()) {
41 serverurl.ReplaceAll("pmerge=","pmerge://");
43 }
44}
45
46////////////////////////////////////////////////////////////////////////////////
47/// Destructor.
48
50{
51 // We need to call Close right here so that it is executed _before_
52 // the data members of TParallelMergingFile are destructed.
53 Close();
54 delete fClassSent;
55}
56
57////////////////////////////////////////////////////////////////////////////////
58
60{
62 if (fSocket) {
63 if (0==fSocket->Send("Finished")) { // tell server we are finished
64 Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
65 }
66 fSocket->Close();
67 delete fSocket;
68 }
69 fSocket = 0;
70}
71
72////////////////////////////////////////////////////////////////////////////////
73/// Upload the current file data to the merging server.
74/// Reset the file and return true in case of success.
75
77{
78 // Open connection to server
79 if (fSocket == 0) {
80 const char *path = fServerLocation.GetFile();
81 if (path && strlen(path) > 0 && path[0] == '/') {
82 // UNIX domain socket
83 fSocket = new TSocket(path);
84 if (!fSocket->IsValid()) {
85 Error("UploadAndReset", "Could not contact the server %s\n", path);
86 delete fSocket;
87 fSocket = 0;
88 return kFALSE;
89 }
90 } else {
91 // TCP socket
92 const char *host = fServerLocation.GetHost();
94 if (host == 0 || host[0] == '\0') {
95 host = "localhost";
96 }
97 if (port <= 0) {
98 port = 1095;
99 }
100 fSocket = new TSocket(host, port);
101 if (!fSocket->IsValid()) {
102 Error("UploadAndReset", "Could not contact the server %s:%d\n", host, port);
103 delete fSocket;
104 fSocket = 0;
105 return kFALSE;
106 }
107 }
108 // Wait till we get the start message
109 // server tells us who we are
110 Int_t kind;
111 Int_t n = fSocket->Recv(fServerIdx, kind);
112
113 if (n < 0 && kind != 0 /* kStartConnection */)
114 {
115 Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
116 delete fSocket;
117 fSocket = 0;
118 return kTRUE;
119 }
120 n = fSocket->Recv(fServerVersion, kind);
121 if (n < 0 && kind != 1 /* kProtocol */)
122 {
123 Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
124 } else {
125 Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
126 }
128 }
129
130 fMessage.Reset(kMESS_ANY); // re-use TMessage object
135
136 // FIXME: CXX17: Use init-statement in if to declare `error` variable
137 int error;
138 if ((error = fSocket->Send(fMessage)) <= 0) {
139 Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
140 delete fSocket;
141 fSocket = 0;
142 return kFALSE;
143 }
144
145 // Record the StreamerInfo we sent over.
147 if (!fClassSent) {
148 fClassSent = new TArrayC(isize);
149 } else {
150 if (isize > fClassSent->GetSize()) {
152 }
153 }
154 for(Int_t c = 0; c < isize; ++c) {
155 if (fClassIndex->fArray[c]) {
156 fClassSent->fArray[c] = 1;
157 }
158 }
160
161 return kTRUE;
162}
163
164////////////////////////////////////////////////////////////////////////////////
165/// Write memory objects to this file and upload them to the parallel merge server.
166/// Then reset all the resettable objects (those with a ResetAfterMerge routine,
167/// like TTree).
168///
169/// Loop on all objects in memory (including subdirectories).
170/// A new key is created in the KEYS linked list for each object.
171/// The list of keys is then saved on the file (via WriteKeys)
172/// as a single data record.
173/// For values of opt see TObject::Write().
174/// The directory header info is rewritten on the directory header record.
175/// The linked list of FREE segments is written.
176/// The file header is written (bytes 1->fBEGIN).
177
179{
181 if (nbytes) {
183 }
184 return nbytes;
185}
186
187////////////////////////////////////////////////////////////////////////////////
188/// One can not save a const TDirectory object.
189
191{
192 Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
193 return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
194}
195
196////////////////////////////////////////////////////////////////////////////////
197/// Write the list of TStreamerInfo as a single object in this file.
198/// The class Streamer description for all classes written to this file
199/// is saved. See class TStreamerInfo.
200
202{
203 if (!fWritable) return;
204 if (!fClassIndex) return;
205 //no need to update the index if no new classes added to the file
206 if (fClassIndex->fArray[0] == 0) return;
207
208 // clear fClassIndex for anything we already sent.
209 if (fClassSent) {
212 for(Int_t c = 0; c < isize && c < ssize; ++c) {
213 if (fClassSent->fArray[c]) {
214 fClassIndex->fArray[c] = 0;
215 }
216 }
217 }
218
220}
@ kMESS_ANY
@ kMESS_OBJECT
#define c(i)
Definition RSha256.hxx:101
constexpr Bool_t kFALSE
Definition RtypesCore.h:94
constexpr Bool_t kTRUE
Definition RtypesCore.h:93
const char Option_t
Definition RtypesCore.h:66
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:185
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:229
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:244
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Array of chars or bytes (8 bits per element).
Definition TArrayC.h:27
Char_t * fArray
Definition TArrayC.h:30
void Set(Int_t n) override
Set size of this array to n chars.
Definition TArrayC.cxx:105
Int_t GetSize() const
Definition TArray.h:47
void WriteLong64(Long64_t l) override
void WriteTString(const TString &s) override
Write TString to TBuffer.
void WriteInt(Int_t i) override
Bool_t fWritable
True if directory is writable.
virtual void Close(Option_t *option="")
Delete all objects from memory and directory structure itself.
virtual Int_t Write(const char *=nullptr, Int_t=0, Int_t=0) override
Write this object to the current directory.
Definition TDirectory.h:264
TArrayC * fClassIndex
!Index of TStreamerInfo classes written to this file
Definition TFile.h:173
TUrl fUrl
!URL of file
Definition TFile.h:189
virtual Long64_t GetEND() const
Definition TFile.h:310
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
Definition TFile.cxx:3833
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:19
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
Definition TMemFile.cxx:255
void ResetAfterMerge(TFileMergeInfo *) override
Wipe all the data from the permanent buffer but keep, the in-memory object alive.
Definition TMemFile.cxx:320
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:121
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:184
const char * GetName() const override
Returns name of object.
Definition TNamed.h:47
TParallelMergingFile(const char *filename, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Constructor.
void WriteStreamerInfo() override
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
void Close(Option_t *option="") override
Close a file.
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsiz=0) override
Write memory objects to this file and upload them to the parallel merge server.
Bool_t UploadAndReset()
Upload the current file data to the merging server.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:818
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:389
virtual Bool_t IsValid() const
Definition TSocket.h:132
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:522
Basic string class.
Definition TString.h:139
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetFile() const
Definition TUrl.h:69
const char * GetHost() const
Definition TUrl.h:67
const char * GetOptions() const
Definition TUrl.h:71
Int_t GetPort() const
Definition TUrl.h:78
std::ostream & Info()
Definition hadd.cxx:171
const Int_t n
Definition legend1.C:16