|
| 1 | +// / ctx: https://ctx.ist |
| 2 | +// ,'`./ do you remember? |
| 3 | +// `.,'\ |
| 4 | +// \ Copyright 2026-present Context contributors. |
| 5 | +// SPDX-License-Identifier: Apache-2.0 |
| 6 | + |
| 7 | +package hub |
| 8 | + |
| 9 | +import ( |
| 10 | + "context" |
| 11 | + "net" |
| 12 | + "testing" |
| 13 | + "time" |
| 14 | + |
| 15 | + "google.golang.org/grpc" |
| 16 | +) |
| 17 | + |
| 18 | +// TestIntegration_PublishAndSync spins up a hub, registers |
| 19 | +// two clients, publishes from one, and verifies the other |
| 20 | +// receives the entry via sync. |
| 21 | +func TestIntegration_PublishAndSync(t *testing.T) { |
| 22 | + srv, conn, adminTok := startTestServer(t) |
| 23 | + _ = srv |
| 24 | + |
| 25 | + // Register client A. |
| 26 | + regA := callRegister(t, conn, adminTok, "alpha") |
| 27 | + |
| 28 | + // Register client B. |
| 29 | + regB := callRegister(t, conn, adminTok, "beta") |
| 30 | + |
| 31 | + ctxA := authedCtx(regA.ClientToken) |
| 32 | + ctxB := authedCtx(regB.ClientToken) |
| 33 | + |
| 34 | + // Client A publishes an entry. |
| 35 | + pubResp := &PublishResponse{} |
| 36 | + pubErr := conn.Invoke(ctxA, |
| 37 | + "/ctx.hub.v1.CtxHub/Publish", |
| 38 | + &PublishRequest{ |
| 39 | + Entries: []PublishEntry{ |
| 40 | + { |
| 41 | + ID: "e1", |
| 42 | + Type: "decision", |
| 43 | + Content: "Use gRPC for hub", |
| 44 | + Origin: "alpha", |
| 45 | + Timestamp: time.Now().Unix(), |
| 46 | + }, |
| 47 | + }, |
| 48 | + }, |
| 49 | + pubResp, |
| 50 | + ) |
| 51 | + if pubErr != nil { |
| 52 | + t.Fatalf("Publish: %v", pubErr) |
| 53 | + } |
| 54 | + if len(pubResp.Sequences) != 1 { |
| 55 | + t.Fatalf("expected 1 sequence, got %d", |
| 56 | + len(pubResp.Sequences)) |
| 57 | + } |
| 58 | + |
| 59 | + // Client B syncs and should see the entry. |
| 60 | + stream, syncErr := conn.NewStream(ctxB, |
| 61 | + &grpc.StreamDesc{ServerStreams: true}, |
| 62 | + "/ctx.hub.v1.CtxHub/Sync", |
| 63 | + ) |
| 64 | + if syncErr != nil { |
| 65 | + t.Fatalf("Sync stream: %v", syncErr) |
| 66 | + } |
| 67 | + if sendErr := stream.SendMsg( |
| 68 | + &SyncRequest{SinceSequence: 0}, |
| 69 | + ); sendErr != nil { |
| 70 | + t.Fatalf("Sync send: %v", sendErr) |
| 71 | + } |
| 72 | + if closeErr := stream.CloseSend(); closeErr != nil { |
| 73 | + t.Fatalf("Sync close: %v", closeErr) |
| 74 | + } |
| 75 | + |
| 76 | + msg := &EntryMsg{} |
| 77 | + if recvErr := stream.RecvMsg(msg); recvErr != nil { |
| 78 | + t.Fatalf("Sync recv: %v", recvErr) |
| 79 | + } |
| 80 | + if msg.Content != "Use gRPC for hub" { |
| 81 | + t.Errorf("want 'Use gRPC for hub', got %q", |
| 82 | + msg.Content) |
| 83 | + } |
| 84 | + if msg.Origin != "alpha" { |
| 85 | + t.Errorf("want origin 'alpha', got %q", |
| 86 | + msg.Origin) |
| 87 | + } |
| 88 | +} |
| 89 | + |
| 90 | +// TestIntegration_IncrementalSync verifies that sync with |
| 91 | +// a non-zero since_sequence only returns new entries. |
| 92 | +func TestIntegration_IncrementalSync(t *testing.T) { |
| 93 | + srv, conn, adminTok := startTestServer(t) |
| 94 | + _ = srv |
| 95 | + |
| 96 | + reg := callRegister(t, conn, adminTok, "proj") |
| 97 | + ctx := authedCtx(reg.ClientToken) |
| 98 | + |
| 99 | + // Publish two entries. |
| 100 | + pubResp := &PublishResponse{} |
| 101 | + pubErr := conn.Invoke(ctx, |
| 102 | + "/ctx.hub.v1.CtxHub/Publish", |
| 103 | + &PublishRequest{ |
| 104 | + Entries: []PublishEntry{ |
| 105 | + { |
| 106 | + ID: "a", Type: "learning", |
| 107 | + Content: "First", |
| 108 | + Origin: "proj", |
| 109 | + Timestamp: time.Now().Unix(), |
| 110 | + }, |
| 111 | + { |
| 112 | + ID: "b", Type: "learning", |
| 113 | + Content: "Second", |
| 114 | + Origin: "proj", |
| 115 | + Timestamp: time.Now().Unix(), |
| 116 | + }, |
| 117 | + }, |
| 118 | + }, |
| 119 | + pubResp, |
| 120 | + ) |
| 121 | + if pubErr != nil { |
| 122 | + t.Fatal(pubErr) |
| 123 | + } |
| 124 | + |
| 125 | + // Sync since sequence 1 — should only get "Second". |
| 126 | + stream, syncErr := conn.NewStream(ctx, |
| 127 | + &grpc.StreamDesc{ServerStreams: true}, |
| 128 | + "/ctx.hub.v1.CtxHub/Sync", |
| 129 | + ) |
| 130 | + if syncErr != nil { |
| 131 | + t.Fatal(syncErr) |
| 132 | + } |
| 133 | + if sendErr := stream.SendMsg( |
| 134 | + &SyncRequest{SinceSequence: 1}, |
| 135 | + ); sendErr != nil { |
| 136 | + t.Fatal(sendErr) |
| 137 | + } |
| 138 | + _ = stream.CloseSend() |
| 139 | + |
| 140 | + msg := &EntryMsg{} |
| 141 | + if recvErr := stream.RecvMsg(msg); recvErr != nil { |
| 142 | + t.Fatal(recvErr) |
| 143 | + } |
| 144 | + if msg.Content != "Second" { |
| 145 | + t.Errorf("want 'Second', got %q", msg.Content) |
| 146 | + } |
| 147 | +} |
| 148 | + |
| 149 | +// TestIntegration_TypeFilter verifies that sync with type |
| 150 | +// filters only returns matching entries. |
| 151 | +func TestIntegration_TypeFilter(t *testing.T) { |
| 152 | + srv, conn, adminTok := startTestServer(t) |
| 153 | + _ = srv |
| 154 | + |
| 155 | + reg := callRegister(t, conn, adminTok, "proj") |
| 156 | + ctx := authedCtx(reg.ClientToken) |
| 157 | + |
| 158 | + pubResp := &PublishResponse{} |
| 159 | + _ = conn.Invoke(ctx, |
| 160 | + "/ctx.hub.v1.CtxHub/Publish", |
| 161 | + &PublishRequest{ |
| 162 | + Entries: []PublishEntry{ |
| 163 | + { |
| 164 | + ID: "d1", Type: "decision", |
| 165 | + Content: "Use Go", |
| 166 | + Origin: "proj", |
| 167 | + Timestamp: time.Now().Unix(), |
| 168 | + }, |
| 169 | + { |
| 170 | + ID: "l1", Type: "learning", |
| 171 | + Content: "Avoid mocks", |
| 172 | + Origin: "proj", |
| 173 | + Timestamp: time.Now().Unix(), |
| 174 | + }, |
| 175 | + }, |
| 176 | + }, |
| 177 | + pubResp, |
| 178 | + ) |
| 179 | + |
| 180 | + // Sync only learnings. |
| 181 | + stream, _ := conn.NewStream(ctx, |
| 182 | + &grpc.StreamDesc{ServerStreams: true}, |
| 183 | + "/ctx.hub.v1.CtxHub/Sync", |
| 184 | + ) |
| 185 | + _ = stream.SendMsg(&SyncRequest{ |
| 186 | + Types: []string{"learning"}, |
| 187 | + SinceSequence: 0, |
| 188 | + }) |
| 189 | + _ = stream.CloseSend() |
| 190 | + |
| 191 | + msg := &EntryMsg{} |
| 192 | + recvErr := stream.RecvMsg(msg) |
| 193 | + if recvErr != nil { |
| 194 | + t.Fatal(recvErr) |
| 195 | + } |
| 196 | + if msg.Type != "learning" { |
| 197 | + t.Errorf("want type 'learning', got %q", msg.Type) |
| 198 | + } |
| 199 | + if msg.Content != "Avoid mocks" { |
| 200 | + t.Errorf("want 'Avoid mocks', got %q", |
| 201 | + msg.Content) |
| 202 | + } |
| 203 | +} |
| 204 | + |
| 205 | +// TestIntegration_ClientLib verifies the Client library |
| 206 | +// works end-to-end with the server. |
| 207 | +func TestIntegration_ClientLib(t *testing.T) { |
| 208 | + _, _, adminTok := startTestServer(t) |
| 209 | + |
| 210 | + // Need a separate connection for the Client lib test |
| 211 | + // since startTestServer returns a raw conn. |
| 212 | + dir := t.TempDir() |
| 213 | + store, storeErr := NewStore(dir) |
| 214 | + if storeErr != nil { |
| 215 | + t.Fatal(storeErr) |
| 216 | + } |
| 217 | + srv := NewServer(store, adminTok) |
| 218 | + lis, lisErr := net.Listen("tcp", "127.0.0.1:0") |
| 219 | + if lisErr != nil { |
| 220 | + t.Fatal(lisErr) |
| 221 | + } |
| 222 | + go func() { _ = srv.Serve(lis) }() |
| 223 | + t.Cleanup(func() { srv.GracefulStop() }) |
| 224 | + |
| 225 | + addr := lis.Addr().String() |
| 226 | + |
| 227 | + // Register via Client lib. |
| 228 | + client, dialErr := NewClient(addr, "") |
| 229 | + if dialErr != nil { |
| 230 | + t.Fatal(dialErr) |
| 231 | + } |
| 232 | + defer func() { _ = client.Close() }() |
| 233 | + |
| 234 | + regResp, regErr := client.Register( |
| 235 | + context.Background(), adminTok, "test-proj", |
| 236 | + ) |
| 237 | + if regErr != nil { |
| 238 | + t.Fatalf("Register: %v", regErr) |
| 239 | + } |
| 240 | + |
| 241 | + // Reconnect with the client token. |
| 242 | + _ = client.Close() |
| 243 | + client2, dial2Err := NewClient( |
| 244 | + addr, regResp.ClientToken, |
| 245 | + ) |
| 246 | + if dial2Err != nil { |
| 247 | + t.Fatal(dial2Err) |
| 248 | + } |
| 249 | + defer func() { _ = client2.Close() }() |
| 250 | + |
| 251 | + // Publish. |
| 252 | + _, pubErr := client2.Publish( |
| 253 | + context.Background(), |
| 254 | + []PublishEntry{ |
| 255 | + { |
| 256 | + ID: "c1", Type: "convention", |
| 257 | + Content: "Use snake_case", |
| 258 | + Origin: "test-proj", |
| 259 | + Timestamp: time.Now().Unix(), |
| 260 | + }, |
| 261 | + }, |
| 262 | + ) |
| 263 | + if pubErr != nil { |
| 264 | + t.Fatalf("Publish: %v", pubErr) |
| 265 | + } |
| 266 | + |
| 267 | + // Sync. |
| 268 | + entries, syncErr := client2.Sync( |
| 269 | + context.Background(), nil, 0, |
| 270 | + ) |
| 271 | + if syncErr != nil { |
| 272 | + t.Fatalf("Sync: %v", syncErr) |
| 273 | + } |
| 274 | + if len(entries) != 1 { |
| 275 | + t.Fatalf("want 1 entry, got %d", len(entries)) |
| 276 | + } |
| 277 | + if entries[0].Content != "Use snake_case" { |
| 278 | + t.Errorf("want 'Use snake_case', got %q", |
| 279 | + entries[0].Content) |
| 280 | + } |
| 281 | + |
| 282 | + // Status. |
| 283 | + statusResp, statusErr := client2.Status( |
| 284 | + context.Background(), |
| 285 | + ) |
| 286 | + if statusErr != nil { |
| 287 | + t.Fatalf("Status: %v", statusErr) |
| 288 | + } |
| 289 | + if statusResp.TotalEntries != 1 { |
| 290 | + t.Errorf("want 1 total, got %d", |
| 291 | + statusResp.TotalEntries) |
| 292 | + } |
| 293 | +} |
| 294 | + |
| 295 | +// startTestServer is defined in server_test.go — reused |
| 296 | +// here for integration tests. The helper creates a |
| 297 | +// temporary store, generates an admin token, starts the |
| 298 | +// server on a random port, and returns a connected client. |
| 299 | + |
| 300 | +// authedCtx and callRegister are also in server_test.go. |
0 commit comments