Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
THttpWSHandler.cxx
Go to the documentation of this file.
1// $Id$
2// Author: Sergey Linev 20/10/2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "THttpWSHandler.h"
13
14#include "THttpWSEngine.h"
15#include "THttpCallArg.h"
16#include "TSystem.h"
17
18#include <thread>
19#include <chrono>
20
21/** \class THttpWSHandler
22
23Class for user-side handling of websocket with THttpServer
24
25Approximate how-to:
26
271. Create derived from THttpWSHandler class and implement
28 ProcessWS() method, where all web sockets request handled.
29
302. Register instance of derived class to running THttpServer
31
32 TUserWSHandler *handler = new TUserWSHandler("name1","title");
33 THttpServer *server = new THttpServer("http:8090");
34 server->Register("/subfolder", handler)
35
363. Now server can accept web socket connection from outside.
37 For instance, from JavaScirpt one can connect to it with code:
38
39 let ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket");
40
414. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
42
43 if (arg->IsMethod("WS_CONNECT")) {
44 return true; // to accept incoming request
45 }
46
47 if (arg->IsMethod("WS_READY")) {
48 fWSId = arg->GetWSId(); // fWSId should be member of the user class
49 return true; // connection established
50 }
51
52 if (arg->IsMethod("WS_CLOSE")) {
53 fWSId = 0;
54 return true; // confirm close of socket
55 }
56
57 if (arg->IsMethod("WS_DATA")) {
58 std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
59 std::cout << "got string " << str << std::endl;
60 SendCharStarWS(fWSId, "our reply");
61 return true;
62 }
63
645. See in `$ROOTSYS/tutorials/http/ws.C` and `$ROOTSYS/tutorials/http/ws.htm` functional example
65*/
66
67
68
69////////////////////////////////////////////////////////////////////////////////
70/// normal constructor
71
72THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
73{
74}
75
76////////////////////////////////////////////////////////////////////////////////
77/// destructor
78/// Make sure that all sending threads are stopped correctly
79
81{
83
84 std::vector<std::shared_ptr<THttpWSEngine>> clr;
85
86 {
87 std::lock_guard<std::mutex> grd(fMutex);
88 std::swap(clr, fEngines);
89 }
90
91 for (auto &eng : clr) {
92 eng->fDisabled = true;
93 if (eng->fHasSendThrd) {
94 eng->fHasSendThrd = false;
95 if (eng->fWaiting)
96 eng->fCond.notify_all();
97 eng->fSendThrd.join();
98 }
99 eng->ClearHandle(kTRUE); // terminate connection before starting destructor
100 }
101}
102
103/// Returns current number of websocket connections
105{
106 std::lock_guard<std::mutex> grd(fMutex);
107 return fEngines.size();
108}
109
110////////////////////////////////////////////////////////////////////////////////
111/// Return websocket id with given sequential number
112/// Number of websockets returned with GetNumWS() method
113
115{
116 std::lock_guard<std::mutex> grd(fMutex);
117 auto iter = fEngines.begin() + num;
118 return (*iter)->GetId();
119}
120
121////////////////////////////////////////////////////////////////////////////////
122/// Find websocket connection handle with given id
123/// If book_send parameter specified, have to book send operation under the mutex
124
125std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
126{
127 if (IsDisabled())
128 return nullptr;
129
130 std::lock_guard<std::mutex> grd(fMutex);
131
132 for (auto &eng : fEngines)
133 if (eng->GetId() == wsid) {
134
135 // not allow to work with disabled engine
136 if (eng->fDisabled)
137 return nullptr;
138
139 if (book_send) {
140 if (eng->fMTSend) {
141 Error("FindEngine", "Try to book next send operation before previous completed");
142 return nullptr;
143 }
144 eng->fMTSend = kTRUE;
145 }
146 return eng;
147 }
148
149 return nullptr;
150}
151
152////////////////////////////////////////////////////////////////////////////////
153/// Remove and destroy WS connection
154
155void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
156{
157 if (!engine) return;
158
159 {
160 std::lock_guard<std::mutex> grd(fMutex);
161
162 for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
163 if (*iter == engine) {
164 if (engine->fMTSend)
165 Error("RemoveEngine", "Trying to remove WS engine during send operation");
166
167 engine->fDisabled = true;
168 fEngines.erase(iter);
169 break;
170 }
171 }
172
173 engine->ClearHandle(terminate);
174
175 if (engine->fHasSendThrd) {
176 engine->fHasSendThrd = false;
177 if (engine->fWaiting)
178 engine->fCond.notify_all();
179 engine->fSendThrd.join();
180 }
181}
182
183////////////////////////////////////////////////////////////////////////////////
184/// Process request to websocket
185/// Different kind of requests coded into THttpCallArg::Method:
186///
187/// "WS_CONNECT" - connection request
188/// "WS_READY" - connection ready
189/// "WS_CLOSE" - connection closed
190///
191/// All other are normal data, which are delivered to users
192
193Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
194{
195 if (IsDisabled())
196 return kFALSE;
197
198 if (!arg->GetWSId())
199 return ProcessWS(arg.get());
200
201 // normally here one accept or reject connection requests
202 if (arg->IsMethod("WS_CONNECT"))
203 return ProcessWS(arg.get());
204
205 auto engine = FindEngine(arg->GetWSId());
206
207 if (arg->IsMethod("WS_READY")) {
208
209 if (engine) {
210 Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
211 RemoveEngine(engine, kTRUE);
212 }
213
214 engine = arg->TakeWSEngine();
215 {
216 std::lock_guard<std::mutex> grd(fMutex);
217 fEngines.emplace_back(engine);
218 }
219
220 if (!ProcessWS(arg.get())) {
221 // if connection refused, remove engine again
222 RemoveEngine(engine, kTRUE);
223 return kFALSE;
224 }
225
226 return kTRUE;
227 }
228
229 if (arg->IsMethod("WS_CLOSE")) {
230 // connection is closed, one can remove handle
231
232 RemoveEngine(engine);
233
234 return ProcessWS(arg.get());
235 }
236
237 if (engine && engine->PreProcess(arg)) {
238 PerformSend(engine);
239 return kTRUE;
240 }
241
242 Bool_t res = ProcessWS(arg.get());
243
244 if (engine)
245 engine->PostProcess(arg);
246
247 return res;
248}
249
250////////////////////////////////////////////////////////////////////////////////
251/// Close connection with given websocket id
252
254{
255 auto engine = FindEngine(wsid);
256
257 RemoveEngine(engine, kTRUE);
258}
259
260////////////////////////////////////////////////////////////////////////////////
261/// Send data stored in the buffer. Returns:
262///
263/// * 0 - when operation was executed immediately
264/// * 1 - when send operation will be performed in different thread
265
266Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
267{
268 if (IsSyncMode() || !engine->SupportSendThrd()) {
269 // this is case of longpoll engine, no extra thread is required for it
270 if (engine->CanSendDirectly())
271 return PerformSend(engine);
272
273 // handling will be performed in following http request handler
274
275 if (!IsSyncMode()) return 1;
276
277 // now we should wait until next polling requests is processed
278 // or when connection is closed or handler is shutdown
279
281
282 while (!IsDisabled() && !engine->fDisabled) {
284 // if send counter changed - current send operation is completed
285 if (sendcnt != fSendCnt)
286 return 0;
287 if (loopcnt++ > 1000) {
288 loopcnt = 0;
289 std::this_thread::sleep_for(std::chrono::milliseconds(1));
290 }
291 }
292
293 return -1;
294 }
295
296 // probably this thread can continuously run
297 std::thread thrd([this, engine] {
298 while (!IsDisabled() && !engine->fDisabled) {
299 PerformSend(engine);
300 if (IsDisabled() || engine->fDisabled) break;
301 std::unique_lock<std::mutex> lk(engine->fMutex);
302 if (engine->fKind == THttpWSEngine::kNone) {
303 engine->fWaiting = true;
304 engine->fCond.wait(lk);
305 engine->fWaiting = false;
306 }
307 }
308 });
309
310 engine->fSendThrd.swap(thrd);
311
312 engine->fHasSendThrd = true;
313
314 return 1;
315}
316
317
318////////////////////////////////////////////////////////////////////////////////
319/// Perform send operation, stored in buffer
320
321Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
322{
323 {
324 std::lock_guard<std::mutex> grd(engine->fMutex);
325
326 // no need to do something - operation was processed already by somebody else
327 if (engine->fKind == THttpWSEngine::kNone)
328 return 0;
329
330 if (engine->fSending)
331 return 1;
332 engine->fSending = true;
333 }
334
335 if (IsDisabled() || engine->fDisabled)
336 return 0;
337
338 switch (engine->fKind) {
340 engine->Send(engine->fData.data(), engine->fData.length());
341 break;
343 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
344 break;
346 engine->SendCharStar(engine->fData.c_str());
347 break;
348 default:
349 break;
350 }
351
352 engine->fData.clear();
353 engine->fHdr.clear();
354
355 {
356 std::lock_guard<std::mutex> grd(engine->fMutex);
357 engine->fSending = false;
358 engine->fKind = THttpWSEngine::kNone;
359 }
360
361 return CompleteSend(engine);
362}
363
364
365////////////////////////////////////////////////////////////////////////////////
366/// Complete current send operation
367
368Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
369{
370 fSendCnt++;
371 engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
372 CompleteWSSend(engine->GetId());
373 return 0; // indicates that operation is completed
374}
375
376
377////////////////////////////////////////////////////////////////////////////////
378/// Send binary data via given websocket id. Returns:
379///
380/// * -1 - in case of error
381/// * 0 - when operation was executed immediately
382/// * 1 - when send operation will be performed in different thread
383
385{
386 auto engine = FindEngine(wsid, kTRUE);
387 if (!engine) return -1;
388
389 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
390 engine->Send(buf, len);
391 return CompleteSend(engine);
392 }
393
394 bool notify = false;
395
396 // now we indicate that there is data and any thread can access it
397 {
398 std::lock_guard<std::mutex> grd(engine->fMutex);
399
400 if (engine->fKind != THttpWSEngine::kNone) {
401 Error("SendWS", "Data kind is not empty - something screwed up");
402 return -1;
403 }
404
405 notify = engine->fWaiting;
406
407 engine->fKind = THttpWSEngine::kData;
408
409 engine->fData.resize(len);
410 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
411 }
412
413 if (engine->fHasSendThrd) {
414 if (notify) engine->fCond.notify_all();
415 return 1;
416 }
417
418 return RunSendingThrd(engine);
419}
420
421
422////////////////////////////////////////////////////////////////////////////////
423/// Send binary data with text header via given websocket id. Returns:
424///
425/// * -1 - in case of error,
426/// * 0 - when operation was executed immediately,
427/// * 1 - when send operation will be performed in different thread,
428
429Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
430{
431 auto engine = FindEngine(wsid, kTRUE);
432 if (!engine) return -1;
433
434 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
435 engine->SendHeader(hdr, buf, len);
436 return CompleteSend(engine);
437 }
438
439 bool notify = false;
440
441 // now we indicate that there is data and any thread can access it
442 {
443 std::lock_guard<std::mutex> grd(engine->fMutex);
444
445 if (engine->fKind != THttpWSEngine::kNone) {
446 Error("SendWS", "Data kind is not empty - something screwed up");
447 return -1;
448 }
449
450 notify = engine->fWaiting;
451
452 engine->fKind = THttpWSEngine::kHeader;
453
454 engine->fHdr = hdr;
455 engine->fData.resize(len);
456 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
457 }
458
459 if (engine->fHasSendThrd) {
460 if (notify) engine->fCond.notify_all();
461 return 1;
462 }
463
464 return RunSendingThrd(engine);
465}
466
467////////////////////////////////////////////////////////////////////////////////
468/// Send string via given websocket id. Returns:
469///
470/// * -1 - in case of error,
471/// * 0 - when operation was executed immediately,
472/// * 1 - when send operation will be performed in different thread,
473
475{
476 auto engine = FindEngine(wsid, kTRUE);
477 if (!engine) return -1;
478
479 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
480 engine->SendCharStar(str);
481 return CompleteSend(engine);
482 }
483
484 bool notify = false;
485
486 // now we indicate that there is data and any thread can access it
487 {
488 std::lock_guard<std::mutex> grd(engine->fMutex);
489
490 if (engine->fKind != THttpWSEngine::kNone) {
491 Error("SendWS", "Data kind is not empty - something screwed up");
492 return -1;
493 }
494
495 notify = engine->fWaiting;
496
497 engine->fKind = THttpWSEngine::kText;
498 engine->fData = str;
499 }
500
501 if (engine->fHasSendThrd) {
502 if (notify) engine->fCond.notify_all();
503 return 1;
504 }
505
506 return RunSendingThrd(engine);
507}
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:148
R__EXTERN TSystem * gSystem
Definition TSystem.h:582
enum THttpWSEngine::@149 kNone
! kind of operation
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method:
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id.
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id.
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
normal constructor
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id.
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer.
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1095
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition TSystem.cxx:418