From f0b95cc1ebc041c4645b0b5700e46a068eaf4d82 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 20 Oct 2025 19:52:17 +0700 Subject: [PATCH 1/3] feat: support configurable idempotency key ttl --- docs/pages/references/configuration.mdx | 8 ++++++++ internal/config/config.go | 6 ++++++ internal/deliverymq/messagehandler.go | 3 ++- internal/deliverymq/messagehandler_test.go | 18 ++++++++++++++++++ internal/deliverymq/retry_test.go | 1 + internal/publishmq/eventhandler.go | 3 ++- internal/publishmq/eventhandler_test.go | 2 ++ internal/services/api/api.go | 2 +- internal/services/api/router_test.go | 2 +- internal/services/delivery/delivery.go | 1 + 10 files changed, 42 insertions(+), 4 deletions(-) diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index f3cec0823..fb04ca9c2 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -41,6 +41,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes | | `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes | | `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes | +| `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours). | `86400` | No | | `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No | | `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No | | `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No | @@ -105,6 +106,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS` | JSON string or path to a file containing GCP service account credentials for the Pub/Sub publish topic. Required if GCP Pub/Sub is chosen and not using implicit credentials. | `nil` | Conditional | | `PUBLISH_GCP_PUBSUB_SUBSCRIPTION` | Name of the GCP Pub/Sub subscription to read published events from. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional | | `PUBLISH_GCP_PUBSUB_TOPIC` | Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional | +| `PUBLISH_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours). | `86400` | No | | `PUBLISH_MAX_CONCURRENCY` | Maximum number of messages to process concurrently from the publish queue. | `1` | No | | `PUBLISH_RABBITMQ_EXCHANGE` | Name of the RabbitMQ exchange for the publish queue. | `nil` | No | | `PUBLISH_RABBITMQ_QUEUE` | Name of the RabbitMQ queue for publishing events. Required if RabbitMQ is the chosen publish MQ provider. | `nil` | Conditional | @@ -166,6 +168,9 @@ alert: # Enables or disables audit logging for significant events. audit_log: true +# Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours). +delivery_idempotency_key_ttl: 86400 + # Maximum number of delivery attempts to process concurrently. delivery_max_concurrency: 1 @@ -451,6 +456,9 @@ portal: # Required: Y postgres: "" +# Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours). +publish_idempotency_key_ttl: 86400 + publishmq: # Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured. aws_sqs: diff --git a/internal/config/config.go b/internal/config/config.go index 01b3d9ffb..b61ee8807 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -83,6 +83,10 @@ type Config struct { MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"` DeliveryTimeoutSeconds int `yaml:"delivery_timeout_seconds" env:"DELIVERY_TIMEOUT_SECONDS" desc:"Timeout in seconds for HTTP requests made during event delivery to webhook destinations." required:"N"` + // Idempotency + PublishIdempotencyKeyTTL int `yaml:"publish_idempotency_key_ttl" env:"PUBLISH_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours)." required:"N"` + DeliveryIdempotencyKeyTTL int `yaml:"delivery_idempotency_key_ttl" env:"DELIVERY_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours)." required:"N"` + // Destination Registry DestinationMetadataPath string `yaml:"destination_metadata_path" env:"DESTINATION_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set." required:"N"` @@ -160,6 +164,8 @@ func (c *Config) InitDefaults() { c.RetryMaxLimit = 10 c.MaxDestinationsPerTenant = 20 c.DeliveryTimeoutSeconds = 5 + c.PublishIdempotencyKeyTTL = 86400 // 24 hours + c.DeliveryIdempotencyKeyTTL = 86400 // 24 hours c.LogBatchThresholdSeconds = 10 c.LogBatchSize = 1000 diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 60ea4fd28..f2d854405 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -120,6 +120,7 @@ func NewMessageHandler( retryBackoff backoff.Backoff, retryMaxLimit int, alertMonitor AlertMonitor, + idempotencyKeyTTL time.Duration, ) consumer.MessageHandler { return &messageHandler{ eventTracer: eventTracer, @@ -133,7 +134,7 @@ func NewMessageHandler( retryMaxLimit: retryMaxLimit, idempotence: idempotence.New(redisClient, idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(24*time.Hour), + idempotence.WithSuccessfulTTL(idempotencyKeyTTL), ), alertMonitor: alertMonitor, } diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 16147ecc3..4611be44e 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -58,6 +58,7 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -122,6 +123,7 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -183,6 +185,7 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -254,6 +257,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -328,6 +332,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -392,6 +397,7 @@ func TestMessageHandler_EventGetterError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message simulating a retry @@ -458,6 +464,7 @@ func TestMessageHandler_RetryFlow(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + 24*time.Hour, ) // Create and handle message simulating a retry @@ -527,6 +534,7 @@ func TestMessageHandler_Idempotency(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + 24*time.Hour, ) // Create message with fixed ID for idempotency check @@ -595,6 +603,7 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + 24*time.Hour, ) // Create retry message @@ -672,6 +681,7 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -735,6 +745,7 @@ func TestMessageHandler_LogPublisherError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + 24*time.Hour, ) // Create and handle message @@ -797,6 +808,7 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + 24*time.Hour, ) // Create and handle message @@ -860,6 +872,7 @@ func TestManualDelivery_Success(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -934,6 +947,7 @@ func TestManualDelivery_PublishError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -1000,6 +1014,7 @@ func TestManualDelivery_CancelError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -1068,6 +1083,7 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -1141,6 +1157,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message @@ -1202,6 +1219,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + 24*time.Hour, ) // Create and handle message diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 8c4dfb8b0..2a70a883e 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -63,6 +63,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, s.retryMaxCount, s.alertMonitor, + 24*time.Hour, ) // Setup message consumer diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index c061ce2a3..9f92afc8e 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -43,13 +43,14 @@ func NewEventHandler( entityStore models.EntityStore, eventTracer eventtracer.EventTracer, topics []string, + idempotencyKeyTTL time.Duration, ) EventHandler { emeter, _ := emetrics.New() eventHandler := &eventHandler{ logger: logger, idempotence: idempotence.New(redisClient, idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(24*time.Hour), + idempotence.WithSuccessfulTTL(idempotencyKeyTTL), ), deliveryMQ: deliveryMQ, entityStore: entityStore, diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index 951cc17ee..136a8f252 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -38,6 +38,7 @@ func TestIntegrationPublishMQEventHandler_Concurrency(t *testing.T) { entityStore, mockEventTracer, testutil.TestTopics, + 24*time.Hour, ) tenant := models.Tenant{ @@ -102,6 +103,7 @@ func TestEventHandler_WildcardTopic(t *testing.T) { entityStore, mockEventTracer, testutil.TestTopics, + 24*time.Hour, ) tenant := models.Tenant{ diff --git a/internal/services/api/api.go b/internal/services/api/api.go index e8e069130..dab1a48ed 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -122,7 +122,7 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log ) logger.Debug("creating event handler and router") - eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, cfg.Topics) + eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, cfg.Topics, time.Duration(cfg.PublishIdempotencyKeyTTL)*time.Second) router := NewRouter( RouterConfig{ ServiceName: cfg.OpenTelemetry.GetServiceName(), diff --git a/internal/services/api/router_test.go b/internal/services/api/router_test.go index efaccfa83..d1c7de4dd 100644 --- a/internal/services/api/router_test.go +++ b/internal/services/api/router_test.go @@ -40,7 +40,7 @@ func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *te eventTracer := eventtracer.NewNoopEventTracer() entityStore := setupTestEntityStore(t, redisClient, nil) logStore := setupTestLogStore(t, funcs...) - eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, testutil.TestTopics) + eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, testutil.TestTopics, 24*time.Hour) router := api.NewRouter( api.RouterConfig{ ServiceName: "", diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 8a13b31d0..2e79ce09f 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -153,6 +153,7 @@ func NewService(ctx context.Context, retryBackoff, retryMaxLimit, alertMonitor, + time.Duration(cfg.DeliveryIdempotencyKeyTTL)*time.Second, ) } From 5e4f083e140670e56d1ab7379d1caf719a290824 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 20 Oct 2025 20:21:12 +0700 Subject: [PATCH 2/3] refactor: idempotence params # Conflicts: # internal/services/api/router_test.go # Conflicts: # internal/services/delivery/delivery.go --- internal/deliverymq/messagehandler.go | 11 ++--- internal/deliverymq/messagehandler_test.go | 55 ++++++++-------------- internal/deliverymq/retry_test.go | 4 +- internal/publishmq/eventhandler.go | 12 ++--- internal/publishmq/eventhandler_test.go | 7 ++- internal/services/api/api.go | 7 ++- internal/services/api/router_test.go | 4 +- internal/services/delivery/delivery.go | 9 +++- 8 files changed, 46 insertions(+), 63 deletions(-) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index f2d854405..73390aab4 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -14,7 +14,6 @@ import ( "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/mqs" - "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/scheduler" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -110,7 +109,6 @@ type AlertMonitor interface { func NewMessageHandler( logger *logging.Logger, - redisClient redis.Cmdable, logMQ LogPublisher, entityStore DestinationGetter, logStore EventGetter, @@ -120,7 +118,7 @@ func NewMessageHandler( retryBackoff backoff.Backoff, retryMaxLimit int, alertMonitor AlertMonitor, - idempotencyKeyTTL time.Duration, + idempotence idempotence.Idempotence, ) consumer.MessageHandler { return &messageHandler{ eventTracer: eventTracer, @@ -132,11 +130,8 @@ func NewMessageHandler( retryScheduler: retryScheduler, retryBackoff: retryBackoff, retryMaxLimit: retryMaxLimit, - idempotence: idempotence.New(redisClient, - idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(idempotencyKeyTTL), - ), - alertMonitor: alertMonitor, + idempotence: idempotence, + alertMonitor: alertMonitor, } } diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 4611be44e..1d5c787d9 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -10,6 +10,7 @@ import ( "github.com/hookdeck/outpost/internal/backoff" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testutil" @@ -48,7 +49,6 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), newMockLogPublisher(nil), destGetter, eventGetter, @@ -58,7 +58,7 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -113,7 +113,6 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -123,7 +122,7 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -175,7 +174,6 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -185,7 +183,7 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -247,7 +245,6 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -257,7 +254,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -322,7 +319,6 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -332,7 +328,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -387,7 +383,6 @@ func TestMessageHandler_EventGetterError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -397,7 +392,7 @@ func TestMessageHandler_EventGetterError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message simulating a retry @@ -454,7 +449,6 @@ func TestMessageHandler_RetryFlow(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -464,7 +458,7 @@ func TestMessageHandler_RetryFlow(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message simulating a retry @@ -524,7 +518,6 @@ func TestMessageHandler_Idempotency(t *testing.T) { redis := testutil.CreateTestRedisClient(t) handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - redis, logPublisher, destGetter, eventGetter, @@ -534,7 +527,7 @@ func TestMessageHandler_Idempotency(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), - 24*time.Hour, + idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create message with fixed ID for idempotency check @@ -593,7 +586,6 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) { redis := testutil.CreateTestRedisClient(t) handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - redis, logPublisher, destGetter, eventGetter, @@ -603,7 +595,7 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), - 24*time.Hour, + idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create retry message @@ -671,7 +663,6 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -681,7 +672,7 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -735,7 +726,6 @@ func TestMessageHandler_LogPublisherError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -745,7 +735,7 @@ func TestMessageHandler_LogPublisherError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -798,7 +788,6 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -808,7 +797,7 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -862,7 +851,6 @@ func TestManualDelivery_Success(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -872,7 +860,7 @@ func TestManualDelivery_Success(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -937,7 +925,6 @@ func TestManualDelivery_PublishError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -947,7 +934,7 @@ func TestManualDelivery_PublishError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1004,7 +991,6 @@ func TestManualDelivery_CancelError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1014,7 +1000,7 @@ func TestManualDelivery_CancelError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1073,7 +1059,6 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1083,7 +1068,7 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1147,7 +1132,6 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1157,7 +1141,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1209,7 +1193,6 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1219,7 +1202,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 2a70a883e..49dfc32fd 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -9,6 +9,7 @@ import ( "github.com/hookdeck/outpost/internal/backoff" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/mqs" @@ -53,7 +54,6 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), s.logPublisher, s.destGetter, s.eventGetter, @@ -63,7 +63,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, s.retryMaxCount, s.alertMonitor, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Setup message consumer diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index 9f92afc8e..401e6eac1 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -4,7 +4,6 @@ import ( "context" "errors" "slices" - "time" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/emetrics" @@ -12,7 +11,6 @@ import ( "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/redis" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -38,20 +36,16 @@ type eventHandler struct { func NewEventHandler( logger *logging.Logger, - redisClient redis.Cmdable, deliveryMQ *deliverymq.DeliveryMQ, entityStore models.EntityStore, eventTracer eventtracer.EventTracer, topics []string, - idempotencyKeyTTL time.Duration, + idempotence idempotence.Idempotence, ) EventHandler { emeter, _ := emetrics.New() eventHandler := &eventHandler{ - logger: logger, - idempotence: idempotence.New(redisClient, - idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(idempotencyKeyTTL), - ), + logger: logger, + idempotence: idempotence, deliveryMQ: deliveryMQ, entityStore: entityStore, eventTracer: eventTracer, diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index 136a8f252..1ba9568e5 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hookdeck/outpost/internal/deliverymq" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/publishmq" @@ -33,12 +34,11 @@ func TestIntegrationPublishMQEventHandler_Concurrency(t *testing.T) { require.NoError(t, err) defer cleanup() eventHandler := publishmq.NewEventHandler(logger, - testutil.CreateTestRedisClient(t), deliveryMQ, entityStore, mockEventTracer, testutil.TestTopics, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) tenant := models.Tenant{ @@ -98,12 +98,11 @@ func TestEventHandler_WildcardTopic(t *testing.T) { require.NoError(t, err) eventHandler := publishmq.NewEventHandler(logger, - testutil.CreateTestRedisClient(t), deliveryMQ, entityStore, mockEventTracer, testutil.TestTopics, - 24*time.Hour, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) tenant := models.Tenant{ diff --git a/internal/services/api/api.go b/internal/services/api/api.go index dab1a48ed..ffc764e14 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -14,6 +14,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logstore" "github.com/hookdeck/outpost/internal/models" @@ -122,7 +123,11 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log ) logger.Debug("creating event handler and router") - eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, cfg.Topics, time.Duration(cfg.PublishIdempotencyKeyTTL)*time.Second) + publishIdempotence := idempotence.New(redisClient, + idempotence.WithTimeout(5*time.Second), + idempotence.WithSuccessfulTTL(time.Duration(cfg.PublishIdempotencyKeyTTL)*time.Second), + ) + eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, cfg.Topics, publishIdempotence) router := NewRouter( RouterConfig{ ServiceName: cfg.OpenTelemetry.GetServiceName(), diff --git a/internal/services/api/router_test.go b/internal/services/api/router_test.go index d1c7de4dd..1fb4a8b1a 100644 --- a/internal/services/api/router_test.go +++ b/internal/services/api/router_test.go @@ -5,11 +5,13 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/gin-gonic/gin" "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logstore" @@ -40,7 +42,7 @@ func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *te eventTracer := eventtracer.NewNoopEventTracer() entityStore := setupTestEntityStore(t, redisClient, nil) logStore := setupTestLogStore(t, funcs...) - eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, testutil.TestTopics, 24*time.Hour) + eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, testutil.TestTopics, idempotence.New(redisClient, idempotence.WithSuccessfulTTL(24*time.Hour))) router := api.NewRouter( api.RouterConfig{ ServiceName: "", diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 2e79ce09f..6f8de68da 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -13,6 +13,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logmq" "github.com/hookdeck/outpost/internal/logstore" @@ -139,11 +140,15 @@ func NewService(ctx context.Context, alert.WithDeploymentID(cfg.DeploymentID), ) + deliveryIdempotence := idempotence.New(redisClient, + idempotence.WithTimeout(5*time.Second), + idempotence.WithSuccessfulTTL(time.Duration(cfg.DeliveryIdempotencyKeyTTL)*time.Second), + ) + retryBackoff, retryMaxLimit := cfg.GetRetryBackoff() handler = deliverymq.NewMessageHandler( logger, - redisClient, logMQ, entityStore, logStore, @@ -153,7 +158,7 @@ func NewService(ctx context.Context, retryBackoff, retryMaxLimit, alertMonitor, - time.Duration(cfg.DeliveryIdempotencyKeyTTL)*time.Second, + deliveryIdempotence, ) } From 92952d2b4954fed096d1e8b941e37ddd62bd3180 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Mon, 20 Oct 2025 20:24:35 +0700 Subject: [PATCH 3/3] chore: remove unused func --- internal/services/api/router_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/internal/services/api/router_test.go b/internal/services/api/router_test.go index 1fb4a8b1a..dcdf0e478 100644 --- a/internal/services/api/router_test.go +++ b/internal/services/api/router_test.go @@ -27,12 +27,6 @@ import ( const baseAPIPath = "/api/v1" -func testRouterWithCHDB(t *testing.T, config *clickhouse.ClickHouseConfig) clickhouse.DB { - chDB, err := clickhouse.New(config) - require.NoError(t, err) - return chDB -} - func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *testing.T) clickhouse.DB) (http.Handler, *logging.Logger, redis.Client) { gin.SetMode(gin.TestMode) logger := testutil.CreateTestLogger(t)