Logo ROOT   6.18/05
Reference Guide
TMessage.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 19/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMessage //
15// //
16// Message buffer class used for serializing objects and sending them //
17// over a network. This class inherits from TBuffer the basic I/O //
18// serializer. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "TMessage.h"
23#include "Compression.h"
25#include "Bytes.h"
26#include "TFile.h"
27#include "TProcessID.h"
28#include "RZip.h"
29
31
32
34
35////////////////////////////////////////////////////////////////////////////////
36/// Create a TMessage object for storing objects. The "what" integer
37/// describes the type of message. Predefined ROOT system message types
38/// can be found in MessageTypes.h. Make sure your own message types are
39/// unique from the ROOT defined message types (i.e. 0 - 10000 are
40/// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41/// will wait for an acknowledgement from the remote side. This makes
42/// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43/// the message will be compressed in TSocket using the zip algorithm
44/// (only if message is > 256 bytes).
45
// NOTE(review): listing line 46 is absent from this extract (numbering jumps
// 45 -> 47); it presumably holds the constructor signature that declares the
// `what` and `bufsiz` parameters used below -- confirm against upstream.
// The buffer is opened in write mode with room for bufsiz bytes plus the two
// header words written at the top of the body.
47 TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t)),
48 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
49{
50 // space at the beginning of the message reserved for the message length
51 UInt_t reserved = 0;
52 *this << reserved;
53
   // second header word: the message type, also cached in fWhat
54 fWhat = what;
55 *this << what;
56
   // no class, no compressed buffer and no streamer-info list yet
57 fClass = 0;
58 fBufComp = 0;
59 fBufCompCur = 0;
60 fCompPos = 0;
61 fInfos = 0;
   // NOTE(review): listing lines 62 and 64 are absent from this extract;
   // fEvolution is not visibly initialised here -- presumably done on one of
   // the missing lines, confirm against upstream.
63
65}
66
67////////////////////////////////////////////////////////////////////////////////
68/// Create a TMessage object for reading objects. The objects will be
69/// read from buf. Use the What() method to get the message type.
///
/// The buffer is expected to start with two header words: the message length
/// (skipped) and the message type (read into fWhat). If the type carries
/// kMESS_ZIP the payload is uncompressed first; if the resulting type is
/// kMESS_OBJECT the class of the stored object is read into fClass and the
/// read position is rewound to just after the header.
70
71TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf),
72 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
73{
74 // skip space at the beginning of the message reserved for the message length
75 fBufCur += sizeof(UInt_t);
76
   // second header word: message type
77 *this >> fWhat;
78
79 fBufComp = 0;
80 fBufCompCur = 0;
81 fCompPos = 0;
82 fInfos = 0;
   // NOTE(review): listing line 83 is absent from this extract; fEvolution is
   // not visibly initialised here -- confirm against upstream.
84
85 if (fWhat & kMESS_ZIP) {
86 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
   // NOTE(review): listing line 87 is absent from this extract. As shown,
   // fBufComp is still 0 when Uncompress() runs (which returns -1 on a null
   // fBufComp) and the received buffer pointer is dropped by `fBuffer = 0`.
   // The comment above suggests the missing line assigns fBuffer to fBufComp
   // -- confirm against upstream.
88 fBufCompCur = fBuffer + bufsize;
89 fBuffer = 0;
   // return value of Uncompress() is not checked here
90 Uncompress();
91 }
92
   // exact comparison: only a plain kMESS_OBJECT type (no extra bits) is
   // treated as carrying an object
93 if (fWhat == kMESS_OBJECT) {
94 InitMap();
95 fClass = ReadClass(); // get first the class stored in message
   // rewind to just after the two header words and discard the map built
   // while reading the class
96 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
97 ResetMap();
98 } else {
99 fClass = 0;
100 }
101}
102
103////////////////////////////////////////////////////////////////////////////////
104/// Clean up: free the compressed buffer (fBufComp) and the fInfos list.
105
// NOTE(review): listing line 106 (presumably the destructor signature) is
// absent from this extract.
107{
   // fBufComp is allocated with new[] in Compress(), hence delete []
108 delete [] fBufComp;
109 delete fInfos;
110}
111
112////////////////////////////////////////////////////////////////////////////////
113/// Static function enabling or disabling the automatic schema evolution.
114/// By default schema evolution support is off.
///
/// Stores `enable` in the static flag fgEvolution, which is consulted by
/// ForceWriteInfo(), TagStreamerInfo() and Reset().
115
// NOTE(review): listing line 116 (presumably the function signature) is
// absent from this extract.
117{
118 fgEvolution = enable;
119}
120
121////////////////////////////////////////////////////////////////////////////////
122/// Static function returning status of global schema evolution.
///
/// Returns the current value of the static flag fgEvolution.
123
// NOTE(review): listing line 124 (presumably the function signature) is
// absent from this extract.
125{
126 return fgEvolution;
127}
128
129////////////////////////////////////////////////////////////////////////////////
130/// Force writing the TStreamerInfo to the message.
///
/// The info is appended to fInfos only when schema evolution is enabled,
/// either through the static flag fgEvolution or this message's fEvolution.
/// Otherwise the call does nothing.
131
// NOTE(review): listing line 132 (presumably the function signature) is
// absent from this extract.
133{
134 if (fgEvolution || fEvolution) {
   // the list is created lazily on first use
135 if (!fInfos) fInfos = new TList();
   // appended as-is; no check for an existing entry is made here
136 fInfos->Add(info);
137 }
138}
139
140////////////////////////////////////////////////////////////////////////////////
141/// Change a buffer that was received into one that can be sent, i.e.
142/// forward a just received message.
///
/// Does nothing unless the buffer is currently in read mode.
143
// NOTE(review): listing line 144 (presumably the function signature) is
// absent from this extract.
145{
146 if (IsReading()) {
147 SetWriteMode();
   // NOTE(review): listing lines 148-149 are absent from this extract; any
   // further buffer adjustment made after switching to write mode is not
   // visible here.
150
151 if (fBufComp) {
   // NOTE(review): listing line 152 is absent from this extract, so the
   // handling of an existing compressed buffer is not visible here.
153 }
154 }
155}
156
157////////////////////////////////////////////////////////////////////////////////
158/// Remember that the StreamerInfo is being used in writing.
159///
160/// When support for schema evolution is enabled the list of TStreamerInfo
161/// used to stream this object is kept in fInfos. This information is used
162/// by TSocket::Send that sends this list through the socket. This list is in
163/// turn used by TSocket::Recv to store the TStreamerInfo objects in the
164/// relevant TClass in case the TClass does not know yet about a particular
165/// class version. This feature is implemented to support clients and servers
166/// with either different ROOT versions or different user classes versions.
// NOTE(review): listing line 167 (presumably the function signature) is
// absent from this extract.
168{
   // nothing is recorded unless schema evolution is enabled, either through
   // the static flag fgEvolution or this message's fEvolution
169 if (fgEvolution || fEvolution) {
   // the list is created lazily on first use
170 if (!fInfos) fInfos = new TList();
171 fInfos->Add(info);
172 }
173}
174
175////////////////////////////////////////////////////////////////////////////////
176/// Reset the message buffer so we can use (i.e. fill) it again.
///
/// The buffer position is moved back to just after the two header words, the
/// map is reset, any compressed copy of the message is discarded, and (when
/// schema evolution is enabled) the fInfos list is cleared.
177
// NOTE(review): listing line 178 (presumably the function signature) is
// absent from this extract.
179{
   // keep the header (length word + message type), drop everything after it
180 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
181 ResetMap();
182
   // a previously compressed copy no longer matches the buffer content
183 if (fBufComp) {
184 delete [] fBufComp;
185 fBufComp = 0;
186 fBufCompCur = 0;
187 fCompPos = 0;
188 }
189
   // the list object itself is kept; only its entries are removed
190 if (fgEvolution || fEvolution) {
191 if (fInfos)
192 fInfos->Clear();
193 }
   // NOTE(review): listing line 194 is absent from this extract.
195}
196
197////////////////////////////////////////////////////////////////////////////////
198/// Set the message length at the beginning of the message buffer.
199/// This method is only called by TSocket::Send().
///
/// The stored value excludes the length word itself. If a compressed buffer
/// exists its length word is updated as well, using CompLength(). Nothing is
/// written when the buffer is not in write mode.
200
// NOTE(review): listing line 201 (presumably the function signature) is
// absent from this extract.
202{
203 if (IsWriting()) {
204 char *buf = Buffer();
205 if (buf)
206 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
207
   // same header word, but for the compressed copy of the message
208 if (fBufComp) {
209 buf = fBufComp;
210 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
211 }
212 }
213}
214
215////////////////////////////////////////////////////////////////////////////////
216/// Using this method one can change the message type a posteriori.
217/// In case you OR "what" with kMESS_ACK, the message will wait for
218/// an acknowledgement from the remote side. This makes the sending
219/// process synchronous.
///
/// Updates fWhat and rewrites the type word (the second header word) in the
/// message buffer and, if present, in the compressed buffer.
220
// NOTE(review): listing line 221 (presumably the function signature) is
// absent from this extract.
222{
223 fWhat = what;
224
225 char *buf = Buffer();
226 if (buf) {
227 buf += sizeof(UInt_t); // skip reserved length space
228 tobuf(buf, what);
229 }
230
   // the compressed copy always carries the kMESS_ZIP bit in its type word
231 if (fBufComp) {
232 buf = fBufComp;
233 buf += sizeof(UInt_t); // skip reserved length space
234 tobuf(buf, what | kMESS_ZIP);
235 }
236}
237
238////////////////////////////////////////////////////////////////////////////////
/// Set the compression algorithm, keeping the current compression level.
///
/// fCompress is stored as 100 * algorithm + level. An algorithm outside
/// [0, kUndefined) is replaced by 0. If fCompress is currently negative the
/// level defaults to ELevel::kUseMin. When the resulting setting differs from
/// the current one, an existing compressed buffer is discarded.
239
// NOTE(review): listing line 240 (presumably the function signature) is
// absent from this extract.
241{
242 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
243 Int_t newCompress;
244 if (fCompress < 0) {
   // no usable level stored yet
245 newCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
246 } else {
   // keep the level part (last two decimal digits) of the current setting
247 int level = fCompress % 100;
248 newCompress = 100 * algorithm + level;
249 }
   // a compressed copy made with the old setting is no longer valid
250 if (newCompress != fCompress && fBufComp) {
251 delete [] fBufComp;
252 fBufComp = 0;
253 fBufCompCur = 0;
254 fCompPos = 0;
255 }
256 fCompress = newCompress;
257}
258
259////////////////////////////////////////////////////////////////////////////////
/// Set the compression level, keeping the current compression algorithm.
///
/// The level is clamped to [0, 99]. fCompress is stored as
/// 100 * algorithm + level; if fCompress is currently negative the new
/// setting is just the level, and a stored algorithm that is not below
/// kUndefined is replaced by 0. When the resulting setting differs from the
/// current one, an existing compressed buffer is discarded.
260
// NOTE(review): listing line 261 (presumably the function signature) is
// absent from this extract.
262{
263 if (level < 0) level = 0;
264 if (level > 99) level = 99;
265 Int_t newCompress;
266 if (fCompress < 0) {
   // no algorithm stored yet: the setting is the bare level
267 newCompress = level;
268 } else {
   // keep the algorithm part of the current setting
269 int algorithm = fCompress / 100;
270 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
271 newCompress = 100 * algorithm + level;
272 }
   // a compressed copy made with the old setting is no longer valid
273 if (newCompress != fCompress && fBufComp) {
274 delete [] fBufComp;
275 fBufComp = 0;
276 fBufCompCur = 0;
277 fCompPos = 0;
278 }
279 fCompress = newCompress;
280}
281
282////////////////////////////////////////////////////////////////////////////////
/// Set the combined compression setting (stored in fCompress) directly.
///
/// The value is stored without any range check. When it differs from the
/// current setting, an existing compressed buffer is discarded.
283
// NOTE(review): listing line 284 (presumably the function signature) is
// absent from this extract.
285{
   // a compressed copy made with the old setting is no longer valid
286 if (settings != fCompress && fBufComp) {
287 delete [] fBufComp;
288 fBufComp = 0;
289 fBufCompCur = 0;
290 fCompPos = 0;
291 }
292 fCompress = settings;
293}
294
295////////////////////////////////////////////////////////////////////////////////
296/// Compress the message. The message will only be compressed if the
297/// compression level > 0 and if the message is > 256 bytes.
298/// Returns -1 in case of error (when compression fails or
299/// when the message increases in size in some pathological cases),
300/// otherwise returns 0.
///
/// Note that 0 is also returned when nothing was compressed (level <= 0,
/// message too small, or an up-to-date compressed buffer already exists).
/// On success the compressed message is held in fBufComp with a three-word
/// header: length, message type with kMESS_ZIP set, and uncompressed length.
301
// NOTE(review): listing line 302 (presumably the function signature) is
// absent from this extract.
303{
304 Int_t compressionLevel = GetCompressionLevel();
305 Int_t compressionAlgorithm = GetCompressionAlgorithm();
306 if (compressionLevel <= 0) {
307 // no compression specified
308 if (fBufComp) {
309 delete [] fBufComp;
310 fBufComp = 0;
311 fBufCompCur = 0;
312 fCompPos = 0;
313 }
314 return 0;
315 }
316
   // fCompPos is compared with the current buffer position to detect that
   // the buffer has not changed since the last compression
317 if (fBufComp && fCompPos == fBufCur) {
318 // the message was already compressed
319 return 0;
320 }
321
322 // remove any existing compressed buffer before compressing modified message
323 if (fBufComp) {
324 delete [] fBufComp;
325 fBufComp = 0;
326 fBufCompCur = 0;
327 fCompPos = 0;
328 }
329
   // threshold is 256 payload bytes on top of the two header words
330 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
331 // this message is too small to be compressed
332 return 0;
333 }
334
335 if (!Buffer()) {
336 // error condition, should never happen
337 return -1;
338 }
339
   // the payload (everything after the two header words) is compressed in
   // chunks of at most kMAXZIPBUF bytes
