Skip to content

Commit bfa793b

Browse files
committed
feat: provision per-host Postgres user for RAG service instances
1 parent 236bb0a commit bfa793b

5 files changed

Lines changed: 548 additions & 31 deletions

File tree

server/internal/orchestrator/swarm/orchestrator.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -402,11 +402,17 @@ func (o *Orchestrator) GenerateInstanceRestoreResources(spec *database.InstanceS
402402
}
403403

404404
func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
405-
// Only MCP service instance generation is currently implemented.
406-
if spec.ServiceSpec.ServiceType != "mcp" {
405+
switch spec.ServiceSpec.ServiceType {
406+
case "mcp":
407+
return o.generateMCPInstanceResources(spec)
408+
case "rag":
409+
return o.generateRAGInstanceResources(spec)
410+
default:
407411
return nil, fmt.Errorf("service type %q instance generation is not yet supported", spec.ServiceSpec.ServiceType)
408412
}
413+
}
409414

415+
func (o *Orchestrator) generateMCPInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
410416
// Get service image based on service type and version
411417
serviceImage, err := o.serviceVersions.GetServiceImage(spec.ServiceSpec.ServiceType, spec.ServiceSpec.Version)
412418
if err != nil {
@@ -529,34 +535,35 @@ func (o *Orchestrator) GenerateServiceInstanceResources(spec *database.ServiceIn
529535
serviceInstanceSpec,
530536
serviceInstance,
531537
}
538+
return o.buildServiceInstanceResources(spec, orchestratorResources)
539+
}
532540

