feat(jobs): pg-boss singleton client
Per-name ensureQueue promise dedup so concurrent enqueue+subscribe on the same queue do not race createQueue (Postgres deadlock). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
53
lib/jobs/queue.js
Normal file
53
lib/jobs/queue.js
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import PgBoss from 'pg-boss';
|
||||||
|
import { log } from '../log.js';
|
||||||
|
|
||||||
|
let boss = null;
|
||||||
|
const ensuring = new Map(); // name → Promise resolving when the queue is created
|
||||||
|
|
||||||
|
export async function start() {
|
||||||
|
if (boss) return boss;
|
||||||
|
boss = new PgBoss({
|
||||||
|
connectionString: process.env.DATABASE_URL,
|
||||||
|
newJobCheckIntervalSeconds: 2,
|
||||||
|
archiveCompletedAfterSeconds: 86_400,
|
||||||
|
deleteAfterDays: 7
|
||||||
|
});
|
||||||
|
boss.on('error', err => log.error({ err }, 'pg-boss error'));
|
||||||
|
await boss.start();
|
||||||
|
return boss;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function stop() {
|
||||||
|
if (!boss) return;
|
||||||
|
try { await boss.stop({ graceful: true, timeout: 5_000 }); }
|
||||||
|
catch (e) { log.warn({ err: e }, 'pg-boss stop'); }
|
||||||
|
boss = null;
|
||||||
|
ensuring.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
function ensureQueue(name) {
|
||||||
|
if (!ensuring.has(name)) {
|
||||||
|
ensuring.set(name, boss.createQueue(name).catch(err => {
|
||||||
|
ensuring.delete(name);
|
||||||
|
throw err;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
return ensuring.get(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function enqueue(name, data, opts = {}) {
|
||||||
|
if (!boss) throw new Error('queue not started');
|
||||||
|
await ensureQueue(name);
|
||||||
|
return await boss.send(name, data, opts);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function subscribe(name, handler, opts = {}) {
|
||||||
|
if (!boss) throw new Error('queue not started');
|
||||||
|
await ensureQueue(name);
|
||||||
|
return await boss.work(name, opts, async (jobs) => {
|
||||||
|
const job = Array.isArray(jobs) ? jobs[0] : jobs;
|
||||||
|
return handler(job);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function instance() { return boss; }
|
||||||
23
tests/helpers/boss.js
Normal file
23
tests/helpers/boss.js
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
import * as queue from '../../lib/jobs/queue.js';
|
||||||
|
import { pool } from '../../lib/db/pool.js';
|
||||||
|
|
||||||
|
export async function stopBoss() {
|
||||||
|
try { await queue.stop(); } catch { /* ignore */ }
|
||||||
|
try { await pool.query('DROP SCHEMA IF EXISTS pgboss CASCADE'); } catch { /* ignore */ }
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function waitForJob(id, { timeoutMs = 5_000 } = {}) {
|
||||||
|
const boss = queue.instance();
|
||||||
|
if (!boss) throw new Error('queue not started');
|
||||||
|
const start = Date.now();
|
||||||
|
while (Date.now() - start < timeoutMs) {
|
||||||
|
const j = await boss.getJobById(id);
|
||||||
|
if (!j) {
|
||||||
|
await new Promise(r => setTimeout(r, 50));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (['completed','failed','cancelled','expired'].includes(j.state)) return j;
|
||||||
|
await new Promise(r => setTimeout(r, 50));
|
||||||
|
}
|
||||||
|
throw new Error(`job ${id} did not finish in ${timeoutMs} ms`);
|
||||||
|
}
|
||||||
21
tests/jobs/queue.test.js
Normal file
21
tests/jobs/queue.test.js
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||||
|
import { resetDb } from '../helpers/db.js';
|
||||||
|
import { migrateUp } from '../../lib/db/migrate.js';
|
||||||
|
import { stopBoss } from '../helpers/boss.js';
|
||||||
|
import * as queue from '../../lib/jobs/queue.js';
|
||||||
|
|
||||||
|
beforeEach(async () => { await resetDb(); await migrateUp(); });
|
||||||
|
afterEach(async () => { await stopBoss(); });
|
||||||
|
|
||||||
|
describe('jobs/queue', () => {
|
||||||
|
it('starts, enqueues, and a worker receives the job', async () => {
|
||||||
|
await queue.start();
|
||||||
|
const received = new Promise(resolve => {
|
||||||
|
queue.subscribe('echo-q', async job => { resolve(job.data); });
|
||||||
|
});
|
||||||
|
const jobId = await queue.enqueue('echo-q', { hello: 'void' });
|
||||||
|
expect(jobId).toBeTruthy();
|
||||||
|
const data = await received;
|
||||||
|
expect(data).toEqual({ hello: 'void' });
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user