From 17a13dddb8a1e2e85e8a7f32dfe431b69d706d3d Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 03:26:37 +1000 Subject: [PATCH] feat(jobs): pg-boss singleton client Per-name ensureQueue promise dedup so concurrent enqueue+subscribe on the same queue do not race createQueue (Postgres deadlock). Co-Authored-By: Claude Opus 4.7 --- lib/jobs/queue.js | 53 ++++++++++++++++++++++++++++++++++++++++ tests/helpers/boss.js | 23 +++++++++++++++++ tests/jobs/queue.test.js | 21 ++++++++++++++++ 3 files changed, 97 insertions(+) create mode 100644 lib/jobs/queue.js create mode 100644 tests/helpers/boss.js create mode 100644 tests/jobs/queue.test.js diff --git a/lib/jobs/queue.js b/lib/jobs/queue.js new file mode 100644 index 0000000..0ff80b2 --- /dev/null +++ b/lib/jobs/queue.js @@ -0,0 +1,53 @@ +import PgBoss from 'pg-boss'; +import { log } from '../log.js'; + +let boss = null; +const ensuring = new Map(); // name → Promise resolving when the queue is created + +export async function start() { + if (boss) return boss; + boss = new PgBoss({ + connectionString: process.env.DATABASE_URL, + newJobCheckIntervalSeconds: 2, + archiveCompletedAfterSeconds: 86_400, + deleteAfterDays: 7 + }); + boss.on('error', err => log.error({ err }, 'pg-boss error')); + await boss.start(); + return boss; +} + +export async function stop() { + if (!boss) return; + try { await boss.stop({ graceful: true, timeout: 5_000 }); } + catch (e) { log.warn({ err: e }, 'pg-boss stop'); } + boss = null; + ensuring.clear(); +} + +function ensureQueue(name) { + if (!ensuring.has(name)) { + ensuring.set(name, boss.createQueue(name).catch(err => { + ensuring.delete(name); + throw err; + })); + } + return ensuring.get(name); +} + +export async function enqueue(name, data, opts = {}) { + if (!boss) throw new Error('queue not started'); + await ensureQueue(name); + return await boss.send(name, data, opts); +} + +export async function subscribe(name, handler, opts = {}) { + if (!boss) throw new Error('queue not started'); + await ensureQueue(name); + return await boss.work(name, opts, async (jobs) => { + const job = Array.isArray(jobs) ? jobs[0] : jobs; + return handler(job); + }); +} + +export function instance() { return boss; } diff --git a/tests/helpers/boss.js b/tests/helpers/boss.js new file mode 100644 index 0000000..1d630b7 --- /dev/null +++ b/tests/helpers/boss.js @@ -0,0 +1,23 @@ +import * as queue from '../../lib/jobs/queue.js'; +import { pool } from '../../lib/db/pool.js'; + +export async function stopBoss() { + try { await queue.stop(); } catch { /* ignore */ } + try { await pool.query('DROP SCHEMA IF EXISTS pgboss CASCADE'); } catch { /* ignore */ } +} + +export async function waitForJob(id, { timeoutMs = 5_000 } = {}) { + const boss = queue.instance(); + if (!boss) throw new Error('queue not started'); + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + const j = await boss.getJobById(id); + if (!j) { + await new Promise(r => setTimeout(r, 50)); + continue; + } + if (['completed','failed','cancelled','expired'].includes(j.state)) return j; + await new Promise(r => setTimeout(r, 50)); + } + throw new Error(`job ${id} did not finish in ${timeoutMs} ms`); +} diff --git a/tests/jobs/queue.test.js b/tests/jobs/queue.test.js new file mode 100644 index 0000000..ff6d7ea --- /dev/null +++ b/tests/jobs/queue.test.js @@ -0,0 +1,21 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { resetDb } from '../helpers/db.js'; +import { migrateUp } from '../../lib/db/migrate.js'; +import { stopBoss } from '../helpers/boss.js'; +import * as queue from '../../lib/jobs/queue.js'; + +beforeEach(async () => { await resetDb(); await migrateUp(); }); +afterEach(async () => { await stopBoss(); }); + +describe('jobs/queue', () => { + it('starts, enqueues, and a worker receives the job', async () => { + await queue.start(); + const received = new Promise(resolve => { + queue.subscribe('echo-q', async job => { resolve(job.data); }); + }); + const jobId = await queue.enqueue('echo-q', { hello: 'void' }); + expect(jobId).toBeTruthy(); + const data = await received; + expect(data).toEqual({ hello: 'void' }); + }); +});