Skip to content

Commit 8c56d01

Browse files
committed
WSMessageStaticBlob to zero-copy send large data
A message that carries a pointer to an arbitrary blob of data; it can be used to send large blocks of memory in zero-copy mode, i.e. avoiding intermediate buffering copies. Plus a bunch of bugfixes for data-flow control, frame assembly, and message container methods.
1 parent abc9aa9 commit 8c56d01

2 files changed

Lines changed: 152 additions & 68 deletions

File tree

src/AsyncWSocket.cpp

Lines changed: 56 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
#include "AsyncWSocket.h"
77
#include "literals.h"
88

9+
#define WS_MAX_HEADER_SIZE 16
10+
911
constexpr const char WS_STR_CONNECTION[] = "Connection";
1012
constexpr const char WS_STR_VERSION[] = "Sec-WebSocket-Version";
1113
constexpr const char WS_STR_KEY[] = "Sec-WebSocket-Key";
1214
constexpr const char WS_STR_PROTOCOL[] = "Sec-WebSocket-Protocol";
1315

14-
// WSockServer worke task
16+
// WSockServer worker task
1517
constexpr const char WS_SRV_TASK[] = "WSSrvtask";
1618

1719
// cast enum class to uint (for bit set)
@@ -138,6 +140,7 @@ size_t webSocketSendHeader(AsyncClient *client, WSMessageFrame& frame) {
138140
}
139141

