Skip to content

Commit 406147f

Browse files
committed
Fix broken WebSocket defragmentation
Fix #382 WebSocket defragmentation was broken after a fix for Safari in PR #353
1 parent fa5d554 commit 406147f

2 files changed

Lines changed: 116 additions & 51 deletions

File tree

examples/WebSocket/WebSocket.ino

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,51 @@ void setup() {
101101

102102
} else if (type == WS_EVT_DATA) {
103103
AwsFrameInfo *info = (AwsFrameInfo *)arg;
104-
Serial.printf("index: %" PRIu64 ", len: %" PRIu64 ", final: %" PRIu8 ", opcode: %" PRIu8 "\n", info->index, info->len, info->final, info->opcode);
105-
String msg = "";
104+
Serial.printf(
105+
"index: %" PRIu64 ", len: %" PRIu64 ", final: %" PRIu8 ", opcode: %" PRIu8 ", framelen: %d\n", info->index, info->len, info->final, info->opcode, len
106+
);
107+
108+
// complete frame
106109
if (info->final && info->index == 0 && info->len == len) {
107110
if (info->opcode == WS_TEXT) {
108111
data[len] = 0;
109112
Serial.printf("ws text: %s\n", (char *)data);
110113
client->ping();
111114
}
115+
116+
} else {
117+
// incomplete frame
118+
if (info->index == 0) {
119+
if (info->num == 0) {
120+
Serial.printf(
121+
"ws[%s][%" PRIu32 "] [%" PRIu32 "] MSG START %s\n", server->url(), client->id(), info->num, (info->message_opcode == WS_TEXT) ? "text" : "binary"
122+
);
123+
}
124+
Serial.printf("ws[%s][%" PRIu32 "] [%" PRIu32 "] FRAME START len=%" PRIu64 "\n", server->url(), client->id(), info->num, info->len);
125+
}
126+
127+
Serial.printf(
128+
"ws[%s][%" PRIu32 "] [%" PRIu32 "] FRAME %s, index=%" PRIu64 ", len=%" PRIu32 "]: ", server->url(), client->id(), info->num,
129+
(info->message_opcode == WS_TEXT) ? "text" : "binary", info->index, (uint32_t)len
130+
);
131+
132+
if (info->message_opcode == WS_TEXT) {
133+
data[len] = 0;
134+
Serial.printf("%s\n", (char *)data);
135+
} else {
136+
for (size_t i = 0; i < len; i++) {
137+
Serial.printf("%02x ", data[i]);
138+
}
139+
Serial.printf("\n");
140+
}
141+
142+
if ((info->index + len) == info->len) {
143+
Serial.printf("ws[%s][%" PRIu32 "] [%" PRIu32 "] FRAME END\n", server->url(), client->id(), info->num);
144+
145+
if (info->final) {
146+
Serial.printf("ws[%s][%" PRIu32 "] [%" PRIu32 "] MSG END\n", server->url(), client->id(), info->num);
147+
}
148+
}
112149
}
113150
}
114151
});
@@ -131,7 +168,7 @@ void setup() {
131168
}
132169

133170
static uint32_t lastWS = 0;
134-
static uint32_t deltaWS = 100;
171+
static uint32_t deltaWS = 500;
135172

136173
static uint32_t lastHeap = 0;
137174

src/AsyncWebSocket.cpp

Lines changed: 76 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
#include <memory>
2626
#include <utility>
2727

28+
#define STATE_FRAME_START 0
29+
#define STATE_FRAME_MASK 1
30+
#define STATE_FRAME_DATA 2
31+
2832
using namespace asyncsrv;
2933

