Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
THttpWSHandler.cxx
Go to the documentation of this file.
1// $Id$
2// Author: Sergey Linev 20/10/2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "THttpWSHandler.h"
13
14#include "THttpWSEngine.h"
15#include "THttpCallArg.h"
16#include "TSystem.h"
17
18#include <thread>
19#include <chrono>
20
21/** \class THttpWSHandler
22\ingroup http
23
24Class for user-side handling of websocket with THttpServer
25
26Approximate how-to:
27
281. Create a class derived from THttpWSHandler and implement its
29 ProcessWS() method, where all websocket requests are handled.
30
312. Register an instance of the derived class with a running THttpServer:
32
33 TUserWSHandler *handler = new TUserWSHandler("name1","title");
34 THttpServer *server = new THttpServer("http:8090");
35 server->Register("/subfolder", handler);
36
373. Now server can accept web socket connection from outside.
38 For instance, from JavaScript one can connect to it with code:
39
40 let ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket");
41
424. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
43
44 if (arg->IsMethod("WS_CONNECT")) {
45 return true; // to accept incoming request
46 }
47
48 if (arg->IsMethod("WS_READY")) {
49 fWSId = arg->GetWSId(); // fWSId should be member of the user class
50 return true; // connection established
51 }
52
53 if (arg->IsMethod("WS_CLOSE")) {
54 fWSId = 0;
55 return true; // confirm close of socket
56 }
57
58 if (arg->IsMethod("WS_DATA")) {
59 std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
60 std::cout << "got string " << str << std::endl;
61 SendCharStarWS(fWSId, "our reply");
62 return true;
63 }
64
655. See `$ROOTSYS/tutorials/http/ws.C` and `$ROOTSYS/tutorials/http/ws.htm` for a functional example
66*/
67
68
69
70////////////////////////////////////////////////////////////////////////////////
71/// normal constructor
72
73THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
74{
75}
76
77////////////////////////////////////////////////////////////////////////////////
78/// destructor
79/// Make sure that all sending threads are stopped correctly
80
82{
84
85 std::vector<std::shared_ptr<THttpWSEngine>> clr;
86
87 {
88 std::lock_guard<std::mutex> grd(fMutex);
89 std::swap(clr, fEngines);
90 }
91
92 for (auto &eng : clr) {
93 eng->fDisabled = true;
94 if (eng->fHasSendThrd) {
95 eng->fHasSendThrd = false;
96 if (eng->fWaiting)
97 eng->fCond.notify_all();
98 eng->fSendThrd.join();
99 }
100 eng->ClearHandle(kTRUE); // terminate connection before starting destructor
101 }
102}
103
104/// Returns current number of websocket connections
106{
107 std::lock_guard<std::mutex> grd(fMutex);
108 return fEngines.size();
109}
110
111////////////////////////////////////////////////////////////////////////////////
112/// Return websocket id with given sequential number
113/// Number of websockets returned with GetNumWS() method
114
116{
117 std::lock_guard<std::mutex> grd(fMutex);
118 auto iter = fEngines.begin() + num;
119 return (*iter)->GetId();
120}
121
122////////////////////////////////////////////////////////////////////////////////
123/// Find websocket connection handle with given id
124/// If book_send parameter specified, have to book send operation under the mutex
125
126std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
127{
128 if (IsDisabled())
129 return nullptr;
130
131 std::lock_guard<std::mutex> grd(fMutex);
132
133 for (auto &eng : fEngines)
134 if (eng->GetId() == wsid) {
135
136 // not allow to work with disabled engine
137 if (eng->fDisabled)
138 return nullptr;
139
140 if (book_send) {
141 if (eng->fMTSend) {
142 Error("FindEngine", "Try to book next send operation before previous completed");
143 return nullptr;
144 }
145 eng->fMTSend = kTRUE;
146 }
147 return eng;
148 }
149
150 return nullptr;
151}
152
153////////////////////////////////////////////////////////////////////////////////
154/// Remove and destroy WS connection
155
156void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
157{
158 if (!engine) return;
159
160 {
161 std::lock_guard<std::mutex> grd(fMutex);
162
163 for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
164 if (*iter == engine) {
165 if (engine->fMTSend)
166 Error("RemoveEngine", "Trying to remove WS engine during send operation");
167
168 engine->fDisabled = true;
169 fEngines.erase(iter);
170 break;
171 }
172 }
173
174 engine->ClearHandle(terminate);
175
176 if (engine->fHasSendThrd) {
177 engine->fHasSendThrd = false;
178 if (engine->fWaiting)
179 engine->fCond.notify_all();
180 engine->fSendThrd.join();
181 }
182}
183
184////////////////////////////////////////////////////////////////////////////////
185/// Process request to websocket
186/// Different kind of requests coded into THttpCallArg::Method:
187///
188/// "WS_CONNECT" - connection request
189/// "WS_READY" - connection ready
190/// "WS_CLOSE" - connection closed
191///
192/// All other are normal data, which are delivered to users
193
194Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
195{
196 if (IsDisabled())
197 return kFALSE;
198
199 if (!arg->GetWSId())
200 return ProcessWS(arg.get());
201
202 // normally here one accept or reject connection requests
203 if (arg->IsMethod("WS_CONNECT"))
204 return ProcessWS(arg.get());
205
206 auto engine = FindEngine(arg->GetWSId());
207
208 if (arg->IsMethod("WS_READY")) {
209
210 if (engine) {
211 Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
212 RemoveEngine(engine, kTRUE);
213 }
214
215 engine = arg->TakeWSEngine();
216 {
217 std::lock_guard<std::mutex> grd(fMutex);
218 fEngines.emplace_back(engine);
219 }
220
221 if (!ProcessWS(arg.get())) {
222 // if connection refused, remove engine again
223 RemoveEngine(engine, kTRUE);
224 return kFALSE;
225 }
226
227 return kTRUE;
228 }
229
230 if (arg->IsMethod("WS_CLOSE")) {
231 // connection is closed, one can remove handle
232
233 RemoveEngine(engine);
234
235 return ProcessWS(arg.get());
236 }
237
238 if (engine && engine->PreProcess(arg)) {
239 PerformSend(engine);
240 return kTRUE;
241 }
242
243 Bool_t res = ProcessWS(arg.get());
244
245 if (engine)
246 engine->PostProcess(arg);
247
248 return res;
249}
250
251////////////////////////////////////////////////////////////////////////////////
252/// Close connection with given websocket id
253
255{
256 auto engine = FindEngine(wsid);
257
258 RemoveEngine(engine, kTRUE);
259}
260
261////////////////////////////////////////////////////////////////////////////////
262/// Send data stored in the buffer. Returns:
263///
264/// * 0 - when operation was executed immediately
265/// * 1 - when send operation will be performed in different thread
266
267Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
268{
269 if (IsSyncMode() || !engine->SupportSendThrd()) {
270 // this is case of longpoll engine, no extra thread is required for it
271 if (engine->CanSendDirectly())
272 return PerformSend(engine);
273
274 // handling will be performed in following http request handler
275
276 if (!IsSyncMode()) return 1;
277
278 // now we should wait until next polling requests is processed
279 // or when connection is closed or handler is shutdown
280
282
283 while (!IsDisabled() && !engine->fDisabled) {
285 // if send counter changed - current send operation is completed
286 if (sendcnt != fSendCnt)
287 return 0;
288 if (loopcnt++ > 1000) {
289 loopcnt = 0;
290 std::this_thread::sleep_for(std::chrono::milliseconds(1));
291 }
292 }
293
294 return -1;
295 }
296
297 // probably this thread can continuously run
298 std::thread thrd([this, engine] {
299 while (!IsDisabled() && !engine->fDisabled) {
300 PerformSend(engine);
301 if (IsDisabled() || engine->fDisabled) break;
302 std::unique_lock<std::mutex> lk(engine->fMutex);
303 if (engine->fKind == THttpWSEngine::kNone) {
304 engine->fWaiting = true;
305 engine->fCond.wait(lk);
306 engine->fWaiting = false;
307 }
308 }
309 });
310
311 engine->fSendThrd.swap(thrd);
312
313 engine->fHasSendThrd = true;
314
315 return 1;
316}
317
318
319////////////////////////////////////////////////////////////////////////////////
320/// Perform send operation, stored in buffer
321
322Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
323{
324 {
325 std::lock_guard<std::mutex> grd(engine->fMutex);
326
327 // no need to do something - operation was processed already by somebody else
328 if (engine->fKind == THttpWSEngine::kNone)
329 return 0;
330
331 if (engine->fSending)
332 return 1;
333 engine->fSending = true;
334 }
335
336 if (IsDisabled() || engine->fDisabled)
337 return 0;
338
339 switch (engine->fKind) {
341 engine->Send(engine->fData.data(), engine->fData.length());
342 break;
344 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
345 break;
347 engine->SendCharStar(engine->fData.c_str());
348 break;
349 default:
350 break;
351 }
352
353 engine->fData.clear();
354 engine->fHdr.clear();
355
356 {
357 std::lock_guard<std::mutex> grd(engine->fMutex);
358 engine->fSending = false;
359 engine->fKind = THttpWSEngine::kNone;
360 }
361
362 return CompleteSend(engine);
363}
364
365
366////////////////////////////////////////////////////////////////////////////////
367/// Complete current send operation
368
369Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
370{
371 fSendCnt++;
372 engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
373 CompleteWSSend(engine->GetId());
374 return 0; // indicates that operation is completed
375}
376
377
378////////////////////////////////////////////////////////////////////////////////
379/// Send binary data via given websocket id. Returns:
380///
381/// * -1 - in case of error
382/// * 0 - when operation was executed immediately
383/// * 1 - when send operation will be performed in different thread
384
386{
387 auto engine = FindEngine(wsid, kTRUE);
388 if (!engine) return -1;
389
390 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
391 engine->Send(buf, len);
392 return CompleteSend(engine);
393 }
394
395 bool notify = false;
396
397 // now we indicate that there is data and any thread can access it
398 {
399 std::lock_guard<std::mutex> grd(engine->fMutex);
400
401 if (engine->fKind != THttpWSEngine::kNone) {
402 Error("SendWS", "Data kind is not empty - something screwed up");
403 return -1;
404 }
405
406 notify = engine->fWaiting;
407
408 engine->fKind = THttpWSEngine::kData;
409
410 engine->fData.resize(len);
411 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
412 }
413
414 if (engine->fHasSendThrd) {
415 if (notify) engine->fCond.notify_all();
416 return 1;
417 }
418
419 return RunSendingThrd(engine);
420}
421
422
423////////////////////////////////////////////////////////////////////////////////
424/// Send binary data with text header via given websocket id. Returns:
425///
426/// * -1 - in case of error,
427/// * 0 - when operation was executed immediately,
428/// * 1 - when send operation will be performed in different thread,
429
430Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
431{
432 auto engine = FindEngine(wsid, kTRUE);
433 if (!engine) return -1;
434
435 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
436 engine->SendHeader(hdr, buf, len);
437 return CompleteSend(engine);
438 }
439
440 bool notify = false;
441
442 // now we indicate that there is data and any thread can access it
443 {
444 std::lock_guard<std::mutex> grd(engine->fMutex);
445
446 if (engine->fKind != THttpWSEngine::kNone) {
447 Error("SendWS", "Data kind is not empty - something screwed up");
448 return -1;
449 }
450
451 notify = engine->fWaiting;
452
453 engine->fKind = THttpWSEngine::kHeader;
454
455 engine->fHdr = hdr;
456 engine->fData.resize(len);
457 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
458 }
459
460 if (engine->fHasSendThrd) {
461 if (notify) engine->fCond.notify_all();
462 return 1;
463 }
464
465 return RunSendingThrd(engine);
466}
467
468////////////////////////////////////////////////////////////////////////////////
469/// Send string via given websocket id. Returns:
470///
471/// * -1 - in case of error,
472/// * 0 - when operation was executed immediately,
473/// * 1 - when send operation will be performed in different thread,
474
476{
477 auto engine = FindEngine(wsid, kTRUE);
478 if (!engine) return -1;
479
480 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
481 engine->SendCharStar(str);
482 return CompleteSend(engine);
483 }
484
485 bool notify = false;
486
487 // now we indicate that there is data and any thread can access it
488 {
489 std::lock_guard<std::mutex> grd(engine->fMutex);
490
491 if (engine->fKind != THttpWSEngine::kNone) {
492 Error("SendWS", "Data kind is not empty - something screwed up");
493 return -1;
494 }
495
496 notify = engine->fWaiting;
497
498 engine->fKind = THttpWSEngine::kText;
499 engine->fData = str;
500 }
501
502 if (engine->fHasSendThrd) {
503 if (notify) engine->fCond.notify_all();
504 return 1;
505 }
506
507 return RunSendingThrd(engine);
508}
constexpr Bool_t kFALSE
Definition RtypesCore.h:108
constexpr Bool_t kTRUE
Definition RtypesCore.h:107
ROOT::Detail::TRangeCast< T, true > TRangeDynCast
TRangeDynCast is an adapter class that allows the typed iteration through a TCollection.
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char Pixmap_t Pixmap_t PictureAttributes_t attr const char char ret_data h unsigned char height h Atom_t Int_t ULong_t ULong_t unsigned char prop_list Atom_t Atom_t Atom_t Time_t UChar_t len
char name[80]
Definition TGX11.cxx:110
R__EXTERN TSystem * gSystem
Definition TSystem.h:572
enum THttpWSEngine::@152 kNone
! kind of operation
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method:
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id.
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id.
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
normal constructor
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id.
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer.
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:1071
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition TSystem.cxx:414