diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c31c9bd5..b88a2aaf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -194,4 +194,58 @@ jobs: uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3 with: flags: unittests - file: _output/tests/linux_amd64/coverage.txt + files: _output/tests/linux_amd64/coverage.txt + token: ${{ secrets.CODECOV_TOKEN }} + + race-tests: + runs-on: ubuntu-latest + needs: detect-noop + if: needs.detect-noop.outputs.noop != 'true' + steps: + - name: Cleanup Disk + uses: jlumbroso/free-disk-space@54081f138730dfa15788a46383842cd2f914a1be # v1.3.1 + with: + large-packages: false + swap-storage: false + + - name: Checkout + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + submodules: true + + - name: Fetch History + run: git fetch --prune --unshallow + + - name: Setup Go + uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Find the Go Build Cache + id: go-cache-paths + run: | + echo "go-build=$(make go.cachedir)" >> $GITHUB_OUTPUT + echo "go-mod=$(make go.mod.cachedir)" >> $GITHUB_OUTPUT + + - name: Cache the Go Build Cache + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: ${{ steps.go-cache-paths.outputs.go-build }} + key: ${{ runner.os }}-build-race-tests-${{ hashFiles('**/go.sum') }} + restore-keys: ${{ runner.os }}-build-race-tests- + + - name: Cache Go Dependencies + uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3 + with: + path: ${{ steps.go-cache-paths.outputs.go-mod }} + key: ${{ runner.os }}-pkg-race-tests-${{ hashFiles('**/go.sum') }} + restore-keys: ${{ runner.os }}-pkg-race-tests- + + - name: Vendor Dependencies + run: make vendor vendor.check + + # Runs the unit tests with the Go race detector enabled to guard against + # regressions of concurrency bugs such as the managed resource status + # update data race. See: https://github.com/crossplane/upjet/issues/472 + - name: Run Race Tests + run: make test.race diff --git a/Makefile b/Makefile index d980face..e8cc5083 100644 --- a/Makefile +++ b/Makefile @@ -69,4 +69,14 @@ go.cachedir: go.mod.cachedir: @go env GOMODCACHE -.PHONY: reviewable submodules fallthrough go.mod.cachedir go.cachedir +# test.race runs the unit tests with the Go race detector enabled. The race +# detector requires cgo, so we enable it here. +# Do NOT directly use the `go.test.unit` make target of +# the build submodule here. There's a `pipefail` issue with that target, +# which will mask any failing tests. +test.race: + @$(INFO) go test race + @CGO_ENABLED=1 $(GOHOST) test -race $(GO_STATIC_FLAGS) $(GO_PACKAGES) || $(FAIL) + @$(OK) go test race + +.PHONY: reviewable submodules fallthrough go.mod.cachedir go.cachedir test.race diff --git a/pkg/controller/external_async_tfpluginfw.go b/pkg/controller/external_async_tfpluginfw.go index 77af400c..cc7e2da3 100644 --- a/pkg/controller/external_async_tfpluginfw.go +++ b/pkg/controller/external_async_tfpluginfw.go @@ -164,6 +164,10 @@ func (n *terraformPluginFrameworkAsyncExternalClient) Create(_ context.Context, } ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout)) + // We deep-copy the managed resource to prevent a data race between the + // goroutine we are about to start below and the managed reconciler. + // Please see: https://github.com/crossplane/upjet/issues/472 + mgCopy := mg.DeepCopyObject().(xpresource.Managed) go func() { // The order of deferred functions, executed last-in-first-out, is // significant. The context should be canceled last, because it is @@ -178,14 +182,14 @@ func (n *terraformPluginFrameworkAsyncExternalClient) Create(_ context.Context, n.opTracker.logger.Debug("Async create ended.", "error", err) n.opTracker.LastOperation.MarkEnd() - if cErr := n.callback.Create(mg.GetName())(err, ctx); cErr != nil { + if cErr := n.callback.Create(mgCopy.GetName())(err, ctx); cErr != nil { n.opTracker.logger.Info("Async create callback failed", "error", cErr.Error()) } }() defer ph.recoverIfPanic(ctx) n.opTracker.logger.Debug("Async create starting...") - _, ph.err = n.terraformPluginFrameworkExternalClient.Create(ctx, mg) + _, ph.err = n.terraformPluginFrameworkExternalClient.Create(ctx, mgCopy) }() return managed.ExternalCreation{}, n.opTracker.LastOperation.Error() @@ -197,6 +201,10 @@ func (n *terraformPluginFrameworkAsyncExternalClient) Update(_ context.Context, } ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout)) + // We deep-copy the managed resource to prevent a data race between the + // goroutine we are about to start below and the managed reconciler. + // Please see: https://github.com/crossplane/upjet/issues/472 + mgCopy := mg.DeepCopyObject().(xpresource.Managed) go func() { // The order of deferred functions, executed last-in-first-out, is // significant. The context should be canceled last, because it is @@ -211,14 +219,14 @@ func (n *terraformPluginFrameworkAsyncExternalClient) Update(_ context.Context, n.opTracker.logger.Debug("Async update ended.", "error", err) n.opTracker.LastOperation.MarkEnd() - if cErr := n.callback.Update(mg.GetName())(err, ctx); cErr != nil { + if cErr := n.callback.Update(mgCopy.GetName())(err, ctx); cErr != nil { n.opTracker.logger.Info("Async update callback failed", "error", cErr.Error()) } }() defer ph.recoverIfPanic(ctx) n.opTracker.logger.Debug("Async update starting...") - _, ph.err = n.terraformPluginFrameworkExternalClient.Update(ctx, mg) + _, ph.err = n.terraformPluginFrameworkExternalClient.Update(ctx, mgCopy) }() return managed.ExternalUpdate{}, n.opTracker.LastOperation.Error() diff --git a/pkg/controller/external_async_tfpluginfw_test.go b/pkg/controller/external_async_tfpluginfw_test.go new file mode 100644 index 00000000..e4bc7d3d --- /dev/null +++ b/pkg/controller/external_async_tfpluginfw_test.go @@ -0,0 +1,435 @@ +// SPDX-FileCopyrightText: 2026 The Crossplane Authors +// +// SPDX-License-Identifier: Apache-2.0 + +package controller + +import ( + "context" + "testing" + + "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" + xpresource "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/crossplane/crossplane-runtime/pkg/test" + "github.com/google/go-cmp/cmp" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/crossplane/upjet/pkg/config" + "github.com/crossplane/upjet/pkg/terraform" +) + +func prepareTerraformPluginFrameworkAsyncExternal(testConfig testConfiguration, fns CallbackFns) *terraformPluginFrameworkAsyncExternalClient { + return &terraformPluginFrameworkAsyncExternalClient{ + terraformPluginFrameworkExternalClient: prepareTPFExternalWithTestConfig(testConfig), + callback: fns, + } +} + +func TestAsyncTerraformPluginFrameworkConnect(t *testing.T) { + type args struct { + setupFn terraform.SetupFn + cfg *config.Resource + ots *OperationTrackerStore + obj xpresource.Managed + } + type want struct { + err error + } + cases := map[string]struct { + args + want + }{ + "Successful": { + args: args{ + setupFn: func(_ context.Context, _ client.Client, _ xpresource.Managed) (terraform.Setup, error) { + return terraform.Setup{ + FrameworkProvider: &mockTPFProvider{}, + }, nil + }, + cfg: newBaseUpjetConfig(), + obj: newObjAsync(), + ots: ots, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + c := NewTerraformPluginFrameworkAsyncConnector(nil, tc.args.ots, tc.args.setupFn, tc.args.cfg, WithTerraformPluginFrameworkAsyncLogger(logTest)) + _, err := c.Connect(context.TODO(), tc.args.obj) + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("\n%s\nConnect(...): -want error, +got error:\n", diff) + } + }) + } +} + +func TestAsyncTerraformPluginFrameworkObserve(t *testing.T) { + type want struct { + obs managed.ExternalObservation + err error + } + cases := map[string]struct { + testConfiguration + want + }{ + "NotExists": { + testConfiguration: testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + obj: newBaseObject(), + currentStateMap: nil, + plannedStateMap: map[string]any{ + "name": "example", + }, + params: map[string]any{ + "name": "example", + }, + }, + want: want{ + obs: managed.ExternalObservation{ + ResourceExists: false, + ResourceUpToDate: false, + ResourceLateInitialized: false, + ConnectionDetails: nil, + Diff: "", + }, + }, + }, + "UpToDate": { + testConfiguration: testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + obj: newBaseObject(), + params: map[string]any{ + "id": "example-id", + "name": "example", + }, + currentStateMap: map[string]any{ + "id": "example-id", + "name": "example", + }, + plannedStateMap: map[string]any{ + "id": "example-id", + "name": "example", + }, + }, + want: want{ + obs: managed.ExternalObservation{ + ResourceExists: true, + ResourceUpToDate: true, + ResourceLateInitialized: true, + ConnectionDetails: nil, + Diff: "", + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + terraformPluginFrameworkAsyncExternal := prepareTerraformPluginFrameworkAsyncExternal(tc.testConfiguration, CallbackFns{}) + observation, err := terraformPluginFrameworkAsyncExternal.Observe(context.TODO(), &tc.testConfiguration.obj) + if diff := cmp.Diff(tc.want.obs, observation); diff != "" { + t.Errorf("\n%s\nObserve(...): -want observation, +got observation:\n", diff) + } + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("\n%s\nConnect(...): -want error, +got error:\n", diff) + } + }) + } +} + +func TestAsyncTerraformPluginFrameworkCreate(t *testing.T) { + type want struct { + err error + } + cases := map[string]struct { + testConfiguration + want + }{ + "Successful": { + testConfiguration: testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + obj: newBaseObject(), + currentStateMap: nil, + plannedStateMap: map[string]any{ + "name": "example", + }, + params: map[string]any{ + "name": "example", + }, + newStateMap: map[string]any{ + "name": "example", + "id": "example-id", + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + terraformPluginFrameworkAsyncExternal := prepareTerraformPluginFrameworkAsyncExternal(tc.testConfiguration, CallbackFns{ + CreateFn: func(_ string) terraform.CallbackFn { + return func(err error, ctx context.Context) error { + return nil + } + }, + }) + _, err := terraformPluginFrameworkAsyncExternal.Create(context.TODO(), &tc.testConfiguration.obj) + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("\n%s\nterraformPluginFrameworkAsyncExternalClient.Create(...): -want error, +got error:\n", diff) + } + }) + } +} + +func TestAsyncTerraformPluginFrameworkUpdate(t *testing.T) { + type want struct { + err error + } + cases := map[string]struct { + testConfiguration + want + }{ + "Successful": { + testConfiguration: testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + obj: newBaseObject(), + currentStateMap: map[string]any{ + "name": "example", + "id": "example-id", + }, + plannedStateMap: map[string]any{ + "name": "example-updated", + "id": "example-id", + }, + params: map[string]any{ + "name": "example-updated", + }, + newStateMap: map[string]any{ + "name": "example-updated", + "id": "example-id", + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + terraformPluginFrameworkAsyncExternal := prepareTerraformPluginFrameworkAsyncExternal(tc.testConfiguration, CallbackFns{ + UpdateFn: func(_ string) terraform.CallbackFn { + return func(err error, ctx context.Context) error { + return nil + } + }, + }) + _, err := terraformPluginFrameworkAsyncExternal.Update(context.TODO(), &tc.testConfiguration.obj) + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("\n%s\nterraformPluginFrameworkAsyncExternalClient.Update(...): -want error, +got error:\n", diff) + } + }) + } +} + +func TestAsyncTerraformPluginFrameworkDelete(t *testing.T) { + type want struct { + err error + } + cases := map[string]struct { + testConfiguration + want + }{ + "Successful": { + testConfiguration: testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + obj: newBaseObject(), + currentStateMap: map[string]any{ + "name": "example", + "id": "example-id", + }, + plannedStateMap: nil, + params: map[string]any{ + "name": "example", + }, + newStateMap: nil, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + terraformPluginFrameworkAsyncExternal := prepareTerraformPluginFrameworkAsyncExternal(tc.testConfiguration, CallbackFns{ + DestroyFn: func(_ string) terraform.CallbackFn { + return func(err error, ctx context.Context) error { + return nil + } + }, + }) + _, err := terraformPluginFrameworkAsyncExternal.Delete(context.TODO(), &tc.testConfiguration.obj) + if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { + t.Errorf("\n%s\nterraformPluginFrameworkAsyncExternalClient.Delete(...): -want error, +got error:\n", diff) + } + }) + } +} + +// TestAsyncTerraformPluginFrameworkCreateRace is a regression test for +// the data race on a managed resource's status, between upjet's async Create +// operation and the managed reconciler. +// Must be run with `go test -race`. +// +// Please also see: https://github.com/crossplane/upjet/issues/472. +func TestAsyncTerraformPluginFrameworkCreateRace(t *testing.T) { + obj := newObjAsync() + tc := testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + currentStateMap: nil, + plannedStateMap: map[string]any{ + "name": "example", + }, + params: map[string]any{ + "name": "example", + }, + newStateMap: map[string]any{ + "name": "example", + "id": "example-id", + }, + } + + extDone := make(chan struct{}) + ext := prepareTerraformPluginFrameworkAsyncExternal(tc, CallbackFns{ + CreateFn: func(_ string) terraform.CallbackFn { + return func(_ error, _ context.Context) error { + // Signal the async operation of the external client has completed. + close(extDone) + return nil + } + }, + }) + // This call starts the async worker that will race with + // the managed reconciler below. + if _, err := ext.Create(context.TODO(), obj); err != nil { + t.Fatalf("terraformPluginFrameworkAsyncExternalClient.Create(...): unexpected error: %v", err) + } + + // Simulate the managed reconciler concurrently writing to the status of + // the same MR (obj above). + mrDone := make(chan struct{}) + go func() { + _ = obj.DeepCopyObject() + _ = obj.SetObservation(map[string]any{"name": "example"}) + // Signal the managed reconciler has completed. + close(mrDone) + }() + <-extDone + <-mrDone +} + +// TestAsyncTerraformPluginFrameworkUpdateRace is a regression test for +// the data race on a managed resource's status, between upjet's async Update +// operation and the managed reconciler. +// Must be run with `go test -race`. +// +// Please also see: https://github.com/crossplane/upjet/issues/472. +func TestAsyncTerraformPluginFrameworkUpdateRace(t *testing.T) { + obj := newObjAsync() + tc := testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + currentStateMap: map[string]any{ + "name": "example", + "id": "example-id", + }, + plannedStateMap: map[string]any{ + "name": "example-updated", + "id": "example-id", + }, + params: map[string]any{ + "name": "example-updated", + }, + newStateMap: map[string]any{ + "name": "example-updated", + "id": "example-id", + }, + } + + extDone := make(chan struct{}) + ext := prepareTerraformPluginFrameworkAsyncExternal(tc, CallbackFns{ + UpdateFn: func(_ string) terraform.CallbackFn { + return func(_ error, _ context.Context) error { + // Signal the async operation of the external client has completed. + close(extDone) + return nil + } + }, + }) + // This call starts the async worker that will race with + // the managed reconciler below. + if _, err := ext.Update(context.TODO(), obj); err != nil { + t.Fatalf("terraformPluginFrameworkAsyncExternalClient.Update(...): unexpected error: %v", err) + } + + // Simulate the managed reconciler concurrently writing to the status of + // the same MR (obj above). + mrDone := make(chan struct{}) + go func() { + _ = obj.DeepCopyObject() + _ = obj.SetObservation(map[string]any{"name": "example"}) + // Signal the managed reconciler has completed. + close(mrDone) + }() + <-extDone + <-mrDone +} + +// TestAsyncTerraformPluginFrameworkDeleteRace is a guard test asserting that +// upjet's async Delete operation does not concurrently access a managed +// resource's status while the managed reconciler does. +// Current async client Delete does not modify MR status or spec. +// Must be run with `go test -race`. +func TestAsyncTerraformPluginFrameworkDeleteRace(t *testing.T) { + obj := newObjAsync() + tc := testConfiguration{ + r: newMockBaseTPFResource(), + cfg: newBaseUpjetConfig(), + currentStateMap: map[string]any{ + "name": "example", + "id": "example-id", + }, + plannedStateMap: nil, + params: map[string]any{ + "name": "example", + }, + newStateMap: nil, + } + + extDone := make(chan struct{}) + ext := prepareTerraformPluginFrameworkAsyncExternal(tc, CallbackFns{ + DestroyFn: func(_ string) terraform.CallbackFn { + return func(_ error, _ context.Context) error { + // Signal the async operation of the external client has completed. + close(extDone) + return nil + } + }, + }) + // This call starts the async worker that will race with + // the managed reconciler below. + if _, err := ext.Delete(context.TODO(), obj); err != nil { + t.Fatalf("terraformPluginFrameworkAsyncExternalClient.Delete(...): unexpected error: %v", err) + } + + // Simulate the managed reconciler concurrently writing to the status of + // the same MR (obj above). + mrDone := make(chan struct{}) + go func() { + _ = obj.DeepCopyObject() + // Managed reconciler does not call SetObservation during deletion. + // This is an extra check at the moment. + _ = obj.SetObservation(map[string]any{"name": "example"}) + // Signal the managed reconciler has completed. + close(mrDone) + }() + <-extDone + <-mrDone +} diff --git a/pkg/controller/external_async_tfpluginsdk.go b/pkg/controller/external_async_tfpluginsdk.go index 4b9c66e8..7164a5cd 100644 --- a/pkg/controller/external_async_tfpluginsdk.go +++ b/pkg/controller/external_async_tfpluginsdk.go @@ -142,6 +142,10 @@ func (n *terraformPluginSDKAsyncExternal) Create(_ context.Context, mg xpresourc } ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout)) + // We deep-copy the managed resource to prevent a data race between the + // goroutine we are about to start below and the managed reconciler. + // Please see: https://github.com/crossplane/upjet/issues/472 + mgCopy := mg.DeepCopyObject().(xpresource.Managed) go func() { // The order of deferred functions, executed last-in-first-out, is // significant. The context should be canceled last, because it is @@ -156,14 +160,14 @@ func (n *terraformPluginSDKAsyncExternal) Create(_ context.Context, mg xpresourc n.opTracker.logger.Debug("Async create ended.", "error", err, "tfID", n.opTracker.GetTfID()) n.opTracker.LastOperation.MarkEnd() - if cErr := n.callback.Create(mg.GetName())(err, ctx); cErr != nil { + if cErr := n.callback.Create(mgCopy.GetName())(err, ctx); cErr != nil { n.opTracker.logger.Info("Async create callback failed", "error", cErr.Error()) } }() defer ph.recoverIfPanic(ctx) n.opTracker.logger.Debug("Async create starting...", "tfID", n.opTracker.GetTfID()) - _, ph.err = n.terraformPluginSDKExternal.Create(ctx, mg) + _, ph.err = n.terraformPluginSDKExternal.Create(ctx, mgCopy) }() return managed.ExternalCreation{}, n.opTracker.LastOperation.Error() @@ -175,6 +179,10 @@ func (n *terraformPluginSDKAsyncExternal) Update(_ context.Context, mg xpresourc } ctx, cancel := context.WithDeadline(context.Background(), n.opTracker.LastOperation.StartTime().Add(defaultAsyncTimeout)) + // We deep-copy the managed resource to prevent a data race between the + // goroutine we are about to start below and the managed reconciler. + // Please see: https://github.com/crossplane/upjet/issues/472 + mgCopy := mg.DeepCopyObject().(xpresource.Managed) go func() { // The order of deferred functions, executed last-in-first-out, is // significant. The context should be canceled last, because it is @@ -189,14 +197,14 @@ func (n *terraformPluginSDKAsyncExternal) Update(_ context.Context, mg xpresourc n.opTracker.logger.Debug("Async update ended.", "error", err, "tfID", n.opTracker.GetTfID()) n.opTracker.LastOperation.MarkEnd() - if cErr := n.callback.Update(mg.GetName())(err, ctx); cErr != nil { + if cErr := n.callback.Update(mgCopy.GetName())(err, ctx); cErr != nil { n.opTracker.logger.Info("Async update callback failed", "error", cErr.Error()) } }() defer ph.recoverIfPanic(ctx) n.opTracker.logger.Debug("Async update starting...", "tfID", n.opTracker.GetTfID()) - _, ph.err = n.terraformPluginSDKExternal.Update(ctx, mg) + _, ph.err = n.terraformPluginSDKExternal.Update(ctx, mgCopy) }() return managed.ExternalUpdate{}, n.opTracker.LastOperation.Error() diff --git a/pkg/controller/external_async_tfpluginsdk_test.go b/pkg/controller/external_async_tfpluginsdk_test.go index 08eaefeb..babe9ea2 100644 --- a/pkg/controller/external_async_tfpluginsdk_test.go +++ b/pkg/controller/external_async_tfpluginsdk_test.go @@ -60,20 +60,6 @@ var ( return nil, nil }}, } - objAsync = &fake.Terraformed{ - Parameterizable: fake.Parameterizable{ - Parameters: map[string]any{ - "name": "example", - "map": map[string]any{ - "key": "value", - }, - "list": []any{"elem1", "elem2"}, - }, - }, - Observable: fake.Observable{ - Observation: map[string]any{}, - }, - } ) func prepareTerraformPluginSDKAsyncExternal(r Resource, cfg *config.Resource, fns CallbackFns) *terraformPluginSDKAsyncExternal { @@ -118,7 +104,7 @@ func TestAsyncTerraformPluginSDKConnect(t *testing.T) { return terraform.Setup{}, nil }, cfg: cfgAsync, - obj: objAsync, + obj: newObjAsync(), ots: ots, }, }, @@ -156,7 +142,7 @@ func TestAsyncTerraformPluginSDKObserve(t *testing.T) { }, }, cfg: cfgAsync, - obj: objAsync, + obj: newObjAsync(), }, want: want{ obs: managed.ExternalObservation{ @@ -176,7 +162,7 @@ func TestAsyncTerraformPluginSDKObserve(t *testing.T) { }, }, cfg: cfgAsync, - obj: objAsync, + obj: newObjAsync(), }, want: want{ obs: managed.ExternalObservation{ @@ -225,7 +211,7 @@ func TestAsyncTerraformPluginSDKCreate(t *testing.T) { }, }, cfg: cfgAsync, - obj: objAsync, + obj: newObjAsync(), fns: CallbackFns{ CreateFn: func(s string) terraform.CallbackFn { return func(err error, ctx context.Context) error { @@ -241,7 +227,7 @@ func TestAsyncTerraformPluginSDKCreate(t *testing.T) { terraformPluginSDKAsyncExternal := prepareTerraformPluginSDKAsyncExternal(tc.args.r, tc.args.cfg, tc.args.fns) _, err := terraformPluginSDKAsyncExternal.Create(context.TODO(), tc.args.obj) if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { - t.Errorf("\n%s\nConnect(...): -want error, +got error:\n", diff) + t.Errorf("\n%s\nterraformPluginSDKAsyncExternal.Create(...): -want error, +got error:\n", diff) } }) } @@ -269,7 +255,7 @@ func TestAsyncTerraformPluginSDKUpdate(t *testing.T) { }, }, cfg: cfgAsync, - obj: objAsync, + obj: newObjAsync(), fns: CallbackFns{ UpdateFn: func(s string) terraform.CallbackFn { return func(err error, ctx context.Context) error { @@ -285,7 +271,7 @@ func TestAsyncTerraformPluginSDKUpdate(t *testing.T) { terraformPluginSDKAsyncExternal := prepareTerraformPluginSDKAsyncExternal(tc.args.r, tc.args.cfg, tc.args.fns) _, err := terraformPluginSDKAsyncExternal.Update(context.TODO(), tc.args.obj) if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { - t.Errorf("\n%s\nConnect(...): -want error, +got error:\n", diff) + t.Errorf("\n%s\nterraformPluginSDKAsyncExternal.Update(...): -want error, +got error:\n", diff) } }) } @@ -313,7 +299,7 @@ func TestAsyncTerraformPluginSDKDelete(t *testing.T) { }, }, cfg: cfgAsync, - obj: objAsync, + obj: newObjAsync(), fns: CallbackFns{ DestroyFn: func(s string) terraform.CallbackFn { return func(err error, ctx context.Context) error { @@ -329,8 +315,155 @@ func TestAsyncTerraformPluginSDKDelete(t *testing.T) { terraformPluginSDKAsyncExternal := prepareTerraformPluginSDKAsyncExternal(tc.args.r, tc.args.cfg, tc.args.fns) _, err := terraformPluginSDKAsyncExternal.Delete(context.TODO(), tc.args.obj) if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { - t.Errorf("\n%s\nConnect(...): -want error, +got error:\n", diff) + t.Errorf("\n%s\nterraformPluginSDKAsyncExternal.Delete(...): -want error, +got error:\n", diff) } }) } } + +// TestAsyncTerraformPluginSDKCreateRace is a regression test for +// the data race on a managed resource's status, between upjet's async Create +// operation and the managed reconciler. +// Must be run with `go test -race`. +// +// Please also see: https://github.com/crossplane/upjet/issues/472. +func TestAsyncTerraformPluginSDKCreateRace(t *testing.T) { + obj := newObjAsync() + r := mockResource{ + ApplyFn: func(_ context.Context, _ *tf.InstanceState, _ *tf.InstanceDiff, _ interface{}) (*tf.InstanceState, diag.Diagnostics) { + return &tf.InstanceState{ID: "example-id", Attributes: map[string]string{"name": "example"}}, nil + }, + } + + extDone := make(chan struct{}) + ext := prepareTerraformPluginSDKAsyncExternal(r, cfgAsync, CallbackFns{ + CreateFn: func(_ string) terraform.CallbackFn { + return func(_ error, _ context.Context) error { + // Signal the async operation of the external client has completed. + close(extDone) + return nil + } + }, + }) + // This call starts the async worker that will race with + // the managed reconciler below. + if _, err := ext.Create(context.TODO(), obj); err != nil { + t.Fatalf("terraformPluginSDKAsyncExternal.Create(...): unexpected error: %v", err) + } + + // Simulate the managed reconciler concurrently writing to the status of + // the same MR (obj above). + mrDone := make(chan struct{}) + go func() { + _ = obj.DeepCopyObject() + _ = obj.SetObservation(map[string]any{"name": "example"}) + // Signal the managed reconciler has completed. + close(mrDone) + }() + <-extDone + <-mrDone +} + +// TestAsyncTerraformPluginSDKUpdateRace is a regression test for +// the data race on a managed resource's status, between upjet's async Update +// operation and the managed reconciler. +// Must be run with `go test -race`. +// +// Please also see: https://github.com/crossplane/upjet/issues/472. +func TestAsyncTerraformPluginSDKUpdateRace(t *testing.T) { + obj := newObjAsync() + r := mockResource{ + ApplyFn: func(_ context.Context, _ *tf.InstanceState, _ *tf.InstanceDiff, _ interface{}) (*tf.InstanceState, diag.Diagnostics) { + return &tf.InstanceState{ID: "example-id", Attributes: map[string]string{"name": "example"}}, nil + }, + } + + extDone := make(chan struct{}) + ext := prepareTerraformPluginSDKAsyncExternal(r, cfgAsync, CallbackFns{ + UpdateFn: func(_ string) terraform.CallbackFn { + return func(_ error, _ context.Context) error { + // Signal the async operation of the external client has completed. + close(extDone) + return nil + } + }, + }) + // This call starts the async worker that will race with + // the managed reconciler below. + if _, err := ext.Update(context.TODO(), obj); err != nil { + t.Fatalf("terraformPluginSDKAsyncExternal.Update(...): unexpected error: %v", err) + } + + // Simulate the managed reconciler concurrently writing to the status of + // the same MR (obj above). + mrDone := make(chan struct{}) + go func() { + _ = obj.DeepCopyObject() + _ = obj.SetObservation(map[string]any{"name": "example"}) + // Signal the managed reconciler has completed. + close(mrDone) + }() + <-extDone + <-mrDone +} + +// TestAsyncTerraformPluginSDKDeleteRace is a guard test asserting that upjet's +// async Delete operation does not concurrently access a managed resource's +// status while the managed reconciler does. Current async client Delete +// implementation does not modify MR status or spec. +// Must be run with `go test -race`. +func TestAsyncTerraformPluginSDKDeleteRace(t *testing.T) { + obj := newObjAsync() + r := mockResource{ + ApplyFn: func(_ context.Context, _ *tf.InstanceState, _ *tf.InstanceDiff, _ interface{}) (*tf.InstanceState, diag.Diagnostics) { + return &tf.InstanceState{ID: "example-id", Attributes: map[string]string{"name": "example"}}, nil + }, + } + + extDone := make(chan struct{}) + ext := prepareTerraformPluginSDKAsyncExternal(r, cfgAsync, CallbackFns{ + DestroyFn: func(_ string) terraform.CallbackFn { + return func(_ error, _ context.Context) error { + // Signal the async operation of the external client has completed. + close(extDone) + return nil + } + }, + }) + // This call starts the async worker that will race with + // the managed reconciler below. + if _, err := ext.Delete(context.TODO(), obj); err != nil { + t.Fatalf("terraformPluginSDKAsyncExternal.Delete(...): unexpected error: %v", err) + } + + // Simulate the managed reconciler concurrently writing to the status of + // the same MR (obj above). + mrDone := make(chan struct{}) + go func() { + _ = obj.DeepCopyObject() + // Managed reconciler does not call SetObservation during deletion. + // This is an extra check at the moment. + _ = obj.SetObservation(map[string]any{"name": "example"}) + // Signal the managed reconciler has completed. + close(mrDone) + }() + <-extDone + <-mrDone +} + +func newObjAsync() *fake.Terraformed { + return &fake.Terraformed{ + Parameterizable: fake.Parameterizable{ + Parameters: map[string]any{ + "name": "example", + "map": map[string]any{ + "key": "value", + }, + "list": []any{"elem1", "elem2"}, + }, + }, + Observable: fake.Observable{ + Observation: map[string]any{}, + }, + } +}