ROOT  6.07/01
Reference Guide
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
TParallelMergingFile.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Philippe Canal October 2011.
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TParallelMergingFile //
15 // //
16 // Specialization of TMemFile to connect to a parallel file merger. //
17 // Upon a call to UploadAndReset, the content already written to the //
18 // file is uploaded to the server and the objects implementing the //
19 // function ResetAfterMerge (like TTree) are reset. //
20 // The parallel file merger will then collate the information coming //
21 // from this client and any other client into the file described by //
22 // the filename of this object. //
23 // //
24 //////////////////////////////////////////////////////////////////////////
25 
26 #include "TParallelMergingFile.h"
27 #include "TSocket.h"
28 #include "TArrayC.h"
29 
30 ////////////////////////////////////////////////////////////////////////////////
31 /// Constructor.
32 /// We do not yet open any connection to the server. This will be done at the
33 /// time the first upload will be requested.
34 
36  const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
37  TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
38 {
39  TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40  if (serverurl.Length()) {
41  serverurl.ReplaceAll("pmerge=","pmerge://");
42  fServerLocation = TUrl(serverurl);
43  }
44 }
45 
46 ////////////////////////////////////////////////////////////////////////////////
47 /// Destructor.
48 
50 {
51  // We need to call Close, right here so that it is executed _before_
52  // the data member of TParallelMergingFile are destructed.
53  Close();
54  delete fClassSent;
55 }
56 
57 ////////////////////////////////////////////////////////////////////////////////
58 
60 {
61  TMemFile::Close(option);
62  if (fSocket) {
63  if (0==fSocket->Send("Finished")) { // tell server we are finished
64  Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
65  }
66  fSocket->Close();
67  delete fSocket;
68  }
69  fSocket = 0;
70 }
71 
72 ////////////////////////////////////////////////////////////////////////////////
73 /// Upload the current file data to the merging server.
74 /// Reset the file and return true in case of success.
75 
77 {
78  // Open connection to server
79  if (fSocket == 0) {
80  const char *host = fServerLocation.GetHost();
81  Int_t port = fServerLocation.GetPort();
82  if (host == 0 || host[0] == '\0') {
83  host = "localhost";
84  }
85  if (port <= 0) {
86  port = 1095;
87  }
88  fSocket = new TSocket(host,port);
89  if (!fSocket->IsValid()) {
90  Error("UploadAndReset","Could not contact the server %s:%d\n",host,port);
91  delete fSocket;
92  fSocket = 0;
93  return kFALSE;
94  }
95  // Wait till we get the start message
96  // server tells us who we are
97  Int_t kind;
98  Int_t n = fSocket->Recv(fServerIdx, kind);
99 
100  if (n < 0 && kind != 0 /* kStartConnection */)
101  {
102  Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
103  delete fSocket;
104  fSocket = 0;
105  return kTRUE;
106  }
107  n = fSocket->Recv(fServerVersion, kind);
108  if (n < 0 && kind != 1 /* kProtocol */)
109  {
110  Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
111  } else {
112  Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
113  }
115  }
116 
117  fMessage.Reset(kMESS_ANY); // re-use TMessage object
121  CopyTo(fMessage);
122 
123  if (int error = fSocket->Send(fMessage) <= 0) {
124  Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
125  delete fSocket;
126  fSocket = 0;
127  return kFALSE;
128  }
129 
130  // Record the StreamerInfo we sent over.
131  Int_t isize = fClassIndex->GetSize();
132  if (!fClassSent) {
133  fClassSent = new TArrayC(isize);
134  } else {
135  if (isize > fClassSent->GetSize()) {
136  fClassSent->Set(isize);
137  }
138  }
139  for(Int_t c = 0; c < isize; ++c) {
140  if (fClassIndex->fArray[c]) {
141  fClassSent->fArray[c] = 1;
142  }
143  }
144  ResetAfterMerge(0);
145 
146  return kTRUE;
147 }
148 
149 ////////////////////////////////////////////////////////////////////////////////
150 /// Write memory objects to this file and upload them to the parallel merge server.
151 /// Then reset all the resettable objects (those with a ResetAfterMerge routine,
152 /// like TTree).
153 ///
154 /// Loop on all objects in memory (including subdirectories).
155 /// A new key is created in the KEYS linked list for each object.
156 /// The list of keys is then saved on the file (via WriteKeys)
157 /// as a single data record.
158 /// For values of opt see TObject::Write().
159 /// The directory header info is rewritten on the directory header record.
160 /// The linked list of FREE segments is written.
161 /// The file header is written (bytes 1->fBEGIN).
162 
163 Int_t TParallelMergingFile::Write(const char *, Int_t opt, Int_t bufsiz)
164 {
165  Int_t nbytes = TMemFile::Write(0,opt,bufsiz);
166  if (nbytes) {
167  UploadAndReset();
168  }
169  return nbytes;
170 }
171 
172 ////////////////////////////////////////////////////////////////////////////////
173 /// One can not save a const TDirectory object.
174 
175 Int_t TParallelMergingFile::Write(const char *n, Int_t opt, Int_t bufsize) const
176 {
177  Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
178  return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
179 }
180 
181 ////////////////////////////////////////////////////////////////////////////////
182 /// Write the list of TStreamerInfo as a single object in this file
183 /// The class Streamer description for all classes written to this file
184 /// is saved. See class TStreamerInfo.
185 
187 {
188  if (!fWritable) return;
189  if (!fClassIndex) return;
190  //no need to update the index if no new classes added to the file
191  if (fClassIndex->fArray[0] == 0) return;
192 
193  // clear fClassIndex for anything we already sent.
194  if (fClassSent) {
195  Int_t isize = fClassIndex->GetSize();
196  Int_t ssize = fClassSent->GetSize();
197  for(Int_t c = 0; c < isize && c < ssize; ++c) {
198  if (fClassSent->fArray[c]) {
199  fClassIndex->fArray[c] = 0;
200  }
201  }
202  }
203 
205 }
const char * GetHost() const
Definition: TUrl.h:76
void ResetAfterMerge(TFileMergeInfo *)
Wipe all the data from the permanent buffer but keep, the in-memory object alive. ...
Definition: TMemFile.cxx:292
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
Definition: TMemFile.cxx:227
void Set(Int_t n)
Set size of this array to n chars.
Definition: TArrayC.cxx:105
Ssiz_t Length() const
Definition: TString.h:390
TArrayC * fClassIndex
!Index of TStreamerInfo classes written to this file
Definition: TFile.h:81
return c
const char Option_t
Definition: RtypesCore.h:62
virtual Bool_t IsValid() const
Definition: TSocket.h:162
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TSocket.cxx:520
This class represents a WWW compatible URL.
Definition: TUrl.h:41
TString & ReplaceAll(const TString &s1, const TString &s2)
Definition: TString.h:635
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TSocket.cxx:818
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
static const char * filename()
Basic string class.
Definition: TString.h:137
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kFALSE
Definition: Rtypes.h:92
virtual Long64_t GetEND() const
Definition: TFile.h:191
const char * GetOptions() const
Definition: TUrl.h:80
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition: TMemFile.h:19
virtual void Fatal(const char *method, const char *msgfmt,...) const
Issue fatal error message.
Definition: TObject.cxx:946
TUrl fUrl
!URL of file
Definition: TFile.h:97
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
virtual void WriteTString(const TString &s)
Write TString to TBuffer.
void Reset()
Reset the message buffer so we can use (i.e. fill) it again.
Definition: TMessage.cxx:171
Int_t GetPort() const
Definition: TUrl.h:87
virtual void Close(Option_t *opt="")
Close the socket.
Definition: TSocket.cxx:388
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
Definition: TFile.cxx:3561
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file.
Definition: TFile.cxx:2248
TParallelMergingFile(const char *filename, Option_t *option="", const char *ftitle="", Int_t compress=1)
Constructor.
~TParallelMergingFile()
Destructor.
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
virtual void WriteInt(Int_t i)
Definition: TBufferFile.h:364
Bool_t UploadAndReset()
Upload the current file data to the merging server.
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition: TMessage.cxx:116
Bool_t fWritable
True if directory is writable.
Int_t GetSize() const
Definition: TArray.h:49
virtual void Close(Option_t *option="")
Close a file.
virtual Int_t Write(const char *name=0, Int_t opt=0, Int_t bufsiz=0)
Write memory objects to this file and upload them to the parallel merge server.
virtual void WriteLong64(Long64_t l)
Definition: TBufferFile.h:392
Char_t * fArray
Definition: TArrayC.h:32
const Bool_t kTRUE
Definition: Rtypes.h:91
const Int_t n
Definition: legend1.C:16
virtual void Close(Option_t *option="")
Close a file.
Definition: TFile.cxx:898
Array of chars or bytes (8 bits per element).
Definition: TArrayC.h:29
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904