Skip to content

Commit 9a4bcb1

Browse files
committed
WSMessageStaticBlob to zero-copy send large data
A message that carries a pointer to arbitrary blob of data it could be used to send large blocks of memory in zero-copy mode, i.e. avoid intermediary buffering copies. + bunch of bugfixes for dataflow control, frame assembly, message containers methods
1 parent abc9aa9 commit 9a4bcb1

2 files changed

Lines changed: 146 additions & 64 deletions

File tree

src/AsyncWSocket.cpp

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
#include "AsyncWSocket.h"
77
#include "literals.h"
88

9+
#define WS_MAX_HEADER_SIZE 16
10+
911
constexpr const char WS_STR_CONNECTION[] = "Connection";
1012
constexpr const char WS_STR_VERSION[] = "Sec-WebSocket-Version";
1113
constexpr const char WS_STR_KEY[] = "Sec-WebSocket-Key";
1214
constexpr const char WS_STR_PROTOCOL[] = "Sec-WebSocket-Protocol";
1315

14-
// WSockServer worke task
16+
// WSockServer worker task
1517
constexpr const char WS_SRV_TASK[] = "WSSrvtask";
1618

1719
// cast enum class to uint (for bit set)
@@ -138,6 +140,7 @@ size_t webSocketSendHeader(AsyncClient *client, WSMessageFrame& frame) {
138140
}
139141

