Skip to content

Commit 42f6e04

Browse files
committed
feat: add master-follower replication and client failover
Followers replicate from the master via sequence-based gRPC sync with automatic retry. Failover client tries peers in order and verifies connectivity with a Status call before returning. Signed-off-by: Murat Parlakisik <parlakisik@gmail.com>
1 parent 41613a2 commit 42f6e04

3 files changed

Lines changed: 218 additions & 0 deletions

File tree

internal/hub/failover.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// / ctx: https://ctx.ist
2+
// ,'`./ do you remember?
3+
// `.,'\
4+
// \ Copyright 2026-present Context contributors.
5+
// SPDX-License-Identifier: Apache-2.0
6+
7+
package hub
8+
9+
import (
10+
"context"
11+
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/credentials/insecure"
14+
)
15+
16+
// NewFailoverClient creates a client that tries peers in
17+
// order until one succeeds. The first address is the
18+
// primary; others are fallbacks.
19+
//
20+
// Parameters:
21+
// - peers: ordered list of hub addresses
22+
// - bearerToken: token for authenticated RPCs
23+
//
24+
// Returns:
25+
// - *Client: connected client to the first reachable peer
26+
// - error: non-nil if no peer is reachable
27+
func NewFailoverClient(
28+
peers []string, bearerToken string,
29+
) (*Client, error) {
30+
var lastErr error
31+
for _, addr := range peers {
32+
conn, dialErr := grpc.NewClient(
33+
addr,
34+
grpc.WithTransportCredentials(
35+
insecure.NewCredentials(),
36+
),
37+
grpc.WithDefaultCallOptions(
38+
grpc.CallContentSubtype(codecName),
39+
),
40+
)
41+
if dialErr != nil {
42+
lastErr = dialErr
43+
continue
44+
}
45+
46+
// Verify connectivity with a Status call.
47+
resp := &StatusResponse{}
48+
callErr := conn.Invoke(
49+
addBearerMD(
50+
context.Background(), bearerToken,
51+
),
52+
"/ctx.hub.v1.CtxHub/Status",
53+
&struct{}{},
54+
resp,
55+
)
56+
if callErr != nil {
57+
_ = conn.Close()
58+
lastErr = callErr
59+
continue
60+
}
61+
62+
return &Client{
63+
conn: conn, token: bearerToken,
64+
}, nil
65+
}
66+
return nil, lastErr
67+
}

internal/hub/replicate.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// / ctx: https://ctx.ist
2+
// ,'`./ do you remember?
3+
// `.,'\
4+
// \ Copyright 2026-present Context contributors.
5+
// SPDX-License-Identifier: Apache-2.0
6+
7+
package hub
8+
9+
import (
10+
"context"
11+
"time"
12+
)
13+
14+
// replicateInterval is how often a follower retries
15+
// connecting to the master for replication.
16+
const replicateInterval = 5 * time.Second
17+
18+
// StartReplication connects to the master and streams
19+
// entries into the local store. Blocks until the context
20+
// is cancelled. Retries on failure.
21+
//
22+
// Parameters:
23+
// - ctx: context for cancellation
24+
// - masterAddr: gRPC address of the master hub
25+
// - store: local store to write replicated entries
26+
// - clientToken: bearer token for auth
27+
func StartReplication(
28+
ctx context.Context,
29+
masterAddr string,
30+
store *Store,
31+
clientToken string,
32+
) {
33+
for {
34+
select {
35+
case <-ctx.Done():
36+
return
37+
default:
38+
}
39+
40+
replicateOnce(
41+
ctx, masterAddr, store, clientToken,
42+
)
43+
44+
select {
45+
case <-ctx.Done():
46+
return
47+
case <-time.After(replicateInterval):
48+
}
49+
}
50+
}

internal/hub/sync_helper.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// / ctx: https://ctx.ist
2+
// ,'`./ do you remember?
3+
// `.,'\
4+
// \ Copyright 2026-present Context contributors.
5+
// SPDX-License-Identifier: Apache-2.0
6+
7+
package hub
8+
9+
import (
10+
"context"
11+
"time"
12+
13+
"google.golang.org/grpc"
14+
"google.golang.org/grpc/credentials/insecure"
15+
"google.golang.org/grpc/metadata"
16+
)
17+
18+
// replicateOnce connects to the master, syncs all entries
19+
// since the local store's last sequence, and appends them.
20+
func replicateOnce(
21+
ctx context.Context,
22+
masterAddr string,
23+
store *Store,
24+
clientToken string,
25+
) {
26+
conn, dialErr := grpc.NewClient(
27+
masterAddr,
28+
grpc.WithTransportCredentials(
29+
insecure.NewCredentials(),
30+
),
31+
grpc.WithDefaultCallOptions(
32+
grpc.CallContentSubtype(codecName),
33+
),
34+
)
35+
if dialErr != nil {
36+
return
37+
}
38+
defer func() { _ = conn.Close() }()
39+
40+
_, lastSeq := lastSequence(store)
41+
authed := addBearerMD(ctx, clientToken)
42+
43+
stream, streamErr := conn.NewStream(
44+
authed,
45+
&grpc.StreamDesc{ServerStreams: true},
46+
"/ctx.hub.v1.CtxHub/Sync",
47+
)
48+
if streamErr != nil {
49+
return
50+
}
51+
52+
if sendErr := stream.SendMsg(&SyncRequest{
53+
SinceSequence: lastSeq,
54+
}); sendErr != nil {
55+
return
56+
}
57+
if closeErr := stream.CloseSend(); closeErr != nil {
58+
return
59+
}
60+
61+
for {
62+
msg := &EntryMsg{}
63+
if recvErr := stream.RecvMsg(msg); recvErr != nil {
64+
return
65+
}
66+
entry := Entry{
67+
ID: msg.ID,
68+
Type: msg.Type,
69+
Content: msg.Content,
70+
Origin: msg.Origin,
71+
Author: msg.Author,
72+
Timestamp: time.Unix(msg.Timestamp, 0),
73+
Sequence: msg.Sequence,
74+
}
75+
_, _ = store.Append([]Entry{entry})
76+
}
77+
}
78+
79+
// lastSequence returns the highest sequence in the store.
80+
func lastSequence(store *Store) (bool, uint64) {
81+
all := store.Query(nil, 0)
82+
if len(all) == 0 {
83+
return false, 0
84+
}
85+
return true, all[len(all)-1].Sequence
86+
}
87+
88+
// addBearerMD adds a bearer token to outgoing gRPC
89+
// metadata.
90+
func addBearerMD(
91+
ctx context.Context, tok string,
92+
) context.Context {
93+
if tok == "" {
94+
return ctx
95+
}
96+
return metadata.NewOutgoingContext(
97+
ctx, metadata.Pairs(
98+
"authorization", bearerPrefix+tok,
99+
),
100+
)
101+
}

0 commit comments

Comments
 (0)