From 53ffd705c46c5cdb8eb1068dd6b6a8ae8419905a Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 03:28:06 +1000 Subject: [PATCH] feat(jobs): echo worker + CLI bootstrap Job queue starts only in the CLI gate (not inside createApp), so tests manage their own queue lifecycle. waitForJob() takes a (name, id) pair to match pg-boss v10's getJobById signature. Co-Authored-By: Claude Opus 4.7 --- lib/jobs/index.js | 10 ++++++++++ lib/jobs/workers/echo.js | 5 +++++ server.js | 16 +++++++++++++++- tests/helpers/boss.js | 12 ++++-------- tests/jobs/workers/echo.test.js | 22 ++++++++++++++++++++++ 5 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 lib/jobs/index.js create mode 100644 lib/jobs/workers/echo.js create mode 100644 tests/jobs/workers/echo.test.js diff --git a/lib/jobs/index.js b/lib/jobs/index.js new file mode 100644 index 0000000..040ef92 --- /dev/null +++ b/lib/jobs/index.js @@ -0,0 +1,10 @@ +import * as queue from './queue.js'; +import * as echo from './workers/echo.js'; + +const WORKERS = [echo]; + +export async function registerWorkers() { + for (const w of WORKERS) { + await queue.subscribe(w.NAME, w.handler, w.opts || {}); + } +} diff --git a/lib/jobs/workers/echo.js b/lib/jobs/workers/echo.js new file mode 100644 index 0000000..f645a39 --- /dev/null +++ b/lib/jobs/workers/echo.js @@ -0,0 +1,5 @@ +export const NAME = 'echo'; + +export async function handler(job) { + return { pong: job.data?.ping ?? 0 }; +} diff --git a/server.js b/server.js index 6e83e0a..e0feaae 100644 --- a/server.js +++ b/server.js @@ -3,6 +3,8 @@ import express from 'express'; import { pool } from './lib/db/pool.js'; import { log } from './lib/log.js'; import { mountApi } from './lib/api/index.js'; +import * as queue from './lib/jobs/queue.js'; +import { registerWorkers } from './lib/jobs/index.js'; const VERSION = '2.0.0-alpha.2'; @@ -36,5 +38,17 @@ export function createApp() { if (import.meta.url === `file://${process.argv[1]}`) { const port = process.env.PORT || 3000; - createApp().listen(port, () => log.info({ port }, 'void-server listening')); + const app = createApp(); + queue.start() + .then(registerWorkers) + .then(() => log.info('job queue ready')) + .catch(err => log.error({ err }, 'queue boot failed')); + app.listen(port, () => log.info({ port }, 'void-server listening')); + for (const sig of ['SIGTERM', 'SIGINT']) { + process.on(sig, async () => { + log.info({ sig }, 'shutting down'); + try { await queue.stop(); } catch { /* */ } + process.exit(0); + }); + } } diff --git a/tests/helpers/boss.js b/tests/helpers/boss.js index 1d630b7..99808f5 100644 --- a/tests/helpers/boss.js +++ b/tests/helpers/boss.js @@ -6,18 +6,14 @@ export async function stopBoss() { try { await pool.query('DROP SCHEMA IF EXISTS pgboss CASCADE'); } catch { /* ignore */ } } -export async function waitForJob(id, { timeoutMs = 5_000 } = {}) { +export async function waitForJob(name, id, { timeoutMs = 5_000 } = {}) { const boss = queue.instance(); if (!boss) throw new Error('queue not started'); const start = Date.now(); while (Date.now() - start < timeoutMs) { - const j = await boss.getJobById(id); - if (!j) { - await new Promise(r => setTimeout(r, 50)); - continue; - } - if (['completed','failed','cancelled','expired'].includes(j.state)) return j; + const j = await boss.getJobById(name, id, { includeArchive: true }); + if (j && ['completed','failed','cancelled','expired'].includes(j.state)) return j; await new Promise(r => setTimeout(r, 50)); } - throw new Error(`job ${id} did not finish in ${timeoutMs} ms`); + throw new Error(`job ${name} ${id} did not finish in ${timeoutMs} ms`); } diff --git a/tests/jobs/workers/echo.test.js b/tests/jobs/workers/echo.test.js new file mode 100644 index 0000000..192158f --- /dev/null +++ b/tests/jobs/workers/echo.test.js @@ -0,0 +1,22 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { resetDb } from '../../helpers/db.js'; +import { migrateUp } from '../../../lib/db/migrate.js'; +import { stopBoss, waitForJob } from '../../helpers/boss.js'; +import * as queue from '../../../lib/jobs/queue.js'; +import { registerWorkers } from '../../../lib/jobs/index.js'; + +beforeEach(async () => { + await resetDb(); await migrateUp(); + await queue.start(); + await registerWorkers(); +}); +afterEach(async () => { await stopBoss(); }); + +describe('echo worker', () => { + it('completes with the expected output', async () => { + const id = await queue.enqueue('echo', { ping: 7 }); + const j = await waitForJob('echo', id); + expect(j.state).toBe('completed'); + expect(j.output).toEqual({ pong: 7 }); + }); +});