Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/pages/references/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes |
| `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes |
| `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes |
| `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours). | `86400` | No |
| `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No |
| `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No |
| `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No |
Expand Down Expand Up @@ -105,6 +106,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS` | JSON string or path to a file containing GCP service account credentials for the Pub/Sub publish topic. Required if GCP Pub/Sub is chosen and not using implicit credentials. | `nil` | Conditional |
| `PUBLISH_GCP_PUBSUB_SUBSCRIPTION` | Name of the GCP Pub/Sub subscription to read published events from. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional |
| `PUBLISH_GCP_PUBSUB_TOPIC` | Name of the GCP Pub/Sub topic for publishing events. Required if GCP Pub/Sub is the chosen publish MQ provider. | `nil` | Conditional |
| `PUBLISH_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours). | `86400` | No |
Copy link
Copy Markdown
Contributor

@alexbouchardd alexbouchardd Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of bundling configs in the docs that 99% of people won't ever have to change with configs you expects users to change like PUBLISH_GCP_PUBSUB_SERVICE_ACCOUNT_CREDENTIALS

We need a better way of surfacing "standard" and "advanced" configs

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

| `PUBLISH_MAX_CONCURRENCY` | Maximum number of messages to process concurrently from the publish queue. | `1` | No |
| `PUBLISH_RABBITMQ_EXCHANGE` | Name of the RabbitMQ exchange for the publish queue. | `nil` | No |
| `PUBLISH_RABBITMQ_QUEUE` | Name of the RabbitMQ queue for publishing events. Required if RabbitMQ is the chosen publish MQ provider. | `nil` | Conditional |
Expand Down Expand Up @@ -166,6 +168,9 @@ alert:
# Enables or disables audit logging for significant events.
audit_log: true

# Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours).
delivery_idempotency_key_ttl: 86400

# Maximum number of delivery attempts to process concurrently.
delivery_max_concurrency: 1

Expand Down Expand Up @@ -451,6 +456,9 @@ portal:
# Required: Y
postgres: ""

# Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours).
publish_idempotency_key_ttl: 86400