533-
// Append per-node ServiceUserRole resources for each additional database node.
534-
// The canonical resources (above) cover the first node; nodes [1:] each get
535-
// their own RO and RW role that sources credentials from the canonical.
536-
if len(spec.DatabaseNodes) > 1 {
537-
for _, nodeInst := range spec.DatabaseNodes[1:] {
538-
orchestratorResources = append(orchestratorResources,
539-
&ServiceUserRole{
540-
ServiceID: spec.ServiceSpec.ServiceID,
541-
DatabaseID: spec.DatabaseID,
542-
DatabaseName: spec.DatabaseName,
543-
NodeName: nodeInst.NodeName,
544-
Mode: ServiceUserRoleRO,
545-
CredentialSource: &canonicalROID,
546-
},
547-
&ServiceUserRole{
548-
ServiceID: spec.ServiceSpec.ServiceID,
549-
DatabaseID: spec.DatabaseID,
550-
DatabaseName: spec.DatabaseName,
551-
NodeName: nodeInst.NodeName,
552-
Mode: ServiceUserRoleRW,
553-
CredentialSource: &canonicalRWID,
554-
},
555-
)
556-
}
541+
func (o *Orchestrator) generateRAGInstanceResources(spec *database.ServiceInstanceSpec) (*database.ServiceInstanceResources, error) {
542+
// RAG service user role (per-host, not replicated by Spock)
543+
ragUserRole := &RAGServiceUserRole{
544+
ServiceInstanceID: spec.ServiceInstanceID,
545+
ServiceID: spec.ServiceSpec.ServiceID,
546+
DatabaseID: spec.DatabaseID,
547+
DatabaseName: spec.DatabaseName,
548+
HostID: spec.HostID,
549+
NodeName: spec.NodeName,
550+
}
551+
if spec.Credentials != nil {
552+
ragUserRole.Username = spec.Credentials.Username
553+
ragUserRole.Password = spec.Credentials.Password
554+
}
555+
556+
// Resource chain: RAGServiceUserRole (container deployment in future PRs)
557+
orchestratorResources := []resource.Resource{
558+
ragUserRole,
557559
}
558560

559-
// Convert to resource data
561+
return o.buildServiceInstanceResources(spec, orchestratorResources)
562+
}
563+
564+
// buildServiceInstanceResources converts a slice of resources into a
565+
// ServiceInstanceResources, shared by all service type generators.
566+
func (o *Orchestrator) buildServiceInstanceResources(spec *database.ServiceInstanceSpec, orchestratorResources []resource.Resource) (*database.ServiceInstanceResources, error) {
560567
data := make([]*resource.ResourceData, len(orchestratorResources))
561568
for i, res := range orchestratorResources {
562569
d, err := resource.ToResourceData(res)
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
package swarm
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/jackc/pgx/v5"
10+
"github.com/rs/zerolog"
11+
"github.com/samber/do"
12+
13+
"github.com/pgEdge/control-plane/server/internal/certificates"
14+
"github.com/pgEdge/control-plane/server/internal/database"
15+
"github.com/pgEdge/control-plane/server/internal/patroni"
16+
"github.com/pgEdge/control-plane/server/internal/postgres"
17+
"github.com/pgEdge/control-plane/server/internal/resource"
18+
"github.com/pgEdge/control-plane/server/internal/utils"
19+
)
20+
21+
var _ resource.Resource = (*RAGServiceUserRole)(nil)
22+
23+
const ResourceTypeRAGServiceUserRole resource.Type = "swarm.rag_service_user_role"
24+
25+
func RAGServiceUserRoleIdentifier(serviceInstanceID string) resource.Identifier {
26+
return resource.Identifier{
27+
ID: serviceInstanceID,
28+
Type: ResourceTypeRAGServiceUserRole,
29+
}
30+
}
31+
32+
// The role is created on the primary of the co-located Postgres instance
33+
// (same HostID) and granted the pgedge_application_read_only built-in role.
34+
type RAGServiceUserRole struct {
35+
ServiceInstanceID string `json:"service_instance_id"`
36+
ServiceID string `json:"service_id"`
37+
DatabaseID string `json:"database_id"`
38+
DatabaseName string `json:"database_name"`
39+
HostID string `json:"host_id"` // Used to find the co-located Postgres instance
40+
NodeName string `json:"node_name"` // Database node name for PrimaryExecutor routing
41+
Username string `json:"username"`
42+
Password string `json:"password"` // Generated on Create, persisted in state
43+
}
44+
45+
func (r *RAGServiceUserRole) ResourceVersion() string {
46+
return "1"
47+
}
48+
49+
func (r *RAGServiceUserRole) DiffIgnore() []string {
50+
return []string{
51+
"/node_name",
52+
"/username",
53+
"/password",
54+
}
55+
}
56+
57+
func (r *RAGServiceUserRole) Identifier() resource.Identifier {
58+
return RAGServiceUserRoleIdentifier(r.ServiceInstanceID)
59+
}
60+
61+
func (r *RAGServiceUserRole) Executor() resource.Executor {
62+
return resource.PrimaryExecutor(r.NodeName)
63+
}
64+
65+
func (r *RAGServiceUserRole) Dependencies() []resource.Identifier {
66+
return nil
67+
}
68+
69+
func (r *RAGServiceUserRole) TypeDependencies() []resource.Type {
70+
return nil
71+
}
72+
73+
func (r *RAGServiceUserRole) Refresh(ctx context.Context, rc *resource.Context) error {
74+
if r.Username == "" || r.Password == "" {
75+
return resource.ErrNotFound
76+
}
77+
78+
logger, err := do.Invoke[zerolog.Logger](rc.Injector)
79+
if err != nil {
80+
return err
81+
}
82+
logger = logger.With().
83+
Str("service_instance_id", r.ServiceInstanceID).
84+
Str("database_id", r.DatabaseID).
85+
Logger()
86+
87+
conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName)
88+
if err != nil {
89+
logger.Warn().Err(err).Msg("could not connect to verify RAG role existence, assuming it exists")
90+
return nil
91+
}
92+
defer conn.Close(ctx)
93+
94+
var exists bool
95+
err = conn.QueryRow(ctx,
96+
"SELECT EXISTS(SELECT 1 FROM pg_catalog.pg_roles WHERE rolname = $1)",
97+
r.Username,
98+
).Scan(&exists)
99+
if err != nil {
100+
// On query failure, assume it exists
101+
logger.Warn().Err(err).Msg("pg_roles query failed, assuming RAG role exists")
102+
return nil
103+
}
104+
if !exists {
105+
return resource.ErrNotFound
106+
}
107+
return nil
108+
}
109+
110+
func (r *RAGServiceUserRole) Create(ctx context.Context, rc *resource.Context) error {
111+
logger, err := do.Invoke[zerolog.Logger](rc.Injector)
112+
if err != nil {
113+
return err
114+
}
115+
logger = logger.With().
116+
Str("service_instance_id", r.ServiceInstanceID).
117+
Str("database_id", r.DatabaseID).
118+
Logger()
119+
logger.Info().Msg("creating RAG service user role")
120+
121+
r.Username = database.GenerateServiceUsername(r.ServiceInstanceID)
122+
password, err := utils.RandomString(32)
123+
if err != nil {
124+
return fmt.Errorf("failed to generate password: %w", err)
125+
}
126+
r.Password = password
127+
128+
if err := r.createRole(ctx, rc, logger); err != nil {
129+
return fmt.Errorf("failed to create RAG service user role: %w", err)
130+
}
131+
132+
logger.Info().Str("username", r.Username).Msg("RAG service user role created successfully")
133+
return nil
134+
}
135+
136+
func (r *RAGServiceUserRole) createRole(ctx context.Context, rc *resource.Context, logger zerolog.Logger) error {
137+
conn, err := r.connectToColocatedPrimary(ctx, rc, logger, r.DatabaseName)
138+
if err != nil {
139+
return err
140+
}
141+
defer conn.Close(ctx)
142+
143+
statements, err := postgres.CreateUserRole(postgres.UserRoleOptions{
144+
Name: r.Username,
145+
Password: r.Password,
146+
DBName: r.DatabaseName,
147+
DBOwner: false,
148+
Attributes: []string{"LOGIN"},
149+
Roles: []string{"pgedge_application_read_only"},
150+
})
151+
if err != nil {
152+
return fmt.Errorf("failed to generate create user role statements: %w", err)
153+
}
154+
155+
if err := statements.Exec(ctx, conn); err != nil {
156+
return fmt.Errorf("failed to create RAG service user: %w", err)
157+
}
158+
159+
return nil
160+
}
161+
162+
func (r *RAGServiceUserRole) Update(ctx context.Context, rc *resource.Context) error {
163+
return nil
164+
}
165+
166+
func (r *RAGServiceUserRole) Delete(ctx context.Context, rc *resource.Context) error {
167+
logger, err := do.Invoke[zerolog.Logger](rc.Injector)
168+
if err != nil {
169+
return err
170+
}
171+
logger = logger.With().
172+
Str("service_instance_id", r.ServiceInstanceID).
173+
Str("database_id", r.DatabaseID).
174+
Str("username", r.Username).
175+
Logger()
176+
logger.Info().Msg("deleting RAG service user from database")
177+
178+
conn, err := r.connectToColocatedPrimary(ctx, rc, logger, "postgres")
179+
if err != nil {
180+
// During deletion the database may already be gone or unreachable.
181+
logger.Warn().Err(err).Msg("failed to connect to co-located primary, skipping RAG user deletion")
182+
return nil
183+
}
184+
defer conn.Close(ctx)
185+
186+
_, err = conn.Exec(ctx, fmt.Sprintf("DROP ROLE IF EXISTS %s", sanitizeIdentifier(r.Username)))
187+
if err != nil {
188+
logger.Warn().Err(err).Msg("failed to drop RAG user role, continuing anyway")
189+
return nil
190+
}
191+
192+
logger.Info().Msg("RAG service user deleted successfully")
193+
return nil
194+
}
195+
196+
// connectToColocatedPrimary finds the primary Postgres instance on the same
197+
// host as this RAG service instance and returns an authenticated connection.
198+
// Filtering by HostID ensures the role is created on the correct node, since
199+
// CREATE ROLE is not replicated by Spock in a multi-active setup.
200+
func (r *RAGServiceUserRole) connectToColocatedPrimary(ctx context.Context, rc *resource.Context, logger zerolog.Logger, dbName string) (*pgx.Conn, error) {
201+
dbSvc, err := do.Invoke[*database.Service](rc.Injector)
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
primaryInstanceID, err := r.resolveColocatedPrimary(ctx, dbSvc, logger)
207+
if err != nil {
208+
return nil, err
209+
}
210+
211+
connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, primaryInstanceID)
212+
if err != nil {
213+
return nil, fmt.Errorf("failed to get instance connection info: %w", err)
214+
}
215+
216+
certSvc, err := do.Invoke[*certificates.Service](rc.Injector)
217+
if err != nil {
218+
return nil, fmt.Errorf("failed to get certificate service: %w", err)
219+
}
220+
221+
tlsConfig, err := certSvc.PostgresUserTLS(ctx, primaryInstanceID, connInfo.InstanceHostname, "pgedge")
222+
if err != nil {
223+
return nil, fmt.Errorf("failed to create TLS config: %w", err)
224+
}
225+
226+
conn, err := database.ConnectToInstance(ctx, &database.ConnectionOptions{
227+
DSN: connInfo.AdminDSN(dbName),
228+
TLS: tlsConfig,
229+
})
230+
if err != nil {
231+
return nil, fmt.Errorf("failed to connect to database: %w", err)
232+
}
233+
234+
return conn, nil
235+
}
236+
237+
// resolveColocatedPrimary fetches the database, selects co-located instances,
238+
// and returns the primary instance ID via Patroni.
239+
func (r *RAGServiceUserRole) resolveColocatedPrimary(ctx context.Context, dbSvc *database.Service, logger zerolog.Logger) (string, error) {
240+
db, err := dbSvc.GetDatabase(ctx, r.DatabaseID)
241+
if err != nil {
242+
if errors.Is(err, database.ErrDatabaseNotFound) {
243+
return "", fmt.Errorf("database not found: %w", err)
244+
}
245+
return "", fmt.Errorf("failed to get database: %w", err)
246+
}
247+
if len(db.Instances) == 0 {
248+
return "", fmt.Errorf("database has no instances")
249+
}
250+
candidates := r.colocatedInstances(db.Instances, logger)
251+
return r.findPrimaryAmong(ctx, dbSvc, candidates, logger), nil
252+
}
253+
254+
// colocatedInstances returns the subset of instances that share r.HostID.
255+
// Falls back to all instances if none are co-located.
256+
func (r *RAGServiceUserRole) colocatedInstances(all []*database.Instance, logger zerolog.Logger) []*database.Instance {
257+
candidates := make([]*database.Instance, 0, len(all))
258+
for _, inst := range all {
259+
if inst.HostID == r.HostID {
260+
candidates = append(candidates, inst)
261+
}
262+
}
263+
if len(candidates) == 0 {
264+
logger.Warn().Str("host_id", r.HostID).Msg("no co-located Postgres instances found, falling back to all instances")
265+
return all
266+
}
267+
return candidates
268+
}
269+
270+
// findPrimaryAmong queries Patroni for each candidate and returns the primary
271+
// instance ID. Falls back to the first candidate if none can be determined.
272+
func (r *RAGServiceUserRole) findPrimaryAmong(ctx context.Context, dbSvc *database.Service, candidates []*database.Instance, logger zerolog.Logger) string {
273+
for _, inst := range candidates {
274+
connInfo, err := dbSvc.GetInstanceConnectionInfo(ctx, r.DatabaseID, inst.InstanceID)
275+
if err != nil {
276+
continue
277+
}
278+
primaryID, err := database.GetPrimaryInstanceID(ctx, patroni.NewClient(connInfo.PatroniURL(), nil), 10*time.Second)
279+
if err == nil && primaryID != "" {
280+
return primaryID
281+
}
282+
}
283+
logger.Warn().Msg("could not determine primary instance, using first co-located instance")
284+
return candidates[0].InstanceID
285+
}

0 commit comments

Comments
 (0)