Logo ROOT  
Reference Guide
Loading...
Searching...
No Matches
TMessage.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 19/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMessage //
15// //
16// Message buffer class used for serializing objects and sending them //
17// over a network. This class inherits from TBuffer the basic I/O //
18// serializer. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "TMessage.h"
23#include "Compression.h"
25#include "TList.h"
26#include "Bytes.h"
27#include "TProcessID.h"
28#include "RZip.h"
29
31
32
33
34////////////////////////////////////////////////////////////////////////////////
35/// Create a TMessage object for storing objects. The "what" integer
36/// describes the type of message. Predefined ROOT system message types
37/// can be found in MessageTypes.h. Make sure your own message types are
38/// unique from the ROOT defined message types (i.e. 0 - 10000 are
39/// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
40/// will wait for an acknowledgment from the remote side. This makes
41/// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
42/// the message will be compressed in TSocket using the zip algorithm
43/// (only if message is > 256 bytes).
44
46 TBufferFile(TBuffer::kWrite, bufsize + 2*sizeof(UInt_t)),
47 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
48{
49 // space at the beginning of the message reserved for the message length
50 UInt_t reserved = 0;
51 *this << reserved;
52
53 fWhat = what;
54 *this << what;
55
56 fClass = nullptr;
57 fBufComp = nullptr;
58 fBufCompCur = nullptr;
59 fCompPos = nullptr;
60 fInfos = nullptr;
62
64}
65
66////////////////////////////////////////////////////////////////////////////////
67/// Create a TMessage object for reading objects. The objects will be
68/// read from buf. Use the What() method to get the message type.
69
70TMessage::TMessage(void *buf, Int_t bufsize, Bool_t adopt) : TBufferFile(TBuffer::kRead, bufsize, buf, adopt),
71 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
72{
73 // skip space at the beginning of the message reserved for the message length
74 fBufCur += sizeof(UInt_t);
75
76 *this >> fWhat;
77
78 fBufComp = nullptr;
79 fBufCompCur = nullptr;
80 fCompPos = nullptr;
81 fInfos = nullptr;
83
84 if (fWhat & kMESS_ZIP) {
85 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
87 fBufCompCur = fBuffer + bufsize;
88 fBuffer = nullptr;
89 Uncompress();
90 if (adopt) {
91 SetBit(kIsOwnerComp); // bufcomp points to the original buf
92 }
93 }
94
95 if (fWhat == kMESS_OBJECT) {
96 InitMap();
97 fClass = ReadClass(); // get first the class stored in message
98 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
99 ResetMap();
100 } else {
101 fClass = nullptr;
102 }
103}
104
105////////////////////////////////////////////////////////////////////////////////
106/// Destructor
107
109{
110 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
112 delete [] fBufComp;
113 delete fInfos;
114}
115
116////////////////////////////////////////////////////////////////////////////////
117/// Static function enabling or disabling the automatic schema evolution.
118/// By default schema evolution support is off.
119
121{
122 fgEvolution = enable;
123}
124
125////////////////////////////////////////////////////////////////////////////////
126/// Static function returning status of global schema evolution.
127
132
133////////////////////////////////////////////////////////////////////////////////
134/// Force writing the TStreamerInfo to the message.
135
137{
138 if (fgEvolution || fEvolution) {
139 if (!fInfos) fInfos = new TList();
140 fInfos->Add(info);
141 }
142}
143
144////////////////////////////////////////////////////////////////////////////////
145/// Change a buffer that was received into one that can be send, i.e.
146/// forward a just received message.
147
149{
150 if (IsReading()) {
151 SetWriteMode();
154
155 if (fBufComp) {
157 }
158 }
159}
160
161////////////////////////////////////////////////////////////////////////////////
162/// Remember that the StreamerInfo is being used in writing.
163///
164/// When support for schema evolution is enabled the list of TStreamerInfo
165/// used to stream this object is kept in fInfos. This information is used
166/// by TSocket::Send that sends this list through the socket. This list is in
167/// turn used by TSocket::Recv to store the TStreamerInfo objects in the
168/// relevant TClass in case the TClass does not know yet about a particular
169/// class version. This feature is implemented to support clients and servers
170/// with either different ROOT versions or different user classes versions.
171
173{
174 if (fgEvolution || fEvolution) {
175 if (!fInfos) fInfos = new TList();
176 fInfos->Add(info);
177 }
178}
179
180////////////////////////////////////////////////////////////////////////////////
181/// Reset the message buffer so we can use (i.e. fill) it again.
182
184{
185 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
186 ResetMap();
187
188 if (fBufComp) {
189 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
191 delete [] fBufComp;
192 fBufComp = nullptr;
193 fBufCompCur = nullptr;
194 fCompPos = nullptr;
195 }
196
197 if (fgEvolution || fEvolution) {
198 if (fInfos)
199 fInfos->Clear();
200 }
201 fBitsPIDs.ResetAllBits();
202}
203
204////////////////////////////////////////////////////////////////////////////////
205/// Set the message length at the beginning of the message buffer.
206/// This method is only called by TSocket::Send().
207
209{
210 if (IsWriting()) {
211 char *buf = Buffer();
212 if (buf)
213 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
214
215 if (fBufComp) {
216 buf = fBufComp;
217 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
218 }
219 }
220}
221
222////////////////////////////////////////////////////////////////////////////////
223/// Using this method one can change the message type a-posteriori
224/// In case you OR "what" with kMESS_ACK, the message will wait for
225/// an acknowledgment from the remote side. This makes the sending
226/// process synchronous.
227
229{
230 fWhat = what;
231
232 char *buf = Buffer();
233 if (buf) {
234 buf += sizeof(UInt_t); // skip reserved length space
235 tobuf(buf, what);
236 }
237
238 if (fBufComp) {
239 buf = fBufComp;
240 buf += sizeof(UInt_t); // skip reserved length space
241 tobuf(buf, what | kMESS_ZIP);
242 }
243}
244
245////////////////////////////////////////////////////////////////////////////////
246/// Set compression algorithm
247
249{
250 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
251 Int_t newCompress;
252 if (fCompress < 0) {
253 newCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
254 } else {
255 int level = fCompress % 100;
256 newCompress = 100 * algorithm + level;
257 }
258 if (newCompress != fCompress && fBufComp) {
259 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
261 delete [] fBufComp;
262 fBufComp = nullptr;
263 fBufCompCur = nullptr;
264 fCompPos = nullptr;
265 }
266 fCompress = newCompress;
267}
268
269////////////////////////////////////////////////////////////////////////////////
270/// Set compression level
271
273{
274 if (level < 0) level = 0;
275 if (level > 99) level = 99;
276 Int_t newCompress;
277 if (fCompress < 0) {
278 newCompress = level;
279 } else {
280 int algorithm = fCompress / 100;
281 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
282 newCompress = 100 * algorithm + level;
283 }
284 if (newCompress != fCompress && fBufComp) {
285 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
287 delete [] fBufComp;
288 fBufComp = nullptr;
289 fBufCompCur = nullptr;
290 fCompPos = nullptr;
291 }
292 fCompress = newCompress;
293}
294
295////////////////////////////////////////////////////////////////////////////////
296/// Set compression settings
297
299{
300 if (settings != fCompress && fBufComp) {
301 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
303 delete [] fBufComp;
304 fBufComp = nullptr;
305 fBufCompCur = nullptr;
306 fCompPos = nullptr;
307 }
308 fCompress = settings;
309}
310
311////////////////////////////////////////////////////////////////////////////////
312/// Compress the message. The message will only be compressed if the
313/// compression level > 0 and the if the message is > 256 bytes.
314/// Returns -1 in case of error (when compression fails or
315/// when the message increases in size in some pathological cases),
316/// otherwise returns 0.
317
319{
320 Int_t compressionLevel = GetCompressionLevel();
321 Int_t compressionAlgorithm = GetCompressionAlgorithm();
322 if (compressionLevel <= 0) {
323 // no compression specified
324 if (fBufComp) {
325 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
327 delete [] fBufComp;
328 fBufComp = nullptr;
329 fBufCompCur = nullptr;
330 fCompPos = nullptr;
331 }
332 return 0;
333 }
334
335 if (fBufComp && fCompPos == fBufCur) {
336 // the message was already compressed
337 return 0;
338 }
339
340 // remove any existing compressed buffer before compressing modified message
341 if (fBufComp) {
342 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
344 delete [] fBufComp;
345 fBufComp = nullptr;
346 fBufCompCur = nullptr;
347 fCompPos = nullptr;
348 }
349
350 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
351 // this message is too small to be compressed
352 return 0;
353 }
354
355 if (!Buffer()) {
356 // error condition, should never happen
357 return -1;
358 }
359
360 Int_t hdrlen = 2*sizeof(UInt_t);
361 Int_t messlen = Length() - hdrlen;
362 Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
363 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
364 Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
365 fBufComp = new char[buflen];
366 char *messbuf = Buffer() + hdrlen;
367 char *bufcur = fBufComp + chdrlen;
369 Int_t nzip = 0;
370 Int_t nout, bufmax;
371 for (Int_t i = 0; i < nbuffers; ++i) {
372 if (i == nbuffers - 1)
373 bufmax = messlen - nzip;
374 else
375 bufmax = kMAXZIPBUF;
376 R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout,
377 static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(compressionAlgorithm));
378 if (nout == 0 || nout >= messlen) {
379 //this happens when the buffer cannot be compressed
381 delete [] fBufComp;
382 fBufComp = nullptr;
383 fBufCompCur = nullptr;
384 fCompPos = nullptr;
385 return -1;
386 }
387 bufcur += nout;
388 messbuf += kMAXZIPBUF;
389 nzip += kMAXZIPBUF;
390 }
391 fBufCompCur = bufcur;
393
394 bufcur = fBufComp;
395 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
397 tobuf(bufcur, what);
398 tobuf(bufcur, Length()); // original uncompressed buffer length
399
400 return 0;
401}
402
403////////////////////////////////////////////////////////////////////////////////
404/// Uncompress the message. The message will only be uncompressed when
405/// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
406
408{
409 if (!fBufComp || !(fWhat & kMESS_ZIP))
410 return -1;
411
412 Int_t nbytesRemain = CompLength();
413 if (nbytesRemain < static_cast<Int_t>(3 * sizeof(Int_t))) {
414 Error("Uncompress", "Compressed buffer too short (%d)", CompLength());
415 return -1;
416 }
417 Int_t buflen;
418 Int_t hdrlen = 2*sizeof(UInt_t);
419 char *bufcur1 = fBufComp + hdrlen;
420 frombuf(bufcur1, &buflen);
421 UChar_t *bufcur = (UChar_t*)bufcur1;
422 nbytesRemain -= 3 * sizeof(Int_t);
423
424 if (buflen < hdrlen) {
425 Error("Uncompress", "Uncompressed buffer length too short (%d)", buflen);
426 return -1;
427 }
428
429 /* early consistency check */
430 Int_t nin = 0;
431 Int_t nbuf = 0;
432 if ((nbytesRemain < ROOT::Internal::kZipHeaderSize) || R__unzip_header(&nin, bufcur, &nbuf) != 0) {
433 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
434 return -1;
435 }
436
437 fBuffer = new char[buflen];
438 fBufSize = buflen;
439 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
441 char *messbuf = fBuffer + hdrlen;
442
443 // Force being owner of the newly created buffer
445
446 Int_t nout = 0;
447 Int_t noutot = 0;
448 Int_t objlenRemain = buflen - hdrlen;
449 while (nbytesRemain >= ROOT::Internal::kZipHeaderSize) {
450 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
451 if ((hc != 0) || (nin > nbytesRemain) || (nbuf > objlenRemain))
452 break;
453 R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
454 if (!nout) break;
455 noutot += nout;
456 if (noutot >= buflen - hdrlen) break;
457 bufcur += nin;
458 messbuf += nout;
459 nbytesRemain -= nin;
460 objlenRemain -= nout;
461 }
462
463 if (noutot != buflen - hdrlen) {
464 Error("Uncompress", "buflen = %d, objlenRemain = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d", buflen, objlenRemain,
465 noutot, nout, nin, nbuf);
466 delete[] fBuffer;
467 fBuffer = fBufCur = fBufMax = nullptr;
468 fBufSize = 0;
469 return -1;
470 }
471
472 fWhat &= ~kMESS_ZIP;
473 fCompress = 1;
474
475 return 0;
476}
477
478////////////////////////////////////////////////////////////////////////////////
479/// Check if the ProcessID pid is already in the message.
480/// If not, then:
481/// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
482/// - mark bit uid+1 where uid id the uid of the ProcessID
483
485{
486 if (fBitsPIDs.TestBitNumber(0)) return 0;
487 if (!pid)
488 pid = TProcessID::GetPID();
489 if (!pid) return 0;
490 fBitsPIDs.SetBitNumber(0);
491 UInt_t uid = pid->GetUniqueID();
492 fBitsPIDs.SetBitNumber(uid+1);
493 return 1;
494}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:270
void tobuf(char *&buf, Bool_t x)
Definition Bytes.h:55
@ kMESS_OBJECT
@ kMESS_ZIP
unsigned short UShort_t
Unsigned Short integer 2 bytes (unsigned short).
Definition RtypesCore.h:54
int Int_t
Signed integer 4 bytes (int).
Definition RtypesCore.h:59
unsigned char UChar_t
Unsigned Character 1 byte (unsigned char).
Definition RtypesCore.h:52
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int).
Definition RtypesCore.h:60
bool Bool_t
Boolean (0=false, 1=true) (bool).
Definition RtypesCore.h:77
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
TClass * ReadClass(const TClass *cl=nullptr, UInt_t *objTag=nullptr) override
Read class definition from I/O buffer.
void InitMap() override
Create the fMap container and initialize them with the null object.
void ResetMap() override
Delete existing fMap and reset map counter.
TBuffer()
Definition TBuffer.h:59
void SetWriteMode()
Set buffer in write mode.
Definition TBuffer.cxx:315
Int_t fBufSize
Definition TBuffer.h:50
@ kCannotHandleMemberWiseStreaming
Definition TBuffer.h:76
@ kIsOwner
Definition TBuffer.h:75
@ kWrite
Definition TBuffer.h:73
@ kRead
Definition TBuffer.h:73
char * fBufMax
Definition TBuffer.h:53
char * fBufCur
Definition TBuffer.h:52
Bool_t IsWriting() const
Definition TBuffer.h:87
Bool_t IsReading() const
Definition TBuffer.h:86
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
char * fBuffer
Definition TBuffer.h:51
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A doubly linked list.
Definition TList.h:38
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:120
@ kIsOwnerComp
Definition TMessage.h:62
UInt_t fWhat
Definition TMessage.h:43
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force) override
Force writing the TStreamerInfo to the message.
Definition TMessage.cxx:136
char * fCompPos
Definition TMessage.h:48
void Forward()
Change a buffer that was received into one that can be send, i.e.
Definition TMessage.cxx:148
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:208
Int_t GetCompressionAlgorithm() const
Definition TMessage.h:102
TClass * fClass
Definition TMessage.h:44
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:183
char * fBufComp
Definition TMessage.h:46
char * fBufCompCur
Definition TMessage.h:47
Bool_t fEvolution
Definition TMessage.h:49
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Set compression settings.
Definition TMessage.cxx:298
UShort_t WriteProcessID(TProcessID *pid) override
Check if the ProcessID pid is already in the message.
Definition TMessage.cxx:484
Int_t Compress()
Compress the message.
Definition TMessage.cxx:318
virtual ~TMessage()
Destructor.
Definition TMessage.cxx:108
Int_t fCompress
Definition TMessage.h:45
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition TMessage.cxx:128
TMessage(const TMessage &)
void TagStreamerInfo(TVirtualStreamerInfo *info) override
Remember that the StreamerInfo is being used in writing.
Definition TMessage.cxx:172
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
Set compression level.
Definition TMessage.cxx:272
Int_t Uncompress()
Uncompress the message.
Definition TMessage.cxx:407
Int_t GetCompressionLevel() const
Definition TMessage.h:108
TBits fBitsPIDs
Definition TMessage.h:42
TList * fInfos
Definition TMessage.h:41
Int_t CompLength() const
Definition TMessage.h:92
static Bool_t fgEvolution
Definition TMessage.h:51
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
Definition TMessage.cxx:228
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
Set compression algorithm.
Definition TMessage.cxx:248
Bool_t TestBit(UInt_t f) const
Definition TObject.h:204
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:480
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:888
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1098
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Abstract Interface class describing Streamer information for one class.
static const char * what
Definition stlLoader.cc:5
EValues
Note: this is only temporarily a struct and will become a enum class hence the name convention used.
Definition Compression.h:88
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression).
Definition Compression.h:72