Logo ROOT   6.10/09
Reference Guide
TMessage.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Fons Rademakers 19/12/96
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TMessage //
15 // //
16 // Message buffer class used for serializing objects and sending them //
17 // over a network. This class inherits from TBuffer the basic I/O //
18 // serializer. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "TMessage.h"
23 #include "Compression.h"
24 #include "TVirtualStreamerInfo.h"
25 #include "Bytes.h"
26 #include "TFile.h"
27 #include "TProcessID.h"
28 #include "RZip.h"
29 
31 
32 
34 
35 ////////////////////////////////////////////////////////////////////////////////
36 /// Create a TMessage object for storing objects. The "what" integer
37 /// describes the type of message. Predifined ROOT system message types
38 /// can be found in MessageTypes.h. Make sure your own message types are
39 /// unique from the ROOT defined message types (i.e. 0 - 10000 are
40 /// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41 /// will wait for an acknowledgement from the remote side. This makes
42 /// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43 /// the message will be compressed in TSocket using the zip algorithm
44 /// (only if message is > 256 bytes).
45 
46 TMessage::TMessage(UInt_t what, Int_t bufsiz) :
47  TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t))
48 {
49  // space at the beginning of the message reserved for the message length
50  UInt_t reserved = 0;
51  *this << reserved;
52 
53  fWhat = what;
54  *this << what;
55 
56  fClass = 0;
57  fCompress = 0;
58  fBufComp = 0;
59  fBufCompCur = 0;
60  fCompPos = 0;
61  fInfos = 0;
62  fEvolution = kFALSE;
63 
64  SetBit(kCannotHandleMemberWiseStreaming);
65 }
66 
67 ////////////////////////////////////////////////////////////////////////////////
68 /// Create a TMessage object for reading objects. The objects will be
69 /// read from buf. Use the What() method to get the message type.
70 
71 TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf)
72 {
73  // skip space at the beginning of the message reserved for the message length
74  fBufCur += sizeof(UInt_t);
75 
76  *this >> fWhat;
77 
78  fCompress = 0;
79  fBufComp = 0;
80  fBufCompCur = 0;
81  fCompPos = 0;
82  fInfos = 0;
84 
85  if (fWhat & kMESS_ZIP) {
86  // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
87  fBufComp = fBuffer;
88  fBufCompCur = fBuffer + bufsize;
89  fBuffer = 0;
90  Uncompress();
91  }
92 
93  if (fWhat == kMESS_OBJECT) {
94  InitMap();
95  fClass = ReadClass(); // get first the class stored in message
96  SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
97  ResetMap();
98  } else {
99  fClass = 0;
100  }
101 }
102 
103 ////////////////////////////////////////////////////////////////////////////////
104 /// Clean up compression buffer.
105 
107 {
108  delete [] fBufComp;
109  delete fInfos;
110 }
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Static function enabling or disabling the automatic schema evolution.
114 /// By default schema evolution support is off.
115 
117 {
118  fgEvolution = enable;
119 }
120 
121 ////////////////////////////////////////////////////////////////////////////////
122 /// Static function returning status of global schema evolution.
123 
125 {
126  return fgEvolution;
127 }
128 
129 ////////////////////////////////////////////////////////////////////////////////
130 /// Force writing the TStreamerInfo to the message.
131 
133 {
134  if (fgEvolution || fEvolution) {
135  if (!fInfos) fInfos = new TList();
136  fInfos->Add(info);
137  }
138 }
139 
140 ////////////////////////////////////////////////////////////////////////////////
141 /// Change a buffer that was received into one that can be send, i.e.
142 /// forward a just received message.
143 
145 {
146  if (IsReading()) {
147  SetWriteMode();
150 
151  if (fBufComp) {
152  fCompPos = fBufCur;
153  }
154  }
155 }
156 
157 ////////////////////////////////////////////////////////////////////////////////
158 /// Remember that the StreamerInfo is being used in writing.
159 
161 {
162  if (fgEvolution || fEvolution) {
163  if (!fInfos) fInfos = new TList();
164  fInfos->Add(info);
165  }
166 }
167 
168 ////////////////////////////////////////////////////////////////////////////////
169 /// Reset the message buffer so we can use (i.e. fill) it again.
170 
172 {
173  SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
174  ResetMap();
175 
176  if (fBufComp) {
177  delete [] fBufComp;
178  fBufComp = 0;
179  fBufCompCur = 0;
180  fCompPos = 0;
181  }
182 }
183 
184 ////////////////////////////////////////////////////////////////////////////////
185 /// Set the message length at the beginning of the message buffer.
186 /// This method is only called by TSocket::Send().
187 
189 {
190  if (IsWriting()) {
191  char *buf = Buffer();
192  tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
193 
194  if (fBufComp) {
195  buf = fBufComp;
196  tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
197  }
198  }
199 }
200 
201 ////////////////////////////////////////////////////////////////////////////////
202 /// Using this method one can change the message type a-posteriory.
203 /// In case you OR "what" with kMESS_ACK, the message will wait for
204 /// an acknowledgement from the remote side. This makes the sending
205 /// process synchronous.
206 
208 {
209  fWhat = what;
210 
211  char *buf = Buffer();
212  buf += sizeof(UInt_t); // skip reserved length space
213  tobuf(buf, what);
214 
215  if (fBufComp) {
216  buf = fBufComp;
217  buf += sizeof(UInt_t); // skip reserved length space
218  tobuf(buf, what | kMESS_ZIP);
219  }
220 }
221 
222 ////////////////////////////////////////////////////////////////////////////////
223 
225 {
226  if (algorithm < 0 || algorithm >= ROOT::kUndefinedCompressionAlgorithm) algorithm = 0;
227  Int_t newCompress;
228  if (fCompress < 0) {
229  newCompress = 100 * algorithm + 1;
230  } else {
231  int level = fCompress % 100;
232  newCompress = 100 * algorithm + level;
233  }
234  if (newCompress != fCompress && fBufComp) {
235  delete [] fBufComp;
236  fBufComp = 0;
237  fBufCompCur = 0;
238  fCompPos = 0;
239  }
240  fCompress = newCompress;
241 }
242 
243 ////////////////////////////////////////////////////////////////////////////////
244 
246 {
247  if (level < 0) level = 0;
248  if (level > 99) level = 99;
249  Int_t newCompress;
250  if (fCompress < 0) {
251  newCompress = level;
252  } else {
253  int algorithm = fCompress / 100;
254  if (algorithm >= ROOT::kUndefinedCompressionAlgorithm) algorithm = 0;
255  newCompress = 100 * algorithm + level;
256  }
257  if (newCompress != fCompress && fBufComp) {
258  delete [] fBufComp;
259  fBufComp = 0;
260  fBufCompCur = 0;
261  fCompPos = 0;
262  }
263  fCompress = newCompress;
264 }
265 
266 ////////////////////////////////////////////////////////////////////////////////
267 
269 {
270  if (settings != fCompress && fBufComp) {
271  delete [] fBufComp;
272  fBufComp = 0;
273  fBufCompCur = 0;
274  fCompPos = 0;
275  }
276  fCompress = settings;
277 }
278 
279 ////////////////////////////////////////////////////////////////////////////////
280 /// Compress the message. The message will only be compressed if the
281 /// compression level > 0 and the if the message is > 256 bytes.
282 /// Returns -1 in case of error (when compression fails or
283 /// when the message increases in size in some pathological cases),
284 /// otherwise returns 0.
285 
287 {
288  Int_t compressionLevel = GetCompressionLevel();
289  Int_t compressionAlgorithm = GetCompressionAlgorithm();
290  if (compressionLevel <= 0) {
291  // no compression specified
292  if (fBufComp) {
293  delete [] fBufComp;
294  fBufComp = 0;
295  fBufCompCur = 0;
296  fCompPos = 0;
297  }
298  return 0;
299  }
300 
301  if (fBufComp && fCompPos == fBufCur) {
302  // the message was already compressed
303  return 0;
304  }
305 
306  // remove any existing compressed buffer before compressing modified message
307  if (fBufComp) {
308  delete [] fBufComp;
309  fBufComp = 0;
310  fBufCompCur = 0;
311  fCompPos = 0;
312  }
313 
314  if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
315  // this message is too small to be compressed
316  return 0;
317  }
318 
319  Int_t hdrlen = 2*sizeof(UInt_t);
320  Int_t messlen = Length() - hdrlen;
321  Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
322  Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
323  Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
324  fBufComp = new char[buflen];
325  char *messbuf = Buffer() + hdrlen;
326  char *bufcur = fBufComp + chdrlen;
327  Int_t noutot = 0;
328  Int_t nzip = 0;
329  Int_t nout, bufmax;
330  for (Int_t i = 0; i < nbuffers; ++i) {
331  if (i == nbuffers - 1)
332  bufmax = messlen - nzip;
333  else
334  bufmax = kMAXZIPBUF;
335  R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout, compressionAlgorithm);
336  if (nout == 0 || nout >= messlen) {
337  //this happens when the buffer cannot be compressed
338  delete [] fBufComp;
339  fBufComp = 0;
340  fBufCompCur = 0;
341  fCompPos = 0;
342  return -1;
343  }
344  bufcur += nout;
345  noutot += nout;
346  messbuf += kMAXZIPBUF;
347  nzip += kMAXZIPBUF;
348  }
349  fBufCompCur = bufcur;
350  fCompPos = fBufCur;
351 
352  bufcur = fBufComp;
353  tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
354  Int_t what = fWhat | kMESS_ZIP;
355  tobuf(bufcur, what);
356  tobuf(bufcur, Length()); // original uncompressed buffer length
357 
358  return 0;
359 }
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 /// Uncompress the message. The message will only be uncompressed when
363 /// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
364 
366 {
367  if (!fBufComp || !(fWhat & kMESS_ZIP))
368  return -1;
369 
370  Int_t buflen;
371  Int_t hdrlen = 2*sizeof(UInt_t);
372  char *bufcur1 = fBufComp + hdrlen;
373  frombuf(bufcur1, &buflen);
374  UChar_t *bufcur = (UChar_t*)bufcur1;
375 
376  /* early consistency check */
377  Int_t nin, nbuf;
378  if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
379  Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
380  return -1;
381  }
382 
383  fBuffer = new char[buflen];
384  fBufSize = buflen;
385  fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
387  char *messbuf = fBuffer + hdrlen;
388 
389  Int_t nout;
390  Int_t noutot = 0;
391  while (1) {
392  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
393  if (hc!=0) break;
394  R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
395  if (!nout) break;
396  noutot += nout;
397  if (noutot >= buflen - hdrlen) break;
398  bufcur += nin;
399  messbuf += nout;
400  }
401 
402  fWhat &= ~kMESS_ZIP;
403  fCompress = 1;
404 
405  return 0;
406 }
407 
408 ////////////////////////////////////////////////////////////////////////////////
409 /// Write object to message buffer.
410 /// When support for schema evolution is enabled the list of TStreamerInfo
411 /// used to stream this object is kept in fInfos. This information is used
412 /// by TSocket::Send that sends this list through the socket. This list is in
413 /// turn used by TSocket::Recv to store the TStreamerInfo objects in the
414 /// relevant TClass in case the TClass does not know yet about a particular
415 /// class version. This feature is implemented to support clients and servers
416 /// with either different ROOT versions or different user classes versions.
417 
419 {
420  if (fgEvolution || fEvolution) {
421  if (fInfos)
422  fInfos->Clear();
423  else
424  fInfos = new TList();
425  }
426 
429 }
430 
431 ////////////////////////////////////////////////////////////////////////////////
432 /// Check if the ProcessID pid is already in the message.
433 /// If not, then:
434 /// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
435 /// - mark bit uid+1 where uid id the uid of the ProcessID
436 
438 {
439  if (fBitsPIDs.TestBitNumber(0)) return 0;
440  if (!pid)
441  pid = TProcessID::GetPID();
442  if (!pid) return 0;
444  UInt_t uid = pid->GetUniqueID();
445  fBitsPIDs.SetBitNumber(uid+1);
446  return 1;
447 }
void SetCompressionSettings(Int_t settings=1)
Definition: TMessage.cxx:268
virtual TClass * ReadClass(const TClass *cl=0, UInt_t *objTag=0)
Read class definition from I/O buffer.
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Definition: TProcessID.cxx:313
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:88
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition: TObject.cxx:382
Int_t Compress()
Compress the message.
Definition: TMessage.cxx:286
Bool_t IsReading() const
Definition: TBuffer.h:81
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
Bool_t IsWriting() const
Definition: TBuffer.h:82
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket...
Definition: TBufferFile.h:47
char * fBuffer
Definition: TBuffer.h:48
static Bool_t fgEvolution
Definition: TMessage.h:51
TMessage(const TMessage &)
unsigned short UShort_t
Definition: RtypesCore.h:36
UInt_t fWhat
Definition: TMessage.h:43
Buffer base class used for serializing objects.
Definition: TBuffer.h:40
Int_t GetCompressionAlgorithm() const
Definition: TMessage.h:100
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
void SetCompressionAlgorithm(Int_t algorithm=0)
Definition: TMessage.cxx:224
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition: TBits.cxx:470
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition: TMessage.cxx:124
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:687
Int_t Length() const
Definition: TBuffer.h:94
char * fBufCompCur
Definition: TMessage.h:47
void TagStreamerInfo(TVirtualStreamerInfo *info)
Remember that the StreamerInfo is being used in writing.
Definition: TMessage.cxx:160
TClass * fClass
Definition: TMessage.h:44
char * fBufComp
Definition: TMessage.h:46
void Class()
Definition: Class.C:29
char * Buffer() const
Definition: TBuffer.h:91
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriory.
Definition: TMessage.cxx:207
Int_t CompLength() const
Definition: TMessage.h:89
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void tobuf(char *&buf, Bool_t x)
Definition: Bytes.h:57
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition: TProcessID.h:69
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition: TBits.h:219
A doubly linked list.
Definition: TList.h:43
void ResetMap()
Delete existing fMap and reset map counter.
void Reset()
Reset the message buffer so we can use (i.e. fill) it again.
Definition: TMessage.cxx:171
void InitMap()
Create the fMap container and initialize them with the null object.
unsigned int UInt_t
Definition: RtypesCore.h:42
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:873
Int_t Uncompress()
Uncompress the message.
Definition: TMessage.cxx:365
TList * fInfos
Definition: TMessage.h:41
char * fBufCur
Definition: TBuffer.h:49
Bool_t fEvolution
Definition: TMessage.h:49
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition: TMessage.cxx:116
const Bool_t kFALSE
Definition: RtypesCore.h:92
UShort_t WriteProcessID(TProcessID *pid)
Check if the ProcessID pid is already in the message.
Definition: TMessage.cxx:437
#define ClassImp(name)
Definition: Rtypes.h:336
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force)
Force writing the TStreamerInfo to the message.
Definition: TMessage.cxx:132
Int_t GetCompressionLevel() const
Definition: TMessage.h:106
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:353
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
char * fCompPos
Definition: TMessage.h:48
Mother of all ROOT objects.
Definition: TObject.h:37
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:188
virtual void Add(TObject *obj)
Definition: TList.h:77
TBits fBitsPIDs
Definition: TMessage.h:42
void WriteObject(const TObject *obj)
Write object to message buffer.
Definition: TMessage.cxx:418
void SetCompressionLevel(Int_t level=1)
Definition: TMessage.cxx:245
virtual ~TMessage()
Clean up compression buffer.
Definition: TMessage.cxx:106
Int_t fBufSize
Definition: TBuffer.h:47
virtual Int_t WriteObjectAny(const void *obj, const TClass *ptrClass)
Write object to I/O buffer.
unsigned char UChar_t
Definition: RtypesCore.h:34
Abstract Interface class describing Streamer information for one class.
void Forward()
Change a buffer that was received into one that can be send, i.e.
Definition: TMessage.cxx:144
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:194
char * fBufMax
Definition: TBuffer.h:50
void SetWriteMode()
Set buffer in write mode.
Definition: TBuffer.cxx:284
Int_t fCompress
Definition: TMessage.h:45