import { pool } from '../pool.js'; // pg-boss v10 stores jobs in pgboss.job (current, partitioned per queue) // and pgboss.archive (finished + cleaned). Unify them for list/get views. const SELECT_COLS = ` id, name, state, data, output, retry_count AS retrycount, created_on AS created_on, started_on AS started_on, completed_on AS completed_on `; export async function list({ state, name, limit = 50 } = {}) { const where = []; const args = []; let i = 1; if (state) { where.push(`state=$${i++}`); args.push(state); } if (name) { where.push(`name=$${i++}`); args.push(name); } args.push(limit); const w = where.length ? `WHERE ${where.join(' AND ')}` : ''; const sql = ` SELECT * FROM ( SELECT ${SELECT_COLS} FROM pgboss.job UNION ALL SELECT ${SELECT_COLS} FROM pgboss.archive ) u ${w} ORDER BY created_on DESC LIMIT $${i} `; const { rows } = await pool.query(sql, args); return rows; } export async function getById(id) { const sql = ` SELECT ${SELECT_COLS} FROM pgboss.job WHERE id=$1 UNION ALL SELECT ${SELECT_COLS} FROM pgboss.archive WHERE id=$1 LIMIT 1 `; const { rows: [r] } = await pool.query(sql, [id]); return r ?? null; } export async function retry(id) { // pg-boss v10 has no public resubmit. Update state in the partition // table; the poller picks it up. const { rows: [r] } = await pool.query( `UPDATE pgboss.job SET state='retry', retry_count=retry_count+1 WHERE id=$1 RETURNING ${SELECT_COLS}`, [id] ); if (r) return r; // Fall back: copy from archive into the partition for the same name. const { rows: [a] } = await pool.query( `WITH src AS (DELETE FROM pgboss.archive WHERE id=$1 RETURNING *) INSERT INTO pgboss.job (id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff, start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on, keep_until, output, dead_letter, policy) SELECT id, name, priority, data, 'retry', retry_limit, retry_count + 1, retry_delay, retry_backoff, now(), NULL, singleton_key, singleton_on, expire_in, now(), NULL, keep_until, output, dead_letter, policy FROM src RETURNING ${SELECT_COLS}`, [id] ); return a ?? null; } export async function remove(id) { await pool.query(`DELETE FROM pgboss.job WHERE id=$1`, [id]); await pool.query(`DELETE FROM pgboss.archive WHERE id=$1`, [id]); }