3034
size_t webSocketSendFrameWindow(AsyncClient *client) {
@@ -226,8 +230,8 @@ const char *AWSC_PING_PAYLOAD = "ESPAsyncWebServer-PING";
226230
const size_t AWSC_PING_PAYLOAD_LEN = 22;
227231

228232
AsyncWebSocketClient::AsyncWebSocketClient(AsyncClient *client, AsyncWebSocket *server)
229-
: _client(client), _server(server), _clientId(_server->_getNextId()), _status(WS_CONNECTED), _pstate(0), _lastMessageTime(millis()), _keepAlivePeriod(0),
230-
_tempObject(NULL) {
233+
: _client(client), _server(server), _clientId(_server->_getNextId()), _status(WS_CONNECTED), _pstate(STATE_FRAME_START), _lastMessageTime(millis()),
234+
_keepAlivePeriod(0), _tempObject(NULL) {
231235

232236
_client->setRxTimeout(0);
233237
_client->onError(
@@ -508,8 +512,11 @@ void AsyncWebSocketClient::_onDisconnect() {
508512
void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
509513
_lastMessageTime = millis();
510514
uint8_t *data = (uint8_t *)pbuf;
515+
511516
while (plen > 0) {
512-
if (!_pstate) {
517+
async_ws_log_v("WS[%" PRIu32 "]: _onData: plen=%" PRIu32 ", _pstate=%" PRIu8 ", _status=%" PRIu8, _clientId, plen, _pstate, static_cast<uint8_t>(_status));
518+
519+
if (_pstate == STATE_FRAME_START) {
513520
const uint8_t *fdata = data;
514521

515522
_pinfo.index = 0;
@@ -518,13 +525,6 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
518525
_pinfo.masked = ((fdata[1] & 0x80) != 0) ? 1 : 0;
519526
_pinfo.len = fdata[1] & 0x7F;
520527

521-
// async_ws_log_w("WS[%" PRIu32 "]: _onData: %" PRIu32, _clientId, plen);
522-
// async_ws_log_w("WS[%" PRIu32 "]: _status = %" PRIu32, _clientId, _status);
523-
// async_ws_log_w(
524-
// "WS[%" PRIu32 "]: _pinfo: index: %" PRIu64 ", final: %" PRIu8 ", opcode: %" PRIu8 ", masked: %" PRIu8 ", len: %" PRIu64, _clientId, _pinfo.index,
525-
// _pinfo.final, _pinfo.opcode, _pinfo.masked, _pinfo.len
526-
// );
527-
528528
data += 2;
529529
plen -= 2;
530530

@@ -541,47 +541,52 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
541541
}
542542
}
543543

544-
if (_pinfo.masked) {
545-
// Read mask bytes (may be fragmented across packets in Safari)
546-
size_t mask_offset = 0;
547-
548-
// If we're resuming from a previous fragmented read, check _pinfo.index
549-
if (_pstate == 1 && _pinfo.index < 4) {
550-
mask_offset = _pinfo.index;
551-
}
552-
553-
// Read as many mask bytes as available
554-
while (mask_offset < 4 && plen > 0) {
555-
_pinfo.mask[mask_offset++] = *data++;
556-
plen--;
557-
}
558-
559-
// Check if we have all 4 mask bytes
560-
if (mask_offset < 4) {
561-
// Incomplete mask
562-
if (_pinfo.opcode == WS_DISCONNECT && plen == 0) {
563-
// Safari close frame edge case: masked bit set but no mask data
564-
// async_ws_log_w("WS[%" PRIu32 "]: close frame with incomplete mask, treating as unmasked", _clientId);
544+
async_ws_log_v(
545+
"WS[%" PRIu32 "]: _pinfo: index: %" PRIu64 ", final: %" PRIu8 ", opcode: %" PRIu8 ", masked: %" PRIu8 ", len: %" PRIu64, _clientId, _pinfo.index,
546+
_pinfo.final, _pinfo.opcode, _pinfo.masked, _pinfo.len
547+
);
548+
549+
// Handle fragmented mask data - Safari may split the 4-byte mask across multiple packets
550+
// _pinfo.masked is 1 if we need to start reading mask bytes
551+
// _pinfo.masked is 2, 3, or 4 if we have partially read the mask
552+
// _pinfo.masked is 5 if the mask is complete
553+
while (_pinfo.masked && _pstate <= STATE_FRAME_MASK && _pinfo.masked < 5) {
554+
// check if we have some data
555+
if (plen == 0) {
556+
// Safari close frame edge case: masked bit set but no mask data
557+
if (_pinfo.opcode == WS_DISCONNECT) {
558+
async_ws_log_v("WS[%" PRIu32 "]: close frame with incomplete mask, treating as unmasked", _clientId);
565559
_pinfo.masked = 0;
566560
_pinfo.index = 0;
567-
} else {
568-
// Wait for more data
569-
// async_ws_log_w("WS[%" PRIu32 "]: waiting for more mask data: read=%zu/4", _clientId, mask_offset);
570-
_pinfo.index = mask_offset; // Save progress
571-
_pstate = 1;
572-
return;
561+
_pinfo.len = 0;
562+
_pstate = STATE_FRAME_START;
563+
break;
573564
}
574-
} else {
575-
// All mask bytes received
576-
// async_ws_log_w("WS[%" PRIu32 "]: mask complete", _clientId);
577-
_pinfo.index = 0; // Reset index for payload processing
565+
566+
//wait for more data
567+
_pstate = STATE_FRAME_MASK;
568+
async_ws_log_v("WS[%" PRIu32 "]: waiting for more mask data: read=%" PRIu8 "/4", _clientId, _pinfo.masked - 1);
569+
return;
578570
}
571+
572+
// accumulate mask bytes
573+
_pinfo.mask[_pinfo.masked - 1] = data[0];
574+
data += 1;
575+
plen -= 1;
576+
_pinfo.masked++;
579577
}
580578

581-
const size_t datalen = std::min((size_t)(_pinfo.len - _pinfo.index), plen);
582-
const auto datalast = data[datalen];
579+
// all mask bytes read if we were reading them
580+
_pstate = STATE_FRAME_DATA;
583581

584-
// async_ws_log_w("WS[%" PRIu32 "]: _processing data: datalen=%" PRIu32 ", plen=%" PRIu32, _clientId, datalen, plen);
582+
// restore masked to 1 for backward compatibility
583+
if (_pinfo.masked >= 5) {
584+
async_ws_log_v("WS[%" PRIu32 "]: mask read complete", _clientId);
585+
_pinfo.masked = 1;
586+
}
587+
588+
const size_t datalen = std::min((size_t)(_pinfo.len - _pinfo.index), plen);
589+
const auto datalast = datalen ? data[datalen] : 0;
585590

586591
if (_pinfo.masked) {
587592
for (size_t i = 0; i < datalen; i++) {
@@ -590,22 +595,30 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
590595
}
591596

592597
if ((datalen + _pinfo.index) < _pinfo.len) {
593-
_pstate = 1;
598+
_pstate = STATE_FRAME_DATA;
599+
async_ws_log_v("WS[%" PRIu32 "]: processing next fragment index=%" PRIu64 ", len=%" PRIu32 "", _clientId, _pinfo.index, (uint32_t)datalen);
594600

595601
if (_pinfo.index == 0) {
596602
if (_pinfo.opcode) {
597603
_pinfo.message_opcode = _pinfo.opcode;
598604
_pinfo.num = 0;
599605
}
600606
}
607+
601608
if (datalen > 0) {
602609
_server->_handleEvent(this, WS_EVT_DATA, (void *)&_pinfo, data, datalen);
603610
}
604611

612+
// track index for next fragment
605613
_pinfo.index += datalen;
614+
606615
} else if ((datalen + _pinfo.index) == _pinfo.len) {
607-
_pstate = 0;
616+
_pstate = STATE_FRAME_START;
617+
async_ws_log_v("WS[%" PRIu32 "]: processing final fragment index=%" PRIu64 ", len=%" PRIu32 "", _clientId, _pinfo.index, (uint32_t)datalen);
618+
608619
if (_pinfo.opcode == WS_DISCONNECT) {
620+
async_ws_log_v("WS[%" PRIu32 "]: processing disconnect", _clientId);
621+
609622
if (datalen) {
610623
uint16_t reasonCode = (uint16_t)(data[0] << 8) + data[1];
611624
char *reasonString = (char *)(data + 2);
@@ -625,24 +638,39 @@ void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
625638
}
626639
_queueControl(WS_DISCONNECT, data, datalen);
627640
}
641+
628642
} else if (_pinfo.opcode == WS_PING) {
643+
async_ws_log_v("WS[%" PRIu32 "]: processing ping", _clientId);
629644
_server->_handleEvent(this, WS_EVT_PING, NULL, NULL, 0);
630645
_queueControl(WS_PONG, data, datalen);
646+
631647
} else if (_pinfo.opcode == WS_PONG) {
648+
async_ws_log_v("WS[%" PRIu32 "]: processing pong", _clientId);
632649
if (datalen != AWSC_PING_PAYLOAD_LEN || memcmp(AWSC_PING_PAYLOAD, data, AWSC_PING_PAYLOAD_LEN) != 0) {
633650
_server->_handleEvent(this, WS_EVT_PONG, NULL, NULL, 0);
634651
}
652+
635653
} else if (_pinfo.opcode < WS_DISCONNECT) { // continuation or text/binary frame
654+
async_ws_log_v("WS[%" PRIu32 "]: processing data frame num=%" PRIu32 "", _clientId, _pinfo.num);
636655
_server->_handleEvent(this, WS_EVT_DATA, (void *)&_pinfo, data, datalen);
637656
if (_pinfo.final) {
638657
_pinfo.num = 0;
639658
} else {
640659
_pinfo.num += 1;
641660
}
642661
}
662+
643663
} else {
644-
// async_ws_log_w("frame error: len: %u, index: %llu, total: %llu\n", datalen, _pinfo.index, _pinfo.len);
645-
// what should we do?
664+
// unexpected frame error, close connection
665+
_pstate = STATE_FRAME_START;
666+
667+
async_ws_log_v("frame error: len: %u, index: %llu, total: %llu\n", datalen, _pinfo.index, _pinfo.len);
668+
669+
_status = WS_DISCONNECTING;
670+
if (_client) {
671+
_client->ackLater();
672+
}
673+
_queueControl(WS_DISCONNECT, data, datalen);
646674
break;
647675
}
648676

0 commit comments

Comments
 (0)