From ec84d0eaa1cf8f9204cd8560b828cc73b0d6838b Mon Sep 17 00:00:00 2001 From: Takeyoshi Kikuchi Date: Sun, 1 Mar 2026 13:37:56 +0900 Subject: [PATCH] Send MQTT packets atomically in a single socket write Previously the fixed header and payload were sent in separate async socket writes. Since each write awaits, other tasks could interleave writes on the same socket, potentially corrupting packet framing. This change builds a single contiguous buffer and sends it in one async write call to ensure packet atomicity. Signed-off-by: Takeyoshi Kikuchi --- nmqtt.nim | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nmqtt.nim b/nmqtt.nim index 56b74e5..be35500 100644 --- a/nmqtt.nim +++ b/nmqtt.nim @@ -355,8 +355,8 @@ proc send(ctx: MqttCtx, pkt: Pkt): Future[bool] {.async.} = if ctx.state notin {Connecting, Connected, Disconnecting}: return false - var hdr: seq[uint8] - hdr.add (pkt.typ.int shl 4).uint8 or pkt.flags + var buf: seq[uint8] = newSeqOfCap[uint8](1 + 4 + pkt.data.len) + buf.add (pkt.typ.int shl 4).uint8 or pkt.flags var len = pkt.data.len while true: @@ -364,15 +364,16 @@ proc send(ctx: MqttCtx, pkt: Pkt): Future[bool] {.async.} = len = len div 128 if len > 0: b = b or 128 - hdr.add b.uint8 + buf.add b.uint8 if len == 0: break ctx.dmp "tx> " & $pkt - await ctx.s.send(hdr[0].unsafeAddr, hdr.len) - if pkt.data.len > 0: - await ctx.s.send(pkt.data[0].unsafeAddr, pkt.data.len) + let hdrlen = buf.len + buf.setLen(hdrlen + pkt.data.len) + copyMem(buf[hdrlen].addr, pkt.data[0].unsafeAddr, pkt.data.len) + await ctx.s.send(buf[0].unsafeAddr, buf.len) return true