Skip to content

Commit 173e1c3

Browse files
committed
fix: address code review findings for shared hub
1. Fix listen command: now streams via Listen RPC instead of blocking after initial sync 2. Add input validation on Publish: type, ID, origin, content size limit (1MB) 3. Warn on --share publish failure instead of silent suppression 4. Constant-time token comparison via crypto/subtle + O(1) map lookup 5. Wire Raft cluster to Server with SetCluster/Shutdown 6. Reject duplicate project registration in store 7. Disconnect slow fanout listeners instead of silently dropping 8. File locking on sync state to prevent concurrent race 9. Fail fast on auth errors in failover client Signed-off-by: Murat Parlakisik <parlakisik@gmail.com>
1 parent df7dac6 commit 173e1c3

16 files changed

Lines changed: 319 additions & 55 deletions

File tree

internal/cli/add/cmd/root/run.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/ActiveMemory/ctx/internal/hub"
2626
"github.com/ActiveMemory/ctx/internal/trace"
2727
writeAdd "github.com/ActiveMemory/ctx/internal/write/add"
28+
writeConnect "github.com/ActiveMemory/ctx/internal/write/connect"
2829
)
2930

3031
// Run executes the add command logic.
@@ -87,9 +88,11 @@ func Run(cmd *cobra.Command, args []string, flags entity.AddConfig) error {
8788
Content: content,
8889
Origin: filepath.Base(state.Dir()),
8990
}
90-
_ = corePub.Run(
91+
if pubErr := corePub.Run(
9192
cmd, []hub.PublishEntry{pubEntry},
92-
)
93+
); pubErr != nil {
94+
writeConnect.PublishFailed(cmd, pubErr)
95+
}
9396
}
9497

