feat(jobs): embed.text worker (Ollama → vector(1024))

Pads nomic-embed-text's 768 dims to 1024 zeros so a later 1024-dim model
swap is a re-embed, not a migration (per master spec).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
root
2026-06-01 03:43:57 +10:00
parent 5799ea663e
commit 37b7753360
3 changed files with 84 additions and 1 deletions

View File

@@ -2,8 +2,9 @@ import * as queue from './queue.js';
import * as echo from './workers/echo.js'; import * as echo from './workers/echo.js';
import * as url from './workers/url.js'; import * as url from './workers/url.js';
import * as blob from './workers/blob.js'; import * as blob from './workers/blob.js';
import * as embed from './workers/embed.js';
const WORKERS = [echo, url, blob]; const WORKERS = [echo, url, blob, embed];
export async function registerWorkers() { export async function registerWorkers() {
for (const w of WORKERS) { for (const w of WORKERS) {

29
lib/jobs/workers/embed.js Normal file
View File

@@ -0,0 +1,29 @@
import { embedText, padTo } from '../../ai/ollama.js';
import { pool } from '../../db/pool.js';
import { recordAudit } from '../../db/repos/audit.js';
export const NAME = 'embed.text';
const STRING_BUILDERS = {
page: row => `${row.title}\n\n${row.body_md || ''}`,
ref: row => `${row.title || ''}\n${row.summary || ''}\n${row.body_text || ''}`,
source_doc: row => `${row.name}\n${row.body_text || ''}`,
conversation: row => `${row.title || ''}\n${row.summary || ''}`
};
const TABLE = { page: 'pages', ref: 'refs', source_doc: 'source_docs', conversation: 'conversations' };
export async function handler(job) {
const { entity_type, entity_id } = job.data;
const table = TABLE[entity_type];
if (!table) throw new Error(`unknown entity_type: ${entity_type}`);
const { rows: [row] } = await pool.query(`SELECT * FROM ${table} WHERE id=$1`, [entity_id]);
if (!row) return { skipped: 'gone' };
const text = STRING_BUILDERS[entity_type](row).slice(0, 6_000);
const v = await embedText(text);
const padded = padTo(v, 1024);
const literal = '[' + padded.join(',') + ']';
await pool.query(`UPDATE ${table} SET embedding=$1::vector WHERE id=$2`, [literal, entity_id]);
await recordAudit({ kind: 'worker', id: null }, 'update', entity_type, entity_id, null, { embedding: 'updated' });
return { entity_id };
}

View File

@@ -0,0 +1,53 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss, waitForJob } from '../../helpers/boss.js';
import { pool } from '../../../lib/db/pool.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as pages from '../../../lib/db/repos/pages.js';
beforeEach(async () => {
await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
global.fetch = vi.fn(async () => new Response(
JSON.stringify({ embedding: new Array(768).fill(0.42) }),
{ status: 200, headers: { 'content-type': 'application/json' }}
));
});
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
describe('embed.text worker', () => {
it('writes a vector(1024) to the page row', async () => {
const sp = await spaces.create({ slug: 'e', name: 'E' }, { kind: 'user', id: null });
const pg = await pages.create(
{ space_id: sp.id, slug: 'p', title: 'P', body_md: 'body' },
{ kind: 'user', id: null }
);
const id = await queue.enqueue('embed.text', { entity_type: 'page', entity_id: pg.id });
const j = await waitForJob('embed.text', id, { timeoutMs: 10_000 });
expect(j.state).toBe('completed');
const { rows: [row] } = await pool.query(
'SELECT embedding::text AS embedding FROM pages WHERE id=$1', [pg.id]
);
expect(row.embedding).toBeTruthy();
// 1024 floats → at least 1023 commas in the literal
const commas = (row.embedding.match(/,/g) || []).length;
expect(commas).toBe(1023);
});
it('emits an audit log entry', async () => {
const sp = await spaces.create({ slug: 'e2', name: 'E2' }, { kind: 'user', id: null });
const pg = await pages.create(
{ space_id: sp.id, slug: 'p', title: 'P', body_md: 'body' },
{ kind: 'user', id: null }
);
const id = await queue.enqueue('embed.text', { entity_type: 'page', entity_id: pg.id });
await waitForJob('embed.text', id, { timeoutMs: 10_000 });
const { rows } = await pool.query(
`SELECT * FROM audit_log WHERE entity_type='page' AND entity_id=$1 AND actor_kind='worker'`,
[pg.id]
);
expect(rows.length).toBeGreaterThan(0);
});
});