Skip to content

Commit dacbfe1

Browse files
committed
fixed job svc, work again after upgrade to v5
1 parent b2daeef commit dacbfe1

12 files changed

Lines changed: 2921 additions & 8775 deletions

File tree

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
ARG BASE=node
2-
ARG BASE_VERSION=20-bookworm
2+
ARG BASE_VERSION=22-bookworm
33
FROM ${BASE}:${BASE_VERSION} AS build
44

55
LABEL org.opencontainers.image.source="https://github.com/constructive-io/constructive"

docker-compose.jobs.yml

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,25 @@ services:
7272
aliases:
7373
- constructive-server
7474

75+
# Local SMTP catch-all with web UI (http://localhost:8025)
76+
mailpit:
77+
container_name: mailpit
78+
image: axllent/mailpit:latest
79+
ports:
80+
- "1025:1025" # SMTP
81+
- "8025:8025" # Web UI
82+
networks:
83+
constructive-net:
84+
aliases:
85+
- mailpit
86+
7587
# Send email link function (invite, password reset, verification)
7688
send-email-link:
7789
container_name: send-email-link
7890
image: constructive:dev
7991
entrypoint: ["node", "functions/send-email-link/dist/index.js"]
92+
depends_on:
93+
- mailpit
8094
environment:
8195
NODE_ENV: development
8296
LOG_LEVEL: info
@@ -94,11 +108,15 @@ services:
94108
MAILGUN_DOMAIN: "mg.constructive.io"
95109
MAILGUN_FROM: "no-reply@mg.constructive.io"
96110
MAILGUN_REPLY: "info@mg.constructive.io"
97-
# Local dashboard port for generated links, used only for
98-
# localhost-style hosts in DRY RUN mode:
99-
# http://localhost:LOCAL_APP_PORT/...
100-
LOCAL_APP_PORT: "3000"
101-
SEND_EMAIL_LINK_DRY_RUN: "${SEND_EMAIL_LINK_DRY_RUN:-true}"
111+
# SMTP configuration (simple-smtp-server / nodemailer)
112+
EMAIL_SEND_USE_SMTP: "true"
113+
SMTP_HOST: "mailpit"
114+
SMTP_PORT: "1025"
115+
SMTP_SECURE: "false"
116+
SMTP_FROM: "no-reply@constructive.local"
117+
SMTP_TLS_REJECT_UNAUTHORIZED: "false"
118+
# Disable dry run so emails are actually sent to Mailpit
119+
SEND_EMAIL_LINK_DRY_RUN: "false"
102120
ports:
103121
- "8082:8080"
104122
networks:

jobs/job-scheduler/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@
3939
"@constructive-io/job-pg": "workspace:^",
4040
"@constructive-io/job-utils": "workspace:^",
4141
"@pgpmjs/logger": "workspace:^",
42-
"node-schedule": "1.3.2"
42+
"node-schedule": "^2.1.1"
4343
}
4444
}

