-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathchat_test_helpers_test.go
More file actions
221 lines (197 loc) · 6.59 KB
/
chat_test_helpers_test.go
File metadata and controls
221 lines (197 loc) · 6.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package getstream_test
import (
"bytes"
"context"
"io"
"math/rand"
"net/http"
"strconv"
"strings"
"testing"
"time"
. "github.com/GetStream/getstream-go/v4"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
// rateLimitClient wraps an HttpClient and automatically retries on 429 responses
// with exponential backoff and jitter to avoid thundering herd.
//
// The retry loop itself lives in the Do method; this type only carries the
// client that each individual attempt is delegated to.
type rateLimitClient struct {
// inner is the wrapped client that performs every attempt.
inner HttpClient
}
// Do sends r through the wrapped client and transparently retries while the
// server answers 429 Too Many Requests.
//
// The request body, if any, is read fully into memory up front so it can be
// replayed on every attempt; the caller's original body is closed once it has
// been buffered. Up to maxRetries attempts are made, separated by the wait that
// rateLimitBackoff computes from the 429 response headers and the attempt
// number. When those attempts are exhausted one final attempt is made and its
// result is returned as-is, even if it is another 429.
//
// Any non-429 response or error from the wrapped client is returned
// immediately. If draining a 429 body fails, that body is still closed and the
// drain error is returned with a nil response. The wait between attempts is
// abandoned as soon as r's context is done, in which case the context's error
// is returned.
func (c *rateLimitClient) Do(r *http.Request) (*http.Response, error) {
	// Buffer the body so it can be replayed on retry.
	var bodyBytes []byte
	if r.Body != nil {
		var err error
		bodyBytes, err = io.ReadAll(r.Body)
		// The caller's body is spent whether or not the read succeeded;
		// release it here because the wrapped client only ever sees replays.
		r.Body.Close()
		if err != nil {
			return nil, err
		}
	}

	// rewind installs a fresh reader over the buffered payload so each
	// attempt starts from the first byte.
	rewind := func() {
		if bodyBytes != nil {
			r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
		}
	}

	const maxRetries = 8
	for attempt := 0; attempt < maxRetries; attempt++ {
		rewind()
		resp, err := c.inner.Do(r)
		if err != nil {
			return resp, err
		}
		if resp.StatusCode != http.StatusTooManyRequests {
			return resp, nil
		}

		// Drain and close the 429 body before retrying. Close unconditionally
		// so a failed drain cannot leak the body.
		_, drainErr := io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		if drainErr != nil {
			return nil, drainErr
		}

		// Use the reset header if available, otherwise exponential backoff +
		// jitter. Wait on a timer instead of sleeping so that a cancelled or
		// expired request context ends the retry chain promptly.
		timer := time.NewTimer(rateLimitBackoff(resp.Header, attempt))
		select {
		case <-timer.C:
		case <-r.Context().Done():
			timer.Stop()
			return nil, r.Context().Err()
		}
	}

	// Exhausted retries — replay one last time and return whatever we get.
	rewind()
	return c.inner.Do(r)
}
// rateLimitBackoff returns how long to wait before retrying a rate-limited
// request.
//
// If headers carries an X-Ratelimit-Reset value that parses as a positive Unix
// timestamp in seconds, and that instant lies in the future but less than 90
// seconds away, the wait is the time remaining until it. Otherwise the wait
// doubles with each attempt (2s, 4s, 8s, 16s for attempts 0 through 3) and is
// capped at 30s. In both cases a random jitter of 0-999ms is added to
// desynchronize concurrent retries.
//
// attempt is the zero-based retry index. Negative values are treated as 0, and
// arbitrarily large values yield the 30s cap: the exponent is clamped before
// shifting so the computation can never overflow into a zero or negative wait.
func rateLimitBackoff(headers http.Header, attempt int) time.Duration {
	const (
		maxHeaderWait = 90 * time.Second
		firstBackoff  = 2 * time.Second
		maxBackoff    = 30 * time.Second
		// firstBackoff<<4 is 32s, the first step beyond maxBackoff, so no
		// larger shift is ever needed.
		maxDoublings = 4
	)

	jitter := time.Duration(rand.Intn(1000)) * time.Millisecond

	if resetStr := headers.Get("X-Ratelimit-Reset"); resetStr != "" {
		if resetUnix, err := strconv.ParseInt(resetStr, 10, 64); err == nil && resetUnix > 0 {
			wait := time.Until(time.Unix(resetUnix, 0))
			if wait > 0 && wait < maxHeaderWait {
				return wait + jitter
			}
		}
	}

	// Clamp the exponent so the shift below stays well inside int64 range.
	if attempt < 0 {
		attempt = 0
	}
	if attempt > maxDoublings {
		attempt = maxDoublings
	}

	base := firstBackoff << uint(attempt)
	if base > maxBackoff {
		base = maxBackoff
	}
	return base + jitter
}
// newRateLimitClient builds a rateLimitClient around a fresh http.Client.
// The 60-second timeout is set on that inner client, so it bounds each
// individual attempt rather than the retry chain as a whole.
func newRateLimitClient() *rateLimitClient {
	const perAttemptTimeout = 60 * time.Second

	httpClient := &http.Client{Timeout: perAttemptTimeout}
	return &rateLimitClient{inner: httpClient}
}
// requireNoErrorOrSkipRateLimit fails the test on any error except a rate
// limit: when the error text contains "Too many requests" the test is skipped
// instead. Intended for API calls that are heavily contended when tests run in
// parallel.
func requireNoErrorOrSkipRateLimit(t *testing.T, err error) {
	t.Helper()

	rateLimited := err != nil && strings.Contains(err.Error(), "Too many requests")
	if rateLimited {
		t.Skipf("Skipping: rate limited after retries (%s)", t.Name())
	}

	require.NoError(t, err)
}
// deleteUsersWithRetry hard-deletes the given users together with their
// messages and conversations. It is meant for t.Cleanup.
//
// Despite the name, it issues a single DeleteUsers call bounded by a
// three-minute context; retrying on 429 is left to the rateLimitClient at the
// HTTP level. The result and error are deliberately discarded because a failed
// cleanup is acceptable.
func deleteUsersWithRetry(client *Stream, userIDs []string) {
	const cleanupTimeout = 3 * time.Minute

	ctx, cancel := context.WithTimeout(context.Background(), cleanupTimeout)
	defer cancel()

	req := &DeleteUsersRequest{
		UserIds:       userIDs,
		User:          PtrTo("hard"),
		Messages:      PtrTo("hard"),
		Conversations: PtrTo("hard"),
	}

	// Best effort: cleanup failures are intentionally ignored.
	_, _ = client.DeleteUsers(ctx, req)
}
// skipIfShort skips the calling test when the test binary runs with -short,
// so integration tests are left out of short runs.
func skipIfShort(t *testing.T) {
	t.Helper()

	if !testing.Short() {
		return
	}
	t.Skip("Skipping integration test in short mode")
}
// createTestUsers upserts n users with fresh "test-user-<uuid>" IDs in a single
// UpdateUsers call and returns the IDs in creation order. The test fails
// immediately if the call errors. A t.Cleanup hook hard-deletes the users
// (matching stream-chat-go's randomUser pattern).
func createTestUsers(t *testing.T, client *Stream, n int) []string {
	t.Helper()

	users := make(map[string]UserRequest, n)
	ids := make([]string, n)
	for idx := range ids {
		userID := "test-user-" + uuid.New().String()
		ids[idx] = userID
		users[userID] = UserRequest{
			ID:   userID,
			Name: PtrTo("Test User " + userID),
			Role: PtrTo("user"),
		}
	}

	req := &UpdateUsersRequest{Users: users}
	_, err := client.UpdateUsers(context.Background(), req)
	require.NoError(t, err, "Failed to create test users")

	removeUsers := func() {
		deleteUsersWithRetry(client, ids)
	}
	t.Cleanup(removeUsers)

	return ids
}
// createTestChannel gets or creates a "messaging" channel with a random
// "test-ch-" ID, attributed to creatorID, and returns the channel handle along
// with that ID. The test fails immediately if creation errors. A t.Cleanup hook
// hard-deletes the channel, ignoring any error from the delete.
func createTestChannel(t *testing.T, client *Stream, creatorID string) (*Channels, string) {
	t.Helper()

	channelID := "test-ch-" + randomString(12)
	ch := client.Chat().Channel("messaging", channelID)

	createReq := &GetOrCreateChannelRequest{
		Data: &ChannelInput{
			CreatedByID: PtrTo(creatorID),
		},
	}
	_, err := ch.GetOrCreate(context.Background(), createReq)
	require.NoError(t, err, "Failed to create test channel")

	hardDelete := func() {
		deleteReq := &DeleteChannelRequest{HardDelete: PtrTo(true)}
		_, _ = ch.Delete(context.Background(), deleteReq)
	}
	t.Cleanup(hardDelete)

	return ch, channelID
}
// createTestChannelWithMembers gets or creates a "messaging" channel with a
// random "test-ch-" ID, attributed to creatorID and populated with one member
// per entry of memberIDs. It returns the channel handle and the ID. The test
// fails immediately if creation errors. A t.Cleanup hook hard-deletes the
// channel, ignoring any error from the delete.
func createTestChannelWithMembers(t *testing.T, client *Stream, creatorID string, memberIDs []string) (*Channels, string) {
	t.Helper()

	channelID := "test-ch-" + randomString(12)
	ch := client.Chat().Channel("messaging", channelID)

	members := make([]ChannelMemberRequest, 0, len(memberIDs))
	for _, memberID := range memberIDs {
		members = append(members, ChannelMemberRequest{UserID: memberID})
	}

	createReq := &GetOrCreateChannelRequest{
		Data: &ChannelInput{
			CreatedByID: PtrTo(creatorID),
			Members:     members,
		},
	}
	_, err := ch.GetOrCreate(context.Background(), createReq)
	require.NoError(t, err, "Failed to create test channel with members")

	hardDelete := func() {
		deleteReq := &DeleteChannelRequest{HardDelete: PtrTo(true)}
		_, _ = ch.Delete(context.Background(), deleteReq)
	}
	t.Cleanup(hardDelete)

	return ch, channelID
}
// sendTestMessage posts text to ch on behalf of userID and returns the ID of
// the resulting message. The test fails immediately if the send errors or the
// response carries an empty message ID.
func sendTestMessage(t *testing.T, ch *Channels, userID string, text string) string {
	t.Helper()

	req := &SendMessageRequest{
		Message: MessageRequest{
			Text:   PtrTo(text),
			UserID: PtrTo(userID),
		},
	}
	resp, err := ch.SendMessage(context.Background(), req)
	require.NoError(t, err, "Failed to send test message")

	messageID := resp.Data.Message.ID
	require.NotEmpty(t, messageID, "Message ID should not be empty")
	return messageID
}