Skip to content

Commit 78c2110

Browse files
committed
fix(controller,agent): restart flow, runner demand scaling, and composite layer wiring
1 parent ca27e46 commit 78c2110

10 files changed

Lines changed: 419 additions & 7 deletions

api/v1alpha1/impvmrunnerpool_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ type RunnerPlatformSpec struct {
4949

5050
// RunnerScopeSpec selects org-level or repo-level runner registration.
5151
// Exactly one of Org or Repo must be set.
52-
// +kubebuilder:validation:XValidation:rule="!(size(self.org) > 0 && size(self.repo) > 0)",message="org and repo are mutually exclusive; set exactly one"
52+
// +kubebuilder:validation:XValidation:rule="(size(self.org) > 0) != (size(self.repo) > 0)",message="set exactly one of org or repo"
5353
type RunnerScopeSpec struct {
5454
// Org registers a runner for the entire organisation.
5555
// +optional

api/v1alpha1/impvmrunnerpool_types_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package v1alpha1
22

33
import (
44
"encoding/json"
5+
"os"
6+
"strings"
57
"testing"
68

79
"github.com/stretchr/testify/assert"
@@ -36,3 +38,14 @@ func TestImpVMRunnerPool_roundTrip(t *testing.T) {
3638
assert.Equal(t, "my-org", out.Spec.Platform.Scope.Org)
3739
assert.Equal(t, int32(10), out.Spec.Scaling.MaxConcurrent)
3840
}
41+
42+
func TestImpVMRunnerPoolCRD_scopeValidationRequiresExactlyOne(t *testing.T) {
43+
b, err := os.ReadFile("../../config/crd/bases/imp.dev_impvmrunnerpools.yaml")
44+
require.NoError(t, err)
45+
46+
yaml := string(b)
47+
assert.Contains(t, yaml, "set exactly one of org or repo")
48+
assert.Contains(t, yaml, "(size(self.org) > 0) != (size(self.repo) > 0)")
49+
assert.False(t, strings.Contains(yaml, "!(size(self.org) > 0 && size(self.repo) > 0)"),
50+
"scope validation should require at least one of org or repo, not only mutual exclusion")
51+
}

config/crd/bases/imp.dev_impvmrunnerpools.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ spec:
117117
type: string
118118
type: object
119119
x-kubernetes-validations:
120-
- message: org and repo are mutually exclusive; set exactly one
121-
rule: '!(size(self.org) > 0 && size(self.repo) > 0)'
120+
- message: set exactly one of org or repo
121+
rule: (size(self.org) > 0) != (size(self.repo) > 0)
122122
serverURL:
123123
description: ServerURL is required for GitLab and Forgejo. Leave
124124
empty for github.com.

config/rbac/role.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ rules:
2020
verbs:
2121
- create
2222
- patch
23+
- apiGroups:
24+
- ""
25+
resources:
26+
- secrets
27+
verbs:
28+
- get
2329
- apiGroups:
2430
- cilium.io
2531
resources:

internal/agent/firecracker_driver.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ type fcProc struct {
5050
probeCancel context.CancelFunc // non-nil when probe goroutine is running
5151
}
5252

53+
type rootfsBuilder interface {
54+
Build(ctx context.Context, imageRef string, opts ...rootfs.BuildOption) (string, error)
55+
BuildComposite(ctx context.Context, baseImage string, extraLayers []string, opts ...rootfs.BuildOption) (string, error)
56+
}
57+
5358
// FirecrackerDriver is a VMDriver that launches real Firecracker microVMs.
5459
// It is safe for concurrent use.
5560
type FirecrackerDriver struct {
@@ -62,7 +67,7 @@ type FirecrackerDriver struct {
6267
// KernelArgs is the kernel command-line string.
6368
KernelArgs string
6469
// Cache builds and caches ext4 rootfs images from OCI images.
65-
Cache *rootfs.Builder
70+
Cache rootfsBuilder
6671
// Client is the controller-runtime Kubernetes client.
6772
Client ctrlclient.Client
6873
// Net manages host-level networking (bridge, TAP, NAT).
@@ -153,7 +158,7 @@ func (d *FirecrackerDriver) Start(ctx context.Context, vm *impdevv1alpha1.ImpVM)
153158
if gaEnabled {
154159
buildOpts = append(buildOpts, rootfs.WithGuestAgent(d.guestAgentPath()))
155160
}
156-
rootfsPath, err := d.Cache.Build(ctx, vm.Spec.Image, buildOpts...)
161+
rootfsPath, err := d.buildRootfs(ctx, vm, buildOpts...)
157162
if err != nil {
158163
return 0, fmt.Errorf("build rootfs for %s: %w", vm.Spec.Image, err)
159164
}
@@ -235,6 +240,22 @@ func (d *FirecrackerDriver) Start(ctx context.Context, vm *impdevv1alpha1.ImpVM)
235240
return int64(pid), nil
236241
}
237242

243+
func (d *FirecrackerDriver) buildRootfs(
244+
ctx context.Context, vm *impdevv1alpha1.ImpVM, opts ...rootfs.BuildOption,
245+
) (string, error) {
246+
extraLayers := make([]string, 0, 2)
247+
if vm.Spec.RunnerLayer != "" {
248+
extraLayers = append(extraLayers, vm.Spec.RunnerLayer)
249+
}
250+
if vm.Spec.CiliumLayer != "" {
251+
extraLayers = append(extraLayers, vm.Spec.CiliumLayer)
252+
}
253+
if len(extraLayers) == 0 {
254+
return d.Cache.Build(ctx, vm.Spec.Image, opts...)
255+
}
256+
return d.Cache.BuildComposite(ctx, vm.Spec.Image, extraLayers, opts...)
257+
}
258+
238259
// Stop implements VMDriver. Sends a graceful ACPI shutdown signal, waits up to
239260
// shuttingDownTimeout, then force-terminates the Firecracker process and cleans up
240261
// the Unix socket. Safe to call on a VM that was never started or already stopped.

internal/agent/firecracker_driver_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package agent
44

55
import (
66
"context"
7+
"fmt"
78
"os"
89
"os/exec"
910
"testing"
@@ -20,6 +21,36 @@ import (
2021
pb "github.com/syscode-labs/imp/internal/proto/guest"
2122
)
2223

24+
type fakeRootfsBuilder struct {
25+
buildCalled bool
26+
buildCompositeCalled bool
27+
lastBaseImage string
28+
lastExtraLayers []string
29+
returnPath string
30+
returnErr error
31+
}
32+
33+
func (f *fakeRootfsBuilder) Build(_ context.Context, imageRef string, _ ...rootfs.BuildOption) (string, error) {
34+
f.buildCalled = true
35+
f.lastBaseImage = imageRef
36+
if f.returnErr != nil {
37+
return "", f.returnErr
38+
}
39+
return f.returnPath, nil
40+
}
41+
42+
func (f *fakeRootfsBuilder) BuildComposite(
43+
_ context.Context, baseImage string, extraLayers []string, _ ...rootfs.BuildOption,
44+
) (string, error) {
45+
f.buildCompositeCalled = true
46+
f.lastBaseImage = baseImage
47+
f.lastExtraLayers = append([]string(nil), extraLayers...)
48+
if f.returnErr != nil {
49+
return "", f.returnErr
50+
}
51+
return f.returnPath, nil
52+
}
53+
2354
// hasFirecrackerBin returns true if the firecracker binary is available.
2455
func hasFirecrackerBin() bool {
2556
_, err := exec.LookPath("firecracker")
@@ -36,6 +67,56 @@ func TestFirecrackerDriverPlaceholder(t *testing.T) {
3667
t.Log("FirecrackerDriver test file compiles correctly")
3768
}
3869

70+
func TestFirecrackerDriver_buildRootfs_UsesBuildWhenNoExtraLayers(t *testing.T) {
71+
b := &fakeRootfsBuilder{returnPath: "/tmp/base.ext4"}
72+
d := &FirecrackerDriver{Cache: b}
73+
74+
vm := &impdevv1alpha1.ImpVM{}
75+
vm.Spec.Image = "ghcr.io/example/base:latest"
76+
77+
path, err := d.buildRootfs(context.Background(), vm)
78+
if err != nil {
79+
t.Fatalf("buildRootfs: %v", err)
80+
}
81+
if !b.buildCalled {
82+
t.Fatal("expected Build to be called")
83+
}
84+
if b.buildCompositeCalled {
85+
t.Fatal("did not expect BuildComposite to be called")
86+
}
87+
if path != "/tmp/base.ext4" {
88+
t.Fatalf("path = %q, want %q", path, "/tmp/base.ext4")
89+
}
90+
}
91+
92+
func TestFirecrackerDriver_buildRootfs_UsesBuildCompositeWithLayerOrder(t *testing.T) {
93+
b := &fakeRootfsBuilder{returnPath: "/tmp/composite.ext4"}
94+
d := &FirecrackerDriver{Cache: b}
95+
96+
vm := &impdevv1alpha1.ImpVM{}
97+
vm.Spec.Image = "ghcr.io/example/base:latest"
98+
vm.Spec.RunnerLayer = "ghcr.io/example/runner:v1"
99+
vm.Spec.CiliumLayer = "ghcr.io/example/cilium:v1"
100+
101+
path, err := d.buildRootfs(context.Background(), vm)
102+
if err != nil {
103+
t.Fatalf("buildRootfs: %v", err)
104+
}
105+
if !b.buildCompositeCalled {
106+
t.Fatal("expected BuildComposite to be called")
107+
}
108+
if b.buildCalled {
109+
t.Fatal("did not expect Build to be called")
110+
}
111+
wantLayers := []string{"ghcr.io/example/runner:v1", "ghcr.io/example/cilium:v1"}
112+
if fmt.Sprint(b.lastExtraLayers) != fmt.Sprint(wantLayers) {
113+
t.Fatalf("extraLayers = %v, want %v", b.lastExtraLayers, wantLayers)
114+
}
115+
if path != "/tmp/composite.ext4" {
116+
t.Fatalf("path = %q, want %q", path, "/tmp/composite.ext4")
117+
}
118+
}
119+
39120
func TestFirecrackerDriver_SocketPath(t *testing.T) {
40121
d := &FirecrackerDriver{SocketDir: "/run/imp/sockets"}
41122

internal/controller/impvmrunnerpool_controller.go

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"time"
2223

24+
corev1 "k8s.io/api/core/v1"
2325
apierrors "k8s.io/apimachinery/pkg/api/errors"
2426
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2527
"k8s.io/apimachinery/pkg/runtime"
@@ -28,18 +30,31 @@ import (
2830
logf "sigs.k8s.io/controller-runtime/pkg/log"
2931

3032
impv1alpha1 "github.com/syscode-labs/imp/api/v1alpha1"
33+
"github.com/syscode-labs/imp/internal/runner"
3134
)
3235

3336
// ImpVMRunnerPoolReconciler reconciles ImpVMRunnerPool objects.
3437
type ImpVMRunnerPoolReconciler struct {
3538
client.Client
36-
Scheme *runtime.Scheme
39+
Scheme *runtime.Scheme
40+
DriverFactory RunnerDriverFactory
3741
}
3842

43+
type runnerQueueDepthReader interface {
44+
QueueDepth(ctx context.Context) (int, error)
45+
}
46+
47+
type RunnerDriverFactory func(
48+
ctx context.Context,
49+
c client.Client,
50+
pool *impv1alpha1.ImpVMRunnerPool,
51+
) (runnerQueueDepthReader, error)
52+
3953
// +kubebuilder:rbac:groups=imp.dev,resources=impvmrunnerpools,verbs=get;list;watch;create;update;patch;delete
4054
// +kubebuilder:rbac:groups=imp.dev,resources=impvmrunnerpools/status,verbs=get;update;patch
4155
// +kubebuilder:rbac:groups=imp.dev,resources=impvmtemplates,verbs=get;list;watch
4256
// +kubebuilder:rbac:groups=imp.dev,resources=impvms,verbs=get;list;watch;create;delete
57+
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get
4358

4459
func (r *ImpVMRunnerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
4560
log := logf.FromContext(ctx)
@@ -113,7 +128,15 @@ func (r *ImpVMRunnerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
113128
maxConcurrent = pool.Spec.Scaling.MaxConcurrent
114129
}
115130

116-
toCreate := minIdle - activeCount
131+
desiredCount := minIdle
132+
queueDepth, err := r.queueDepth(ctx, pool)
133+
if err != nil {
134+
log.Info("could not fetch runner queue depth; falling back to minIdle", "pool", pool.Name, "err", err)
135+
} else if int32(queueDepth) > desiredCount {
136+
desiredCount = int32(queueDepth)
137+
}
138+
139+
toCreate := desiredCount - activeCount
117140
if toCreate < 0 {
118141
toCreate = 0
119142
}
@@ -153,6 +176,24 @@ func (r *ImpVMRunnerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
153176
return ctrl.Result{RequeueAfter: requeueAfter}, nil
154177
}
155178

179+
func (r *ImpVMRunnerPoolReconciler) queueDepth(ctx context.Context, pool *impv1alpha1.ImpVMRunnerPool) (int, error) {
180+
if pool.Spec.JobDetection == nil ||
181+
pool.Spec.JobDetection.Polling == nil ||
182+
!pool.Spec.JobDetection.Polling.Enabled {
183+
return 0, nil
184+
}
185+
186+
factory := r.DriverFactory
187+
if factory == nil {
188+
factory = defaultRunnerDriverFactory
189+
}
190+
d, err := factory(ctx, r.Client, pool)
191+
if err != nil {
192+
return 0, err
193+
}
194+
return d.QueueDepth(ctx)
195+
}
196+
156197
func (r *ImpVMRunnerPoolReconciler) createRunnerVM(ctx context.Context, pool *impv1alpha1.ImpVMRunnerPool, tpl *impv1alpha1.ImpVMTemplate) error {
157198
classRef := tpl.Spec.ClassRef
158199
vm := &impv1alpha1.ImpVM{
@@ -179,6 +220,14 @@ func (r *ImpVMRunnerPoolReconciler) createRunnerVM(ctx context.Context, pool *im
179220
if tpl.Spec.NetworkGroup != "" {
180221
vm.Spec.NetworkGroup = tpl.Spec.NetworkGroup
181222
}
223+
if pool.Spec.RunnerLayer != "" {
224+
vm.Spec.RunnerLayer = pool.Spec.RunnerLayer
225+
} else if tpl.Spec.RunnerLayer != "" {
226+
vm.Spec.RunnerLayer = tpl.Spec.RunnerLayer
227+
}
228+
if tpl.Spec.CiliumLayer != "" {
229+
vm.Spec.CiliumLayer = tpl.Spec.CiliumLayer
230+
}
182231
if err := ctrl.SetControllerReference(pool, vm, r.Scheme); err != nil {
183232
return err
184233
}
@@ -191,3 +240,76 @@ func (r *ImpVMRunnerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
191240
Owns(&impv1alpha1.ImpVM{}).
192241
Complete(r)
193242
}
243+
244+
func defaultRunnerDriverFactory(
245+
ctx context.Context,
246+
c client.Client,
247+
pool *impv1alpha1.ImpVMRunnerPool,
248+
) (runnerQueueDepthReader, error) {
249+
var creds corev1.Secret
250+
if err := c.Get(ctx, client.ObjectKey{
251+
Namespace: pool.Namespace,
252+
Name: pool.Spec.Platform.CredentialsSecret,
253+
}, &creds); err != nil {
254+
return nil, err
255+
}
256+
token := pickSecretValue(creds.Data, "token")
257+
if token == "" {
258+
return nil, fmt.Errorf("credentials secret %s/%s has no token value", pool.Namespace, creds.Name)
259+
}
260+
261+
scope, err := platformScope(pool)
262+
if err != nil {
263+
return nil, err
264+
}
265+
266+
switch pool.Spec.Platform.Type {
267+
case "github-actions":
268+
return runner.NewGitHubDriver(token, scope, nil)
269+
case "forgejo":
270+
return runner.NewForgejoDriver(token, pool.Spec.Platform.ServerURL, scope, nil)
271+
case "gitlab":
272+
return runner.NewGitLabDriver(token, pool.Spec.Platform.ServerURL, scope, nil)
273+
default:
274+
return nil, fmt.Errorf("unsupported platform type %q", pool.Spec.Platform.Type)
275+
}
276+
}
277+
278+
func platformScope(pool *impv1alpha1.ImpVMRunnerPool) (string, error) {
279+
if pool.Spec.Platform.Scope == nil {
280+
return "", fmt.Errorf("platform.scope is required")
281+
}
282+
scope := pool.Spec.Platform.Scope
283+
switch pool.Spec.Platform.Type {
284+
case "github-actions", "forgejo":
285+
if scope.Org != "" {
286+
return "org:" + scope.Org, nil
287+
}
288+
if scope.Repo != "" {
289+
return "repo:" + scope.Repo, nil
290+
}
291+
case "gitlab":
292+
if scope.Org != "" {
293+
return "group:" + scope.Org, nil
294+
}
295+
if scope.Repo != "" {
296+
return "project:" + scope.Repo, nil
297+
}
298+
}
299+
return "", fmt.Errorf("invalid platform.scope for type %q", pool.Spec.Platform.Type)
300+
}
301+
302+
// pickSecretValue returns the value stored under preferredKey when it is
// non-empty. Otherwise it falls back to the non-empty value under the
// lexicographically smallest key, so the choice is deterministic across
// calls — the original fallback ranged over the map, whose iteration
// order is random, making the selected secret nondeterministic when the
// Secret holds several values. Returns "" when no non-empty value exists.
func pickSecretValue(m map[string][]byte, preferredKey string) string {
	if v, ok := m[preferredKey]; ok && len(v) > 0 {
		return string(v)
	}
	var (
		found   bool
		bestKey string
	)
	for k, v := range m {
		if len(v) == 0 {
			continue
		}
		if !found || k < bestKey {
			found, bestKey = true, k
		}
	}
	if found {
		return string(m[bestKey])
	}
	return ""
}

0 commit comments

Comments
 (0)