Logo ROOT  
Reference Guide
TMessage.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 19/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMessage //
15// //
16// Message buffer class used for serializing objects and sending them //
17// over a network. This class inherits from TBuffer the basic I/O //
18// serializer. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "TMessage.h"
23#include "Compression.h"
25#include "TList.h"
26#include "Bytes.h"
27#include "TProcessID.h"
28#include "RZip.h"
29
31
32
34
35////////////////////////////////////////////////////////////////////////////////
36/// Create a TMessage object for storing objects. The "what" integer
37/// describes the type of message. Predefined ROOT system message types
38/// can be found in MessageTypes.h. Make sure your own message types are
39/// unique from the ROOT defined message types (i.e. 0 - 10000 are
40/// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41/// will wait for an acknowledgment from the remote side. This makes
42/// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43/// the message will be compressed in TSocket using the zip algorithm
44/// (only if message is > 256 bytes).
45
47 TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t)),
48 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
49{
50 // space at the beginning of the message reserved for the message length
51 UInt_t reserved = 0;
52 *this << reserved;
53
54 fWhat = what;
55 *this << what;
56
57 fClass = nullptr;
58 fBufComp = nullptr;
59 fBufCompCur = nullptr;
60 fCompPos = nullptr;
61 fInfos = nullptr;
63
65}
66
67////////////////////////////////////////////////////////////////////////////////
68/// Create a TMessage object for reading objects. The objects will be
69/// read from buf. Use the What() method to get the message type.
70
71TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf),
72 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
73{
74 // skip space at the beginning of the message reserved for the message length
75 fBufCur += sizeof(UInt_t);
76
77 *this >> fWhat;
78
79 fBufComp = nullptr;
80 fBufCompCur = nullptr;
81 fCompPos = nullptr;
82 fInfos = nullptr;
84
85 if (fWhat & kMESS_ZIP) {
86 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
88 fBufCompCur = fBuffer + bufsize;
89 fBuffer = nullptr;
90 Uncompress();
91 }
92
93 if (fWhat == kMESS_OBJECT) {
94 InitMap();
95 fClass = ReadClass(); // get first the class stored in message
96 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
97 ResetMap();
98 } else {
99 fClass = nullptr;
100 }
101}
102
103////////////////////////////////////////////////////////////////////////////////
104/// Destructor
105
107{
108 delete [] fBufComp;
109 delete fInfos;
110}
111
112////////////////////////////////////////////////////////////////////////////////
113/// Static function enabling or disabling the automatic schema evolution.
114/// By default schema evolution support is off.
115
117{
118 fgEvolution = enable;
119}
120
121////////////////////////////////////////////////////////////////////////////////
122/// Static function returning status of global schema evolution.
123
125{
126 return fgEvolution;
127}
128
129////////////////////////////////////////////////////////////////////////////////
130/// Force writing the TStreamerInfo to the message.
131
133{
134 if (fgEvolution || fEvolution) {
135 if (!fInfos) fInfos = new TList();
136 fInfos->Add(info);
137 }
138}
139
140////////////////////////////////////////////////////////////////////////////////
141/// Change a buffer that was received into one that can be send, i.e.
142/// forward a just received message.
143
145{
146 if (IsReading()) {
147 SetWriteMode();
150
151 if (fBufComp) {
153 }
154 }
155}
156
157////////////////////////////////////////////////////////////////////////////////
158/// Remember that the StreamerInfo is being used in writing.
159///
160/// When support for schema evolution is enabled the list of TStreamerInfo
161/// used to stream this object is kept in fInfos. This information is used
162/// by TSocket::Send that sends this list through the socket. This list is in
163/// turn used by TSocket::Recv to store the TStreamerInfo objects in the
164/// relevant TClass in case the TClass does not know yet about a particular
165/// class version. This feature is implemented to support clients and servers
166/// with either different ROOT versions or different user classes versions.
167
169{
170 if (fgEvolution || fEvolution) {
171 if (!fInfos) fInfos = new TList();
172 fInfos->Add(info);
173 }
174}
175
176////////////////////////////////////////////////////////////////////////////////
177/// Reset the message buffer so we can use (i.e. fill) it again.
178
180{
181 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
182 ResetMap();
183
184 if (fBufComp) {
185 delete [] fBufComp;
186 fBufComp = nullptr;
187 fBufCompCur = nullptr;
188 fCompPos = nullptr;
189 }
190
191 if (fgEvolution || fEvolution) {
192 if (fInfos)
193 fInfos->Clear();
194 }
196}
197
198////////////////////////////////////////////////////////////////////////////////
199/// Set the message length at the beginning of the message buffer.
200/// This method is only called by TSocket::Send().
201
203{
204 if (IsWriting()) {
205 char *buf = Buffer();
206 if (buf)
207 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
208
209 if (fBufComp) {
210 buf = fBufComp;
211 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
212 }
213 }
214}
215
216////////////////////////////////////////////////////////////////////////////////
217/// Using this method one can change the message type a-posteriori
218/// In case you OR "what" with kMESS_ACK, the message will wait for
219/// an acknowledgment from the remote side. This makes the sending
220/// process synchronous.
221
223{
224 fWhat = what;
225
226 char *buf = Buffer();
227 if (buf) {
228 buf += sizeof(UInt_t); // skip reserved length space
229 tobuf(buf, what);
230 }
231
232 if (fBufComp) {
233 buf = fBufComp;
234 buf += sizeof(UInt_t); // skip reserved length space
235 tobuf(buf, what | kMESS_ZIP);
236 }
237}
238
239////////////////////////////////////////////////////////////////////////////////
240/// Set compression algorithm
241
243{
244 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
245 Int_t newCompress;
246 if (fCompress < 0) {
247 newCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
248 } else {
249 int level = fCompress % 100;
250 newCompress = 100 * algorithm + level;
251 }
252 if (newCompress != fCompress && fBufComp) {
253 delete [] fBufComp;
254 fBufComp = nullptr;
255 fBufCompCur = nullptr;
256 fCompPos = nullptr;
257 }
258 fCompress = newCompress;
259}
260
261////////////////////////////////////////////////////////////////////////////////
262/// Set compression level
263
265{
266 if (level < 0) level = 0;
267 if (level > 99) level = 99;
268 Int_t newCompress;
269 if (fCompress < 0) {
270 newCompress = level;
271 } else {
272 int algorithm = fCompress / 100;
273 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
274 newCompress = 100 * algorithm + level;
275 }
276 if (newCompress != fCompress && fBufComp) {
277 delete [] fBufComp;
278 fBufComp = nullptr;
279 fBufCompCur = nullptr;
280 fCompPos = nullptr;
281 }
282 fCompress = newCompress;
283}
284
285////////////////////////////////////////////////////////////////////////////////
286/// Set compression settings
287
289{
290 if (settings != fCompress && fBufComp) {
291 delete [] fBufComp;
292 fBufComp = nullptr;
293 fBufCompCur = nullptr;
294 fCompPos = nullptr;
295 }
296 fCompress = settings;
297}
298
299////////////////////////////////////////////////////////////////////////////////
300/// Compress the message. The message will only be compressed if the
301/// compression level > 0 and the if the message is > 256 bytes.
302/// Returns -1 in case of error (when compression fails or
303/// when the message increases in size in some pathological cases),
304/// otherwise returns 0.
305
307{
308 Int_t compressionLevel = GetCompressionLevel();
309 Int_t compressionAlgorithm = GetCompressionAlgorithm();
310 if (compressionLevel <= 0) {
311 // no compression specified
312 if (fBufComp) {
313 delete [] fBufComp;
314 fBufComp = nullptr;
315 fBufCompCur = nullptr;
316 fCompPos = nullptr;
317 }
318 return 0;
319 }
320
321 if (fBufComp && fCompPos == fBufCur) {
322 // the message was already compressed
323 return 0;
324 }
325
326 // remove any existing compressed buffer before compressing modified message
327 if (fBufComp) {
328 delete [] fBufComp;
329 fBufComp = nullptr;
330 fBufCompCur = nullptr;
331 fCompPos = nullptr;
332 }
333
334 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
335 // this message is too small to be compressed
336 return 0;
337 }
338
339 if (!Buffer()) {
340 // error condition, should never happen
341 return -1;
342 }
343
344 Int_t hdrlen = 2*sizeof(UInt_t);
345 Int_t messlen = Length() - hdrlen;
346 Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
347 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
348 Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
349 fBufComp = new char[buflen];
350 char *messbuf = Buffer() + hdrlen;
351 char *bufcur = fBufComp + chdrlen;
352 Int_t noutot = 0;
353 Int_t nzip = 0;
354 Int_t nout, bufmax;
355 for (Int_t i = 0; i < nbuffers; ++i) {
356 if (i == nbuffers - 1)
357 bufmax = messlen - nzip;
358 else
359 bufmax = kMAXZIPBUF;
360 R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout,
361 static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(compressionAlgorithm));
362 if (nout == 0 || nout >= messlen) {
363 //this happens when the buffer cannot be compressed
364 delete [] fBufComp;
365 fBufComp = nullptr;
366 fBufCompCur = nullptr;
367 fCompPos = nullptr;
368 return -1;
369 }
370 bufcur += nout;
371 noutot += nout;
372 messbuf += kMAXZIPBUF;
373 nzip += kMAXZIPBUF;
374 }
375 fBufCompCur = bufcur;
377
378 bufcur = fBufComp;
379 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
381 tobuf(bufcur, what);
382 tobuf(bufcur, Length()); // original uncompressed buffer length
383
384 return 0;
385}
386
387////////////////////////////////////////////////////////////////////////////////
388/// Uncompress the message. The message will only be uncompressed when
389/// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
390
392{
393 if (!fBufComp || !(fWhat & kMESS_ZIP))
394 return -1;
395
396 Int_t buflen;
397 Int_t hdrlen = 2*sizeof(UInt_t);
398 char *bufcur1 = fBufComp + hdrlen;
399 frombuf(bufcur1, &buflen);
400 UChar_t *bufcur = (UChar_t*)bufcur1;
401
402 /* early consistency check */
403 Int_t nin, nbuf;
404 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
405 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
406 return -1;
407 }
408
409 fBuffer = new char[buflen];
410 fBufSize = buflen;
411 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
413 char *messbuf = fBuffer + hdrlen;
414
415 Int_t nout;
416 Int_t noutot = 0;
417 while (1) {
418 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
419 if (hc!=0) break;
420 R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
421 if (!nout) break;
422 noutot += nout;
423 if (noutot >= buflen - hdrlen) break;
424 bufcur += nin;
425 messbuf += nout;
426 }
427
428 fWhat &= ~kMESS_ZIP;
429 fCompress = 1;
430
431 return 0;
432}
433
434////////////////////////////////////////////////////////////////////////////////
435/// Check if the ProcessID pid is already in the message.
436/// If not, then:
437/// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
438/// - mark bit uid+1 where uid id the uid of the ProcessID
439
441{
442 if (fBitsPIDs.TestBitNumber(0)) return 0;
443 if (!pid)
444 pid = TProcessID::GetPID();
445 if (!pid) return 0;
447 UInt_t uid = pid->GetUniqueID();
448 fBitsPIDs.SetBitNumber(uid+1);
449 return 1;
450}
void frombuf(char *&buf, Bool_t *x)
Definition: Bytes.h:280
void tobuf(char *&buf, Bool_t x)
Definition: Bytes.h:57
@ kMESS_OBJECT
Definition: MessageTypes.h:35
@ kMESS_ZIP
Definition: MessageTypes.h:28
unsigned short UShort_t
Definition: RtypesCore.h:38
unsigned char UChar_t
Definition: RtypesCore.h:36
unsigned int UInt_t
Definition: RtypesCore.h:44
const Bool_t kFALSE
Definition: RtypesCore.h:90
bool Bool_t
Definition: RtypesCore.h:61
#define ClassImp(name)
Definition: Rtypes.h:361
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition: TBits.cxx:481
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition: TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition: TBits.h:206
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket.
Definition: TBufferFile.h:46
TClass * ReadClass(const TClass *cl=nullptr, UInt_t *objTag=nullptr) override
Read class definition from I/O buffer.
void InitMap() override
Create the fMap container and initialize them with the null object.
Definition: TBufferIO.cxx:129
void ResetMap() override
Delete existing fMap and reset map counter.
Definition: TBufferIO.cxx:288
Buffer base class used for serializing objects.
Definition: TBuffer.h:42
void SetWriteMode()
Set buffer in write mode.
Definition: TBuffer.cxx:315
Int_t fBufSize
Definition: TBuffer.h:49
@ kCannotHandleMemberWiseStreaming
Definition: TBuffer.h:75
char * fBufMax
Definition: TBuffer.h:52
char * fBufCur
Definition: TBuffer.h:51
Bool_t IsWriting() const
Definition: TBuffer.h:86
Bool_t IsReading() const
Definition: TBuffer.h:85
void SetBufferOffset(Int_t offset=0)
Definition: TBuffer.h:92
char * fBuffer
Definition: TBuffer.h:50
Int_t Length() const
Definition: TBuffer.h:99
char * Buffer() const
Definition: TBuffer.h:95
A doubly linked list.
Definition: TList.h:44
virtual void Add(TObject *obj)
Definition: TList.h:87
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition: TList.cxx:401
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition: TMessage.cxx:116
UInt_t fWhat
Definition: TMessage.h:44
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force) override
Force writing the TStreamerInfo to the message.
Definition: TMessage.cxx:132
char * fCompPos
Definition: TMessage.h:49
void Forward()
Change a buffer that was received into one that can be send, i.e.
Definition: TMessage.cxx:144
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:202
Int_t GetCompressionAlgorithm() const
Definition: TMessage.h:100
TClass * fClass
Definition: TMessage.h:45
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition: TMessage.cxx:179
char * fBufComp
Definition: TMessage.h:47
char * fBufCompCur
Definition: TMessage.h:48
Bool_t fEvolution
Definition: TMessage.h:50
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Set compression settings.
Definition: TMessage.cxx:288
UShort_t WriteProcessID(TProcessID *pid) override
Check if the ProcessID pid is already in the message.
Definition: TMessage.cxx:440
Int_t Compress()
Compress the message.
Definition: TMessage.cxx:306
virtual ~TMessage()
Destructor.
Definition: TMessage.cxx:106
Int_t fCompress
Definition: TMessage.h:46
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition: TMessage.cxx:124
TMessage(const TMessage &)
void TagStreamerInfo(TVirtualStreamerInfo *info) override
Remember that the StreamerInfo is being used in writing.
Definition: TMessage.cxx:168
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
Set compression level.
Definition: TMessage.cxx:264
Int_t Uncompress()
Uncompress the message.
Definition: TMessage.cxx:391
Int_t GetCompressionLevel() const
Definition: TMessage.h:106
TBits fBitsPIDs
Definition: TMessage.h:43
TList * fInfos
Definition: TMessage.h:42
Int_t CompLength() const
Definition: TMessage.h:90
static Bool_t fgEvolution
Definition: TMessage.h:52
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
Definition: TMessage.cxx:222
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
Set compression algorithm.
Definition: TMessage.cxx:242
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition: TObject.cxx:375
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition: TObject.cxx:694
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:891
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition: TProcessID.h:69
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Definition: TProcessID.cxx:341
Abstract Interface class describing Streamer information for one class.
tbb::task_arena is an alias of tbb::interface7::task_arena, which doesn't allow to forward declare tb...
Definition: StringConv.hxx:21
static const char * what
Definition: stlLoader.cc:6
EValues
Note: this is only temporarily a struct and will become a enum class hence the name.
Definition: Compression.h:83
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
Definition: Compression.h:100
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition: Compression.h:68