diff --git a/go.mod b/go.mod index f76e03ba..12ea3000 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( golang.org/x/time v0.15.0 google.golang.org/grpc v1.81.1 k8s.io/api v0.35.1 + k8s.io/apiextensions-apiserver v0.35.1 k8s.io/apimachinery v0.35.1 k8s.io/client-go v0.35.1 k8s.io/component-base v0.35.1 @@ -124,7 +125,6 @@ require ( gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - k8s.io/apiextensions-apiserver v0.35.1 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20251125145642-4e65d59e963e // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect diff --git a/pkg/controller/awaitcontroller_test.go b/pkg/controller/awaitcontroller_test.go new file mode 100644 index 00000000..525c0aba --- /dev/null +++ b/pkg/controller/awaitcontroller_test.go @@ -0,0 +1,193 @@ +// Copyright 2026 SPDX-FileCopyrightText: SAP SE or an SAP affiliate company and Gardener contributors +// SPDX-License-Identifier: Apache-2.0 + +package controller + +import ( + "context" + "errors" + + extensionsv1alpha1 "github.com/gardener/gardener/pkg/apis/extensions/v1alpha1" + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + otelcolv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + dynamicfake "k8s.io/client-go/dynamic/fake" + ctrl "sigs.k8s.io/controller-runtime" + + "github.com/gardener/logging/v1/pkg/client/api" +) + +// fakeController is the minimal Controller implementation `build` callbacks +// can return; it tracks how many times Stop has been invoked so we can assert +// that successful builds make it through to the channel. 
+type fakeController struct {
+	stopped int
+}
+
+// GetClient never resolves an output; the specs only care about delivery.
+func (*fakeController) GetClient(string) (api.Output, bool) {
+	return nil, false
+}
+
+// Reconcile is a no-op that reports success with an empty result.
+func (*fakeController) Reconcile(context.Context, ctrl.Request) (ctrl.Result, error) {
+	var res ctrl.Result
+
+	return res, nil
+}
+
+// Stop records one more invocation.
+func (f *fakeController) Stop() {
+	f.stopped++
+}
+
+// fakeDynamicScheme returns a fresh runtime.Scheme with the apiextensions/v1
+// types registered, so the fake dynamic client can serve
+// CustomResourceDefinition objects. Registration failures panic via
+// utilruntime.Must.
+func fakeDynamicScheme() *runtime.Scheme {
+	sch := runtime.NewScheme()
+	err := apiextensionsv1.AddToScheme(sch)
+	utilruntime.Must(err)
+
+	return sch
+}
+
+// establishedCRD builds a CRD fixture whose Established and NamesAccepted
+// conditions are both True.
+func establishedCRD(name, group string) *unstructured.Unstructured {
+	return crdUnstructuredWithStatus(name, group, apiextensionsv1.ConditionTrue)
+}
+
+// pendingCRD builds a CRD fixture whose Established and NamesAccepted
+// conditions are both False.
+func pendingCRD(name, group string) *unstructured.Unstructured {
+	return crdUnstructuredWithStatus(name, group, apiextensionsv1.ConditionFalse)
+}
+
+// crdUnstructuredWithStatus assembles a typed CRD with the given name and
+// group, sets both readiness conditions to status, and converts it to the
+// unstructured form the fake dynamic client stores. A conversion error fails
+// the running spec.
+func crdUnstructuredWithStatus(name, group string, status apiextensionsv1.ConditionStatus) *unstructured.Unstructured {
+	crd := apiextensionsv1.CustomResourceDefinition{}
+	crd.APIVersion = "apiextensions.k8s.io/v1"
+	crd.Kind = "CustomResourceDefinition"
+	crd.Name = name
+	crd.Spec.Group = group
+	crd.Status.Conditions = []apiextensionsv1.CustomResourceDefinitionCondition{
+		{Type: apiextensionsv1.Established, Status: status},
+		{Type: apiextensionsv1.NamesAccepted, Status: status},
+	}
+
+	obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&crd)
+	Expect(err).NotTo(HaveOccurred())
+
+	return &unstructured.Unstructured{Object: obj}
+}
+
+// Specs for awaitController: delivery once the CRD is Established, no delivery
+// while it is pending or belongs to another group, and channel closure on
+// cancellation or build failure.
+var _ = Describe("awaitController", func() {
+	var (
+		ctx    context.Context
+		cancel context.CancelFunc
+		l      logr.Logger
+	)
+
+	BeforeEach(func() {
+		ctx, cancel = context.WithCancel(context.Background())
+		l = logr.Discard()
+	})
+
+	AfterEach(func() { cancel() })
+
+	It("delivers the controller for Cluster once that CRD is Established", func() {
+		fc := &fakeController{}
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
+			establishedCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud"))
+
+		out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
+			func(_ context.Context) (Controller, error) { return fc, nil },
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		var got Controller
+		Eventually(out, "5s", "20ms").Should(Receive(&got))
+		Expect(got).To(BeIdenticalTo(Controller(fc)))
+	})
+
+	It("delivers the controller for OpenTelemetryCollector once that CRD is Established", func() {
+		fc := &fakeController{}
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
+			establishedCRD("opentelemetrycollectors.opentelemetry.io", "opentelemetry.io"))
+
+		out, err := awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dyn,
+			func(_ context.Context) (Controller, error) { return fc, nil },
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		Eventually(out, "5s", "20ms").Should(Receive(BeIdenticalTo(Controller(fc))))
+	})
+
+	It("does not deliver while the CRD exists but is not yet Established", func() {
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
+			pendingCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud"))
+
+		built := false
+		out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
+			func(_ context.Context) (Controller, error) {
+				built = true
+
+				return &fakeController{}, nil
+			},
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		Consistently(out, "300ms", "20ms").ShouldNot(Receive())
+		Expect(built).To(BeFalse())
+	})
+
+	It("ignores CRDs in a different group even if their name matches", func() {
+		// Same plural+name shape but a different group — must not match.
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
+			establishedCRD("clusters.extensions.gardener.cloud", "wrong.group.example.com"))
+
+		out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
+			func(_ context.Context) (Controller, error) { return &fakeController{}, nil },
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		Consistently(out, "300ms", "20ms").ShouldNot(Receive())
+	})
+
+	It("closes the channel without a value when ctx is cancelled before the CRD shows up", func() {
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme())
+
+		out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
+			func(_ context.Context) (Controller, error) {
+				// awaitController invokes build on its own goroutine, so a Fail
+				// here must be recovered by Ginkgo; without this the panic
+				// raised by Fail would crash the test binary instead of
+				// failing this spec.
+				defer GinkgoRecover()
+
+				Fail("build must not run when ctx is cancelled before the CRD is established")
+
+				return nil, nil
+			},
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		cancel()
+		Eventually(out, "2s", "20ms").Should(BeClosed())
+	})
+
+	It("closes the channel without a value when `build` returns an error", func() {
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme(),
+			establishedCRD("clusters.extensions.gardener.cloud", "extensions.gardener.cloud"))
+
+		out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dyn,
+			func(_ context.Context) (Controller, error) {
+				return nil, errors.New("boom")
+			},
+		)
+		Expect(err).NotTo(HaveOccurred())
+
+		Eventually(out, "5s", "20ms").Should(BeClosed())
+	})
+
+	It("returns a synchronous error when the typed object is not registered in the scheme", func() {
+		// extensionsv1alpha1.Cluster against otelcolScheme — not registered there.
+		dyn := dynamicfake.NewSimpleDynamicClient(fakeDynamicScheme())
+
+		_, err := awaitController(ctx, l, otelcolScheme, &extensionsv1alpha1.Cluster{}, dyn,
+			func(_ context.Context) (Controller, error) { return &fakeController{}, nil },
+		)
+		Expect(err).To(HaveOccurred())
+	})
+})
diff --git a/pkg/controller/cluster_reconciler.go b/pkg/controller/cluster_reconciler.go
index 02a8aae9..e84f563d 100644
--- a/pkg/controller/cluster_reconciler.go
+++ b/pkg/controller/cluster_reconciler.go
@@ -60,31 +60,77 @@ type clusterReconciler struct {
 }
 
 // newClusterController creates a new Controller for Cluster resources.
