Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
golang.org/x/time v0.15.0
google.golang.org/grpc v1.81.1
k8s.io/api v0.35.1
k8s.io/apiextensions-apiserver v0.35.1
k8s.io/apimachinery v0.35.1
k8s.io/client-go v0.35.1
k8s.io/component-base v0.35.1
Expand Down Expand Up @@ -124,7 +125,6 @@ require (
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apiextensions-apiserver v0.35.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20251125145642-4e65d59e963e // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
Expand Down
193 changes: 193 additions & 0 deletions pkg/controller/awaitcontroller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2026 SPDX-FileCopyrightText: SAP SE or an SAP affiliate company and Gardener contributors
// SPDX-License-Identifier: Apache-2.0

package controller

import (
"context"
"errors"

extensionsv1alpha1 "github.com/gardener/gardener/pkg/apis/extensions/v1alpha1"
"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
otelcolv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
dynamicfake "k8s.io/client-go/dynamic/fake"
ctrl "sigs.k8s.io/controller-runtime"

"github.com/gardener/logging/v1/pkg/client/api"
)

// fakeController is the minimal Controller implementation `build` callbacks
// can return; it tracks how many times Stop has been invoked so we can assert
// that successful builds make it through to the channel.
type fakeController struct {
stopped int
}

func (*fakeController) GetClient(_ string) (api.Output, bool) { return nil, false }
func (*fakeController) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, nil
}
func (f *fakeController) Stop() { f.stopped++ }

// fakeDynamicScheme builds a runtime.Scheme that knows about
// CustomResourceDefinition. NewSimpleDynamicClient needs the scheme to derive
// the list-kind for the resources it serves.
func fakeDynamicScheme() *runtime.Scheme {
s := runtime.NewScheme()
utilruntime.Must(apiextensionsv1.AddToScheme(s))

return s
}

// establishedCRD returns a CRD whose Established/NamesAccepted conditions are
// True. pendingCRD returns one whose conditions are False. Splitting these
// keeps the call sites self-describing and avoids a control-flag parameter.
func establishedCRD(name, group string) *unstructured.Unstructured {
return crdUnstructuredWithStatus(name, group, apiextensionsv1.ConditionTrue)
}

func pendingCRD(name, group string) *unstructured.Unstructured {
return crdUnstructuredWithStatus(name, group, apiextensionsv1.ConditionFalse)
}

func crdUnstructuredWithStatus(name, group string, status apiextensionsv1.ConditionStatus) *unstructured.Unstructured {
conds := []apiextensionsv1.CustomResourceDefinitionCondition{
{Type: apiextensionsv1.Established, Status: status},
{Type: apiextensionsv1.NamesAccepted, Status: status},
}

crd := &apiextensionsv1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
Kind: "CustomResourceDefinition",
},
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{Group: group},
Status: apiextensionsv1.CustomResourceDefinitionStatus{Conditions: conds},
}
out, err := runtime.DefaultUnstructuredConverter.ToUnstructured(crd)
Expect(err).NotTo(HaveOccurred())

return &unstructured.Unstructured{Object: out}
}

var _ = Describe("awaitController", func() {
var (
ctx context.Context
cancel context.CancelFunc
l logr.Logger
)

BeforeEach(func() {
ctx, cancel = context.WithCancel(context.Background())
l = logr.Discard()
})

AfterEach(func() { cancel() })

It("delivers the controller for Cluster once that CRD is Established", func() {
fc := &fakeController{}
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
establishedCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud"))

out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
func(_ context.Context) (Controller, error) { return fc, nil },
)
Expect(err).NotTo(HaveOccurred())

var got Controller
Eventually(out, "5s", "20ms").Should(Receive(&got))
Expect(got).To(BeIdenticalTo(Controller(fc)))
})

It("delivers the controller for OpenTelemetryCollector once that CRD is Established", func() {
fc := &fakeController{}
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
establishedCRD("opentelemetrycollectors.opentelemetry.io", "opentelemetry.io"))

out, err := awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dyn,
func(_ context.Context) (Controller, error) { return fc, nil },
)
Expect(err).NotTo(HaveOccurred())

Eventually(out, "5s", "20ms").Should(Receive(BeIdenticalTo(Controller(fc))))
})

