feat(jobs): ingest.url worker (fetch + readability + idempotent ref)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
root
2026-06-01 03:35:44 +10:00
parent 6e973404e9
commit 3ccfd20b5f
3 changed files with 92 additions and 1 deletions

View File

@@ -1,7 +1,8 @@
import * as queue from './queue.js'; import * as queue from './queue.js';
import * as echo from './workers/echo.js'; import * as echo from './workers/echo.js';
import * as url from './workers/url.js';
const WORKERS = [echo]; const WORKERS = [echo, url];
export async function registerWorkers() { export async function registerWorkers() {
for (const w of WORKERS) { for (const w of WORKERS) {

43
lib/jobs/workers/url.js Normal file
View File

@@ -0,0 +1,43 @@
import crypto from 'node:crypto';
import { extract } from '../../ingest/readability.js';
import * as refs from '../../db/repos/refs.js';
import { pool } from '../../db/pool.js';
export const NAME = 'ingest.url';
function key(space_id, url) {
return crypto.createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}
export async function handler(job) {
const { space_id, url } = job.data;
const idem = key(space_id, url);
const { rows: [existing] } = await pool.query(
`SELECT id FROM refs WHERE source_kind='url' AND external_id=$1 LIMIT 1`,
[idem]
);
if (existing) return { ref_id: existing.id, idempotent: true };
const res = await fetch(url, {
headers: { 'User-Agent': 'void-ingest/2.0' },
signal: AbortSignal.timeout(15_000)
});
if (!res.ok) throw new Error(`fetch ${url}${res.status}`);
const html = await res.text();
const parsed = extract(html, url);
const row = await refs.create({
space_id,
kind: 'url',
source_url: url,
title: parsed.title || url,
summary: parsed.excerpt,
body_text: (parsed.textContent || '').slice(0, 200_000),
source_kind: 'url',
external_id: idem,
metadata: { site_name: parsed.siteName, byline: parsed.byline }
}, { kind: 'system', id: null });
return { ref_id: row.id };
}

View File

@@ -0,0 +1,47 @@
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss, waitForJob } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';
const HTML = `<html><head><title>Blackflame</title></head><body>
<article>
<h1>Blackflame</h1>
<p>An essay on the Cradle aesthetic and the blackflame motif. Long enough for readability to consider this main content. Lorem ipsum dolor sit amet.</p>
<p>Another paragraph that pads out the article for readability detection.</p>
</article></body></html>`;
beforeEach(async () => {
await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
global.fetch = vi.fn(async () => new Response(HTML, {
status: 200, headers: { 'content-type': 'text/html' }
}));
});
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
describe('ingest.url worker', () => {
it('creates a ref from a URL', async () => {
const sp = await spaces.create({ slug: 'u', name: 'U' }, { kind: 'user', id: null });
const id = await queue.enqueue('ingest.url', { space_id: sp.id, url: 'https://example.com/a' });
const j = await waitForJob('ingest.url', id, { timeoutMs: 10_000 });
expect(j.state).toBe('completed');
const rows = await refs.list({ space_id: sp.id });
expect(rows[0].title).toMatch(/Blackflame/);
expect(rows[0].external_id).toBeTruthy();
expect(rows[0].source_kind).toBe('url');
});
it('idempotent on repeat enqueue (same space_id + url)', async () => {
const sp = await spaces.create({ slug: 'u2', name: 'U2' }, { kind: 'user', id: null });
const id1 = await queue.enqueue('ingest.url', { space_id: sp.id, url: 'https://example.com/b' });
await waitForJob('ingest.url', id1, { timeoutMs: 10_000 });
const id2 = await queue.enqueue('ingest.url', { space_id: sp.id, url: 'https://example.com/b' });
const j2 = await waitForJob('ingest.url', id2, { timeoutMs: 10_000 });
expect(j2.output.idempotent).toBe(true);
const rows = await refs.list({ space_id: sp.id });
expect(rows.length).toBe(1);
});
});