publishmq:
# Configuration for using AWS SQS as the publish message queue. Only one publish MQ provider should be configured.
aws_sqs:
Expand Down
6 changes: 6 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type Config struct {
MaxDestinationsPerTenant int `yaml:"max_destinations_per_tenant" env:"MAX_DESTINATIONS_PER_TENANT" desc:"Maximum number of destinations allowed per tenant/organization." required:"N"`
DeliveryTimeoutSeconds int `yaml:"delivery_timeout_seconds" env:"DELIVERY_TIMEOUT_SECONDS" desc:"Timeout in seconds for HTTP requests made during event delivery to webhook destinations." required:"N"`

// Idempotency
PublishIdempotencyKeyTTL int `yaml:"publish_idempotency_key_ttl" env:"PUBLISH_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 86400 (24 hours)." required:"N"`
DeliveryIdempotencyKeyTTL int `yaml:"delivery_idempotency_key_ttl" env:"DELIVERY_IDEMPOTENCY_KEY_TTL" desc:"Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 86400 (24 hours)." required:"N"`

// Destination Registry
DestinationMetadataPath string `yaml:"destination_metadata_path" env:"DESTINATION_METADATA_PATH" desc:"Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set." required:"N"`

Expand Down Expand Up @@ -160,6 +164,8 @@ func (c *Config) InitDefaults() {
c.RetryMaxLimit = 10
c.MaxDestinationsPerTenant = 20
c.DeliveryTimeoutSeconds = 5
c.PublishIdempotencyKeyTTL = 86400 // 24 hours
c.DeliveryIdempotencyKeyTTL = 86400 // 24 hours
c.LogBatchThresholdSeconds = 10
c.LogBatchSize = 1000

Expand Down
10 changes: 3 additions & 7 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/models"
"github.com/hookdeck/outpost/internal/mqs"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/scheduler"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -110,7 +109,6 @@ type AlertMonitor interface {

func NewMessageHandler(
logger *logging.Logger,
redisClient redis.Cmdable,
logMQ LogPublisher,
entityStore DestinationGetter,
logStore EventGetter,
Expand All @@ -120,6 +118,7 @@ func NewMessageHandler(
retryBackoff backoff.Backoff,
retryMaxLimit int,
alertMonitor AlertMonitor,
idempotence idempotence.Idempotence,
) consumer.MessageHandler {
return &messageHandler{
eventTracer: eventTracer,
Expand All @@ -131,11 +130,8 @@ func NewMessageHandler(
retryScheduler: retryScheduler,
retryBackoff: retryBackoff,
retryMaxLimit: retryMaxLimit,
idempotence: idempotence.New(redisClient,
idempotence.WithTimeout(5*time.Second),
idempotence.WithSuccessfulTTL(24*time.Hour),
),
alertMonitor: alertMonitor,
idempotence: idempotence,
alertMonitor: alertMonitor,
}
}

Expand Down
37 changes: 19 additions & 18 deletions internal/deliverymq/messagehandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hookdeck/outpost/internal/backoff"
"github.com/hookdeck/outpost/internal/deliverymq"
"github.com/hookdeck/outpost/internal/destregistry"
"github.com/hookdeck/outpost/internal/idempotence"
"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/models"
"github.com/hookdeck/outpost/internal/util/testutil"
Expand Down Expand Up @@ -48,7 +49,6 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
newMockLogPublisher(nil),
destGetter,
eventGetter,
Expand All @@ -58,6 +58,7 @@ func TestMessageHandler_DestinationGetterError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -112,7 +113,6 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -122,6 +122,7 @@ func TestMessageHandler_DestinationNotFound(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -173,7 +174,6 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -183,6 +183,7 @@ func TestMessageHandler_DestinationDeleted(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -244,7 +245,6 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -254,6 +254,7 @@ func TestMessageHandler_PublishError_EligibleForRetry(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -318,7 +319,6 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -328,6 +328,7 @@ func TestMessageHandler_PublishError_NotEligible(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -382,7 +383,6 @@ func TestMessageHandler_EventGetterError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -392,6 +392,7 @@ func TestMessageHandler_EventGetterError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message simulating a retry
Expand Down Expand Up @@ -448,7 +449,6 @@ func TestMessageHandler_RetryFlow(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -458,6 +458,7 @@ func TestMessageHandler_RetryFlow(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
newMockAlertMonitor(),
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message simulating a retry
Expand Down Expand Up @@ -517,7 +518,6 @@ func TestMessageHandler_Idempotency(t *testing.T) {
redis := testutil.CreateTestRedisClient(t)
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
redis,
logPublisher,
destGetter,
eventGetter,
Expand All @@ -527,6 +527,7 @@ func TestMessageHandler_Idempotency(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
newMockAlertMonitor(),
idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create message with fixed ID for idempotency check
Expand Down Expand Up @@ -585,7 +586,6 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) {
redis := testutil.CreateTestRedisClient(t)
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
redis,
logPublisher,
destGetter,
eventGetter,
Expand All @@ -595,6 +595,7 @@ func TestMessageHandler_IdempotencyWithSystemError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
newMockAlertMonitor(),
idempotence.New(redis, idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create retry message
Expand Down Expand Up @@ -662,7 +663,6 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -672,6 +672,7 @@ func TestMessageHandler_DestinationDisabled(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -725,7 +726,6 @@ func TestMessageHandler_LogPublisherError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -735,6 +735,7 @@ func TestMessageHandler_LogPublisherError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
newMockAlertMonitor(),
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -787,7 +788,6 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -797,6 +797,7 @@ func TestMessageHandler_PublishAndLogError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
newMockAlertMonitor(),
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -850,7 +851,6 @@ func TestManualDelivery_Success(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -860,6 +860,7 @@ func TestManualDelivery_Success(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -924,7 +925,6 @@ func TestManualDelivery_PublishError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -934,6 +934,7 @@ func TestManualDelivery_PublishError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -990,7 +991,6 @@ func TestManualDelivery_CancelError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -1000,6 +1000,7 @@ func TestManualDelivery_CancelError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -1058,7 +1059,6 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -1068,6 +1068,7 @@ func TestManualDelivery_DestinationDisabled(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -1131,7 +1132,6 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -1141,6 +1141,7 @@ func TestMessageHandler_PublishSuccess(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down Expand Up @@ -1192,7 +1193,6 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
// Setup message handler
handler := deliverymq.NewMessageHandler(
testutil.CreateTestLogger(t),
testutil.CreateTestRedisClient(t),
logPublisher,
destGetter,
eventGetter,
Expand All @@ -1202,6 +1202,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
&backoff.ConstantBackoff{Interval: 1 * time.Second},
10,
alertMonitor,
idempotence.New(testutil.CreateTestRedisClient(t), idempotence.WithSuccessfulTTL(24*time.Hour)),
)

// Create and handle message
Expand Down
Loading
Loading