Skip to content

Commit cde857b

Browse files
committed
WServer keepalive and server-side echo
- keepalive would send periodical unsolicited pong messages to peer as per https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3 - server-side echo. When activated server will echo incoming messages from any client to all other connected clients. This could be usefull for applications that share messages between all connected clients, i.e. WebUIs to reflect controls across all connected clients
1 parent e2ec190 commit cde857b

2 files changed

Lines changed: 153 additions & 76 deletions

File tree

src/AsyncWSocket.cpp

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void wsMaskPayload(uint32_t mask, size_t mask_offset, char *data, size_t length)
8888
}
8989

9090
size_t webSocketSendHeader(AsyncClient *client, WSMessageFrame& frame) {
91-
if (!client || !client->canSend()) {
91+
if (!client) {
9292
return 0;
9393
}
9494

@@ -373,7 +373,7 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
373373
}
374374
// receiving a new frame from here
375375
data += framelen;
376-
plen -= framelen;
376+
plen -= std::min(framelen, plen); // safety measure from bad parsing, we can't deduct more than sockbuff size
377377
} else {
378378
// continuation of existing frame
379379
size_t payload_len = std::min(static_cast<size_t>(_inFrame.len - _inFrame.index), plen);
@@ -416,20 +416,23 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
416416
}
417417

418418
// otherwise it's a close request from a peer - echo back close message as per https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1
419+
log_d("recv client's ws-close req");
419420
{
420-
log_d("recv client's ws-close req");
421421
#ifdef ESP32
422422
std::unique_lock<std::recursive_mutex> lockin(_inQlock);
423423
std::unique_lock<std::recursive_mutex> lockout(_outQlock);
424424
#endif
425-
// push message to recv Q, client might use it to understand disconnection reason
426-
_messageQueueIn.push_back(_inFrame.msg);
425+
// push message to recv Q if it has body, client might use it to understand disconnection reason
426+
if (_inFrame.len > 2)
427+
_messageQueueIn.push_back(_inFrame.msg);
427428
// purge the out Q and echo recieved frame back to client, once it's tcp-acked from the other side we can close tcp connection
428429
_messageQueueOut.clear();
429430
_messageQueueOut.push_front(_inFrame.msg);
430431
}
431432
_inFrame.msg.reset();
432-
_sendEvent(event_t::msgRecv);
433+
// send event only when message has body
434+
if (_inFrame.len > 2)
435+
_sendEvent(event_t::msgRecv);
433436
break;
434437
}
435438

@@ -522,7 +525,7 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
522525

