Skip to content

Commit 293944d

Browse files
broddomathieucarbou
authored andcommitted
Increase WebSocket throughput by reworking ack handling
Enable processing of cumulative ACKs across multiple queued messages Larger window size for reduced fragmentation Update src/AsyncWebSocket.cpp Co-authored-by: Mathieu Carbou <mathieu.carbou@gmail.com> Revert threshold to 9 improve readability Fix loop exit condition Co-authored-by: Mathieu Carbou <mathieu.carbou@gmail.com> Replace public sent() with private _remainingBytesToSend() Leverage _remainingBytesToSend() and tidy messageQueue sending
1 parent 8aefa29 commit 293944d

2 files changed

Lines changed: 49 additions & 16 deletions

File tree

src/AsyncWebSocket.cpp

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,16 @@ bool AsyncWebSocketMessageBuffer::reserve(size_t size) {
164164
AsyncWebSocketMessage::AsyncWebSocketMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask)
165165
: _WSbuffer{buffer}, _opcode(opcode & 0x07), _mask{mask}, _status{_WSbuffer ? WS_MSG_SENDING : WS_MSG_ERROR} {}
166166

167-
void AsyncWebSocketMessage::ack(size_t len, uint32_t time) {
167+
size_t AsyncWebSocketMessage::ack(size_t len, uint32_t time) {
168168
(void)time;
169-
_acked += len;
170-
if (_sent >= _WSbuffer->size() && _acked >= _ack) {
169+
const size_t pending = _ack - _acked;
170+
const size_t received = std::min(len, pending);
171+
_acked += received;
172+
if (_sent >= _WSbuffer->size() && !pending) {
171173
_status = WS_MSG_SENT;
172174
}
173-
// ets_printf("A: %u\n", len);
175+
async_ws_log_v("status: %d, ack: %u/%u\n", static_cast<int>(_status), _acked, _ack);
176+
return len - received;
174177
}
175178

176179
size_t AsyncWebSocketMessage::send(AsyncClient *client) {
@@ -181,9 +184,7 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) {
181184
if (_status != WS_MSG_SENDING) {
182185
return 0;
183186
}
184-
if (_acked < _ack) {
185-
return 0;
186-
}
187+
187188
if (_sent == _WSbuffer->size()) {
188189
if (_acked == _ack) {
189190
_status = WS_MSG_SENT;
@@ -197,11 +198,14 @@ size_t AsyncWebSocketMessage::send(AsyncClient *client) {
197198
}
198199

199200
size_t toSend = _WSbuffer->size() - _sent;
200-
size_t window = webSocketSendFrameWindow(client);
201+
const size_t window = webSocketSendFrameWindow(client);
201202

202-
if (window < toSend) {
203-
toSend = window;
203+
// not enough space in lwip buffer ?
204+
if (!window) {
205+
return 0;
204206
}
207+
208+
toSend = std::min(toSend, window);
205209

206210
_sent += toSend;
207211
_ack += toSend + ((toSend < 126) ? 2 : 4) + (_mask * 4);
@@ -328,14 +332,20 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
328332
}
329333

330334
if (len && !_messageQueue.empty()) {
331-
_messageQueue.front().ack(len, time);
335+
for (auto &msg : _messageQueue) {
336+
len = msg.ack(len, time);
337+
if (len == 0) {
338+
break;
339+
}
340+
}
332341
}
333342

334343
_clearQueue();
335344

336345
_runQueue();
337346
}
338347

348+
339349
void AsyncWebSocketClient::_onPoll() {
340350
if (!_client) {
341351
return;
@@ -362,11 +372,28 @@ void AsyncWebSocketClient::_runQueue() {
362372

363373
_clearQueue();
364374

365-
if (!_controlQueue.empty() && (_messageQueue.empty() || _messageQueue.front().betweenFrames())
375+
if (!_controlQueue.empty() && !_controlQueue.front().finished() && (_messageQueue.empty() || _messageQueue.front().betweenFrames())
366376
&& webSocketSendFrameWindow(_client) > (size_t)(_controlQueue.front().len() - 1)) {
367377
_controlQueue.front().send(_client);
368-
} else if (!_messageQueue.empty() && _messageQueue.front().betweenFrames() && webSocketSendFrameWindow(_client)) {
369-
_messageQueue.front().send(_client);
378+
}
379+
380+
if (webSocketSendFrameWindow(_client)) {
381+
for (auto &msg : _messageQueue) {
382+
if (msg._remainingBytesToSend()) {
383+
msg.send(_client);
384+
}
385+
386+
// If we haven't finished sending this message, we must stop here to preserve WebSocket ordering.
387+
// We can only pipeline subsequent messages if the current one is fully passed to TCP buffer.
388+
if (msg._remainingBytesToSend()) {
389+
break;
390+
}
391+
392+
// not enough space for another message
393+
if(!webSocketSendFrameWindow(_client)) {
394+
return;
395+
}
396+
}
370397
}
371398
}
372399

src/AsyncWebSocket.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,13 @@ class AsyncWebSocketMessageBuffer {
184184
};
185185

186186
class AsyncWebSocketMessage {
187+
friend AsyncWebSocketClient;
188+
187189
private:
190+
size_t _remainingBytesToSend() const {
191+
return _WSbuffer->size() - _sent;
192+
}
193+
188194
AsyncWebSocketSharedBuffer _WSbuffer;
189195
uint8_t _opcode{WS_TEXT};
190196
bool _mask{false};
@@ -202,8 +208,8 @@ class AsyncWebSocketMessage {
202208
bool betweenFrames() const {
203209
return _acked == _ack;
204210
}
205-
206-
void ack(size_t len, uint32_t time);
211+
212+
size_t ack(size_t len, uint32_t time);
207213
size_t send(AsyncClient *client);
208214
};
209215

0 commit comments

Comments
 (0)