Skip to content

Commit db546ec

Browse files
authored
Merge pull request #719 from constructive-io/fix/job-upgrade-v5
fixed job svc, work again after upgrade to v5
2 parents b2daeef + 9558b6b commit db546ec

11 files changed

Lines changed: 2898 additions & 8770 deletions

File tree

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
ARG BASE=node
2-
ARG BASE_VERSION=20-bookworm
2+
ARG BASE_VERSION=22-bookworm
33
FROM ${BASE}:${BASE_VERSION} AS build
44

55
LABEL org.opencontainers.image.source="https://github.com/constructive-io/constructive"

jobs/job-scheduler/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@
3939
"@constructive-io/job-pg": "workspace:^",
4040
"@constructive-io/job-utils": "workspace:^",
4141
"@pgpmjs/logger": "workspace:^",
42-
"node-schedule": "1.3.2"
42+
"node-schedule": "^2.1.1"
4343
}
4444
}

jobs/job-scheduler/src/index.ts

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -199,59 +199,55 @@ export default class Scheduler {
199199
}
200200
}
201201
}
202-
listen() {
202+
async listen(): Promise<void> {
203203
if (this.stopped) return;
204-
const listenForChanges = (
205-
err: Error | null,
206-
client: PoolClient,
207-
release: () => void
208-
) => {
209-
if (err) {
210-
log.error('Error connecting with notify listener', err);
211-
if (err instanceof Error && err.stack) {
212-
log.debug(err.stack);
213-
}
214-
// Try again in 5 seconds
215-
// should this really be done in the node process?
216-
if (!this.stopped) {
217-
setTimeout(this.listen, 5000);
218-
}
219-
return;
204+
let client: PoolClient;
205+
let release: () => void;
206+
try {
207+
client = await this.pgPool.connect();
208+
release = () => client.release();
209+
} catch (err) {
210+
log.error('Error connecting with notify listener', err);
211+
if (err instanceof Error && err.stack) {
212+
log.debug(err.stack);
213+
}
214+
if (!this.stopped) {
215+
setTimeout(() => this.listen(), 5000);
220216
}
217+
return;
218+
}
219+
if (this.stopped) {
220+
release();
221+
return;
222+
}
223+
this.listenClient = client;
224+
this.listenRelease = release;
225+
client.on('notification', () => {
226+
log.info('a NEW scheduled JOB!');
227+
if (this.doNextTimer) {
228+
// Must be idle, do something!
229+
this.doNext(client);
230+
}
231+
});
232+
client.query('LISTEN "scheduled_jobs:insert"');
233+
client.on('error', (e: unknown) => {
221234
if (this.stopped) {
222235
release();
223236
return;
224237
}
225-
this.listenClient = client;
226-
this.listenRelease = release;
227-
client.on('notification', () => {
228-
log.info('a NEW scheduled JOB!');
229-
if (this.doNextTimer) {
230-
// Must be idle, do something!
231-
this.doNext(client);
232-
}
233-
});
234-
client.query('LISTEN "scheduled_jobs:insert"');
235-
client.on('error', (e: unknown) => {
236-
if (this.stopped) {
237-
release();
238-
return;
239-
}
240-
log.error('Error with database notify listener', e);
241-
if (e instanceof Error && e.stack) {
242-
log.debug(e.stack);
243-
}
244-
release();
245-
if (!this.stopped) {
246-
this.listen();
247-
}
248-
});
249-
log.info(
250-
`${this.workerId} connected and looking for scheduled jobs...`
251-
);
252-
this.doNext(client);
253-
};
254-
this.pgPool.connect(listenForChanges);
238+
log.error('Error with database notify listener', e);
239+
if (e instanceof Error && e.stack) {
240+
log.debug(e.stack);
241+
}
242+
release();
243+
if (!this.stopped) {
244+
this.listen();
245+
}
246+
});
247+
log.info(
248+
`${this.workerId} connected and looking for scheduled jobs...`
249+
);
250+
this.doNext(client);
255251
}
256252

257253
async stop(): Promise<void> {

jobs/knative-job-fn/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
},
3838
"dependencies": {
3939
"@pgpmjs/logger": "workspace:^",
40-
"body-parser": "1.19.0",
4140
"express": "5.2.1"
4241
}
4342
}

jobs/knative-job-fn/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import express from 'express';
2-
import bodyParser from 'body-parser';
32
import http from 'node:http';
43
import https from 'node:https';
54
import { URL } from 'node:url';
@@ -131,7 +130,7 @@ const logger = createLogger('knative-job-fn');
131130
const createJobApp = () => {
132131
const app: any = express();
133132

134-
app.use(bodyParser.json());
133+
app.use(express.json());
135134

136135
// Basic request logging for all incoming job invocations.
137136
app.use((req: any, res: any, next: any) => {

jobs/knative-job-server/package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
"@constructive-io/job-pg": "workspace:^",
4343
"@constructive-io/job-utils": "workspace:^",
4444
"@pgpmjs/logger": "workspace:^",
45-
"body-parser": "1.19.0",
4645
"express": "5.2.1",
4746
"pg": "8.17.1"
4847
}

jobs/knative-job-server/src/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import express from 'express';
2-
import bodyParser from 'body-parser';
32
import type { Pool, PoolClient } from 'pg';
43
import * as jobs from '@constructive-io/job-utils';
54
import poolManager from '@constructive-io/job-pg';
@@ -39,7 +38,7 @@ const logger = createLogger('knative-job-server');
3938

4039
export default (pgPool: Pool = poolManager.getPool()) => {
4140
const app = express();
42-
app.use(bodyParser.json());
41+
app.use(express.json());
4342

4443
const withClient =
4544
(cb: WithClientHandler) =>

jobs/knative-job-worker/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@
4040
"@constructive-io/job-pg": "workspace:^",
4141
"@constructive-io/job-utils": "workspace:^",
4242
"@pgpmjs/logger": "workspace:^",
43-
"pg": "8.17.1",
44-
"request": "2.88.2"
43+
"pg": "8.17.1"
4544
},
4645
"devDependencies": {
4746
"@pgpm/database-jobs": "^0.16.0",

jobs/knative-job-worker/src/index.ts

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -183,56 +183,52 @@ export default class Worker {
183183
}
184184
}
185185
}
186-
listen() {
186+
async listen(): Promise<void> {
187187
if (this.stopped) return;
188-
const listenForChanges = (
189-
err: Error | null,
190-
client: PoolClient,
191-
release: () => void
192-
) => {
193-
if (err) {
194-
log.error('Error connecting with notify listener', err);
195-
if (err instanceof Error && err.stack) {
196-
log.debug(err.stack);
197-
}
198-
// Try again in 5 seconds
199-
// should this really be done in the node process?
200-
if (!this.stopped) {
201-
setTimeout(this.listen, 5000);
202-
}
203-
return;
188+
let client: PoolClient;
189+
let release: () => void;
190+
try {
191+
client = await this.pgPool.connect();
192+
release = () => client.release();
193+
} catch (err) {
194+
log.error('Error connecting with notify listener', err);
195+
if (err instanceof Error && err.stack) {
196+
log.debug(err.stack);
197+
}
198+
if (!this.stopped) {
199+
setTimeout(() => this.listen(), 5000);
200+
}
201+
return;
202+
}
203+
if (this.stopped) {
204+
release();
205+
return;
206+
}
207+
this.listenClient = client;
208+
this.listenRelease = release;
209+
client.on('notification', () => {
210+
if (this.doNextTimer) {
211+
// Must be idle, do something!
212+
this.doNext(client);
204213
}
214+
});
215+
client.query('LISTEN "jobs:insert"');
216+
client.on('error', (e: unknown) => {
205217
if (this.stopped) {
206218
release();
207219
return;
208220
}
209-
this.listenClient = client;
210-
this.listenRelease = release;
211-
client.on('notification', () => {
212-
if (this.doNextTimer) {
213-
// Must be idle, do something!
214-
this.doNext(client);
215-
}
216-
});
217-
client.query('LISTEN "jobs:insert"');
218-
client.on('error', (e: unknown) => {
219-
if (this.stopped) {
220-
release();
221-
return;
222-
}
223-
log.error('Error with database notify listener', e);
224-
if (e instanceof Error && e.stack) {
225-
log.debug(e.stack);
226-
}
227-
release();
228-
if (!this.stopped) {
229-
this.listen();
230-
}
231-
});
232-
log.info(`${this.workerId} connected and looking for jobs...`);
233-
this.doNext(client);
234-
};
235-
this.pgPool.connect(listenForChanges);
221+
log.error('Error with database notify listener', e);
222+
if (e instanceof Error && e.stack) {
223+
log.debug(e.stack);
224+
}
225+
release();
226+
if (!this.stopped) {
227+
this.listen();
228+
}
229+
});
230+
log.info(`${this.workerId} connected and looking for jobs...`);
231+
this.doNext(client);
236232
}
237233

238234
async stop(): Promise<void> {

jobs/knative-job-worker/src/req.ts

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import requestLib from 'request';
1+
import http from 'node:http';
2+
import https from 'node:https';
3+
import { URL } from 'node:url';
24
import {
35
getCallbackBaseUrl,
46
getJobGatewayConfig,
@@ -47,35 +49,55 @@ const request = (
4749
databaseId
4850
});
4951
return new Promise<boolean>((resolve, reject) => {
50-
requestLib.post(
52+
let parsed: URL;
53+
try {
54+
parsed = new URL(url);
55+
} catch (e) {
56+
return reject(e);
57+
}
58+
59+
const isHttps = parsed.protocol === 'https:';
60+
const client = isHttps ? https : http;
61+
const payload = JSON.stringify(body);
62+
63+
const req = client.request(
5164
{
65+
hostname: parsed.hostname,
66+
port: parsed.port || (isHttps ? 443 : 80),
67+
path: parsed.pathname + parsed.search,
68+
method: 'POST',
5269
headers: {
5370
'Content-Type': 'application/json',
71+
'Content-Length': Buffer.byteLength(payload),
5472

5573
// these are used by job-worker/job-fn
5674
'X-Worker-Id': workerId,
57-
'X-Job-Id': jobId,
75+
'X-Job-Id': String(jobId),
5876
'X-Database-Id': databaseId,
5977

6078
// async HTTP completion callback
6179
'X-Callback-Url': completeUrl
62-
},
63-
url,
64-
json: true,
65-
body
66-
},
67-
function (error: unknown) {
68-
if (error) {
69-
log.error(`request error for job[${jobId}] fn[${fn}]`, error);
70-
if (error instanceof Error && error.stack) {
71-
log.debug(error.stack);
72-
}
73-
return reject(error);
7480
}
75-
log.debug(`request success for job[${jobId}] fn[${fn}]`);
76-
return resolve(true);
81+
},
82+
(res) => {
83+
res.on('data', () => {});
84+
res.on('end', () => {
85+
log.debug(`request success for job[${jobId}] fn[${fn}]`);
86+
resolve(true);
87+
});
7788
}
7889
);
90+
91+
req.on('error', (error) => {
92+
log.error(`request error for job[${jobId}] fn[${fn}]`, error);
93+
if (error.stack) {
94+
log.debug(error.stack);
95+
}
96+
reject(error);
97+
});
98+
99+
req.write(payload);
100+
req.end();
79101
});
80102
};
81103

0 commit comments

Comments
 (0)