Skip to content

Commit 67616df

Browse files
fix(patch): filter spurious empty-collection ops and compress plugin messages
Filter empty-array/map "add" operations and strip nested empty collections from PKL's null rendering before patch comparison. Compress Resource fields in ReadResource/DeleteResource to stay within Ergo's 64KB network buffer. Rewrite required-field validation with recursive array-aware traversal. Add PKL subHints cycle detection for self-referencing sub-resource types.
1 parent 6174ba4 commit 67616df

19 files changed

Lines changed: 972 additions & 494 deletions

File tree

Makefile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,16 @@ fetch-external-plugins:
7474
fi
7575

7676
## build-external-plugins: Build all external plugins
77+
## Uses a go.mod replace directive so plugins build against the local pkg/plugin.
78+
## This ensures plugins stay compatible when plugin message types change.
7779
build-external-plugins: fetch-external-plugins
7880
@for repo in $(EXTERNAL_PLUGIN_REPOS); do \
7981
name=$$(basename $$repo .git); \
80-
echo "Building $$name..."; \
82+
echo "Building $$name (with local pkg/plugin)..."; \
83+
cd "$(PLUGINS_CACHE)/$$name" \
84+
&& go mod edit -replace github.com/platform-engineering-labs/formae/pkg/plugin=$(CURDIR)/pkg/plugin \
85+
&& go mod tidy \
86+
&& cd "$(CURDIR)"; \
8187
$(MAKE) -C "$(PLUGINS_CACHE)/$$name" build; \
8288
done
8389

internal/metastructure/changeset/resolve_cache.go