-// It sets up a manager and reconciler for Cluster resources.
-func newClusterController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) {
-	var err error
-	var seedClient api.Output
-
+//
+// The seed client is created synchronously, then awaitController is started:
+// it watches the apiextensions API and, once the Cluster CRD is Established,
+// builds the controller-runtime manager and reconciler and delivers the
+// resulting Controller on the returned channel.
+func newClusterController(
+	ctx context.Context,
+	conf *config.Config,
+	l logr.Logger,
+	m *metrics.FluentBitGardenerMetrics,
+	ms *otlp.MetricsSetup,
+) (<-chan Controller, error) {
+	// Work on a copy of the config so the "-controller" queue-name suffix is
+	// applied to the seed client created here only.
 	cfgShallowCopy := *conf
-	cfgShallowCopy.OTLPConfig.DQueConfig.DQueName = conf.OTLPConfig.DQueConfig.DQueName + "-controller"
-	opt := []client.Option{client.WithTarget(targets.Seed), client.WithLogger(l), client.WithMetrics(m), client.WithOTLPMetricsSetup(ms)}
+	cfgShallowCopy.OTLPConfig.DQueConfig.DQueName = fmt.Sprintf(
+		"%s-controller",
+		conf.OTLPConfig.DQueConfig.DQueName,
+	)
 
-	if seedClient, err = client.NewClient(ctx, cfgShallowCopy, opt...); err != nil {
+	opt := []client.Option{
+		client.WithTarget(targets.Seed),
+		client.WithLogger(l),
+		client.WithMetrics(m),
+		client.WithOTLPMetricsSetup(ms),
+	}
+
+	seedClient, err := client.NewClient(ctx, cfgShallowCopy, opt...)
+	if err != nil {
 		return nil, fmt.Errorf("failed to create seed client in controller: %w", err)
 	}
+	// Account for the new seed client in the per-target clients metric.
 	m.Clients.WithLabelValues(targets.Seed.String()).Inc()
 
-	restConfig, err := getRestConfig()
+	dynamicClient, err := newDynamicClient()
 	if err != nil {
+		// The seed client already exists at this point; stop it before bailing out.
 		seedClient.StopWait()
 
+		return nil, fmt.Errorf("failed to create dynamic client: %w", err)
+	}
+
+	// The build callback is deferred work: awaitController only invokes it after
+	// it has observed the Cluster CRD as established. Note that the callback
+	// receives the original conf, not cfgShallowCopy.
+	//
+	// NOTE(review): once awaitController returns successfully, nothing in this
+	// function stops seedClient if ctx is cancelled before the CRD appears or
+	// if the build callback fails — confirm who owns the seed client on those
+	// paths.
+	out, err := awaitController(ctx, l, scheme, &extensionsv1alpha1.Cluster{}, dynamicClient,
+		func(ctx context.Context) (Controller, error) {
+			return buildClusterReconciler(ctx, conf, l, m, ms, seedClient)
+		},
+	)
+	if err != nil {
+		seedClient.StopWait()
+
+		return nil, fmt.Errorf("failed to await Cluster controller: %w", err)
+	}
+
+	return out, nil
+}
+
+// buildClusterReconciler constructs the controller-runtime manager and the
+// clusterReconciler. It is invoked by awaitController once the Cluster CRD
+// is observed on the cluster.
+func buildClusterReconciler( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, + seedClient api.Output, +) (Controller, error) { + restConfig, err := getRestConfig() + if err != nil { return nil, fmt.Errorf("failed to get REST config: %w", err) } ctlCtx, cancel := context.WithCancel(ctx) ctrl.SetLogger(l) - mgr, err := ctrl.NewManager(restConfig, ctrl.Options{ Scheme: scheme, Logger: l, @@ -121,7 +167,7 @@ func newClusterController(ctx context.Context, conf *config.Config, l logr.Logge metricsSetup: ms, } - if err = ctrl.NewControllerManagedBy(mgr). + if err := ctrl.NewControllerManagedBy(mgr). For(&extensionsv1alpha1.Cluster{}). Named(fmt.Sprintf("cluster-%s", uuid.NewUUID())). Complete(reconciler); err != nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 18ed9201..a70c248c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -9,6 +9,7 @@ import ( "os" "github.com/go-logr/logr" + "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ctrl "sigs.k8s.io/controller-runtime" @@ -35,7 +36,13 @@ type Controller interface { // It sets up a manager and reconciler based on the configuration: // - If WatchOpenTelemetryCollector is true, it watches OpenTelemetryCollector resources // - Otherwise (default), it watches Cluster resources -func NewController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) { +func NewController( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, +) (<-chan Controller, error) { if conf.ControllerConfig.WatchOpenTelemetryCollector { l.Info("using OpenTelemetryCollector mode for dynamic clients") @@ -62,3 +69,15 @@ func getRestConfig() (*rest.Config, error) { return clientcmd.BuildConfigFromFlags("", kubeconfig) } + +// 
newDynamicClient builds a dynamic.Interface against the in-cluster (or +// KUBECONFIG) REST config. Extracted so reconcilers can construct one without +// repeating the getRestConfig + dynamic.NewForConfig boilerplate. +func newDynamicClient() (dynamic.Interface, error) { + restConfig, err := getRestConfig() + if err != nil { + return nil, fmt.Errorf("failed to get REST config: %w", err) + } + + return dynamic.NewForConfig(restConfig) +} diff --git a/pkg/controller/otelcol_reconciler.go b/pkg/controller/otelcol_reconciler.go index 925282b8..b47d12d1 100644 --- a/pkg/controller/otelcol_reconciler.go +++ b/pkg/controller/otelcol_reconciler.go @@ -65,8 +65,13 @@ type otelCollectorReconciler struct { } // newOpenTelemetryCollectorController creates a new Controller for OpenTelemetryCollector resources. -// It sets up a manager and reconciler for OpenTelemetryCollector resources. -func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (Controller, error) { +// +// Selectors and the DynamicHostRegex are parsed synchronously, then +// awaitController watches the apiextensions API and, once the +// OpenTelemetryCollector CRD is Established, builds the controller-runtime +// manager and reconciler and delivers the resulting Controller on the +// returned channel. 
+func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Config, l logr.Logger, m *metrics.FluentBitGardenerMetrics, ms *otlp.MetricsSetup) (<-chan Controller, error) { // Parse the label selector for OpenTelemetryCollector resources labelSelector, err := parseLabelSelector(conf.ControllerConfig.OpenTelemetryCollectorLabelSelector) if err != nil { @@ -88,6 +93,34 @@ func newOpenTelemetryCollectorController(ctx context.Context, conf *config.Confi conf.ControllerConfig.DynamicHostRegex, err) } + dynamicClient, err := newDynamicClient() + if err != nil { + return nil, fmt.Errorf("failed to create dynamic client: %w", err) + } + + return awaitController(ctx, l, otelcolScheme, &otelcolv1beta1.OpenTelemetryCollector{}, dynamicClient, + func(ctx context.Context) (Controller, error) { + return buildOpenTelemetryCollectorReconciler( + ctx, conf, l, m, ms, + labelSelector, namespaceLabelSelector, dynamicHostRegex, + ) + }, + ) +} + +// buildOpenTelemetryCollectorReconciler constructs the controller-runtime +// manager and the otelCollectorReconciler. It is invoked by awaitController +// once the OpenTelemetryCollector CRD is observed on the cluster. 
+func buildOpenTelemetryCollectorReconciler( + ctx context.Context, + conf *config.Config, + l logr.Logger, + m *metrics.FluentBitGardenerMetrics, + ms *otlp.MetricsSetup, + labelSelector labels.Selector, + namespaceLabelSelector labels.Selector, + dynamicHostRegex *regexp.Regexp, +) (Controller, error) { restConfig, err := getRestConfig() if err != nil { return nil, fmt.Errorf("failed to get REST config: %w", err) diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index e0380971..92324e02 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -4,12 +4,26 @@ package controller import ( + "context" "encoding/json" + "errors" "fmt" + "strings" + "sync" gardencorev1beta1 "github.com/gardener/gardener/pkg/apis/core/v1beta1" v1beta1constants "github.com/gardener/gardener/pkg/apis/core/v1beta1/constants" extensionsv1alpha1 "github.com/gardener/gardener/pkg/apis/extensions/v1alpha1" + "github.com/go-logr/logr" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + clientgocache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) func shootFromCluster(cluster *extensionsv1alpha1.Cluster) (*gardencorev1beta1.Shoot, error) { @@ -82,3 +96,116 @@ func getShootState(shoot *gardencorev1beta1.Shoot) clusterState { return clusterStateReady } + +// crdGVR is the GroupVersionResource of the CustomResourceDefinition kind itself. +var crdGVR = schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", +} + +// isCRDEstablished returns true if the given CRD object is in group `group`, +// is Established, and has its names accepted. 
+func isCRDEstablished(u *unstructured.Unstructured, group string) bool {
+	crd := &apiextensionsv1.CustomResourceDefinition{}
+	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, crd); err != nil {
+		return false
+	}
+
+	if crd.Spec.Group != group {
+		return false
+	}
+
+	established := false
+	namesAccepted := false
+	for _, c := range crd.Status.Conditions {
+		switch c.Type {
+		case apiextensionsv1.Established:
+			established = c.Status == apiextensionsv1.ConditionTrue
+		case apiextensionsv1.NamesAccepted:
+			namesAccepted = c.Status == apiextensionsv1.ConditionTrue
+		default:
+			// Other condition types (e.g. Terminating, KubernetesAPIApprovalPolicyConformant)
+			// don't gate readiness for our purposes.
+		}
+	}
+
+	return established && namesAccepted
+}
+
+// awaitController watches for the target CRD on the supplied dynamic client.
+// Once that CRD is observed as Established with its names accepted, it invokes
+// `build` and delivers the resulting Controller on the returned channel.
+//
+// The call blocks until the CRD informer's initial list has synced. It returns
+// an error when the GVK of obj cannot be derived from scheme, when the event
+// handler cannot be registered, or when the cache fails to sync (for example
+// because ctx is cancelled first).
+//
+// The CRD informer is private to this call and runs on a child context that is
+// cancelled as soon as the wait is over (CRD found, ctx cancelled, or an error
+// is returned), so the cluster-wide CRD watch and its cache do not outlive the
+// wait.
+//
+// The returned channel always closes — either after delivering the Controller,
+// or without a value if ctx is cancelled before the CRD shows up, or if `build` fails.
+func awaitController(
+	ctx context.Context,
+	l logr.Logger,
+	scheme *runtime.Scheme,
+	obj client.Object,
+	dynamicClient dynamic.Interface,
+	build func(ctx context.Context) (Controller, error),
+) (<-chan Controller, error) {
+	gvk, err := apiutil.GVKForObject(obj, scheme)
+	if err != nil {
+		return nil, fmt.Errorf("failed to derive GVK for %T: %w", obj, err)
+	}
+	// CRD plural follows the kubebuilder convention of lowercasing the Kind and adding "s".
+	// This naive pluralization holds for the kinds awaited in this package
+	// (Cluster, OpenTelemetryCollector); irregular plurals would need an explicit name.
+	crdName := strings.ToLower(gvk.Kind) + "s." + gvk.Group
+
+	// The informer gets its own lifetime so it can be released once the CRD has
+	// been seen instead of watching every CRD until the parent ctx ends.
+	informerCtx, stopInformer := context.WithCancel(ctx)
+
+	factory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 0)
+	informer := factory.ForResource(crdGVR).Informer()
+
+	// found is closed exactly once, by whichever add/update event first shows
+	// the target CRD as established.
+	var once sync.Once
+	found := make(chan struct{})
+	check := func(o any) {
+		u, ok := o.(*unstructured.Unstructured)
+		if !ok {
+			return
+		}
+		if u.GetName() != crdName {
+			return
+		}
+		if !isCRDEstablished(u, gvk.Group) {
+			return
+		}
+		once.Do(func() {
+			l.Info("target CRD is established", "crd", crdName)
+			close(found)
+		})
+	}
+	if _, err := informer.AddEventHandler(clientgocache.ResourceEventHandlerFuncs{
+		AddFunc:    func(o any) { check(o) },
+		UpdateFunc: func(_, o any) { check(o) },
+	}); err != nil {
+		stopInformer()
+
+		return nil, fmt.Errorf("failed to add event handler: %w", err)
+	}
+
+	factory.Start(informerCtx.Done())
+	if !clientgocache.WaitForCacheSync(informerCtx.Done(), informer.HasSynced) {
+		stopInformer()
+
+		return nil, errors.New("failed to sync CRD informer cache")
+	}
+	l.Info("waiting for CRD to become available", "crd", crdName)
+
+	// Buffered so the delivery below never blocks on a slow or absent receiver.
+	out := make(chan Controller, 1)
+	go func() {
+		defer close(out)
+		defer stopInformer()
+
+		select {
+		case <-found:
+		case <-ctx.Done():
+			return
+		}
+
+		// The watch has served its purpose; release it before building, since
+		// the built controller may live for a long time.
+		stopInformer()
+
+		c, err := build(ctx)
+		if err != nil {
+			l.Error(err, "failed to build controller after CRD became available", "crd", crdName)
+
+			return
+		}
+		out <- c
+	}()
+
+	return out, nil
+}
diff --git a/pkg/plugin/logging.go b/pkg/plugin/logging.go
index 90308c24..5bd4eb69 100644
--- a/pkg/plugin/logging.go
+++ b/pkg/plugin/logging.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"regexp"
+	"sync"
 
 	"github.com/go-logr/logr"
 
@@ -31,6 +32,7 @@ type logging struct {
 	cfg *config.Config
 	dynamicHostRegexp *regexp.Regexp
 	extractKubernetesMetadataRegexp *regexp.Regexp
+	ctrlMu sync.RWMutex
 	controller controller.Controller
 	logger logr.Logger
 	ctx context.Context
@@ -59,11 +61,31 @@ func NewPlugin(cfg *config.Config, logger logr.Logger, m *metrics.FluentBitGarde
 	l.dynamicHostRegexp = regexp.MustCompile(cfg.ControllerConfig.DynamicHostRegex)
 
 	// Pass the plugin's context to the controller
-	if l.controller, err 
= controller.NewController(ctx, cfg, logger, m, ms); err != nil {
+		ctlCh, err := controller.NewController(ctx, cfg, logger, m, ms)
+		if err != nil {
 			cancel()
 
 			return nil, err
 		}
+
+		// The controller is delivered asynchronously (it waits for its CRD to be
+		// established). Until it arrives, getClient routes dynamic-host records
+		// to the seed client as a fallback so they are not dropped.
+		logger.Info("controller pending: dynamic-host records will fall back to the seed client until the CRD is established")
+
+		go func() {
+			select {
+			case c, ok := <-ctlCh:
+				if !ok {
+					logger.Info("controller channel closed before delivery; staying in seed-client fallback mode")
+
+					return
+				}
+				l.setController(c)
+				logger.Info("controller installed in plugin: dynamic-host records now route through the controller (fallback ended)")
+			case <-ctx.Done():
+			}
+		}()
 	}
 
 	if cfg.PluginConfig.KubernetesMetadata.FallbackToTagWhenMetadataIsMissing {
@@ -207,8 +229,8 @@ func (l *logging) Close() {
 	l.cancel()
 	l.seedClient.StopWait()
 
-	if l.controller != nil {
-		l.controller.Stop()
+	if c := l.getController(); c != nil {
+		c.Stop()
 	}
 
 	l.logger.Info("logging plugin stopped",
@@ -218,9 +240,19 @@ func (l *logging) Close() {
 }
 
 func (l *logging) getClient(dynamicHosName string) api.Output {
-	if l.isDynamicHost(dynamicHosName) && l.controller != nil {
-		if c, isStopped := l.controller.GetClient(dynamicHosName); !isStopped {
-			return c
+	if l.isDynamicHost(dynamicHosName) {
+		c := l.getController()
+		if c == nil {
+			// Controller not yet available (e.g. CRD not installed).
+			// Fall back to the seed client so records aren't dropped.
+			//
+			// This branch is taken for every dynamic-host record while the
+			// controller is pending, so the message is emitted at V(1) only;
+			// the one-time "controller pending" line logged when the plugin
+			// is created already announces the fallback at Info level.
+			l.logger.V(1).Info("controller not installed yet, routing dynamic-host record to seed client",
+				"host", dynamicHosName,
+			)
+
+			return l.seedClient
+		}
+		if out, isStopped := c.GetClient(dynamicHosName); !isStopped {
+			return out
 		}
 
 		return nil
@@ -229,6 +261,19 @@ func (l *logging) getClient(dynamicHosName string) api.Output {
 	return l.seedClient
 }
 
+// getController returns the currently installed controller, or nil while none
+// has been installed. It takes the read lock on ctrlMu.
+func (l *logging) getController() controller.Controller {
+	l.ctrlMu.RLock()
+	defer l.ctrlMu.RUnlock()
+
+	return l.controller
+}
+
+// setController installs c as the plugin's controller under the write lock on
+// ctrlMu.
+func (l *logging) setController(c controller.Controller) {
+	l.ctrlMu.Lock()
+	defer l.ctrlMu.Unlock()
+	l.controller = c
+}
+
 func (l *logging) isDynamicHost(dynamicHostName string) bool {
 	return dynamicHostName != "" &&
 		l.dynamicHostRegexp != nil &&
diff --git a/pkg/plugin/logging_test.go b/pkg/plugin/logging_test.go
index 361830c2..ee69df54 100644
--- a/pkg/plugin/logging_test.go
+++ b/pkg/plugin/logging_test.go
@@ -30,6 +30,32 @@ import (
 	"github.com/gardener/logging/v1/pkg/types"
 )
 
+// stubOutput is a no-op api.Output used to assert routing in plugin tests
+// without involving any real client machinery.
+type stubOutput struct {
+	name string
+}
+
+func (*stubOutput) Handle(_ types.OutputEntry) error { return nil }
+func (*stubOutput) Stop() {}
+func (*stubOutput) StopWait() {}
+func (s *stubOutput) Endpoint() string { return s.name }
+
+// stubController is a controller.Controller used to verify that the plugin
+// routes dynamic-host records through the controller once it is installed.
+type stubController struct {
+	clients map[string]api.Output
+}
+
+// GetClient mirrors the real reconcilers' contract.
+func (s *stubController) GetClient(name string) (api.Output, bool) { + return s.clients[name], false +} +func (*stubController) Reconcile(_ context.Context, _ ctrl.Request) (ctrl.Result, error) { + return ctrl.Result{}, nil +} +func (*stubController) Stop() {} + var _ = Describe("OutputPlugin plugin", func() { var ( cfg *config.Config @@ -346,6 +372,66 @@ var _ = Describe("OutputPlugin plugin", func() { Expect(l.isDynamicHost("random-name")).To(BeFalse()) Expect(l.isDynamicHost("garden")).To(BeFalse()) }) + + // Mirrors the awaitController flow at the plugin layer: until the + // controller is delivered, dynamic-host records fall back to the seed + // client; once it arrives, the plugin routes through it. + It("falls back to the seed client for dynamic hosts until the controller is installed", func() { + cfg := &config.Config{ + OTLPConfig: config.OTLPConfig{ + Endpoint: "http://test-seed-endpoint:4318/v1/logs", + DQueConfig: config.DQueConfig{ + DQueDir: GinkgoT().TempDir(), + DQueSegmentSize: 500, + DQueSync: false, + DQueName: fmt.Sprintf("dque-fallback-%d", time.Now().UnixNano()), + }, + }, + PluginConfig: config.PluginConfig{ + SeedType: types.NOOP.String(), + ShootType: types.NOOP.String(), + LogLevel: "info", + }, + ControllerConfig: config.ControllerConfig{ + CtlSyncTimeout: 5 * time.Second, + DynamicHostPrefix: "http://logging.", + DynamicHostSuffix: ".svc:4318/v1/logs", + DynamicHostRegex: `^shoot--.*`, + DynamicHostPath: map[string]any{ + "kubernetes": map[string]any{ + "namespace_name": "namespace", + }, + }, + }, + } + + plugin, err := NewPluginWithController(cfg, logger, testMetrics, nil, nil) + Expect(err).NotTo(HaveOccurred()) + defer plugin.Close() + + l, ok := plugin.(*logging) + Expect(ok).To(BeTrue()) + Expect(l.seedClient).NotTo(BeNil()) + Expect(l.getController()).To(BeNil(), "no controller is installed yet") + + // Phase 1: CRD not yet established, controller not installed. 
+ // A dynamic-host record must be routed to the seed client rather than dropped. + const dynamicHost = "shoot--proj--cluster" + Expect(l.isDynamicHost(dynamicHost)).To(BeTrue()) + Expect(l.getClient(dynamicHost)).To(BeIdenticalTo(l.seedClient)) + + // Phase 2: simulate awaitController delivering the controller. + shootClient := &stubOutput{name: "shoot-client"} + l.setController(&stubController{clients: map[string]api.Output{dynamicHost: shootClient}}) + + // Same dynamic host now resolves through the controller. + got := l.getClient(dynamicHost) + Expect(got).To(BeIdenticalTo(api.Output(shootClient))) + Expect(got).NotTo(BeIdenticalTo(l.seedClient)) + + // Non-dynamic hosts continue to use the seed client either way. + Expect(l.getClient("garden")).To(BeIdenticalTo(l.seedClient)) + }) }) Describe("Graceful Shutdown", func() {