340 Int_t hdrlen = 2*sizeof(UInt_t);
341 Int_t messlen = Length() - hdrlen;
342 Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
343 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
   // output buffer: header + payload size + 9 extra bytes per chunk
   // (presumably the per-chunk zip header overhead -- confirm), at least 512
344 Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
345 fBufComp = new char[buflen];
346 char *messbuf = Buffer() + hdrlen;
347 char *bufcur = fBufComp + chdrlen;
   // noutot accumulates the compressed size but is not read again in the
   // visible code
348 Int_t noutot = 0;
349 Int_t nzip = 0;
350 Int_t nout, bufmax;
351 for (Int_t i = 0; i < nbuffers; ++i) {
   // the last chunk takes whatever remains of the payload
352 if (i == nbuffers - 1)
353 bufmax = messlen - nzip;
354 else
355 bufmax = kMAXZIPBUF;
   // bufmax is passed for both size arguments (presumably source size and
   // available target size -- confirm against RZip.h)
356 R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout,
357 static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(compressionAlgorithm));
   // note: a single chunk's output size is compared with the length of the
   // whole payload
358 if (nout == 0 || nout >= messlen) {
359 //this happens when the buffer cannot be compressed
360 delete [] fBufComp;
361 fBufComp = 0;
362 fBufCompCur = 0;
363 fCompPos = 0;
364 return -1;
365 }
366 bufcur += nout;
367 noutot += nout;
368 messbuf += kMAXZIPBUF;
369 nzip += kMAXZIPBUF;
370 }
371 fBufCompCur = bufcur;
   // NOTE(review): listing line 372 is absent from this extract; fCompPos is
   // not visibly updated after a successful compression -- presumably done on
   // the missing line, confirm against upstream.
