Skip to content

Commit f0f7b94

Browse files
authored
CronJob to process Messages (#14)
1 parent f58b321 commit f0f7b94

10 files changed

Lines changed: 375 additions & 140 deletions

File tree

RELEASE_NOTES.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
### ✨ Features
22

3+
- Uses cron job to process queued campaigns, processes messages in batches, much more efficient for large campaigns.
4+
- Theme toggle updated.
5+
- Loading state for API keys.
36
- New features and improvements.
47

58
### 🐛 Bug Fixes
69

7-
- Template preview fixed.
10+
- Recipient count on campaign page was incorrect.
811
- Various bug fixes and optimizations.
912

1013
### 📚 Docs

apps/backend/src/campaign/mutation.ts

Lines changed: 9 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,7 @@ import { authProcedure } from "../trpc"
33
import { prisma } from "../utils/prisma"
44
import { TRPCError } from "@trpc/server"
55
import pMap from "p-map"
6-
import { LinkTracker } from "../lib/LinkTracker"
7-
import { v4 as uuidV4 } from "uuid"
86
import { Mailer } from "../lib/Mailer"
9-
import {
10-
replacePlaceholders,
11-
PlaceholderDataKey,
12-
} from "../utils/placeholder-parser"
137

148
const createCampaignSchema = z.object({
159
title: z.string().min(1, "Campaign title is required"),
@@ -378,105 +372,16 @@ export const startCampaign = authProcedure
378372
})
379373
}
380374

381-
const updatedCampaign = await prisma.$transaction(async (tx) => {
382-
const status =
383-
campaign.scheduledAt && campaign.scheduledAt > new Date()
384-
? "SCHEDULED"
385-
: "SENDING"
386-
387-
const linkTracker = new LinkTracker(tx)
388-
389-
const messagesData = await Promise.all(
390-
Array.from(subscribers.values()).map(async (subscriber) => {
391-
if (!campaign.content) {
392-
throw new TRPCError({
393-
code: "BAD_REQUEST",
394-
message: "Campaign must have content",
395-
})
396-
}
397-
398-
if (!generalSettings.baseURL) {
399-
throw new TRPCError({
400-
code: "BAD_REQUEST",
401-
message:
402-
"Base URL must be configured in settings before running a campaign",
403-
})
404-
}
405-
406-
const messageId = uuidV4()
407-
let content = campaign.Template
408-
? campaign.Template.content.replace(
409-
/{{content}}/g,
410-
campaign.content
411-
)
412-
: campaign.content
413-
414-
const placeholderData: Partial<Record<PlaceholderDataKey, string>> = {
415-
"subscriber.email": subscriber.email,
416-
"campaign.name": campaign.title,
417-
"campaign.subject": campaign.subject ?? "",
418-
"organization.name": organization.name,
419-
unsubscribe_link: `${generalSettings.baseURL}/unsubscribe?sid=${subscriber.id}&cid=${campaign.id}&mid=${messageId}`,
420-
current_date: new Date().toLocaleDateString("en-CA"),
421-
}
422-
423-
if (campaign.openTracking) {
424-
content += `<img src="${generalSettings.baseURL}/img/${messageId}/img.png" alt="" width="1" height="1" style="display:none" />`
425-
}
426-
427-
if (subscriber.name) {
428-
placeholderData["subscriber.name"] = subscriber.name
429-
}
430-
431-
if (subscriber.Metadata) {
432-
for (const meta of subscriber.Metadata) {
433-
placeholderData[`subscriber.metadata.${meta.key}`] = meta.value
434-
}
435-
}
436-
437-
content = replacePlaceholders(content, placeholderData)
438-
439-
const { content: finalContent, trackedIds } =
440-
await linkTracker.replaceMessageContentWithTrackedLinks(
441-
content,
442-
campaign.id,
443-
generalSettings.baseURL
444-
)
445-
446-
return {
447-
message: {
448-
id: messageId,
449-
subscriberId: subscriber.id,
450-
content: finalContent,
451-
},
452-
subscriberTrackedLinks: trackedIds.map((trackedLinkId) => ({
453-
messageId,
454-
subscriberId: subscriber.id,
455-
trackedLinkId,
456-
})),
457-
}
458-
})
459-
)
460-
461-
const createdCampaign = await tx.campaign.update({
462-
where: { id: campaign.id },
463-
data: {
464-
status,
465-
Messages: {
466-
create: messagesData.map(({ message }) => message),
467-
},
468-
},
469-
include: {
470-
Messages: true,
471-
_count: {
472-
select: {
473-
Messages: true,
474-
},
475-
},
476-
},
477-
})
375+
const status =
376+
campaign.scheduledAt && campaign.scheduledAt > new Date()
377+
? "SCHEDULED"
378+
: "SENDING"
478379

479-
return createdCampaign
380+
const updatedCampaign = await prisma.campaign.update({
381+
where: { id: campaign.id },
382+
data: {
383+
status,
384+
},
480385
})
481386

482387
return { campaign: updatedCampaign }

apps/backend/src/campaign/query.ts

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,7 @@ export const getCampaign = authProcedure
114114
Template: true,
115115
CampaignLists: {
116116
include: {
117-
List: {
118-
include: {
119-
_count: {
120-
select: {
121-
ListSubscribers: {
122-
where: {
123-
unsubscribedAt: null,
124-
},
125-
},
126-
},
127-
},
128-
},
129-
},
117+
List: true,
130118
},
131119
},
132120
},
@@ -139,6 +127,46 @@ export const getCampaign = authProcedure
139127
})
140128
}
141129

