83 std::vector<std::shared_ptr<THttpWSEngine>> clr;
86 std::lock_guard<std::mutex> grd(
fMutex);
90 for (
auto &eng : clr) {
91 eng->fDisabled =
true;
92 if (eng->fHasSendThrd) {
93 eng->fHasSendThrd =
false;
95 eng->fCond.notify_all();
96 eng->fSendThrd.join();
98 eng->ClearHandle(
kTRUE);
105 std::lock_guard<std::mutex> grd(
fMutex);
115 std::lock_guard<std::mutex> grd(
fMutex);
117 return (*iter)->GetId();
129 std::lock_guard<std::mutex> grd(
fMutex);
132 if (eng->GetId() == wsid) {
140 Error(
"FindEngine",
"Try to book next send operation before previous completed");
143 eng->fMTSend =
kTRUE;
159 std::lock_guard<std::mutex> grd(
fMutex);
162 if (*iter == engine) {
164 Error(
"RemoveEngine",
"Trying to remove WS engine during send operation");
166 engine->fDisabled =
true;
172 engine->ClearHandle(terminate);
174 if (engine->fHasSendThrd) {
175 engine->fHasSendThrd =
false;
176 if (engine->fWaiting)
177 engine->fCond.notify_all();
178 engine->fSendThrd.join();
199 if (arg->IsMethod(
"WS_CONNECT"))
204 if (arg->IsMethod(
"WS_READY")) {
207 Error(
"HandleWS",
"WS engine with similar id exists %u", arg->GetWSId());
211 engine = arg->TakeWSEngine();
213 std::lock_guard<std::mutex> grd(
fMutex);
226 if (arg->IsMethod(
"WS_CLOSE")) {
234 if (engine && engine->PreProcess(arg)) {
242 engine->PostProcess(arg);
264 if (
IsSyncMode() || !engine->SupportSendThrd()) {
266 if (engine->CanSendDirectly())
283 if (loopcnt++ > 1000) {
285 std::this_thread::sleep_for(std::chrono::milliseconds(1));
293 std::thread thrd([
this, engine] {
296 if (IsDisabled() || engine->fDisabled) break;
297 std::unique_lock<std::mutex> lk(engine->fMutex);
298 if (engine->fKind == THttpWSEngine::kNone) {
299 engine->fWaiting = true;
300 engine->fCond.wait(lk);
301 engine->fWaiting = false;
306 engine->fSendThrd.swap(thrd);
308 engine->fHasSendThrd =
true;
320 std::lock_guard<std::mutex> grd(engine->fMutex);
326 if (engine->fSending)
328 engine->fSending =
true;
334 switch (engine->fKind) {
336 engine->Send(engine->fData.data(), engine->fData.length());
339 engine->SendHeader(engine->fHdr.c_str(), engine->fData.data(), engine->fData.length());
342 engine->SendCharStar(engine->fData.c_str());
348 engine->fData.clear();
349 engine->fHdr.clear();
352 std::lock_guard<std::mutex> grd(engine->fMutex);
353 engine->fSending =
false;
367 engine->fMTSend =
false;
382 if (!engine)
return -1;
385 engine->Send(buf, len);
393 std::lock_guard<std::mutex> grd(engine->fMutex);
396 Error(
"SendWS",
"Data kind is not empty - something screwed up");
400 notify = engine->fWaiting;
404 engine->fData.resize(len);
405 std::copy((
const char *)buf, (
const char *)buf + len, engine->fData.begin());
408 if (engine->fHasSendThrd) {
409 if (notify) engine->fCond.notify_all();
426 if (!engine)
return -1;
429 engine->SendHeader(hdr, buf, len);
437 std::lock_guard<std::mutex> grd(engine->fMutex);
440 Error(
"SendWS",
"Data kind is not empty - something screwed up");
444 notify = engine->fWaiting;
449 engine->fData.resize(len);
450 std::copy((
const char *)buf, (
const char *)buf + len, engine->fData.begin());
453 if (engine->fHasSendThrd) {
454 if (notify) engine->fCond.notify_all();
470 if (!engine)
return -1;
473 engine->SendCharStar(str);
481 std::lock_guard<std::mutex> grd(engine->fMutex);
484 Error(
"SendWS",
"Data kind is not empty - something screwed up");
488 notify = engine->fWaiting;
494 if (engine->fHasSendThrd) {
495 if (notify) engine->fCond.notify_all();
R__EXTERN TSystem * gSystem
enum THttpWSEngine::@156 kNone
! kind of operation
Bool_t HandleWS(std::shared_ptr< THttpCallArg > &arg)
Process request to websocket. Different kinds of requests are coded into THttpCallArg::Method: "WS_CONNECT" ...
Int_t SendHeaderWS(UInt_t wsid, const char *hdr, const void *buf, int len)
Send binary data with text header via given websocket id. Returns: -1 - in case of error,...
Bool_t IsSyncMode() const
Returns processing mode of WS handler. If sync mode is TRUE (default), all event processing and data s...
Bool_t IsDisabled() const
Returns true when processing of websockets is disabled, set shortly before handler needs to be destroy...
std::vector< std::shared_ptr< THttpWSEngine > > fEngines
! list of active WS engines (connections)
void SetDisabled()
Disable all processing of websockets, normally called shortly before destructor.
void CloseWS(UInt_t wsid)
Close connection with given websocket id.
UInt_t GetWS(Int_t num=0)
Return websocket id with given sequential number. Number of websockets is returned with GetNumWS() method...
virtual ~THttpWSHandler()
Destructor. Makes sure that all sending threads are stopped correctly.
virtual Bool_t ProcessWS(THttpCallArg *arg)=0
Int_t SendCharStarWS(UInt_t wsid, const char *str)
Send string via given websocket id. Returns: -1 - in case of error, 0 - when operation was executed imm...
THttpWSHandler(const char *name, const char *title, Bool_t syncmode=kTRUE)
THttpWSHandler.
Int_t PerformSend(std::shared_ptr< THttpWSEngine > engine)
Perform send operation, stored in buffer.
Int_t CompleteSend(std::shared_ptr< THttpWSEngine > &engine)
Complete current send operation.
Int_t SendWS(UInt_t wsid, const void *buf, int len)
Send binary data via given websocket id. Returns: -1 - in case of error, 0 - when operation was executed...
Int_t GetNumWS()
Returns current number of websocket connections.
Int_t RunSendingThrd(std::shared_ptr< THttpWSEngine > engine)
Send data stored in the buffer. Returns: 0 - when operation was executed immediately, 1 - when send oper...
virtual Bool_t AllowMTSend() const
Allow send operations in separate threads (when supported by websocket engine)
Int_t fSendCnt
! counter for completed send operations
virtual void CompleteWSSend(UInt_t)
Method called when multi-threaded send operation is completed.
void RemoveEngine(std::shared_ptr< THttpWSEngine > &engine, Bool_t terminate=kFALSE)
Remove and destroy WS connection.
std::mutex fMutex
! protect list of engines
std::shared_ptr< THttpWSEngine > FindEngine(UInt_t id, Bool_t book_send=kFALSE)
Find websocket connection handle with given id. If book_send parameter is specified, have to book send op...
The TNamed class is the base class for all named ROOT classes.
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
virtual Bool_t ProcessEvents()
Process pending events (GUI, timers, sockets).