-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathring.go
More file actions
86 lines (72 loc) · 1.59 KB
/
ring.go
File metadata and controls
86 lines (72 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package onering
import (
"runtime"
"sync/atomic"
"unsafe"
)
type ring struct {
_ [7]int64
wp int64
_ [7]int64
rp int64
_ [7]int64
rc int64 // reader cache
_ [7]int64
data []unsafe.Pointer
mask int64
size int64
maxbatch int64
done int32
}
func (r *ring) init(n *New) {
r.data = make([]unsafe.Pointer, roundUp2(n.Size))
r.mask = int64(len(r.data) - 1)
var bs = n.BatchSize
if bs == 0 {
bs = DefaultMaxBatch
}
r.maxbatch = int64(roundUp2(bs) - 1)
}
func (r *ring) Close() {
atomic.AddInt32(&r.done, 1)
}
func (r *ring) Done() bool {
return atomic.LoadInt64(&r.wp) <= atomic.LoadInt64(&r.rp) && atomic.LoadInt32(&r.done) > 0
}
func (r *ring) wait() {
runtime.Gosched()
}
func (r *ring) waitForEq(data *int64, val int64) (keep bool) {
for keep = true; keep && atomic.LoadInt64(data) != val; runtime.Gosched() {
keep = atomic.LoadInt64(&r.wp) > atomic.LoadInt64(&r.rp) || atomic.LoadInt32(&r.done) == 0
}
return
}
type multi struct {
_ int64
ring
_ [42]byte
seq []int64
}
func (c *multi) init(n *New) {
c.ring.init(n)
c.size = int64(len(c.data))
c.seq = make([]int64, len(c.data))
for i := range c.seq {
c.seq[i] = int64(i)
}
c.wp = 1 // just to avoid 0-awkwardness with seq
c.rp = 1
c.rc = c.rp
}
func (c *multi) next(p *int64) int64 {
return atomic.AddInt64(p, 1) - 1
}
func (c *multi) frame(p int64) (data *unsafe.Pointer, seq *int64) {
var pos = c.mask & p
return &c.data[pos], &c.seq[pos]
}
// empty sync.Locker for conditionals
type nolock struct{}
func (nolock) Lock() {}
func (nolock) Unlock() {}