Logo ROOT   6.18/05
Reference Guide
THttpWSHandler.cxx
Go to the documentation of this file.
1// $Id$
2// Author: Sergey Linev 20/10/2017
3
4/*************************************************************************
5 * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12#include "THttpWSHandler.h"
13
14#include "THttpWSEngine.h"
15#include "THttpCallArg.h"
16#include "TSystem.h"
17
18#include <thread>
19#include <chrono>
20
21/////////////////////////////////////////////////////////////////////////
22///
23/// THttpWSHandler
24///
25/// Class for user-side handling of websocket with THttpServer
26/// 1. Create derived from THttpWSHandler class and implement
27/// ProcessWS() method, where all web sockets request handled.
28/// 2. Register instance of derived class to running THttpServer
29///
30/// TUserWSHandler *handler = new TUserWSHandler("name1","title");
31/// THttpServer *server = new THttpServer("http:8090");
32/// server->Register("/subfolder", handler)
33///
34/// 3. Now server can accept web socket connection from outside.
35/// For instance, from JavaScirpt one can connect to it with code:
36///
37/// var ws = new WebSocket("ws://hostname:8090/subfolder/name1/root.websocket")
38///
39/// 4. In the ProcessWS(THttpCallArg *arg) method following code should be implemented:
40///
41/// if (arg->IsMethod("WS_CONNECT")) {
42/// return true; // to accept incoming request
43/// }
44///
45/// if (arg->IsMethod("WS_READY")) {
46/// fWSId = arg->GetWSId(); // fWSId should be member of the user class
47/// return true; // connection established
48/// }
49///
50/// if (arg->IsMethod("WS_CLOSE")) {
51/// fWSId = 0;
52/// return true; // confirm close of socket
53/// }
54///
55/// if (arg->IsMethod("WS_DATA")) {
56/// // received data stored as POST data
57/// std::string str((const char *)arg->GetPostData(), arg->GetPostDataLength());
58/// std::cout << "got string " << str << std::endl;
59/// // immediately send data back using websocket id
60/// SendCharStarWS(fWSId, "our reply");
61/// return true;
62/// }
63///
64///////////////////////////////////////////////////////////////////////////
65
67
68////////////////////////////////////////////////////////////////////////////////
69/// normal constructor
70
71THttpWSHandler::THttpWSHandler(const char *name, const char *title, Bool_t syncmode) : TNamed(name, title), fSyncMode(syncmode)
72{
73}
74
75////////////////////////////////////////////////////////////////////////////////
76/// destructor
77/// Make sure that all sending threads are stopped correctly
78
80{
82
83 std::vector<std::shared_ptr<THttpWSEngine>> clr;
84
85 {
86 std::lock_guard<std::mutex> grd(fMutex);
87 std::swap(clr, fEngines);
88 }
89
90 for (auto &eng : clr) {
91 eng->fDisabled = true;
92 if (eng->fHasSendThrd) {
93 eng->fHasSendThrd = false;
94 eng->fCond.notify_all();
95 eng->fSendThrd.join();
96 }
97 eng->ClearHandle(kTRUE); // terminate connection before starting destructor
98 }
99}
100
101/// Returns current number of websocket connections
103{
104 std::lock_guard<std::mutex> grd(fMutex);
105 return fEngines.size();
106}
107
108////////////////////////////////////////////////////////////////////////////////
109/// Return websocket id with given sequential number
110/// Number of websockets returned with GetNumWS() method
111
113{
114 std::lock_guard<std::mutex> grd(fMutex);
115 auto iter = fEngines.begin() + num;
116 return (*iter)->GetId();
117}
118
119////////////////////////////////////////////////////////////////////////////////
120/// Find websocket connection handle with given id
121/// If book_send parameter specified, have to book send operation under the mutex
122
123std::shared_ptr<THttpWSEngine> THttpWSHandler::FindEngine(UInt_t wsid, Bool_t book_send)
124{
125 if (IsDisabled())
126 return nullptr;
127
128 std::lock_guard<std::mutex> grd(fMutex);
129
130 for (auto &eng : fEngines)
131 if (eng->GetId() == wsid) {
132
133 // not allow to work with disabled engine
134 if (eng->fDisabled)
135 return nullptr;
136
137 if (book_send) {
138 if (eng->fMTSend) {
139 Error("FindEngine", "Try to book next send operation before previous completed");
140 return nullptr;
141 }
142 eng->fMTSend = kTRUE;
143 }
144 return eng;
145 }
146
147 return nullptr;
148}
149
150////////////////////////////////////////////////////////////////////////////////
151/// Remove and destroy WS connection
152
153void THttpWSHandler::RemoveEngine(std::shared_ptr<THttpWSEngine> &engine, Bool_t terminate)
154{
155 if (!engine) return;
156
157 {
158 std::lock_guard<std::mutex> grd(fMutex);
159
160 for (auto iter = fEngines.begin(); iter != fEngines.end(); iter++)
161 if (*iter == engine) {
162 if (engine->fMTSend)
163 Error("RemoveEngine", "Trying to remove WS engine during send operation");
164
165 engine->fDisabled = true;
166 fEngines.erase(iter);
167 break;
168 }
169 }
170
171 engine->ClearHandle(terminate);
172
173 if (engine->fHasSendThrd) {
174 engine->fHasSendThrd = false;
175 engine->fCond.notify_all();
176 engine->fSendThrd.join();
177 }
178}
179
180////////////////////////////////////////////////////////////////////////////////
181/// Process request to websocket
182/// Different kind of requests coded into THttpCallArg::Method
183/// "WS_CONNECT" - connection request
184/// "WS_READY" - connection ready
185/// "WS_CLOSE" - connection closed
186/// All other are normal data, which are delivered to users
187
188Bool_t THttpWSHandler::HandleWS(std::shared_ptr<THttpCallArg> &arg)
189{
190 if (IsDisabled())
191 return kFALSE;
192
193 if (!arg->GetWSId())
194 return ProcessWS(arg.get());
195
196 // normally here one accept or reject connection requests
197 if (arg->IsMethod("WS_CONNECT"))
198 return ProcessWS(arg.get());
199
200 auto engine = FindEngine(arg->GetWSId());
201
202 if (arg->IsMethod("WS_READY")) {
203
204 if (engine) {
205 Error("HandleWS", "WS engine with similar id exists %u", arg->GetWSId());
206 RemoveEngine(engine, kTRUE);
207 }
208
209 engine = arg->TakeWSEngine();
210 {
211 std::lock_guard<std::mutex> grd(fMutex);
212 fEngines.emplace_back(engine);
213 }
214
215 if (!ProcessWS(arg.get())) {
216 // if connection refused, remove engine again
217 RemoveEngine(engine, kTRUE);
218 return kFALSE;
219 }
220
221 return kTRUE;
222 }
223
224 if (arg->IsMethod("WS_CLOSE")) {
225 // connection is closed, one can remove handle
226
227 RemoveEngine(engine);
228
229 return ProcessWS(arg.get());
230 }
231
232 if (engine && engine->PreProcess(arg)) {
233 PerformSend(engine);
234 return kTRUE;
235 }
236
237 Bool_t res = ProcessWS(arg.get());
238
239 if (engine)
240 engine->PostProcess(arg);
241
242 return res;
243}
244
245////////////////////////////////////////////////////////////////////////////////
246/// Close connection with given websocket id
247
249{
250 auto engine = FindEngine(wsid);
251
252 RemoveEngine(engine, kTRUE);
253}
254
255////////////////////////////////////////////////////////////////////////////////
256/// Send data stored in the buffer
257/// Returns 0 - when operation was executed immediately
258/// 1 - when send operation will be performed in different thread
259
260Int_t THttpWSHandler::RunSendingThrd(std::shared_ptr<THttpWSEngine> engine)
261{
262 if (engine->fHasSendThrd) {
263 // all data are prepared - just notify thread
264 engine->fCond.notify_all();
265 return 1;
266 }
267
268 if (IsSyncMode() || !engine->SupportSendThrd()) {
269 // this is case of longpoll engine, no extra thread is required for it
270 if (engine->CanSendDirectly())
271 return PerformSend(engine);
272
273 // handling will be performed in following http request handler
274
275 if (!IsSyncMode()) return 1;
276
277 // now we should wait until next polling requests is processed
278 // or when connection is closed or handler is shutdown
279
280 Int_t sendcnt = fSendCnt, loopcnt(0);
281
282 while (!IsDisabled() && !engine->fDisabled) {
284 // if send counter changed - current send operation is completed
285 if (sendcnt != fSendCnt)
286 return 0;
287 if (loopcnt++ > 1000) {
288 loopcnt = 0;
289 std::this_thread::sleep_for(std::chrono::milliseconds(1));
290 }
291 }
292
293 return -1;
294 }
295
296 // probably this thread can continuously run
297 std::thread thrd([this, engine] {
298 while (!IsDisabled() && !engine->fDisabled) {
299 PerformSend(engine);
300 if (IsDisabled() || engine->fDisabled) break;
301 std::unique_lock<std::mutex> lk(engine->fDataMutex);
302 if (engine->fKind == THttpWSEngine::kNone)
303 engine->fCond.wait(lk);
304 }
305 });
306
307 engine->fSendThrd.swap(thrd);
308
309 engine->fHasSendThrd = true;
310
311 return 1;
312}
313
314
315////////////////////////////////////////////////////////////////////////////////
316/// Perform send operation, stored in buffer
317
318Int_t THttpWSHandler::PerformSend(std::shared_ptr<THttpWSEngine> engine)
319{
320 {
321 std::lock_guard<std::mutex> grd(engine->fDataMutex);
322
323 // no need to do something - operation was processed already by somebody else
324 if (engine->fKind == THttpWSEngine::kNone)
325 return 0;
326
327 if (engine->fDoingSend)
328 return 1;
329 engine->fDoingSend = true;
330 }
331
332 if (IsDisabled() || engine->fDisabled)
333 return 0;
334
335 switch (engine->fKind) {
337 engine->Send(engine->fData.data(), engine->fData.length());
338 break;
340 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
341 break;
343 engine->SendCharStar(engine->fData.c_str());
344 break;
345 default:
346 break;
347 }
348
349 engine->fData.clear();
350 engine->fHdr.clear();
351
352 {
353 std::lock_guard<std::mutex> grd(engine->fDataMutex);
354 engine->fDoingSend = false;
355 engine->fKind = THttpWSEngine::kNone;
356 }
357
358 return CompleteSend(engine);
359}
360
361
362////////////////////////////////////////////////////////////////////////////////
363/// Complete current send operation
364
365Int_t THttpWSHandler::CompleteSend(std::shared_ptr<THttpWSEngine> &engine)
366{
367 fSendCnt++;
368 engine->fMTSend = false; // probably we do not need to lock mutex to reset flag
369 CompleteWSSend(engine->GetId());
370 return 0; // indicates that operation is completed
371}
372
373
374////////////////////////////////////////////////////////////////////////////////
375/// Send binary data via given websocket id
376/// Returns -1 - in case of error
377/// 0 - when operation was executed immediately
378/// 1 - when send operation will be performed in different thread
379
380Int_t THttpWSHandler::SendWS(UInt_t wsid, const void *buf, int len)
381{
382 auto engine = FindEngine(wsid, kTRUE);
383 if (!engine) return -1;
384
385 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
386 engine->Send(buf, len);
387 return CompleteSend(engine);
388 }
389
390 // now we indicate that there is data and any thread can access it
391 {
392 std::lock_guard<std::mutex> grd(engine->fDataMutex);
393
394 if (engine->fKind != THttpWSEngine::kNone) {
395 Error("SendWS", "Data kind is not empty - something screwed up");
396 return -1;
397 }
398
399 engine->fData.resize(len);
400 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
401
402 engine->fDoingSend = false;
403 engine->fKind = THttpWSEngine::kData;
404 }
405
406 return RunSendingThrd(engine);
407}
408
409
410////////////////////////////////////////////////////////////////////////////////
411/// Send binary data with text header via given websocket id
412/// Returns -1 - in case of error,
413/// 0 - when operation was executed immediately,
414/// 1 - when send operation will be performed in different thread,
415
416Int_t THttpWSHandler::SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
417{
418 auto engine = FindEngine(wsid, kTRUE);
419 if (!engine) return -1;
420
421 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
422 engine->SendHeader(hdr, buf, len);
423 return CompleteSend(engine);
424 }
425
426 // now we indicate that there is data and any thread can access it
427 {
428 std::lock_guard<std::mutex> grd(engine->fDataMutex);
429
430 if (engine->fKind != THttpWSEngine::kNone) {
431 Error("SendWS", "Data kind is not empty - something screwed up");
432 return -1;
433 }
434
435 engine->fHdr = hdr;
436 engine->fData.resize(len);
437 std::copy((const char *)buf, (const char *)buf + len, engine->fData.begin());
438
439 engine->fDoingSend = false;
440 engine->fKind = THttpWSEngine::kHeader;
441 }
442
443 return RunSendingThrd(engine);
444}
445
446////////////////////////////////////////////////////////////////////////////////
447/// Send string via given websocket id
448/// Returns -1 - in case of error,
449/// 0 - when operation was executed immediately,
450/// 1 - when send operation will be performed in different thread,
451
453{
454 auto engine = FindEngine(wsid, kTRUE);
455 if (!engine) return -1;
456
457 if ((IsSyncMode() || !AllowMTSend()) && engine->CanSendDirectly()) {
458 engine->SendCharStar(str);
459 return CompleteSend(engine);
460 }
461
462 // now we indicate that there is data and any thread can access it
463 {
464 std::lock_guard<std::mutex> grd(engine->fDataMutex);
465
466 if (engine->fKind != THttpWSEngine::kNone) {
467 Error("SendWS", "Data kind is not empty - something screwed up");
468 return -1;
469 }
470
471 engine->fData = str;
472
473 engine->fDoingSend = false;
474 engine->fKind = THttpWSEngine::kText;
475 }
476
477 return RunSendingThrd(engine);
478}
int Int_t
Definition: RtypesCore.h:41
unsigned int UInt_t
Definition: RtypesCore.h:42
const Bool_t kFALSE
Definition: RtypesCore.h:88
bool Bool_t
Definition: RtypesCore.h:59
const Bool_t kTRUE
Definition: RtypesCore.h:87
#define ClassImp(name)
Definition: Rtypes.h:365
char name[80]
Definition: TGX11.cxx:109
R__EXTERN TSystem * gSystem
Definition: TSystem.h:560
enum THttpWSEngine::@151 kNone
! kind of operation
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket Different kind of requests coded into THttpCallArg::Method "WS_CONNECT" ...
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id Returns -1 - in case of error,...
Bool_t IsSyncMode() const
Returns processing mode of WS handler If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler need to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number Number of websockets returned with GetNumWS() method...
virtual ~THttpWSHandler()
destructor Make sure that all sending threads are stopped correctly
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id Returns -1 - in case of error, 0 - when operation was executed imm...
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
THttpWSHandler.
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id Returns -1 - in case of error 0 - when operation was executed...
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer Returns 0 - when operation was executed immediately 1 - when send oper...
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id If book_send parameter specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
Definition: TNamed.h:29
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:880
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).
Definition: TSystem.cxx:425
void swap(RDirectoryEntry &e1, RDirectoryEntry &e2) noexcept