Skip to content

Commit 4514075

Browse files
authored
Merge pull request #683 from ESTiOSAI/test/reconnectable-websocket
웹소켓 Unit Test & 네트워크 백오프 작업
2 parents f22c0f0 + ca16579 commit 4514075

12 files changed

Lines changed: 527 additions & 219 deletions

File tree

AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift

Lines changed: 0 additions & 60 deletions
This file was deleted.

AIProject/iCo/Core/Util/Async+BroadCaster.swift renamed to AIProject/iCo/Core/Remote/WebSocket/Util/Async+BroadCaster.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ public class AsyncStreamBroadcaster<Element> {
2020
/// 구독할 continuation 값들
2121
private var continuations: [UUID: AsyncStream<Element>.Continuation] = [:]
2222

23+
public init() {}
24+
2325
/// 구독 메서드로 stream을 반환
2426
/// - Returns: stream 반환
2527
public func stream() -> AsyncStream<Element> {
File renamed without changes.

AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift

Lines changed: 89 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,35 @@
11
import Foundation
22
import AsyncAlgorithms
33

4-
public final class WebSocketClient: NSObject {
4+
public class WebSocketClient: NSObject, WebSocketProvider {
55
/// 소켓 상태 채널
66
private var stateStream: AsyncStream<WebSocket.State>
77
/// WebSocket의 상태 변화를 여러 Consumer에게 동시에 전달하는 브로드캐스터
8-
public var stateBroadCaster: AsyncStreamBroadcaster<WebSocket.State> = .init()
8+
public var stateBroadCaster: AsyncStreamBroadcaster<WebSocket.State>
99
/// 메세지 채널
1010
public var incomingChannel: AsyncChannel<URLSessionWebSocketTask.Message>
1111

12-
private let url: URL
13-
private let session: URLSession
14-
private var task: URLSessionWebSocketTask?
12+
private(set) var url: URL
13+
private(set) var session: URLSessionType
14+
private(set) var task: WebSocketType?
1515

16-
private var stateTask: Task<Void, Error>?
17-
private var receiveTask: Task<Void, Error>?
16+
private(set) var stateTask: Task<Void, Error>?
17+
private(set) var receiveTask: Task<Void, Error>?
1818

1919
/// 핑 전송 task
20-
private var healthCheck: Task<Void, Error>?
20+
private(set) var healthCheck: Task<Void, Error>?
2121
private var pingInterval: Duration = .seconds(30)
2222
private var pingTimeout: Duration = .seconds(10)
23+
private var attempts: Int = 0
2324

24-
public init(url: URL, session: URLSession = .shared) {
25+
public init(
26+
url: URL,
27+
session: URLSessionType = URLSession.shared,
28+
stateBroadCaster: AsyncStreamBroadcaster<WebSocket.State> = .init()
29+
) {
2530
self.url = url
2631
self.session = session
32+
self.stateBroadCaster = stateBroadCaster
2733

2834
stateStream = stateBroadCaster.stream()
2935
incomingChannel = AsyncChannel<URLSessionWebSocketTask.Message>()
@@ -35,12 +41,9 @@ public final class WebSocketClient: NSObject {
3541
/// 웹소켓 세션을 연결하고 작업을 생성합니다.
3642
public func connect() async {
3743
await stateBroadCaster.send(.connecting)
38-
self.task = session.webSocketTask(with: url)
44+
self.task = session.makeWebSocketTask(with: url)
3945
task?.delegate = self
4046
task?.resume()
41-
42-
// 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음
43-
try? await performWithTimeout(sendPing, at: pingTimeout)
4447
}
4548

4649
/// 명시적으로 현재 WebSocket 연결을 정상적으로 종료합니다.
@@ -52,73 +55,33 @@ public final class WebSocketClient: NSObject {
5255

5356
/// 텍스트 형태의 메시지를 WebSocket 서버로 전송합니다.
5457
public func send(text: String) async throws {
55-
try await task?.send(.string(text))
58+
guard let task else { throw URLError(.notConnectedToInternet) }
59+
try await task.send(.string(text))
5660
}
5761

5862
/// 바이너리(Data) 형태의 메시지를 WebSocket 서버로 전송합니다.
5963
public func send(data: Data) async throws {
60-
try await task?.send(.data(data))
61-
}
62-
63-
deinit {
64-
debugPrint(String(describing: Self.self), #function)
65-
task?.cancel()
66-
task = nil
67-
stateBroadCaster.finish()
68-
incomingChannel.finish()
69-
}
70-
}
71-
72-
// MARK: - Test용 메소드
73-
// TODO: Deprecated 예정입니다.
74-
extension WebSocketClient {
75-
public func sendState(with state: WebSocket.State) async {
76-
await stateBroadCaster.send(state)
77-
}
78-
79-
public func cancel(with code: URLSessionWebSocketTask.CloseCode) {
80-
task?.cancel(with: code, reason: nil)
81-
task = nil
82-
}
83-
84-
public func cancel() {
85-
task?.cancel()
86-
task = nil
64+
guard let task else { throw URLError(.notConnectedToInternet) }
65+
try await task.send(.data(data))
8766
}
8867
}
8968

9069
// MARK: - Private
9170
extension WebSocketClient {
92-
/// 서버로 Ping 프레임을 전송하여 연결 상태를 확인합니다.
93-
private func sendPing() async throws {
94-
return try await withCheckedThrowingContinuation { continuation in
95-
task?.sendPing { error in
96-
Task {
97-
if let error {
98-
debugPrint("Ping Failed: \(error)")
99-
continuation.resume(throwing: error)
100-
return
101-
}
102-
103-
continuation.resume()
104-
}
105-
}
106-
}
107-
}
108-
10971
/// WebSocket의 상태 변화를 관찰하고 각 상태에 맞는 동작을 수행합니다.
11072
private func observeState() {
11173
stateTask = Task {
112-
for await state in stateStream {
74+
for await state in stateStream.removeDuplicates() {
11375
switch state {
11476
case .connecting:
11577
debugPrint("Connecting")
11678
continue
11779
case .connected:
11880
debugPrint("Connected")
81+
clearAttempts()
11982
receive()
12083
checkingAlive()
121-
case .failed, .closed:
84+
case .closed:
12285
debugPrint("Closed")
12386
release()
12487
case .reconnecting:
@@ -129,37 +92,73 @@ extension WebSocketClient {
12992
}
13093
}
13194

132-
// FIXME: 개선이 필요한지 한 번 더 생각해보기
13395
/// 서버로부터 WebSocket 메시지를 지속적으로 수신합니다.
13496
private func receive() {
135-
receiveTask?.cancel()
136-
13797
receiveTask = Task {
138-
do {
139-
guard let task else { return }
98+
while true {
99+
guard let task else { throw CancellationError() }
140100
let message = try await task.receive()
141101
await incomingChannel.send(message)
142-
receive()
143-
} catch {
144-
print("종료되어 더 이상 웹소켓 데이터를 받지 않습니다.")
145102
}
146103
}
147104
}
148105

149106
/// 주기적으로 Ping을 전송하여 WebSocket 연결 상태를 점검합니다.
150107
private func checkingAlive() {
151-
healthCheck?.cancel()
152-
153108
healthCheck = Task {
154109
do {
155110
while true {
111+
try await performWithTimeout(sendPing, at: pingTimeout)
156112
try await Task.sleep(until: .now + pingInterval)
157-
try await performWithTimeout(sendPing, at: .seconds(10))
158113
}
159114
} catch is CancellationError {
160115
debugPrint("작업이 취소되었습니다.")
161116
} catch {
162-
await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2)))
117+
if handlePingError(error) {
118+
if task?.state == .running {
119+
task?.cancel()
120+
}
121+
}
122+
}
123+
}
124+
}
125+
126+
/// sendPing(:) 으로부터 받은 에러를 핸들링하는 메소드입니다.
127+
/// - Parameter error: 에러를 전달받습니다.
128+
/// - Returns: 재연결해야 한다면 true를 반환합니다.
129+
private func handlePingError(_ error: Error) -> Bool {
130+
if let urlError = error as? URLError {
131+
switch urlError.code { // URLError (네트워크 단절)
132+
case .notConnectedToInternet, .networkConnectionLost:
133+
return true
134+
default:
135+
return false
136+
}
137+
} else if let posixError = error as? POSIXError {
138+
switch posixError.code { // POSIXError (소켓이 죽음)
139+
case .EPIPE, .ECONNRESET:
140+
return true
141+
default:
142+
return false
143+
}
144+
} else { // 소켓이 정상상태가 아님.
145+
return true
146+
}
147+
}
148+
149+
/// 서버로 Ping 프레임을 전송하여 연결 상태를 확인합니다.
150+
private func sendPing() async throws {
151+
return try await withCheckedThrowingContinuation { continuation in
152+
task?.sendPing { error in
153+
Task {
154+
if let error {
155+
debugPrint("Ping Failed: \(error)")
156+
continuation.resume(throwing: error)
157+
return
158+
}
159+
160+
continuation.resume()
161+
}
163162
}
164163
}
165164
}
@@ -169,17 +168,28 @@ extension WebSocketClient {
169168
if userClose {
170169
await stateBroadCaster.send(.closed)
171170
} else {
172-
await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2)))
171+
await stateBroadCaster.send(.reconnecting)
173172
}
174173
}
175174

175+
private func clearAttempts() {
176+
attempts = 0
177+
}
178+
179+
/// WebSocket 재연결시에 백오프를 적용합니다.
180+
/// - Returns: 백오프하는 시간을 Int 타입으로 반환합니다.
181+
private func backoff() -> Int {
182+
attempts += 1
183+
let base = min(pow(2.0, Double(attempts)) * 100.0, 10000)
184+
let jitter = Double.random(in: 0.5...1.0)
185+
186+
return Int(base * jitter)
187+
}
188+
176189
/// WebSocket 재연결을 시도합니다.
177190
private func reconnect() async {
178-
guard task?.state != .running else {
179-
return
180-
}
181-
182-
try? await Task.sleep(for: .seconds(2))
191+
if task?.state == .running || attempts > 10 { return }
192+
try? await Task.sleep(for: .milliseconds(backoff()))
183193
await connect()
184194
}
185195

@@ -189,11 +199,6 @@ extension WebSocketClient {
189199
receiveTask = nil
190200
healthCheck?.cancel()
191201
healthCheck = nil
192-
193-
if task?.state == .running {
194-
task?.cancel(with: .goingAway, reason: nil)
195-
}
196-
197202
task = nil
198203
}
199204
}
@@ -213,7 +218,7 @@ extension WebSocketClient: URLSessionWebSocketDelegate {
213218
// 1. 네트워크 닫힘, 2. 에러로 종료, 3. 정상적으로 완료
214219
public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) {
215220
if let _ = error {
216-
Task { await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) }
221+
Task { await stateBroadCaster.send(.reconnecting) }
217222
}
218223
}
219224
}

0 commit comments

Comments
 (0)