diff --git a/lib/db/repos/pages.js b/lib/db/repos/pages.js index 140f50b..f25006b 100644 --- a/lib/db/repos/pages.js +++ b/lib/db/repos/pages.js @@ -1,5 +1,6 @@ import { pool } from '../pool.js'; import { recordAudit } from './audit_stub.js'; +import { triggerEmbed } from '../../jobs/triggers.js'; async function snapshot(client, page_id, body_md, edited_by) { await client.query( @@ -21,6 +22,7 @@ export async function create({ space_id, slug, title, body_md = '', parent_id }, await snapshot(client, r.id, body_md, actor?.kind); await client.query('COMMIT'); await recordAudit(actor, 'create', 'page', r.id, null, r); + await triggerEmbed('page', r.id); return r; } catch (e) { await client.query('ROLLBACK'); throw e; @@ -79,6 +81,7 @@ export async function update(id, patch, actor) { } await client.query('COMMIT'); await recordAudit(actor, 'update', 'page', id, before, r); + if (patch.embedding === undefined) await triggerEmbed('page', id); return r; } catch (e) { await client.query('ROLLBACK'); throw e; diff --git a/lib/db/repos/refs.js b/lib/db/repos/refs.js index 348189e..9d1a2ea 100644 --- a/lib/db/repos/refs.js +++ b/lib/db/repos/refs.js @@ -1,5 +1,6 @@ import { pool } from '../pool.js'; import { recordAudit } from './audit_stub.js'; +import { triggerEmbed } from '../../jobs/triggers.js'; const FIELDS = [ 'space_id','kind','source_url','title','description','summary', @@ -21,6 +22,7 @@ export async function create(input, actor) { vals ); await recordAudit(actor, 'create', 'ref', r.id, null, r); + await triggerEmbed('ref', r.id); return r; } @@ -72,6 +74,7 @@ export async function update(id, patch, actor) { vals ); await recordAudit(actor, 'update', 'ref', id, before, r); + if (patch.embedding === undefined) await triggerEmbed('ref', id); return r; } diff --git a/lib/db/repos/source_docs.js b/lib/db/repos/source_docs.js index b20606f..5470b9b 100644 --- a/lib/db/repos/source_docs.js +++ b/lib/db/repos/source_docs.js @@ -1,5 +1,6 @@ import { pool } from '../pool.js'; 
import { recordAudit } from './audit_stub.js'; +import { triggerEmbed } from '../../jobs/triggers.js'; const FIELDS = ['resource_id','name','upstream_url','version','format','sync_source','local_path','body_text','embedding','last_synced','metadata']; @@ -14,6 +15,7 @@ export async function create(input, actor) { vals ); await recordAudit(actor, 'create', 'source_doc', r.id, null, r); + await triggerEmbed('source_doc', r.id); return r; } @@ -43,6 +45,7 @@ export async function update(id, patch, actor) { vals ); await recordAudit(actor, 'update', 'source_doc', id, before, r); + if (patch.embedding === undefined) await triggerEmbed('source_doc', id); return r; } diff --git a/lib/jobs/triggers.js b/lib/jobs/triggers.js new file mode 100644 index 0000000..e010ca6 --- /dev/null +++ b/lib/jobs/triggers.js @@ -0,0 +1,18 @@ +import * as queue from './queue.js'; +import { log } from '../log.js'; + +// Fire-and-forget enqueue of an embed.text job after a repo write. +// Never blocks the write: if the queue is not running (server tests), +// or pg-boss errors transiently, we log + move on. Pending rows get +// picked up by a future re-embed cron in Plan 4+. 
+export async function triggerEmbed(entity_type, entity_id) {
+  // Guard and enqueue both sit inside the try: repo callers await this after
+  // COMMIT, so a rejection here would land in their ROLLBACK-and-rethrow path.
+  try {
+    if (!queue.instance()) return; // not started — no-op
+    const job = { entity_type, entity_id };
+    await queue.enqueue('embed.text', job, { singletonKey: `${entity_type}:${entity_id}` });
+  } catch (e) {
+    log.warn({ err: e, entity_type, entity_id }, 'triggerEmbed failed');
+  }
+}
diff --git a/tests/jobs/triggers.test.js b/tests/jobs/triggers.test.js
new file mode 100644
index 0000000..c1d0671
--- /dev/null
+++ b/tests/jobs/triggers.test.js
@@ -0,0 +1,83 @@
+import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
+import { resetDb } from '../helpers/db.js';
+import { migrateUp } from '../../lib/db/migrate.js';
+import { stopBoss, waitForJob } from '../helpers/boss.js';
+import { pool } from '../../lib/db/pool.js';
+import * as queue from '../../lib/jobs/queue.js';
+import { registerWorkers } from '../../lib/jobs/index.js';
+import * as spaces from '../../lib/db/repos/spaces.js';
+import * as pages from '../../lib/db/repos/pages.js';
+import * as refs from '../../lib/db/repos/refs.js';
+
+// Polling budget: POLL_ATTEMPTS × POLL_INTERVAL_MS = 10 s, as quoted in the error text.
+const POLL_ATTEMPTS = 100;
+const POLL_INTERVAL_MS = 100;
+// Vitest's default per-test timeout (5 s) is shorter than the polling budget,
+// so the polling tests ask for more; otherwise the descriptive error below
+// could never be reached. NOTE(review): drop if vitest.config already raises it.
+const EMBED_TEST_TIMEOUT_MS = 15000;
+
+/**
+ * Poll `table` until row `id` has a truthy `embedding` column.
+ *
+ * @param {string} table - Table name; interpolated into SQL, so pass literals only.
+ * @param {*} id - Primary key of the row to watch.
+ * @param {string} label - Entity name used in the timeout message.
+ * @returns {Promise<void>} Resolves once an embedding is present.
+ * @throws {Error} `<label> was not embedded within 10 s` when the budget runs out.
+ */
+async function waitForEmbedding(table, id, label) {
+  for (let attempt = 0; attempt < POLL_ATTEMPTS; attempt++) {
+    const { rows: [row] } = await pool.query(`SELECT embedding FROM ${table} WHERE id=$1`, [id]);
+    if (row?.embedding) return;
+    await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
+  }
+  throw new Error(`${label} was not embedded within 10 s`);
+}
+
+beforeEach(async () => {
+  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
+  // vi.stubGlobal is reverted by vi.unstubAllGlobals(); a bare `global.fetch = …`
+  // is not undone by vi.restoreAllMocks(), leaving the fake in place after the test.
+  vi.stubGlobal('fetch', vi.fn(async () => new Response(
+    JSON.stringify({ embedding: new Array(768).fill(0.1) }),
+    { status: 200, headers: { 'content-type': 'application/json' }}
+  )));
+});
+afterEach(async () => {
+  await stopBoss();
+  vi.unstubAllGlobals();
+  vi.restoreAllMocks();
+});
+
+describe('repo embed triggers', () => {
+  it('pages.create enqueues embed.text and the worker runs', async () => {
+    const sp = await spaces.create({ slug: 't', name: 'T' }, { kind: 'user', id: null });
+    const pg = await pages.create(
+      { space_id: sp.id, slug: 'p', title: 'P', body_md: 'b' },
+      { kind: 'user', id: null }
+    );
+    // wait until the page row has an embedding
+    await waitForEmbedding('pages', pg.id, 'page');
+  }, EMBED_TEST_TIMEOUT_MS);
+
+  it('refs.create enqueues embed.text', async () => {
+    const sp = await spaces.create({ slug: 't2', name: 'T2' }, { kind: 'user', id: null });
+    const ref = await refs.create({
+      space_id: sp.id, kind: 'url', source_url: 'https://example.com/x', title: 'X'
+    }, { kind: 'user', id: null });
+    await waitForEmbedding('refs', ref.id, 'ref');
+  }, EMBED_TEST_TIMEOUT_MS);
+
+  it('no-op when queue is not running (matches prior server.test behavior)', async () => {
+    // Stop the queue mid-test
+    await stopBoss();
+    const sp = await spaces.create({ slug: 't3', name: 'T3' }, { kind: 'user', id: null });
+    // Should not throw
+    const pg = await pages.create(
+      { space_id: sp.id, slug: 'p', title: 'P', body_md: 'b' },
+      { kind: 'user', id: null }
+    );
+    expect(pg.id).toBeTruthy();
+  });
+});