Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMessage.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 19/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMessage //
15// //
16// Message buffer class used for serializing objects and sending them //
17// over a network. This class inherits from TBuffer the basic I/O //
18// serializer. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "TMessage.h"
23#include "Compression.h"
25#include "TList.h"
26#include "Bytes.h"
27#include "TProcessID.h"
28#include "RZip.h"
29
31
32
33
34////////////////////////////////////////////////////////////////////////////////
35/// Create a TMessage object for storing objects. The "what" integer
36/// describes the type of message. Predefined ROOT system message types
37/// can be found in MessageTypes.h. Make sure your own message types are
38/// unique from the ROOT defined message types (i.e. 0 - 10000 are
39/// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
40/// will wait for an acknowledgment from the remote side. This makes
41/// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
42/// the message will be compressed in TSocket using the zip algorithm
43/// (only if message is > 256 bytes).
44
47 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
48{
49 // space at the beginning of the message reserved for the message length
50 UInt_t reserved = 0;
51 *this << reserved;
52
53 fWhat = what;
54 *this << what;
55
56 fClass = nullptr;
57 fBufComp = nullptr;
58 fBufCompCur = nullptr;
59 fCompPos = nullptr;
60 fInfos = nullptr;
62
64}
65
66////////////////////////////////////////////////////////////////////////////////
67/// Create a TMessage object for reading objects. The objects will be
68/// read from buf. Use the What() method to get the message type.
69
71 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
72{
73 // skip space at the beginning of the message reserved for the message length
74 fBufCur += sizeof(UInt_t);
75
76 *this >> fWhat;
77
78 fBufComp = nullptr;
79 fBufCompCur = nullptr;
80 fCompPos = nullptr;
81 fInfos = nullptr;
83
84 if (fWhat & kMESS_ZIP) {
85 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
88 fBuffer = nullptr;
89 Uncompress();
90 if (adopt) {
91 SetBit(kIsOwnerComp); // bufcomp points to the original buf
92 }
93 }
94
95 if (fWhat == kMESS_OBJECT) {
96 InitMap();
97 fClass = ReadClass(); // get first the class stored in message
98 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
99 ResetMap();
100 } else {
101 fClass = nullptr;
102 }
103}
104
105////////////////////////////////////////////////////////////////////////////////
106/// Destructor
107
109{
110 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
112 delete [] fBufComp;
113 delete fInfos;
114}
115
116////////////////////////////////////////////////////////////////////////////////
117/// Static function enabling or disabling the automatic schema evolution.
118/// By default schema evolution support is off.
119
124
125////////////////////////////////////////////////////////////////////////////////
126/// Static function returning status of global schema evolution.
127
132
133////////////////////////////////////////////////////////////////////////////////
134/// Force writing the TStreamerInfo to the message.
135
137{
138 if (fgEvolution || fEvolution) {
139 if (!fInfos) fInfos = new TList();
140 fInfos->Add(info);
141 }
142}
143
144////////////////////////////////////////////////////////////////////////////////
145/// Change a buffer that was received into one that can be send, i.e.
146/// forward a just received message.
147
149{
150 if (IsReading()) {
151 SetWriteMode();
154
155 if (fBufComp) {
157 }
158 }
159}
160
161////////////////////////////////////////////////////////////////////////////////
162/// Remember that the StreamerInfo is being used in writing.
163///
164/// When support for schema evolution is enabled the list of TStreamerInfo
165/// used to stream this object is kept in fInfos. This information is used
166/// by TSocket::Send that sends this list through the socket. This list is in
167/// turn used by TSocket::Recv to store the TStreamerInfo objects in the
168/// relevant TClass in case the TClass does not know yet about a particular
169/// class version. This feature is implemented to support clients and servers
170/// with either different ROOT versions or different user classes versions.
171
173{
174 if (fgEvolution || fEvolution) {
175 if (!fInfos) fInfos = new TList();
176 fInfos->Add(info);
177 }
178}
179
180////////////////////////////////////////////////////////////////////////////////
181/// Reset the message buffer so we can use (i.e. fill) it again.
182
184{
185 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
186 ResetMap();
187
188 if (fBufComp) {
189 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
191 delete [] fBufComp;
192 fBufComp = nullptr;
193 fBufCompCur = nullptr;
194 fCompPos = nullptr;
195 }
196
197 if (fgEvolution || fEvolution) {
198 if (fInfos)
199 fInfos->Clear();
200 }
202}
203
204////////////////////////////////////////////////////////////////////////////////
205/// Set the message length at the beginning of the message buffer.
206/// This method is only called by TSocket::Send().
207
209{
210 if (IsWriting()) {
211 char *buf = Buffer();
212 if (buf)
213 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
214
215 if (fBufComp) {
216 buf = fBufComp;
217 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
218 }
219 }
220}
221
222////////////////////////////////////////////////////////////////////////////////
223/// Using this method one can change the message type a-posteriori
224/// In case you OR "what" with kMESS_ACK, the message will wait for
225/// an acknowledgment from the remote side. This makes the sending
226/// process synchronous.
227
229{
230 fWhat = what;
231
232 char *buf = Buffer();
233 if (buf) {
234 buf += sizeof(UInt_t); // skip reserved length space
235 tobuf(buf, what);
236 }
237
238 if (fBufComp) {
239 buf = fBufComp;
240 buf += sizeof(UInt_t); // skip reserved length space
241 tobuf(buf, what | kMESS_ZIP);
242 }
243}
244
245////////////////////////////////////////////////////////////////////////////////
246/// Set compression algorithm
247
249{
252 if (fCompress < 0) {
254 } else {
255 int level = fCompress % 100;
256 newCompress = 100 * algorithm + level;
257 }
258 if (newCompress != fCompress && fBufComp) {
259 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
261 delete [] fBufComp;
262 fBufComp = nullptr;
263 fBufCompCur = nullptr;
264 fCompPos = nullptr;
265 }
267}
268
269////////////////////////////////////////////////////////////////////////////////
270/// Set compression level
271
273{
274 if (level < 0) level = 0;
275 if (level > 99) level = 99;
277 if (fCompress < 0) {
278 newCompress = level;
279 } else {
280 int algorithm = fCompress / 100;
282 newCompress = 100 * algorithm + level;
283 }
284 if (newCompress != fCompress && fBufComp) {
285 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
287 delete [] fBufComp;
288 fBufComp = nullptr;
289 fBufCompCur = nullptr;
290 fCompPos = nullptr;
291 }
293}
294
295////////////////////////////////////////////////////////////////////////////////
296/// Set compression settings
297
299{
300 if (settings != fCompress && fBufComp) {
301 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
303 delete [] fBufComp;
304 fBufComp = nullptr;
305 fBufCompCur = nullptr;
306 fCompPos = nullptr;
307 }
309}
310
311////////////////////////////////////////////////////////////////////////////////
312/// Compress the message. The message will only be compressed if the
313/// compression level > 0 and the if the message is > 256 bytes.
314/// Returns -1 in case of error (when compression fails or
315/// when the message increases in size in some pathological cases),
316/// otherwise returns 0.
317
319{
322 if (compressionLevel <= 0) {
323 // no compression specified
324 if (fBufComp) {
325 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
327 delete [] fBufComp;
328 fBufComp = nullptr;
329 fBufCompCur = nullptr;
330 fCompPos = nullptr;
331 }
332 return 0;
333 }
334
335 if (fBufComp && fCompPos == fBufCur) {
336 // the message was already compressed
337 return 0;
338 }
339
340 // remove any existing compressed buffer before compressing modified message
341 if (fBufComp) {
342 // We only own fBufComp when we explictly created it or adopt and kMESS_ZIP were true
344 delete [] fBufComp;
345 fBufComp = nullptr;
346 fBufCompCur = nullptr;
347 fCompPos = nullptr;
348 }
349
350 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
351 // this message is too small to be compressed
352 return 0;
353 }
354
355 if (!Buffer()) {
356 // error condition, should never happen
357 return -1;
358 }
359
360 Int_t hdrlen = 2*sizeof(UInt_t);
362 Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
363 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
364 Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
365 fBufComp = new char[buflen];
366 char *messbuf = Buffer() + hdrlen;
367 char *bufcur = fBufComp + chdrlen;
369 Int_t nzip = 0;
370 Int_t nout, bufmax;
371 for (Int_t i = 0; i < nbuffers; ++i) {
372 if (i == nbuffers - 1)
373 bufmax = messlen - nzip;
374 else
375 bufmax = kMAXZIPBUF;
378 if (nout == 0 || nout >= messlen) {
379 //this happens when the buffer cannot be compressed
381 delete [] fBufComp;
382 fBufComp = nullptr;
383 fBufCompCur = nullptr;
384 fCompPos = nullptr;
385 return -1;
386 }
387 bufcur += nout;
389 nzip += kMAXZIPBUF;
390 }
393
395 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
397 tobuf(bufcur, what);
398 tobuf(bufcur, Length()); // original uncompressed buffer length
399
400 return 0;
401}
402
403////////////////////////////////////////////////////////////////////////////////
404/// Uncompress the message. The message will only be uncompressed when
405/// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
406
408{
409 if (!fBufComp || !(fWhat & kMESS_ZIP))
410 return -1;
411
412 Int_t buflen;
413 Int_t hdrlen = 2*sizeof(UInt_t);
414 char *bufcur1 = fBufComp + hdrlen;
415 frombuf(bufcur1, &buflen);
417
418 /* early consistency check */
419 Int_t nin, nbuf;
420 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
421 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
422 return -1;
423 }
424
425 fBuffer = new char[buflen];
426 fBufSize = buflen;
427 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
429 char *messbuf = fBuffer + hdrlen;
430
431 // Force being owner of the newly created buffer
433
434 Int_t nout;
435 Int_t noutot = 0;
436 while (1) {
438 if (hc!=0) break;
439 R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
440 if (!nout) break;
441 noutot += nout;
442 if (noutot >= buflen - hdrlen) break;
443 bufcur += nin;
444 messbuf += nout;
445 }
446
447 fWhat &= ~kMESS_ZIP;
448 fCompress = 1;
449
450 return 0;
451}
452
453////////////////////////////////////////////////////////////////////////////////
454/// Check if the ProcessID pid is already in the message.
455/// If not, then:
456/// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
457/// - mark bit uid+1 where uid id the uid of the ProcessID
458
460{
461 if (fBitsPIDs.TestBitNumber(0)) return 0;
462 if (!pid)
463 pid = TProcessID::GetPID();
464 if (!pid) return 0;
466 UInt_t uid = pid->GetUniqueID();
467 fBitsPIDs.SetBitNumber(uid+1);
468 return 1;
469}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
void tobuf(char *&buf, Bool_t x)
Definition Bytes.h:55
@ kMESS_OBJECT
@ kMESS_ZIP
bool Bool_t
Boolean (0=false, 1=true) (bool)
Definition RtypesCore.h:77
unsigned int UInt_t
Unsigned integer 4 bytes (unsigned int)
Definition RtypesCore.h:60
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition TBits.cxx:480
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket.
Definition TBufferFile.h:47
TClass * ReadClass(const TClass *cl=nullptr, UInt_t *objTag=nullptr) override
Read class definition from I/O buffer.
void InitMap() override
Create the fMap container and initialize them with the null object.
void ResetMap() override
Delete existing fMap and reset map counter.
Buffer base class used for serializing objects.
Definition TBuffer.h:43
void SetWriteMode()
Set buffer in write mode.
Definition TBuffer.cxx:314
Int_t fBufSize
Definition TBuffer.h:50
@ kCannotHandleMemberWiseStreaming
Definition TBuffer.h:76
@ kIsOwner
Definition TBuffer.h:75
char * fBufMax
Definition TBuffer.h:53
char * fBufCur
Definition TBuffer.h:52
Bool_t IsWriting() const
Definition TBuffer.h:87
Bool_t IsReading() const
Definition TBuffer.h:86
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
char * fBuffer
Definition TBuffer.h:51
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A doubly linked list.
Definition TList.h:38
void Clear(Option_t *option="") override
Remove all objects from the list.
Definition TList.cxx:399
void Add(TObject *obj) override
Definition TList.h:81
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:120
@ kIsOwnerComp
Definition TMessage.h:63
UInt_t fWhat
Definition TMessage.h:44
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force) override
Force writing the TStreamerInfo to the message.
Definition TMessage.cxx:136
char * fCompPos
Definition TMessage.h:49
void Forward()
Change a buffer that was received into one that can be send, i.e.
Definition TMessage.cxx:148
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:208
Int_t GetCompressionAlgorithm() const
Definition TMessage.h:103
TClass * fClass
Definition TMessage.h:45
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:183
char * fBufComp
Definition TMessage.h:47
char * fBufCompCur
Definition TMessage.h:48
Bool_t fEvolution
Definition TMessage.h:50
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Set compression settings.
Definition TMessage.cxx:298
UShort_t WriteProcessID(TProcessID *pid) override
Check if the ProcessID pid is already in the message.
Definition TMessage.cxx:459
Int_t Compress()
Compress the message.
Definition TMessage.cxx:318
virtual ~TMessage()
Destructor.
Definition TMessage.cxx:108
Int_t fCompress
Definition TMessage.h:46
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition TMessage.cxx:128
TMessage(const TMessage &)
void TagStreamerInfo(TVirtualStreamerInfo *info) override
Remember that the StreamerInfo is being used in writing.
Definition TMessage.cxx:172
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
Set compression level.
Definition TMessage.cxx:272
Int_t Uncompress()
Uncompress the message.
Definition TMessage.cxx:407
Int_t GetCompressionLevel() const
Definition TMessage.h:109
TBits fBitsPIDs
Definition TMessage.h:43
TList * fInfos
Definition TMessage.h:42
Int_t CompLength() const
Definition TMessage.h:93
static Bool_t fgEvolution
Definition TMessage.h:52
void SetWhat(UInt_t what)
Using this method one can change the message type a-posteriori In case you OR "what" with kMESS_ACK,...
Definition TMessage.cxx:228
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
Set compression algorithm.
Definition TMessage.cxx:248
R__ALWAYS_INLINE Bool_t TestBit(UInt_t f) const
Definition TObject.h:202
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:475
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:864
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1071
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Abstract Interface class describing Streamer information for one class.
static const char * what
Definition stlLoader.cc:5
EValues
Note: this is only temporarily a struct and will become a enum class hence the name convention used.
Definition Compression.h:88
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition Compression.h:72