-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrecv.go
More file actions
38 lines (37 loc) · 761 Bytes
/
recv.go
File metadata and controls
38 lines (37 loc) · 761 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package rx
// Recv returns an Observable that forwards the values received from ch to
// the observer.
//
// On subscription a recursive task is scheduled on the given scheduler. Each
// run of the task blocks until either a value is received from ch or the
// subscriber's context is done:
//
//   - a received value that does not implement error is passed to the
//     observer with a nil error and done == false, after which again() is
//     called to request another run;
//   - a received value that implements error is not emitted as a value; the
//     observer is called with the zero T, that error, and done == true;
//   - when ch is closed the observer is called with the zero T, a nil error,
//     and done == true;
//   - when the subscriber's context is done the task returns without calling
//     the observer.
//
// Subscribed() is consulted before the receive, after the receive, and after
// each emission; once it reports false the task returns without calling the
// observer or again(). The handle returned by ScheduleRecursive is cancelled
// from an OnUnsubscribe callback.
func Recv[T any](ch <-chan T) Observable[T] {
	return func(observe Observer[T], scheduler Scheduler, subscriber Subscriber) {
		// step performs a single receive and decides whether to continue.
		step := func(again func()) {
			if !subscriber.Subscribed() {
				return
			}
			select {
			case value, open := <-ch:
				// The subscription may have ended while the receive was blocked.
				if !subscriber.Subscribed() {
					return
				}
				var zero T
				if !open {
					// Channel closed: complete without an error.
					observe(zero, nil, true)
					return
				}
				if failure, isErr := any(value).(error); isErr {
					// An error value on the channel terminates the stream.
					observe(zero, failure, true)
					return
				}
				observe(value, nil, false)
				// The observer may have unsubscribed while handling the value.
				if subscriber.Subscribed() {
					again()
				}
			case <-subscriber.Context().Done():
				// Context finished: stop without notifying the observer.
			}
		}
		handle := scheduler.ScheduleRecursive(step)
		subscriber.OnUnsubscribe(func() {
			handle.Cancel()
		})
	}
}