import PgBoss from 'pg-boss';
import { log } from '../log.js';

/** The started pg-boss instance, or null while not started / after stop(). */
let boss = null;

/** Promise for a start() currently in flight, or null. Shared by concurrent callers. */
let starting = null;

// name → Promise resolving when the queue is created
const ensuring = new Map();

/**
 * Create and start the shared pg-boss instance (connection string taken from
 * `process.env.DATABASE_URL`).
 *
 * The instance is published to the module only after `start()` on it has
 * resolved, so a failed start leaves the module in the "not started" state and
 * a later call can retry. Concurrent calls share a single start attempt.
 *
 * @returns {Promise<PgBoss>} the started instance
 * @throws whatever the pg-boss `start()` call rejects with
 */
export async function start() {
  if (boss) return boss;

  if (!starting) {
    const candidate = new PgBoss({
      connectionString: process.env.DATABASE_URL,
      newJobCheckIntervalSeconds: 2,
      archiveCompletedAfterSeconds: 86_400,
      deleteAfterDays: 7,
    });
    candidate.on('error', (err) => log.error({ err }, 'pg-boss error'));

    starting = candidate
      .start()
      .then(() => {
        // Publish only once the instance is actually usable.
        boss = candidate;
        return candidate;
      })
      .finally(() => {
        // Settled either way: allow a retry after failure.
        starting = null;
      });
  }

  return starting;
}

/**
 * Stop the shared instance (graceful, 5 000 timeout passed to pg-boss) and
 * reset module state. Errors from the pg-boss `stop()` call are logged as
 * warnings and not rethrown. No-op when nothing is started.
 *
 * @returns {Promise<void>}
 */
export async function stop() {
  if (starting) {
    // A start() is in flight; wait for it so the instance it produces is not
    // left running after this call returns.
    try {
      await starting;
    } catch {
      // The failure is delivered to the start() caller; nothing to stop here.
    }
  }
  if (!boss) return;

  try {
    await boss.stop({ graceful: true, timeout: 5_000 });
  } catch (err) {
    log.warn({ err }, 'pg-boss stop');
  }
  boss = null;
  ensuring.clear();
}

/**
 * Create the named queue at most once per started instance, memoizing the
 * in-flight/finished `createQueue` promise. A failed creation is evicted so
 * the next caller retries.
 *
 * Callers must have checked that `boss` is set.
 *
 * @param {string} name queue name
 * @returns {Promise<unknown>} settles when `createQueue(name)` settles
 */
function ensureQueue(name) {
  if (!ensuring.has(name)) {
    const pending = boss.createQueue(name).catch((err) => {
      // Evict only our own entry: after stop()/start() the map may already
      // hold a newer promise for the same name.
      if (ensuring.get(name) === pending) ensuring.delete(name);
      throw err;
    });
    ensuring.set(name, pending);
  }
  return ensuring.get(name);
}

/**
 * Ensure the queue exists, then send a job to it.
 *
 * @param {string} name queue name
 * @param {unknown} data job payload, forwarded to `boss.send`
 * @param {object} [opts={}] send options, forwarded to `boss.send`
 * @returns {Promise<unknown>} whatever `boss.send` resolves to
 * @throws {Error} 'queue not started' when start() has not completed
 */
export async function enqueue(name, data, opts = {}) {
  if (!boss) throw new Error('queue not started');
  await ensureQueue(name);
  return await boss.send(name, data, opts);
}

/**
 * Ensure the queue exists, then register a worker on it.
 *
 * `handler` is invoked with one job at a time. When pg-boss delivers several
 * jobs in one call (e.g. `opts.batchSize > 1`), every job is handled, in
 * order, instead of only the first.
 *
 * @param {string} name queue name
 * @param {(job: object) => unknown} handler per-job callback
 * @param {object} [opts={}] work options, forwarded to `boss.work`
 * @returns {Promise<unknown>} whatever `boss.work` resolves to
 * @throws {Error} 'queue not started' when start() has not completed
 */
export async function subscribe(name, handler, opts = {}) {
  if (!boss) throw new Error('queue not started');
  await ensureQueue(name);
  return await boss.work(name, opts, async (jobs) => {
    // Non-array delivery, or a batch of exactly one: pass the handler's
    // result straight through.
    if (!Array.isArray(jobs)) return handler(jobs);
    if (jobs.length === 1) return handler(jobs[0]);

    // Sequential on purpose: preserves job order and stops at the first
    // failure, which rejects this callback.
    // NOTE(review): how pg-boss uses a batch's return value is not visible
    // here — confirm before relying on the results array.
    const results = [];
    for (const job of jobs) {
      results.push(await handler(job));
    }
    return results;
  });
}

/**
 * @returns {PgBoss | null} the started instance, or null when not started
 */
export function instance() {
  return boss;
}