feat(jobs): echo worker + CLI bootstrap
Job queue starts only in the CLI gate (not inside createApp), so tests manage their own queue lifecycle. waitForJob() takes a (name, id) pair to match pg-boss v10's getJobById signature. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
10
lib/jobs/index.js
Normal file
10
lib/jobs/index.js
Normal file
@@ -0,0 +1,10 @@
|
||||
import * as queue from './queue.js';
|
||||
import * as echo from './workers/echo.js';
|
||||
|
||||
const WORKERS = [echo];
|
||||
|
||||
export async function registerWorkers() {
|
||||
for (const w of WORKERS) {
|
||||
await queue.subscribe(w.NAME, w.handler, w.opts || {});
|
||||
}
|
||||
}
|
||||
5
lib/jobs/workers/echo.js
Normal file
5
lib/jobs/workers/echo.js
Normal file
@@ -0,0 +1,5 @@
|
||||
export const NAME = 'echo';
|
||||
|
||||
export async function handler(job) {
|
||||
return { pong: job.data?.ping ?? 0 };
|
||||
}
|
||||
16
server.js
16
server.js
@@ -3,6 +3,8 @@ import express from 'express';
|
||||
import { pool } from './lib/db/pool.js';
|
||||
import { log } from './lib/log.js';
|
||||
import { mountApi } from './lib/api/index.js';
|
||||
import * as queue from './lib/jobs/queue.js';
|
||||
import { registerWorkers } from './lib/jobs/index.js';
|
||||
|
||||
const VERSION = '2.0.0-alpha.2';
|
||||
|
||||
@@ -36,5 +38,17 @@ export function createApp() {
|
||||
|
||||
if (import.meta.url === `file://${process.argv[1]}`) {
|
||||
const port = process.env.PORT || 3000;
|
||||
createApp().listen(port, () => log.info({ port }, 'void-server listening'));
|
||||
const app = createApp();
|
||||
queue.start()
|
||||
.then(registerWorkers)
|
||||
.then(() => log.info('job queue ready'))
|
||||
.catch(err => log.error({ err }, 'queue boot failed'));
|
||||
app.listen(port, () => log.info({ port }, 'void-server listening'));
|
||||
for (const sig of ['SIGTERM', 'SIGINT']) {
|
||||
process.on(sig, async () => {
|
||||
log.info({ sig }, 'shutting down');
|
||||
try { await queue.stop(); } catch { /* */ }
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,18 +6,14 @@ export async function stopBoss() {
|
||||
try { await pool.query('DROP SCHEMA IF EXISTS pgboss CASCADE'); } catch { /* ignore */ }
|
||||
}
|
||||
|
||||
export async function waitForJob(id, { timeoutMs = 5_000 } = {}) {
|
||||
export async function waitForJob(name, id, { timeoutMs = 5_000 } = {}) {
|
||||
const boss = queue.instance();
|
||||
if (!boss) throw new Error('queue not started');
|
||||
const start = Date.now();
|
||||
while (Date.now() - start < timeoutMs) {
|
||||
const j = await boss.getJobById(id);
|
||||
if (!j) {
|
||||
await new Promise(r => setTimeout(r, 50));
|
||||
continue;
|
||||
}
|
||||
if (['completed','failed','cancelled','expired'].includes(j.state)) return j;
|
||||
const j = await boss.getJobById(name, id, { includeArchive: true });
|
||||
if (j && ['completed','failed','cancelled','expired'].includes(j.state)) return j;
|
||||
await new Promise(r => setTimeout(r, 50));
|
||||
}
|
||||
throw new Error(`job ${id} did not finish in ${timeoutMs} ms`);
|
||||
throw new Error(`job ${name} ${id} did not finish in ${timeoutMs} ms`);
|
||||
}
|
||||
|
||||
22
tests/jobs/workers/echo.test.js
Normal file
22
tests/jobs/workers/echo.test.js
Normal file
@@ -0,0 +1,22 @@
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import { resetDb } from '../../helpers/db.js';
|
||||
import { migrateUp } from '../../../lib/db/migrate.js';
|
||||
import { stopBoss, waitForJob } from '../../helpers/boss.js';
|
||||
import * as queue from '../../../lib/jobs/queue.js';
|
||||
import { registerWorkers } from '../../../lib/jobs/index.js';
|
||||
|
||||
beforeEach(async () => {
|
||||
await resetDb(); await migrateUp();
|
||||
await queue.start();
|
||||
await registerWorkers();
|
||||
});
|
||||
afterEach(async () => { await stopBoss(); });
|
||||
|
||||
describe('echo worker', () => {
|
||||
it('completes with the expected output', async () => {
|
||||
const id = await queue.enqueue('echo', { ping: 7 });
|
||||
const j = await waitForJob('echo', id);
|
||||
expect(j.state).toBe('completed');
|
||||
expect(j.output).toEqual({ pong: 7 });
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user