Skip to content

Commit 924237c

Browse files
committed
feat: stable random ports
Adds a service that allocates random ports for instances from a configurable range. These ports are then persisted with the instance spec. The next time the instance spec is evaluated, we copy the ports from the persisted copy of the spec. The end result is that once we assign a random port to an instance, the instance retains that port assignment until the instance is deleted or the user sets the port to `null` or a non-zero value in the database/node spec. Since this mechanism does not rely on Docker or the OS to assign ports, it will also work for the systemd orchestrator. PLAT-417 PLAT-236
1 parent 4cfbac6 commit 924237c

15 files changed

Lines changed: 980 additions & 6 deletions

File tree

docs/installation/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,4 @@ This is the current list of components that can be configured in the `logging.co
6868
- `migration_runner`
6969
- `scheduler_service`
7070
- `workflows_worker`
71+
- `ports_service`

server/cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/pgEdge/control-plane/server/internal/monitor"
2424
"github.com/pgEdge/control-plane/server/internal/orchestrator"
2525
"github.com/pgEdge/control-plane/server/internal/orchestrator/swarm"
26+
"github.com/pgEdge/control-plane/server/internal/ports"
2627
"github.com/pgEdge/control-plane/server/internal/resource"
2728
"github.com/pgEdge/control-plane/server/internal/scheduler"
2829
"github.com/pgEdge/control-plane/server/internal/task"
@@ -76,6 +77,7 @@ func newRootCmd(i *do.Injector) *cobra.Command {
7677
logging.Provide(i)
7778
migrate.Provide(i)
7879
monitor.Provide(i)
80+
ports.Provide(i)
7981
resource.Provide(i)
8082
scheduler.Provide(i)
8183
workflows.Provide(i)

server/internal/config/config.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,32 @@ const (
194194
EtcdModeClient EtcdMode = "client"
195195
)
196196

197+
// RandomPorts configures the inclusive-looking bounds of the range that
// random instance ports are drawn from. It is loaded from the `random_ports`
// configuration key.
type RandomPorts struct {
	// Min is the lower bound of the range. It must be at least 1.
	Min int `koanf:"min" json:"min,omitempty"`
	// Max is the upper bound of the range. It must be greater than Min and
	// no larger than 65535.
	Max int `koanf:"max" json:"max,omitempty"`
}

// validate returns one error per violated constraint, in a fixed order
// (min bound, max bound, ordering). It returns a nil slice when the range is
// valid. The messages are prefixed with the offending field name so that the
// caller can prepend the section name.
func (r RandomPorts) validate() []error {
	checks := [...]struct {
		violated bool
		message  string
	}{
		{r.Min < 1, "min: cannot be less than 1"},
		{r.Max > 65535, "max: cannot be greater than 65535"},
		{r.Max <= r.Min, "max: must be greater than min"},
	}

	var errs []error
	for _, check := range checks {
		if check.violated {
			errs = append(errs, errors.New(check.message))
		}
	}
	return errs
}

// defaultRandomPorts is the range used when none is configured. It
// intentionally sits well below the ephemeral port range to reduce the risk
// of interference from the OS.
var defaultRandomPorts = RandomPorts{Min: 5432, Max: 15432}
222+
197223
type Config struct {
198224
TenantID string `koanf:"tenant_id" json:"tenant_id,omitempty"`
199225
HostID string `koanf:"host_id" json:"host_id,omitempty"`
@@ -216,6 +242,7 @@ type Config struct {
216242
DockerSwarm DockerSwarm `koanf:"docker_swarm" json:"docker_swarm,omitzero"`
217243
DatabaseOwnerUID int `koanf:"database_owner_uid" json:"database_owner_uid,omitempty"`
218244
ProfilingEnabled bool `koanf:"profiling_enabled" json:"profiling_enabled,omitempty"`
245+
RandomPorts RandomPorts `koanf:"random_ports" json:"random_ports,omitzero"`
219246
}
220247

221248
// ClientAddress is a convenience function to return the first client address.
@@ -310,6 +337,9 @@ func (c Config) Validate() error {
310337
for _, err := range c.Logging.validate() {
311338
errs = append(errs, fmt.Errorf("logging.%w", err))
312339
}
340+
for _, err := range c.RandomPorts.validate() {
341+
errs = append(errs, fmt.Errorf("random_ports.%w", err))
342+
}
313343
if c.Orchestrator != OrchestratorSwarm {
314344
errs = append(errs, fmt.Errorf("orchestrator: unsupported orchestrator %q", c.Orchestrator))
315345
}
@@ -361,6 +391,7 @@ func DefaultConfig() (Config, error) {
361391
EtcdClient: etcdClientDefault,
362392
DockerSwarm: defaultDockerSwarm,
363393
DatabaseOwnerUID: 26,
394+
RandomPorts: defaultRandomPorts,
364395
}, nil
365396
}
366397

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package database
2+
3+
import (
4+
clientv3 "go.etcd.io/etcd/client/v3"
5+
6+
"github.com/pgEdge/control-plane/server/internal/storage"
7+
)
8+
9+
type StoredInstanceSpec struct {
10+
storage.StoredValue
11+
Spec *InstanceSpec `json:"spec"`
12+
}
13+
14+
type InstanceSpecStore struct {
15+
client *clientv3.Client
16+
root string
17+
}
18+
19+
func NewInstanceSpecStore(client *clientv3.Client, root string) *InstanceSpecStore {
20+
return &InstanceSpecStore{
21+
client: client,
22+
root: root,
23+
}
24+
}
25+
26+
func (s *InstanceSpecStore) Prefix() string {
27+
return storage.Prefix("/", s.root, "instance_specs")
28+
}
29+
30+
func (s *InstanceSpecStore) DatabasePrefix(databaseID string) string {
31+
return storage.Prefix(s.Prefix(), databaseID)
32+
}
33+
34+
func (s *InstanceSpecStore) Key(databaseID, instanceID string) string {
35+
return storage.Key(s.DatabasePrefix(databaseID), instanceID)
36+
}
37+
38+
func (s *InstanceSpecStore) GetByKey(databaseID, instanceID string) storage.GetOp[*StoredInstanceSpec] {
39+
key := s.Key(databaseID, instanceID)
40+
return storage.NewGetOp[*StoredInstanceSpec](s.client, key)
41+
}
42+
43+
func (s *InstanceSpecStore) GetByDatabaseID(databaseID string) storage.GetMultipleOp[*StoredInstanceSpec] {
44+
prefix := s.DatabasePrefix(databaseID)
45+
return storage.NewGetPrefixOp[*StoredInstanceSpec](s.client, prefix)
46+
}
47+
48+
func (s *InstanceSpecStore) GetAll() storage.GetMultipleOp[*StoredInstanceSpec] {
49+
prefix := s.Prefix()
50+
return storage.NewGetPrefixOp[*StoredInstanceSpec](s.client, prefix)
51+
}
52+
53+
func (s *InstanceSpecStore) Put(item *StoredInstanceSpec) storage.PutOp[*StoredInstanceSpec] {
54+
key := s.Key(item.Spec.DatabaseID, item.Spec.InstanceID)
55+
return storage.NewPutOp(s.client, key, item)
56+
}
57+
58+
func (s *InstanceSpecStore) DeleteByKey(databaseID, instanceID string) storage.DeleteOp {
59+
key := s.Key(databaseID, instanceID)
60+
return storage.NewDeleteKeyOp(s.client, key)
61+
}
62+
63+
func (s *InstanceSpecStore) DeleteByDatabaseID(databaseID string) storage.DeleteOp {
64+
prefix := s.DatabasePrefix(databaseID)
65+
return storage.NewDeletePrefixOp(s.client, prefix)
66+
}

server/internal/database/provide.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/pgEdge/control-plane/server/internal/config"
88
"github.com/pgEdge/control-plane/server/internal/host"
9+
"github.com/pgEdge/control-plane/server/internal/ports"
910
)
1011

1112
func Provide(i *do.Injector) {
@@ -15,6 +16,10 @@ func Provide(i *do.Injector) {
1516

1617
func provideService(i *do.Injector) {
1718
do.Provide(i, func(i *do.Injector) (*Service, error) {
19+
cfg, err := do.Invoke[config.Config](i)
20+
if err != nil {
21+
return nil, err
22+
}
1823
orch, err := do.Invoke[Orchestrator](i)
1924
if err != nil {
2025
return nil, err
@@ -27,7 +32,11 @@ func provideService(i *do.Injector) {
2732
if err != nil {
2833
return nil, err
2934
}
30-
return NewService(orch, store, hostSvc), nil
35+
portsSvc, err := do.Invoke[*ports.Service](i)
36+
if err != nil {
37+
return nil, err
38+
}
39+
return NewService(cfg, orch, store, hostSvc, portsSvc), nil
3140
})
3241
}
3342

server/internal/database/service.go

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@ import (
88

99
"github.com/google/uuid"
1010

11+
"github.com/pgEdge/control-plane/server/internal/config"
1112
"github.com/pgEdge/control-plane/server/internal/host"
13+
"github.com/pgEdge/control-plane/server/internal/ports"
1214
"github.com/pgEdge/control-plane/server/internal/storage"
15+
"github.com/pgEdge/control-plane/server/internal/utils"
1316
)
1417

1518
var (
@@ -24,16 +27,26 @@ var (
2427
)
2528

2629
type Service struct {
30+
cfg config.Config
2731
orchestrator Orchestrator
2832
store *Store
2933
hostSvc *host.Service
34+
portsSvc *ports.Service
3035
}
3136

32-
func NewService(orchestrator Orchestrator, store *Store, hostSvc *host.Service) *Service {
37+
func NewService(
38+
cfg config.Config,
39+
orchestrator Orchestrator,
40+
store *Store,
41+
hostSvc *host.Service,
42+
portsSvc *ports.Service,
43+
) *Service {
3344
return &Service{
45+
cfg: cfg,
3446
orchestrator: orchestrator,
3547
store: store,
3648
hostSvc: hostSvc,
49+
portsSvc: portsSvc,
3750
}
3851
}
3952

@@ -119,6 +132,18 @@ func (s *Service) UpdateDatabase(ctx context.Context, state DatabaseState, spec
119132
}
120133

121134
func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error {
135+
specs, err := s.store.InstanceSpec.
136+
GetByDatabaseID(databaseID).
137+
Exec(ctx)
138+
if err != nil {
139+
return fmt.Errorf("failed to get instance specs: %w", err)
140+
}
141+
for _, spec := range specs {
142+
if err := s.releaseInstancePorts(ctx, spec.Spec); err != nil {
143+
return err
144+
}
145+
}
146+
122147
var ops []storage.TxnOperation
123148

124149
spec, err := s.store.Spec.GetByKey(databaseID).Exec(ctx)
@@ -141,6 +166,7 @@ func (s *Service) DeleteDatabase(ctx context.Context, databaseID string) error {
141166

142167
ops = append(ops,
143168
s.store.Instance.DeleteByDatabaseID(databaseID),
169+
s.store.InstanceSpec.DeleteByDatabaseID(databaseID),
144170
s.store.InstanceStatus.DeleteByDatabaseID(databaseID),
145171
)
146172

@@ -279,7 +305,7 @@ func (s *Service) DeleteInstance(ctx context.Context, databaseID, instanceID str
279305
return fmt.Errorf("failed to delete stored instance status: %w", err)
280306
}
281307

282-
return nil
308+
return s.DeleteInstanceSpec(ctx, databaseID, instanceID)
283309
}
284310

285311
func (s *Service) UpdateInstanceStatus(
@@ -490,6 +516,112 @@ func (s *Service) PopulateSpecDefaults(ctx context.Context, spec *Spec) error {
490516
return nil
491517
}
492518

519+
func (s *Service) ReconcileInstanceSpec(ctx context.Context, spec *InstanceSpec) (*InstanceSpec, error) {
520+
if s.cfg.HostID != spec.HostID {
521+
return nil, fmt.Errorf("this instance belongs to another host - this host='%s', instance host='%s'", s.cfg.HostID, spec.HostID)
522+
}
523+
current, err := s.store.InstanceSpec.
524+
GetByKey(spec.DatabaseID, spec.InstanceID).
525+
Exec(ctx)
526+
switch {
527+
case errors.Is(err, storage.ErrNotFound):
528+
// Nothing to copy from, so we'll just generate any newly-needed random
529+
// ports.
530+
case err != nil:
531+
return nil, fmt.Errorf("failed to get current spec for instance '%s': %w", spec.InstanceID, err)
532+
default:
533+
spec.CopySettingsFrom(current.Spec)
534+
}
535+
536+
if spec.Port != nil && *spec.Port == 0 {
537+
port, err := s.portsSvc.AllocatePort(ctx, spec.HostID)
538+
if err != nil {
539+
return nil, fmt.Errorf("failed to allocate port: %w", err)
540+
}
541+
spec.Port = utils.PointerTo(port)
542+
}
543+
if spec.PatroniPort != nil && *spec.PatroniPort == 0 {
544+
port, err := s.portsSvc.AllocatePort(ctx, spec.HostID)
545+
if err != nil {
546+
return nil, fmt.Errorf("failed to allocate patroni port: %w", err)
547+
}
548+
spec.PatroniPort = utils.PointerTo(port)
549+
}
550+
551+
err = s.store.InstanceSpec.
552+
Put(&StoredInstanceSpec{Spec: spec}).
553+
Exec(ctx)
554+
if err != nil {
555+
return nil, fmt.Errorf("failed to persist updated instance spec: %w", err)
556+
}
557+
558+
if current != nil {
559+
if portShouldBeReleased(current.Spec.Port, spec.Port) {
560+
err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, current.Spec.Port)
561+
if err != nil {
562+
return nil, fmt.Errorf("failed to release previous port: %w", err)
563+
}
564+
}
565+
if portShouldBeReleased(current.Spec.PatroniPort, spec.PatroniPort) {
566+
err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, current.Spec.PatroniPort)
567+
if err != nil {
568+
return nil, fmt.Errorf("failed to release previous patroni port: %w", err)
569+
}
570+
}
571+
}
572+
573+
return spec, nil
574+
}
575+
576+
func (s *Service) DeleteInstanceSpec(ctx context.Context, databaseID, instanceID string) error {
577+
spec, err := s.store.InstanceSpec.
578+
GetByKey(databaseID, instanceID).
579+
Exec(ctx)
580+
if errors.Is(err, storage.ErrNotFound) {
581+
// Spec has already been deleted
582+
return nil
583+
} else if err != nil {
584+
return fmt.Errorf("failed to check if instance spec exists: %w", err)
585+
}
586+
587+
if err := s.releaseInstancePorts(ctx, spec.Spec); err != nil {
588+
return err
589+
}
590+
591+
_, err = s.store.InstanceSpec.
592+
DeleteByKey(databaseID, instanceID).
593+
Exec(ctx)
594+
if err != nil {
595+
return fmt.Errorf("failed to delete instance spec: %w", err)
596+
}
597+
598+
return nil
599+
}
600+
601+
func (s *Service) releaseInstancePorts(ctx context.Context, spec *InstanceSpec) error {
602+
err := s.portsSvc.ReleasePortIfDefined(ctx, spec.HostID, spec.Port, spec.PatroniPort)
603+
if err != nil {
604+
return fmt.Errorf("failed to release ports for instance '%s': %w", spec.InstanceID, err)
605+
}
606+
607+
return nil
608+
}
609+
610+
// portShouldBeReleased reports whether the port held by the previously
// persisted spec (current) must be released now that the reconciled spec
// holds updated.
//
// It returns false when current is nil or 0, because no port was assigned
// before. Otherwise it returns true when updated is nil or differs from
// current, and false when both hold the same port.
func portShouldBeReleased(current *int, updated *int) bool {
	if current == nil || *current == 0 {
		// we didn't previously have an assigned port
		return false
	}

	// We had a previously assigned port: release it unless the updated spec
	// still uses exactly that port.
	return updated == nil || *updated != *current
}
624+
493625
func ValidateChangedSpec(current, updated *Spec) error {
494626
var errs []error
495627

server/internal/database/spec.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,25 @@ type InstanceSpec struct {
508508
InPlaceRestore bool `json:"in_place_restore,omitempty"`
509509
}
510510

511+
func (s *InstanceSpec) CopySettingsFrom(current *InstanceSpec) {
512+
s.Port = reconcilePort(current.Port, s.Port)
513+
s.PatroniPort = reconcilePort(current.PatroniPort, s.PatroniPort)
514+
}
515+
516+
// reconcilePort decides which port value a reconciled spec should carry,
// given the port in the persisted spec (current) and the port in the incoming
// spec (updated).
//
//   - updated is nil (unexposed) or non-zero (explicitly set): updated is
//     returned unchanged, as the same pointer.
//   - updated is 0 and current holds a non-zero port: a new pointer to a copy
//     of current's value is returned, keeping the stable random port.
//   - otherwise: a new pointer to 0 is returned, signalling that a random
//     port still has to be assigned.
func reconcilePort(current, updated *int) *int {
	if updated == nil || *updated != 0 {
		// no action needed if the new port is unexposed or explicitly set
		return updated
	}
	if current != nil && *current != 0 {
		// we've already assigned a stable random port here; copy it so the
		// result does not alias the persisted spec
		port := *current
		return &port
	}

	// return 0 to signal that we need to assign a new random port
	return new(int)
}
529+
511530
type InstanceSpecChange struct {
512531
Previous *InstanceSpec
513532
Current *InstanceSpec

0 commit comments

Comments
 (0)