Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMessage.cxx
Go to the documentation of this file.
1// @(#)root/net:$Id$
2// Author: Fons Rademakers 19/12/96
3
4/*************************************************************************
5 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMessage //
15// //
16// Message buffer class used for serializing objects and sending them //
17// over a network. This class inherits from TBuffer the basic I/O //
18// serializer. //
19// //
20//////////////////////////////////////////////////////////////////////////
21
22#include "TMessage.h"
23#include "Compression.h"
25#include "TList.h"
26#include "Bytes.h"
27#include "TProcessID.h"
28#include "RZip.h"
29
31
32
34
35////////////////////////////////////////////////////////////////////////////////
36/// Create a TMessage object for storing objects. The "what" integer
37/// describes the type of message. Predefined ROOT system message types
38/// can be found in MessageTypes.h. Make sure your own message types are
39/// unique from the ROOT defined message types (i.e. 0 - 10000 are
40/// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41/// will wait for an acknowledgment from the remote side. This makes
42/// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43/// the message will be compressed in TSocket using the zip algorithm
44/// (only if message is > 256 bytes).
45
47 TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t)),
48 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
49{
50 // space at the beginning of the message reserved for the message length
51 UInt_t reserved = 0;
52 *this << reserved;
53
54 fWhat = what;
55 *this << what;
56
57 fClass = nullptr;
58 fBufComp = nullptr;
59 fBufCompCur = nullptr;
60 fCompPos = nullptr;
61 fInfos = nullptr;
63
65}
66
67////////////////////////////////////////////////////////////////////////////////
68/// Create a TMessage object for reading objects. The objects will be
69/// read from buf. Use the What() method to get the message type.
70
71TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf),
72 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
73{
74 // skip space at the beginning of the message reserved for the message length
75 fBufCur += sizeof(UInt_t);
76
77 *this >> fWhat;
78
79 fBufComp = nullptr;
80 fBufCompCur = nullptr;
81 fCompPos = nullptr;
82 fInfos = nullptr;
84
85 if (fWhat & kMESS_ZIP) {
86 // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
88 fBufCompCur = fBuffer + bufsize;
89 fBuffer = nullptr;
90 Uncompress();
91 }
92
93 if (fWhat == kMESS_OBJECT) {
94 InitMap();
95 fClass = ReadClass(); // get first the class stored in message
96 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
97 ResetMap();
98 } else {
99 fClass = nullptr;
100 }
101}
102
103////////////////////////////////////////////////////////////////////////////////
104/// Destructor
105
107{
108 delete [] fBufComp;
109 delete fInfos;
110}
111
112////////////////////////////////////////////////////////////////////////////////
113/// Static function enabling or disabling the automatic schema evolution.
114/// By default schema evolution support is off.
115
117{
118 fgEvolution = enable;
119}
120
121////////////////////////////////////////////////////////////////////////////////
122/// Static function returning status of global schema evolution.
123
125{
126 return fgEvolution;
127}
128
129////////////////////////////////////////////////////////////////////////////////
130/// Force writing the TStreamerInfo to the message.
131
133{
134 if (fgEvolution || fEvolution) {
135 if (!fInfos) fInfos = new TList();
136 fInfos->Add(info);
137 }
138}
139
140////////////////////////////////////////////////////////////////////////////////
141/// Change a buffer that was received into one that can be sent, i.e.
142/// forward a just received message.
143
145{
146 if (IsReading()) {
147 SetWriteMode();
150
151 if (fBufComp) {
153 }
154 }
155}
156
157////////////////////////////////////////////////////////////////////////////////
158/// Remember that the StreamerInfo is being used in writing.
159///
160/// When support for schema evolution is enabled the list of TStreamerInfo
161/// used to stream this object is kept in fInfos. This information is used
162/// by TSocket::Send that sends this list through the socket. This list is in
163/// turn used by TSocket::Recv to store the TStreamerInfo objects in the
164/// relevant TClass in case the TClass does not know yet about a particular
165/// class version. This feature is implemented to support clients and servers
166/// with either different ROOT versions or different user class versions.
167
169{
170 if (fgEvolution || fEvolution) {
171 if (!fInfos) fInfos = new TList();
172 fInfos->Add(info);
173 }
174}
175
176////////////////////////////////////////////////////////////////////////////////
177/// Reset the message buffer so we can use (i.e. fill) it again.
178
180{
181 SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
182 ResetMap();
183
184 if (fBufComp) {
185 delete [] fBufComp;
186 fBufComp = nullptr;
187 fBufCompCur = nullptr;
188 fCompPos = nullptr;
189 }
190
191 if (fgEvolution || fEvolution) {
192 if (fInfos)
193 fInfos->Clear();
194 }
196}
197
198////////////////////////////////////////////////////////////////////////////////
199/// Set the message length at the beginning of the message buffer.
200/// This method is only called by TSocket::Send().
201
203{
204 if (IsWriting()) {
205 char *buf = Buffer();
206 if (buf)
207 tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
208
209 if (fBufComp) {
210 buf = fBufComp;
211 tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
212 }
213 }
214}
215
216////////////////////////////////////////////////////////////////////////////////
217/// Using this method one can change the message type a posteriori.
218/// In case you OR "what" with kMESS_ACK, the message will wait for
219/// an acknowledgment from the remote side. This makes the sending
220/// process synchronous.
221
223{
224 fWhat = what;
225
226 char *buf = Buffer();
227 if (buf) {
228 buf += sizeof(UInt_t); // skip reserved length space
229 tobuf(buf, what);
230 }
231
232 if (fBufComp) {
233 buf = fBufComp;
234 buf += sizeof(UInt_t); // skip reserved length space
235 tobuf(buf, what | kMESS_ZIP);
236 }
237}
238
239////////////////////////////////////////////////////////////////////////////////
240/// Set compression algorithm
241
243{
244 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
245 Int_t newCompress;
246 if (fCompress < 0) {
247 newCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
248 } else {
249 int level = fCompress % 100;
250 newCompress = 100 * algorithm + level;
251 }
252 if (newCompress != fCompress && fBufComp) {
253 delete [] fBufComp;
254 fBufComp = nullptr;
255 fBufCompCur = nullptr;
256 fCompPos = nullptr;
257 }
258 fCompress = newCompress;
259}
260
261////////////////////////////////////////////////////////////////////////////////
262/// Set compression level
263
265{
266 if (level < 0) level = 0;
267 if (level > 99) level = 99;
268 Int_t newCompress;
269 if (fCompress < 0) {
270 newCompress = level;
271 } else {
272 int algorithm = fCompress / 100;
273 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
274 newCompress = 100 * algorithm + level;
275 }
276 if (newCompress != fCompress && fBufComp) {
277 delete [] fBufComp;
278 fBufComp = nullptr;
279 fBufCompCur = nullptr;
280 fCompPos = nullptr;
281 }
282 fCompress = newCompress;
283}
284
285////////////////////////////////////////////////////////////////////////////////
286/// Set compression settings
287
289{
290 if (settings != fCompress && fBufComp) {
291 delete [] fBufComp;
292 fBufComp = nullptr;
293 fBufCompCur = nullptr;
294 fCompPos = nullptr;
295 }
296 fCompress = settings;
297}
298
299////////////////////////////////////////////////////////////////////////////////
300/// Compress the message. The message will only be compressed if the
301/// compression level > 0 and if the message is > 256 bytes.
302/// Returns -1 in case of error (when compression fails or
303/// when the message increases in size in some pathological cases),
304/// otherwise returns 0.
305
307{
308 Int_t compressionLevel = GetCompressionLevel();
309 Int_t compressionAlgorithm = GetCompressionAlgorithm();
310 if (compressionLevel <= 0) {
311 // no compression specified
312 if (fBufComp) {
313 delete [] fBufComp;
314 fBufComp = nullptr;
315 fBufCompCur = nullptr;
316 fCompPos = nullptr;
317 }
318 return 0;
319 }
320
321 if (fBufComp && fCompPos == fBufCur) {
322 // the message was already compressed
323 return 0;
324 }
325
326 // remove any existing compressed buffer before compressing modified message
327 if (fBufComp) {
328 delete [] fBufComp;
329 fBufComp = nullptr;
330 fBufCompCur = nullptr;
331 fCompPos = nullptr;
332 }
333
334 if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
335 // this message is too small to be compressed
336 return 0;
337 }
338
339 if (!Buffer()) {
340 // error condition, should never happen
341 return -1;
342 }
343
344 Int_t hdrlen = 2*sizeof(UInt_t);
345 Int_t messlen = Length() - hdrlen;
346 Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
347 Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
348 Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
349 fBufComp = new char[buflen];
350 char *messbuf = Buffer() + hdrlen;
351 char *bufcur = fBufComp + chdrlen;
352 Int_t nzip = 0;
353 Int_t nout, bufmax;
354 for (Int_t i = 0; i < nbuffers; ++i) {
355 if (i == nbuffers - 1)
356 bufmax = messlen - nzip;
357 else
358 bufmax = kMAXZIPBUF;
359 R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout,
360 static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(compressionAlgorithm));
361 if (nout == 0 || nout >= messlen) {
362 //this happens when the buffer cannot be compressed
363 delete [] fBufComp;
364 fBufComp = nullptr;
365 fBufCompCur = nullptr;
366 fCompPos = nullptr;
367 return -1;
368 }
369 bufcur += nout;
370 messbuf += kMAXZIPBUF;
371 nzip += kMAXZIPBUF;
372 }
373 fBufCompCur = bufcur;
375
376 bufcur = fBufComp;
377 tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
379 tobuf(bufcur, what);
380 tobuf(bufcur, Length()); // original uncompressed buffer length
381
382 return 0;
383}
384
385////////////////////////////////////////////////////////////////////////////////
386/// Uncompress the message. The message will only be uncompressed when
387/// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
388
390{
391 if (!fBufComp || !(fWhat & kMESS_ZIP))
392 return -1;
393
394 Int_t buflen;
395 Int_t hdrlen = 2*sizeof(UInt_t);
396 char *bufcur1 = fBufComp + hdrlen;
397 frombuf(bufcur1, &buflen);
398 UChar_t *bufcur = (UChar_t*)bufcur1;
399
400 /* early consistency check */
401 Int_t nin, nbuf;
402 if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
403 Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
404 return -1;
405 }
406
407 fBuffer = new char[buflen];
408 fBufSize = buflen;
409 fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
411 char *messbuf = fBuffer + hdrlen;
412
413 Int_t nout;
414 Int_t noutot = 0;
415 while (1) {
416 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
417 if (hc!=0) break;
418 R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
419 if (!nout) break;
420 noutot += nout;
421 if (noutot >= buflen - hdrlen) break;
422 bufcur += nin;
423 messbuf += nout;
424 }
425
426 fWhat &= ~kMESS_ZIP;
427 fCompress = 1;
428
429 return 0;
430}
431
432////////////////////////////////////////////////////////////////////////////////
433/// Check if the ProcessID pid is already in the message.
434/// If not, then:
435/// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
436/// - mark bit uid+1 where uid is the uid of the ProcessID
437
439{
440 if (fBitsPIDs.TestBitNumber(0)) return 0;
441 if (!pid)
442 pid = TProcessID::GetPID();
443 if (!pid) return 0;
445 UInt_t uid = pid->GetUniqueID();
446 fBitsPIDs.SetBitNumber(uid+1);
447 return 1;
448}
void frombuf(char *&buf, Bool_t *x)
Definition Bytes.h:278
void tobuf(char *&buf, Bool_t x)
Definition Bytes.h:55
@ kMESS_OBJECT
@ kMESS_ZIP
bool Bool_t
Definition RtypesCore.h:63
unsigned short UShort_t
Definition RtypesCore.h:40
unsigned char UChar_t
Definition RtypesCore.h:38
unsigned int UInt_t
Definition RtypesCore.h:46
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
#define ClassImp(name)
Definition Rtypes.h:377
void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout)
int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout)
void ResetAllBits(Bool_t value=kFALSE)
Reset all bits to 0 (false).
Definition TBits.cxx:481
Bool_t TestBitNumber(UInt_t bitnumber) const
Definition TBits.h:222
void SetBitNumber(UInt_t bitnumber, Bool_t value=kTRUE)
Definition TBits.h:206
The concrete implementation of TBuffer for writing/reading to/from a ROOT file or socket.
Definition TBufferFile.h:47
TClass * ReadClass(const TClass *cl=nullptr, UInt_t *objTag=nullptr) override
Read class definition from I/O buffer.
void InitMap() override
Create the fMap container and initialize it with the null object.
void ResetMap() override
Delete existing fMap and reset map counter.
Buffer base class used for serializing objects.
Definition TBuffer.h:43
void SetWriteMode()
Set buffer in write mode.
Definition TBuffer.cxx:315
Int_t fBufSize
Definition TBuffer.h:50
@ kCannotHandleMemberWiseStreaming
Definition TBuffer.h:76
char * fBufMax
Definition TBuffer.h:53
char * fBufCur
Definition TBuffer.h:52
Bool_t IsWriting() const
Definition TBuffer.h:87
Bool_t IsReading() const
Definition TBuffer.h:86
void SetBufferOffset(Int_t offset=0)
Definition TBuffer.h:93
char * fBuffer
Definition TBuffer.h:51
Int_t Length() const
Definition TBuffer.h:100
char * Buffer() const
Definition TBuffer.h:96
A doubly linked list.
Definition TList.h:38
void Clear(Option_t *option="") override
Remove all objects from the list.
Definition TList.cxx:402
void Add(TObject *obj) override
Definition TList.h:81
static void EnableSchemaEvolutionForAll(Bool_t enable=kTRUE)
Static function enabling or disabling the automatic schema evolution.
Definition TMessage.cxx:116
UInt_t fWhat
Definition TMessage.h:44
void ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t force) override
Force writing the TStreamerInfo to the message.
Definition TMessage.cxx:132
char * fCompPos
Definition TMessage.h:49
void Forward()
Change a buffer that was received into one that can be sent, i.e.
Definition TMessage.cxx:144
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition TMessage.cxx:202
Int_t GetCompressionAlgorithm() const
Definition TMessage.h:100
TClass * fClass
Definition TMessage.h:45
void Reset() override
Reset the message buffer so we can use (i.e. fill) it again.
Definition TMessage.cxx:179
char * fBufComp
Definition TMessage.h:47
char * fBufCompCur
Definition TMessage.h:48
Bool_t fEvolution
Definition TMessage.h:50
void SetCompressionSettings(Int_t settings=ROOT::RCompressionSetting::EDefaults::kUseCompiledDefault)
Set compression settings.
Definition TMessage.cxx:288
UShort_t WriteProcessID(TProcessID *pid) override
Check if the ProcessID pid is already in the message.
Definition TMessage.cxx:438
Int_t Compress()
Compress the message.
Definition TMessage.cxx:306
virtual ~TMessage()
Destructor.
Definition TMessage.cxx:106
Int_t fCompress
Definition TMessage.h:46
static Bool_t UsesSchemaEvolutionForAll()
Static function returning status of global schema evolution.
Definition TMessage.cxx:124
TMessage(const TMessage &)
void TagStreamerInfo(TVirtualStreamerInfo *info) override
Remember that the StreamerInfo is being used in writing.
Definition TMessage.cxx:168
void SetCompressionLevel(Int_t level=ROOT::RCompressionSetting::ELevel::kUseMin)
Set compression level.
Definition TMessage.cxx:264
Int_t Uncompress()
Uncompress the message.
Definition TMessage.cxx:389
Int_t GetCompressionLevel() const
Definition TMessage.h:106
TBits fBitsPIDs
Definition TMessage.h:43
TList * fInfos
Definition TMessage.h:42
Int_t CompLength() const
Definition TMessage.h:90
static Bool_t fgEvolution
Definition TMessage.h:52
void SetWhat(UInt_t what)
Using this method one can change the message type a posteriori. In case you OR "what" with kMESS_ACK,...
Definition TMessage.cxx:222
void SetCompressionAlgorithm(Int_t algorithm=ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
Set compression algorithm.
Definition TMessage.cxx:242
virtual UInt_t GetUniqueID() const
Return the unique object id.
Definition TObject.cxx:457
void SetBit(UInt_t f, Bool_t set)
Set or unset the user status bits as specified in f.
Definition TObject.cxx:780
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:976
A TProcessID identifies a ROOT job in a unique way in time and space.
Definition TProcessID.h:74
static TProcessID * GetPID()
static: returns pointer to current TProcessID
Abstract Interface class describing Streamer information for one class.
This file contains a specialised ROOT message handler to test for diagnostic in unit tests.
static const char * what
Definition stlLoader.cc:6
EValues
Note: this is only temporarily a struct and will become a enum class hence the name.
Definition Compression.h:85
@ kUndefined
Undefined compression algorithm (must be kept the last of the list in case a new algorithm is added).
@ kUseMin
Compression level reserved when we are not sure what to use (1 is for the fastest compression)
Definition Compression.h:70