From 57efa4cbaa3511f2ff4387e0aa6a03bf77ec9916 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 03:29:03 +1000 Subject: [PATCH] feat(jobs): jobs repo (list/getById/retry/remove) Unifies pgboss.job (current, per-queue partitioned) and pgboss.archive under one SELECT for operator views. retry promotes archived rows back into the active partition. Co-Authored-By: Claude Opus 4.7 --- lib/db/repos/jobs.js | 73 ++++++++++++++++++++++++++++++++++++++++ tests/repos/jobs.test.js | 44 ++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 lib/db/repos/jobs.js create mode 100644 tests/repos/jobs.test.js diff --git a/lib/db/repos/jobs.js b/lib/db/repos/jobs.js new file mode 100644 index 0000000..1f96b0a --- /dev/null +++ b/lib/db/repos/jobs.js @@ -0,0 +1,73 @@ +import { pool } from '../pool.js'; + +// pg-boss v10 stores jobs in pgboss.job (current, partitioned per queue) +// and pgboss.archive (finished + cleaned). Unify them for list/get views. + +const SELECT_COLS = ` + id, name, state, data, output, retry_count AS retrycount, + created_on AS created_on, started_on AS started_on, completed_on AS completed_on +`; + +export async function list({ state, name, limit = 50 } = {}) { + const where = []; + const args = []; + let i = 1; + if (state) { where.push(`state=$${i++}`); args.push(state); } + if (name) { where.push(`name=$${i++}`); args.push(name); } + args.push(limit); + const w = where.length ? `WHERE ${where.join(' AND ')}` : ''; + const sql = ` + SELECT * FROM ( + SELECT ${SELECT_COLS} FROM pgboss.job + UNION ALL + SELECT ${SELECT_COLS} FROM pgboss.archive + ) u ${w} + ORDER BY created_on DESC + LIMIT $${i} + `; + const { rows } = await pool.query(sql, args); + return rows; +} + +export async function getById(id) { + const sql = ` + SELECT ${SELECT_COLS} FROM pgboss.job WHERE id=$1 + UNION ALL + SELECT ${SELECT_COLS} FROM pgboss.archive WHERE id=$1 + LIMIT 1 + `; + const { rows: [r] } = await pool.query(sql, [id]); + return r ?? null; +} + +export async function retry(id) { + // pg-boss v10 has no public resubmit. Update state in the partition + // table; the poller picks it up. + const { rows: [r] } = await pool.query( + `UPDATE pgboss.job SET state='retry', retry_count=retry_count+1 + WHERE id=$1 RETURNING ${SELECT_COLS}`, + [id] + ); + if (r) return r; + // Fall back: copy from archive into the partition for the same name. + const { rows: [a] } = await pool.query( + `WITH src AS (DELETE FROM pgboss.archive WHERE id=$1 RETURNING *) + INSERT INTO pgboss.job (id, name, priority, data, state, retry_limit, retry_count, + retry_delay, retry_backoff, start_after, started_on, + singleton_key, singleton_on, expire_in, created_on, + completed_on, keep_until, output, dead_letter, policy) + SELECT id, name, priority, data, 'retry', retry_limit, retry_count + 1, + retry_delay, retry_backoff, now(), NULL, + singleton_key, singleton_on, expire_in, now(), + NULL, keep_until, output, dead_letter, policy + FROM src + RETURNING ${SELECT_COLS}`, + [id] + ); + return a ?? null; +} + +export async function remove(id) { + await pool.query(`DELETE FROM pgboss.job WHERE id=$1`, [id]); + await pool.query(`DELETE FROM pgboss.archive WHERE id=$1`, [id]); +} diff --git a/tests/repos/jobs.test.js b/tests/repos/jobs.test.js new file mode 100644 index 0000000..98c7810 --- /dev/null +++ b/tests/repos/jobs.test.js @@ -0,0 +1,44 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { resetDb } from '../helpers/db.js'; +import { migrateUp } from '../../lib/db/migrate.js'; +import { stopBoss, waitForJob } from '../helpers/boss.js'; +import { pool } from '../../lib/db/pool.js'; +import * as queue from '../../lib/jobs/queue.js'; +import { registerWorkers } from '../../lib/jobs/index.js'; +import * as jobs from '../../lib/db/repos/jobs.js'; + +beforeEach(async () => { + await resetDb(); await migrateUp(); + await queue.start(); + await registerWorkers(); +}); +afterEach(async () => { await stopBoss(); }); + +describe('jobs repo', () => { + it('list returns recent jobs across states', async () => { + const id = await queue.enqueue('echo', { ping: 1 }); + await waitForJob('echo', id); + const rows = await jobs.list({ limit: 10 }); + expect(rows.find(r => r.id === id)).toBeTruthy(); + }); + + it('getById returns null on unknown id', async () => { + expect(await jobs.getById('00000000-0000-0000-0000-000000000000')).toBeNull(); + }); + + it('retry resubmits a failed job', async () => { + const id = await queue.enqueue('echo', { ping: 'x' }); + await waitForJob('echo', id); + // mark it failed directly so retry has something to flip. + await pool.query(`UPDATE pgboss.job SET state='failed' WHERE id=$1`, [id]); + const out = await jobs.retry(id); + expect(out?.state).toBe('retry'); + }); + + it('remove deletes by id', async () => { + const id = await queue.enqueue('echo', { ping: 'rm' }); + await waitForJob('echo', id); + await jobs.remove(id); + expect(await jobs.getById(id)).toBeNull(); + }); +});