Skip to content

Commit db75e85

Browse files
committed
addressing review comments
1 parent bfa793b commit db75e85

4 files changed

Lines changed: 69 additions & 146 deletions

File tree

server/internal/orchestrator/swarm/orchestrator.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -541,12 +541,10 @@ func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstan
541541
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
542542
// RAG service user role (per-host, not replicated by Spock)
543543
ragUserRole := &RAGServiceUserRole{
544-
ServiceInstanceID: spec.ServiceInstanceID,
545-
ServiceID: spec.ServiceSpec.ServiceID,
546-
DatabaseID: spec.DatabaseID,
547-
DatabaseName: spec.DatabaseName,
548-
HostID: spec.HostID,
549-
NodeName: spec.NodeName,
544+
ServiceID: spec.ServiceSpec.ServiceID,
545+
DatabaseID: spec.DatabaseID,
546+
DatabaseName: spec.DatabaseName,
547+
NodeName: spec.NodeName,
550548
}
551549
if spec.Credentials != nil {
552550
ragUserRole.Username = spec.Credentials.Username

server/internal/orchestrator/swarm/rag_service_user_role.go

Lines changed: 48 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,12 @@ package swarm
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"time"
86

9-
"github.com/jackc/pgx/v5"
107
"github.com/rs/zerolog"
118
"github.com/samber/do"
129

13-
"github.com/pgEdge/control-plane/server/internal/certificates"
1410
"github.com/pgEdge/control-plane/server/internal/database"
15-
"github.com/pgEdge/control-plane/server/internal/patroni"
1611
"github.com/pgEdge/control-plane/server/internal/postgres"
1712
"github.com/pgEdge/control-plane/server/internal/resource"
1813
"github.com/pgEdge/control-plane/server/internal/utils"
@@ -22,24 +17,24 @@ var _ resource.Resource = (*RAGServiceUserRole)(nil)
2217

2318
const ResourceTypeRAGServiceUserRole resource.Type = "swarm.rag_service_user_role"
2419

25-
func RAGServiceUserRoleIdentifier(serviceInstanceID string) resource.Identifier {
20+
func RAGServiceUserRoleIdentifier(serviceID string) resource.Identifier {
2621
return resource.Identifier{
27-
ID: serviceInstanceID,
22+
ID: serviceID,
2823
Type: ResourceTypeRAGServiceUserRole,
2924
}
3025
}
3126

27+
// RAGServiceUserRole manages the Postgres role for a RAG service.
3228
// The role is created on the primary of the co-located Postgres instance
33-
// (same HostID) and granted the pgedge_application_read_only built-in role.
29+
// and granted the pgedge_application_read_only built-in role.
30+
// Spock replicates the role to every other node because we connect via r.DatabaseName.
3431
type RAGServiceUserRole struct {
35-
ServiceInstanceID string `json:"service_instance_id"`
36-
ServiceID string `json:"service_id"`
37-
DatabaseID string `json:"database_id"`
38-
DatabaseName string `json:"database_name"`
39-
HostID string `json:"host_id"` // Used to find the co-located Postgres instance
40-
NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing
41-
Username string `json:"username"`
42-
Password string `json:"password"` // Generated on Create, persisted in state
32+
ServiceID string `json:"service_id"`
33+
DatabaseID string `json:"database_id"`
34+
DatabaseName string `json:"database_name"`
35+
NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing
36+
Username string `json:"username"`
37+
Password string `json:"password"` // Generated on Create, persisted in state
4338
}
4439

4540
func (r *RAGServiceUserRole) ResourceVersion() string {
@@ -55,7 +50,7 @@ func (r *RAGServiceUserRole) DiffIgnore() []string {
5550
}
5651

5752
func (r *RAGServiceUserRole) Identifier() resource.Identifier {
58-
return RAGServiceUserRoleIdentifier(r.ServiceInstanceID)
53+
return RAGServiceUserRoleIdentifier(r.ServiceID)
5954
}
6055

6156
func (r *RAGServiceUserRole) Executor() resource.Executor {
@@ -80,28 +75,26 @@ func (r *RAGServiceUserRole) Refresh(ctx context.Context, rc *resource.Context)
8075
return err
8176
}
8277
logger = logger.With().
83-
Str("service_instance_id", r.ServiceInstanceID).
78+
Str("service_id", r.ServiceID).
8479
Str("database_id", r.DatabaseID).
8580
Logger()
8681

87-
conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName)
82+
primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName)
8883
if err != nil {
89-
logger.Warn().Err(err).Msg("could not connect to verify RAG role existence, assuming it exists")
90-
return nil
84+
return fmt.Errorf("failed to get primary instance: %w", err)
85+
}
86+
conn, err := primary.Connection(ctx, rc, r.DatabaseName)
87+
if err != nil {
88+
return fmt.Errorf("failed to connect to database: %w", err)
9189
}
9290
defer conn.Close(ctx)
9391

94-
var exists bool
95-
err = conn.QueryRow(ctx,
96-
"SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = $1)",
97-
r.Username,
98-
).Scan(&exists)
92+
needsCreate, err := postgres.UserRoleNeedsCreate(r.Username).Scalar(ctx, conn)
9993
if err != nil {
100-
// On query failure, assume it exists
101-
logger.Warn().Err(err).Msg("pg_roles query failed, assuming RAG role exists")
102-
return nil
94+
logger.Warn().Err(err).Msg("pg_roles query failed")
95+
return fmt.Errorf("pg_roles query failed: %w", err)
10396
}
104-
if !exists {
97+
if needsCreate {
10598
return resource.ErrNotFound
10699
}
107100
return nil
@@ -113,30 +106,36 @@ func (r *RAGServiceUserRole) Create(ctx context.Context, rc *resource.Context) e
113106
return err
114107
}
115108
logger = logger.With().
116-
Str("service_instance_id", r.ServiceInstanceID).
109+
Str("service_id", r.ServiceID).
117110
Str("database_id", r.DatabaseID).
118111
Logger()
119112
logger.Info().Msg("creating RAG service user role")
120113

121-
r.Username = database.GenerateServiceUsername(r.ServiceInstanceID)
122-
password, err := utils.RandomString(32)
123-
if err != nil {
124-
return fmt.Errorf("failed to generate password: %w", err)
114+
r.Username = database.GenerateServiceUsername(r.ServiceID)
115+
if r.Password == "" {
116+
password, err := utils.RandomString(32)
117+
if err != nil {
118+
return fmt.Errorf("failed to generate password: %w", err)
119+
}
120+
r.Password = password
125121
}
126-
r.Password = password
127122

128-
if err := r.createRole(ctx, rc, logger); err != nil {
123+
if err := r.createRole(ctx, rc); err != nil {
129124
return fmt.Errorf("failed to create RAG service user role: %w", err)
130125
}
131126

132127
logger.Info().Str("username", r.Username).Msg("RAG service user role created successfully")
133128
return nil
134129
}
135130

136-
func (r *RAGServiceUserRole) createRole(ctx context.Context, rc *resource.Context, logger zerolog.Logger) error {
137-
conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName)
131+
func (r *RAGServiceUserRole) createRole(ctx context.Context, rc *resource.Context) error {
132+
primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName)
138133
if err != nil {
139-
return err
134+
return fmt.Errorf("failed to get primary instance: %w", err)
135+
}
136+
conn, err := primary.Connection(ctx, rc, r.DatabaseName)
137+
if err != nil {
138+
return fmt.Errorf("failed to connect to database: %w", err)
140139
}
141140
defer conn.Close(ctx)
142141

@@ -169,16 +168,22 @@ func (r *RAGServiceUserRole) Delete(ctx context.Context, rc *resource.Context) e
169168
return err
170169
}
171170
logger = logger.With().
172-
Str("service_instance_id", r.ServiceInstanceID).
171+
Str("service_id", r.ServiceID).
173172
Str("database_id", r.DatabaseID).
174173
Str("username", r.Username).
175174
Logger()
176175
logger.Info().Msg("deleting RAG service user from database")
177176

178-
conn, err := r.connectToColocatedPrimary(ctx, rc, logger, "postgres")
177+
primary, err := database.GetPrimaryInstance(ctx, rc, r.NodeName)
178+
if err != nil {
179+
// During deletion the database may already be gone or unreachable.
180+
logger.Warn().Err(err).Msg("failed to get primary instance, skipping RAG user deletion")
181+
return nil
182+
}
183+
conn, err := primary.Connection(ctx, rc, r.DatabaseName)
179184
if err != nil {
180185
// During deletion the database may already be gone or unreachable.
181-
logger.Warn().Err(err).Msg("failed to connect to co-located primary, skipping RAG user deletion")
186+
logger.Warn().Err(err).Msg("failed to connect to database, skipping RAG user deletion")
182187
return nil
183188
}
184189
defer conn.Close(ctx)
@@ -192,94 +197,3 @@ func (r *RAGServiceUserRole) Delete(ctx context.Context, rc *resource.Context) e
192197
logger.Info().Msg("RAG service user deleted successfully")
193198
return nil
194199
}
195-
196-
// connectToColocatedPrimary finds the primary Postgres instance on the same
197-
// host as this RAG service instance and returns an authenticated connection.
198-
// Filtering by HostID ensures the role is created on the correct node, since
199-
// CREATE ROLE is not replicated by Spock in a multi-active setup.
200-
func (r *RAGServiceUserRole) connectToColocatedPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger, dbName string) (*pgx.Conn, error) {
201-
dbSvc, err := do.Invoke[*database.Service](rc.Injector)
202-
if err != nil {
203-
return nil, err
204-
}
205-
206-
primaryInstanceID, err := r.resolveColocatedPrimary(ctx, dbSvc, logger)
207-
if err != nil {
208-
return nil, err
209-
}
210-
211-
connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID)
212-
if err != nil {
213-
return nil, fmt.Errorf("failed to get instance connection info: %w", err)
214-
}
215-
216-
certSvc, err := do.Invoke[*certificates.Service](rc.Injector)
217-
if err != nil {
218-
return nil, fmt.Errorf("failed to get certificate service: %w", err)
219-
}
220-
221-
tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge")
222-
if err != nil {
223-
return nil, fmt.Errorf("failed to create TLS config: %w", err)
224-
}
225-
226-
conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{
227-
DSN: connInfo.AdminDSN(dbName),
228-
TLS: tlsConfig,
229-
})
230-
if err != nil {
231-
return nil, fmt.Errorf("failed to connect to database: %w", err)
232-
}
233-
234-
return conn, nil
235-
}
236-
237-
// resolveColocatedPrimary fetches the database, selects co-located instances,
238-
// and returns the primary instance ID via Patroni.
239-
func (r *RAGServiceUserRole) resolveColocatedPrimary(ctx context.Context, dbSvc *database.Service, logger zerolog.Logger) (string, error) {
240-
db, err := dbSvc.GetDatabase(ctx, r.DatabaseID)
241-
if err != nil {
242-
if errors.Is(err, database.ErrDatabaseNotFound) {
243-
return "", fmt.Errorf("database not found: %w", err)
244-
}
245-
return "", fmt.Errorf("failed to get database: %w", err)
246-
}
247-
if len(db.Instances) == 0 {
248-
return "", fmt.Errorf("database has no instances")
249-
}
250-
candidates := r.colocatedInstances(db.Instances, logger)
251-
return r.findPrimaryAmong(ctx, dbSvc, candidates, logger), nil
252-
}
253-
254-
// colocatedInstances returns the subset of instances that share r.HostID.
255-
// Falls back to all instances if none are co-located.
256-
func (r *RAGServiceUserRole) colocatedInstances(all []*database.Instance, logger zerolog.Logger) []*database.Instance {
257-
candidates := make([]*database.Instance, 0, len(all))
258-
for _, inst := range all {
259-
if inst.HostID == r.HostID {
260-
candidates = append(candidates, inst)
261-
}
262-
}
263-
if len(candidates) == 0 {
264-
logger.Warn().Str("host_id", r.HostID).Msg("no co-located Postgres instances found, falling back to all instances")
265-
return all
266-
}
267-
return candidates
268-
}
269-
270-
// findPrimaryAmong queries Patroni for each candidate and returns the primary
271-
// instance ID. Falls back to the first candidate if none can be determined.
272-
func (r *RAGServiceUserRole) findPrimaryAmong(ctx context.Context, dbSvc *database.Service, candidates []*database.Instance, logger zerolog.Logger) string {
273-
for _, inst := range candidates {
274-
connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID)
275-
if err != nil {
276-
continue
277-
}
278-
primaryID, err := database.GetPrimaryInstanceID(ctx, patroni.NewClient(connInfo.PatroniURL(), nil), 10*time.Second)
279-
if err == nil && primaryID != "" {
280-
return primaryID
281-
}
282-
}
283-
logger.Warn().Msg("could not determine primary instance, using first co-located instance")
284-
return candidates[0].InstanceID
285-
}

