From c0f6b72cddf1900b9b54a345eb8ab4692741a2ab Mon Sep 17 00:00:00 2001 From: "@x10an14-nav" Date: Thu, 28 May 2026 14:11:44 +0200 Subject: [PATCH] fix(hookd): recover clusters after dispatch stream failure A failed `stream.Send` means the registered `deployd` stream can no longer be trusted. Leaving that stream in `onlineClustersMap` can keep later deploys routed to dead state and block a clean `deployd` reconnect. The observed `RST_STREAM` failure does not prove `HAProxy` caused the reset, but it exposed a stale-registration path that `hookd` should treat as terminal. --- .envrc | 1 + .gitignore | 1 + flake.lock | 81 +++++++++++++++++++ flake.nix | 75 +++++++++++++++++ pkg/grpc/dispatchserver/dispatchserver.go | 3 + .../dispatchserver/dispatchserver_test.go | 74 +++++++++++++++++ 6 files changed, 235 insertions(+) create mode 100644 .envrc create mode 100644 flake.lock create mode 100644 flake.nix diff --git a/.envrc b/.envrc new file mode 100644 index 00000000..3550a30f --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +use flake diff --git a/.gitignore b/.gitignore index 0cd25bc5..4d82d948 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ .env /.testbin/ +/.direnv/ diff --git a/flake.lock b/flake.lock new file mode 100644 index 00000000..0e8ce3d5 --- /dev/null +++ b/flake.lock @@ -0,0 +1,81 @@ +{ + "nodes": { + "flake-utils": { + "inputs": { + "systems": "systems" + }, + "locked": { + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1779560665, + "narHash": "sha256-tpyBcxPpcQb8ukyNF7DoCwfSY3VPsxHoYwj00Cayv5o=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "64c08a7ca051951c8eae34e3e3cb1e202fe36786", + "type": "github" + }, + "original": { + "id": "nixpkgs", + "ref": "nixos-unstable", + "type": "indirect" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs", + "treefmt-nix": "treefmt-nix" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + }, + "treefmt-nix": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ] + }, + "locked": { + "lastModified": 1775636079, + "narHash": "sha256-pc20NRoMdiar8oPQceQT47UUZMBTiMdUuWrYu2obUP0=", + "owner": "numtide", + "repo": "treefmt-nix", + "rev": "790751ff7fd3801feeaf96d7dc416a8d581265ba", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "treefmt-nix", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 00000000..c4a355d6 --- /dev/null +++ b/flake.nix @@ -0,0 +1,75 @@ +{ + inputs = { + flake-utils.url = "github:numtide/flake-utils"; + nixpkgs.url = "nixpkgs/nixos-unstable"; + treefmt-nix = { + url = "github:numtide/treefmt-nix"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + }; + + outputs = + inputs: + inputs.flake-utils.lib.eachDefaultSystem ( + system: + let + pkgs = import inputs.nixpkgs { + localSystem = { + inherit system; + }; + }; + treefmt = inputs.treefmt-nix.lib.evalModule pkgs { + projectRootFile = "flake.nix"; + programs.nixfmt.enable = true; + }; + envtest-bins = pkgs.symlinkJoin { + name = "envtest-bins"; + paths = [ + pkgs.etcd + pkgs.kubernetes + ]; + }; + deploy = pkgs.buildGoModule { + pname = "deploy"; + version = "0.0.0"; + src = ./.; + vendorHash = "sha256-afFOb7DpB4gGyFErwM3lROMU2E1GVlT7+nSLU4zAV8E="; + + subPackages = [ + "cmd/crypt" + "cmd/deploy" + "cmd/deployd" + "cmd/hookd" + "cmd/leakdetect" + ]; + + nativeBuildInputs = with pkgs; [ + protobuf + protoc-gen-go + protoc-gen-go-grpc + ]; + + doCheck = false; + }; + in + { + packages.default = deploy; + + devShells.default = pkgs.mkShell { + inputsFrom = [ + deploy + envtest-bins + ]; + shellHook = '' + export KUBEBUILDER_ASSETS="${envtest-bins}/bin" + mkdir -p .testbin + ln -sfn ${pkgs.etcd}/bin/etcd .testbin/etcd + ln -sfn ${pkgs.kubernetes}/bin/kube-apiserver .testbin/kube-apiserver + ln -sfn ${pkgs.kubernetes}/bin/kubectl .testbin/kubectl + ''; + }; + + formatter = treefmt.config.build.wrapper; + } + ); +} diff --git a/pkg/grpc/dispatchserver/dispatchserver.go b/pkg/grpc/dispatchserver/dispatchserver.go index c796eced..3405214d 100644 --- a/pkg/grpc/dispatchserver/dispatchserver.go +++ b/pkg/grpc/dispatchserver/dispatchserver.go @@ -129,6 +129,9 @@ func (s *dispatchServer) Deployments(opts *pb.GetDeploymentOpts, stream pb.Dispa case req := <-c: err := stream.Send(req.request) req.wait <- err + if err != nil { + return err + } case <-time.After(30 * time.Minute): log.Warnf("Connection from cluster '%s' timed out", opts.Cluster) return fmt.Errorf("timeout") diff --git a/pkg/grpc/dispatchserver/dispatchserver_test.go b/pkg/grpc/dispatchserver/dispatchserver_test.go index 896ede22..c76e3845 100644 --- a/pkg/grpc/dispatchserver/dispatchserver_test.go +++ b/pkg/grpc/dispatchserver/dispatchserver_test.go @@ -16,10 +16,84 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/grpc/test/bufconn" ) +type erringDeploymentsStream struct { + ctx context.Context +} + +func (s *erringDeploymentsStream) Send(*pb.DeploymentRequest) error { + return status.Error(codes.Internal, "stream terminated by RST_STREAM with error code: INTERNAL_ERROR") +} + +func (s *erringDeploymentsStream) Context() context.Context { return s.ctx } + +func (s *erringDeploymentsStream) SetHeader(metadata.MD) error { return nil } + +func (s *erringDeploymentsStream) SendHeader(metadata.MD) error { return nil } + +func (s *erringDeploymentsStream) SetTrailer(metadata.MD) {} + +func (s *erringDeploymentsStream) SendMsg(any) error { return nil } + +func (s *erringDeploymentsStream) RecvMsg(any) error { return nil } + +func TestDeploymentsUnregistersClusterWhenSendFails(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, _ = telemetry.New(ctx, "test", "") + + deploymentStore := database.MockDeploymentStore{} + deploymentStore.On("HistoricDeployments", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + mockApiClients, _ := apiclient.NewMockClient(t) + ds := New(&deploymentStore, mockApiClients.Deployments()).(*dispatchServer) + + stream := &erringDeploymentsStream{ctx: ctx} + done := make(chan error, 1) + go func() { + done <- ds.Deployments(&pb.GetDeploymentOpts{Cluster: "dev", StartupTime: pb.TimeAsTimestamp(time.Now())}, stream) + }() + + requireEventually(t, time.Second, func() bool { + return len(ds.onlineClusters()) == 1 + }) + + err := ds.SendDeploymentRequest(ctx, &pb.DeploymentRequest{Cluster: "dev"}) + if status.Code(err) != codes.Internal { + t.Fatalf("expected send error to be propagated as Internal, got %v", err) + } + + requireEventually(t, time.Second, func() bool { + return len(ds.onlineClusters()) == 0 + }) + + select { + case err := <-done: + if err == nil { + t.Fatal("expected Deployments to exit with send error") + } + case <-time.After(time.Second): + t.Fatal("Deployments did not exit after stream Send failed") + } +} + +func requireEventually(t *testing.T, timeout time.Duration, condition func() bool) { + t.Helper() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if condition() { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatal("condition was not satisfied before timeout") +} + func bufDialer(b *bufconn.Listener) func(context.Context, string) (net.Conn, error) { return func(context.Context, string) (net.Conn, error) { return b.Dial()