Skip to content

Commit e2ec190

Browse files
committed
AsyncWSocket keepalive pongs
- keepalive pongs implementation for Client class - message size/q cap for WSocketServer class
1 parent bd4f022 commit e2ec190

2 files changed

Lines changed: 93 additions & 26 deletions

File tree

src/AsyncWSocket.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,17 +152,18 @@ WSocketClient::WSocketClient(uint32_t id, AsyncWebServerRequest *request, WSocke
152152
_max_msgsize(msgsize),
153153
_max_qcap(qcapsize)
154154
{
155+
_lastPong = millis();
155156
// disable connection timeout
156157
_client->setRxTimeout(0);
157158
_client->setNoDelay(true);
158159
// set AsyncTCP callbacks
159160
_client->onAck( [](void *r, AsyncClient *c, size_t len, uint32_t rtt) { (void)c; reinterpret_cast<WSocketClient*>(r)->_clientSend(len); }, this );
160161
//_client->onAck( [](void *r, AsyncClient *c, size_t len, uint32_t rtt) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onAck(len, rtt); }, this );
161-
_client->onDisconnect( [](void *r, AsyncClient *c) { reinterpret_cast<WSocketClient*>(r)->_onDisconnect(c); }, this );
162-
_client->onTimeout( [](void *r, AsyncClient *c, uint32_t time) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onTimeout(time); }, this );
163-
_client->onData( [](void *r, AsyncClient *c, void *buf, size_t len) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onData(buf, len); }, this );
164-
_client->onPoll( [](void *r, AsyncClient *c) { (void)c; reinterpret_cast<WSocketClient*>(r)->_clientSend(); }, this );
165-
_client->onError( [](void *r, AsyncClient *c, int8_t error) { (void)c; log_e("err:%d", error); }, this );
162+
_client->onDisconnect( [](void *r, AsyncClient *c) { reinterpret_cast<WSocketClient*>(r)->_onDisconnect(c); }, this );
163+
_client->onTimeout( [](void *r, AsyncClient *c, uint32_t time) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onTimeout(time); }, this );
164+
_client->onData( [](void *r, AsyncClient *c, void *buf, size_t len) { (void)c; reinterpret_cast<WSocketClient*>(r)->_onData(buf, len); }, this );
165+
_client->onPoll( [](void *r, AsyncClient *c) { (void)c; reinterpret_cast<WSocketClient*>(r)->_keepalive(); reinterpret_cast<WSocketClient*>(r)->_clientSend(); }, this );
166+
_client->onError( [](void *r, AsyncClient *c, int8_t error) { (void)c; log_e("err:%d", error); }, this );
166167
delete request;
167168
}
168169

@@ -652,6 +653,13 @@ void WSocketClient::_sendEvent(event_t e){
652653
_cb(this, e);
653654
}
654655

656+
void WSocketClient::_keepalive(){
657+
if (millis() - _lastPong > _keepAlivePeriod){
658+
enqueueMessage(std::make_shared< WSMessageContainer<std::string> >(WSFrameType_t::pong, true, "WSocketClient Pong" ));
659+
_lastPong = millis();
660+
}
661+
}
662+
655663

656664
// ***** WSocketServer implementation *****
657665

@@ -666,10 +674,11 @@ bool WSocketServer::newClient(AsyncWebServerRequest *request){
666674
if (eventHandler)
667675
eventHandler(c, e);
668676
else
669-
c->dequeueMessage(); } // silently discard incoming messages when there is no callback set
670-
);
677+
c->dequeueMessage(); }, // silently discard incoming messages when there is no callback set
678+
msgsize, qcap);
671679
}
672680
_clients.back().setOverflowPolicy(_overflow_policy);
681+
_clients.back().setKeepALive(_keepAlivePeriod);
673682
if (eventHandler) eventHandler(&_clients.back(), WSocketClient::event_t::connect);
674683
return true;
675684
}
@@ -742,6 +751,13 @@ WSocketServer::msgall_err_t WSocketServer::pingAll(const char *data, size_t len)
742751
return cnt == _clients.size() ? msgall_err_t::ok : msgall_err_t::partial;
743752
}
744753

