Logo ROOT   6.16/01
Reference Guide
THttpWSHandler.cxx
Go to the documentation of this file.
1// $Id$
2// Author: Sergey Linev 20/10/2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "THttpWSHandler.h"
13
14#include "THttpWSEngine.h"
15#include "THttpCallArg.h"
16#include "TSystem.h"
17
18#include <thread>
19#include <chrono>
20
21/////////////////////////////////////////////////////////////////////////
22///
23/// THttpWSHandler
24///
25/// Class for user-side handling of websocket with THttpServer
26/// 1. Create derived from THttpWSHandler class and implement
27/// ProcessWS() method, where all web sockets request handled.
28/// 2. Register instance of derived class to running THttpServer
29///
30/// TUserWSHandler *handler = new TUserWSHandler("name1","title");
31/// THttpServer *server = new THttpServer("http:8090");
32/// server->Register("/subfolder", handler)
33///
34/// 3. Now server can accept web socket connection from outside.
35/// For instance, from JavaScirpt one can connect to it with code:
36///
37/// var ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket")
38///
39/// 4. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
40///
41/// if (arg->IsMethod("WS_CONNECT")) {
42/// return true; // to accept incoming request
43/// }
44///
45/// if (arg->IsMethod("WS_READY")) {
46/// fWSId = arg->GetWSId(); // fWSId should be member of the user class
47/// return true; // connection established
48/// }
49///
50/// if (arg->IsMethod("WS_CLOSE")) {
51/// fWSId = 0;
52/// return true; // confirm close of socket
53/// }
54///
55/// if (arg->IsMethod("WS_DATA")) {
56/// // received data stored as POST data
57/// std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
58/// std::cout << "got string " << str << std::endl;
59/// // immediately send data back using websocket id
60/// SendCharStarWS(fWSId, "our reply");
61/// return true;
62/// }
63///
64///////////////////////////////////////////////////////////////////////////
65
67
68////////////////////////////////////////////////////////////////////////////////
69/// normal constructor
70
71THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
72{
73}
74
75////////////////////////////////////////////////////////////////////////////////
76/// destructor
77/// Make sure that all sending threads are stopped correctly
78
80{
82
83 std::vector<std::shared_ptr<THttpWSEngine>> clr;
84
85 {
86 std::lock_guard<std::mutex> grd(fMutex);
87 std::swap(clr, fEngines);
88 }
89
90 for (auto &eng : clr) {
91 eng->fDisabled = true;
92 if (eng->fHasSendThrd) {
93 eng->fHasSendThrd = false;
94 eng->fCond.notify_all();
95 eng->fSendThrd.join();
96 }
97 eng->ClearHandle(kTRUE); // terminate connection before starting destructor
98 }
99}
100
101/// Returns current number of websocket connections
103{
104 std::lock_guard<std::mutex> grd(fMutex);
105 return fEngines.size();
106}
107
108////////////////////////////////////////////////////////////////////////////////
109/// Return websocket id with given sequential number
110/// Number of websockets returned with GetNumWS() method
111
113{
114 std::lock_guard<std::mutex> grd(fMutex);
115 auto iter = fEngines.begin() + num;
116 return (*iter)->GetId();
117}
118
119////////////////////////////////////////////////////////////////////////////////
120/// Find websocket connection handle with given id
121/// If book_send parameter specified, have to book send operation under the mutex
122
123std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
124{
125 if (IsDisabled())
126 return nullptr;
127
128 std::lock_guard<std::mutex> grd(fMutex);
129
130 for (auto &eng : fEngines)
131 if (eng->GetId() == wsid) {
132
133 // not allow to work with disabled engine
134 if (eng->fDisabled)
135 return nullptr;
136
137 if (book_send) {
138 if (eng->fMTSend) {
139 Error("FindEngine", "Try to book next send operation before previous completed");
140 return nullptr;
141 }
142 eng->fMTSend = kTRUE;
143 }
144 return eng;
145 }
146
147 return nullptr;
148}
149
150////////////////////////////////////////////////////////////////////////////////
151/// Remove and destroy WS connection
152
153void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
154{
155 if (!engine) return;
156
157 {
158 std::lock_guard<std::mutex> grd(fMutex);
159
160 for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
161 if (*iter == engine) {
162 if (engine->fMTSend)
163 Error("RemoveEngine", "Trying to remove WS engine during send operation");
164
165 engine->fDisabled = true;
166 fEngines.erase(iter);
167 break;
168 }
169 }
170
171 engine->ClearHandle(terminate);
172
173 if (engine->fHasSendThrd) {
174 engine->fHasSendThrd = false;
175 engine->fCond.notify_all();
176 engine->fSendThrd.join();
177 }
178}
179
180////////////////////////////////////////////////////////////////////////////////
181/// Process request to websocket
182/// Different kind of requests coded into THttpCallArg::Method
183/// "WS_CONNECT" - connection request
184/// "WS_READY" - connection ready
185/// "WS_CLOSE" - connection closed
186/// All other are normal data, which are delivered to users
187
188Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
189{
190 if (IsDisabled())
191 return kFALSE;
192
193 if (!arg->GetWSId())
194 return ProcessWS(arg.get());
195
196 // normally here one accept or reject connection requests
197 if (arg->IsMethod("WS_CONNECT"))
198 return ProcessWS(arg.get());
199
200 auto engine = FindEngine(arg->GetWSId());
201
202 if (arg->IsMethod("WS_READY")) {
203
204 if (engine) {
205 Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
206 RemoveEngine(engine, kTRUE);
207 }
208
209 engine = arg->TakeWSEngine();
210 {
211 std::lock_guard<std::mutex> grd(fMutex);
212 fEngines.emplace_back(engine);
213 }
214
215 if (!ProcessWS(arg.get())) {
216 // if connection refused, remove engine again
217 RemoveEngine(engine, kTRUE);
218 return kFALSE;
219 }
220
221 return kTRUE;
222 }
223
224 if (arg->IsMethod("WS_CLOSE")) {
225 // connection is closed, one can remove handle
226
227 RemoveEngine(engine);
228
229 return ProcessWS(arg.get());
230 }
231
232 if (engine && engine->PreProcess(arg)) {
233 PerformSend(engine);
234 return kTRUE;
235 }
236
237 Bool_t res = ProcessWS(arg.get());
238
239 if (engine)
240 engine->PostProcess(arg);
241
242 return res;
243}
244
245////////////////////////////////////////////////////////////////////////////////
246/// Close connection with given websocket id
247
249{
250 auto engine = FindEngine(wsid);
251
252 RemoveEngine(engine, kTRUE);
253}
254
255////////////////////////////////////////////////////////////////////////////////
256/// Send data stored in the buffer
257/// Returns 0 - when operation was executed immediately
258/// 1 - when send operation will be performed in different thread
259
260Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
261{
262 if (engine->fHasSendThrd) {
263 // all data are prepared - just notify thread
264 engine->fCond.notify_all();
265 return 1;
266 }
267
268 if (IsSyncMode() || !engine->SupportSendThrd()) {
269 // this is case of longpoll engine, no extra thread is required for it
270 if (engine->CanSendDirectly())
271 return PerformSend(engine);
272
273 // handling will be performed in following http request handler
274
275 if (!IsSyncMode()) return 1;
276
277 // now we should wait until next polling requests is processed
278 // or when connection is closed or handler is shutdown
279
280 Int_t sendcnt = fSendCnt, loopcnt(0);
281
282 while (!IsDisabled() && !engine->fDisabled) {
284 // if send counter changed - current send operation is completed
285 if (sendcnt != fSendCnt)
286 return 0;
287 if (loopcnt++ > 1000) {
288 loopcnt = 0;
289 std::this_thread::sleep_for(std::chrono::milliseconds(1));
290 }
291 }
292
293 return -1;
294 }
295
296 // probably this thread can continuously run
297 std::thread thrd([this, engine] {
298 while (!IsDisabled() && !engine->fDisabled) {
299 PerformSend(engine);
300 if (IsDisabled() || engine->fDisabled) break;
301 std::unique_lock<std::mutex> lk(engine->fCondMutex);
302 engine->fCond.wait(lk);
303 }
304 });
305
306 engine->fSendThrd.swap(thrd);
307
308 engine->fHasSendThrd = true;
309
310 return 1;
311}
312
313
314////////////////////////////////////////////////////////////////////////////////
315/// Perform send operation, stored in buffer
316
317Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
318{
319 {
320 std::lock_guard<std::mutex> grd(engine->fDataMutex);
321
322 // no need to do something - operation was processed already by somebody else
323 if (engine->fKind == THttpWSEngine::kNone)
324 return 0;
325
326 if (engine->fDoingSend)
327 return 1;
328 engine->fDoingSend = true;
329 }
330
331 if (IsDisabled() || engine->fDisabled)
332 return 0;
333
334 switch (engine->fKind) {
336 engine->Send(engine->fData.data(), engine->fData.length());
337 break;
339 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
340 break;
342 engine->SendCharStar(engine->fData.c_str());
343 break;
344 default:
345 break;
346 }
347
348 engine->fData.clear();
349 engine->fHdr.clear();
350
351 {
352 std::lock_guard<std::mutex> grd(engine->fDataMutex);
353 engine->fDoingSend = false;
354 engine->fKind = THttpWSEngine::kNone;
355 }
356
357 return CompleteSend(engine);
358}
359
360
361////////////////////////////////////////////////////////////////////////////////
362/// Complete current send operation
363
364Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
365{
366 fSendCnt++;
367 engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
368 CompleteWSSend(engine->GetId());
369 return 0; // indicates that operation is completed
370}
371
372
373////////////////////////////////////////////////////////////////////////////////
374/// Send binary data via given websocket id
375/// Returns -1 - in case of error
376/// 0 - when operation was executed immediately
377/// 1 - when send operation will be performed in different thread
378
379Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
380{
381 auto engine = FindEngine(wsid, kTRUE);
382 if (!engine) return -1;
383
384 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
385 engine->Send(buf, len);
386 return CompleteSend(engine);
387 }
388
389 // now we indicate that there is data and any thread can access it
390 {
391 std::lock_guard<std::mutex> grd(engine->fDataMutex);
392
393 if (engine->fKind != THttpWSEngine::kNone) {
394 Error("SendWS", "Data kind is not empty - something screwed up");
395 return -1;
396 }
397
398 engine->fData.resize(len);
399 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
400
401 engine->fDoingSend = false;
402 engine->fKind = THttpWSEngine::kData;
403 }
404
405 return RunSendingThrd(engine);
406}
407
408
409////////////////////////////////////////////////////////////////////////////////
410/// Send binary data with text header via given websocket id
411/// Returns -1 - in case of error,
412/// 0 - when operation was executed immediately,
413/// 1 - when send operation will be performed in different thread,
414
415Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
416{
417 auto engine = FindEngine(wsid, kTRUE);
418 if (!engine) return -1;
419
420 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
421 engine->SendHeader(hdr, buf, len);
422 return CompleteSend(engine);
423 }
424
425 // now we indicate that there is data and any thread can access it
426 {
427 std::lock_guard<std::mutex> grd(engine->fDataMutex);
428
429 if (engine->fKind != THttpWSEngine::kNone) {
430 Error("SendWS", "Data kind is not empty - something screwed up");
431 return -1;
432 }
433
434 engine->fHdr = hdr;
435 engine->fData.resize(len);
436 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
437
438 engine->fDoingSend = false;
439 engine->fKind = THttpWSEngine::kHeader;
440 }
441
442 return RunSendingThrd(engine);
443}
444
445////////////////////////////////////////////////////////////////////////////////
446/// Send string via given websocket id
447/// Returns -1 - in case of error,
448/// 0 - when operation was executed immediately,
449/// 1 - when send operation will be performed in different thread,
450
452{
453 auto engine = FindEngine(wsid, kTRUE);
454 if (!engine) return -1;
455
456 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
457 engine->SendCharStar(str);
458 return CompleteSend(engine);
459 }
460
461 // now we indicate that there is data and any thread can access it
462 {
463 std::lock_guard<std::mutex> grd(engine->fDataMutex);
464
465 if (engine->fKind != THttpWSEngine::kNone) {
466 Error("SendWS", "Data kind is not empty - something screwed up");
467 return -1;
468 }
469
470 engine->fData = str;
471
472 engine->fDoingSend = false;
473 engine->fKind = THttpWSEngine::kText;
474 }
475
476 return RunSendingThrd(engine);
477}
int Int_t
Definition: RtypesCore.h:41
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:363
R__EXTERN TSystem * gSystem
Definition: TSystem.h:540
enum THttpWSEngine::@144 kNone
! kind of operation
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method "WS_CONNECT" ...
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id Returns -1 - in case of error,...
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id Returns -1 - in case of error, 0 - when operation was executed imm...
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
THttpWSHandler.
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id Returns -1 - in case of error 0 - when operation was executed...
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer Returns 0 - when operation was executed immediately 1 - when send oper...
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition: TSystem.cxx:425
void swap(nlohmann::json &j1, nlohmann::json &j2) noexcept(is_nothrow_move_constructible< nlohmann::json >::value and is_nothrow_move_assignable< nlohmann::json >::value)
exchanges the values of two JSON objects
Definition: json.hpp:12929