Skip to content

Commit 34057b1

Browse files
committed
fix: subscription webhook correctness
- Emit subscription.renewed from scanner when advance_subscription_period succeeds (was only in hourly job fallback, which rarely runs) - Fix subscription.canceled spam: select subs before UPDATE, dispatch webhooks only for those rows (was re-dispatching for all historically canceled subs every hour) - Emit subscription.canceled on immediate cancel (at_period_end: false) in API handler with "immediate": true in payload - Remove duplicate subscription.renewed from hourly job since scanner now handles it
1 parent 030f22f commit 34057b1

3 files changed

Lines changed: 71 additions & 35 deletions

File tree

src/api/subscriptions.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use actix_web::{web, HttpRequest, HttpResponse};
22
use sqlx::SqlitePool;
33

44
use crate::subscriptions::{self, CreateSubscriptionRequest};
5+
use crate::config::Config;
56

67
pub async fn create(
78
req: HttpRequest,
@@ -42,6 +43,7 @@ pub struct CancelBody {
4243
pub async fn cancel(
4344
req: HttpRequest,
4445
pool: web::Data<SqlitePool>,
46+
config: web::Data<Config>,
4547
path: web::Path<String>,
4648
body: web::Json<CancelBody>,
4749
) -> HttpResponse {
@@ -56,7 +58,28 @@ pub async fn cancel(
5658
match subscriptions::cancel_subscription(pool.get_ref(), &sub_id, &merchant.id, at_period_end)
5759
.await
5860
{
59-
Ok(Some(sub)) => HttpResponse::Ok().json(sub),
61+
Ok(Some(sub)) => {
62+
// Dispatch webhook for immediate cancels (at_period_end=false and status is now canceled)
63+
// Period-end cancels are handled by the hourly process_renewals job when they actually cancel
64+
if !at_period_end && sub.status == "canceled" {
65+
let http = reqwest::Client::new();
66+
let payload = serde_json::json!({
67+
"subscription_id": sub.id,
68+
"price_id": sub.price_id,
69+
"immediate": true,
70+
});
71+
let _ = crate::webhooks::dispatch_event(
72+
pool.get_ref(),
73+
&http,
74+
&merchant.id,
75+
"subscription.canceled",
76+
payload,
77+
&config.encryption_key,
78+
)
79+
.await;
80+
}
81+
HttpResponse::Ok().json(sub)
82+
}
6083
Ok(None) => {
6184
HttpResponse::NotFound().json(serde_json::json!({"error": "Subscription not found"}))
6285
}

src/scanner/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,25 @@ async fn on_invoice_confirmed(
715715
new_period_end = %sub.current_period_end,
716716
"Subscription advanced on payment confirmation"
717717
);
718+
// Dispatch subscription.renewed webhook
719+
let payload = serde_json::json!({
720+
"subscription_id": sub.id,
721+
"invoice_id": invoice.id,
722+
"new_period_start": sub.current_period_start,
723+
"new_period_end": sub.current_period_end,
724+
});
725+
if let Err(e) = crate::webhooks::dispatch_event(
726+
pool,
727+
http,
728+
&invoice.merchant_id,
729+
"subscription.renewed",
730+
payload,
731+
&config.encryption_key,
732+
)
733+
.await
734+
{
735+
tracing::error!(sub_id, error = %e, "Failed to dispatch subscription.renewed webhook");
736+
}
718737
}
719738
Ok(None) => {
720739
tracing::warn!(sub_id, "Subscription not found for confirmed invoice");

src/subscriptions/mod.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -175,28 +175,33 @@ pub async fn process_renewals(
175175
let mut actions = 0u32;
176176

177177
// 1. Cancel subscriptions marked for end-of-period cancellation
178-
let canceled = sqlx::query(
179-
"UPDATE subscriptions SET status = 'canceled'
180-
WHERE cancel_at_period_end = 1 AND current_period_end <= ? AND status = 'active'",
181-
)
182-
.bind(&now_str)
183-
.execute(pool)
184-
.await?;
178+
// First, select the ones we're about to cancel (before updating) so we dispatch webhooks only for these
179+
let q = format!(
180+
"SELECT {} FROM subscriptions WHERE cancel_at_period_end = 1 AND current_period_end <= ? AND status = 'active'",
181+
SUB_COLS
182+
);
183+
let to_cancel: Vec<Subscription> = sqlx::query_as::<_, Subscription>(&q)
184+
.bind(&now_str)
185+
.fetch_all(pool)
186+
.await?;
187+
188+
if !to_cancel.is_empty() {
189+
// Now update them
190+
sqlx::query(
191+
"UPDATE subscriptions SET status = 'canceled'
192+
WHERE cancel_at_period_end = 1 AND current_period_end <= ? AND status = 'active'",
193+
)
194+
.bind(&now_str)
195+
.execute(pool)
196+
.await?;
185197

186-
if canceled.rows_affected() > 0 {
187198
tracing::info!(
188-
count = canceled.rows_affected(),
199+
count = to_cancel.len(),
189200
"Subscriptions canceled at period end"
190201
);
191-
// Fire webhooks for canceled subscriptions
192-
let q = format!(
193-
"SELECT {} FROM subscriptions WHERE status = 'canceled' AND cancel_at_period_end = 1",
194-
SUB_COLS
195-
);
196-
let canceled_subs: Vec<Subscription> = sqlx::query_as::<_, Subscription>(&q)
197-
.fetch_all(pool)
198-
.await?;
199-
for sub in &canceled_subs {
202+
203+
// Fire webhooks only for the subscriptions we just canceled
204+
for sub in &to_cancel {
200205
let payload = serde_json::json!({
201206
"subscription_id": sub.id,
202207
"price_id": sub.price_id,
@@ -211,7 +216,7 @@ pub async fn process_renewals(
211216
)
212217
.await;
213218
}
214-
actions += canceled.rows_affected() as u32;
219+
actions += to_cancel.len() as u32;
215220
}
216221

217222
// 2. Generate draft invoices for active subscriptions approaching period end
@@ -326,6 +331,8 @@ pub async fn process_renewals(
326331
}
327332

328333
// 3. Advance paid periods (subscriptions past period_end with confirmed invoice)
334+
// Note: subscription.renewed webhook is dispatched by the scanner on payment confirmation.
335+
// This step is a fallback for edge cases (e.g., server restart during scan).
329336
let q = format!(
330337
"SELECT {} FROM subscriptions WHERE status = 'active' AND current_period_end <= ? AND cancel_at_period_end = 0",
331338
SUB_COLS
@@ -346,21 +353,8 @@ pub async fn process_renewals(
346353

347354
if let Some((ref status,)) = inv_status {
348355
if status == "confirmed" {
349-
if let Some(new_sub) = advance_subscription_period(pool, &sub.id).await? {
350-
let payload = serde_json::json!({
351-
"subscription_id": new_sub.id,
352-
"new_period_start": new_sub.current_period_start,
353-
"new_period_end": new_sub.current_period_end,
354-
});
355-
let _ = crate::webhooks::dispatch_event(
356-
pool,
357-
http,
358-
&sub.merchant_id,
359-
"subscription.renewed",
360-
payload,
361-
encryption_key,
362-
)
363-
.await;
356+
// Just advance the period — webhook already dispatched by scanner
357+
if advance_subscription_period(pool, &sub.id).await?.is_some() {
364358
actions += 1;
365359
}
366360
continue;

0 commit comments

Comments
 (0)