Skip to content

Commit 3a14025

Browse files
feat(ergo): gzip compression and target destroy improvements
Add gzip compression for all Ergo message paths (Create, Update, TrackedProgress) to handle resources exceeding the 64KB buffer limit. CompressJSON/DecompressJSON helpers on json.RawMessage fields; resolve cache decompresses transparently. Target destroy behavior: - Application formae (with resources) only delete targets whose config has $ref dependencies on resources being destroyed. Plain targets (docker, us-east-1) survive because they exist independently. - Target-only formae always delete their targets (unchanged behavior). - Cascade source displayed as human-readable resource label instead of raw KSUID in CLI output and simulation tree. - Simulated target updates now show (cascade) marker matching resources.
1 parent 4aa077b commit 3a14025

17 files changed

Lines changed: 401 additions & 127 deletions

docs/docs.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,12 +690,14 @@ const docTemplate = `{
690690
"Array",
691691
"EntitySet",
692692
"Set",
693+
"Atomic",
693694
""
694695
],
695696
"x-enum-varnames": [
696697
"FieldUpdateMethodArray",
697698
"FieldUpdateMethodEntitySet",
698699
"FieldUpdateMethodSet",
700+
"FieldUpdateMethodAtomic",
699701
"FieldUpdateMethodNone"
700702
]
701703
},
@@ -1231,6 +1233,9 @@ const docTemplate = `{
12311233
"model.TargetUpdate": {
12321234
"type": "object",
12331235
"properties": {
1236+
"CascadeSource": {
1237+
"type": "string"
1238+
},
12341239
"Discoverable": {
12351240
"type": "boolean"
12361241
},
@@ -1241,6 +1246,9 @@ const docTemplate = `{
12411246
"ErrorMessage": {
12421247
"type": "string"
12431248
},
1249+
"IsCascade": {
1250+
"type": "boolean"
1251+
},
12441252
"ModifiedTs": {
12451253
"type": "string"
12461254
},

docs/swagger.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -684,12 +684,14 @@
684684
"Array",
685685
"EntitySet",
686686
"Set",
687+
"Atomic",
687688
""
688689
],
689690
"x-enum-varnames": [
690691
"FieldUpdateMethodArray",
691692
"FieldUpdateMethodEntitySet",
692693
"FieldUpdateMethodSet",
694+
"FieldUpdateMethodAtomic",
693695
"FieldUpdateMethodNone"
694696
]
695697
},
@@ -1225,6 +1227,9 @@
12251227
"model.TargetUpdate": {
12261228
"type": "object",
12271229
"properties": {
1230+
"CascadeSource": {
1231+
"type": "string"
1232+
},
12281233
"Discoverable": {
12291234
"type": "boolean"
12301235
},
@@ -1235,6 +1240,9 @@
12351240
"ErrorMessage": {
12361241
"type": "string"
12371242
},
1243+
"IsCascade": {
1244+
"type": "boolean"
1245+
},
12381246
"ModifiedTs": {
12391247
"type": "string"
12401248
},

docs/swagger.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,14 @@ definitions:
7272
- Array
7373
- EntitySet
7474
- Set
75+
- Atomic
7576
- ""
7677
type: string
7778
x-enum-varnames:
7879
- FieldUpdateMethodArray
7980
- FieldUpdateMethodEntitySet
8081
- FieldUpdateMethodSet
82+
- FieldUpdateMethodAtomic
8183
- FieldUpdateMethodNone
8284
model.Forma:
8385
properties:
@@ -434,13 +436,17 @@ definitions:
434436
type: object
435437
model.TargetUpdate:
436438
properties:
439+
CascadeSource:
440+
type: string
437441
Discoverable:
438442
type: boolean
439443
Duration:
440444
description: milliseconds
441445
type: integer
442446
ErrorMessage:
443447
type: string
448+
IsCascade:
449+
type: boolean
444450
ModifiedTs:
445451
type: string
446452
Operation:

internal/cli/renderer/renderer.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,15 @@ func formatSimulatedTargetUpdate(root *gtree.Node, tu apimodel.TargetUpdate) {
553553
line = display.Greyf("%s target %s", op, tu.TargetLabel)
554554
}
555555

556-
root.Add(line)
556+
if tu.IsCascade {
557+
line = line + display.Gold(" (cascade)")
558+
}
559+
560+
node := root.Add(line)
561+
562+
if tu.IsCascade && tu.CascadeSource != "" {
563+
node.Add(display.Grey("because it depends on ") + display.LightBlue(tu.CascadeSource))
564+
}
557565
}
558566

559567
// formatTargetUpdate formats a target update for status view

internal/metastructure/changeset/changeset.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"log/slog"
1414

15+
"github.com/platform-engineering-labs/formae/internal/metastructure/resolver"
1516
"github.com/platform-engineering-labs/formae/internal/metastructure/resource_update"
1617
"github.com/platform-engineering-labs/formae/internal/metastructure/target_update"
1718
apimodel "github.com/platform-engineering-labs/formae/pkg/api/model"
@@ -341,7 +342,17 @@ func (p *ExecutionDAG) buildTargetResolvableEdges() {
341342
if !ok {
342343
continue
343344
}
344-
for _, uri := range tu.RemainingResolvables {
345+
346+
// For delete operations, extract resolvable URIs from the existing
347+
// target config rather than RemainingResolvables. Delete target updates
348+
// intentionally don't set RemainingResolvables to avoid the target
349+
// updater attempting resolution during destroy.
350+
resolvables := tu.RemainingResolvables
351+
if tu.Operation == target_update.TargetOperationDelete && tu.ExistingTarget != nil {
352+
resolvables = resolver.ExtractResolvableURIsFromJSON(tu.ExistingTarget.Config)
353+
}
354+
355+
for _, uri := range resolvables {
345356
ksuid := uri.KSUID()
346357
if tu.Operation == target_update.TargetOperationDelete {
347358
// REVERSED: resource delete waits for target delete

internal/metastructure/changeset/resolve_cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,14 @@ func (r *ResolveCache) readViaPlugin(retry resolveRetry) (*plugin.TrackedProgres
254254
if !ok {
255255
return nil, fmt.Errorf("unexpected result type from plugin operator: %T", progressResult)
256256
}
257+
// Decompress resource properties if sent compressed over Ergo (64KB limit)
258+
if len(progress.CompressedResourceProperties) > 0 && len(progress.ResourceProperties) == 0 {
259+
decompressed, err := plugin.DecompressJSON(progress.CompressedResourceProperties)
260+
if err != nil {
261+
return nil, fmt.Errorf("failed to decompress resource properties: %w", err)
262+
}
263+
progress.ResourceProperties = decompressed
264+
}
257265
return &progress, nil
258266
}
259267

internal/metastructure/metastructure.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1390,14 +1390,17 @@ func findCascadeTargetDeletes(
13901390
ds datastore.Datastore,
13911391
) ([]target_update.TargetUpdate, []resource_update.ResourceUpdate, error) {
13921392

1393-
// Collect KSUIDs of all resources being deleted
1393+
// Collect KSUIDs of all resources being deleted and build a KSUID→label
1394+
// lookup so cascade sources can be displayed as human-readable labels.
13941395
deletingKSUIDs := make([]string, 0)
13951396
deletingSet := make(map[string]bool)
1397+
ksuidToLabel := make(map[string]string)
13961398
for _, ru := range resourceUpdates {
13971399
if ru.Operation == resource_update.OperationDelete && ru.DesiredState.Ksuid != "" {
13981400
if !deletingSet[ru.DesiredState.Ksuid] {
13991401
deletingSet[ru.DesiredState.Ksuid] = true
14001402
deletingKSUIDs = append(deletingKSUIDs, ru.DesiredState.Ksuid)
1403+
ksuidToLabel[ru.DesiredState.Ksuid] = ru.DesiredState.Label
14011404
}
14021405
}
14031406
}
@@ -1447,12 +1450,16 @@ func findCascadeTargetDeletes(
14471450
}
14481451
alreadyDeleting[target.Label] = true
14491452

1453+
cascadeSourceLabel := ksuidToLabel[sourceKSUID]
1454+
if cascadeSourceLabel == "" {
1455+
cascadeSourceLabel = sourceKSUID
1456+
}
14501457
slog.Debug("Cascade target delete detected",
14511458
"target", target.Label,
1452-
"cascadeSource", sourceKSUID)
1459+
"cascadeSource", cascadeSourceLabel)
14531460

14541461
cascadeTargetUpdates = append(cascadeTargetUpdates,
1455-
target_update.NewTargetUpdateForCascadeDelete(target, sourceKSUID))
1462+
target_update.NewTargetUpdateForCascadeDelete(target, cascadeSourceLabel))
14561463
newDeletedTargetLabels = append(newDeletedTargetLabels, target.Label)
14571464
}
14581465
}
@@ -1644,7 +1651,7 @@ func FormaCommandFromForma(forma *pkgmodel.Forma,
16441651
}
16451652
}
16461653

1647-
targetUpdates, err := target_update.NewTargetUpdateGenerator(ds).GenerateTargetUpdates(forma.Targets, command)
1654+
targetUpdates, err := target_update.NewTargetUpdateGenerator(ds).GenerateTargetUpdates(forma.Targets, command, len(forma.Resources) > 0)
16481655
if err != nil {
16491656
return nil, err
16501657
}

internal/metastructure/resource_update/resource_update_generator.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,14 @@ func generateResourceUpdatesForDestroy(
208208
}
209209
}
210210

211+
// Build KSUID→label lookup for human-readable cascade sources
212+
ksuidToLabel := make(map[string]string, len(resourceDestroys))
213+
for _, rd := range resourceDestroys {
214+
ksuidToLabel[rd.DesiredState.Ksuid] = rd.DesiredState.Label
215+
}
216+
211217
// Find cascade deletes - resources that reference the resources being deleted
212-
cascadeDeletes, err := findCascadeDeletes(explicitDeleteKSUIDs, existingTargetMap, source, ds)
218+
cascadeDeletes, err := findCascadeDeletes(explicitDeleteKSUIDs, ksuidToLabel, existingTargetMap, source, ds)
213219
if err != nil {
214220
return nil, fmt.Errorf("failed to find cascade deletes: %w", err)
215221
}
@@ -266,6 +272,7 @@ func generateResourceUpdatesForDestroy(
266272
// resources being deleted. Uses level-by-level BFS with batched queries to find the full cascade chain.
267273
func findCascadeDeletes(
268274
toDelete map[string]bool,
275+
ksuidToLabel map[string]string,
269276
existingTargetMap map[string]*pkgmodel.Target,
270277
source FormaCommandSource,
271278
ds ResourceDataLookup) ([]ResourceUpdate, error) {
@@ -313,9 +320,9 @@ func findCascadeDeletes(
313320
continue
314321
}
315322

316-
// Use the source KSUID for the cascade message
317-
sourceLabel := sourceKSUID
318-
if _, isExplicit := toDelete[sourceKSUID]; isExplicit {
323+
// Use human-readable label for the cascade source
324+
sourceLabel := ksuidToLabel[sourceKSUID]
325+
if sourceLabel == "" {
319326
sourceLabel = sourceKSUID
320327
}
321328

@@ -339,8 +346,9 @@ func findCascadeDeletes(
339346

340347
cascadeDeletes = append(cascadeDeletes, resourceDestroy)
341348

342-
// Add to next level for further cascade detection
349+
// Add to next level for further cascade detection and update label lookup
343350
nextLevel = append(nextLevel, dependent.Ksuid)
351+
ksuidToLabel[dependent.Ksuid] = dependent.Label
344352
}
345353
}
346354

internal/metastructure/resource_update/resource_updater.go

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -470,12 +470,19 @@ func create(state gen.Atom, data ResourceUpdateData, proc gen.Process) (gen.Atom
470470
return StateFinishedWithError, data, nil, nil
471471
}
472472

473+
compressedProps, err := plugin.CompressJSON(convertedResource.Properties)
474+
if err != nil {
475+
proc.Log().Error("failed to compress resource properties: %v", err)
476+
data.resourceUpdate.MarkAsFailed()
477+
return StateFinishedWithError, data, nil, nil
478+
}
479+
473480
createOperation := plugin.CreateResource{
474-
Namespace: convertedResource.Namespace(),
475-
ResourceType: convertedResource.Type,
476-
Label: convertedResource.Label,
477-
Properties: convertedResource.Properties,
478-
TargetConfig: data.resourceUpdate.ResourceTarget.Config,
481+
Namespace: convertedResource.Namespace(),
482+
ResourceType: convertedResource.Type,
483+
Label: convertedResource.Label,
484+
CompressedProperties: compressedProps,
485+
TargetConfig: data.resourceUpdate.ResourceTarget.Config,
479486
}
480487

481488
// First we check if progress already was made on the create operation. This can happen for example if the node crashed while the
@@ -556,15 +563,28 @@ func update(state gen.Atom, data ResourceUpdateData, proc gen.Process) (gen.Atom
556563
return StateFinishedWithError, data, nil, nil
557564
}
558565

566+
compressedDesired, err := plugin.CompressJSON(convertedResource.Properties)
567+
if err != nil {
568+
proc.Log().Error("failed to compress desired properties: %v", err)
569+
data.resourceUpdate.MarkAsFailed()
570+
return StateFinishedWithError, data, nil, nil
571+
}
572+
compressedPrior, err := plugin.CompressJSON(convertedExisting.Properties)
573+
if err != nil {
574+
proc.Log().Error("failed to compress prior properties: %v", err)
575+
data.resourceUpdate.MarkAsFailed()
576+
return StateFinishedWithError, data, nil, nil
577+
}
578+
559579
updateOperation := plugin.UpdateResource{
560-
Namespace: convertedResource.Namespace(),
561-
NativeID: convertedResource.NativeID,
562-
ResourceType: convertedResource.Type,
563-
Label: convertedResource.Label,
564-
PriorProperties: convertedExisting.Properties,
565-
DesiredProperties: convertedResource.Properties,
566-
PatchDocument: string(data.resourceUpdate.DesiredState.PatchDocument),
567-
TargetConfig: data.resourceUpdate.ResourceTarget.Config,
580+
Namespace: convertedResource.Namespace(),
581+
NativeID: convertedResource.NativeID,
582+
ResourceType: convertedResource.Type,
583+
Label: convertedResource.Label,
584+
CompressedPriorProperties: compressedPrior,
585+
CompressedDesiredProperties: compressedDesired,
586+
PatchDocument: string(data.resourceUpdate.DesiredState.PatchDocument),
587+
TargetConfig: data.resourceUpdate.ResourceTarget.Config,
568588
}
569589

570590
// First we check if progress already was made on the update operation. This can happen for example if the node crashed while the
@@ -635,6 +655,17 @@ func resumeWaitingForResource(state gen.Atom, data ResourceUpdateData, progress
635655
// state machine to the next state when the plugin operation finished successfully. After the last plugin operation, or
636656
// after the first error, it reports the final state to the stack updater and exits.
637657
func handleProgressUpdate(from gen.PID, state gen.Atom, data ResourceUpdateData, message plugin.TrackedProgress, proc gen.Process) (gen.Atom, ResourceUpdateData, []statemachine.Action, error) {
658+
// Decompress resource properties if sent compressed over Ergo
659+
if len(message.CompressedResourceProperties) > 0 && len(message.ResourceProperties) == 0 {
660+
decompressed, err := plugin.DecompressJSON(message.CompressedResourceProperties)
661+
if err != nil {
662+
proc.Log().Error("failed to decompress resource properties", "error", err)
663+
data.resourceUpdate.MarkAsFailed()
664+
return StateFinishedWithError, data, nil, nil
665+
}
666+
message.ResourceProperties = decompressed
667+
}
668+
638669
err := data.resourceUpdate.RecordProgress(&message)
639670
if err != nil {
640671
proc.Log().Error("failed to record progress for resource update", "error", err)
@@ -845,6 +876,15 @@ func doPluginOperation(resourceURI pkgmodel.FormaeURI, operation plugin.PluginOp
845876
return nil, fmt.Errorf("expected TrackedProgress, got %T", response)
846877
}
847878

879+
// Decompress resource properties if sent compressed over Ergo (64KB limit)
880+
if len(progressResult.CompressedResourceProperties) > 0 && len(progressResult.ResourceProperties) == 0 {
881+
decompressed, err := plugin.DecompressJSON(progressResult.CompressedResourceProperties)
882+
if err != nil {
883+
return nil, fmt.Errorf("failed to decompress resource properties: %w", err)
884+
}
885+
progressResult.ResourceProperties = decompressed
886+
}
887+
848888
return &progressResult, nil
849889
}
850890

internal/metastructure/target_update/target_update.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ type TargetUpdate struct {
4848
}
4949

5050
// NewTargetUpdateForCascadeDelete creates a cascade delete TargetUpdate for a target
51-
// that depends on a resource being deleted. The cascadeSourceKSUID is the KSUID of
52-
// the resource that triggered the cascade.
51+
// that depends on a resource being deleted. cascadeSource is the label of the
52+
// resource that triggered the cascade.
5353
//
5454
// Unlike regular target deletes, cascade deletes do NOT set RemainingResolvables
5555
// because the referenced resources are themselves being deleted. Attempting to
5656
// resolve those references during execution would fail.
57-
func NewTargetUpdateForCascadeDelete(target *pkgmodel.Target, cascadeSourceKSUID string) TargetUpdate {
57+
func NewTargetUpdateForCascadeDelete(target *pkgmodel.Target, cascadeSource string) TargetUpdate {
5858
now := util.TimeNow()
5959
return TargetUpdate{
6060
Target: *target,
@@ -64,7 +64,7 @@ func NewTargetUpdateForCascadeDelete(target *pkgmodel.Target, cascadeSourceKSUID
6464
StartTs: now,
6565
ModifiedTs: now,
6666
IsCascade: true,
67-
CascadeSource: cascadeSourceKSUID,
67+
CascadeSource: cascadeSource,
6868
}
6969
}
7070

0 commit comments

Comments
 (0)