We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 9b68aeb commit d762e71Copy full SHA for d762e71
1 file changed
intra/core/volatileflow.go
@@ -9,6 +9,7 @@ import (
9
"context"
10
"slices"
11
"sync"
12
+ "time"
13
)
14
15
type FlowFunc[T any] func(v T)
@@ -32,6 +33,15 @@ func (f FlowOn[T]) flow(v T) (flowed bool) {
32
33
return false
34
}
35
36
+func (f FlowOn[T]) obsolete() bool {
37
+ select {
38
+ case <-f.ctx.Done():
39
+ return true
40
+ default:
41
+ }
42
+ return false
43
+}
44
+
45
type Flow[T any] struct {
46
ctx context.Context
47
@@ -77,6 +87,16 @@ func (f *Flow[T]) stream() {
77
87
78
88
f.removeFinallys(notflowing)
79
89
}()
90
+ case <-time.Tick(3 * time.Hour):
91
+ go func() {
92
+ notflowing := make(map[FlowOn[T]]struct{})
93
+ for _, o := range f.observers() {
94
+ if o.obsolete() {
95
+ notflowing[o] = struct{}{}
96
97
98
+ f.removeFinallys(notflowing)
99
+ }()
80
100
81
101
82
102
0 commit comments