feat(jobs): repo-level embed triggers (pages/refs/source_docs)
create/update on embeddable repos enqueue embed.text with a singleton key that coalesces rapid edits. No-op when the queue is not running (server tests construct createApp without booting pg-boss). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import { pool } from '../pool.js';
|
import { pool } from '../pool.js';
|
||||||
import { recordAudit } from './audit_stub.js';
|
import { recordAudit } from './audit_stub.js';
|
||||||
|
import { triggerEmbed } from '../../jobs/triggers.js';
|
||||||
|
|
||||||
async function snapshot(client, page_id, body_md, edited_by) {
|
async function snapshot(client, page_id, body_md, edited_by) {
|
||||||
await client.query(
|
await client.query(
|
||||||
@@ -21,6 +22,7 @@ export async function create({ space_id, slug, title, body_md = '', parent_id },
|
|||||||
await snapshot(client, r.id, body_md, actor?.kind);
|
await snapshot(client, r.id, body_md, actor?.kind);
|
||||||
await client.query('COMMIT');
|
await client.query('COMMIT');
|
||||||
await recordAudit(actor, 'create', 'page', r.id, null, r);
|
await recordAudit(actor, 'create', 'page', r.id, null, r);
|
||||||
|
await triggerEmbed('page', r.id);
|
||||||
return r;
|
return r;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
await client.query('ROLLBACK'); throw e;
|
await client.query('ROLLBACK'); throw e;
|
||||||
@@ -79,6 +81,7 @@ export async function update(id, patch, actor) {
|
|||||||
}
|
}
|
||||||
await client.query('COMMIT');
|
await client.query('COMMIT');
|
||||||
await recordAudit(actor, 'update', 'page', id, before, r);
|
await recordAudit(actor, 'update', 'page', id, before, r);
|
||||||
|
if (patch.embedding === undefined) await triggerEmbed('page', id);
|
||||||
return r;
|
return r;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
await client.query('ROLLBACK'); throw e;
|
await client.query('ROLLBACK'); throw e;
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { pool } from '../pool.js';
|
import { pool } from '../pool.js';
|
||||||
import { recordAudit } from './audit_stub.js';
|
import { recordAudit } from './audit_stub.js';
|
||||||
|
import { triggerEmbed } from '../../jobs/triggers.js';
|
||||||
|
|
||||||
const FIELDS = [
|
const FIELDS = [
|
||||||
'space_id','kind','source_url','title','description','summary',
|
'space_id','kind','source_url','title','description','summary',
|
||||||
@@ -21,6 +22,7 @@ export async function create(input, actor) {
|
|||||||
vals
|
vals
|
||||||
);
|
);
|
||||||
await recordAudit(actor, 'create', 'ref', r.id, null, r);
|
await recordAudit(actor, 'create', 'ref', r.id, null, r);
|
||||||
|
await triggerEmbed('ref', r.id);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,6 +74,7 @@ export async function update(id, patch, actor) {
|
|||||||
vals
|
vals
|
||||||
);
|
);
|
||||||
await recordAudit(actor, 'update', 'ref', id, before, r);
|
await recordAudit(actor, 'update', 'ref', id, before, r);
|
||||||
|
if (patch.embedding === undefined) await triggerEmbed('ref', id);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { pool } from '../pool.js';
|
import { pool } from '../pool.js';
|
||||||
import { recordAudit } from './audit_stub.js';
|
import { recordAudit } from './audit_stub.js';
|
||||||
|
import { triggerEmbed } from '../../jobs/triggers.js';
|
||||||
|
|
||||||
const FIELDS = ['resource_id','name','upstream_url','version','format','sync_source','local_path','body_text','embedding','last_synced','metadata'];
|
const FIELDS = ['resource_id','name','upstream_url','version','format','sync_source','local_path','body_text','embedding','last_synced','metadata'];
|
||||||
|
|
||||||
@@ -14,6 +15,7 @@ export async function create(input, actor) {
|
|||||||
vals
|
vals
|
||||||
);
|
);
|
||||||
await recordAudit(actor, 'create', 'source_doc', r.id, null, r);
|
await recordAudit(actor, 'create', 'source_doc', r.id, null, r);
|
||||||
|
await triggerEmbed('source_doc', r.id);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -43,6 +45,7 @@ export async function update(id, patch, actor) {
|
|||||||
vals
|
vals
|
||||||
);
|
);
|
||||||
await recordAudit(actor, 'update', 'source_doc', id, before, r);
|
await recordAudit(actor, 'update', 'source_doc', id, before, r);
|
||||||
|
if (patch.embedding === undefined) await triggerEmbed('source_doc', id);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
18
lib/jobs/triggers.js
Normal file
18
lib/jobs/triggers.js
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
import * as queue from './queue.js';
|
||||||
|
import { log } from '../log.js';
|
||||||
|
|
||||||
|
// Fire-and-forget enqueue of an embed.text job after a repo write.
|
||||||
|
// Never blocks the write: if the queue is not running (server tests),
|
||||||
|
// or pg-boss errors transiently, we log + move on. Pending rows get
|
||||||
|
// picked up by a future re-embed cron in Plan 4+.
|
||||||
|
export async function triggerEmbed(entity_type, entity_id) {
|
||||||
|
if (!queue.instance()) return; // not started — no-op
|
||||||
|
try {
|
||||||
|
await queue.enqueue('embed.text',
|
||||||
|
{ entity_type, entity_id },
|
||||||
|
{ singletonKey: `${entity_type}:${entity_id}` }
|
||||||
|
);
|
||||||
|
} catch (e) {
|
||||||
|
log.warn({ err: e, entity_type, entity_id }, 'triggerEmbed failed');
|
||||||
|
}
|
||||||
|
}
|
||||||
61
tests/jobs/triggers.test.js
Normal file
61
tests/jobs/triggers.test.js
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
|
||||||
|
import { resetDb } from '../helpers/db.js';
|
||||||
|
import { migrateUp } from '../../lib/db/migrate.js';
|
||||||
|
import { stopBoss, waitForJob } from '../helpers/boss.js';
|
||||||
|
import { pool } from '../../lib/db/pool.js';
|
||||||
|
import * as queue from '../../lib/jobs/queue.js';
|
||||||
|
import { registerWorkers } from '../../lib/jobs/index.js';
|
||||||
|
import * as spaces from '../../lib/db/repos/spaces.js';
|
||||||
|
import * as pages from '../../lib/db/repos/pages.js';
|
||||||
|
import * as refs from '../../lib/db/repos/refs.js';
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
|
||||||
|
global.fetch = vi.fn(async () => new Response(
|
||||||
|
JSON.stringify({ embedding: new Array(768).fill(0.1) }),
|
||||||
|
{ status: 200, headers: { 'content-type': 'application/json' }}
|
||||||
|
));
|
||||||
|
});
|
||||||
|
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
|
||||||
|
|
||||||
|
describe('repo embed triggers', () => {
|
||||||
|
it('pages.create enqueues embed.text and the worker runs', async () => {
|
||||||
|
const sp = await spaces.create({ slug: 't', name: 'T' }, { kind: 'user', id: null });
|
||||||
|
const pg = await pages.create(
|
||||||
|
{ space_id: sp.id, slug: 'p', title: 'P', body_md: 'b' },
|
||||||
|
{ kind: 'user', id: null }
|
||||||
|
);
|
||||||
|
// wait until the page row has an embedding
|
||||||
|
for (let i = 0; i < 100; i++) {
|
||||||
|
const { rows: [row] } = await pool.query('SELECT embedding FROM pages WHERE id=$1', [pg.id]);
|
||||||
|
if (row.embedding) return;
|
||||||
|
await new Promise(r => setTimeout(r, 100));
|
||||||
|
}
|
||||||
|
throw new Error('page was not embedded within 10 s');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('refs.create enqueues embed.text', async () => {
|
||||||
|
const sp = await spaces.create({ slug: 't2', name: 'T2' }, { kind: 'user', id: null });
|
||||||
|
const r = await refs.create({
|
||||||
|
space_id: sp.id, kind: 'url', source_url: 'https://example.com/x', title: 'X'
|
||||||
|
}, { kind: 'user', id: null });
|
||||||
|
for (let i = 0; i < 100; i++) {
|
||||||
|
const { rows: [row] } = await pool.query('SELECT embedding FROM refs WHERE id=$1', [r.id]);
|
||||||
|
if (row.embedding) return;
|
||||||
|
await new Promise(r => setTimeout(r, 100));
|
||||||
|
}
|
||||||
|
throw new Error('ref was not embedded within 10 s');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('no-op when queue is not running (matches prior server.test behavior)', async () => {
|
||||||
|
// Stop the queue mid-test
|
||||||
|
await stopBoss();
|
||||||
|
const sp = await spaces.create({ slug: 't3', name: 'T3' }, { kind: 'user', id: null });
|
||||||
|
// Should not throw
|
||||||
|
const pg = await pages.create(
|
||||||
|
{ space_id: sp.id, slug: 'p', title: 'P', body_md: 'b' },
|
||||||
|
{ kind: 'user', id: null }
|
||||||
|
);
|
||||||
|
expect(pg.id).toBeTruthy();
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user