diff --git a/lib/jobs/index.js b/lib/jobs/index.js
index e900f48..6c087bf 100644
--- a/lib/jobs/index.js
+++ b/lib/jobs/index.js
@@ -2,8 +2,9 @@ import * as queue from './queue.js';
 import * as echo from './workers/echo.js';
 import * as url from './workers/url.js';
 import * as blob from './workers/blob.js';
+import * as embed from './workers/embed.js';
 
-const WORKERS = [echo, url, blob];
+const WORKERS = [echo, url, blob, embed];
 
 export async function registerWorkers() {
   for (const w of WORKERS) {
diff --git a/lib/jobs/workers/embed.js b/lib/jobs/workers/embed.js
new file mode 100644
index 0000000..3f596b0
--- /dev/null
+++ b/lib/jobs/workers/embed.js
@@ -0,0 +1,44 @@
+import { embedText, padTo } from '../../ai/ollama.js';
+import { pool } from '../../db/pool.js';
+import { recordAudit } from '../../db/repos/audit.js';
+
+export const NAME = 'embed.text';
+
+// Builds the text to embed for each supported entity type.
+const STRING_BUILDERS = {
+  page: row => `${row.title}\n\n${row.body_md || ''}`,
+  ref: row => `${row.title || ''}\n${row.summary || ''}\n${row.body_text || ''}`,
+  source_doc: row => `${row.name}\n${row.body_text || ''}`,
+  conversation: row => `${row.title || ''}\n${row.summary || ''}`
+};
+
+// Table name per entity type. Only these fixed values are ever interpolated
+// into the SQL in handler(); the job payload itself never reaches the query text.
+const TABLE = { page: 'pages', ref: 'refs', source_doc: 'source_docs', conversation: 'conversations' };
+
+/**
+ * Embeds the text of one entity row and stores the vector in its `embedding` column.
+ *
+ * @param {object} job - Queue job; `job.data` carries `entity_type` and `entity_id`.
+ * @returns {Promise<object>} `{ skipped: 'gone' }` if the row is not found, else `{ entity_id }`.
+ * @throws {Error} If `entity_type` is not one of the keys of TABLE.
+ */
+export async function handler(job) {
+  const { entity_type, entity_id } = job.data;
+  // Own-property check: TABLE[entity_type] alone is truthy for inherited keys
+  // such as 'constructor', which would then be interpolated into the SQL below.
+  if (!Object.hasOwn(TABLE, entity_type)) throw new Error(`unknown entity_type: ${entity_type}`);
+  const table = TABLE[entity_type];
+  const { rows: [row] } = await pool.query(`SELECT * FROM ${table} WHERE id=$1`, [entity_id]);
+  // Row not found: report the job as skipped instead of failing it.
+  if (!row) return { skipped: 'gone' };
+  // Cap the input at 6000 UTF-16 code units before embedding.
+  const text = STRING_BUILDERS[entity_type](row).slice(0, 6_000);
+  const v = await embedText(text);
+  const padded = padTo(v, 1024);
+  // Serialized as '[a,b,...]' and cast to vector by the UPDATE.
+  const literal = '[' + padded.join(',') + ']';
+  await pool.query(`UPDATE ${table} SET embedding=$1::vector WHERE id=$2`, [literal, entity_id]);
+  await recordAudit({ kind: 'worker', id: null }, 'update', entity_type, entity_id, null, { embedding: 'updated' });
+  return { entity_id };
+}
diff --git a/tests/jobs/workers/embed.test.js b/tests/jobs/workers/embed.test.js
new file mode 100644
index 0000000..d7c3708
--- /dev/null
+++ b/tests/jobs/workers/embed.test.js
@@ -0,0 +1,54 @@
+import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
+import { resetDb } from '../../helpers/db.js';
+import { migrateUp } from '../../../lib/db/migrate.js';
+import { stopBoss, waitForJob } from '../../helpers/boss.js';
+import { pool } from '../../../lib/db/pool.js';
+import * as queue from '../../../lib/jobs/queue.js';
+import { registerWorkers } from '../../../lib/jobs/index.js';
+import * as spaces from '../../../lib/db/repos/spaces.js';
+import * as pages from '../../../lib/db/repos/pages.js';
+
+beforeEach(async () => {
+  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
+  // stubGlobal (not a bare assignment) so afterEach can put the real fetch back.
+  vi.stubGlobal('fetch', vi.fn(async () => new Response(
+    JSON.stringify({ embedding: new Array(768).fill(0.42) }),
+    { status: 200, headers: { 'content-type': 'application/json' }}
+  )));
+});
+afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); vi.unstubAllGlobals(); });
+
+describe('embed.text worker', () => {
+  it('writes a vector(1024) to the page row', async () => {
+    const sp = await spaces.create({ slug: 'e', name: 'E' }, { kind: 'user', id: null });
+    const pg = await pages.create(
+      { space_id: sp.id, slug: 'p', title: 'P', body_md: 'body' },
+      { kind: 'user', id: null }
+    );
+    const id = await queue.enqueue('embed.text', { entity_type: 'page', entity_id: pg.id });
+    const j = await waitForJob('embed.text', id, { timeoutMs: 10_000 });
+    expect(j.state).toBe('completed');
+    const { rows: [row] } = await pool.query(
+      'SELECT embedding::text AS embedding FROM pages WHERE id=$1', [pg.id]
+    );
+    expect(row.embedding).toBeTruthy();
+    // 1024 floats → exactly 1023 commas in the literal
+    const commas = (row.embedding.match(/,/g) || []).length;
+    expect(commas).toBe(1023);
+  });
+
+  it('emits an audit log entry', async () => {
+    const sp = await spaces.create({ slug: 'e2', name: 'E2' }, { kind: 'user', id: null });
+    const pg = await pages.create(
+      { space_id: sp.id, slug: 'p', title: 'P', body_md: 'body' },
+      { kind: 'user', id: null }
+    );
+    const id = await queue.enqueue('embed.text', { entity_type: 'page', entity_id: pg.id });
+    await waitForJob('embed.text', id, { timeoutMs: 10_000 });
+    const { rows } = await pool.query(
+      `SELECT * FROM audit_log WHERE entity_type='page' AND entity_id=$1 AND actor_kind='worker'`,
+      [pg.id]
+    );
+    expect(rows.length).toBeGreaterThan(0);
+  });
+});