140142
size_t sent = client->add((const char*)buf, headLen);
143+
//log_d("send ws header, hdr size:%u, body len:%u", headLen, frame.len);
141144
// return size of a header added or 0 if any error
142145
return sent == headLen ? sent : 0;
143146
}
@@ -155,6 +158,7 @@ WSocketClient::WSocketClient(uint32_t id, AsyncWebServerRequest *request, WSocke
155158
_lastPong = millis();
156159
// disable connection timeout
157160
_client->setRxTimeout(0);
161+
// disable Nagle's algo
158162
_client->setNoDelay(true);
159163
// set AsyncTCP callbacks
160164
_client->onAck( [](void *r, AsyncClient *c, size_t len, uint32_t rtt) { (void)c; reinterpret_cast<WSocketClient*>(r)->_clientSend(len); }, this );
@@ -184,7 +188,7 @@ WSocketClient::~WSocketClient() {
184188
//#ifdef NOTHING
185189
// callback acknowledges sending pieces of data for outgoing frame
186190
void WSocketClient::_clientSend(size_t acked_bytes){
187-
if (!_client || _connection == conn_state_t::disconnected || !_client->space())
191+
if (!_client || _connection == conn_state_t::disconnected)
188192
return;
189193

190194
/*
@@ -201,14 +205,13 @@ void WSocketClient::_clientSend(size_t acked_bytes){
201205
// Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
202206
// That way we could balance on having half the buffer in-flight while another half is filling up and minimizing events in asynctcp's Q
203207
if (acked_bytes){
204-
// if it's the ack call from AsyncTCP - wait for lock!
205-
lock.lock();
206-
log_d("_clientSend, ack:%u/%u, space:%u", acked_bytes, _in_flight, _client ? _client->space() : 0);
207-
208+
auto sock_space = _client->space();
209+
//log_d("ack:%u/%u, sock space:%u", acked_bytes, _in_flight, sock_space);
208210
_in_flight -= std::min(acked_bytes, _in_flight);
209-
// return buffer credit on acked data
210-
++_in_flight_credit;
211-
log_d("infl:%u, credits:%u, conn state:%u", _in_flight, _in_flight_credit, _connection);
211+
if (!sock_space){
212+
return;
213+
}
214+
//log_d("infl:%u, credits:%u", _in_flight, _in_flight_credit);
212215
// check if we were waiting to ack our disconnection frame
213216
if (!_in_flight && (_connection == conn_state_t::disconnecting)){
214217
log_d("closing tcp-conn");
@@ -217,25 +220,36 @@ void WSocketClient::_clientSend(size_t acked_bytes){
217220
_client->close();
218221
return;
219222
}
223+
224+
// if it's the ack call from AsyncTCP - wait for lock!
225+
lock.lock();
220226

221227
} else {
222228
// if there is no acked data - just quit if won't be able to grab a lock, we are already sending something
223229
if (!lock.try_lock())
224230
return;
231+
232+
auto sock_space = _client->space();
233+
log_d("no ack infl:%u, space:%u, data pending:%u", _in_flight, sock_space, (uint32_t)(_outFrame.len - _outFrame.index));
234+
//
235+
if (!sock_space)
236+
return;
225237
}
226238

227-
if (_in_flight > _client->space() || !_in_flight_credit) {
228-
log_d("defer ws send call, in-flight:%u/%u, credit:%u", _in_flight, _client->space(), _in_flight_credit);
239+
// ignore the call if available sock space is smaller then acked data and we won't be able to fit message's ramainder there
240+
// this will reduce AsyncTCP's event Q pressure under heavy load
241+
if ((_outFrame.msg && (_outFrame.len - _outFrame.index > _client->space())) && (_client->space() < acked_bytes) ){
242+
log_d("defer ws send call, in-flight:%u/%u", _in_flight, _client->space());
229243
return;
230244
}
231245

232-
// no message in transit, try to evict one from a Q
233-
if (!_outFrame.msg){
246+
// no message in transit and we have enough space in sockbuff - try to evict new msg from a Q
247+
if (!_outFrame.msg && _client->space() > WS_MAX_HEADER_SIZE){
234248
if (_evictOutQueue()){
235-
// generate header and add to the socket buffer
249+
// generate header and add to the socket buffer. todo: check returned size?
236250
_in_flight += webSocketSendHeader(_client, _outFrame);
237251
} else
238-
return; // nothing to send now
252+
return; // nothing to send now
239253
}
240254

241255
// if there is a pending _outFrame - send the data from there
@@ -250,14 +264,15 @@ void WSocketClient::_clientSend(size_t acked_bytes){
250264
_outFrame.index += payload_pend;
251265
_outFrame.chunk_offset += payload_pend;
252266
_in_flight += payload_pend;
267+
//size_t l = _outFrame.len;
268+
//log_d("add to sock:%u, fidx:%u/%u, infl:%u", payload_pend, (uint32_t)_outFrame.index, (uint32_t)_outFrame.len, _in_flight);
253269
}
254270

255271
if (_outFrame.index == _outFrame.len){
256272
// if we complete writing entire message, send the frame right away
257273
// increment in-flight counter and take the credit
258274
if (!_client->send())
259275
_client->abort();
260-
--_in_flight_credit;
261276

262277
if (_outFrame.msg->type == WSFrameType_t::close){
263278
// if we just sent close frame, then change client state and purge out queue, we won't transmit anything from now on
@@ -274,8 +289,8 @@ void WSocketClient::_clientSend(size_t acked_bytes){
274289
// no use case for this for now
275290
//_sendEvent(event_t::msgSent);
276291

277-
// if there are free in-flight credits try to pull next msg from Q
278-
if (_in_flight_credit && _evictOutQueue()){
292+
// if there are free in-flight credits and buffer space available try to pull next msg from Q
293+
if (_client->space() > WS_MAX_HEADER_SIZE && _evictOutQueue()){
279294
// generate header and add to the socket buffer
280295
_in_flight += webSocketSendHeader(_client, _outFrame);
281296
continue;
@@ -284,12 +299,10 @@ void WSocketClient::_clientSend(size_t acked_bytes){
284299
}
285300
}
286301

287-
if (!_client->space()){
302+
if (_client->space() <= WS_MAX_HEADER_SIZE){
288303
// we have exhausted socket buffer, send it and quit
289304
if (!_client->send())
290305
_client->abort();
291-
// take in-flight credit
292-
--_in_flight_credit;
293306
return;
294307
}
295308

@@ -300,8 +313,6 @@ void WSocketClient::_clientSend(size_t acked_bytes){
300313
if (next_chunk_size == 0){
301314
// chunk is not ready yet, need to async wait and return for data later, we quit here and reevaluate on next ack or poll event from AsyncTCP
302315
if (!_client->send()) _client->abort();
303-
// take in-flight credit
304-
--_in_flight_credit;
305316
return;
306317
} else if (next_chunk_size == -1){
307318
// something is wrong! there would be no more chunked data but the message has not reached it's full size yet, can do nothing but close the coonections
@@ -318,7 +329,7 @@ void WSocketClient::_clientSend(size_t acked_bytes){
318329

319330
bool WSocketClient::_evictOutQueue(){
320331
// check if we have something in the Q and enough sock space to send a header at least
321-
if (_messageQueueOut.size() && _client->space() > 16 ){
332+
if (_messageQueueOut.size() && _client->space() > WS_MAX_HEADER_SIZE ){
322333
{
323334
#ifdef ESP32
324335
std::unique_lock<std::recursive_mutex> lockout(_outQlock);
@@ -372,8 +383,8 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
372383
return;
373384
}
374385
// receiving a new frame from here
375-
data += framelen;
376-
plen -= std::min(framelen, plen); // safety measure from bad parsing, we can't deduct more than sockbuff size
386+
data += std::min(framelen, plen); // safety measure from bad parsing, we can't deduct more than sockbuff size
387+
plen -= std::min(framelen, plen);
377388
} else {
378389
// continuation of existing frame
379390
size_t payload_len = std::min(static_cast<size_t>(_inFrame.len - _inFrame.index), plen);
@@ -389,7 +400,7 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
389400

390401
// if we got whole frame now
391402
if (_inFrame.index == _inFrame.len){
392-
Serial.printf("_onData, cmplt msg len:%lu\n", _inFrame.len);
403+
log_d("_onData, cmplt msg len:%u", (uint32_t)_inFrame.len);
393404

394405
if (_inFrame.msg->getStatusCode() == 1007){
395406
// this is a dummy/corrupted message, we discard it
@@ -403,7 +414,7 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
403414
if (_connection == conn_state_t::disconnecting){
404415
log_d("recv close ack");
405416
// if it was ws-close ack - we can close TCP connection
406-
_connection == conn_state_t::disconnected;
417+
_connection = conn_state_t::disconnected;
407418
// normally we should call close() here and wait for other side also close tcp connection with TCP-FIN, but
408419
// for various reasons ws clients could linger connection when received TCP-FIN not closing it from the app side (even after
409420
// two side ws-close exchange, i.e. websocat, websocket-client)
@@ -497,7 +508,7 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
497508
offset += 8;
498509
}
499510

500-
log_d("new hdr, sock data:%u, msg body size:%u", len, frame.len);
511+
log_d("recv hdr, sock data:%u, msg body size:%u", len, frame.len);
501512

502513
// if ws.close() is called, Safari sends a close frame with plen 2 and masked bit set. We must not try to read mask key from beyond packet size
503514
if (masked && len >= offset + 4) {
@@ -512,12 +523,12 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
512523

513524
size_t bodylen = std::min(static_cast<size_t>(frame.len), len - offset);
514525
if (!bodylen){
515-
// if there is no body in message, then it must a specific control message with no payload
526+
// if there is no body in message, then it must be a specific control message with no payload
516527
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode));
517528
} else {
518529
if (frame.len > _max_msgsize){
519530
// message is bigger than we are allowed to accept, create a dummy container for it, it will just discard all incoming data
520-
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode, 1007)); // code 'Invalid frame payload data'
531+
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode), 1007); // code 'Invalid frame payload data'
521532
offset += bodylen;
522533
_inFrame.index = bodylen;
523534
return {offset, 1009}; // code 'message too big'
@@ -529,14 +540,14 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
529540
switch (_overflow_policy){
530541
case overflow_t::discard :
531542
// silently discard incoming message
532-
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode, 1007)); // code 'Invalid frame payload data'
543+
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode), 1007); // code 'Invalid frame payload data'
533544
offset += bodylen;
534545
_inFrame.index = bodylen;
535546
return {offset, 0};
536547

537548
case overflow_t::disconnect : {
538549
// discard incoming message and send close message
539-
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode, 1007)); // code 'Invalid frame payload data'
550+
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode), 1007); // code 'Invalid frame payload data'
540551
#ifdef ESP32
541552
std::lock_guard<std::recursive_mutex> lock(_inQlock);
542553
#endif
@@ -573,11 +584,9 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
573584

574585
case WSFrameType_t::close : {
575586
uint16_t status_code = ntohs(*(uint16_t*)(data + offset));
576-
offset += 2;
577587
if (bodylen > 2){
578-
bodylen -= 2;
579588
// create a text message container consuming as much data as possible from current payload
580-
_inFrame.msg = std::make_shared<WSMessageClose>(status_code, data + offset, bodylen);
589+
_inFrame.msg = std::make_shared<WSMessageClose>(status_code, data + offset + 2, bodylen -2); // deduce 2 bytes of message code
581590
} else {
582591
// must be close message w/o body
583592
_inFrame.msg = std::make_shared<WSMessageClose>(status_code);
@@ -587,8 +596,9 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
587596

588597
default:
589598
_inFrame.msg = std::make_shared<WSMessageContainer<std::vector<uint8_t>>>(static_cast<WSFrameType_t>(opcode), bodylen);
590-
// copy data
591-
memcpy(_inFrame.msg->getData(), data + offset, bodylen);
599+
// copy as much data as it is available in current sock buff
600+
// todo: for now assume object will consume all the payload provided
601+
_inFrame.msg->addChunk(data + offset, bodylen, 0);
592602

593603
}
594604
offset += bodylen;
@@ -605,10 +615,12 @@ WSocketClient::err_t WSocketClient::enqueueMessage(WSMessagePtr mptr){
605615
return err_t::disconnected;
606616

607617
if (_messageQueueOut.size() < _max_qcap){
608-
#ifdef ESP32
609-
std::lock_guard<std::recursive_mutex> lock(_outQlock);
610-
#endif
611-
_messageQueueOut.emplace_back( std::move(mptr) );
618+
{
619+
#ifdef ESP32
620+
std::lock_guard<std::recursive_mutex> lock(_outQlock);
621+
#endif
622+
_messageQueueOut.emplace_back( std::move(mptr) );
623+
}
612624
_clientSend();
613625
return err_t::ok;
614626
}
@@ -663,7 +675,7 @@ void WSocketClient::_sendEvent(event_t e){
663675
}
664676

665677
void WSocketClient::_keepalive(){
666-
if (millis() - _lastPong > _keepAlivePeriod){
678+
if (_keepAlivePeriod && (millis() - _lastPong > _keepAlivePeriod)){
667679
enqueueMessage(std::make_shared< WSMessageContainer<std::string> >(WSFrameType_t::pong, true, "WSocketClient Pong" ));
668680
_lastPong = millis();
669681
}
@@ -792,7 +804,7 @@ void WSocketServer::_purgeClients(){
792804
log_d("purging clients");
793805
std::lock_guard lock(clientslock);
794806
// purge clients that are disconnected and with all messages consumed
795-
std::erase_if(_clients, [](const WSocketClient& c){ return (c.connection() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); });
807+
_clients.remove_if([](const WSocketClient& c){ return (c.connection() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); });
796808
}
797809

798810
size_t WSocketServer::activeClientsCount() const {

0 commit comments

Comments
 (0)