diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index f3cec0823..fb04ca9c2 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -41,6 +41,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes | | `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes | | `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes | +| `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours). | `86400` | No | | `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No | | `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No | | `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No | @@ -105,6 +106,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS` | JSON string or path to a file containing GCP service account credentials for the Pub/Sub publish topic. Required if GCP Pub/Sub is chosen and not using implicit credentials. | `nil` | Conditional | | `PUBLISH_GCP_PUBSUB_SUBSCRIPTION` | Name of the GCP Pub/Sub subscription to read published events from. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional | | `PUBLISH_GCP_PUBSUB_TOPIC` | Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional | +| `PUBLISH_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours). | `86400` | No | | `PUBLISH_MAX_CONCURRENCY` | Maximum number of messages to process concurrently from the publish queue. | `1` | No | | `PUBLISH_RABBITMQ_EXCHANGE` | Name of the RabbitMQ exchange for the publish queue. | `nil` | No | | `PUBLISH_RABBITMQ_QUEUE` | Name of the RabbitMQ queue for publishing events. Required if RabbitMQ is the chosen publish MQ provider. | `nil` | Conditional | @@ -166,6 +168,9 @@ alert: # Enables or disables audit logging for significant events. audit_log: true +# Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours). +delivery_idempotency_key_ttl: 86400 + # Maximum number of delivery attempts to process concurrently. delivery_max_concurrency: 1 @@ -451,6 +456,9 @@ portal: # Required: Y postgres: "" +# Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours). +publish_idempotency_key_ttl: 86400 + publishmq: # Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured. aws_sqs: diff --git a/internal/config/config.go b/internal/config/config.go index 01b3d9ffb..b61ee8807 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -83,6 +83,10 @@ type Config struct { MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"` DeliveryTimeoutSeconds int `yaml:"delivery_timeout_seconds" env:"DELIVERY_TIMEOUT_SECONDS" desc:"Timeout in seconds for HTTP requests made during event delivery to webhook destinations." required:"N"` + // Idempotency + PublishIdempotencyKeyTTL int `yaml:"publish_idempotency_key_ttl" env:"PUBLISH_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours)." required:"N"` + DeliveryIdempotencyKeyTTL int `yaml:"delivery_idempotency_key_ttl" env:"DELIVERY_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours)." required:"N"` + // Destination Registry DestinationMetadataPath string `yaml:"destination_metadata_path" env:"DESTINATION_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set." required:"N"` @@ -160,6 +164,8 @@ func (c *Config) InitDefaults() { c.RetryMaxLimit = 10 c.MaxDestinationsPerTenant = 20 c.DeliveryTimeoutSeconds = 5 + c.PublishIdempotencyKeyTTL = 86400 // 24 hours + c.DeliveryIdempotencyKeyTTL = 86400 // 24 hours c.LogBatchThresholdSeconds = 10 c.LogBatchSize = 1000 diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index 60ea4fd28..73390aab4 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -14,7 +14,6 @@ import ( "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/mqs" - "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/scheduler" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -110,7 +109,6 @@ type AlertMonitor interface { func NewMessageHandler( logger *logging.Logger, - redisClient redis.Cmdable, logMQ LogPublisher, entityStore DestinationGetter, logStore EventGetter, @@ -120,6 +118,7 @@ func NewMessageHandler( retryBackoff backoff.Backoff, retryMaxLimit int, alertMonitor AlertMonitor, + idempotence idempotence.Idempotence, ) consumer.MessageHandler { return &messageHandler{ eventTracer: eventTracer, @@ -131,11 +130,8 @@ func NewMessageHandler( retryScheduler: retryScheduler, retryBackoff: retryBackoff, retryMaxLimit: retryMaxLimit, - idempotence: idempotence.New(redisClient, - idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(24*time.Hour), - ), - alertMonitor: alertMonitor, + idempotence: idempotence, + alertMonitor: alertMonitor, } } diff --git a/internal/deliverymq/messagehandler_test.go b/internal/deliverymq/messagehandler_test.go index 16147ecc3..1d5c787d9 100644 --- a/internal/deliverymq/messagehandler_test.go +++ b/internal/deliverymq/messagehandler_test.go @@ -10,6 +10,7 @@ import ( "github.com/hookdeck/outpost/internal/backoff" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testutil" @@ -48,7 +49,6 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), newMockLogPublisher(nil), destGetter, eventGetter, @@ -58,6 +58,7 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -112,7 +113,6 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -122,6 +122,7 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -173,7 +174,6 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -183,6 +183,7 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -244,7 +245,6 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -254,6 +254,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -318,7 +319,6 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -328,6 +328,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -382,7 +383,6 @@ func TestMessageHandler_EventGetterError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -392,6 +392,7 @@ func TestMessageHandler_EventGetterError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message simulating a retry @@ -448,7 +449,6 @@ func TestMessageHandler_RetryFlow(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -458,6 +458,7 @@ func TestMessageHandler_RetryFlow(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message simulating a retry @@ -517,7 +518,6 @@ func TestMessageHandler_Idempotency(t *testing.T) { redis := testutil.CreateTestRedisClient(t) handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - redis, logPublisher, destGetter, eventGetter, @@ -527,6 +527,7 @@ func TestMessageHandler_Idempotency(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create message with fixed ID for idempotency check @@ -585,7 +586,6 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) { redis := testutil.CreateTestRedisClient(t) handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - redis, logPublisher, destGetter, eventGetter, @@ -595,6 +595,7 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create retry message @@ -662,7 +663,6 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -672,6 +672,7 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -725,7 +726,6 @@ func TestMessageHandler_LogPublisherError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -735,6 +735,7 @@ func TestMessageHandler_LogPublisherError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -787,7 +788,6 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -797,6 +797,7 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, newMockAlertMonitor(), + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -850,7 +851,6 @@ func TestManualDelivery_Success(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -860,6 +860,7 @@ func TestManualDelivery_Success(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -924,7 +925,6 @@ func TestManualDelivery_PublishError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -934,6 +934,7 @@ func TestManualDelivery_PublishError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -990,7 +991,6 @@ func TestManualDelivery_CancelError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1000,6 +1000,7 @@ func TestManualDelivery_CancelError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1058,7 +1059,6 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1068,6 +1068,7 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1131,7 +1132,6 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1141,6 +1141,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message @@ -1192,7 +1193,6 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), logPublisher, destGetter, eventGetter, @@ -1202,6 +1202,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, 10, alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Create and handle message diff --git a/internal/deliverymq/retry_test.go b/internal/deliverymq/retry_test.go index 8c4dfb8b0..49dfc32fd 100644 --- a/internal/deliverymq/retry_test.go +++ b/internal/deliverymq/retry_test.go @@ -9,6 +9,7 @@ import ( "github.com/hookdeck/outpost/internal/backoff" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/mqs" @@ -53,7 +54,6 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { // Setup message handler handler := deliverymq.NewMessageHandler( testutil.CreateTestLogger(t), - testutil.CreateTestRedisClient(t), s.logPublisher, s.destGetter, s.eventGetter, @@ -63,6 +63,7 @@ func (s *RetryDeliveryMQSuite) SetupTest(t *testing.T) { &backoff.ConstantBackoff{Interval: 1 * time.Second}, s.retryMaxCount, s.alertMonitor, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) // Setup message consumer diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index c061ce2a3..401e6eac1 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -4,7 +4,6 @@ import ( "context" "errors" "slices" - "time" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/emetrics" @@ -12,7 +11,6 @@ import ( "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/models" - "github.com/hookdeck/outpost/internal/redis" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -38,19 +36,16 @@ type eventHandler struct { func NewEventHandler( logger *logging.Logger, - redisClient redis.Cmdable, deliveryMQ *deliverymq.DeliveryMQ, entityStore models.EntityStore, eventTracer eventtracer.EventTracer, topics []string, + idempotence idempotence.Idempotence, ) EventHandler { emeter, _ := emetrics.New() eventHandler := &eventHandler{ - logger: logger, - idempotence: idempotence.New(redisClient, - idempotence.WithTimeout(5*time.Second), - idempotence.WithSuccessfulTTL(24*time.Hour), - ), + logger: logger, + idempotence: idempotence, deliveryMQ: deliveryMQ, entityStore: entityStore, eventTracer: eventTracer, diff --git a/internal/publishmq/eventhandler_test.go b/internal/publishmq/eventhandler_test.go index 951cc17ee..1ba9568e5 100644 --- a/internal/publishmq/eventhandler_test.go +++ b/internal/publishmq/eventhandler_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hookdeck/outpost/internal/deliverymq" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/publishmq" @@ -33,11 +34,11 @@ func TestIntegrationPublishMQEventHandler_Concurrency(t *testing.T) { require.NoError(t, err) defer cleanup() eventHandler := publishmq.NewEventHandler(logger, - testutil.CreateTestRedisClient(t), deliveryMQ, entityStore, mockEventTracer, testutil.TestTopics, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) tenant := models.Tenant{ @@ -97,11 +98,11 @@ func TestEventHandler_WildcardTopic(t *testing.T) { require.NoError(t, err) eventHandler := publishmq.NewEventHandler(logger, - testutil.CreateTestRedisClient(t), deliveryMQ, entityStore, mockEventTracer, testutil.TestTopics, + idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)), ) tenant := models.Tenant{ diff --git a/internal/services/api/api.go b/internal/services/api/api.go index e8e069130..ffc764e14 100644 --- a/internal/services/api/api.go +++ b/internal/services/api/api.go @@ -14,6 +14,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logstore" "github.com/hookdeck/outpost/internal/models" @@ -122,7 +123,11 @@ func NewService(ctx context.Context, wg *sync.WaitGroup, cfg *config.Config, log ) logger.Debug("creating event handler and router") - eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, cfg.Topics) + publishIdempotence := idempotence.New(redisClient, + idempotence.WithTimeout(5*time.Second), + idempotence.WithSuccessfulTTL(time.Duration(cfg.PublishIdempotencyKeyTTL)*time.Second), + ) + eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, cfg.Topics, publishIdempotence) router := NewRouter( RouterConfig{ ServiceName: cfg.OpenTelemetry.GetServiceName(), diff --git a/internal/services/api/router_test.go b/internal/services/api/router_test.go index efaccfa83..dcdf0e478 100644 --- a/internal/services/api/router_test.go +++ b/internal/services/api/router_test.go @@ -5,11 +5,13 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/gin-gonic/gin" "github.com/hookdeck/outpost/internal/clickhouse" "github.com/hookdeck/outpost/internal/deliverymq" "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logstore" @@ -25,12 +27,6 @@ import ( const baseAPIPath = "/api/v1" -func testRouterWithCHDB(t *testing.T, config *clickhouse.ClickHouseConfig) clickhouse.DB { - chDB, err := clickhouse.New(config) - require.NoError(t, err) - return chDB -} - func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *testing.T) clickhouse.DB) (http.Handler, *logging.Logger, redis.Client) { gin.SetMode(gin.TestMode) logger := testutil.CreateTestLogger(t) @@ -40,7 +36,7 @@ func setupTestRouter(t *testing.T, apiKey, jwtSecret string, funcs ...func(t *te eventTracer := eventtracer.NewNoopEventTracer() entityStore := setupTestEntityStore(t, redisClient, nil) logStore := setupTestLogStore(t, funcs...) - eventHandler := publishmq.NewEventHandler(logger, redisClient, deliveryMQ, entityStore, eventTracer, testutil.TestTopics) + eventHandler := publishmq.NewEventHandler(logger, deliveryMQ, entityStore, eventTracer, testutil.TestTopics, idempotence.New(redisClient, idempotence.WithSuccessfulTTL(24*time.Hour))) router := api.NewRouter( api.RouterConfig{ ServiceName: "", diff --git a/internal/services/delivery/delivery.go b/internal/services/delivery/delivery.go index 8a13b31d0..6f8de68da 100644 --- a/internal/services/delivery/delivery.go +++ b/internal/services/delivery/delivery.go @@ -13,6 +13,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" destregistrydefault "github.com/hookdeck/outpost/internal/destregistry/providers" "github.com/hookdeck/outpost/internal/eventtracer" + "github.com/hookdeck/outpost/internal/idempotence" "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/logmq" "github.com/hookdeck/outpost/internal/logstore" @@ -139,11 +140,15 @@ func NewService(ctx context.Context, alert.WithDeploymentID(cfg.DeploymentID), ) + deliveryIdempotence := idempotence.New(redisClient, + idempotence.WithTimeout(5*time.Second), + idempotence.WithSuccessfulTTL(time.Duration(cfg.DeliveryIdempotencyKeyTTL)*time.Second), + ) + retryBackoff, retryMaxLimit := cfg.GetRetryBackoff() handler = deliverymq.NewMessageHandler( logger, - redisClient, logMQ, entityStore, logStore, @@ -153,6 +158,7 @@ func NewService(ctx context.Context, retryBackoff, retryMaxLimit, alertMonitor, + deliveryIdempotence, ) }