Lines changed: 48 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,7 @@ import (
2828
type ResolveCache struct {
2929
act.Actor
3030

31-
cache map[pkgmodel.FormaeURI]gjson.Result
32-
pending map[string]*pendingResolve
33-
}
34-
35-
type pendingResolve struct {
36-
resource pkgmodel.Resource
37-
targetConfig json.RawMessage
38-
requesters []pendingResolveRequester
39-
}
40-
41-
type pendingResolveRequester struct {
42-
from gen.PID
43-
resourceURI pkgmodel.FormaeURI
31+
cache map[pkgmodel.FormaeURI]gjson.Result
4432
}
4533

4634
type Shutdown struct{}
@@ -51,7 +39,6 @@ func NewResolveCache() gen.ProcessBehavior {
5139

5240
func (r *ResolveCache) Init(args ...any) error {
5341
r.cache = make(map[pkgmodel.FormaeURI]gjson.Result)
54-
r.pending = make(map[string]*pendingResolve)
5542

5643
r.Log().Debug("ResolveCache actor initialized")
5744

@@ -61,9 +48,18 @@ func (r *ResolveCache) Init(args ...any) error {
6148
func (r *ResolveCache) HandleMessage(from gen.PID, message any) error {
6249
switch msg := message.(type) {
6350
case messages.ResolveValue:
64-
return r.handleResolveValue(from, msg)
65-
case plugin.TrackedProgress:
66-
return r.handleTrackedProgress(msg)
51+
value, err := r.resolveValue(msg.ResourceURI)
52+
var response any
53+
if err != nil {
54+
response = messages.FailedToResolveValue(msg)
55+
} else {
56+
response = messages.ValueResolved{
57+
ResourceURI: msg.ResourceURI,
58+
Value: value,
59+
}
60+
}
61+
err = r.Send(from, response)
62+
return err
6763
case Shutdown:
6864
r.Log().Debug("ResolveCache received shutdown request")
6965
return gen.TerminateReasonNormal
@@ -73,75 +69,7 @@ func (r *ResolveCache) HandleMessage(from gen.PID, message any) error {
7369
return nil
7470
}
7571

76-
func (r *ResolveCache) handleResolveValue(from gen.PID, msg messages.ResolveValue) error {
77-
if value, err := r.resolveCachedValue(msg.ResourceURI); err == nil {
78-
return r.Send(from, messages.ValueResolved{ResourceURI: msg.ResourceURI, Value: value})
79-
}
80-
81-
resourceState, targetConfig, err := r.loadResource(msg.ResourceURI)
82-
if err != nil {
83-
return r.Send(from, messages.FailedToResolveValue(msg))
84-
}
85-
86-
if pending := r.pending[resourceState.NativeID]; pending != nil {
87-
pending.requesters = append(pending.requesters, pendingResolveRequester{from: from, resourceURI: msg.ResourceURI})
88-
return nil
89-
}
90-
91-
progress, err := r.startRead(resourceState, targetConfig, msg.ResourceURI)
92-
if err != nil {
93-
return r.Send(from, messages.FailedToResolveValue(msg))
94-
}
95-
requesters := []pendingResolveRequester{{from: from, resourceURI: msg.ResourceURI}}
96-
if progress.HasFinished() {
97-
return r.finishPendingRead(resourceState, progress, requesters)
98-
}
99-
if progress.NativeID == "" {
100-
progress.NativeID = resourceState.NativeID
101-
}
102-
r.pending[resourceState.NativeID] = &pendingResolve{resource: resourceState, targetConfig: targetConfig, requesters: requesters}
103-
return nil
104-
}
105-
106-
func (r *ResolveCache) handleTrackedProgress(progress plugin.TrackedProgress) error {
107-
pending := r.pending[progress.NativeID]
108-
if pending == nil || !progress.HasFinished() {
109-
return nil
110-
}
111-
delete(r.pending, progress.NativeID)
112-
return r.finishPendingRead(pending.resource, progress, pending.requesters)
113-
}
114-
115-
func (r *ResolveCache) finishPendingRead(resourceState pkgmodel.Resource, progress plugin.TrackedProgress, requesters []pendingResolveRequester) error {
116-
if progress.Failed() {
117-
for _, requester := range requesters {
118-
if err := r.Send(requester.from, messages.FailedToResolveValue{ResourceURI: requester.resourceURI}); err != nil {
119-
return err
120-
}
121-
}
122-
return nil
123-
}
124-
125-
parsed := gjson.ParseBytes([]byte(progress.ResourceProperties))
126-
enhancedParsed := r.preserveRefMetadata(resourceState, parsed)
127-
r.cache[resourceState.URI()] = enhancedParsed
128-
129-
for _, requester := range requesters {
130-
value := enhancedParsed.Get(requester.resourceURI.PropertyPath())
131-
if !value.Exists() {
132-
if err := r.Send(requester.from, messages.FailedToResolveValue{ResourceURI: requester.resourceURI}); err != nil {
133-
return err
134-
}
135-
continue
136-
}
137-
if err := r.Send(requester.from, messages.ValueResolved{ResourceURI: requester.resourceURI, Value: value.String()}); err != nil {
138-
return err
139-
}
140-
}
141-
return nil
142-
}
143-
144-
func (r *ResolveCache) resolveCachedValue(resourceURI pkgmodel.FormaeURI) (string, error) {
72+
func (r *ResolveCache) resolveValue(resourceURI pkgmodel.FormaeURI) (string, error) {
14573
// Check if the resource is already in the cache
14674
if json, ok := r.cache[resourceURI.Stripped()]; ok {
14775
r.Log().Debug("Cache hit for resource URI", "uri", resourceURI, "value", json)
@@ -152,10 +80,8 @@ func (r *ResolveCache) resolveCachedValue(resourceURI pkgmodel.FormaeURI) (strin
15280
}
15381
return value.String(), nil
15482
}
155-
return "", fmt.Errorf("cache miss")
156-
}
15783

158-
func (r *ResolveCache) loadResource(resourceURI pkgmodel.FormaeURI) (pkgmodel.Resource, json.RawMessage, error) {
84+
// Load the resource from the stack to get the native id
15985
r.Log().Debug("Cache miss for resource URI", "uri", resourceURI)
16086
stackerResult, err := r.Call(
16187
gen.ProcessID{Name: actornames.ResourcePersister, Node: r.Node().Name()},
@@ -164,68 +90,79 @@ func (r *ResolveCache) loadResource(resourceURI pkgmodel.FormaeURI) (pkgmodel.Re
16490
})
16591
if err != nil {
16692
r.Log().Error("Failed to load resource from resource persister", "resourceURI", resourceURI, "error", err)
167-
return pkgmodel.Resource{}, nil, fmt.Errorf("failed to load resource from resource persister: %w", err)
93+
return "", fmt.Errorf("failed to load resource from resource persister: %w", err)
16894
}
16995
loadResourceResult, ok := stackerResult.(messages.LoadResourceResult)
17096
if !ok {
17197
r.Log().Error("Unexpected result type from resource persister", "resultType", reflect.TypeOf(stackerResult))
172-
return pkgmodel.Resource{}, nil, fmt.Errorf("unexpected result type from resource persister: %T", stackerResult)
173-
}
174-
if !loadResourceResult.Found {
175-
return pkgmodel.Resource{}, nil, fmt.Errorf("resource %s not found", resourceURI.KSUID())
98+
return "", fmt.Errorf("unexpected result type from resource persister: %T", stackerResult)
17699
}
177-
return loadResourceResult.Resource, loadResourceResult.Target.Config, nil
178-
}
179100

180-
func (r *ResolveCache) startRead(resourceState pkgmodel.Resource, targetConfig json.RawMessage, resourceURI pkgmodel.FormaeURI) (plugin.TrackedProgress, error) {
181101
// Spawn the plugin operator via PluginCoordinator
182102
operationID := uuid.New().String()
183103
spawnResult, err := r.Call(
184104
gen.ProcessID{Name: actornames.PluginCoordinator, Node: r.Node().Name()},
185105
messages.SpawnPluginOperator{
186-
Namespace: resourceState.Namespace(),
106+
Namespace: loadResourceResult.Resource.Namespace(),
187107
ResourceURI: string(resourceURI.Stripped()),
188108
Operation: string(resource.OperationRead),
189109
OperationID: operationID,
190110
RequestedBy: r.PID(),
191111
})
192112
if err != nil {
193113
r.Log().Error("Failed to spawn plugin operator for resource", "resourceURI", resourceURI, "error", err)
194-
return plugin.TrackedProgress{}, fmt.Errorf("failed to spawn plugin operator for resource: %w", err)
114+
return "", fmt.Errorf("failed to spawn plugin operator for resource: %w", err)
195115
}
196116
spawnRes, ok := spawnResult.(messages.SpawnPluginOperatorResult)
197117
if !ok {
198118
r.Log().Error("Unexpected result type from PluginCoordinator", "resultType", reflect.TypeOf(spawnResult))
199-
return plugin.TrackedProgress{}, fmt.Errorf("unexpected result type from PluginCoordinator: %T", spawnResult)
119+
return "", fmt.Errorf("unexpected result type from PluginCoordinator: %T", spawnResult)
200120
}
201121
if spawnRes.Error != "" {
202122
r.Log().Error("Failed to spawn plugin operator", "error", spawnRes.Error)
203-
return plugin.TrackedProgress{}, fmt.Errorf("failed to spawn plugin operator: %s", spawnRes.Error)
123+
return "", fmt.Errorf("failed to spawn plugin operator: %s", spawnRes.Error)
124+
}
125+
126+
compRes, err := plugin.CompressResource(loadResourceResult.Resource)
127+
if err != nil {
128+
r.Log().Error("Failed to compress resource", "resourceURI", resourceURI, "error", err)
129+
return "", fmt.Errorf("failed to compress resource: %w", err)
204130
}
205131

206132
progressResult, err := r.Call(
207133
spawnRes.PID,
208134
plugin.ReadResource{
209-
Namespace: resourceState.Namespace(),
210-
ExistingResource: resourceState,
211-
Resource: resourceState,
212-
NativeID: resourceState.NativeID,
213-
TargetConfig: targetConfig,
135+
Namespace: loadResourceResult.Resource.Namespace(),
136+
ResourceType: loadResourceResult.Resource.Type,
137+
ResourceNamespace: loadResourceResult.Resource.Namespace(),
138+
ExistingResource: compRes,
139+
Resource: compRes,
140+
NativeID: loadResourceResult.Resource.NativeID,
141+
TargetConfig: loadResourceResult.Target.Config,
214142
})
215143

216144
if err != nil {
217145
r.Log().Error("Failed to read resource", "resourceURI", resourceURI, "error", err)
218-
return plugin.TrackedProgress{}, fmt.Errorf("failed to read resource: %w", err)
146+
return "", fmt.Errorf("failed to read resource: %w", err)
219147
}
220148
progress, ok := progressResult.(plugin.TrackedProgress)
221149
if !ok {
222150
r.Log().Error("Unexpected result type from plugin operator", "resultType", reflect.TypeOf(progressResult))
223-
return plugin.TrackedProgress{}, fmt.Errorf("unexpected result type from plugin operator: %T", progressResult)
151+
return "", fmt.Errorf("unexpected result type from plugin operator: %T", progressResult)
224152
}
225-
if progress.NativeID == "" {
226-
progress.NativeID = resourceState.NativeID
153+
parsed := gjson.ParseBytes([]byte(progress.ResourceProperties))
154+
155+
enhancedParsed := r.preserveRefMetadata(loadResourceResult.Resource, parsed)
156+
157+
r.cache[resourceURI.Stripped()] = enhancedParsed
158+
r.Log().Debug("Cache hit for resource URI", "uri", resourceURI, "value", enhancedParsed)
159+
value := enhancedParsed.Get(resourceURI.PropertyPath())
160+
if !value.Exists() {
161+
r.Log().Error("Unable to resolve property %s in cached properties for resource %s", resourceURI.PropertyPath(), resourceURI)
162+
return "", fmt.Errorf("property %s not found in cached properties for resource %s", resourceURI.PropertyPath(), resourceURI)
227163
}
228-
return progress, nil
164+
165+
return value.String(), nil
229166
}
230167

231168
func (r *ResolveCache) preserveRefMetadata(originalResource pkgmodel.Resource, pluginResult gjson.Result) gjson.Result {

internal/metastructure/messages/resource_persister.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ type LoadResource struct {
1515
type LoadResourceResult struct {
1616
Resource pkgmodel.Resource
1717
Target pkgmodel.Target
18-
Found bool
1918
}
2019

2120
// CleanupEmptyStacks is sent to ResourcePersister after a changeset completes

0 commit comments

Comments
 (0)