9598
if fType == cfgEntry.Task && coreEntry.NeedsSpec(content) {

internal/cli/connect/core/listen/listen.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@ import (
1919
writeConnect "github.com/ActiveMemory/ctx/internal/write/connect"
2020
)
2121

22-
// Run streams entries from the hub in real-time.
23-
//
24-
// Connects to the hub, starts a Sync to get recent entries,
25-
// then blocks waiting for new entries. Writes each entry to
26-
// .context/shared/ as it arrives. Stops on Ctrl-C.
22+
// Run streams entries from the hub in real-time via the
23+
// Listen RPC. Writes each entry to .context/shared/ as
24+
// it arrives. Stops on Ctrl-C.
2725
//
2826
// Parameters:
2927
// - cmd: cobra command for output
28+
// - args: unused (cobra signature)
3029
//
3130
// Returns:
3231
// - error: non-nil if config, connection, or write fails
@@ -51,21 +50,23 @@ func Run(cmd *cobra.Command, _ []string) error {
5150

5251
writeConnect.Listening(cmd)
5352

54-
entries, syncErr := client.Sync(
53+
listenErr := client.Listen(
5554
ctx, cfg.Types, 0,
55+
func(msg hub.EntryMsg) error {
56+
writeErr := render.WriteEntries(
57+
[]hub.EntryMsg{msg},
58+
)
59+
if writeErr != nil {
60+
return writeErr
61+
}
62+
writeConnect.EntryReceived(cmd, msg.Type)
63+
return nil
64+
},
5665
)
57-
if syncErr != nil {
58-
return syncErr
59-
}
60-
if len(entries) > 0 {
61-
if writeErr := render.WriteEntries(
62-
entries,
63-
); writeErr != nil {
64-
return writeErr
65-
}
66-
writeConnect.Synced(cmd, len(entries))
67-
}
6866

69-
<-ctx.Done()
67+
// Context cancellation (Ctrl-C) is expected.
68+
if listenErr != nil && ctx.Err() == nil {
69+
return listenErr
70+
}
7071
return nil
7172
}

internal/cli/connect/core/sync/state.go

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,58 @@ import (
1919
// stateFile is the sync state persistence filename.
2020
const stateFile = ".sync-state.json"
2121

22+
// lockFile is the lock file to prevent concurrent syncs.
23+
const lockFile = ".sync.lock"
24+
2225
// loadState reads sync state from .context/shared/.
23-
func loadState() (state, error) {
26+
// Acquires a lock file to prevent concurrent access.
27+
func loadState() (state, func(), error) {
2428
var s state
25-
path := filepath.Join(
26-
rc.ContextDir(), "shared", stateFile,
27-
)
29+
dir := filepath.Join(rc.ContextDir(), "shared")
30+
lockPath := filepath.Join(dir, lockFile)
31+
32+
if mkErr := io.SafeMkdirAll(
33+
dir, fs.PermKeyDir,
34+
); mkErr != nil {
35+
return s, nil, mkErr
36+
}
37+
38+
// Acquire lock: fail if another sync is running.
39+
if _, statErr := os.Stat(lockPath); statErr == nil {
40+
return s, nil, os.ErrExist
41+
}
42+
if writeErr := io.SafeWriteFile(
43+
lockPath, []byte("lock"), fs.PermFile,
44+
); writeErr != nil {
45+
return s, nil, writeErr
46+
}
47+
48+
release := func() { _ = os.Remove(lockPath) }
49+
50+
path := filepath.Join(dir, stateFile)
2851
data, readErr := io.SafeReadUserFile(path)
2952
if os.IsNotExist(readErr) {
30-
return s, nil
53+
return s, release, nil
3154
}
3255
if readErr != nil {
33-
return s, readErr
56+
release()
57+
return s, nil, readErr
3458
}
3559
if len(data) == 0 {
36-
return s, nil
60+
return s, release, nil
61+
}
62+
if unmarshalErr := json.Unmarshal(
63+
data, &s,
64+
); unmarshalErr != nil {
65+
release()
66+
return s, nil, unmarshalErr
3767
}
38-
return s, json.Unmarshal(data, &s)
68+
return s, release, nil
3969
}
4070

4171
// saveState writes sync state to .context/shared/.
4272
func saveState(s state) error {
4373
dir := filepath.Join(rc.ContextDir(), "shared")
44-
if mkErr := io.SafeMkdirAll(
45-
dir, fs.PermKeyDir,
46-
); mkErr != nil {
47-
return mkErr
48-
}
4974
data, marshalErr := json.MarshalIndent(
5075
s, "", " ",
5176
)

internal/cli/connect/core/sync/sync.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ func Run(cmd *cobra.Command) error {
3333
return loadErr
3434
}
3535

36-
syncState, stateErr := loadState()
36+
syncState, releaseLock, stateErr := loadState()
3737
if stateErr != nil {
3838
return stateErr
3939
}
40+
defer releaseLock()
4041

4142
client, dialErr := hub.NewClient(
4243
cfg.HubAddr, cfg.Token,

internal/cli/serve/core/shared/shared.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,21 @@ func Run(
7676
return tokenErr
7777
}
7878

79+
srv := hub.NewServer(store, adminToken)
80+
7981
// Start Raft cluster if peers are configured.
8082
if len(peers) > 0 {
8183
bindAddr := fmt.Sprintf(":%d", port+1)
82-
_, clusterErr := hub.NewCluster(
84+
cluster, clusterErr := hub.NewCluster(
8385
fmt.Sprintf(":%d", port),
8486
bindAddr, dataDir, peers,
8587
)
8688
if clusterErr != nil {
8789
return clusterErr
8890
}
91+
srv.SetCluster(cluster)
8992
}
9093

91-
srv := hub.NewServer(store, adminToken)
92-
9394
addr := fmt.Sprintf(":%d", port)
9495
lis, lisErr := net.Listen("tcp", addr)
9596
if lisErr != nil {

internal/err/hub/hub.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,29 @@ func InternalErr(cause error) error {
3939
//
4040
// Returns:
4141
// - error: "action must be 'add' or 'remove', got <action>"
42+
//
43+
// DuplicateProject returns an error when a project is
44+
// already registered.
45+
//
46+
// Parameters:
47+
// - name: the duplicate project name
48+
//
49+
// Returns:
50+
// - error: "project already registered: <name>"
51+
func DuplicateProject(name string) error {
52+
return fmt.Errorf(
53+
"project already registered: %q", name,
54+
)
55+
}
56+
57+
// InvalidPeerAction returns an error for an unrecognized
58+
// peer action.
59+
//
60+
// Parameters:
61+
// - action: the unrecognized action string
62+
//
63+
// Returns:
64+
// - error: formatted error with the invalid action
4265
func InvalidPeerAction(action string) error {
4366
return fmt.Errorf(
4467
"action must be 'add' or 'remove', got %q",

internal/hub/client.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,57 @@ func (c *Client) Sync(
139139
return entries, nil
140140
}
141141

142+
// Listen opens a server-streaming Listen RPC and calls
143+
// the handler for each entry received. Blocks until the
144+
// context is cancelled or the stream ends.
145+
//
146+
// Parameters:
147+
// - ctx: context for cancellation
148+
// - types: entry types to receive (empty = all)
149+
// - sinceSequence: start from this sequence
150+
// - handler: called for each received entry
151+
//
152+
// Returns:
153+
// - error: non-nil if stream setup or recv fails
154+
func (c *Client) Listen(
155+
ctx context.Context,
156+
types []string,
157+
sinceSequence uint64,
158+
handler func(EntryMsg) error,
159+
) error {
160+
stream, streamErr := c.conn.NewStream(
161+
c.authedCtx(ctx),
162+
&grpc.StreamDesc{ServerStreams: true},
163+
"/ctx.hub.v1.CtxHub/Listen",
164+
)
165+
if streamErr != nil {
166+
return streamErr
167+
}
168+
169+
if sendErr := stream.SendMsg(&ListenRequest{
170+
Types: types,
171+
SinceSequence: sinceSequence,
172+
}); sendErr != nil {
173+
return sendErr
174+
}
175+
if closeErr := stream.CloseSend(); closeErr != nil {
176+
return closeErr
177+
}
178+
179+
for {
180+
msg := &EntryMsg{}
181+
if recvErr := stream.RecvMsg(msg); recvErr != nil {
182+
if isEOF(recvErr) {
183+
return nil
184+
}
185+
return recvErr
186+
}
187+
if handleErr := handler(*msg); handleErr != nil {
188+
return handleErr
189+
}
190+
}
191+
}
192+
142193
// Status calls the Status RPC.
143194
//
144195
// Parameters:

internal/hub/entry_validate.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// / ctx: https://ctx.ist
2+
// ,'`./ do you remember?
3+
// `.,'\
4+
// \ Copyright 2026-present Context contributors.
5+
// SPDX-License-Identifier: Apache-2.0
6+
7+
package hub
8+
9+
import (
10+
"google.golang.org/grpc/codes"
11+
"google.golang.org/grpc/status"
12+
)
13+
14+
// maxContentLen is the maximum entry content size (1MB).
15+
const maxContentLen = 1 << 20
16+
17+
// allowedTypes is the set of valid entry types.
18+
var allowedTypes = map[string]bool{
19+
"decision": true,
20+
"learning": true,
21+
"convention": true,
22+
"task": true,
23+
}
24+
25+
// validateEntry checks a PublishEntry for required fields
26+
// and enforces size limits.
27+
func validateEntry(pe PublishEntry) error {
28+
if pe.ID == "" {
29+
return status.Error(
30+
codes.InvalidArgument, "entry ID required",
31+
)
32+
}
33+
if !allowedTypes[pe.Type] {
34+
return status.Errorf(
35+
codes.InvalidArgument,
36+
"invalid entry type %q", pe.Type,
37+
)
38+
}
39+
if pe.Origin == "" {
40+
return status.Error(
41+
codes.InvalidArgument,
42+
"entry origin required",
43+
)
44+
}
45+
if len(pe.Content) > maxContentLen {
46+
return status.Error(
47+
codes.InvalidArgument,
48+
"entry content exceeds 1MB limit",
49+
)
50+
}
51+
return nil
52+
}

internal/hub/errcheck.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// / ctx: https://ctx.ist
2+
// ,'`./ do you remember?
3+
// `.,'\
4+
// \ Copyright 2026-present Context contributors.
5+
// SPDX-License-Identifier: Apache-2.0
6+
7+
package hub
8+
9+
import (
10+
"google.golang.org/grpc/codes"
11+
"google.golang.org/grpc/status"
12+
)
13+
14+
// isAuthErr reports whether err is an authentication or
15+
// authorization failure.
16+
func isAuthErr(err error) bool {
17+
s, ok := status.FromError(err)
18+
if !ok {
19+
return false
20+
}
21+
c := s.Code()
22+
return c == codes.Unauthenticated ||
23+
c == codes.PermissionDenied
24+
}

internal/hub/failover.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
)
1515

1616
// NewFailoverClient creates a client that tries peers in
17-
// order until one succeeds. The first address is the
18-
// primary; others are fallbacks.
17+
// order until one succeeds. Fails fast on auth errors
18+
// since the token is the same for all peers.
1919
//
2020
// Parameters:
2121
// - peers: ordered list of hub addresses
@@ -43,7 +43,6 @@ func NewFailoverClient(
4343
continue
4444
}
4545

46-
// Verify connectivity with a Status call.
4746
resp := &StatusResponse{}
4847
callErr := conn.Invoke(
4948
addBearerMD(
@@ -55,6 +54,13 @@ func NewFailoverClient(
5554
)
5655
if callErr != nil {
5756
_ = conn.Close()
57+
58+
// Fail fast on auth errors — same token
59+
// won't work on other peers either.
60+
if isAuthErr(callErr) {
61+
return nil, callErr
62+
}
63+
5864
lastErr = callErr
5965
continue
6066
}

0 commit comments

Comments
 (0)