523526
// check in-queue for overflow
524527
if (_messageQueueIn.size() >= _max_qcap){
525-
log_w("q overflow");
528+
log_w("q overflow, client id:%u, qsize:%u", id, _messageQueueIn.size());
526529
switch (_overflow_policy){
527530
case overflow_t::discard :
528531
// silently discard incoming message
@@ -610,7 +613,7 @@ WSocketClient::err_t WSocketClient::enqueueMessage(WSMessagePtr mptr){
610613
return err_t::ok;
611614
}
612615

613-
return err_t::nospace;
616+
return err_t::outQfull;
614617
}
615618

616619
WSMessagePtr WSocketClient::dequeueMessage(){
@@ -625,9 +628,15 @@ WSMessagePtr WSocketClient::dequeueMessage(){
625628
return msg;
626629
}
627630

628-
WSocketClient::err_t WSocketClient::canSend() const {
631+
WSMessagePtr WSocketClient::peekMessage(){
632+
return _messageQueueIn.size() ? _messageQueueIn.front() : WSMessagePtr();
633+
}
634+
635+
WSocketClient::err_t WSocketClient::state() const {
629636
if (_connection != conn_state_t::connected) return err_t::disconnected;
630-
if (_messageQueueOut.size() >= _max_qcap ) return err_t::nospace;
637+
if (_messageQueueOut.size() >= _max_qcap && _messageQueueIn.size() >= _max_qcap ) return err_t::Qsfull;
638+
if (_messageQueueIn.size() >= _max_qcap) return err_t::inQfull;
639+
if (_messageQueueOut.size() >= _max_qcap) return err_t::outQfull;
631640
return err_t::ok;
632641
}
633642

@@ -670,15 +679,18 @@ bool WSocketServer::newClient(AsyncWebServerRequest *request){
670679
#ifdef ESP32
671680
std::lock_guard<std::mutex> lock(clientslock);
672681
#endif
673-
_clients.emplace_back(getNextId(), request, [this](WSocketClient *c, WSocketClient::event_t e){
674-
if (eventHandler)
675-
eventHandler(c, e);
676-
else
677-
c->dequeueMessage(); }, // silently discard incoming messages when there is no callback set
682+
_clients.emplace_back(getNextId(), request,
683+
[this](WSocketClient *c, WSocketClient::event_t e){
684+
// server echo call
685+
if (e == WSocketClient::event_t::msgRecv) serverEcho(c);
686+
if (eventHandler)
687+
eventHandler(c, e);
688+
else
689+
c->dequeueMessage(); }, // silently discard incoming messages when there is no callback set
678690
msgsize, qcap);
679691
}
680692
_clients.back().setOverflowPolicy(_overflow_policy);
681-
_clients.back().setKeepALive(_keepAlivePeriod);
693+
_clients.back().setKeepAlive(_keepAlivePeriod);
682694
if (eventHandler) eventHandler(&_clients.back(), WSocketClient::event_t::connect);
683695
return true;
684696
}
@@ -718,24 +730,31 @@ void WSocketServer::handleRequest(AsyncWebServerRequest *request) {
718730
request->send(response);
719731
}
720732

721-
WSocketClient* WSocketServer::_getClient(uint32_t id) {
733+
WSocketClient* WSocketServer::getClient(uint32_t id) {
722734
auto iter = std::find_if(_clients.begin(), _clients.end(), [id](const WSocketClient &c) { return c.id == id; });
723735
if (iter != std::end(_clients))
724736
return &(*iter);
725737
else
726738
return nullptr;
727739
}
728740

729-
WSocketClient const* WSocketServer::_getClient(uint32_t id) const {
741+
WSocketClient const* WSocketServer::getClient(uint32_t id) const {
730742
const auto iter = std::find_if(_clients.cbegin(), _clients.cend(), [id](const WSocketClient &c) { return c.id == id; });
731743
if (iter != std::cend(_clients))
732744
return &(*iter);
733745
else
734746
return nullptr;
735747
}
736748

737-
WSocketServer::msgall_err_t WSocketServer::canSend() const {
738-
size_t cnt = std::count_if(std::begin(_clients), std::end(_clients), [](const WSocketClient &c) { return c.canSend() == WSocketClient::err_t::ok; });
749+
WSocketClient::err_t WSocketServer::clientState(uint32_t id) const {
750+
if (auto c = getClient(id))
751+
return c->state();
752+
else
753+
return WSocketClient::err_t::disconnected;
754+
};
755+
756+
WSocketServer::msgall_err_t WSocketServer::clientsState() const {
757+
size_t cnt = std::count_if(std::cbegin(_clients), std::cend(_clients), [](const WSocketClient &c) { return c.state() == WSocketClient::err_t::ok; });
739758
if (!cnt) return msgall_err_t::none;
740759
return cnt == _clients.size() ? msgall_err_t::ok : msgall_err_t::partial;
741760
}
@@ -752,7 +771,7 @@ WSocketServer::msgall_err_t WSocketServer::pingAll(const char *data, size_t len)
752771
}
753772

754773
WSocketClient::err_t WSocketServer::message(uint32_t id, WSMessagePtr m){
755-
if (WSocketClient *c = _getClient(id))
774+
if (WSocketClient *c = getClient(id))
756775
return c->enqueueMessage(std::move(m));
757776
else
758777
return WSocketClient::err_t::disconnected;
@@ -773,13 +792,26 @@ void WSocketServer::_purgeClients(){
773792
log_d("purging clients");
774793
std::lock_guard lock(clientslock);
775794
// purge clients that are disconnected and with all messages consumed
776-
std::erase_if(_clients, [](const WSocketClient& c){ return (c.status() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); });
795+
std::erase_if(_clients, [](const WSocketClient& c){ return (c.connection() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); });
777796
}
778797

779798
size_t WSocketServer::activeClientsCount() const {
780-
return std::count_if(std::begin(_clients), std::end(_clients), [](const WSocketClient &c) { return c.status() == WSocketClient::conn_state_t::connected; });
799+
return std::count_if(std::begin(_clients), std::end(_clients), [](const WSocketClient &c) { return c.connection() == WSocketClient::conn_state_t::connected; });
781800
};
782801

802+
void WSocketServer::serverEcho(WSocketClient *c){
803+
if (!_serverEcho) return;
804+
auto m = c->peekMessage();
805+
if (m && (m->type == WSFrameType_t::text || m->type == WSFrameType_t::binary) ){
806+
// echo only text or bin messages
807+
for (auto &i: _clients){
808+
if (!_serverEchoSplitHorizon || i.id != c->id){
809+
i.enqueueMessage(m);
810+
}
811+
}
812+
}
813+
}
814+
783815

784816
// ***** WSMessageClose implementation *****
785817

@@ -797,13 +829,20 @@ bool WSocketServerWorker::newClient(AsyncWebServerRequest *request){
797829
#ifdef ESP32
798830
std::lock_guard<std::mutex> lock (clientslock);
799831
#endif
800-
_clients.emplace_back(getNextId(), request, [this](WSocketClient *c, WSocketClient::event_t e){ if (_task_hndlr) xTaskNotifyGive(_task_hndlr); }, msgsize, qcap);
832+
_clients.emplace_back(getNextId(), request,
833+
[this](WSocketClient *c, WSocketClient::event_t e){
834+
log_d("client event id:%u state:%u", c->id, c->state());
835+
// server echo call
836+
if (e == WSocketClient::event_t::msgRecv) serverEcho(c);
837+
if (_task_hndlr) xTaskNotifyGive(_task_hndlr);
838+
},
839+
msgsize, qcap);
801840
}
802841

803842
// create events group where we'll pick events
804843
_clients.back().createEventGroupHandle();
805844
_clients.back().setOverflowPolicy(getOverflowPolicy());
806-
_clients.back().setKeepALive(_keepAlivePeriod);
845+
_clients.back().setKeepAlive(_keepAlivePeriod);
807846
xEventGroupSetBits(_clients.back().getEventGroupHandle(), enum2uint32(WSocketClient::event_t::connect));
808847
if (_task_hndlr)
809848
xTaskNotifyGive(_task_hndlr);
@@ -841,7 +880,6 @@ void WSocketServerWorker::_taskRunner(){
841880

842881
// check if this a new client
843882
uxBits = xEventGroupClearBits(it->getEventGroupHandle(), enum2uint32(WSocketClient::event_t::connect) );
844-
log_d("uxBits:%u", uxBits);
845883
if ( uxBits & enum2uint32(WSocketClient::event_t::connect) ){
846884
_ecb(WSocketClient::event_t::connect, it->id);
847885
}
@@ -865,7 +903,7 @@ void WSocketServerWorker::_taskRunner(){
865903
}
866904

867905
// check for disconnected client - do not care for group bits, cause if it's deleted, we will destruct the client object
868-
if (it->canSend() == WSocketClient::err_t::disconnected){
906+
if (it->connection() == WSocketClient::conn_state_t::disconnected){
869907
auto id = it->id;
870908
{
871909
#ifdef ESP32

0 commit comments

Comments
 (0)