jobs/job-scheduler/src/index.ts

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -199,59 +199,55 @@ export default class Scheduler {
199199
}
200200
}
201201
}
202-
listen() {
202+
async listen(): Promise<void> {
203203
if (this.stopped) return;
204-
const listenForChanges = (
205-
err: Error | null,
206-
client: PoolClient,
207-
release: () => void
208-
) => {
209-
if (err) {
210-
log.error('Error connecting with notify listener', err);
211-
if (err instanceof Error && err.stack) {
212-
log.debug(err.stack);
213-
}
214-
// Try again in 5 seconds
215-
// should this really be done in the node process?
216-
if (!this.stopped) {
217-
setTimeout(this.listen, 5000);
218-
}
219-
return;
204+
let client: PoolClient;
205+
let release: () => void;
206+
try {
207+
client = await this.pgPool.connect();
208+
release = () => client.release();
209+
} catch (err) {
210+
log.error('Error connecting with notify listener', err);
211+
if (err instanceof Error && err.stack) {
212+
log.debug(err.stack);
213+
}
214+
if (!this.stopped) {
215+
setTimeout(() => this.listen(), 5000);
220216
}
217+
return;
218+
}
219+
if (this.stopped) {
220+
release();
221+
return;
222+
}
223+
this.listenClient = client;
224+
this.listenRelease = release;
225+
client.on('notification', () => {
226+
log.info('a NEW scheduled JOB!');
227+
if (this.doNextTimer) {
228+
// Must be idle, do something!
229+
this.doNext(client);
230+
}
231+
});
232+
client.query('LISTEN "scheduled_jobs:insert"');
233+
client.on('error', (e: unknown) => {
221234
if (this.stopped) {
222235
release();
223236
return;
224237
}
225-
this.listenClient = client;
226-
this.listenRelease = release;
227-
client.on('notification', () => {
228-
log.info('a NEW scheduled JOB!');
229-
if (this.doNextTimer) {
230-
// Must be idle, do something!
231-
this.doNext(client);
232-
}
233-
});
234-
client.query('LISTEN "scheduled_jobs:insert"');
235-
client.on('error', (e: unknown) => {
236-
if (this.stopped) {
237-
release();
238-
return;
239-
}
240-
log.error('Error with database notify listener', e);
241-
if (e instanceof Error && e.stack) {
242-
log.debug(e.stack);
243-
}
244-
release();
245-
if (!this.stopped) {
246-
this.listen();
247-
}
248-
});
249-
log.info(
250-
`${this.workerId} connected and looking for scheduled jobs...`
251-
);
252-
this.doNext(client);
253-
};
254-
this.pgPool.connect(listenForChanges);
238+
log.error('Error with database notify listener', e);
239+
if (e instanceof Error && e.stack) {
240+
log.debug(e.stack);
241+
}
242+
release();
243+
if (!this.stopped) {
244+
this.listen();
245+
}
246+
});
247+
log.info(
248+
`${this.workerId} connected and looking for scheduled jobs...`
249+
);
250+
this.doNext(client);
255251
}
256252

257253
async stop(): Promise<void> {

jobs/knative-job-fn/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
},
3838
"dependencies": {
3939
"@pgpmjs/logger": "workspace:^",
40-
"body-parser": "1.19.0",
4140
"express": "5.2.1"
4241
}
4342
}

jobs/knative-job-fn/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import express from 'express';
2-
import bodyParser from 'body-parser';
32
import http from 'node:http';
43
import https from 'node:https';
54
import { URL } from 'node:url';
@@ -131,7 +130,7 @@ const logger = createLogger('knative-job-fn');
131130
const createJobApp = () => {
132131
const app: any = express();
133132

134-
app.use(bodyParser.json());
133+
app.use(express.json());
135134

136135
// Basic request logging for all incoming job invocations.
137136
app.use((req: any, res: any, next: any) => {

jobs/knative-job-server/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"@constructive-io/job-pg": "workspace:^",
4343
"@constructive-io/job-utils": "workspace:^",
4444
"@pgpmjs/logger": "workspace:^",
45-
"body-parser": "1.19.0",
4645
"express": "5.2.1",
4746
"pg": "8.17.1"
4847
}

jobs/knative-job-server/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import express from 'express';
2-
import bodyParser from 'body-parser';
32
import type { Pool, PoolClient } from 'pg';
43
import * as jobs from '@constructive-io/job-utils';
54
import poolManager from '@constructive-io/job-pg';
@@ -39,7 +38,7 @@ const logger = createLogger('knative-job-server');
3938

4039
export default (pgPool: Pool = poolManager.getPool()) => {
4140
const app = express();
42-
app.use(bodyParser.json());
41+
app.use(express.json());
4342

4443
const withClient =
4544
(cb: WithClientHandler) =>

jobs/knative-job-worker/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@
4040
"@constructive-io/job-pg": "workspace:^",
4141
"@constructive-io/job-utils": "workspace:^",
4242
"@pgpmjs/logger": "workspace:^",
43-
"pg": "8.17.1",
44-
"request": "2.88.2"
43+
"pg": "8.17.1"
4544
},
4645
"devDependencies": {
4746
"@pgpm/database-jobs": "^0.16.0",

jobs/knative-job-worker/src/index.ts

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -183,56 +183,52 @@ export default class Worker {
183183
}
184184
}
185185
}
186-
listen() {
186+
async listen(): Promise<void> {
187187
if (this.stopped) return;
188-
const listenForChanges = (
189-
err: Error | null,
190-
client: PoolClient,
191-
release: () => void
192-
) => {
193-
if (err) {
194-
log.error('Error connecting with notify listener', err);
195-
if (err instanceof Error && err.stack) {
196-
log.debug(err.stack);
197-
}
198-
// Try again in 5 seconds
199-
// should this really be done in the node process?
200-
if (!this.stopped) {
201-
setTimeout(this.listen, 5000);
202-
}
203-
return;
188+
let client: PoolClient;
189+
let release: () => void;
190+
try {
191+
client = await this.pgPool.connect();
192+
release = () => client.release();
193+
} catch (err) {
194+
log.error('Error connecting with notify listener', err);
195+
if (err instanceof Error && err.stack) {
196+
log.debug(err.stack);
197+
}
198+
if (!this.stopped) {
199+
setTimeout(() => this.listen(), 5000);
200+
}
201+
return;
202+
}
203+
if (this.stopped) {
204+
release();
205+
return;
206+
}
207+
this.listenClient = client;
208+
this.listenRelease = release;
209+
client.on('notification', () => {
210+
if (this.doNextTimer) {
211+
// Must be idle, do something!
212+
this.doNext(client);
204213
}
214+
});
215+
client.query('LISTEN "jobs:insert"');
216+
client.on('error', (e: unknown) => {
205217
if (this.stopped) {
206218
release();
207219
return;
208220
}
209-
this.listenClient = client;
210-
this.listenRelease = release;
211-
client.on('notification', () => {
212-
if (this.doNextTimer) {
213-
// Must be idle, do something!
214-
this.doNext(client);
215-
}
216-
});
217-
client.query('LISTEN "jobs:insert"');
218-
client.on('error', (e: unknown) => {
219-
if (this.stopped) {
220-
release();
221-
return;
222-
}
223-
log.error('Error with database notify listener', e);
224-
if (e instanceof Error && e.stack) {
225-
log.debug(e.stack);
226-
}
227-
release();
228-
if (!this.stopped) {
229-
this.listen();
230-
}
231-
});
232-
log.info(`${this.workerId} connected and looking for jobs...`);
233-
this.doNext(client);
234-
};
235-
this.pgPool.connect(listenForChanges);
221+
log.error('Error with database notify listener', e);
222+
if (e instanceof Error && e.stack) {
223+
log.debug(e.stack);
224+
}
225+
release();
226+
if (!this.stopped) {
227+
this.listen();
228+
}
229+
});
230+
log.info(`${this.workerId} connected and looking for jobs...`);
231+
this.doNext(client);
236232
}
237233

238234
async stop(): Promise<void> {

0 commit comments

Comments
 (0)