Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TParallelMergingFile.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Philippe Canal October 2011.
3
4/*************************************************************************
5 * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TParallelMergingFile //
15// //
16// Specialization of TMemFile to connect to a parallel file merger. //
17// Upon a call to UploadAndReset, the content already written to the //
18// file is upload to the server and the object implementing the function//
19// ResetAfterMerge (like TTree) are reset. //
20// The parallel file merger will then collate the information coming //
21// from this client and any other client in to the file described by //
22// the filename of this object. //
23// //
24//////////////////////////////////////////////////////////////////////////
25
27#include "TSocket.h"
28#include "TArrayC.h"
29
30////////////////////////////////////////////////////////////////////////////////
31/// Constructor.
32/// We do no yet open any connection to the server. This will be done at the
33/// time the first upload will be requested.
34
36 const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
37 TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
38{
39 TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40 if (serverurl.Length()) {
41 serverurl.ReplaceAll("pmerge=","pmerge://");
43 }
44}
45
46////////////////////////////////////////////////////////////////////////////////
47/// Destructor.
48
50{
51 // We need to call Close, right here so that it is executed _before_
52 // the data member of TParallelMergingFile are destructed.
53 Close();
54 delete fClassSent;
55}
56
57////////////////////////////////////////////////////////////////////////////////
58
60{
62 if (fSocket) {
63 if (0==fSocket->Send("Finished")) { // tell server we are finished
64 Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
65 }
66 fSocket->Close();
67 delete fSocket;
68 }
69 fSocket = 0;
70}
71
72////////////////////////////////////////////////////////////////////////////////
73/// Upload the current file data to the merging server.
74/// Reset the file and return true in case of success.
75
77{
78 // Open connection to server
79 if (fSocket == 0) {
80 const char *path = fServerLocation.GetFile();
81 if (path && strlen(path) > 0 && path[0] == '/') {
82 // UNIX domain socket
83 fSocket = new TSocket(path);
84 if (!fSocket->IsValid()) {
85 Error("UploadAndReset", "Could not contact the server %s\n", path);
86 delete fSocket;
87 fSocket = 0;
88 return kFALSE;
89 }
90 } else {
91 // TCP socket
92 const char *host = fServerLocation.GetHost();
94 if (host == 0 || host[0] == '\0') {
95 host = "localhost";
96 }
97 if (port <= 0) {
98 port = 1095;
99 }
100 fSocket = new TSocket(host, port);
101 if (!fSocket->IsValid()) {
102 Error("UploadAndReset", "Could not contact the server %s:%d\n", host, port);
103 delete fSocket;
104 fSocket = 0;
105 return kFALSE;
106 }
107 }
108 // Wait till we get the start message
109 // server tells us who we are
110 Int_t kind;
111 Int_t n = fSocket->Recv(fServerIdx, kind);
112
113 if (n < 0 && kind != 0 /* kStartConnection */)
114 {
115 Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
116 delete fSocket;
117 fSocket = 0;
118 return kTRUE;
119 }
120 n = fSocket->Recv(fServerVersion, kind);
121 if (n < 0 && kind != 1 /* kProtocol */)
122 {
123 Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
124 } else {
125 Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
126 }
128 }
129
130 fMessage.Reset(kMESS_ANY); // re-use TMessage object
135
136 if (int error = fSocket->Send(fMessage); error <= 0) {
137 Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
138 delete fSocket;
139 fSocket = 0;
140 return kFALSE;
141 }
142
143 // Record the StreamerInfo we sent over.
145 if (!fClassSent) {
146 fClassSent = new TArrayC(isize);
147 } else {
148 if (isize > fClassSent->GetSize()) {
150 }
151 }
152 for(Int_t c = 0; c < isize; ++c) {
153 if (fClassIndex->fArray[c]) {
154 fClassSent->fArray[c] = 1;
155 }
156 }
158
159 return kTRUE;
160}
161
162////////////////////////////////////////////////////////////////////////////////
163/// Write memory objects to this file and upload them to the parallel merge server.
164/// Then reset all the resetable object (those with a ResetAfterMerge routine,
165/// like TTree).
166///
167/// Loop on all objects in memory (including subdirectories).
168/// A new key is created in the KEYS linked list for each object.
169/// The list of keys is then saved on the file (via WriteKeys)
170/// as a single data record.
171/// For values of opt see TObject::Write().
172/// The directory header info is rewritten on the directory header record.
173/// The linked list of FREE segments is written.
174/// The file header is written (bytes 1->fBEGIN).
175
177{
178 std::size_t prevSize = GetBytesWritten();
179 auto nbytes = TMemFile::Write(0,opt,bufsize);
180 std::size_t newSize = GetBytesWritten();
181 // NOTE: we don't rely on nbytes > 0 to do UploadAndReset() because nbytes may be 0 if the file
182 // only contains non-TObject objects (as they are not written by TMemFile::Write).
183 if (newSize > prevSize) {
185 }
186 return nbytes;
187}
188
189////////////////////////////////////////////////////////////////////////////////
190/// One can not save a const TDirectory object.
191
193{
194 Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
195 return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
196}
197
198////////////////////////////////////////////////////////////////////////////////
199/// Write the list of TStreamerInfo as a single object in this file
200/// The class Streamer description for all classes written to this file
201/// is saved. See class TStreamerInfo.
202
204{
205 if (!fWritable) return;
206 if (!fClassIndex) return;
207 //no need to update the index if no new classes added to the file
208 if (fClassIndex->fArray[0] == 0) return;
209
210 // clear fClassIndex for anything we already sent.
211 if (fClassSent) {
214 for(Int_t c = 0; c < isize && c < ssize; ++c) {
215 if (fClassSent->fArray[c]) {
216 fClassIndex->fArray[c] = 0;
217 }
218 }
219 }
220
222}
@ kMESS_ANY
@ kMESS_OBJECT
#define c(i)
Definition RSha256.hxx:101
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
const char Option_t
Option string (const char)
Definition RtypesCore.h:80
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:241
void Error(const char *location, const char *msgfmt,...)
Use this function in case an error occurred.
Definition TError.cxx:208
void Warning(const char *location, const char *msgfmt,...)
Use this function in warning situations.
Definition TError.cxx:252
void Fatal(const char *location, const char *msgfmt,...)
Use this function in case of a fatal error. It will abort the program.
Definition TError.cxx:267
Option_t Option_t option
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
Array of chars or bytes (8 bits per element).
Definition TArrayC.h:27
Char_t * fArray
Definition TArrayC.h:30
void Set(Int_t n) override
Set size of this array to n chars.
Definition TArrayC.cxx:104
Int_t GetSize() const
Definition TArray.h:47
void WriteLong64(Long64_t l) override
void WriteTString(const TString &s) override
Write TString to TBuffer.
void WriteInt(Int_t i) override
Bool_t fWritable
True if directory is writable.
virtual void Close(Option_t *option="")
Delete all objects from memory and directory structure itself.
virtual Int_t Write(const char *=nullptr, Int_t=0, Int_t=0) override
Write this object to the current directory.
Definition TDirectory.h:265
TArrayC * fClassIndex
!Index of TStreamerInfo classes written to this file
Definition TFile.h:173
TUrl fUrl
!URL of file
Definition TFile.h:189
virtual Long64_t GetEND() const
Definition TFile.h:310
virtual void WriteStreamerInfo()
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
Definition TFile.cxx:3467
virtual Long64_t GetBytesWritten() const
Return the total number of bytes written so far to the file.
Definition TFile.cxx:4249
A TMemFile is like a normal TFile except that it reads and writes only from memory.
Definition TMemFile.h:27
virtual Long64_t CopyTo(void *to, Long64_t maxsize) const
Copy the binary representation of the TMemFile into the memory area starting at 'to' and of length at...
Definition TMemFile.cxx:254
void ResetAfterMerge(TFileMergeInfo *) override
Wipe all the data from the permanent buffer but keep, the in-memory object alive.
Definition TMemFile.cxx:339
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:120
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:183
const char * GetName() const override
Returns name of object.
Definition TNamed.h:49
Int_t Write(const char *name=nullptr, Int_t opt=0, Int_t bufsize=0) override
Write memory objects to this file and upload them to the parallel merge server.
TParallelMergingFile(const char *filename, Option_t *option="", const char *ftitle="", Int_t compress=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Constructor.
void WriteStreamerInfo() override
Write the list of TStreamerInfo as a single object in this file The class Streamer description for al...
void Close(Option_t *option="") override
Close a file.
Bool_t UploadAndReset()
Upload the current file data to the merging server.
virtual Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition TSocket.cxx:809
virtual void Close(Option_t *opt="")
Close the socket.
Definition TSocket.cxx:380
virtual Bool_t IsValid() const
Definition TSocket.h:130
virtual Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition TSocket.cxx:513
Basic string class.
Definition TString.h:138
This class represents a WWW compatible URL.
Definition TUrl.h:33
const char * GetFile() const
Definition TUrl.h:69
const char * GetHost() const
Definition TUrl.h:67
const char * GetOptions() const
Definition TUrl.h:71
Int_t GetPort() const
Definition TUrl.h:78
const Int_t n
Definition legend1.C:16