Logo ROOT   6.07/09
Reference Guide
TParallelMergingFile.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Philippe Canal October 2011.
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TParallelMergingFile //
15 // //
16 // Specialization of TMemFile to connect to a parallel file merger. //
17 // Upon a call to UploadAndReset, the content already written to the //
18 // file is uploaded to the server and the objects implementing the //
19 // function ResetAfterMerge (like TTree) are reset. //
20 // The parallel file merger will then collate the information coming //
21 // from this client and any other client into the file described by //
22 // the filename of this object. //
23 // //
24 //////////////////////////////////////////////////////////////////////////
25 
26 #include "TParallelMergingFile.h"
27 #include "TSocket.h"
28 #include "TArrayC.h"
29 
30 ////////////////////////////////////////////////////////////////////////////////
31 /// Constructor.
32 /// We do no yet open any connection to the server. This will be done at the
33 /// time the first upload will be requested.
34 
35 TParallelMergingFile::TParallelMergingFile(const char *filename, Option_t *option /* = "" */,
36  const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
37  TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
38 {
39  TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40  if (serverurl.Length()) {
41  serverurl.ReplaceAll("pmerge=","pmerge://");
42  fServerLocation = TUrl(serverurl);
43  }
44 }
45 
46 ////////////////////////////////////////////////////////////////////////////////
47 /// Destructor.
48 
50 {
51  // We need to call Close, right here so that it is executed _before_
52  // the data member of TParallelMergingFile are destructed.
53  Close();
54  delete fClassSent;
55 }
56 
57 ////////////////////////////////////////////////////////////////////////////////
58 
60 {
61  TMemFile::Close(option);
62  if (fSocket) {
63  if (0==fSocket->Send("Finished")) { // tell server we are finished
64  Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
65  }
66  fSocket->Close();
67  delete fSocket;
68  }
69  fSocket = 0;
70 }
71 
72 ////////////////////////////////////////////////////////////////////////////////
73 /// Upload the current file data to the merging server.
74 /// Reset the file and return true in case of success.
75 
77 {
78  // Open connection to server
79  if (fSocket == 0) {
80  const char *host = fServerLocation.GetHost();
81  Int_t port = fServerLocation.GetPort();
82  if (host == 0 || host[0] == '\0') {
83  host = "localhost";
84  }
85  if (port <= 0) {
86  port = 1095;
87  }
88  fSocket = new TSocket(host,port);
89  if (!fSocket->IsValid()) {
90  Error("UploadAndReset","Could not contact the server %s:%d\n",host,port);
91  delete fSocket;
92  fSocket = 0;
93  return kFALSE;
94  }
95  // Wait till we get the start message
96  // server tells us who we are
97  Int_t kind;
98  Int_t n = fSocket->Recv(fServerIdx, kind);
99 
100  if (n < 0 && kind != 0 /* kStartConnection */)
101  {
102  Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
103  delete fSocket;
104  fSocket = 0;
105  return kTRUE;
106  }
107  n = fSocket->Recv(fServerVersion, kind);
108  if (n < 0 && kind != 1 /* kProtocol */)
109  {
110  Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
111  } else {
112  Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
113  }
115  }
116 
117  fMessage.Reset(kMESS_ANY); // re-use TMessage object
121  CopyTo(fMessage);
122 
123  // FIXME: CXX17: Use init-statement in if to declare `error` variable
124  int error;
125  if ((error = fSocket->Send(fMessage)) <= 0) {
126  Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
127  delete fSocket;
128  fSocket = 0;
129  return kFALSE;
130  }
131 
132  // Record the StreamerInfo we sent over.
133  Int_t isize = fClassIndex->GetSize();
134  if (!fClassSent) {
135  fClassSent = new TArrayC(isize);
136  } else {
137  if (isize > fClassSent->GetSize()) {
138  fClassSent->Set(isize);
139  }
140  }
141  for(Int_t c = 0; c < isize; ++c) {
142  if (fClassIndex->fArray[c]) {
143  fClassSent->fArray[c] = 1;
144  }
145  }
146  ResetAfterMerge(0);
147 
148  return kTRUE;
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Write memory objects to this file and upload them to the parallel merge server.
153 /// Then reset all the resetable object (those with a ResetAfterMerge routine,
154 /// like TTree).
155 ///
156 /// Loop on all objects in memory (including subdirectories).
157 /// A new key is created in the KEYS linked list for each object.
158 /// The list of keys is then saved on the file (via WriteKeys)
159 /// as a single data record.
160 /// For values of opt see TObject::Write().
161 /// The directory header info is rewritten on the directory header record.
162 /// The linked list of FREE segments is written.
163 /// The file header is written (bytes 1->fBEGIN).
164 
165 Int_t TParallelMergingFile::Write(const char *, Int_t opt, Int_t bufsiz)
166 {
167  Int_t nbytes = TMemFile::Write(0,opt,bufsiz);
168  if (nbytes) {
169  UploadAndReset();
170  }
171  return nbytes;
172 }
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 /// One can not save a const TDirectory object.
176 
177 Int_t TParallelMergingFile::Write(const char *n, Int_t opt, Int_t bufsize) const
178 {
179  Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
180  return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
181 }
182 
183 ////////////////////////////////////////////////////////////////////////////////
184 /// Write the list of TStreamerInfo as a single object in this file
185 /// The class Streamer description for all classes written to this file
186 /// is saved. See class TStreamerInfo.
187 
189 {
190  if (!fWritable) return;
191  if (!fClassIndex) return;
192  //no need to update the index if no new classes added to the file
193  if (fClassIndex->fArray[0] == 0) return;
194 
195  // clear fClassIndex for anything we already sent.
196  if (fClassSent) {
197  Int_t isize = fClassIndex->GetSize();
198  Int_t ssize = fClassSent->GetSize();
199  for(Int_t c = 0; c < isize && c < ssize; ++c) {
200  if (fClassSent->fArray[c]) {
201  fClassIndex->fArray[c] = 0;
202  }
203  }
204  }
205 
207 }
const char * GetHost() const
Definition: TUrl.h:76
void ResetAfterMerge(TFileMergeInfo *)
Wipe all the data from the permanent buffer, but keep the in-memory object alive. ...
Definition: TMemFile.cxx:292
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
Definition: TMemFile.cxx:227
void Set(Int_t n)
Set size of this array to n chars.
Definition: TArrayC.cxx:105
Ssiz_t Length() const
Definition: TString.h:390
TArrayC * fClassIndex
!Index of TStreamerInfo classes written to this file
Definition: TFile.h:86
return c
const char Option_t
Definition: RtypesCore.h:62
virtual Bool_t IsValid() const
Definition: TSocket.h:162
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
This class represents a WWW compatible URL.
Definition: TUrl.h:41
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:899
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual Long64_t GetEND() const
Definition: TFile.h:200
const char * GetOptions() const
Definition: TUrl.h:80
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
virtual void Fatal(const char *method, const char *msgfmt,...) const
Issue fatal error message.
Definition: TObject.cxx:953
TUrl fUrl
!URL of file
Definition: TFile.h:102
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:925
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
virtual void WriteTString(const TString &s)
Write TString to TBuffer.
void Reset()
Reset the message buffer so we can use (i.e. fill) it again.
Definition: TMessage.cxx:171
Int_t GetPort() const
Definition: TUrl.h:87
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:388
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
Definition: TFile.cxx:3581
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file.
Definition: TFile.cxx:2268
TParallelMergingFile(const char *filename, Option_t *option="", const char *ftitle="", Int_t compress=1)
Constructor.
~TParallelMergingFile()
Destructor.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
virtual void WriteInt(Int_t i)
Definition: TBufferFile.h:364
Bool_t UploadAndReset()
Upload the current file data to the merging server.
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition: TMessage.cxx:116
Bool_t fWritable
True if directory is writable.
Int_t GetSize() const
Definition: TArray.h:49
virtual void Close(Option_t *option="")
Close a file.
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file and upload them to the parallel merge server.
virtual void WriteLong64(Long64_t l)
Definition: TBufferFile.h:392
Char_t * fArray
Definition: TArrayC.h:32
const Bool_t kTRUE
Definition: Rtypes.h:91
const Int_t n
Definition: legend1.C:16
virtual void Close(Option_t *option="")
Close a file.
Definition: TFile.cxx:904
Array of chars or bytes (8 bits per element).
Definition: TArrayC.h:29
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:911