It("does not deliver while the CRD exists but is not yet Established", func() {
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
pendingCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud"))

built := false
out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
func(_ context.Context) (Controller, error) {
built = true

return &fakeController{}, nil
},
)
Expect(err).NotTo(HaveOccurred())

Consistently(out, "300ms", "20ms").ShouldNot(Receive())
Expect(built).To(BeFalse())
})

It("ignores CRDs in a different group even if their name matches", func() {
// Same plural+name shape but a different group — must not match.
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
establishedCRD("clusters.extensions.gardener.cloud", "wrong.group.example.com"))

out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
func(_ context.Context) (Controller, error) { return &fakeController{}, nil },
)
Expect(err).NotTo(HaveOccurred())

Consistently(out, "300ms", "20ms").ShouldNot(Receive())
})

It("closes the channel without a value when ctx is cancelled before the CRD shows up", func() {
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme())

out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
func(_ context.Context) (Controller, error) {
Fail("build must not run when ctx is cancelled before the CRD is established")

return nil, nil
},
)
Expect(err).NotTo(HaveOccurred())

cancel()
Eventually(out, "2s", "20ms").Should(BeClosed())
})

It("closes the channel without a value when `build` returns an error", func() {
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
establishedCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud"))

out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
func(_ context.Context) (Controller, error) {
return nil, errors.New("boom")
},
)
Expect(err).NotTo(HaveOccurred())

Eventually(out, "5s", "20ms").Should(BeClosed())
})

It("returns a synchronous error when the typed object is not registered in the scheme", func() {
// extensionsv1alpha1.Cluster against otelcolScheme — not registered there.
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme())

_, err := awaitController(ctx, l, otelcolScheme, &extensionsv1alpha1.Cluster{}, dyn,
func(_ context.Context) (Controller, error) { return &fakeController{}, nil },
)
Expect(err).To(HaveOccurred())
})
})
68 changes: 57 additions & 11 deletions pkg/controller/cluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,31 +60,77 @@ type clusterReconciler struct {
}