373
   // fill in the three header words at the start of the compressed buffer,
   // written back-to-back through bufcur (tobuf takes the pointer by reference)
374 bufcur = fBufComp;
375 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
376 Int_t what = fWhat | kMESS_ZIP;
377 tobuf(bufcur, what);
378 tobuf(bufcur, Length()); // original uncompressed buffer length
379
380 return 0;
381}
382
383////////////////////////////////////////////////////////////////////////////////
384/// Uncompress the message. The message will only be uncompressed when
385/// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
///
/// -1 is returned when there is no compressed buffer, when kMESS_ZIP is not
/// set in fWhat, or when the first zip header fails its consistency check.
/// Once the output buffer has been allocated the function returns 0 even if
/// the chunk loop stops early.
386
// NOTE(review): listing line 387 (presumably the function signature) is
// absent from this extract.
388{
389 if (!fBufComp || !(fWhat & kMESS_ZIP))
390 return -1;
391
   // the third header word of the compressed buffer holds the uncompressed
   // message length; the compressed data follows it
392 Int_t buflen;
393 Int_t hdrlen = 2*sizeof(UInt_t);
394 char *bufcur1 = fBufComp + hdrlen;
395 frombuf(bufcur1, &buflen);
396 UChar_t *bufcur = (UChar_t*)bufcur1;
397
398 /* early consistency check */
399 Int_t nin, nbuf;
400 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
401 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
402 return -1;
403 }
404
   // NOTE(review): buflen is taken from the message and used for the
   // allocation without a range check -- confirm that callers only pass
   // trusted buffers.
