Skip to content

Commit 81f3596

Browse files
authored
fix probe ping drain and upgrade drain race (#31)
1 parent 23153d5 commit 81f3596

12 files changed

Lines changed: 2331 additions & 4 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
object Consts {
22
const val releaseGroup = "com.piasy"
33
const val releaseName = "kmp-socketio"
4-
const val releaseVersion = "1.4.2"
4+
const val releaseVersion = "1.4.3"
55

66
val androidNS = "$releaseGroup.${releaseName.replace('-', '.')}"
77
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
完整分析
2+
3+
测试用例 reconnectManually 逻辑
4+
5+
1. 创建 socket 并连接
6+
2. 第一次 CONNECT 回调 → socket.close()
7+
3. DISCONNECT 回调 → 注册新的 CONNECT 监听 → socket.open() (手动重连)
8+
4. 第二次 CONNECT 回调 → socket.close(),向队列 offer("done")
9+
5. values.take() 阻塞等待 "done"
10+
11+
---
12+
Transport 实例总览
13+
14+
┌──────────┬────────────────────┬──────────────────────┬────────────────────┬─────────────────────┐
15+
│ 日志 │ 第一次连接 Polling │ 第一次连接 WS Probe │ 第二次连接 Polling │ 第二次连接 WS Probe │
16+
├──────────┼────────────────────┼──────────────────────┼────────────────────┼─────────────────────┤
17+
│ good.log │ polling@709407350 │ websocket@2087361714 │ polling@969147974 │ websocket@590078241 │
18+
├──────────┼────────────────────┼──────────────────────┼────────────────────┼─────────────────────┤
19+
│ err.log │ polling@1929439163 │ websocket@260110639 │ polling@2125549749 │ websocket@897766716 │
20+
└──────────┴────────────────────┴──────────────────────┴────────────────────┴─────────────────────┘
21+
22+
---
23+
关键发现:Manager 共享 scope
24+
25+
Manager.kt:100:
26+
val socket = EngineSocket(uri, opt, scope) // Manager 把自己的 scope 传给每个新的 EngineSocket
27+
28+
每次 Manager.open() 创建新的 EngineSocket 实例,但传入的是同一个 scope。这个 scope 同时也会传给所有 Transport(factory.create(name, opts, scope, rawMessage),见 EngineSocket.kt:222)。这意味着:新旧连接的所有
29+
Transport 共享同一个 CoroutineScope。
30+
31+
---
32+
good.log 执行流程(成功路径)
33+
34+
第一次连接
35+
36+
1. polling@709407350 创建并打开,发起 GET 握手请求
37+
2. 收到 Open 包(sid=npUAHeAD7Xe4wi4TAAAA),polling@709407350 状态 OPENING → OPEN
38+
3. EngineSocket onOpen,发送 Socket.IO Connect(40),同时启动 probe:创建 websocket@2087361714,发起 WS 握手
39+
4. polling@709407350 继续 poll,收到 40{"sid":"I6cxDxOi1VMI0lYEAAAB"} → Socket onConnect
40+
41+
第一次断开(socket.close())
42+
43+
5. Socket close → Manager 发送 Disconnect(41) → polling@709407350 send
44+
6. Manager destroy/close/cleanUp
45+
7. EngineSocket.close():state=OPEN, writeBuffer 非空(有 41 在发送中), upgrading=false
46+
- 注册 once(EVENT_DRAIN, ...) 等待 drain
47+
8. Socket onClose: io client disconnect
48+
49+
手动重连
50+
51+
9. Socket.open() → Manager.open(state=CLOSED) → 创建新的 EngineSocket(state=INIT)
52+
10. 新 EngineSocket setTransport(polling@969147974) → 发起新的 GET 握手
53+
54+
旧连接清理(关键时序)
55+
56+
11. websocket@2087361714 WS 101 握手成功,onOpen(时间戳 488)
57+
12. Probe 发送 2probe
58+
13. 旧 EngineSocket 的 drain 先于 probe pong 到达(时间戳 490):
59+
- polling@709407350 POST 41 返回 OK → onDrain → emit(EVENT_DRAIN)
60+
- close 的 drain handler 检查 upgrading == false ✓
61+
- 直接调用 closeAction() → onClose("force close")
62+
- onClose 关闭 polling@709407350,清空 writeBuffer
63+
14. websocket@2087361714 后续收到 3probe pong,但 probe handler 发现 socket 已 closed → 中止
64+
65+
第二次连接正常完成
66+
67+
15. polling@969147974 收到新 Open(sid=Q5cmf6rvfmLGKrBHAAAC),正常连接
68+
16. 收到 Socket.IO Connect → Socket onConnect(sid=0_W1hJWvCHgoG_ktAAAD)
69+
17. 收到 42["message","hello client"] → 回调触发 socket.close(),offer("done") → 测试通过
70+
71+
成功的关键:drain 在 probe pong 之前触发 → upgrading=false → 直接关闭,不走 upgrade 路径。
72+
73+
---
74+
err.log 执行流程(失败路径)
75+
76+
第一次连接(与 good.log 相同,直到 close)
77+
78+
1-4. 与 good.log 相同:polling@1929439163 建立连接,Socket onConnect(sid=Q_xqaV-43ilrJOWWAAAB)
79+
80+
第一次断开
81+
82+
5-8. 与 good.log 相同:发送 41,EngineSocket.close() 注册 drain handler
83+
84+
手动重连
85+
86+
9-10. 与 good.log 相同:创建新 EngineSocket,setTransport(polling@2125549749)
87+
88+
关键时序差异!
89+
90+
11. websocket@260110639 WS 101 握手成功,onOpen(时间戳 698)
91+
12. Probe 发送 2probe(ioScope.launch 异步发送,此处 ioScope 协程 A 启动)
92+
13. Probe pong 先于 drain 到达(时间戳 702):
93+
- 收到 3probe → EngineSocket probe transport websocket pong
94+
- upgrading = true ← 关键状态变更
95+
- 开始 pause polling@1929439163
96+
14. polling@1929439163 POST 41 返回 OK → onDrain(时间戳 705):
97+
- drain handler 检查 upgrading == true → 走 waitForUpgrade() 路径
98+
- 注册 once(EVENT_UPGRADE, cleanupAndClose) 和 once(EVENT_UPGRADE_ERROR, cleanupAndClose)
99+
15. Pause 完成(时间戳 705):
100+
- polling@1929439163 paused
101+
- EngineSocket changing transport and sending upgrade packet
102+
- 在 websocket@260110639 上注册 once(EVENT_DRAIN, upgradeHandler)
103+
- 发送 Upgrade 包 5(ioScope.launch 异步发送,协程 B 启动)
104+
16. Upgrade 包的 ioScope 协程 B 完成 → scope.launch 发布 drain 事件(时间戳 705,line 321):
105+
- websocket@260110639 doSend 1 packets finish(第一个 finish)
106+
- emit(EVENT_DRAIN, 1) on websocket@260110639
107+
- Emitter 快照 listeners:onceCallbacks[EVENT_DRAIN] = [upgradeHandler]
108+
- upgradeHandler 执行:
109+
a. emit(EVENT_UPGRADE, transport) → cleanupAndClose 触发:
110+
- onClose("force close")
111+
- writeBuffer.clear(), prevBufferLen = 0
112+
- state = State.CLOSED
113+
b. setTransport(websocket@260110639) → 在 websocket@260110639 上添加新的持久 EVENT_DRAIN callback listener(On.on → transport.on)
114+
c. upgrading = false, flush() → state CLOSED 忽略
115+
17. 2probe 的 ioScope 协程 A 延迟完成 → scope.launch 发布 drain 事件(时间戳 712,line 405):
116+
- websocket@260110639 doSend 1 packets finish(第二个 finish)
117+
- emit(EVENT_DRAIN, 1) on websocket@260110639
118+
- Emitter 快照 listeners:callbacks[EVENT_DRAIN] = [setTransport 刚注册的持久 listener]
119+
- setTransport 的 listener 调用 onDrain(1)
120+
- onDrain: prevBufferLen=0, writeBuffer.size=0, len=1
121+
- writeBuffer.removeAt(0) → ArrayDeque is empty 💥 CRASH
122+
123+
Exception: java.util.NoSuchElementException: ArrayDeque is empty.
124+
at EngineSocket.onDrain(EngineSocket.kt:280)
125+
at EngineSocket$setTransport$1.call(EngineSocket.kt:246)
126+
at Emitter.emit(Emitter.kt:136)
127+
at WebSocket$doSend$2$3.invokeSuspend(WebSocket.kt:172)
128+
129+
崩溃后的连锁反应
130+
131+
18. 异常发生在 scope.launch(WebSocket.kt:169)内部,scope 是 Manager 的共享 scope
132+
19. 未捕获的异常导致 共享 scope 的 Job 被取消
133+
20. 第二次连接的 polling@2125549749 虽然收到了 HTTP GET 响应(40{"sid":"N11GQoFftXqh9245AAAD"},line 457),但 scope.launch { onPollComplete(...) } 无法执行(scope 已取消)
134+
21. onPollComplete 从未被调用(日志中完全没有出现第二次连接的 onPollComplete)
135+
22. 第二次连接永远无法收到 Socket.IO Connect → Socket onConnect 不触发 → "done" 永远不会 offer → 测试 7 秒超时
136+
137+
---
138+
根因总结
139+
140+
存在两层竞态条件:
141+
142+
竞态 1:Close-drain vs Probe-pong
143+
- EngineSocket.close() 注册 drain handler,其中检查 upgrading 决定走哪条路径
144+
- probe() 中收到 pong 会设置 upgrading = true
145+
- 如果 drain 先于 pong → upgrading=false → 直接关闭(good.log)
146+
- 如果 pong 先于 drain → upgrading=true → 走 upgrade 路径,最终调用 setTransport(websocket)
147+
148+
竞态 2:2probe-drain vs Upgrade-drain
149+
- websocket@260110639 的 doSend 被调用了两次:一次发 2probe,一次发 Upgrade
150+
- 每次 doSend 都在 ioScope.launch 中异步发送数据,完成后通过 scope.launch 发射 EVENT_DRAIN
151+
- 这两个 ioScope.launch 的完成顺序不确定
152+
- 如果 Upgrade 的 drain 先触发:upgradeHandler 消费了 once listener → setTransport 注册了新的持久 drain listener
153+
- 然后 2probe 的 drain 延迟触发:hit 到 setTransport 注册的新 listener → onDrain(1) 在空 writeBuffer 上操作 → 崩溃
154+
155+
致命后果:崩溃发生在 Manager 共享的 CoroutineScope 中(Manager.kt:100 将同一个 scope 传给新旧 EngineSocket),导致整个 scope 被取消,新连接的所有协程调度都失效,测试超时。

0 commit comments

Comments
 (0)