From da9bbddef6dd4e463c3ca78729d1eb369884d133 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Mon, 15 Jun 2026 22:11:17 +0300 Subject: [PATCH 01/13] feat: make controller funcs return channels Signed-off-by: Ilia Petrov --- pkg/controller/cluster_reconciler.go | 8 +++- pkg/controller/controller.go | 2 +- pkg/controller/otelcol_reconciler.go | 8 +++- pkg/plugin/logging.go | 59 ++++++++++++++++++++++------ 4 files changed, 60 insertions(+), 17 deletions(-) diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go index 02a8aae9..6bd2f534 100644 --- a/pkg/controller/cluster_reconciler.go +++ b/pkg/controller/cluster_reconciler.go @@ -61,7 +61,9 @@ type clusterReconciler struct { // newClusterController creates a new Controller for Cluster resources. // It sets up a manager and reconciler for Cluster resources. -func newClusterController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) { +func newClusterController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { + ch := make(chan Controller, 1) + var err error var seedClient api.Output @@ -153,7 +155,9 @@ func newClusterController(ctx context.Context, conf *config.Config, l logr.Logge l.Info("controller started and cache synced") - return reconciler, nil + ch <- reconciler + + return ch, nil } // NewControllerWithClient creates a Controller with a pre-configured client. 
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 18ed9201..a4f293be 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -35,7 +35,7 @@ type Controller interface { // It sets up a manager and reconciler based on the configuration: // - If WatchOpenTelemetryCollector is true, it watches OpenTelemetryCollector resources // - Otherwise (default), it watches Cluster resources -func NewController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) { +func NewController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { if conf.ControllerConfig.WatchOpenTelemetryCollector { l.Info("using OpenTelemetryCollector mode for dynamic clients") diff --git a/pkg/controller/otelcol_reconciler.go b/pkg/controller/otelcol_reconciler.go index 925282b8..cdc3e725 100644 --- a/pkg/controller/otelcol_reconciler.go +++ b/pkg/controller/otelcol_reconciler.go @@ -66,7 +66,9 @@ type otelCollectorReconciler struct { // newOpenTelemetryCollectorController creates a new Controller for OpenTelemetryCollector resources. // It sets up a manager and reconciler for OpenTelemetryCollector resources. 
-func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) { +func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { + ch := make(chan Controller, 1) + // Parse the label selector for OpenTelemetryCollector resources labelSelector, err := parseLabelSelector(conf.ControllerConfig.OpenTelemetryCollectorLabelSelector) if err != nil { @@ -170,7 +172,9 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi l.Info("OpenTelemetryCollector controller started and cache synced") - return reconciler, nil + ch <- reconciler + + return ch, nil } // buildLabelPredicate creates a predicate that filters OpenTelemetryCollector resources diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go index 90308c24..bb9167ac 100644 --- a/pkg/plugin/logging.go +++ b/pkg/plugin/logging.go @@ -7,6 +7,7 @@ import ( "context" "errors" "regexp" + "sync" "github.com/go-logr/logr" @@ -31,6 +32,7 @@ type logging struct { cfg *config.Config dynamicHostRegexp *regexp.Regexp extractKubernetesMetadataRegexp *regexp.Regexp + ctrlMu sync.RWMutex controller controller.Controller logger logr.Logger ctx context.Context @@ -59,11 +61,27 @@ func NewPlugin(cfg *config.Config, logger logr.Logger, m *metrics.FluentBitGarde l.dynamicHostRegexp = regexp.MustCompile(cfg.ControllerConfig.DynamicHostRegex) // Pass the plugin's context to the controller - if l.controller, err = controller.NewController(ctx, cfg, logger, m, ms); err != nil { + ctlCh, err := controller.NewController(ctx, cfg, logger, m, ms) + if err != nil { cancel() return nil, err } + + // The controller is delivered asynchronously: the plugin starts without + // one and routes dynamic-host records to nil (dropped) until it arrives. + // getClient handles the nil case. 
+ go func() { + select { + case c, ok := <-ctlCh: + if !ok { + return + } + l.setController(c) + logger.Info("controller installed in plugin") + case <-ctx.Done(): + } + }() } if cfg.PluginConfig.KubernetesMetadata.FallbackToTagWhenMetadataIsMissing { @@ -96,13 +114,13 @@ func NewPluginWithController(cfg *config.Config, logger logr.Logger, m *metrics. ctx, cancel := context.WithCancel(context.Background()) l := &logging{ - cfg: cfg, - logger: logger, - ctx: ctx, - cancel: cancel, - controller: ctl, - metrics: m, + cfg: cfg, + logger: logger, + ctx: ctx, + cancel: cancel, + metrics: m, } + l.setController(ctl) if len(cfg.ControllerConfig.DynamicHostPath) > 0 { l.dynamicHostRegexp = regexp.MustCompile(cfg.ControllerConfig.DynamicHostRegex) @@ -207,8 +225,8 @@ func (l *logging) Close() { l.cancel() l.seedClient.StopWait() - if l.controller != nil { - l.controller.Stop() + if c := l.getController(); c != nil { + c.Stop() } l.logger.Info("logging plugin stopped", @@ -218,9 +236,13 @@ func (l *logging) Close() { } func (l *logging) getClient(dynamicHosName string) api.Output { - if l.isDynamicHost(dynamicHosName) && l.controller != nil { - if c, isStopped := l.controller.GetClient(dynamicHosName); !isStopped { - return c + if l.isDynamicHost(dynamicHosName) { + c := l.getController() + if c == nil { + return nil + } + if out, isStopped := c.GetClient(dynamicHosName); !isStopped { + return out } return nil @@ -229,6 +251,19 @@ func (l *logging) getClient(dynamicHosName string) api.Output { return l.seedClient } +func (l *logging) getController() controller.Controller { + l.ctrlMu.RLock() + defer l.ctrlMu.RUnlock() + + return l.controller +} + +func (l *logging) setController(c controller.Controller) { + l.ctrlMu.Lock() + defer l.ctrlMu.Unlock() + l.controller = c +} + func (l *logging) isDynamicHost(dynamicHostName string) bool { return dynamicHostName != "" && l.dynamicHostRegexp != nil && From 5085e5e88e1d30ccb38f948d0c02288a92dcb463 Mon Sep 17 00:00:00 2001 From: Ilia 
Petrov Date: Mon, 15 Jun 2026 22:30:57 +0300 Subject: [PATCH 02/13] feat: add wait for crd logic in both reconcilers Signed-off-by: Ilia Petrov --- go.mod | 2 +- pkg/controller/cluster_reconciler.go | 19 +++++ pkg/controller/otelcol_reconciler.go | 15 ++++ pkg/controller/utils.go | 100 +++++++++++++++++++++++++++ 4 files changed, 135 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f76e03ba..12ea3000 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( golang.org/x/time v0.15.0 google.golang.org/grpc v1.81.1 k8s.io/api v0.35.1 + k8s.io/apiextensions-apiserver v0.35.1 k8s.io/apimachinery v0.35.1 k8s.io/client-go v0.35.1 k8s.io/component-base v0.35.1 @@ -124,7 +125,6 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - k8s.io/apiextensions-apiserver v0.35.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20251125145642-4e65d59e963e // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go index 6bd2f534..ab8348e5 100644 --- a/pkg/controller/cluster_reconciler.go +++ b/pkg/controller/cluster_reconciler.go @@ -17,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/dynamic" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,6 +31,11 @@ import ( "github.com/gardener/logging/v1/pkg/targets" ) +const ( + clusterCRDGroup = "extensions.gardener.cloud" + clusterCRDName = "clusters.extensions.gardener.cloud" +) + var scheme = func() *runtime.Scheme { s := runtime.NewScheme() utilruntime.Must(extensionsv1alpha1.AddToScheme(s)) @@ -83,6 +89,19 @@ func newClusterController(ctx context.Context, conf *config.Config, l logr.Logge return nil, 
fmt.Errorf("failed to get REST config: %w", err) } + dynamicClient, err := dynamic.NewForConfig(restConfig) + if err != nil { + seedClient.StopWait() + + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + + if err := waitForCRD(ctx, l, dynamicClient, clusterCRDGroup, clusterCRDName); err != nil { + seedClient.StopWait() + + return nil, fmt.Errorf("failed waiting for CRD %s: %w", clusterCRDName, err) + } + ctlCtx, cancel := context.WithCancel(ctx) ctrl.SetLogger(l) diff --git a/pkg/controller/otelcol_reconciler.go b/pkg/controller/otelcol_reconciler.go index cdc3e725..2705c2c8 100644 --- a/pkg/controller/otelcol_reconciler.go +++ b/pkg/controller/otelcol_reconciler.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/dynamic" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -35,6 +36,11 @@ import ( "github.com/gardener/logging/v1/pkg/targets" ) +const ( + otelcolCRDGroup = "opentelemetry.io" + otelcolCRDName = "opentelemetrycollectors.opentelemetry.io" +) + var otelcolScheme = func() *runtime.Scheme { s := runtime.NewScheme() utilruntime.Must(otelcolv1beta1.AddToScheme(s)) @@ -95,6 +101,15 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi return nil, fmt.Errorf("failed to get REST config: %w", err) } + dynamicClient, err := dynamic.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + + if err := waitForCRD(ctx, l, dynamicClient, otelcolCRDGroup, otelcolCRDName); err != nil { + return nil, fmt.Errorf("failed waiting for CRD %s: %w", otelcolCRDName, err) + } + ctlCtx, cancel := context.WithCancel(ctx) ctrl.SetLogger(l) diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index e0380971..f326de9f 100644 --- a/pkg/controller/utils.go +++ 
b/pkg/controller/utils.go @@ -4,12 +4,23 @@ package controller import ( + "context" "encoding/json" + "errors" "fmt" + "sync" gardencorev1beta1 "github.com/gardener/gardener/pkg/apis/core/v1beta1" v1beta1constants "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants" extensionsv1alpha1 "github.com/gardener/gardener/pkg/apis/extensions/v1alpha1" + "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + clientgocache "k8s.io/client-go/tools/cache" ) func shootFromCluster(cluster *extensionsv1alpha1.Cluster) (*gardencorev1beta1.Shoot, error) { @@ -82,3 +93,92 @@ func getShootState(shoot *gardencorev1beta1.Shoot) clusterState { return clusterStateReady } + +// crdGVR is the GroupVersionResource of the CustomResourceDefinition kind itself. +var crdGVR = schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", +} + +// isCRDEstablished returns true if the given CRD object is in group `group`, +// is Established, and has its names accepted. 
+func isCRDEstablished(u *unstructured.Unstructured, group string) bool { + crd := &apiextensionsv1.CustomResourceDefinition{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, crd); err != nil { + return false + } + + if crd.Spec.Group != group { + return false + } + + established := false + namesAccepted := false + for _, c := range crd.Status.Conditions { + switch c.Type { + case apiextensionsv1.Established: + established = c.Status == apiextensionsv1.ConditionTrue + case apiextensionsv1.NamesAccepted: + namesAccepted = c.Status == apiextensionsv1.ConditionTrue + } + } + + return established && namesAccepted +} + +// waitForCRD blocks until the CRD identified by (group, name) is observed as +// Established with names accepted, or until ctx is cancelled. +// +// It runs a dynamic informer on CustomResourceDefinitions and stops it as soon +// as the CRD shows up. +func waitForCRD(ctx context.Context, l logr.Logger, dynamicClient dynamic.Interface, group, name string) error { + factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) + informer := factory.ForResource(crdGVR).Informer() + + // Cancellable context drives the informer's lifecycle; we cancel it once the CRD shows up. 
+ informerCtx, cancelInformer := context.WithCancel(ctx) + defer cancelInformer() + + found := make(chan struct{}) + var once sync.Once + + check := func(obj any) { + u, ok := obj.(*unstructured.Unstructured) + if !ok { + return + } + if u.GetName() != name { + return + } + if !isCRDEstablished(u, group) { + return + } + once.Do(func() { + l.Info("target CRD is established, stopping informer", "crd", name) + close(found) + }) + } + + if _, err := informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { check(obj) }, + UpdateFunc: func(_, newObj any) { check(newObj) }, + }); err != nil { + return fmt.Errorf("failed to add event handler: %w", err) + } + + factory.Start(informerCtx.Done()) + if !clientgocache.WaitForCacheSync(informerCtx.Done(), informer.HasSynced) { + return errors.New("failed to sync CRD informer cache") + } + + l.Info("waiting for CRD to become available", "crd", name) + + select { + case <-found: + // cancelInformer via defer shuts the informer down. + return nil + case <-ctx.Done(): + return ctx.Err() + } +} From f5342e321bf9756a8460b5de80ef37a87273eb80 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Mon, 15 Jun 2026 22:39:40 +0300 Subject: [PATCH 03/13] fix: if the controller is not available, fall back to the seed client Signed-off-by: Ilia Petrov --- pkg/plugin/logging.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go index bb9167ac..cf257f04 100644 --- a/pkg/plugin/logging.go +++ b/pkg/plugin/logging.go @@ -68,9 +68,9 @@ func NewPlugin(cfg *config.Config, logger logr.Logger, m *metrics.FluentBitGarde return nil, err } - // The controller is delivered asynchronously: the plugin starts without - // one and routes dynamic-host records to nil (dropped) until it arrives. - // getClient handles the nil case. + // The controller is delivered asynchronously (it waits for its CRD to be + // established). 
Until it arrives, getClient routes dynamic-host records + // to the seed client as a fallback so they are not dropped. go func() { select { case c, ok := <-ctlCh: @@ -239,7 +239,9 @@ func (l *logging) getClient(dynamicHosName string) api.Output { if l.isDynamicHost(dynamicHosName) { c := l.getController() if c == nil { - return nil + // Controller not yet available (e.g. CRD not installed). + // Fall back to the seed client so records aren't dropped. + return l.seedClient } if out, isStopped := c.GetClient(dynamicHosName); !isStopped { return out From 2ad03112d8f9353c9c03a6a2d2618121ed6496ad Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Wed, 17 Jun 2026 09:34:42 +0300 Subject: [PATCH 04/13] chore: move setting of the controller to the logging constructor in NewPluginWithController Signed-off-by: Ilia Petrov --- pkg/plugin/logging.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go index cf257f04..6e5bac75 100644 --- a/pkg/plugin/logging.go +++ b/pkg/plugin/logging.go @@ -114,13 +114,13 @@ func NewPluginWithController(cfg *config.Config, logger logr.Logger, m *metrics. 
ctx, cancel := context.WithCancel(context.Background()) l := &logging{ - cfg: cfg, - logger: logger, - ctx: ctx, - cancel: cancel, - metrics: m, + cfg: cfg, + logger: logger, + ctx: ctx, + cancel: cancel, + controller: ctl, + metrics: m, } - l.setController(ctl) if len(cfg.ControllerConfig.DynamicHostPath) > 0 { l.dynamicHostRegexp = regexp.MustCompile(cfg.ControllerConfig.DynamicHostRegex) From 74abf1f8a47efe166d2541c90a585ffc026abb3c Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Wed, 17 Jun 2026 18:24:03 +0300 Subject: [PATCH 05/13] feat: make the waiting for CRD non-blocking Signed-off-by: Ilia Petrov --- pkg/controller/cluster_reconciler.go | 97 ++++++++++++++++------------ pkg/controller/controller.go | 10 ++- pkg/controller/otelcol_reconciler.go | 51 +++++++++------ pkg/controller/utils.go | 93 +++++++++++++++++--------- 4 files changed, 153 insertions(+), 98 deletions(-) diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go index ab8348e5..40f6c150 100644 --- a/pkg/controller/cluster_reconciler.go +++ b/pkg/controller/cluster_reconciler.go @@ -17,7 +17,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/dynamic" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -31,11 +30,6 @@ import ( "github.com/gardener/logging/v1/pkg/targets" ) -const ( - clusterCRDGroup = "extensions.gardener.cloud" - clusterCRDName = "clusters.extensions.gardener.cloud" -) - var scheme = func() *runtime.Scheme { s := runtime.NewScheme() utilruntime.Must(extensionsv1alpha1.AddToScheme(s)) @@ -66,65 +60,88 @@ type clusterReconciler struct { } // newClusterController creates a new Controller for Cluster resources. -// It sets up a manager and reconciler for Cluster resources. 
-func newClusterController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { - ch := make(chan Controller, 1) - - var err error - var seedClient api.Output - +// +// The seed client is created synchronously, then awaitController is started: +// it watches the apiextensions API and, once the Cluster CRD is Established, +// builds the controller-runtime manager and reconciler and delivers the +// resulting Controller on the returned channel. +func newClusterController( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, +) (<-chan Controller, error) { cfgShallowCopy := *conf - cfgShallowCopy.OTLPConfig.DQueConfig.DQueName = conf.OTLPConfig.DQueConfig.DQueName + "-controller" - opt := []client.Option{client.WithTarget(targets.Seed), client.WithLogger(l), client.WithMetrics(m), client.WithOTLPMetricsSetup(ms)} + cfgShallowCopy.OTLPConfig.DQueConfig.DQueName = fmt.Sprintf( + "%s-controller", + conf.OTLPConfig.DQueConfig.DQueName, + ) - if seedClient, err = client.NewClient(ctx, cfgShallowCopy, opt...); err != nil { - return nil, fmt.Errorf("failed to create seed client in controller: %w", err) + opt := []client.Option{ + client.WithTarget(targets.Seed), + client.WithLogger(l), + client.WithMetrics(m), + client.WithOTLPMetricsSetup(ms), } - m.Clients.WithLabelValues(targets.Seed.String()).Inc() - restConfig, err := getRestConfig() + seedClient, err := client.NewClient(ctx, cfgShallowCopy, opt...) 
if err != nil { - seedClient.StopWait() - - return nil, fmt.Errorf("failed to get REST config: %w", err) + return nil, fmt.Errorf("failed to create seed client in controller: %w", err) } + m.Clients.WithLabelValues(targets.Seed.String()).Inc() - dynamicClient, err := dynamic.NewForConfig(restConfig) + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, + func(ctx context.Context) (Controller, error) { + return buildClusterReconciler(ctx, conf, l, m, ms, seedClient) + }, + ) if err != nil { seedClient.StopWait() - - return nil, fmt.Errorf("failed to create dynamic client: %w", err) + return nil, fmt.Errorf("failed to await Cluster controller: %w", err) } + return out, nil +} - if err := waitForCRD(ctx, l, dynamicClient, clusterCRDGroup, clusterCRDName); err != nil { - seedClient.StopWait() - - return nil, fmt.Errorf("failed waiting for CRD %s: %w", clusterCRDName, err) +// buildClusterReconciler constructs the controller-runtime manager and the +// clusterReconciler. It is invoked by awaitController once the Cluster CRD +// is observed on the cluster. +func buildClusterReconciler( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, + seedClient api.Output, +) (Controller, error) { + restConfig, err := getRestConfig() + if err != nil { + return nil, fmt.Errorf("failed to get REST config: %w", err) } ctlCtx, cancel := context.WithCancel(ctx) ctrl.SetLogger(l) - mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ Scheme: scheme, Logger: l, Cache: cache.Options{ - // Restrict cache to Cluster objects only; this controller does not reconcile other types. + // Restrict cache to Cluster objects only. + // This controller doesn't reconcile other types. ByObject: map[k8sclient.Object]cache.ByObject{ &extensionsv1alpha1.Cluster{}: {}, }, - // Strip managed fields from all cached objects as they are not used by the reconciler. 
+ // Strip managed fields from all cached objects + // as they are not used by the reconciler. DefaultTransform: cache.TransformStripManagedFields(), }, - // Disable metrics and health probe servers since fluent-bit plugin handles these + // Disable metrics and health probe servers + // since fluent-bit plugin handles these. Metrics: ctrl.Options{}.Metrics, HealthProbeBindAddress: "", }) if err != nil { cancel() - seedClient.StopWait() - return nil, fmt.Errorf("failed to create manager: %w", err) } @@ -142,13 +159,11 @@ func newClusterController(ctx context.Context, conf *config.Config, l logr.Logge metricsSetup: ms, } - if err = ctrl.NewControllerManagedBy(mgr). + if err := ctrl.NewControllerManagedBy(mgr). For(&extensionsv1alpha1.Cluster{}). Named(fmt.Sprintf("cluster-%s", uuid.NewUUID())). Complete(reconciler); err != nil { cancel() - seedClient.StopWait() - return nil, fmt.Errorf("failed to create controller: %w", err) } @@ -167,16 +182,12 @@ func newClusterController(ctx context.Context, conf *config.Config, l logr.Logge if !mgr.GetCache().WaitForCacheSync(syncCtx) { cancel() - seedClient.StopWait() - return nil, errors.New("failed to wait for cache sync within timeout") } l.Info("controller started and cache synced") - ch <- reconciler - - return ch, nil + return reconciler, nil } // NewControllerWithClient creates a Controller with a pre-configured client. 
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a4f293be..248af04b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -35,15 +35,19 @@ type Controller interface { // It sets up a manager and reconciler based on the configuration: // - If WatchOpenTelemetryCollector is true, it watches OpenTelemetryCollector resources // - Otherwise (default), it watches Cluster resources -func NewController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { +func NewController( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, +) (<-chan Controller, error) { if conf.ControllerConfig.WatchOpenTelemetryCollector { l.Info("using OpenTelemetryCollector mode for dynamic clients") - return newOpenTelemetryCollectorController(ctx, conf, l, m, ms) } l.Info("using Cluster mode for dynamic clients") - return newClusterController(ctx, conf, l, m, ms) } diff --git a/pkg/controller/otelcol_reconciler.go b/pkg/controller/otelcol_reconciler.go index 2705c2c8..de7ad33a 100644 --- a/pkg/controller/otelcol_reconciler.go +++ b/pkg/controller/otelcol_reconciler.go @@ -21,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/client-go/dynamic" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -36,11 +35,6 @@ import ( "github.com/gardener/logging/v1/pkg/targets" ) -const ( - otelcolCRDGroup = "opentelemetry.io" - otelcolCRDName = "opentelemetrycollectors.opentelemetry.io" -) - var otelcolScheme = func() *runtime.Scheme { s := runtime.NewScheme() utilruntime.Must(otelcolv1beta1.AddToScheme(s)) @@ -71,10 +65,13 @@ type otelCollectorReconciler struct { } // newOpenTelemetryCollectorController 
creates a new Controller for OpenTelemetryCollector resources. -// It sets up a manager and reconciler for OpenTelemetryCollector resources. +// +// Selectors and the DynamicHostRegex are parsed synchronously, then +// awaitController watches the apiextensions API and, once the +// OpenTelemetryCollector CRD is Established, builds the controller-runtime +// manager and reconciler and delivers the resulting Controller on the +// returned channel. func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { - ch := make(chan Controller, 1) - // Parse the label selector for OpenTelemetryCollector resources labelSelector, err := parseLabelSelector(conf.ControllerConfig.OpenTelemetryCollectorLabelSelector) if err != nil { @@ -96,20 +93,34 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi conf.ControllerConfig.DynamicHostRegex, err) } + return awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, + func(ctx context.Context) (Controller, error) { + return buildOpenTelemetryCollectorReconciler( + ctx, conf, l, m, ms, + labelSelector, namespaceLabelSelector, dynamicHostRegex, + ) + }, + ) +} + +// buildOpenTelemetryCollectorReconciler constructs the controller-runtime +// manager and the otelCollectorReconciler. It is invoked by awaitController +// once the OpenTelemetryCollector CRD is observed on the cluster. 
+func buildOpenTelemetryCollectorReconciler( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, + labelSelector labels.Selector, + namespaceLabelSelector labels.Selector, + dynamicHostRegex *regexp.Regexp, +) (Controller, error) { restConfig, err := getRestConfig() if err != nil { return nil, fmt.Errorf("failed to get REST config: %w", err) } - dynamicClient, err := dynamic.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to create dynamic client: %w", err) - } - - if err := waitForCRD(ctx, l, dynamicClient, otelcolCRDGroup, otelcolCRDName); err != nil { - return nil, fmt.Errorf("failed waiting for CRD %s: %w", otelcolCRDName, err) - } - ctlCtx, cancel := context.WithCancel(ctx) ctrl.SetLogger(l) @@ -187,9 +198,7 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi l.Info("OpenTelemetryCollector controller started and cache synced") - ch <- reconciler - - return ch, nil + return reconciler, nil } // buildLabelPredicate creates a predicate that filters OpenTelemetryCollector resources diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index f326de9f..602d7a6a 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "sync" gardencorev1beta1 "github.com/gardener/gardener/pkg/apis/core/v1beta1" @@ -21,6 +22,8 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" clientgocache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) func shootFromCluster(cluster *extensionsv1alpha1.Cluster) (*gardencorev1beta1.Shoot, error) { @@ -127,58 +130,86 @@ func isCRDEstablished(u *unstructured.Unstructured, group string) bool { return established && namesAccepted } -// waitForCRD blocks until the CRD identified by (group, name) is observed as -// 
Established with names accepted, or until ctx is cancelled. +// awaitController watches for the target CRD. Once that CRD is observed as +// Established with its names accepted, it invokes `build` and delivers the +// resulting Controller on the returned channel. // -// It runs a dynamic informer on CustomResourceDefinitions and stops it as soon -// as the CRD shows up. -func waitForCRD(ctx context.Context, l logr.Logger, dynamicClient dynamic.Interface, group, name string) error { +// The returned channel always closes — either after delivering the Controller, +// or without a value if ctx is cancelled before the CRD shows up, or if `build` fails. +func awaitController( + ctx context.Context, + l logr.Logger, + scheme *runtime.Scheme, + obj client.Object, + build func(ctx context.Context) (Controller, error), +) (<-chan Controller, error) { + gvk, err := apiutil.GVKForObject(obj, scheme) + if err != nil { + return nil, fmt.Errorf("failed to derive GVK for %T: %w", obj, err) + } + // CRD plural follows the kubebuilder convention of lowercasing the Kind and adding "s". + crdName := strings.ToLower(gvk.Kind) + "s." + gvk.Group + + restConfig, err := getRestConfig() + if err != nil { + return nil, fmt.Errorf("failed to get REST config: %w", err) + } + + dynamicClient, err := dynamic.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) informer := factory.ForResource(crdGVR).Informer() - // Cancellable context drives the informer's lifecycle; we cancel it once the CRD shows up. 
- informerCtx, cancelInformer := context.WithCancel(ctx) - defer cancelInformer() - - found := make(chan struct{}) var once sync.Once - - check := func(obj any) { - u, ok := obj.(*unstructured.Unstructured) + found := make(chan struct{}) + check := func(o any) { + u, ok := o.(*unstructured.Unstructured) if !ok { return } - if u.GetName() != name { + if u.GetName() != crdName { return } - if !isCRDEstablished(u, group) { + if !isCRDEstablished(u, gvk.Group) { return } once.Do(func() { - l.Info("target CRD is established, stopping informer", "crd", name) + l.Info("target CRD is established", "crd", crdName) close(found) }) } - if _, err := informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { check(obj) }, - UpdateFunc: func(_, newObj any) { check(newObj) }, + AddFunc: func(o any) { check(o) }, + UpdateFunc: func(_, o any) { check(o) }, }); err != nil { - return fmt.Errorf("failed to add event handler: %w", err) + return nil, fmt.Errorf("failed to add event handler: %w", err) } - factory.Start(informerCtx.Done()) - if !clientgocache.WaitForCacheSync(informerCtx.Done(), informer.HasSynced) { - return errors.New("failed to sync CRD informer cache") + factory.Start(ctx.Done()) + if !clientgocache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, errors.New("failed to sync CRD informer cache") } + l.Info("waiting for CRD to become available", "crd", crdName) - l.Info("waiting for CRD to become available", "crd", name) + out := make(chan Controller, 1) + go func() { + defer close(out) - select { - case <-found: - // cancelInformer via defer shuts the informer down. 
- return nil - case <-ctx.Done(): - return ctx.Err() - } + select { + case <-found: + case <-ctx.Done(): + return + } + + c, err := build(ctx) + if err != nil { + l.Error(err, "failed to build controller after CRD became available", "crd", crdName) + return + } + out <- c + }() + return out, nil } From f720a525b2ca747eef89aa3219149203697c0586 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Wed, 17 Jun 2026 18:29:41 +0300 Subject: [PATCH 06/13] chore: get back the old way of closing seed client Signed-off-by: Ilia Petrov --- pkg/controller/cluster_reconciler.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go index 40f6c150..902f7d8e 100644 --- a/pkg/controller/cluster_reconciler.go +++ b/pkg/controller/cluster_reconciler.go @@ -126,22 +126,21 @@ func buildClusterReconciler( Scheme: scheme, Logger: l, Cache: cache.Options{ - // Restrict cache to Cluster objects only. - // This controller doesn't reconcile other types. + // Restrict cache to Cluster objects only; this controller does not reconcile other types. ByObject: map[k8sclient.Object]cache.ByObject{ &extensionsv1alpha1.Cluster{}: {}, }, - // Strip managed fields from all cached objects - // as they are not used by the reconciler. + // Strip managed fields from all cached objects as they are not used by the reconciler. DefaultTransform: cache.TransformStripManagedFields(), }, - // Disable metrics and health probe servers - // since fluent-bit plugin handles these. + // Disable metrics and health probe servers since fluent-bit plugin handles these Metrics: ctrl.Options{}.Metrics, HealthProbeBindAddress: "", }) if err != nil { cancel() + seedClient.StopWait() + return nil, fmt.Errorf("failed to create manager: %w", err) } @@ -164,6 +163,8 @@ func buildClusterReconciler( Named(fmt.Sprintf("cluster-%s", uuid.NewUUID())). 
Complete(reconciler); err != nil { cancel() + seedClient.StopWait() + return nil, fmt.Errorf("failed to create controller: %w", err) } @@ -182,6 +183,8 @@ func buildClusterReconciler( if !mgr.GetCache().WaitForCacheSync(syncCtx) { cancel() + seedClient.StopWait() + return nil, errors.New("failed to wait for cache sync within timeout") } From 660ba4515be2918fc057d38886834eedf7ef5e32 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Wed, 17 Jun 2026 20:32:09 +0300 Subject: [PATCH 07/13] feat: add base tests for the awaitController Signed-off-by: Ilia Petrov --- pkg/controller/awaitcontroller_test.go | 186 +++++++++++++++++++++++++ pkg/controller/cluster_reconciler.go | 8 +- pkg/controller/controller.go | 12 ++ pkg/controller/otelcol_reconciler.go | 7 +- pkg/controller/utils.go | 17 +-- 5 files changed, 215 insertions(+), 15 deletions(-) create mode 100644 pkg/controller/awaitcontroller_test.go diff --git a/pkg/controller/awaitcontroller_test.go b/pkg/controller/awaitcontroller_test.go new file mode 100644 index 00000000..0ab0e8fd --- /dev/null +++ b/pkg/controller/awaitcontroller_test.go @@ -0,0 +1,186 @@ +// Copyright 2025 SPDX-FileCopyrightText: SAP SE or an SAP affiliate company and Gardener contributors +// SPDX-License-Identifier: Apache-2.0 + +package controller + +import ( + "context" + "errors" + + extensionsv1alpha1 "github.com/gardener/gardener/pkg/apis/extensions/v1alpha1" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + otelcolv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + dynamicfake "k8s.io/client-go/dynamic/fake" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/gardener/logging/v1/pkg/client/api" +) + +// fakeController is the minimal Controller implementation `build` callbacks +// can return; it tracks how many times Stop has been invoked so we can assert +// that successful builds make it through to the channel. +type fakeController struct { + stopped int +} + +func (f *fakeController) GetClient(_ string) (api.Output, bool) { return nil, false } +func (f *fakeController) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) { + return ctrl.Result{}, nil +} +func (f *fakeController) Stop() { f.stopped++ } + +// fakeDynamicScheme builds a runtime.Scheme that knows about +// CustomResourceDefinition. NewSimpleDynamicClient needs the scheme to derive +// the list-kind for the resources it serves. +func fakeDynamicScheme() *runtime.Scheme { + s := runtime.NewScheme() + utilruntime.Must(apiextensionsv1.AddToScheme(s)) + return s +} + +// crdUnstructured returns a CustomResourceDefinition as *unstructured.Unstructured +// so the fake dynamic client can serve it. The CRD is built typed first to keep +// the test readable, then converted. 
+func crdUnstructured(name, group string, established bool) *unstructured.Unstructured { + conds := []apiextensionsv1.CustomResourceDefinitionCondition{} + status := apiextensionsv1.ConditionFalse + if established { + status = apiextensionsv1.ConditionTrue + } + conds = append(conds, + apiextensionsv1.CustomResourceDefinitionCondition{Type: apiextensionsv1.Established, Status: status}, + apiextensionsv1.CustomResourceDefinitionCondition{Type: apiextensionsv1.NamesAccepted, Status: status}, + ) + + crd := &apiextensionsv1.CustomResourceDefinition{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apiextensions.k8s.io/v1", + Kind: "CustomResourceDefinition", + }, + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{Group: group}, + Status: apiextensionsv1.CustomResourceDefinitionStatus{Conditions: conds}, + } + out, err := runtime.DefaultUnstructuredConverter.ToUnstructured(crd) + Expect(err).NotTo(HaveOccurred()) + return &unstructured.Unstructured{Object: out} +} + +var _ = Describe("awaitController", func() { + var ( + ctx context.Context + cancel context.CancelFunc + l logr.Logger + ) + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + l = logr.Discard() + }) + + AfterEach(func() { cancel() }) + + It("delivers the controller for Cluster once that CRD is Established", func() { + fc := &fakeController{} + dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), + crdUnstructured("clusters.extensions.gardener.cloud", "extensions.gardener.cloud", true)) + + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, + func(_ context.Context) (Controller, error) { return fc, nil }, + ) + Expect(err).NotTo(HaveOccurred()) + + var got Controller + Eventually(out, "5s", "20ms").Should(Receive(&got)) + Expect(got).To(BeIdenticalTo(Controller(fc))) + }) + + It("delivers the controller for OpenTelemetryCollector once that CRD is Established", func() { + fc := &fakeController{} + 
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), + crdUnstructured("opentelemetrycollectors.opentelemetry.io", "opentelemetry.io", true)) + + out, err := awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dyn, + func(_ context.Context) (Controller, error) { return fc, nil }, + ) + Expect(err).NotTo(HaveOccurred()) + + Eventually(out, "5s", "20ms").Should(Receive(BeIdenticalTo(Controller(fc)))) + }) + + It("does not deliver while the CRD exists but is not yet Established", func() { + dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), + crdUnstructured("clusters.extensions.gardener.cloud", "extensions.gardener.cloud", false)) + + built := false + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, + func(_ context.Context) (Controller, error) { + built = true + return &fakeController{}, nil + }, + ) + Expect(err).NotTo(HaveOccurred()) + + Consistently(out, "300ms", "20ms").ShouldNot(Receive()) + Expect(built).To(BeFalse()) + }) + + It("ignores CRDs in a different group even if their name matches", func() { + // Same plural+name shape but a different group — must not match. 
+ dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), + crdUnstructured("clusters.extensions.gardener.cloud", "wrong.group.example.com", true)) + + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, + func(_ context.Context) (Controller, error) { return &fakeController{}, nil }, + ) + Expect(err).NotTo(HaveOccurred()) + + Consistently(out, "300ms", "20ms").ShouldNot(Receive()) + }) + + It("closes the channel without a value when ctx is cancelled before the CRD shows up", func() { + dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme()) + + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, + func(_ context.Context) (Controller, error) { + Fail("build must not run when ctx is cancelled before the CRD is established") + return nil, nil + }, + ) + Expect(err).NotTo(HaveOccurred()) + + cancel() + Eventually(out, "2s", "20ms").Should(BeClosed()) + }) + + It("closes the channel without a value when `build` returns an error", func() { + dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), + crdUnstructured("clusters.extensions.gardener.cloud", "extensions.gardener.cloud", true)) + + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, + func(_ context.Context) (Controller, error) { + return nil, errors.New("boom") + }, + ) + Expect(err).NotTo(HaveOccurred()) + + Eventually(out, "5s", "20ms").Should(BeClosed()) + }) + + It("returns a synchronous error when the typed object is not registered in the scheme", func() { + // extensionsv1alpha1.Cluster against otelcolScheme — not registered there. 
+ dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme()) + + _, err := awaitController(ctx, l, otelcolScheme, &extensionsv1alpha1.Cluster{}, dyn, + func(_ context.Context) (Controller, error) { return &fakeController{}, nil }, + ) + Expect(err).To(HaveOccurred()) + }) +}) diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go index 902f7d8e..b6538d11 100644 --- a/pkg/controller/cluster_reconciler.go +++ b/pkg/controller/cluster_reconciler.go @@ -91,7 +91,13 @@ func newClusterController( } m.Clients.WithLabelValues(targets.Seed.String()).Inc() - out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, + dynamicClient, err := newDynamicClient() + if err != nil { + seedClient.StopWait() + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + + out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dynamicClient, func(ctx context.Context) (Controller, error) { return buildClusterReconciler(ctx, conf, l, m, ms, seedClient) }, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 248af04b..d44b34a8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -9,6 +9,7 @@ import ( "os" "github.com/go-logr/logr" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ctrl "sigs.k8s.io/controller-runtime" @@ -66,3 +67,14 @@ func getRestConfig() (*rest.Config, error) { return clientcmd.BuildConfigFromFlags("", kubeconfig) } + +// newDynamicClient builds a dynamic.Interface against the in-cluster (or +// KUBECONFIG) REST config. Extracted so reconcilers can construct one without +// repeating the getRestConfig + dynamic.NewForConfig boilerplate. 
+func newDynamicClient() (dynamic.Interface, error) { + restConfig, err := getRestConfig() + if err != nil { + return nil, fmt.Errorf("failed to get REST config: %w", err) + } + return dynamic.NewForConfig(restConfig) +} diff --git a/pkg/controller/otelcol_reconciler.go b/pkg/controller/otelcol_reconciler.go index de7ad33a..b47d12d1 100644 --- a/pkg/controller/otelcol_reconciler.go +++ b/pkg/controller/otelcol_reconciler.go @@ -93,7 +93,12 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi conf.ControllerConfig.DynamicHostRegex, err) } - return awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, + dynamicClient, err := newDynamicClient() + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + + return awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dynamicClient, func(ctx context.Context) (Controller, error) { return buildOpenTelemetryCollectorReconciler( ctx, conf, l, m, ms, diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index 602d7a6a..f222a6dc 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -130,9 +130,9 @@ func isCRDEstablished(u *unstructured.Unstructured, group string) bool { return established && namesAccepted } -// awaitController watches for the target CRD. Once that CRD is observed as -// Established with its names accepted, it invokes `build` and delivers the -// resulting Controller on the returned channel. +// awaitController watches for the target CRD on the supplied dynamic client. +// Once that CRD is observed as Established with its names accepted, it invokes +// `build` and delivers the resulting Controller on the returned channel. // // The returned channel always closes — either after delivering the Controller, // or without a value if ctx is cancelled before the CRD shows up, or if `build` fails. 
@@ -141,6 +141,7 @@ func awaitController( l logr.Logger, scheme *runtime.Scheme, obj client.Object, + dynamicClient dynamic.Interface, build func(ctx context.Context) (Controller, error), ) (<-chan Controller, error) { gvk, err := apiutil.GVKForObject(obj, scheme) @@ -150,16 +151,6 @@ func awaitController( // CRD plural follows the kubebuilder convention of lowercasing the Kind and adding "s". crdName := strings.ToLower(gvk.Kind) + "s." + gvk.Group - restConfig, err := getRestConfig() - if err != nil { - return nil, fmt.Errorf("failed to get REST config: %w", err) - } - - dynamicClient, err := dynamic.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("failed to create dynamic client: %w", err) - } - factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0) informer := factory.ForResource(crdGVR).Informer() From 0fdbc7cf055f600f444d3b5071b0bfd6fdabed5a Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Wed, 17 Jun 2026 20:56:23 +0300 Subject: [PATCH 08/13] feat: add test to verify the switch from static to dynamic config Signed-off-by: Ilia Petrov --- pkg/plugin/logging_test.go | 70 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/pkg/plugin/logging_test.go b/pkg/plugin/logging_test.go index 361830c2..67e0a761 100644 --- a/pkg/plugin/logging_test.go +++ b/pkg/plugin/logging_test.go @@ -30,6 +30,32 @@ import ( "github.com/gardener/logging/v1/pkg/types" ) +// stubOutput is a no-op api.Output used to assert routing in plugin tests +// without involving any real client machinery. +type stubOutput struct { + name string +} + +func (*stubOutput) Handle(_ types.OutputEntry) error { return nil } +func (*stubOutput) Stop() {} +func (*stubOutput) StopWait() {} +func (s *stubOutput) Endpoint() string { return s.name } + +// stubController is a controller.Controller used to verify that the plugin +// routes dynamic-host records through the controller once it is installed. 
+type stubController struct { + clients map[string]api.Output +} + +// GetClient mirrors the real reconcilers' contract. +func (s *stubController) GetClient(name string) (api.Output, bool) { + return s.clients[name], false +} +func (*stubController) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) { + return ctrl.Result{}, nil +} +func (*stubController) Stop() {} + var _ = Describe("OutputPlugin plugin", func() { var ( cfg *config.Config @@ -346,6 +372,50 @@ var _ = Describe("OutputPlugin plugin", func() { Expect(l.isDynamicHost("random-name")).To(BeFalse()) Expect(l.isDynamicHost("garden")).To(BeFalse()) }) + + // Mirrors the awaitController flow at the plugin layer: until the + // controller is delivered, dynamic-host records fall back to the seed + // client; once it arrives, the plugin routes through it. + It("falls back to the seed client for dynamic hosts until the controller is installed", func() { + cfg.ControllerConfig = config.ControllerConfig{ + CtlSyncTimeout: 5 * time.Second, + DynamicHostPrefix: "http://logging.", + DynamicHostSuffix: ".svc:4318/v1/logs", + DynamicHostRegex: `^shoot--.*`, + DynamicHostPath: map[string]any{ + "kubernetes": map[string]any{ + "namespace_name": "namespace", + }, + }, + } + + plugin, err := NewPlugin(cfg, logger, testMetrics, nil) + Expect(err).NotTo(HaveOccurred()) + defer plugin.Close() + + l, ok := plugin.(*logging) + Expect(ok).To(BeTrue()) + Expect(l.seedClient).NotTo(BeNil()) + Expect(l.getController()).To(BeNil(), "no controller is installed yet") + + // Phase 1: CRD not yet established, controller not installed. + // A dynamic-host record must be routed to the seed client rather than dropped. + const dynamicHost = "shoot--proj--cluster" + Expect(l.isDynamicHost(dynamicHost)).To(BeTrue()) + Expect(l.getClient(dynamicHost)).To(BeIdenticalTo(l.seedClient)) + + // Phase 2: simulate awaitController delivering the controller. 
+ shootClient := &stubOutput{name: "shoot-client"} + l.setController(&stubController{clients: map[string]api.Output{dynamicHost: shootClient}}) + + // Same dynamic host now resolves through the controller. + got := l.getClient(dynamicHost) + Expect(got).To(BeIdenticalTo(api.Output(shootClient))) + Expect(got).NotTo(BeIdenticalTo(l.seedClient)) + + // Non-dynamic hosts continue to use the seed client either way. + Expect(l.getClient("garden")).To(BeIdenticalTo(l.seedClient)) + }) }) Describe("Graceful Shutdown", func() { From 3455306a3e1ba143e34eab4be3e15a2b961b1138 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Thu, 18 Jun 2026 10:23:31 +0300 Subject: [PATCH 09/13] chore: add more logs Signed-off-by: Ilia Petrov --- pkg/plugin/logging.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go index 6e5bac75..e5ab693a 100644 --- a/pkg/plugin/logging.go +++ b/pkg/plugin/logging.go @@ -71,14 +71,17 @@ func NewPlugin(cfg *config.Config, logger logr.Logger, m *metrics.FluentBitGarde // The controller is delivered asynchronously (it waits for its CRD to be // established). Until it arrives, getClient routes dynamic-host records // to the seed client as a fallback so they are not dropped. + logger.Info("controller pending: dynamic-host records will fall back to the seed client until the CRD is established") + go func() { select { case c, ok := <-ctlCh: if !ok { + logger.Info("controller channel closed before delivery; staying in seed-client fallback mode") return } l.setController(c) - logger.Info("controller installed in plugin") + logger.Info("controller installed in plugin: dynamic-host records now route through the controller (fallback ended)") case <-ctx.Done(): } }() @@ -241,6 +244,9 @@ func (l *logging) getClient(dynamicHosName string) api.Output { if c == nil { // Controller not yet available (e.g. CRD not installed). // Fall back to the seed client so records aren't dropped. 
+ l.logger.V(1).Info("controller not installed yet, routing dynamic-host record to seed client", + "host", dynamicHosName, + ) return l.seedClient } if out, isStopped := c.GetClient(dynamicHosName); !isStopped { From d25ab7ae68234480d20e8e91feca62878e338787 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Thu, 18 Jun 2026 10:32:50 +0300 Subject: [PATCH 10/13] chore: wrong year in the header Signed-off-by: Ilia Petrov --- pkg/controller/awaitcontroller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/awaitcontroller_test.go b/pkg/controller/awaitcontroller_test.go index 0ab0e8fd..a5467894 100644 --- a/pkg/controller/awaitcontroller_test.go +++ b/pkg/controller/awaitcontroller_test.go @@ -1,4 +1,4 @@ -// Copyright 2025 SPDX-FileCopyrightText: SAP SE or an SAP affiliate company and Gardener contributors +// Copyright 2026 SPDX-FileCopyrightText: SAP SE or an SAP affiliate company and Gardener contributors // SPDX-License-Identifier: Apache-2.0 package controller From e8c8135c951fc59e0982ae27578a8fefdee864ee Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Thu, 18 Jun 2026 10:43:38 +0300 Subject: [PATCH 11/13] chore: missed change in logging test file Signed-off-by: Ilia Petrov --- pkg/plugin/logging.go | 2 +- pkg/plugin/logging_test.go | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go index e5ab693a..b412ba8e 100644 --- a/pkg/plugin/logging.go +++ b/pkg/plugin/logging.go @@ -244,7 +244,7 @@ func (l *logging) getClient(dynamicHosName string) api.Output { if c == nil { // Controller not yet available (e.g. CRD not installed). // Fall back to the seed client so records aren't dropped. 
- l.logger.V(1).Info("controller not installed yet, routing dynamic-host record to seed client", + l.logger.Info("controller not installed yet, routing dynamic-host record to seed client", "host", dynamicHosName, ) return l.seedClient diff --git a/pkg/plugin/logging_test.go b/pkg/plugin/logging_test.go index 67e0a761..04eb1e9b 100644 --- a/pkg/plugin/logging_test.go +++ b/pkg/plugin/logging_test.go @@ -376,20 +376,40 @@ var _ = Describe("OutputPlugin plugin", func() { // Mirrors the awaitController flow at the plugin layer: until the // controller is delivered, dynamic-host records fall back to the seed // client; once it arrives, the plugin routes through it. + // + // Uses NewPluginWithController(ctl=nil) to avoid spinning up the real + // controller-runtime path inside NewPlugin, which needs an in-cluster + // or KUBECONFIG-based REST config and so fails in CI. It("falls back to the seed client for dynamic hosts until the controller is installed", func() { - cfg.ControllerConfig = config.ControllerConfig{ - CtlSyncTimeout: 5 * time.Second, - DynamicHostPrefix: "http://logging.", - DynamicHostSuffix: ".svc:4318/v1/logs", - DynamicHostRegex: `^shoot--.*`, - DynamicHostPath: map[string]any{ - "kubernetes": map[string]any{ - "namespace_name": "namespace", + cfg := &config.Config{ + OTLPConfig: config.OTLPConfig{ + Endpoint: "http://test-seed-endpoint:4318/v1/logs", + DQueConfig: config.DQueConfig{ + DQueDir: GinkgoT().TempDir(), + DQueSegmentSize: 500, + DQueSync: false, + DQueName: fmt.Sprintf("dque-fallback-%d", time.Now().UnixNano()), + }, + }, + PluginConfig: config.PluginConfig{ + SeedType: types.NOOP.String(), + ShootType: types.NOOP.String(), + LogLevel: "info", + }, + ControllerConfig: config.ControllerConfig{ + CtlSyncTimeout: 5 * time.Second, + DynamicHostPrefix: "http://logging.", + DynamicHostSuffix: ".svc:4318/v1/logs", + DynamicHostRegex: `^shoot--.*`, + DynamicHostPath: map[string]any{ + "kubernetes": map[string]any{ + "namespace_name": "namespace", + 
}, }, }, } - plugin, err := NewPlugin(cfg, logger, testMetrics, nil) + plugin, err := NewPluginWithController(cfg, logger, testMetrics, nil, nil) Expect(err).NotTo(HaveOccurred()) defer plugin.Close() From cc3b1dd117a80f68a5d4ba4a176d9af183565643 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Thu, 18 Jun 2026 11:26:33 +0300 Subject: [PATCH 12/13] chore: rm not needed comment Signed-off-by: Ilia Petrov --- pkg/plugin/logging_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/plugin/logging_test.go b/pkg/plugin/logging_test.go index 04eb1e9b..ee69df54 100644 --- a/pkg/plugin/logging_test.go +++ b/pkg/plugin/logging_test.go @@ -376,10 +376,6 @@ var _ = Describe("OutputPlugin plugin", func() { // Mirrors the awaitController flow at the plugin layer: until the // controller is delivered, dynamic-host records fall back to the seed // client; once it arrives, the plugin routes through it. - // - // Uses NewPluginWithController(ctl=nil) to avoid spinning up the real - // controller-runtime path inside NewPlugin, which needs an in-cluster - // or KUBECONFIG-based REST config and so fails in CI. 
It("falls back to the seed client for dynamic hosts until the controller is installed", func() { cfg := &config.Config{ OTLPConfig: config.OTLPConfig{ From cc48ec3881cfae8411f7e9985ff5c5cc2019e627 Mon Sep 17 00:00:00 2001 From: Ilia Petrov Date: Thu, 18 Jun 2026 14:00:02 +0300 Subject: [PATCH 13/13] chore: lint Signed-off-by: Ilia Petrov --- pkg/controller/awaitcontroller_test.go | 45 +++++++++++++++----------- pkg/controller/cluster_reconciler.go | 3 ++ pkg/controller/controller.go | 3 ++ pkg/controller/utils.go | 5 +++ pkg/plugin/logging.go | 2 ++ 5 files changed, 39 insertions(+), 19 deletions(-) diff --git a/pkg/controller/awaitcontroller_test.go b/pkg/controller/awaitcontroller_test.go index a5467894..525c0aba 100644 --- a/pkg/controller/awaitcontroller_test.go +++ b/pkg/controller/awaitcontroller_test.go @@ -30,8 +30,8 @@ type fakeController struct { stopped int } -func (f *fakeController) GetClient(_ string) (api.Output, bool) { return nil, false } -func (f *fakeController) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) { +func (*fakeController) GetClient(_ string) (api.Output, bool) { return nil, false } +func (*fakeController) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, nil } func (f *fakeController) Stop() { f.stopped++ } @@ -42,22 +42,26 @@ func (f *fakeController) Stop() { f.stopped++ } func fakeDynamicScheme() *runtime.Scheme { s := runtime.NewScheme() utilruntime.Must(apiextensionsv1.AddToScheme(s)) + return s } -// crdUnstructured returns a CustomResourceDefinition as *unstructured.Unstructured -// so the fake dynamic client can serve it. The CRD is built typed first to keep -// the test readable, then converted. 
-func crdUnstructured(name, group string, established bool) *unstructured.Unstructured { - conds := []apiextensionsv1.CustomResourceDefinitionCondition{} - status := apiextensionsv1.ConditionFalse - if established { - status = apiextensionsv1.ConditionTrue +// establishedCRD returns a CRD whose Established/NamesAccepted conditions are +// True. pendingCRD returns one whose conditions are False. Splitting these +// keeps the call sites self-describing and avoids a control-flag parameter. +func establishedCRD(name, group string) *unstructured.Unstructured { + return crdUnstructuredWithStatus(name, group, apiextensionsv1.ConditionTrue) +} + +func pendingCRD(name, group string) *unstructured.Unstructured { + return crdUnstructuredWithStatus(name, group, apiextensionsv1.ConditionFalse) +} + +func crdUnstructuredWithStatus(name, group string, status apiextensionsv1.ConditionStatus) *unstructured.Unstructured { + conds := []apiextensionsv1.CustomResourceDefinitionCondition{ + {Type: apiextensionsv1.Established, Status: status}, + {Type: apiextensionsv1.NamesAccepted, Status: status}, } - conds = append(conds, - apiextensionsv1.CustomResourceDefinitionCondition{Type: apiextensionsv1.Established, Status: status}, - apiextensionsv1.CustomResourceDefinitionCondition{Type: apiextensionsv1.NamesAccepted, Status: status}, - ) crd := &apiextensionsv1.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ @@ -70,6 +74,7 @@ func crdUnstructured(name, group string, established bool) *unstructured.Unstruc } out, err := runtime.DefaultUnstructuredConverter.ToUnstructured(crd) Expect(err).NotTo(HaveOccurred()) + return &unstructured.Unstructured{Object: out} } @@ -90,7 +95,7 @@ var _ = Describe("awaitController", func() { It("delivers the controller for Cluster once that CRD is Established", func() { fc := &fakeController{} dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), - crdUnstructured("clusters.extensions.gardener.cloud", "extensions.gardener.cloud", true)) + 
establishedCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud")) out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, func(_ context.Context) (Controller, error) { return fc, nil }, @@ -105,7 +110,7 @@ var _ = Describe("awaitController", func() { It("delivers the controller for OpenTelemetryCollector once that CRD is Established", func() { fc := &fakeController{} dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), - crdUnstructured("opentelemetrycollectors.opentelemetry.io", "opentelemetry.io", true)) + establishedCRD("opentelemetrycollectors.opentelemetry.io", "opentelemetry.io")) out, err := awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dyn, func(_ context.Context) (Controller, error) { return fc, nil }, @@ -117,12 +122,13 @@ var _ = Describe("awaitController", func() { It("does not deliver while the CRD exists but is not yet Established", func() { dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), - crdUnstructured("clusters.extensions.gardener.cloud", "extensions.gardener.cloud", false)) + pendingCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud")) built := false out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, func(_ context.Context) (Controller, error) { built = true + return &fakeController{}, nil }, ) @@ -135,7 +141,7 @@ var _ = Describe("awaitController", func() { It("ignores CRDs in a different group even if their name matches", func() { // Same plural+name shape but a different group — must not match. 
dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), - crdUnstructured("clusters.extensions.gardener.cloud", "wrong.group.example.com", true)) + establishedCRD("clusters.extensions.gardener.cloud", "wrong.group.example.com")) out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, func(_ context.Context) (Controller, error) { return &fakeController{}, nil }, @@ -151,6 +157,7 @@ var _ = Describe("awaitController", func() { out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, func(_ context.Context) (Controller, error) { Fail("build must not run when ctx is cancelled before the CRD is established") + return nil, nil }, ) @@ -162,7 +169,7 @@ var _ = Describe("awaitController", func() { It("closes the channel without a value when `build` returns an error", func() { dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(), - crdUnstructured("clusters.extensions.gardener.cloud", "extensions.gardener.cloud", true)) + establishedCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud")) out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn, func(_ context.Context) (Controller, error) { diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go index b6538d11..e84f563d 100644 --- a/pkg/controller/cluster_reconciler.go +++ b/pkg/controller/cluster_reconciler.go @@ -94,6 +94,7 @@ func newClusterController( dynamicClient, err := newDynamicClient() if err != nil { seedClient.StopWait() + return nil, fmt.Errorf("failed to create dynamic client: %w", err) } @@ -104,8 +105,10 @@ func newClusterController( ) if err != nil { seedClient.StopWait() + return nil, fmt.Errorf("failed to await Cluster controller: %w", err) } + return out, nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d44b34a8..a70c248c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -45,10 +45,12 @@ func NewController( ) 
(<-chan Controller, error) { if conf.ControllerConfig.WatchOpenTelemetryCollector { l.Info("using OpenTelemetryCollector mode for dynamic clients") + return newOpenTelemetryCollectorController(ctx, conf, l, m, ms) } l.Info("using Cluster mode for dynamic clients") + return newClusterController(ctx, conf, l, m, ms) } @@ -76,5 +78,6 @@ func newDynamicClient() (dynamic.Interface, error) { if err != nil { return nil, fmt.Errorf("failed to get REST config: %w", err) } + return dynamic.NewForConfig(restConfig) } diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index f222a6dc..92324e02 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -124,6 +124,9 @@ func isCRDEstablished(u *unstructured.Unstructured, group string) bool { established = c.Status == apiextensionsv1.ConditionTrue case apiextensionsv1.NamesAccepted: namesAccepted = c.Status == apiextensionsv1.ConditionTrue + default: + // Other condition types (e.g. Terminating, KubernetesAPIApprovalPolicyConformant) + // don't gate readiness for our purposes. } } @@ -198,9 +201,11 @@ func awaitController( c, err := build(ctx) if err != nil { l.Error(err, "failed to build controller after CRD became available", "crd", crdName) + return } out <- c }() + return out, nil } diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go index b412ba8e..5bd4eb69 100644 --- a/pkg/plugin/logging.go +++ b/pkg/plugin/logging.go @@ -78,6 +78,7 @@ func NewPlugin(cfg *config.Config, logger logr.Logger, m *metrics.FluentBitGarde case c, ok := <-ctlCh: if !ok { logger.Info("controller channel closed before delivery; staying in seed-client fallback mode") + return } l.setController(c) @@ -247,6 +248,7 @@ func (l *logging) getClient(dynamicHosName string) api.Output { l.logger.Info("controller not installed yet, routing dynamic-host record to seed client", "host", dynamicHosName, ) + return l.seedClient } if out, isStopped := c.GetClient(dynamicHosName); !isStopped {