Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use flake
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
.env

/.testbin/
/.direnv/
81 changes: 81 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 75 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
{
inputs = {
flake-utils.url = "github:numtide/flake-utils";
nixpkgs.url = "nixpkgs/nixos-unstable";
treefmt-nix = {
url = "github:numtide/treefmt-nix";
inputs.nixpkgs.follows = "nixpkgs";
};
};

outputs =
inputs:
inputs.flake-utils.lib.eachDefaultSystem (
system:
let
pkgs = import inputs.nixpkgs {
localSystem = {
inherit system;
};
};
treefmt = inputs.treefmt-nix.lib.evalModule pkgs {
projectRootFile = "flake.nix";
programs.nixfmt.enable = true;
};
envtest-bins = pkgs.symlinkJoin {
name = "envtest-bins";
paths = [
pkgs.etcd
pkgs.kubernetes
];
};
deploy = pkgs.buildGoModule {
pname = "deploy";
version = "0.0.0";
src = ./.;
vendorHash = "sha256-afFOb7DpB4gGyFErwM3lROMU2E1GVlT7+nSLU4zAV8E=";

subPackages = [
"cmd/crypt"
"cmd/deploy"
"cmd/deployd"
"cmd/hookd"
"cmd/leakdetect"
];

nativeBuildInputs = with pkgs; [
protobuf
protoc-gen-go
protoc-gen-go-grpc
];

doCheck = false;
};
in
{
packages.default = deploy;

devShells.default = pkgs.mkShell {
inputsFrom = [
deploy
envtest-bins
];
shellHook = ''
export KUBEBUILDER_ASSETS="${envtest-bins}/bin"
mkdir -p .testbin
ln -sfn ${pkgs.etcd}/bin/etcd .testbin/etcd
ln -sfn ${pkgs.kubernetes}/bin/kube-apiserver .testbin/kube-apiserver
ln -sfn ${pkgs.kubernetes}/bin/kubectl .testbin/kubectl
'';
};

formatter = treefmt.config.build.wrapper;
}
);
}
3 changes: 3 additions & 0 deletions pkg/grpc/dispatchserver/dispatchserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ func (s *dispatchServer) Deployments(opts *pb.GetDeploymentOpts, stream pb.Dispa
case req := <-c:
err := stream.Send(req.request)
req.wait <- err
if err != nil {
return err
}
case <-time.After(30 * time.Minute):
log.Warnf("Connection from cluster '%s' timed out", opts.Cluster)
return fmt.Errorf("timeout")
Expand Down
74 changes: 74 additions & 0 deletions pkg/grpc/dispatchserver/dispatchserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,84 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
)

type erringDeploymentsStream struct {
ctx context.Context
}

func (s *erringDeploymentsStream) Send(*pb.DeploymentRequest) error {
return status.Error(codes.Internal, "stream terminated by RST_STREAM with error code: INTERNAL_ERROR")
}

func (s *erringDeploymentsStream) Context() context.Context { return s.ctx }

func (s *erringDeploymentsStream) SetHeader(metadata.MD) error { return nil }

func (s *erringDeploymentsStream) SendHeader(metadata.MD) error { return nil }

func (s *erringDeploymentsStream) SetTrailer(metadata.MD) {}

func (s *erringDeploymentsStream) SendMsg(any) error { return nil }

func (s *erringDeploymentsStream) RecvMsg(any) error { return nil }

func TestDeploymentsUnregistersClusterWhenSendFails(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, _ = telemetry.New(ctx, "test", "")

deploymentStore := database.MockDeploymentStore{}
deploymentStore.On("HistoricDeployments", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)

mockApiClients, _ := apiclient.NewMockClient(t)
ds := New(&deploymentStore, mockApiClients.Deployments()).(*dispatchServer)

stream := &erringDeploymentsStream{ctx: ctx}
done := make(chan error, 1)
go func() {
done <- ds.Deployments(&pb.GetDeploymentOpts{Cluster: "dev", StartupTime: pb.TimeAsTimestamp(time.Now())}, stream)
}()

requireEventually(t, time.Second, func() bool {
return len(ds.onlineClusters()) == 1
})

err := ds.SendDeploymentRequest(ctx, &pb.DeploymentRequest{Cluster: "dev"})
if status.Code(err) != codes.Internal {
t.Fatalf("expected send error to be propagated as Internal, got %v", err)
}

requireEventually(t, time.Second, func() bool {
return len(ds.onlineClusters()) == 0
})

select {
case err := <-done:
if err == nil {
t.Fatal("expected Deployments to exit with send error")
}
case <-time.After(time.Second):
t.Fatal("Deployments did not exit after stream Send failed")
}
}

func requireEventually(t *testing.T, timeout time.Duration, condition func() bool) {
t.Helper()

deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if condition() {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatal("condition was not satisfied before timeout")
}

func bufDialer(b *bufconn.Listener) func(context.Context, string) (net.Conn, error) {
return func(context.Context, string) (net.Conn, error) {
return b.Dial()
Expand Down
Loading