@@ -164,62 +164,67 @@ bool AsyncWebSocketMessageBuffer::reserve(size_t size) {
164164AsyncWebSocketMessage::AsyncWebSocketMessage (AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask)
165165 : _WSbuffer{buffer}, _opcode(opcode & 0x07 ), _mask{mask}, _status{_WSbuffer ? WS_MSG_SENDING : WS_MSG_ERROR} {}
166166
167- void AsyncWebSocketMessage::ack (size_t len, uint32_t time) {
167+ size_t AsyncWebSocketMessage::ack (size_t len, uint32_t time) {
168168 (void )time;
169- _acked += len;
169+ const size_t pending = std::min (len, _ack - _acked);
170+ _acked += pending;
170171 if (_sent >= _WSbuffer->size () && _acked >= _ack) {
171172 _status = WS_MSG_SENT;
172173 }
173- // ets_printf("A: %u\n", len);
174+ async_ws_log_v (" msg code: %" PRIu8 " , ack: %u/%u, remain=%u/%u, status: %d" , _opcode, _acked, _ack, len - pending, len, static_cast <int >(_status));
175+ return len - pending;
174176}
175177
176178size_t AsyncWebSocketMessage::send (AsyncClient *client) {
177179 if (!client) {
180+ async_ws_log_v (" No client" );
178181 return 0 ;
179182 }
180183
181184 if (_status != WS_MSG_SENDING) {
185+ async_ws_log_v (" C[%" PRIu16 " ] Wrong status: got: %d, expected: %d" , client->remotePort (), static_cast <int >(_status), static_cast <int >(WS_MSG_SENDING));
182186 return 0 ;
183187 }
184- if (_acked < _ack) {
185- return 0 ;
186- }
188+
187189 if (_sent == _WSbuffer->size ()) {
188190 if (_acked == _ack) {
189191 _status = WS_MSG_SENT;
190192 }
193+ async_ws_log_v (" C[%" PRIu16 " ] Already sent: %u/%u" , client->remotePort (), _sent, _WSbuffer->size ());
191194 return 0 ;
192195 }
193196 if (_sent > _WSbuffer->size ()) {
194197 _status = WS_MSG_ERROR;
195- // ets_printf("E : %u > %u\n ", _sent, _WSbuffer->length ());
198+ async_ws_log_v ( " C[% " PRIu16 " ] Error, sent more : %u/%u " , client-> remotePort (), _sent, _WSbuffer->size ());
196199 return 0 ;
197200 }
198201
199202 size_t toSend = _WSbuffer->size () - _sent;
200- size_t window = webSocketSendFrameWindow (client);
203+ const size_t window = webSocketSendFrameWindow (client);
201204
202- if (window < toSend) {
203- toSend = window;
205+ // not enough space in lwip buffer ?
206+ if (!window) {
207+ async_ws_log_v (" C[%" PRIu16 " ] No space left to send more data: acked: %u, sent: %u, remaining: %u" , client->remotePort (), _acked, _sent, toSend);
208+ return 0 ;
204209 }
205210
211+ toSend = std::min (toSend, window);
212+
206213 _sent += toSend;
207214 _ack += toSend + ((toSend < 126 ) ? 2 : 4 ) + (_mask * 4 );
208215
209- // ets_printf("W: %u %u\n", _sent - toSend, toSend);
210-
211216 bool final = (_sent == _WSbuffer->size ());
212217 uint8_t *dPtr = (uint8_t *)(_WSbuffer->data () + (_sent - toSend));
213218 uint8_t opCode = (toSend && _sent == toSend) ? _opcode : (uint8_t )WS_CONTINUATION;
214219
215220 size_t sent = webSocketSendFrame (client, final , opCode, _mask, dPtr, toSend);
216221 _status = WS_MSG_SENDING;
217222 if (toSend && sent != toSend) {
218- // ets_printf("E: %u != %u\n", toSend, sent);
219223 _sent -= (toSend - sent);
220224 _ack -= (toSend - sent);
221225 }
222- // ets_printf("S: %u %u\n", _sent, sent);
226+
227+ async_ws_log_v (" C[%" PRIu16 " ] Sent %u/%u, ack: %u/%u, final: %d" , client->remotePort (), _sent, _WSbuffer->size (), _acked, _ack, final );
223228 return sent;
224229}
225230
@@ -328,7 +333,12 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
328333 }
329334
330335 if (len && !_messageQueue.empty ()) {
331- _messageQueue.front ().ack (len, time);
336+ for (auto &msg : _messageQueue) {
337+ len = msg.ack (len, time);
338+ if (len == 0 ) {
339+ break ;
340+ }
341+ }
332342 }
333343
334344 _clearQueue ();
@@ -362,11 +372,52 @@ void AsyncWebSocketClient::_runQueue() {
362372
363373 _clearQueue ();
364374
365- if (!_controlQueue.empty () && (_messageQueue.empty () || _messageQueue.front ().betweenFrames ())
366- && webSocketSendFrameWindow (_client) > (size_t )(_controlQueue.front ().len () - 1 )) {
367- _controlQueue.front ().send (_client);
368- } else if (!_messageQueue.empty () && _messageQueue.front ().betweenFrames () && webSocketSendFrameWindow (_client)) {
369- _messageQueue.front ().send (_client);
375+ size_t space = webSocketSendFrameWindow (_client);
376+
377+ if (space) {
378+ // control frames have priority over message frames
379+ // we can send a control frame if:
380+ // - there is no message frame in the queue, or the first message frame is between frames (all bytes sent are acked)
381+ // - the control frame is not finished (not sent yet)
382+ // - there is enough space to send the control frame (control frames are small, at most 129 bytes, so we can assume that if there is space to send it, it can be sent in one go)
383+ if (_messageQueue.empty () || _messageQueue.front ().betweenFrames ()) {
384+ for (auto &ctrl : _controlQueue) {
385+ if (ctrl.finished ()) {
386+ continue ;
387+ }
388+ if (space > (size_t )(ctrl.len () - 1 )) {
389+ async_ws_log_v (" WS[%" PRIu32 " ] Sending control frame: %" PRIu8 " , len: %" PRIu8, _clientId, ctrl.opcode (), ctrl.len ());
390+ ctrl.send (_client);
391+ space = webSocketSendFrameWindow (_client);
392+ }
393+ }
394+ }
395+
396+ // then we can send message frames if there is space
397+ if (space) {
398+ for (auto &msg : _messageQueue) {
399+ if (msg._remainingBytesToSend ()) {
400+ async_ws_log_v (
401+ " WS[%" PRIu32 " ] Send message fragment: %u/%u, acked: %u/%u" , _clientId, msg._remainingBytesToSend (), msg._sent + msg._remainingBytesToSend (),
402+ msg._acked , msg._ack
403+ );
404+ // will use all the remaining space, or all the remaining bytes to send, whichever is smaller
405+ msg.send (_client);
406+ space = webSocketSendFrameWindow (_client);
407+
408+ // If we haven't finished sending this message, we must stop here to preserve WebSocket ordering.
409+ // We can only pipeline subsequent messages if the current one is fully passed to TCP buffer.
410+ if (msg._remainingBytesToSend ()) {
411+ break ;
412+ }
413+ }
414+
415+ // not enough space for another message
416+ if (!space) {
417+ break ;
418+ }
419+ }
420+ }
370421 }
371422}
372423
0 commit comments