140142
size_t sent = client->add((const char*)buf, headLen);
143+
//log_d("send ws header, hdr size:%u, body len:%u", headLen, frame.len);
141144
// return size of a header added or 0 if any error
142145
return sent == headLen ? sent : 0;
143146
}
@@ -184,7 +187,7 @@ WSocketClient::~WSocketClient() {
184187
//#ifdef NOTHING
185188
// callback acknowledges sending pieces of data for outgoing frame
186189
void WSocketClient::_clientSend(size_t acked_bytes){
187-
if (!_client || _connection == conn_state_t::disconnected || !_client->space())
190+
if (!_client || _connection == conn_state_t::disconnected)
188191
return;
189192

190193
/*
@@ -201,14 +204,13 @@ void WSocketClient::_clientSend(size_t acked_bytes){
201204
// Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
202205
// That way we could balance on having half the buffer in-flight while another half is filling up and minimizing events in asynctcp's Q
203206
if (acked_bytes){
204-
// if it's the ack call from AsyncTCP - wait for lock!
205-
lock.lock();
206-
log_d("_clientSend, ack:%u/%u, space:%u", acked_bytes, _in_flight, _client ? _client->space() : 0);
207-
207+
auto sock_space = _client->space();
208+
//log_d("ack:%u/%u, sock space:%u", acked_bytes, _in_flight, sock_space);
208209
_in_flight -= std::min(acked_bytes, _in_flight);
209-
// return buffer credit on acked data
210-
++_in_flight_credit;
211-
log_d("infl:%u, credits:%u, conn state:%u", _in_flight, _in_flight_credit, _connection);
210+
if (!sock_space){
211+
return;
212+
}
213+
//log_d("infl:%u, credits:%u", _in_flight, _in_flight_credit);
212214
// check if we were waiting to ack our disconnection frame
213215
if (!_in_flight && (_connection == conn_state_t::disconnecting)){
214216
log_d("closing tcp-conn");
@@ -217,25 +219,36 @@ void WSocketClient::_clientSend(size_t acked_bytes){
217219
_client->close();
218220
return;
219221
}
222+
223+
// if it's the ack call from AsyncTCP - wait for lock!
224+
lock.lock();
220225

221226
} else {
222227
// if there is no acked data - just quit if won't be able to grab a lock, we are already sending something
223228
if (!lock.try_lock())
224229
return;
230+
231+
auto sock_space = _client->space();
232+
log_d("no ack infl:%u, space:%u, data pending:%u", _in_flight, sock_space, (uint32_t)(_outFrame.len - _outFrame.index));
233+
//
234+
if (!sock_space)
235+
return;
225236
}
226237

227-
if (_in_flight > _client->space() || !_in_flight_credit) {
228-
log_d("defer ws send call, in-flight:%u/%u, credit:%u", _in_flight, _client->space(), _in_flight_credit);
238+
// ignore the call if available sock space is smaller then acked data and we won't be able to fit message's ramainder there
239+
// this will reduce AsyncTCP's event Q pressure under heavy load
240+
if ((_outFrame.msg && (_outFrame.len - _outFrame.index > _client->space())) && (_client->space() < acked_bytes) ){
241+
log_d("defer ws send call, in-flight:%u/%u", _in_flight, _client->space());
229242
return;
230243
}
231244

232-
// no message in transit, try to evict one from a Q
233-
if (!_outFrame.msg){
245+
// no message in transit and we have enough space in sockbuff - try to evict new msg from a Q
246+
if (!_outFrame.msg && _client->space() > WS_MAX_HEADER_SIZE){
234247
if (_evictOutQueue()){
235-
// generate header and add to the socket buffer
248+
// generate header and add to the socket buffer. todo: check returned size?
236249
_in_flight += webSocketSendHeader(_client, _outFrame);
237250
} else
238-
return; // nothing to send now
251+
return; // nothing to send now
239252
}
240253

241254
// if there is a pending _outFrame - send the data from there
@@ -250,14 +263,15 @@ void WSocketClient::_clientSend(size_t acked_bytes){
250263
_outFrame.index += payload_pend;
251264
_outFrame.chunk_offset += payload_pend;
252265
_in_flight += payload_pend;
266+
//size_t l = _outFrame.len;
267+
//log_d("add to sock:%u, fidx:%u/%u, infl:%u", payload_pend, (uint32_t)_outFrame.index, (uint32_t)_outFrame.len, _in_flight);
253268
}
254269

255270
if (_outFrame.index == _outFrame.len){
256271
// if we complete writing entire message, send the frame right away
257272
// increment in-flight counter and take the credit
258273
if (!_client->send())
259274
_client->abort();
260-
--_in_flight_credit;
261275

262276
if (_outFrame.msg->type == WSFrameType_t::close){
263277
// if we just sent close frame, then change client state and purge out queue, we won't transmit anything from now on
@@ -274,8 +288,8 @@ void WSocketClient::_clientSend(size_t acked_bytes){
274288
// no use case for this for now
275289
//_sendEvent(event_t::msgSent);
276290

277-
// if there are free in-flight credits try to pull next msg from Q
278-
if (_in_flight_credit && _evictOutQueue()){
291+
// if there are free in-flight credits and buffer space available try to pull next msg from Q
292+
if (_client->space() > WS_MAX_HEADER_SIZE && _evictOutQueue()){
279293
// generate header and add to the socket buffer
280294
_in_flight += webSocketSendHeader(_client, _outFrame);
281295
continue;
@@ -284,12 +298,10 @@ void WSocketClient::_clientSend(size_t acked_bytes){
284298
}
285299
}
286300

287-
if (!_client->space()){
301+
if (_client->space() <= WS_MAX_HEADER_SIZE){
288302
// we have exhausted socket buffer, send it and quit
289303
if (!_client->send())
290304
_client->abort();
291-
// take in-flight credit
292-
--_in_flight_credit;
293305
return;
294306
}
295307

@@ -300,8 +312,6 @@ void WSocketClient::_clientSend(size_t acked_bytes){
300312
if (next_chunk_size == 0){
301313
// chunk is not ready yet, need to async wait and return for data later, we quit here and reevaluate on next ack or poll event from AsyncTCP
302314
if (!_client->send()) _client->abort();
303-
// take in-flight credit
304-
--_in_flight_credit;
305315
return;
306316
} else if (next_chunk_size == -1){
307317
// something is wrong! there would be no more chunked data but the message has not reached it's full size yet, can do nothing but close the coonections
@@ -318,7 +328,7 @@ void WSocketClient::_clientSend(size_t acked_bytes){
318328

319329
bool WSocketClient::_evictOutQueue(){
320330
// check if we have something in the Q and enough sock space to send a header at least
321-
if (_messageQueueOut.size() && _client->space() > 16 ){
331+
if (_messageQueueOut.size() && _client->space() > WS_MAX_HEADER_SIZE ){
322332
{
323333
#ifdef ESP32
324334
std::unique_lock<std::recursive_mutex> lockout(_outQlock);
@@ -372,8 +382,8 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
372382
return;
373383
}
374384
// receiving a new frame from here
375-
data += framelen;
376-
plen -= std::min(framelen, plen); // safety measure from bad parsing, we can't deduct more than sockbuff size
385+
data += std::min(framelen, plen); // safety measure from bad parsing, we can't deduct more than sockbuff size
386+
plen -= std::min(framelen, plen);
377387
} else {
378388
// continuation of existing frame
379389
size_t payload_len = std::min(static_cast<size_t>(_inFrame.len - _inFrame.index), plen);
@@ -389,7 +399,7 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
389399

390400
// if we got whole frame now
391401
if (_inFrame.index == _inFrame.len){
392-
Serial.printf("_onData, cmplt msg len:%lu\n", _inFrame.len);
402+
log_d("_onData, cmplt msg len:%u", (uint32_t)_inFrame.len);
393403

394404
if (_inFrame.msg->getStatusCode() == 1007){
395405
// this is a dummy/corrupted message, we discard it
@@ -403,7 +413,7 @@ void WSocketClient::_onData(void *pbuf, size_t plen) {
403413
if (_connection == conn_state_t::disconnecting){
404414
log_d("recv close ack");
405415
// if it was ws-close ack - we can close TCP connection
406-
_connection == conn_state_t::disconnected;
416+
_connection = conn_state_t::disconnected;
407417
// normally we should call close() here and wait for other side also close tcp connection with TCP-FIN, but
408418
// for various reasons ws clients could linger connection when received TCP-FIN not closing it from the app side (even after
409419
// two side ws-close exchange, i.e. websocat, websocket-client)
@@ -497,7 +507,7 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
497507
offset += 8;
498508
}
499509

500-
log_d("new hdr, sock data:%u, msg body size:%u", len, frame.len);
510+
log_d("recv hdr, sock data:%u, msg body size:%u", len, frame.len);
501511

502512
// if ws.close() is called, Safari sends a close frame with plen 2 and masked bit set. We must not try to read mask key from beyond packet size
503513
if (masked && len >= offset + 4) {
@@ -517,7 +527,7 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
517527
} else {
518528
if (frame.len > _max_msgsize){
519529
// message is bigger than we are allowed to accept, create a dummy container for it, it will just discard all incoming data
520-
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode, 1007)); // code 'Invalid frame payload data'
530+
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode), 1007); // code 'Invalid frame payload data'
521531
offset += bodylen;
522532
_inFrame.index = bodylen;
523533
return {offset, 1009}; // code 'message too big'
@@ -529,14 +539,14 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
529539
switch (_overflow_policy){
530540
case overflow_t::discard :
531541
// silently discard incoming message
532-
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode, 1007)); // code 'Invalid frame payload data'
542+
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode), 1007); // code 'Invalid frame payload data'
533543
offset += bodylen;
534544
_inFrame.index = bodylen;
535545
return {offset, 0};
536546

537547
case overflow_t::disconnect : {
538548
// discard incoming message and send close message
539-
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode, 1007)); // code 'Invalid frame payload data'
549+
_inFrame.msg = std::make_shared<WSMessageDummy>(static_cast<WSFrameType_t>(opcode), 1007); // code 'Invalid frame payload data'
540550
#ifdef ESP32
541551
std::lock_guard<std::recursive_mutex> lock(_inQlock);
542552
#endif
@@ -573,11 +583,9 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
573583

574584
case WSFrameType_t::close : {
575585
uint16_t status_code = ntohs(*(uint16_t*)(data + offset));
576-
offset += 2;
577586
if (bodylen > 2){
578-
bodylen -= 2;
579587
// create a text message container consuming as much data as possible from current payload
580-
_inFrame.msg = std::make_shared<WSMessageClose>(status_code, data + offset, bodylen);
588+
_inFrame.msg = std::make_shared<WSMessageClose>(status_code, data + offset + 2, bodylen -2); // deduce 2 bytes of message code
581589
} else {
582590
// must be close message w/o body
583591
_inFrame.msg = std::make_shared<WSMessageClose>(status_code);
@@ -588,7 +596,7 @@ std::pair<size_t, uint16_t> WSocketClient::_mkNewFrame(char* data, size_t len, W
588596
default:
589597
_inFrame.msg = std::make_shared<WSMessageContainer<std::vector<uint8_t>>>(static_cast<WSFrameType_t>(opcode), bodylen);
590598
// copy data
591-
memcpy(_inFrame.msg->getData(), data + offset, bodylen);
599+
memcpy(_inFrame.msg->getBuffer(), data + offset, bodylen);
592600

593601
}
594602
offset += bodylen;
@@ -605,10 +613,12 @@ WSocketClient::err_t WSocketClient::enqueueMessage(WSMessagePtr mptr){
605613
return err_t::disconnected;
606614

607615
if (_messageQueueOut.size() < _max_qcap){
608-
#ifdef ESP32
609-
std::lock_guard<std::recursive_mutex> lock(_outQlock);
610-
#endif
611-
_messageQueueOut.emplace_back( std::move(mptr) );
616+
{
617+
#ifdef ESP32
618+
std::lock_guard<std::recursive_mutex> lock(_outQlock);
619+
#endif
620+
_messageQueueOut.emplace_back( std::move(mptr) );
621+
}
612622
_clientSend();
613623
return err_t::ok;
614624
}
@@ -663,7 +673,7 @@ void WSocketClient::_sendEvent(event_t e){
663673
}
664674

665675
void WSocketClient::_keepalive(){
666-
if (millis() - _lastPong > _keepAlivePeriod){
676+
if (_keepAlivePeriod && (millis() - _lastPong > _keepAlivePeriod)){
667677
enqueueMessage(std::make_shared< WSMessageContainer<std::string> >(WSFrameType_t::pong, true, "WSocketClient Pong" ));
668678
_lastPong = millis();
669679
}
@@ -792,7 +802,7 @@ void WSocketServer::_purgeClients(){
792802
log_d("purging clients");
793803
std::lock_guard lock(clientslock);
794804
// purge clients that are disconnected and with all messages consumed
795-
std::erase_if(_clients, [](const WSocketClient& c){ return (c.connection() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); });
805+
_clients.remove_if([](const WSocketClient& c){ return (c.connection() == WSocketClient::conn_state_t::disconnected && !c.inQueueSize() ); });
796806
}
797807

798808
size_t WSocketServer::activeClientsCount() const {

0 commit comments

Comments
 (0)