130+
const listSubscribers = await prisma.listSubscriber.findMany({
131+
where: {
132+
listId: {
133+
in: campaign.CampaignLists.map((cl) => cl.listId),
134+
},
135+
unsubscribedAt: null,
136+
},
137+
select: {
138+
id: true,
139+
},
140+
distinct: ["subscriberId"],
141+
})
142+
143+
// Add the count to each list for backward compatibility
144+
const campaignWithCounts = {
145+
...campaign,
146+
CampaignLists: await Promise.all(
147+
campaign.CampaignLists.map(async (cl) => {
148+
const count = await prisma.listSubscriber.count({
149+
where: {
150+
listId: cl.listId,
151+
unsubscribedAt: null,
152+
},
153+
})
154+
155+
return {
156+
...cl,
157+
List: {
158+
...cl.List,
159+
_count: {
160+
ListSubscribers: count,
161+
},
162+
},
163+
}
164+
})
165+
),
166+
// Add the unique subscriber count directly to the campaign object
167+
uniqueRecipientCount: listSubscribers.length,
168+
}
169+
142170
const promises = {
143171
totalMessages: prisma.message.count({
144172
where: {
@@ -198,7 +226,7 @@ export const getCampaign = authProcedure
198226
const result = await resolveProps(promises)
199227

200228
return {
201-
campaign,
229+
campaign: campaignWithCounts,
202230
stats: {
203231
totalMessages: result.totalMessages,
204232
queuedMessages: result.queuedMessages,

apps/backend/src/cron/cron.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import cron from "node-cron"
22
import { sendMessagesCron } from "./sendMessages"
33
import { dailyMaintenanceCron } from "./dailyMaintenance"
4+
import { processQueuedCampaigns } from "./processQueuedCampaigns"
45

56
type CronJob = {
67
name: string
@@ -23,7 +24,18 @@ const dailyMaintenanceJob: CronJob = {
2324
enabled: true,
2425
}
2526

26-
const cronJobs: CronJob[] = [sendMessagesJob, dailyMaintenanceJob]
27+
const processQueuedCampaignsJob: CronJob = {
28+
name: "process-queued-campaigns",
29+
schedule: "* * * * * *", // Runs every second
30+
job: processQueuedCampaigns,
31+
enabled: true,
32+
}
33+
34+
const cronJobs: CronJob[] = [
35+
sendMessagesJob,
36+
dailyMaintenanceJob,
37+
processQueuedCampaignsJob,
38+
]
2739

2840
export const initializeCronJobs = () => {
2941
const scheduledJobs = cronJobs

0 commit comments

Comments
 (0)