405 fBuffer = new char[buflen];
406 fBufSize = buflen;
   // read position: just after the two header words
407 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
   // NOTE(review): listing line 408 is absent from this extract.
409 char *messbuf = fBuffer + hdrlen;
410
411 Int_t nout;
412 Int_t noutot = 0;
   // unzip chunk by chunk; stop on a bad chunk header, on a chunk that
   // produces no output, or once the expected payload size has been reached
413 while (1) {
414 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
415 if (hc!=0) break;
416 R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
417 if (!nout) break;
418 noutot += nout;
419 if (noutot >= buflen - hdrlen) break;
420 bufcur += nin;
421 messbuf += nout;
422 }
423
   // the message is now plain: clear the zip flag and set fCompress to 1
424 fWhat &= ~kMESS_ZIP;
425 fCompress = 1;
426
427 return 0;
428}
429
430////////////////////////////////////////////////////////////////////////////////
431/// Check if the ProcessID pid is already in the message.
432/// If not, then:
433/// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
434/// - mark bit uid+1 where uid is the uid of the ProcessID
///
/// Returns 0 when bit 0 of fBitsPIDs is already set or when no ProcessID is
/// available, 1 otherwise.
435
// NOTE(review): listing line 436 (presumably the function signature) is
// absent from this extract.
437{
438 if (fBitsPIDs.TestBitNumber(0)) return 0;
   // fall back to the current ProcessID when none is given
439 if (!pid)
440 pid = TProcessID::GetPID();
441 if (!pid) return 0;
   // NOTE(review): listing line 442 is absent from this extract; the setting
   // of bit 0 described above is not visible here -- presumably done on the
   // missing line, confirm against upstream.
443 UInt_t uid = pid->GetUniqueID();
444 fBitsPIDs.SetBitNumber(uid+1);
445 return 1;
446}
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
void tobuf(char *&buf, Bool_t x)
Definition: Bytes.h:57
@ kMESS_OBJECT
Definition: MessageTypes.h:35
@ kMESS_ZIP
Definition: MessageTypes.h:28
unsigned short UShort_t
Definition: RtypesCore.h:36
int Int_t
Definition: RtypesCore.h:41
unsigned char UChar_t
Definition: RtypesCore.h:34
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
#define ClassImp(name)
Definition: Rtypes.h:365
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition: TBits.cxx:481
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition: TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:206
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket.
Definition: TBufferFile.h:46
virtual TClass * ReadClass(const TClass *cl=0, UInt_t *objTag=0)
Read class definition from I/O buffer.
virtual void ResetMap()
Delete existing fMap and reset map counter.
Definition: TBufferIO.cxx:288
virtual void InitMap()
Create the fMap container and initialize them with the null object.
Definition: TBufferIO.cxx:129
Buffer base class used for serializing objects.
Definition: TBuffer.h:42
void SetWriteMode()
Set buffer in write mode.
Definition: TBuffer.cxx:315
Int_t fBufSize
Definition: TBuffer.h:49
@ kCannotHandleMemberWiseStreaming
Definition: TBuffer.h:75
char * fBufMax
Definition: TBuffer.h:52
char * fBufCur
Definition: TBuffer.h:51
Bool_t IsWriting() const
Definition: TBuffer.h:86
Bool_t IsReading() const
Definition: TBuffer.h:85
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:92
char * fBuffer
Definition: TBuffer.h:50
Int_t Length() const
Definition: TBuffer.h:99
char * Buffer() const
Definition: TBuffer.h:95
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:399
UShort_t WriteProcessID(TProcessID *pid)
Check if the ProcessID pid is already in the message.
Definition: TMessage.cxx:436
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition: TMessage.cxx:116
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force)
Force writing the TStreamerInfo to the message.
Definition: TMessage.cxx:132
UInt_t fWhat
Definition: TMessage.h:44
void TagStreamerInfo(TVirtualStreamerInfo *info)
Remember that the StreamerInfo is being used in writing.
Definition: TMessage.cxx:167
char * fCompPos
Definition: TMessage.h:49
void Forward()
Change a buffer that was received into one that can be sent, i.e.
Definition: TMessage.cxx:144
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:201
void Reset()
Reset the message buffer so we can use (i.e. fill) it again.
Definition: TMessage.cxx:178
Int_t GetCompressionAlgorithm() const
Definition: TMessage.h:100
TClass * fClass
Definition: TMessage.h:45
char * fBufComp
Definition: TMessage.h:47
char * fBufCompCur
Definition: TMessage.h:48
Bool_t fEvolution
Definition: TMessage.h:50
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseGeneralPurpose)
Definition: TMessage.cxx:284
Int_t Compress()
Compress the message.
Definition: TMessage.cxx:302
virtual ~TMessage()
Clean up compression buffer.
Definition: TMessage.cxx:106
Int_t fCompress
Definition: TMessage.h:46
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition: TMessage.cxx:124
TMessage(const TMessage &)
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
Definition: TMessage.cxx:261
Int_t Uncompress()
Uncompress the message.
Definition: TMessage.cxx:387
Int_t GetCompressionLevel() const
Definition: TMessage.h:106
TBits fBitsPIDs
Definition: TMessage.h:43
TList * fInfos
Definition: TMessage.h:42
Int_t CompLength() const
Definition: TMessage.h:90
static Bool_t fgEvolution
Definition: TMessage.h:52
void SetWhat(UInt_t what)
Using this method one can change the message type a posteriori.
Definition: TMessage.cxx:221
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
Definition: TMessage.cxx:240
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition: TObject.cxx:375
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition: TProcessID.h:69
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Definition: TProcessID.cxx:341
Abstract Interface class describing Streamer information for one class.
Namespace for new ROOT classes and functions.
Definition: StringConv.hxx:21
EValues
Note: this is only temporarily a struct and will become a enum class hence the name.
Definition: Compression.h:76
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
Definition: Compression.h:91