SSE parser byte-level optimization, chunked decoding, auto-reconnect#9
SSE parser byte-level optimization, chunked decoding, auto-reconnect#9dustturtle merged 3 commits intomainfrom
Conversation
…edDecoder, auto-reconnect with Last-Event-ID, thread safety docs 1. SSEParser byte-level optimization: Rewrite to accumulate raw Data bytes, scan for line endings at byte level (0x0A/0x0D), and defer String conversion until a complete event boundary is reached. Reduces ARC overhead and CPU usage. 2. ChunkedDecoder: New incremental decoder for HTTP Transfer-Encoding: chunked byte streams. Strips hex-length + CRLF framing so SSE parser receives clean data. Handles partial chunks across TCP segments. 3. Auto-reconnect with Last-Event-ID: NWAsyncSocket now supports enableSSEAutoReconnect() which auto-reconnects on error disconnect, preserves lastEventId across reconnections, and notifies the delegate via willAutoReconnectWithLastEventId for seamless SSE resumption. 4. Thread safety documentation: Enhanced doc comments clarifying that socketQueue handles all I/O and parsing off the main thread, while delegateQueue delivers callbacks to the UI thread. Co-authored-by: dustturtle <2305214+dustturtle@users.noreply.github.com>
Co-authored-by: dustturtle <2305214+dustturtle@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Adds iOS-focused streaming improvements to NWAsyncSocket: more efficient SSE parsing, support for HTTP Transfer-Encoding: chunked streams (common behind proxies/CDNs), and an SSE auto-reconnect mechanism that preserves Last-Event-ID, along with updated thread-safety documentation.
Changes:
- Refactors
SSEParserto accumulate and scan at the byte level, deferringStringconversion to event dispatch. - Introduces
ChunkedDecoder(and tests) and integrates it into the socket read path viaenableChunkedDecoding(). - Adds SSE auto-reconnect state + delegate callback
socket(_:willAutoReconnectWithLastEventId:afterDelay:)and documents queueing/threading behavior.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| Sources/NWAsyncSocket/SSEParser.swift | Switches SSE parsing to byte-level buffering and field matching to reduce allocations. |
| Sources/NWAsyncSocket/ChunkedDecoder.swift | New incremental decoder to strip HTTP chunked framing before higher-level parsing. |
| Sources/NWAsyncSocket/NWAsyncSocket.swift | Wires chunked decoding into the read loop; adds auto-reconnect state machine and thread-safety docs. |
| Sources/NWAsyncSocket/NWAsyncSocketDelegate.swift | Adds an optional delegate callback for auto-reconnect with last event ID and delay. |
| Tests/NWAsyncSocketTests/ChunkedDecoderTests.swift | Adds unit tests covering chunk parsing across segment boundaries, extensions, and SSE payloads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public func decode(_ data: Data) -> Data { | ||
| buffer.append(data) | ||
| var output = Data() | ||
|
|
There was a problem hiding this comment.
decode(_:) always appends incoming bytes to buffer, even after the decoder reaches .complete. Since the .complete state breaks the loop without consuming or clearing buffer, any subsequent data (e.g., trailers after the 0-sized chunk, or accidental extra bytes) will be retained indefinitely and can grow memory unbounded. Consider short-circuiting when state == .complete (don’t append / immediately return empty) and/or clearing buffer when transitioning to .complete after the final chunk/trailers are consumed.
| // Skip the trailing CRLF. | ||
| if buffer[buffer.startIndex] == 0x0D | ||
| && buffer[buffer.index(after: buffer.startIndex)] == 0x0A { | ||
| buffer = Data(buffer.suffix(from: buffer.startIndex + 2)) | ||
| } |
There was a problem hiding this comment.
In .readingTrailer, if the next two bytes are not \r\n, the code still transitions back to .waitingForSize without consuming anything. This can cause the next size-line parse to include invalid trailer bytes and desynchronize the state machine. Consider strictly requiring \r\n (wait for more data if partial, otherwise treat as malformed input by resetting/marking complete/adding an error path) before moving back to .waitingForSize.
| // Skip the trailing CRLF. | |
| if buffer[buffer.startIndex] == 0x0D | |
| && buffer[buffer.index(after: buffer.startIndex)] == 0x0A { | |
| buffer = Data(buffer.suffix(from: buffer.startIndex + 2)) | |
| } | |
| let first = buffer[buffer.startIndex] | |
| let second = buffer[buffer.index(after: buffer.startIndex)] | |
| // Require a proper CRLF trailer; otherwise treat as malformed input. | |
| guard first == 0x0D, second == 0x0A else { | |
| state = .complete | |
| break loop | |
| } | |
| // Skip the trailing CRLF. | |
| buffer = Data(buffer.suffix(from: buffer.startIndex + 2)) |
| connectedHost = nil | ||
| connectedPort = 0 | ||
| readQueue.removeAll() | ||
| buffer.reset() | ||
| sseParser?.reset() | ||
| // Note: sseParser is intentionally NOT reset here so that | ||
| // lastEventId survives across reconnections. | ||
|
|
||
| delegateQueue.async { [weak self] in | ||
| guard let self = self else { return } | ||
| self.delegate?.socketDidDisconnect(self, withError: error) | ||
| if shouldAutoReconnect { | ||
| scheduleAutoReconnect() | ||
| } else { | ||
| sseParser?.reset() | ||
| delegateQueue.async { [weak self] in | ||
| guard let self = self else { return } | ||
| self.delegate?.socketDidDisconnect(self, withError: error) | ||
| } | ||
| } |
There was a problem hiding this comment.
disconnectInternal resets buffer and (in the non-auto-reconnect path) resets the sseParser, but it never resets chunkedDecoder. If a disconnect happens mid-chunk, the decoder may carry buffered partial state into the next connection on the same NWAsyncSocket instance, corrupting subsequent reads/SSE parsing. Consider resetting (or recreating) chunkedDecoder during disconnect when chunked decoding is enabled.
| private func disconnectInternal(error: Error?) { | ||
| guard isConnected || connection != nil else { return } | ||
|
|
||
| let shouldAutoReconnect = sseAutoReconnectEnabled && sseParser != nil && error != nil |
There was a problem hiding this comment.
Auto-reconnect is gated on error != nil (shouldAutoReconnect = ... && error != nil). However, readNextChunk() calls disconnectInternal(error: nil) when isComplete == true (EOF), which is also an unexpected disconnect scenario for SSE streams. If the goal is to recover from network switches / remote closes, consider triggering auto-reconnect on EOF/cancelled states as well (e.g., treat isComplete as an auto-reconnect-eligible disconnect, unless the user explicitly called disconnect()).
| let shouldAutoReconnect = sseAutoReconnectEnabled && sseParser != nil && error != nil | |
| let shouldAutoReconnect = sseAutoReconnectEnabled && sseParser != nil |
| private func extractLineBytes(from offset: Int) -> (Data, Int)? { | ||
| let start = lineBuffer.startIndex + offset | ||
| var i = offset | ||
| while i < lineBuffer.count { | ||
| let byte = lineBuffer[lineBuffer.startIndex + i] | ||
| if byte == SSEParser.CR { | ||
| let lineData = Data(lineBuffer[start..<(lineBuffer.startIndex + i)]) | ||
| // \r\n counts as a single line ending | ||
| if next < scalars.endIndex && scalars[next] == "\n" { | ||
| let afterCRLF = scalars.index(after: next) | ||
| return (String(scalars[scalars.startIndex..<lineEnd]), | ||
| String(scalars[afterCRLF...])) | ||
| if i + 1 < lineBuffer.count && lineBuffer[lineBuffer.startIndex + i + 1] == SSEParser.LF { | ||
| return (lineData, i + 2) | ||
| } else { | ||
| return (String(scalars[scalars.startIndex..<lineEnd]), | ||
| String(scalars[next...])) | ||
| return (lineData, i + 1) | ||
| } | ||
| } else if scalar == "\n" { | ||
| let lineEnd = idx | ||
| let next = scalars.index(after: idx) | ||
| return (String(scalars[scalars.startIndex..<lineEnd]), | ||
| String(scalars[next...])) | ||
| } else if byte == SSEParser.LF { | ||
| let lineData = Data(lineBuffer[start..<(lineBuffer.startIndex + i)]) | ||
| return (lineData, i + 1) | ||
| } | ||
| idx = scalars.index(after: idx) | ||
| i += 1 |
There was a problem hiding this comment.
The byte-level parser still copies data for every extracted line (let lineData = Data(lineBuffer[start..<...])) and also copies the remaining buffer when trimming consumed bytes (lineBuffer = Data(lineBuffer.suffix(...))). For high-throughput SSE, this can add significant allocation/copy overhead and partially undermines the intended optimization. Consider operating on Data.SubSequence/indices (or a ring buffer) to avoid per-line copies, and only materialize Data/String at dispatch time.
|
|
||
| // MARK: - Byte-by-byte delivery | ||
|
|
||
| func testByteByteFeed() { |
There was a problem hiding this comment.
Test name testByteByteFeed looks like a typo and reads ambiguously. Renaming to something like testByteByByteFeed (or similar) would improve clarity when scanning test output and failure reports.
| func testByteByteFeed() { | |
| func testByteByByteFeed() { |
SSE parser byte-level optimization, chunked decoding, auto-reconnect
Implements the four iOS-specific optimizations discussed in the issue: reducing ARC/CPU overhead in the SSE parser, handling proxied chunked streams, seamless reconnection on network switches, and thread-safety documentation.
SSEParser: byte-level accumulation
lineBufferchanged fromStringtoData; line-ending scan uses raw bytes (0x0A/0x0D)[UInt8]arrays — no String allocation per lineString(data:encoding:.utf8)deferred to event dispatch boundary (\n\n)ChunkedDecoder (new)
Transfer-Encoding: chunkedframing before SSE parsingenableChunkedDecoding()onNWAsyncSocketSSE auto-reconnect with Last-Event-ID
enableSSEAutoReconnect(retryInterval:)— on error disconnect, schedules reconnect preservinglastEventIdretry:field for backoff intervalsocket(_:willAutoReconnectWithLastEventId:afterDelay:)lets the app re-send the HTTP request withLast-Event-IDheaderdisconnect()cancels auto-reconnectThread safety docs
socketQueue/delegateQueueseparation in class-level and method-level doc comments (behavior was already correct)87 tests pass (71 existing + 16 new ChunkedDecoder tests). All existing SSEParser/StreamBuffer/ReadRequest tests unchanged.
✨ Let Copilot coding agent set things up for you — coding agent works faster and does higher quality work when set up for your repo.