Skip to content

Commit be5db50

Browse files
zhimburacursoragent
andcommitted
Address PR #30 review: fix cross-platform tests, remove redundant APIs
- Fix commonTest for all platforms: drop JVM-only imports, use encodeToByteArray/ByteString comparison - Remove Manager.openSuspend() entirely - Remove Socket.emitSuspend(event, *args) without ack; use socket.emit() in tests/examples - RESERVED_EVENTS back to private; drop server-side events (disconnecting, newListener, removeListener) - Document Flow vs listener API in Emitter.flow() KDoc; flowWithReplay simplified - README and examples updated Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 275ffab commit be5db50

6 files changed

Lines changed: 56 additions & 139 deletions

File tree

README.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ socket.flow("echoBack").collect { args ->
8686
// Open socket using suspend function
8787
socket.openSuspend()
8888

89-
// Emit event using suspend function
89+
// Emit event using regular emit; suspend is only needed for ack
9090
val bin = UnsafeByteStringOperations.wrapUnsafe(byteArrayOf(0x1, 0x3, 0x1, 0x4))
91-
socket.emitSuspend("echo", 1, "2", bin, GMTDate())
91+
socket.emit("echo", 1, "2", bin, GMTDate())
9292

9393
// Emit with suspend acknowledgement callback (trailing lambda syntax)
9494
socket.emitSuspend("echoWithAck", 42, "test") { args ->
@@ -104,9 +104,7 @@ Most of the APIs are the same as socket.io-client-java, here are some difference
104104
### Available Extension Functions
105105

106106
- `IO.socketSuspend(uri, opt)` - Create socket using coroutines
107-
- `Manager.openSuspend()` - Open manager using coroutines
108107
- `Socket.openSuspend()` - Open socket using coroutines
109-
- `Socket.emitSuspend(event, *args)` - Emit event using coroutines
110108
- `Socket.emitSuspend(event, *args, ack: suspend (Array<out Any>) -> Unit)` - Emit with suspend acknowledgement callback (trailing lambda)
111109
- `Manager.stateFlow` - StateFlow for tracking manager state
112110
- `Socket.connectedFlow` - StateFlow for tracking connection state

example/shared/src/commonMain/kotlin/com/piasy/kmp/socketio/example/CoroutinesExample.kt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ class CoroutinesExample {
5858
// Wait a bit for connection
5959
kotlinx.coroutines.delay(1000)
6060

61-
// Emit event using suspend function
61+
// Emit event using regular API; suspend is only needed for ack
6262
val bin = UnsafeByteStringOperations.wrapUnsafe(byteArrayOf(0x1, 0x3, 0x1, 0x4))
63-
socket.emitSuspend("echo", 1, "2", bin, GMTDate())
63+
socket.emit("echo", 1, "2", bin, GMTDate())
6464

6565
// Emit with suspend acknowledgement callback (trailing lambda syntax)
6666
socket.emitSuspend("echoWithAck", 42, "test") { args ->
@@ -98,7 +98,7 @@ class CoroutinesExample {
9898
// Example 1: Handle errors in suspend functions with try-catch
9999
try {
100100
socket.openSuspend()
101-
socket.emitSuspend("someEvent", "data")
101+
socket.emit("someEvent", "data")
102102
} catch (e: Exception) {
103103
println("Error occurred: ${e.message}")
104104
// Handle error appropriately
@@ -263,7 +263,7 @@ class CoroutinesExample {
263263
delay(1000)
264264

265265
// Use socket for some operations
266-
socket.emitSuspend("someEvent", "data")
266+
socket.emit("someEvent", "data")
267267

268268
// Step 2: Close old socket properly
269269
println("Closing old socket...")
@@ -293,7 +293,7 @@ class CoroutinesExample {
293293
delay(1000)
294294

295295
// Use new socket
296-
socket.emitSuspend("someEvent", "new_data")
296+
socket.emit("someEvent", "new_data")
297297

298298
delay(1000)
299299

@@ -337,7 +337,7 @@ class CoroutinesExample {
337337
try {
338338
socket.openSuspend()
339339
delay(1000)
340-
socket.emitSuspend("test", "data")
340+
socket.emit("test", "data")
341341
delay(1000)
342342
} catch (e: Exception) {
343343
println("Error: ${e.message}")

kmp-socketio/src/commonMain/kotlin/com/piasy/kmp/socketio/socketio/CoroutinesExtensions.kt

Lines changed: 2 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package com.piasy.kmp.socketio.socketio
22

33
import com.piasy.kmp.socketio.emitter.Emitter
4-
import com.piasy.kmp.socketio.engineio.State
5-
import com.piasy.kmp.socketio.engineio.WorkThread
64
import com.piasy.kmp.xlog.Logging
75
import kotlinx.coroutines.CoroutineScope
86
import kotlinx.coroutines.Dispatchers
@@ -11,32 +9,6 @@ import kotlinx.coroutines.suspendCancellableCoroutine
119
import kotlin.coroutines.resume
1210
import kotlin.coroutines.resumeWithException
1311

14-
/**
15-
* Extension function to open Manager using coroutines.
16-
* Converts the callback-based open() method to a suspend function.
17-
*
18-
* @throws Exception if connection fails or times out.
19-
*/
20-
@WorkThread
21-
suspend fun Manager.openSuspend() {
22-
suspendCancellableCoroutine<Unit> { continuation ->
23-
var callbackInvoked = false
24-
open { error ->
25-
if (!callbackInvoked) {
26-
callbackInvoked = true
27-
if (error.isEmpty()) {
28-
continuation.resume(Unit)
29-
} else {
30-
continuation.resumeWithException(Exception(error))
31-
}
32-
}
33-
}
34-
continuation.invokeOnCancellation {
35-
// Cleanup if cancelled
36-
}
37-
}
38-
}
39-
4012
/**
4113
* Extension function to open Socket using coroutines.
4214
* Converts the callback-based open() method to a suspend function.
@@ -71,32 +43,6 @@ suspend fun Socket.openSuspend() {
7143
}
7244
}
7345

74-
/**
75-
* Extension function to emit event using coroutines.
76-
* Supports both regular Ack and suspend lambda for acknowledgements.
77-
*
78-
* @param event event name
79-
* @param args only accepts String/Boolean/Number/JsonElement/ByteString
80-
*/
81-
suspend fun Socket.emitSuspend(event: String, vararg args: Any) {
82-
// Check reserved events manually (since RESERVED_EVENTS is private)
83-
val reservedEvents = setOf(
84-
Socket.EVENT_CONNECT,
85-
Socket.EVENT_CONNECT_ERROR,
86-
Socket.EVENT_DISCONNECT,
87-
"disconnecting",
88-
"newListener",
89-
"removeListener",
90-
)
91-
if (reservedEvents.contains(event)) {
92-
Logging.error(Socket.TAG, "emit reserved event: $event")
93-
return
94-
}
95-
96-
// Regular emit without ack - just call the existing method
97-
emit(event, *args)
98-
}
99-
10046
/**
10147
* Extension function to emit event with suspend acknowledgement callback.
10248
*
@@ -105,28 +51,14 @@ suspend fun Socket.emitSuspend(event: String, vararg args: Any) {
10551
* @param ack suspend lambda that will be called when acknowledgement is received (trailing lambda)
10652
*/
10753
suspend fun Socket.emitSuspend(event: String, vararg args: Any, ack: suspend (Array<out Any>) -> Unit) {
108-
// Check reserved events manually (since RESERVED_EVENTS is private)
109-
val reservedEvents = setOf(
110-
Socket.EVENT_CONNECT,
111-
Socket.EVENT_CONNECT_ERROR,
112-
Socket.EVENT_DISCONNECT,
113-
"disconnecting",
114-
"newListener",
115-
"removeListener",
116-
)
117-
if (reservedEvents.contains(event)) {
118-
Logging.error(Socket.TAG, "emit reserved event: $event")
119-
return
120-
}
121-
12254
// Use suspendCancellableCoroutine to wait for ack
12355
suspendCancellableCoroutine<Unit> { continuation ->
12456
val ackWrapper = object : Ack {
125-
override fun call(vararg ackArgs: Any) {
57+
override fun call(vararg args: Any) {
12658
// Use CoroutineScope since socket's scope is private
12759
CoroutineScope(Dispatchers.Default).launch {
12860
try {
129-
ack(ackArgs)
61+
ack(args)
13062
continuation.resume(Unit)
13163
} catch (e: Exception) {
13264
continuation.resumeWithException(e)

kmp-socketio/src/commonMain/kotlin/com/piasy/kmp/socketio/socketio/Socket.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -466,15 +466,10 @@ class Socket(
466466

467467
const val EVENT_MESSAGE = "message"
468468
const val EVENT_ERROR = Manager.EVENT_ERROR
469-
470469
private val RESERVED_EVENTS = setOf(
471470
EVENT_CONNECT,
472471
EVENT_CONNECT_ERROR,
473472
EVENT_DISCONNECT,
474-
// used on the server-side
475-
"disconnecting",
476-
"newListener",
477-
"removeListener",
478473
)
479474
}
480475
}

kmp-socketio/src/commonMain/kotlin/com/piasy/kmp/socketio/socketio/StateFlowExtensions.kt

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package com.piasy.kmp.socketio.socketio
33
import com.piasy.kmp.socketio.emitter.Emitter
44
import com.piasy.kmp.socketio.engineio.State
55
import kotlinx.coroutines.channels.Channel
6+
import kotlinx.coroutines.channels.awaitClose
67
import kotlinx.coroutines.flow.Flow
78
import kotlinx.coroutines.flow.MutableSharedFlow
89
import kotlinx.coroutines.flow.MutableStateFlow
910
import kotlinx.coroutines.flow.StateFlow
1011
import kotlinx.coroutines.flow.asSharedFlow
1112
import kotlinx.coroutines.flow.asStateFlow
13+
import kotlinx.coroutines.flow.callbackFlow
1214

1315
/**
1416
* Extension property to get StateFlow for Manager state.
@@ -79,32 +81,33 @@ val Socket.socketIdFlow: StateFlow<String>
7981
}
8082

8183
/**
82-
* Extension function to get Flow for Emitter events.
83-
* Allows using coroutines to listen to events.
84-
*
84+
* Extension function to get a cold [Flow] for [Emitter] events.
85+
*
86+
* Differences from the original listener API:
87+
* - integrates with coroutines (cancellation, operators, structured concurrency);
88+
* - automatically subscribes/unsubscribes the underlying listener with the Flow lifecycle.
89+
*
8590
* @param event event name
86-
* @return Flow that emits event arguments
91+
* @return cold [Flow] that emits event arguments while it is collected
8792
*/
88-
fun Emitter.flow(event: String): Flow<Array<out Any>> {
89-
val sharedFlow = MutableSharedFlow<Array<out Any>>(
90-
replay = 0,
91-
extraBufferCapacity = Channel.UNLIMITED
92-
)
93-
94-
val listener = object : Emitter.Listener {
95-
override fun call(vararg args: Any) {
96-
sharedFlow.tryEmit(args)
93+
fun Emitter.flow(event: String): Flow<Array<out Any>> =
94+
callbackFlow {
95+
val listener = object : Emitter.Listener {
96+
override fun call(vararg args: Any) {
97+
trySend(args).isSuccess
98+
}
99+
}
100+
101+
on(event, listener)
102+
103+
awaitClose {
104+
off(event, listener)
97105
}
98106
}
99-
100-
on(event, listener)
101-
102-
return sharedFlow.asSharedFlow()
103-
}
104107

105108
/**
106109
* Extension function to get Flow for Emitter events with replay buffer.
107-
*
110+
*
108111
* @param event event name
109112
* @param replay number of events to replay to new subscribers
110113
* @return Flow that emits event arguments
@@ -114,14 +117,14 @@ fun Emitter.flowWithReplay(event: String, replay: Int = 1): Flow<Array<out Any>>
114117
replay = replay,
115118
extraBufferCapacity = Channel.UNLIMITED
116119
)
117-
120+
118121
val listener = object : Emitter.Listener {
119122
override fun call(vararg args: Any) {
120123
sharedFlow.tryEmit(args)
121124
}
122125
}
123-
126+
124127
on(event, listener)
125-
128+
126129
return sharedFlow.asSharedFlow()
127130
}

0 commit comments

Comments
 (0)