server/internal/orchestrator/swarm/rag_service_user_role_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ func TestRAGServiceUserRole_ResourceVersion(t *testing.T) {
1616
}
1717

1818
func TestRAGServiceUserRole_Identifier(t *testing.T) {
19-
r := &RAGServiceUserRole{ServiceInstanceID: "db1-rag-host1"}
19+
r := &RAGServiceUserRole{ServiceID: "rag"}
2020
id := r.Identifier()
21-
if id.ID != "db1-rag-host1" {
22-
t.Errorf("Identifier().ID = %q, want %q", id.ID, "db1-rag-host1")
21+
if id.ID != "rag" {
22+
t.Errorf("Identifier().ID = %q, want %q", id.ID, "rag")
2323
}
2424
if id.Type != ResourceTypeRAGServiceUserRole {
2525
t.Errorf("Identifier().Type = %q, want %q", id.Type, ResourceTypeRAGServiceUserRole)
@@ -65,9 +65,9 @@ func TestRAGServiceUserRole_RefreshEmptyCredentials(t *testing.T) {
6565
for _, tt := range tests {
6666
t.Run(tt.name, func(t *testing.T) {
6767
r := &RAGServiceUserRole{
68-
ServiceInstanceID: "inst1",
69-
Username: tt.username,
70-
Password: tt.password,
68+
ServiceID: "rag",
69+
Username: tt.username,
70+
Password: tt.password,
7171
}
7272
// Refresh with nil rc — the empty-credential guard fires before any
7373
// injection call, so no injector is needed.

server/internal/postgres/roles.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@ import (
1111
var defaultSchemas = []string{"public", "spock", "pg_catalog", "information_schema"}
1212
var builtinRoles = []string{"pgedge_application", "pgedge_application_read_only", "pgedge_superuser"}
1313

14+
// UserRoleNeedsCreate returns a query that evaluates to true when the named
15+
// role does not yet exist in pg_catalog.pg_roles.
16+
func UserRoleNeedsCreate(name string) Query[bool] {
17+
return Query[bool]{
18+
SQL: "SELECT NOT EXISTS (SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = @name);",
19+
Args: pgx.NamedArgs{
20+
"name": name,
21+
},
22+
}
23+
}
24+
1425
type UserRoleOptions struct {
1526
Name string
1627
Password string

0 commit comments

Comments
 (0)