// newClusterController creates a new Controller for Cluster resources.
// It sets up a manager and reconciler for Cluster resources.
func newClusterController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) {
var err error
var seedClient api.Output

//
// The seed client is created synchronously, then awaitController is started:
// it watches the apiextensions API and, once the Cluster CRD is Established,
// builds the controller-runtime manager and reconciler and delivers the
// resulting Controller on the returned channel.
func newClusterController(
ctx context.Context,
conf *config.Config,
l logr.Logger,
m *metrics.FluentBitGardenerMetrics,
ms *otlp.MetricsSetup,
) (<-chan Controller, error) {
cfgShallowCopy := *conf
cfgShallowCopy.OTLPConfig.DQueConfig.DQueName = conf.OTLPConfig.DQueConfig.DQueName + "-controller"
opt := []client.Option{client.WithTarget(targets.Seed), client.WithLogger(l), client.WithMetrics(m), client.WithOTLPMetricsSetup(ms)}
cfgShallowCopy.OTLPConfig.DQueConfig.DQueName = fmt.Sprintf(
"%s-controller",
conf.OTLPConfig.DQueConfig.DQueName,
)

if seedClient, err = client.NewClient(ctx, cfgShallowCopy, opt...); err != nil {
opt := []client.Option{
client.WithTarget(targets.Seed),
client.WithLogger(l),
client.WithMetrics(m),
client.WithOTLPMetricsSetup(ms),
}

seedClient, err := client.NewClient(ctx, cfgShallowCopy, opt...)
if err != nil {
return nil, fmt.Errorf("failed to create seed client in controller: %w", err)
}
m.Clients.WithLabelValues(targets.Seed.String()).Inc()

restConfig, err := getRestConfig()
dynamicClient, err := newDynamicClient()
if err != nil {
seedClient.StopWait()

return nil, fmt.Errorf("failed to create dynamic client: %w", err)
}

out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dynamicClient,
func(ctx context.Context) (Controller, error) {
return buildClusterReconciler(ctx, conf, l, m, ms, seedClient)
},
)
if err != nil {
seedClient.StopWait()

return nil, fmt.Errorf("failed to await Cluster controller: %w", err)
}

return out, nil
}

// buildClusterReconciler constructs the controller-runtime manager and the
// clusterReconciler. It is invoked by awaitController once the Cluster CRD
// is observed on the cluster.
func buildClusterReconciler(
ctx context.Context,
conf *config.Config,
l logr.Logger,
m *metrics.FluentBitGardenerMetrics,
ms *otlp.MetricsSetup,
seedClient api.Output,
) (Controller, error) {
restConfig, err := getRestConfig()
if err != nil {
return nil, fmt.Errorf("failed to get REST config: %w", err)
}

ctlCtx, cancel := context.WithCancel(ctx)

ctrl.SetLogger(l)

mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
Logger: l,
Expand Down Expand Up @@ -121,7 +167,7 @@ func newClusterController(ctx context.Context, conf *config.Config, l logr.Logge
metricsSetup: ms,
}

if err = ctrl.NewControllerManagedBy(mgr).
if err := ctrl.NewControllerManagedBy(mgr).
For(&extensionsv1alpha1.Cluster{}).
Named(fmt.Sprintf("cluster-%s", uuid.NewUUID())).
Complete(reconciler); err != nil {
Expand Down
21 changes: 20 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"

"github.com/go-logr/logr"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -35,7 +36,13 @@ type Controller interface {
// It sets up a manager and reconciler based on the configuration:
// - If WatchOpenTelemetryCollector is true, it watches OpenTelemetryCollector resources
// - Otherwise (default), it watches Cluster resources
func NewController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) {
func NewController(
ctx context.Context,
conf *config.Config,
l logr.Logger,
m *metrics.FluentBitGardenerMetrics,
ms *otlp.MetricsSetup,
) (<-chan Controller, error) {
if conf.ControllerConfig.WatchOpenTelemetryCollector {
l.Info("using OpenTelemetryCollector mode for dynamic clients")

Expand All @@ -62,3 +69,15 @@ func getRestConfig() (*rest.Config, error) {

return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

// newDynamicClient builds a dynamic.Interface against the in-cluster (or
// KUBECONFIG) REST config. Extracted so reconcilers can construct one without
// repeating the getRestConfig + dynamic.NewForConfig boilerplate.
func newDynamicClient() (dynamic.Interface, error) {
restConfig, err := getRestConfig()
if err != nil {
return nil, fmt.Errorf("failed to get REST config: %w", err)
}

return dynamic.NewForConfig(restConfig)
}
37 changes: 35 additions & 2 deletions pkg/controller/otelcol_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ type otelCollectorReconciler struct {
}

// newOpenTelemetryCollectorController creates a new Controller for OpenTelemetryCollector resources.
// It sets up a manager and reconciler for OpenTelemetryCollector resources.
func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) {
//
// Selectors and the DynamicHostRegex are parsed synchronously, then
// awaitController watches the apiextensions API and, once the
// OpenTelemetryCollector CRD is Established, builds the controller-runtime
// manager and reconciler and delivers the resulting Controller on the
// returned channel.
func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) {
// Parse the label selector for OpenTelemetryCollector resources
labelSelector, err := parseLabelSelector(conf.ControllerConfig.OpenTelemetryCollectorLabelSelector)
if err != nil {
Expand All @@ -88,6 +93,34 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi
conf.ControllerConfig.DynamicHostRegex, err)
}

dynamicClient, err := newDynamicClient()
if err != nil {
return nil, fmt.Errorf("failed to create dynamic client: %w", err)
}

return awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dynamicClient,
func(ctx context.Context) (Controller, error) {
return buildOpenTelemetryCollectorReconciler(
ctx, conf, l, m, ms,
labelSelector, namespaceLabelSelector, dynamicHostRegex,
)
},
)
}

// buildOpenTelemetryCollectorReconciler constructs the controller-runtime
// manager and the otelCollectorReconciler. It is invoked by awaitController
// once the OpenTelemetryCollector CRD is observed on the cluster.
func buildOpenTelemetryCollectorReconciler(
ctx context.Context,
conf *config.Config,
l logr.Logger,
m *metrics.FluentBitGardenerMetrics,
ms *otlp.MetricsSetup,
labelSelector labels.Selector,
namespaceLabelSelector labels.Selector,
dynamicHostRegex *regexp.Regexp,
) (Controller, error) {
restConfig, err := getRestConfig()
if err != nil {
return nil, fmt.Errorf("failed to get REST config: %w", err)
Expand Down
Loading
Loading