754+
WSocketClient::err_t WSocketServer::message(uint32_t id, WSMessagePtr m){
755+
if (WSocketClient *c = _getClient(id))
756+
return c->enqueueMessage(std::move(m));
757+
else
758+
return WSocketClient::err_t::disconnected;
759+
}
760+
745761
WSocketServer::msgall_err_t WSocketServer::messageAll(WSMessagePtr m){
746762
size_t cnt{0};
747763
for (auto &c : _clients) {
@@ -781,12 +797,13 @@ bool WSocketServerWorker::newClient(AsyncWebServerRequest *request){
781797
#ifdef ESP32
782798
std::lock_guard<std::mutex> lock (clientslock);
783799
#endif
784-
_clients.emplace_back(getNextId(), request, [this](WSocketClient *c, WSocketClient::event_t e){ if (_task_hndlr) xTaskNotifyGive(_task_hndlr); });
800+
_clients.emplace_back(getNextId(), request, [this](WSocketClient *c, WSocketClient::event_t e){ if (_task_hndlr) xTaskNotifyGive(_task_hndlr); }, msgsize, qcap);
785801
}
786802

787803
// create events group where we'll pick events
788804
_clients.back().createEventGroupHandle();
789805
_clients.back().setOverflowPolicy(getOverflowPolicy());
806+
_clients.back().setKeepALive(_keepAlivePeriod);
790807
xEventGroupSetBits(_clients.back().getEventGroupHandle(), enum2uint32(WSocketClient::event_t::connect));
791808
if (_task_hndlr)
792809
xTaskNotifyGive(_task_hndlr);

src/AsyncWSocket.h

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class WSMessageClose : public WSMessageContainer<std::string> {
211211
WSMessageClose (uint16_t status, Args&&... args) : WSMessageContainer<std::string>(WSFrameType_t::close, true, std::forward<Args>(args)...), _status_code(status) {
212212
// convert code to message body
213213
uint16_t buff = htons (status);
214-
container.append((char*)(&buff), 2);
214+
container.insert(0, (char*)(&buff), 2);
215215
};
216216

217217
uint16_t getStatusCode() const override { return _status_code; }
@@ -379,6 +379,18 @@ class WSocketClient {
379379
// check if client can enqueue and send new messages
380380
err_t canSend() const;
381381

382+
/**
383+
* @brief Set the WebSOcket ping Keep A Live
384+
* if set, client will send pong packet it's peer periodically to keep the connection alive
385+
* ping does not require a reply from peer
386+
*
387+
* @param seconds
388+
*/
389+
void setKeepALive(size_t seconds){ _keepAlivePeriod = seconds * 1000; };
390+
391+
// get keepalive value
392+
size_t getKeepALive() const { return _keepAlivePeriod / 1000; };
393+
382394
/**
383395
* @brief access Event Group Handle for the client
384396
*
@@ -422,6 +434,9 @@ class WSocketClient {
422434
// in-flight data credits
423435
size_t _in_flight_credit{WS_IN_FLIGHT_CREDITS};
424436

437+
// keepalive
438+
unsigned long _keepAlivePeriod{0}, _lastPong;
439+
425440
/**
426441
* @brief go through out Q and send message data ()
427442
* @note this method will grab a mutex lock on outQ internally
@@ -451,6 +466,8 @@ class WSocketClient {
451466
*/
452467
void _sendEvent(event_t e);
453468

469+
void _keepalive();
470+
454471
// AsyncTCP callbacks
455472
void _onTimeout(uint32_t time);
456473
void _onDisconnect(AsyncClient *c);
@@ -459,8 +476,8 @@ class WSocketClient {
459476

460477
/**
461478
* @brief WebServer Handler implementation that plays the role of a WebSocket server
462-
* it inherits behavior of original WebSocket server and uses AsyncTCP callback
463-
* to handle incoming messages
479+
* it inherits behavior of original WebSocket server and uses AsyncTCP's thread to
480+
* run callbacks on incoming messages
464481
*
465482
*/
466483
class WSocketServer : public AsyncWebHandler {
@@ -472,9 +489,34 @@ class WSocketServer : public AsyncWebHandler {
472489
none // no clients or all outbound queues are full, message discarded
473490
};
474491

475-
explicit WSocketServer(const char* url, WSocketClient::event_cb_t handler = {}) : _url(url), eventHandler(handler) {}
492+
explicit WSocketServer(const char* url, WSocketClient::event_cb_t handler = {}, size_t msgsize = 8 * 1024, size_t qcap = 4) : _url(url), eventHandler(handler) {}
476493
~WSocketServer() = default;
477494

495+
/**
496+
* @copydoc WSClient::setOverflowPolicy(overflow_t policy)
497+
*/
498+
void setOverflowPolicy(WSocketClient::overflow_t policy){ _overflow_policy = policy; }
499+
WSocketClient::overflow_t getOverflowPolicy() const { return _overflow_policy; }
500+
501+
/**
502+
* @brief Set Message Size limit for the incoming messages
503+
* if peer tries to send us message larger then defined limit size,
504+
* the message will be discarded and peer's connection would be closed with respective error code
505+
* @note only new connections would be affected with changed value
506+
*
507+
* @param size
508+
*/
509+
void setMaxMessageSize(size_t size){ msgsize = size; }
510+
size_t getMaxMessageSize(size_t size) const { return msgsize; }
511+
512+
/**
513+
* @brief Set in/out Message Queue Size
514+
*
515+
* @param size
516+
*/
517+
void setMessageQueueSize(size_t size){ qcap = size; }
518+
size_t getMessageQueueSize(size_t size){ return qcap; }
519+
478520
/**
479521
* @brief check if client with specified id can accept new message for sending
480522
*
@@ -498,12 +540,6 @@ class WSocketServer : public AsyncWebHandler {
498540
return _getClient(id) != nullptr;
499541
}
500542

501-
/**
502-
* @copydoc WSClient::setOverflowPolicy(overflow_t policy)
503-
*/
504-
void setOverflowPolicy(WSocketClient::overflow_t policy){ _overflow_policy = policy; }
505-
WSocketClient::overflow_t getOverflowPolicy() const { return _overflow_policy; }
506-
507543
/**
508544
* @brief disconnect client
509545
*
@@ -549,21 +585,30 @@ class WSocketServer : public AsyncWebHandler {
549585
msgall_err_t pingAll(const char *data = NULL, size_t len = 0);
550586

551587
/**
552-
* @brief send message to specific client
588+
* @brief Set the WebSocket client Keep A Live
589+
* if set, server will pong it's peers periodically to keep connections alive
590+
* @note it does not check for replies and it's validity, it only sends messages to
591+
* help keep TCP connection alive through firewalls/routers
592+
*
593+
* @param seconds
594+
*/
595+
void setKeepALive(size_t seconds){ _keepAlivePeriod = seconds; };
596+
597+
// get keepalive value
598+
size_t getKeepALive() const { return _keepAlivePeriod; };
599+
600+
601+
/**
602+
* @brief send generic message to specific client
553603
*
554604
* @param id
555605
* @param m
556606
* @return WSocketClient::err_t
557607
*/
558-
WSocketClient::err_t message(uint32_t id, WSMessagePtr m){
559-
if (WSocketClient *c = _getClient(id))
560-
return c->enqueueMessage(std::move(m));
561-
else
562-
return WSocketClient::err_t::disconnected;
563-
}
608+
WSocketClient::err_t message(uint32_t id, WSMessagePtr m);
564609

565610
/**
566-
* @brief send message to all available clients
611+
* @brief send genric message to all available clients
567612
*
568613
* @param m
569614
* @return msgall_err_t
@@ -629,6 +674,11 @@ class WSocketServer : public AsyncWebHandler {
629674
#endif
630675
// WSocketClient events handler
631676
WSocketClient::event_cb_t eventHandler;
677+
unsigned long _keepAlivePeriod{0};
678+
// max message size
679+
size_t msgsize;
680+
// client's queue capacity
681+
size_t qcap;
632682

633683
// return next available client's ID
634684
uint32_t getNextId() {

0 commit comments

Comments
 (0)