From bf6156bf79b242fff67629cdd1d843ad10cfe1dd Mon Sep 17 00:00:00 2001 From: Adrian Fernandez De La Torre Date: Sun, 8 Mar 2026 14:31:31 +0100 Subject: [PATCH] Migrate to new Kubernentes event API Signed-off-by: Adrian Fernandez De La Torre --- apis/acl/zz_generated.deepcopy.go | 2 +- apis/event/v1/action.go | 47 ++++++ apis/event/v1/doc.go | 19 +++ apis/event/v1/event.go | 118 +++++++++++++++ apis/event/v1/metadata.go | 46 ++++++ apis/event/v1/zz_generated.deepcopy.go | 54 +++++++ apis/event/v1beta1/zz_generated.deepcopy.go | 2 +- apis/kustomize/zz_generated.deepcopy.go | 2 +- apis/meta/zz_generated.deepcopy.go | 2 +- hack/boilerplate.go.txt | 2 +- runtime/events/fake.go | 158 ++++++++++++++++++++ runtime/events/recorder.go | 139 +++++++++-------- runtime/events/recorder_fuzzer_test.go | 2 +- runtime/events/recorder_test.go | 17 +-- 14 files changed, 530 insertions(+), 80 deletions(-) create mode 100644 apis/event/v1/action.go create mode 100644 apis/event/v1/doc.go create mode 100644 apis/event/v1/event.go create mode 100644 apis/event/v1/metadata.go create mode 100644 apis/event/v1/zz_generated.deepcopy.go create mode 100644 runtime/events/fake.go diff --git a/apis/acl/zz_generated.deepcopy.go b/apis/acl/zz_generated.deepcopy.go index 50909f381..9130b21bf 100644 --- a/apis/acl/zz_generated.deepcopy.go +++ b/apis/acl/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated /* -Copyright 2025 The Flux authors +Copyright 2026 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/apis/event/v1/action.go b/apis/event/v1/action.go new file mode 100644 index 000000000..c4e1a1d1f --- /dev/null +++ b/apis/event/v1/action.go @@ -0,0 +1,47 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +// These constants define common event actions used throughout Flux controllers. +const ( + // ActionReconciling indicates a reconciliation is in progress. + ActionReconciling string = "Reconciling" + // ActionReconciled indicates a successful reconciliation. + ActionReconciled string = "Reconciled" + // ActionFetching indicates fetching of a resource or artifact. + ActionFetching string = "Fetching" + // ActionFetched indicates successful fetch of a resource or artifact. + ActionFetched string = "Fetched" + // ActionApplying indicates applying changes to the cluster. + ActionApplying string = "Applying" + // ActionApplied indicates successful application of changes. + ActionApplied string = "Applied" + // ActionDeleting indicates deletion is in progress. + ActionDeleting string = "Deleting" + // ActionDeleted indicates successful deletion. + ActionDeleted string = "Deleted" + // ActionValidating indicates validation is in progress. + ActionValidating string = "Validating" + // ActionValidated indicates successful validation. + ActionValidated string = "Validated" + // ActionWaiting indicates waiting for a condition. + ActionWaiting string = "Waiting" + // ActionProgressing indicates progression through a workflow. + ActionProgressing string = "Progressing" + // ActionFailed indicates a failed operation. + ActionFailed string = "Failed" +) diff --git a/apis/event/v1/doc.go b/apis/event/v1/doc.go new file mode 100644 index 000000000..773dc9667 --- /dev/null +++ b/apis/event/v1/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// package v1 contains the API Schema definitions for the Flux eventing API. +// +kubebuilder:object:generate=true +package v1 diff --git a/apis/event/v1/event.go b/apis/event/v1/event.go new file mode 100644 index 000000000..aa595f3e9 --- /dev/null +++ b/apis/event/v1/event.go @@ -0,0 +1,118 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Group is the API Group for the Event API. +const Group = "event.toolkit.fluxcd.io" + +// These constants define valid event severity values. +const ( + // EventSeverityTrace represents a trace event, usually + // informing about actions taken during reconciliation. + EventSeverityTrace string = "trace" + // EventSeverityInfo represents an informational event, usually + // informing about changes. + EventSeverityInfo string = "info" + // EventSeverityError represent an error event, usually a warning + // that something goes wrong. + EventSeverityError string = "error" +) + +// EventTypeTrace represents a trace event. +const EventTypeTrace string = "Trace" + +// Event is a report of an event issued by a controller. +type Event struct { + // The object that this event is about. + // +required + InvolvedObject corev1.ObjectReference `json:"involvedObject"` + + // RelatedObject is an optional secondary object for more complex actions. + // For simple events, this field may be left empty. + // +optional + RelatedObject *corev1.ObjectReference `json:"relatedObject,omitempty"` + + // Severity type of this event (trace, info, error) + // +kubebuilder:validation:Enum=trace;info;error + // +required + Severity string `json:"severity"` + + // The time at which this event was recorded. + // +required + Timestamp metav1.Time `json:"timestamp"` + + // A human-readable description of this event. + // Maximum length 39,000 characters. + // +kubebuilder:validation:MaxLength=39000 + // +required + Message string `json:"message"` + + // A machine understandable string that gives the reason + // for the transition into the object's current status. + // +required + Reason string `json:"reason"` + + // Action describes what action was taken/failed regarding the object. + // Examples: "Starting", "Syncing", "Deleting". + // +required + Action string `json:"action"` + + // Metadata of this event, e.g. apply change set. + // +optional + Metadata map[string]string `json:"metadata,omitempty"` + + // Name of the controller that emitted this event, e.g. `source-controller`. + // +required + ReportingController string `json:"reportingController"` + + // ID of the controller instance, e.g. `source-controller-xyzf`. + // +optional + ReportingInstance string `json:"reportingInstance,omitempty"` +} + +// HasReason returns true if the Reason equals the given value. +func (in *Event) HasReason(reason string) bool { + return in.Reason == reason +} + +// HasMetadata returns true if the given key/value pair is found in Metadata. +func (in *Event) HasMetadata(key string, val string) bool { + if v, ok := in.Metadata[key]; ok && v == val { + return true + } + return false +} + +// GetRevision looks up for the keys in Metadata that may contain +// the revision of the object that this event is about. +func (in *Event) GetRevision() (string, bool) { + if r, ok := in.Metadata[MetaCommitKey]; ok { + return r, true + } + if r, ok := in.Metadata[MetaOriginRevisionKey]; ok { + return r, true + } + if r, ok := in.Metadata[MetaRevisionKey]; ok { + return r, true + } + return "", false +} diff --git a/apis/event/v1/metadata.go b/apis/event/v1/metadata.go new file mode 100644 index 000000000..3fffd9664 --- /dev/null +++ b/apis/event/v1/metadata.go @@ -0,0 +1,46 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +// These constants define the Event metadata keys used throughout Flux controllers. +const ( + // MetaRevisionKey is the key used to hold the source artifact revision. + MetaRevisionKey string = "revision" + // MetaOriginRevisionKey is the key used to hold the source artifact origin revision. + MetaOriginRevisionKey string = "originRevision" + // MetaChecksumKey is the key used to hold the source artifact checksum. + // Deprecated: in favor of MetaDigestKey. + MetaChecksumKey string = "checksum" + // MetaDigestKey is the key used to hold the source artifact digest. + MetaDigestKey string = "digest" + // MetaTokenKey is the key used to hold an arbitrary token whose contents + // are defined on a per-event-emitter basis for uniquely identifying the + // contents of the event payload. For example, it could be the generation + // of an object, or the hash of a set of configurations, or even a + // base64-encoded set of configurations. This is useful for example for + // rate limiting the events. + MetaTokenKey string = "token" + // MetaCommitKey is the key used to hold the Git commit hash. + MetaCommitKey string = "commit" + // MetaCommitStatusKey is the key used to signal a Git commit status event. + MetaCommitStatusKey string = "commit_status" + // MetaCommitStatusUpdateValue is the value of MetaCommitStatusKey + // used to signal a Git commit status update. + MetaCommitStatusUpdateValue string = "update" + // MetaChangeRequestKey is the key used to hold the identifier of a change request. + MetaChangeRequestKey string = "change_request" +) diff --git a/apis/event/v1/zz_generated.deepcopy.go b/apis/event/v1/zz_generated.deepcopy.go new file mode 100644 index 000000000..8a7a7b36c --- /dev/null +++ b/apis/event/v1/zz_generated.deepcopy.go @@ -0,0 +1,54 @@ +//go:build !ignore_autogenerated + +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. + +package v1 + +import ( + corev1 "k8s.io/api/core/v1" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Event) DeepCopyInto(out *Event) { + *out = *in + out.InvolvedObject = in.InvolvedObject + if in.RelatedObject != nil { + in, out := &in.RelatedObject, &out.RelatedObject + *out = new(corev1.ObjectReference) + **out = **in + } + in.Timestamp.DeepCopyInto(&out.Timestamp) + if in.Metadata != nil { + in, out := &in.Metadata, &out.Metadata + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Event. +func (in *Event) DeepCopy() *Event { + if in == nil { + return nil + } + out := new(Event) + in.DeepCopyInto(out) + return out +} diff --git a/apis/event/v1beta1/zz_generated.deepcopy.go b/apis/event/v1beta1/zz_generated.deepcopy.go index c4c998e9e..88d13ed52 100644 --- a/apis/event/v1beta1/zz_generated.deepcopy.go +++ b/apis/event/v1beta1/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated /* -Copyright 2025 The Flux authors +Copyright 2026 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/apis/kustomize/zz_generated.deepcopy.go b/apis/kustomize/zz_generated.deepcopy.go index 931fcfadb..a150c5283 100644 --- a/apis/kustomize/zz_generated.deepcopy.go +++ b/apis/kustomize/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated /* -Copyright 2025 The Flux authors +Copyright 2026 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/apis/meta/zz_generated.deepcopy.go b/apis/meta/zz_generated.deepcopy.go index 30088cd28..2616682c5 100644 --- a/apis/meta/zz_generated.deepcopy.go +++ b/apis/meta/zz_generated.deepcopy.go @@ -1,7 +1,7 @@ //go:build !ignore_autogenerated /* -Copyright 2025 The Flux authors +Copyright 2026 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/hack/boilerplate.go.txt b/hack/boilerplate.go.txt index f57d7e7fe..1d74bca92 100644 --- a/hack/boilerplate.go.txt +++ b/hack/boilerplate.go.txt @@ -1,5 +1,5 @@ /* -Copyright 2025 The Flux authors +Copyright 2026 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/runtime/events/fake.go b/runtime/events/fake.go new file mode 100644 index 000000000..6961098b8 --- /dev/null +++ b/runtime/events/fake.go @@ -0,0 +1,158 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + _ "k8s.io/client-go/tools/events" +) + +// FakeRecorder is used as a fake during tests. +// +// It was invented to be used in tests which require more precise control over +// e.g. assertions of specific event fields like Reason. For which string +// comparisons on the concentrated event message using record.FakeRecorder is +// not sufficient. +// +// To empty the Events channel into a slice of the recorded events, use +// GetEvents(). Not initializing Events will cause the recorder to not record +// any messages. +type FakeRecorder struct { + Events chan eventsv1.Event + IncludeObject bool +} + +// NewFakeRecorder creates new fake event recorder with an Events channel with +// the given size. Setting includeObject to true will cause the recorder to +// include the object reference in the events. +// +// To initialize a recorder which does not record any events, simply use: +// +// recorder := new(FakeRecorder) +func NewFakeRecorder(bufferSize int, includeObject bool) *FakeRecorder { + return &FakeRecorder{ + Events: make(chan eventsv1.Event, bufferSize), + IncludeObject: includeObject, + } +} + +// NewNopRecorder creates a new FakeRecorder that doesn't record any events. +// This is the most lightweight option for tests that don't need event recording +// functionality. The recorder will implement the EventRecorder interface but +// all event calls will be no-ops. +// +// Example: +// +// r := &MyReconciler{ +// Client: k8sClient, +// EventRecorder: events.NewNopRecorder(), +// } +func NewNopRecorder() *FakeRecorder { + return &FakeRecorder{ + Events: nil, + IncludeObject: false, + } +} + +// Event emits an event with the given message. +func (f *FakeRecorder) Event(obj runtime.Object, related runtime.Object, eventType, reason, action, message string) { + f.AnnotatedEventf(obj, related, nil, eventType, reason, action, "%s", message) +} + +// Eventf emits an event with the given message. +func (f *FakeRecorder) Eventf(obj runtime.Object, related runtime.Object, eventType, reason, action, message string, args ...interface{}) { + if f.Events != nil { + f.Events <- f.generateEvent(obj, related, nil, eventType, reason, action, message, args...) + } +} + +// AnnotatedEventf emits an event with annotations. +func (f *FakeRecorder) AnnotatedEventf(obj runtime.Object, related runtime.Object, + annotations map[string]string, + eventType, reason, action, + message string, args ...interface{}) { + if f.Events != nil { + f.Events <- f.generateEvent(obj, related, annotations, eventType, reason, action, message, args...) + } +} + +// GetEvents empties the Events channel and returns a slice of recorded events. +// If the Events channel is nil, it returns nil. +func (f *FakeRecorder) GetEvents() (events []eventsv1.Event) { + if f.Events != nil { + for { + select { + case e := <-f.Events: + events = append(events, e) + default: + return events + } + } + } + return nil +} + +// generateEvent generates a new mocked event with the given parameters. +func (f *FakeRecorder) generateEvent(obj runtime.Object, related runtime.Object, + annotations map[string]string, + eventType, reason, action, + message string, args ...interface{}) eventsv1.Event { + event := eventsv1.Event{ + Regarding: objectReference(obj, f.IncludeObject), + Type: eventType, + Reason: reason, + Action: action, + Note: fmt.Sprintf(message, args...), + } + if annotations != nil { + event.ObjectMeta.Annotations = annotations + } + + if related != nil { + relatedRef := objectReference(related, f.IncludeObject) + event.Related = &relatedRef + } + + return event +} + +// objectReference returns an object reference for the given object with the +// kind and (group) API version set. +func objectReference(obj runtime.Object, includeObject bool) corev1.ObjectReference { + if !includeObject { + return corev1.ObjectReference{} + } + + ref := corev1.ObjectReference{ + Kind: obj.GetObjectKind().GroupVersionKind().Kind, + APIVersion: obj.GetObjectKind().GroupVersionKind().GroupVersion().String(), + } + + // Extract name and namespace from object metadata + if metaObj, ok := obj.(metav1.Object); ok { + ref.Name = metaObj.GetName() + ref.Namespace = metaObj.GetNamespace() + } + + return ref +} diff --git a/runtime/events/recorder.go b/runtime/events/recorder.go index fb5bf9027..b963e4062 100644 --- a/runtime/events/recorder.go +++ b/runtime/events/recorder.go @@ -33,11 +33,11 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - kuberecorder "k8s.io/client-go/tools/record" + "k8s.io/client-go/tools/events" "k8s.io/client-go/tools/reference" ctrl "sigs.k8s.io/controller-runtime" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" "github.com/fluxcd/pkg/runtime/logger" ) @@ -48,70 +48,65 @@ import ( // // import ( // ... -// kuberecorder "k8s.io/client-go/tools/record" +// "k8s.io/client-go/tools/events" // ... // ) // // type MyTypeReconciler { // client.Client // // ... etc. -// kuberecorder.EventRecorder +// events.EventRecorder // } // // Use NewRecorder to create a working Recorder. -type Recorder struct { +type EventRecorder interface { + // Eventf records an event with a given/formatted message. + Eventf(object runtime.Object, related runtime.Object, eventtype, reason, action, messageFmt string, args ...interface{}) + + // AnnotatedEventf records an event with annotations and a formatted message. + AnnotatedEventf(object runtime.Object, related runtime.Object, annotations map[string]string, eventtype, reason, action, messageFmt string, args ...interface{}) +} + +type recorder struct { // URL address of the events endpoint. - Webhook string + webhook string // Name of the controller that emits events. - ReportingController string + reportingController string // Retryable HTTP client. - Client *retryablehttp.Client + client *retryablehttp.Client - // EventRecorder is the Kubernetes event recorder. - EventRecorder kuberecorder.EventRecorder + // AnnotatedEventRecorder is the Kubernetes event recorder. + events.AnnotatedEventRecorder // Scheme to look up the recorded objects. - Scheme *runtime.Scheme + scheme *runtime.Scheme // Log is the recorder logger. - Log logr.Logger + log logr.Logger } -var _ kuberecorder.EventRecorder = &Recorder{} +var _ events.AnnotatedEventRecorder = &recorder{} -// NewRecorder creates an event Recorder with a Kubernetes event recorder and an external event recorder based on the -// given webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the -// external recorder. -func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string) (*Recorder, error) { - if webhook != "" { - if _, err := url.Parse(webhook); err != nil { - return nil, err - } - } +// RecorderOption configures a recorder. +type RecorderOption func(*recorder) - httpClient := retryablehttp.NewClient() - httpClient.HTTPClient.Timeout = 5 * time.Second - httpClient.CheckRetry = checkRetry - httpClient.Logger = nil - - return &Recorder{ - Scheme: mgr.GetScheme(), - Webhook: webhook, - ReportingController: reportingController, - Client: httpClient, - EventRecorder: mgr.GetEventRecorderFor(reportingController), - Log: log, - }, nil +// NewRecorder creates an events.EventRecorder with a Kubernetes event recorder +// and an external event recorder based on the given webhook. The recorder +// performs automatic retries for connection errors and 500-range response codes +// from the external recorder. +func NewRecorder(mgr ctrl.Manager, log logr.Logger, webhook, reportingController string, opts ...RecorderOption) (EventRecorder, error) { + return NewRecorderForScheme(mgr.GetScheme(), mgr.GetAnnotatedEventRecorder(reportingController), log, webhook, reportingController, opts...) } -// NewRecorderForScheme creates an event Recorder with a Kubernetes event recorder and an external event recorder based on the -// given webhook. The recorder performs automatic retries for connection errors and 500-range response codes from the -// external recorder. +// NewRecorderForScheme creates an events.EventRecorder with a Kubernetes event +// recorder and an external event recorder based on the given webhook. The +// recorder performs automatic retries for connection errors and 500-range +// response codes from the external recorder. func NewRecorderForScheme(scheme *runtime.Scheme, - eventRecorder kuberecorder.EventRecorder, - log logr.Logger, webhook, reportingController string) (*Recorder, error) { + eventRecorder events.AnnotatedEventRecorder, + log logr.Logger, webhook, reportingController string, opts ...RecorderOption) (EventRecorder, error) { if webhook != "" { if _, err := url.Parse(webhook); err != nil { return nil, err @@ -123,14 +118,24 @@ func NewRecorderForScheme(scheme *runtime.Scheme, httpClient.CheckRetry = checkRetry httpClient.Logger = nil - return &Recorder{ - Scheme: scheme, - Webhook: webhook, - ReportingController: reportingController, - Client: httpClient, - EventRecorder: eventRecorder, - Log: log, - }, nil + r := &recorder{ + scheme: scheme, + webhook: webhook, + reportingController: reportingController, + client: httpClient, + AnnotatedEventRecorder: eventRecorder, + log: log, + } + for _, o := range opts { + o(r) + } + return r, nil +} + +func WithRetryMax(n int) RecorderOption { + return func(r *recorder) { + r.client.RetryMax = n + } } func checkRetry(ctx context.Context, resp *http.Response, err error) (bool, error) { @@ -148,26 +153,23 @@ func responseIsEventDuplicated(resp *http.Response) bool { } // Event records an event in the webhook address. -func (r *Recorder) Event(object runtime.Object, eventtype, reason, message string) { - r.AnnotatedEventf(object, nil, eventtype, reason, "%s", message) -} - -// Event records an event in the webhook address. -func (r *Recorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { - r.AnnotatedEventf(object, nil, eventtype, reason, messageFmt, args...) +func (r *recorder) Eventf(object runtime.Object, related runtime.Object, eventtype, reason string, action string, messageFmt string, args ...interface{}) { + r.AnnotatedEventf(object, related, nil, eventtype, reason, action, messageFmt, args...) } // AnnotatedEventf constructs an event from the given information and performs a HTTP POST to the webhook address. // It also logs the event if debug logs are enabled in the logger. -func (r *Recorder) AnnotatedEventf( +func (r *recorder) AnnotatedEventf( object runtime.Object, + related runtime.Object, inputAnnotations map[string]string, eventtype, reason string, + action string, messageFmt string, args ...interface{}) { - ref, err := reference.GetReference(r.Scheme, object) + ref, err := reference.GetReference(r.scheme, object) if err != nil { - r.Log.Error(err, "failed to get object reference") + r.log.Error(err, "failed to get object reference") } // Add object annotations to the annotations. @@ -184,7 +186,7 @@ func (r *Recorder) AnnotatedEventf( } // Add object info in the logger. - log := r.Log.WithValues("name", ref.Name, "namespace", ref.Namespace, "reconciler kind", ref.Kind) + log := r.log.WithValues("name", ref.Name, "namespace", ref.Namespace, "reconciler kind", ref.Kind) // Log the event if in trace mode. if log.GetSink().Enabled(logger.TraceLevel) { @@ -202,20 +204,20 @@ func (r *Recorder) AnnotatedEventf( // Do not send trace events to notification controller, // traces are persisted as Kubernetes events only as normal events. if severity == eventv1.EventSeverityTrace { - r.EventRecorder.AnnotatedEventf(object, annotations, corev1.EventTypeNormal, reason, messageFmt, args...) + r.AnnotatedEventRecorder.AnnotatedEventf(object, related, annotations, corev1.EventTypeNormal, reason, action, messageFmt, args...) return } // Forward the event to the Kubernetes recorder. - r.EventRecorder.AnnotatedEventf(object, annotations, eventtype, reason, messageFmt, args...) + r.AnnotatedEventRecorder.AnnotatedEventf(object, related, annotations, eventtype, reason, action, messageFmt, args...) // If no webhook address is provided, skip posting to event recorder // endpoint. - if r.Webhook == "" { + if r.webhook == "" { return } - if r.Client == nil { + if r.client == nil { err := fmt.Errorf("retryable HTTP client has not been initialized") log.Error(err, "unable to record event") return @@ -253,18 +255,25 @@ func (r *Recorder) AnnotatedEventf( Timestamp: metav1.Now(), Message: message, Reason: reason, + Action: action, Metadata: annotations, - ReportingController: r.ReportingController, + ReportingController: r.reportingController, ReportingInstance: hostname, } + // Add related object reference if provided (optional). + relatedRef, err := reference.GetReference(r.scheme, related) + if err == nil { + event.RelatedObject = relatedRef + } + body, err := json.Marshal(event) if err != nil { log.Error(err, "failed to marshal object into json") return } - if _, err := r.Client.Post(r.Webhook, "application/json", body); err != nil { + if _, err := r.client.Post(r.webhook, "application/json", body); err != nil { log.Error(err, "unable to record event") return } diff --git a/runtime/events/recorder_fuzzer_test.go b/runtime/events/recorder_fuzzer_test.go index 50ae72710..90fc09b8f 100644 --- a/runtime/events/recorder_fuzzer_test.go +++ b/runtime/events/recorder_fuzzer_test.go @@ -36,7 +36,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" ctrl "sigs.k8s.io/controller-runtime" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" ) var ( diff --git a/runtime/events/recorder_test.go b/runtime/events/recorder_test.go index 5576735f0..b40e8cde2 100644 --- a/runtime/events/recorder_test.go +++ b/runtime/events/recorder_test.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" - eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1" + eventv1 "github.com/fluxcd/pkg/apis/event/v1" ) func TestEventRecorder_AnnotatedEventf(t *testing.T) { @@ -102,6 +102,7 @@ func TestEventRecorder_AnnotatedEventf(t *testing.T) { require.Equal(t, "webapp", payload.InvolvedObject.Name) require.Equal(t, "gitops-system", payload.InvolvedObject.Namespace) require.Equal(t, "sync", payload.Reason) + require.Equal(t, eventv1.ActionReconciling, payload.Action) require.Equal(t, "sync object", payload.Message) for k, v := range tt.expectedMetadata { @@ -117,11 +118,11 @@ func TestEventRecorder_AnnotatedEventf(t *testing.T) { const msg = "sync object" - eventRecorder.AnnotatedEventf(obj, tt.inputAnnotations, corev1.EventTypeNormal, "sync", "%s", msg) + eventRecorder.AnnotatedEventf(obj, nil, tt.inputAnnotations, corev1.EventTypeNormal, "sync", eventv1.ActionReconciling, "%s", msg) require.Equal(t, 1, requestCount) // When a trace event is sent, it's dropped, no new request. - eventRecorder.AnnotatedEventf(obj, tt.inputAnnotations, eventv1.EventTypeTrace, "sync", "%s", msg) + eventRecorder.AnnotatedEventf(obj, nil, tt.inputAnnotations, eventv1.EventTypeTrace, "sync", eventv1.ActionReconciling, "%s", msg) require.Equal(t, 1, requestCount) }) } @@ -142,15 +143,14 @@ func TestEventRecorder_AnnotatedEventf_Retry(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller") + eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller", WithRetryMax(2)) require.NoError(t, err) - eventRecorder.Client.RetryMax = 2 obj := &corev1.ConfigMap{} obj.Namespace = "gitops-system" obj.Name = "webapp" - eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + eventRecorder.AnnotatedEventf(obj, nil, nil, corev1.EventTypeNormal, "sync", eventv1.ActionReconciling, "sync %s", obj.Name) require.True(t, requestCount > 1) } @@ -169,15 +169,14 @@ func TestEventRecorder_AnnotatedEventf_RateLimited(t *testing.T) { })) defer ts.Close() - eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller") + eventRecorder, err := NewRecorder(env, ctrl.Log, ts.URL, "test-controller", WithRetryMax(2)) require.NoError(t, err) - eventRecorder.Client.RetryMax = 2 obj := &corev1.ConfigMap{} obj.Namespace = "gitops-system" obj.Name = "webapp" - eventRecorder.AnnotatedEventf(obj, nil, corev1.EventTypeNormal, "sync", "sync %s", obj.Name) + eventRecorder.AnnotatedEventf(obj, nil, nil, corev1.EventTypeNormal, "sync", eventv1.ActionReconciling, "sync %s", obj.Name) require.Equal(t, 1, requestCount) }