feat(jobs): jobs repo (list/getById/retry/remove)
Unifies pgboss.job (current, per-queue partitioned) and pgboss.archive under one SELECT for operator views. retry promotes archived rows back into the active partition. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
73
lib/db/repos/jobs.js
Normal file
73
lib/db/repos/jobs.js
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
import { pool } from '../pool.js';
|
||||||
|
|
||||||
|
// pg-boss v10 stores jobs in pgboss.job (current, partitioned per queue)
|
||||||
|
// and pgboss.archive (finished + cleaned). Unify them for list/get views.
|
||||||
|
|
||||||
|
const SELECT_COLS = `
|
||||||
|
id, name, state, data, output, retry_count AS retrycount,
|
||||||
|
created_on AS created_on, started_on AS started_on, completed_on AS completed_on
|
||||||
|
`;
|
||||||
|
|
||||||
|
export async function list({ state, name, limit = 50 } = {}) {
|
||||||
|
const where = [];
|
||||||
|
const args = [];
|
||||||
|
let i = 1;
|
||||||
|
if (state) { where.push(`state=$${i++}`); args.push(state); }
|
||||||
|
if (name) { where.push(`name=$${i++}`); args.push(name); }
|
||||||
|
args.push(limit);
|
||||||
|
const w = where.length ? `WHERE ${where.join(' AND ')}` : '';
|
||||||
|
const sql = `
|
||||||
|
SELECT * FROM (
|
||||||
|
SELECT ${SELECT_COLS} FROM pgboss.job
|
||||||
|
UNION ALL
|
||||||
|
SELECT ${SELECT_COLS} FROM pgboss.archive
|
||||||
|
) u ${w}
|
||||||
|
ORDER BY created_on DESC
|
||||||
|
LIMIT $${i}
|
||||||
|
`;
|
||||||
|
const { rows } = await pool.query(sql, args);
|
||||||
|
return rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getById(id) {
|
||||||
|
const sql = `
|
||||||
|
SELECT ${SELECT_COLS} FROM pgboss.job WHERE id=$1
|
||||||
|
UNION ALL
|
||||||
|
SELECT ${SELECT_COLS} FROM pgboss.archive WHERE id=$1
|
||||||
|
LIMIT 1
|
||||||
|
`;
|
||||||
|
const { rows: [r] } = await pool.query(sql, [id]);
|
||||||
|
return r ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function retry(id) {
|
||||||
|
// pg-boss v10 has no public resubmit. Update state in the partition
|
||||||
|
// table; the poller picks it up.
|
||||||
|
const { rows: [r] } = await pool.query(
|
||||||
|
`UPDATE pgboss.job SET state='retry', retry_count=retry_count+1
|
||||||
|
WHERE id=$1 RETURNING ${SELECT_COLS}`,
|
||||||
|
[id]
|
||||||
|
);
|
||||||
|
if (r) return r;
|
||||||
|
// Fall back: copy from archive into the partition for the same name.
|
||||||
|
const { rows: [a] } = await pool.query(
|
||||||
|
`WITH src AS (DELETE FROM pgboss.archive WHERE id=$1 RETURNING *)
|
||||||
|
INSERT INTO pgboss.job (id, name, priority, data, state, retry_limit, retry_count,
|
||||||
|
retry_delay, retry_backoff, start_after, started_on,
|
||||||
|
singleton_key, singleton_on, expire_in, created_on,
|
||||||
|
completed_on, keep_until, output, dead_letter, policy)
|
||||||
|
SELECT id, name, priority, data, 'retry', retry_limit, retry_count + 1,
|
||||||
|
retry_delay, retry_backoff, now(), NULL,
|
||||||
|
singleton_key, singleton_on, expire_in, now(),
|
||||||
|
NULL, keep_until, output, dead_letter, policy
|
||||||
|
FROM src
|
||||||
|
RETURNING ${SELECT_COLS}`,
|
||||||
|
[id]
|
||||||
|
);
|
||||||
|
return a ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function remove(id) {
|
||||||
|
await pool.query(`DELETE FROM pgboss.job WHERE id=$1`, [id]);
|
||||||
|
await pool.query(`DELETE FROM pgboss.archive WHERE id=$1`, [id]);
|
||||||
|
}
|
||||||
44
tests/repos/jobs.test.js
Normal file
44
tests/repos/jobs.test.js
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||||
|
import { resetDb } from '../helpers/db.js';
|
||||||
|
import { migrateUp } from '../../lib/db/migrate.js';
|
||||||
|
import { stopBoss, waitForJob } from '../helpers/boss.js';
|
||||||
|
import { pool } from '../../lib/db/pool.js';
|
||||||
|
import * as queue from '../../lib/jobs/queue.js';
|
||||||
|
import { registerWorkers } from '../../lib/jobs/index.js';
|
||||||
|
import * as jobs from '../../lib/db/repos/jobs.js';
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await resetDb(); await migrateUp();
|
||||||
|
await queue.start();
|
||||||
|
await registerWorkers();
|
||||||
|
});
|
||||||
|
afterEach(async () => { await stopBoss(); });
|
||||||
|
|
||||||
|
describe('jobs repo', () => {
|
||||||
|
it('list returns recent jobs across states', async () => {
|
||||||
|
const id = await queue.enqueue('echo', { ping: 1 });
|
||||||
|
await waitForJob('echo', id);
|
||||||
|
const rows = await jobs.list({ limit: 10 });
|
||||||
|
expect(rows.find(r => r.id === id)).toBeTruthy();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('getById returns null on unknown id', async () => {
|
||||||
|
expect(await jobs.getById('00000000-0000-0000-0000-000000000000')).toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('retry resubmits a failed job', async () => {
|
||||||
|
const id = await queue.enqueue('echo', { ping: 'x' });
|
||||||
|
await waitForJob('echo', id);
|
||||||
|
// mark it failed directly so retry has something to flip.
|
||||||
|
await pool.query(`UPDATE pgboss.job SET state='failed' WHERE id=$1`, [id]);
|
||||||
|
const out = await jobs.retry(id);
|
||||||
|
expect(out?.state).toBe('retry');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('remove deletes by id', async () => {
|
||||||
|
const id = await queue.enqueue('echo', { ping: 'rm' });
|
||||||
|
await waitForJob('echo', id);
|
||||||
|
await jobs.remove(id);
|
||||||
|
expect(await jobs.getById(id)).toBeNull();
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user