Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
THttpWSHandler.cxx
Go to the documentation of this file.
1// $Id$
2// Author: Sergey Linev 20/10/2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "THttpWSHandler.h"
13
14#include "THttpWSEngine.h"
15#include "THttpCallArg.h"
16#include "TSystem.h"
17
18#include <thread>
19#include <chrono>
20
21/** \class THttpWSHandler
22\ingroup http
23
24Class for user-side handling of websocket with THttpServer
25
26Approximate how-to:
27
281. Create a class derived from THttpWSHandler and implement
29 the ProcessWS() method, where all websocket requests are handled.
30
312. Register instance of derived class to running THttpServer
32
33 TUserWSHandler *handler = new TUserWSHandler("name1","title");
34 THttpServer *server = new THttpServer("http:8090");
35 server->Register("/subfolder", handler);
36
373. Now server can accept web socket connection from outside.
38 For instance, from JavaScript one can connect to it with code:
39
40 let ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket");
41
424. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
43
44 if (arg->IsMethod("WS_CONNECT")) {
45 return true; // to accept incoming request
46 }
47
48 if (arg->IsMethod("WS_READY")) {
49 fWSId = arg->GetWSId(); // fWSId should be member of the user class
50 return true; // connection established
51 }
52
53 if (arg->IsMethod("WS_CLOSE")) {
54 fWSId = 0;
55 return true; // confirm close of socket
56 }
57
58 if (arg->IsMethod("WS_DATA")) {
59 std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
60 std::cout << "got string " << str << std::endl;
61 SendCharStarWS(fWSId, "our reply");
62 return true;
63 }
64
655. See `$ROOTSYS/tutorials/http/ws.C` and `$ROOTSYS/tutorials/http/ws.htm` for a functional example
66*/
67
68
70
71////////////////////////////////////////////////////////////////////////////////
72/// normal constructor
73
74THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
75{
76}
77
78////////////////////////////////////////////////////////////////////////////////
79/// destructor
80/// Make sure that all sending threads are stopped correctly
81
83{
85
86 std::vector<std::shared_ptr<THttpWSEngine>> clr;
87
88 {
89 std::lock_guard<std::mutex> grd(fMutex);
90 std::swap(clr, fEngines);
91 }
92
93 for (auto &eng : clr) {
94 eng->fDisabled = true;
95 if (eng->fHasSendThrd) {
96 eng->fHasSendThrd = false;
97 if (eng->fWaiting)
98 eng->fCond.notify_all();
99 eng->fSendThrd.join();
100 }
101 eng->ClearHandle(kTRUE); // terminate connection before starting destructor
102 }
103}
104
105/// Returns current number of websocket connections
107{
108 std::lock_guard<std::mutex> grd(fMutex);
109 return fEngines.size();
110}
111
112////////////////////////////////////////////////////////////////////////////////
113/// Return websocket id with given sequential number
114/// Number of websockets returned with GetNumWS() method
115
117{
118 std::lock_guard<std::mutex> grd(fMutex);
119 auto iter = fEngines.begin() + num;
120 return (*iter)->GetId();
121}
122
123////////////////////////////////////////////////////////////////////////////////
124/// Find websocket connection handle with given id
125/// If book_send parameter specified, have to book send operation under the mutex
126
127std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
128{
129 if (IsDisabled())
130 return nullptr;
131
132 std::lock_guard<std::mutex> grd(fMutex);
133
134 for (auto &eng : fEngines)
135 if (eng->GetId() == wsid) {
136
137 // not allow to work with disabled engine
138 if (eng->fDisabled)
139 return nullptr;
140
141 if (book_send) {
142 if (eng->fMTSend) {
143 Error("FindEngine", "Try to book next send operation before previous completed");
144 return nullptr;
145 }
146 eng->fMTSend = kTRUE;
147 }
148 return eng;
149 }
150
151 return nullptr;
152}
153
154////////////////////////////////////////////////////////////////////////////////
155/// Remove and destroy WS connection
156
157void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
158{
159 if (!engine) return;
160
161 {
162 std::lock_guard<std::mutex> grd(fMutex);
163
164 for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
165 if (*iter == engine) {
166 if (engine->fMTSend)
167 Error("RemoveEngine", "Trying to remove WS engine during send operation");
168
169 engine->fDisabled = true;
170 fEngines.erase(iter);
171 break;
172 }
173 }
174
175 engine->ClearHandle(terminate);
176
177 if (engine->fHasSendThrd) {
178 engine->fHasSendThrd = false;
179 if (engine->fWaiting)
180 engine->fCond.notify_all();
181 engine->fSendThrd.join();
182 }
183}
184
185////////////////////////////////////////////////////////////////////////////////
186/// Process request to websocket
187/// Different kind of requests coded into THttpCallArg::Method:
188///
189/// "WS_CONNECT" - connection request
190/// "WS_READY" - connection ready
191/// "WS_CLOSE" - connection closed
192///
193/// All other are normal data, which are delivered to users
194
195Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
196{
197 if (IsDisabled())
198 return kFALSE;
199
200 if (!arg->GetWSId())
201 return ProcessWS(arg.get());
202
203 // normally here one accept or reject connection requests
204 if (arg->IsMethod("WS_CONNECT"))
205 return ProcessWS(arg.get());
206
207 auto engine = FindEngine(arg->GetWSId());
208
209 if (arg->IsMethod("WS_READY")) {
210
211 if (engine) {
212 Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
213 RemoveEngine(engine, kTRUE);
214 }
215
216 engine = arg->TakeWSEngine();
217 {
218 std::lock_guard<std::mutex> grd(fMutex);
219 fEngines.emplace_back(engine);
220 }
221
222 if (!ProcessWS(arg.get())) {
223 // if connection refused, remove engine again
224 RemoveEngine(engine, kTRUE);
225 return kFALSE;
226 }
227
228 return kTRUE;
229 }
230
231 if (arg->IsMethod("WS_CLOSE")) {
232 // connection is closed, one can remove handle
233
234 RemoveEngine(engine);
235
236 return ProcessWS(arg.get());
237 }
238
239 if (engine && engine->PreProcess(arg)) {
240 PerformSend(engine);
241 return kTRUE;
242 }
243
244 Bool_t res = ProcessWS(arg.get());
245
246 if (engine)
247 engine->PostProcess(arg);
248
249 return res;
250}
251
252////////////////////////////////////////////////////////////////////////////////
253/// Close connection with given websocket id
254
256{
257 auto engine = FindEngine(wsid);
258
259 RemoveEngine(engine, kTRUE);
260}
261
262////////////////////////////////////////////////////////////////////////////////
263/// Send data stored in the buffer. Returns:
264///
265/// * 0 - when operation was executed immediately
266/// * 1 - when send operation will be performed in different thread
267
268Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
269{
270 if (IsSyncMode() || !engine->SupportSendThrd()) {
271 // this is case of longpoll engine, no extra thread is required for it
272 if (engine->CanSendDirectly())
273 return PerformSend(engine);
274
275 // handling will be performed in following http request handler
276
277 if (!IsSyncMode()) return 1;
278
279 // now we should wait until next polling requests is processed
280 // or when connection is closed or handler is shutdown
281
282 Int_t sendcnt = fSendCnt, loopcnt = 0;
283
284 while (!IsDisabled() && !engine->fDisabled) {
286 // if send counter changed - current send operation is completed
287 if (sendcnt != fSendCnt)
288 return 0;
289 if (loopcnt++ > 1000) {
290 loopcnt = 0;
291 std::this_thread::sleep_for(std::chrono::milliseconds(1));
292 }
293 }
294
295 return -1;
296 }
297
298 // probably this thread can continuously run
299 std::thread thrd([this, engine] {
300 while (!IsDisabled() && !engine->fDisabled) {
301 PerformSend(engine);
302 if (IsDisabled() || engine->fDisabled) break;
303 std::unique_lock<std::mutex> lk(engine->fMutex);
304 if (engine->fKind == THttpWSEngine::kNone) {
305 engine->fWaiting = true;
306 engine->fCond.wait(lk);
307 engine->fWaiting = false;
308 }
309 }
310 });
311
312 engine->fSendThrd.swap(thrd);
313
314 engine->fHasSendThrd = true;
315
316 return 1;
317}
318
319
320////////////////////////////////////////////////////////////////////////////////
321/// Perform send operation, stored in buffer
322
323Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
324{
325 {
326 std::lock_guard<std::mutex> grd(engine->fMutex);
327
328 // no need to do something - operation was processed already by somebody else
329 if (engine->fKind == THttpWSEngine::kNone)
330 return 0;
331
332 if (engine->fSending)
333 return 1;
334 engine->fSending = true;
335 }
336
337 if (IsDisabled() || engine->fDisabled)
338 return 0;
339
340 switch (engine->fKind) {
342 engine->Send(engine->fData.data(), engine->fData.length());
343 break;
345 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
346 break;
348 engine->SendCharStar(engine->fData.c_str());
349 break;
350 default:
351 break;
352 }
353
354 engine->fData.clear();
355 engine->fHdr.clear();
356
357 {
358 std::lock_guard<std::mutex> grd(engine->fMutex);
359 engine->fSending = false;
360 engine->fKind = THttpWSEngine::kNone;
361 }
362
363 return CompleteSend(engine);
364}
365
366
367////////////////////////////////////////////////////////////////////////////////
368/// Complete current send operation
369
370Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
371{
372 fSendCnt++;
373 engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
374 CompleteWSSend(engine->GetId());
375 return 0; // indicates that operation is completed
376}
377
378
379////////////////////////////////////////////////////////////////////////////////
380/// Send binary data via given websocket id. Returns:
381///
382/// * -1 - in case of error
383/// * 0 - when operation was executed immediately
384/// * 1 - when send operation will be performed in different thread
385
386Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
387{
388 auto engine = FindEngine(wsid, kTRUE);
389 if (!engine) return -1;
390
391 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
392 engine->Send(buf, len);
393 return CompleteSend(engine);
394 }
395
396 bool notify = false;
397
398 // now we indicate that there is data and any thread can access it
399 {
400 std::lock_guard<std::mutex> grd(engine->fMutex);
401
402 if (engine->fKind != THttpWSEngine::kNone) {
403 Error("SendWS", "Data kind is not empty - something screwed up");
404 return -1;
405 }
406
407 notify = engine->fWaiting;
408
409 engine->fKind = THttpWSEngine::kData;
410
411 engine->fData.resize(len);
412 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
413 }
414
415 if (engine->fHasSendThrd) {
416 if (notify) engine->fCond.notify_all();
417 return 1;
418 }
419
420 return RunSendingThrd(engine);
421}
422
423
424////////////////////////////////////////////////////////////////////////////////
425/// Send binary data with text header via given websocket id. Returns:
426///
427/// * -1 - in case of error,
428/// * 0 - when operation was executed immediately,
429/// * 1 - when send operation will be performed in different thread,
430
431Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
432{
433 auto engine = FindEngine(wsid, kTRUE);
434 if (!engine) return -1;
435
436 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
437 engine->SendHeader(hdr, buf, len);
438 return CompleteSend(engine);
439 }
440
441 bool notify = false;
442
443 // now we indicate that there is data and any thread can access it
444 {
445 std::lock_guard<std::mutex> grd(engine->fMutex);
446
447 if (engine->fKind != THttpWSEngine::kNone) {
448 Error("SendWS", "Data kind is not empty - something screwed up");
449 return -1;
450 }
451
452 notify = engine->fWaiting;
453
454 engine->fKind = THttpWSEngine::kHeader;
455
456 engine->fHdr = hdr;
457 engine->fData.resize(len);
458 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
459 }
460
461 if (engine->fHasSendThrd) {
462 if (notify) engine->fCond.notify_all();
463 return 1;
464 }
465
466 return RunSendingThrd(engine);
467}
468
469////////////////////////////////////////////////////////////////////////////////
470/// Send string via given websocket id. Returns:
471///
472/// * -1 - in case of error,
473/// * 0 - when operation was executed immediately,
474/// * 1 - when send operation will be performed in different thread,
475
477{
478 auto engine = FindEngine(wsid, kTRUE);
479 if (!engine) return -1;
480
481 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
482 engine->SendCharStar(str);
483 return CompleteSend(engine);
484 }
485
486 bool notify = false;
487
488 // now we indicate that there is data and any thread can access it
489 {
490 std::lock_guard<std::mutex> grd(engine->fMutex);
491
492 if (engine->fKind != THttpWSEngine::kNone) {
493 Error("SendWS", "Data kind is not empty - something screwed up");
494 return -1;
495 }
496
497 notify = engine->fWaiting;
498
499 engine->fKind = THttpWSEngine::kText;
500 engine->fData = str;
501 }
502
503 if (engine->fHasSendThrd) {
504 if (notify) engine->fCond.notify_all();
505 return 1;
506 }
507
508 return RunSendingThrd(engine);
509}
constexpr Bool_t kFALSE
Definition RtypesCore.h:101
constexpr Bool_t kTRUE
Definition RtypesCore.h:100
#define ClassImp(name)
Definition Rtypes.h:377
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:555
enum THttpWSEngine::@153 kNone
! kind of operation
Class for user-side handling of websocket with THttpServer.
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method:
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id.
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id.
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
normal constructor
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id.
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer.
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:987
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition TSystem.cxx:416