Void-Homelab/docs/superpowers/plans/2026-06-01-void-v2-plan3-capture.md
root 54ba68a11c docs: move void-v2 specs + plans into the repo
All Void 2.0 superpowers specs and implementation plans now live at
docs/superpowers/{specs,plans}/ inside the repo. Previously they were
at /project/docs/superpowers/ which was not under git.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:11:32 +10:00
# Void 2.0 — Plan 3: Capture pipeline + hybrid search
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Wire the Plan 2 SPA's stub Capture button to a real ingest pipeline. Add a pg-boss-backed job queue, three capture entry points (URL POST + Karakeep webhook + drag-drop attachment), workers that turn URLs and blobs into `refs`, an embeddings worker that fills the existing `embedding` columns via Ollama, and a hybrid FTS+vector search with RRF that replaces the Plan 2 FTS-only `/api/search`.
**Architecture:** Single-process void-server stays in Node. pg-boss runs as an embedded client inside the same Node process; its tables live in the shared void2-db alongside Void's tables. Workers register as in-process pollers via `boss.work()`. Embeddings call Ollama at CT 102 over HTTP; on failure the search query gracefully degrades to FTS-only.
**Tech Stack:** Express 5, pg-boss 10, `@mozilla/readability` + `jsdom` for URL extraction, `multer` for upload streaming, native `fetch` for Ollama, vitest, supertest, vanilla DOM (via `dom.js`) for SPA additions.
**Spec:** `docs/superpowers/specs/2026-06-01-void-v2-plan3-capture.md` — read it first; this plan inherits every decision documented there.
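As a rough illustration of the hybrid search named in the Goal: reciprocal rank fusion scores each document by summing `1 / (k + rank)` over every ranked list it appears in. The sketch below is not the Plan 3 implementation. `rrfFuse`, `hybridSearch`, the injected `ftsSearch` / `embedQuery` / `vectorSearch` functions, and `k = 60` are all assumptions chosen for illustration.

```js
// Hypothetical sketch: reciprocal rank fusion over ranked id lists.
// k = 60 is a commonly used constant, assumed here; the spec may choose another.
function rrfFuse(rankedLists, k = 60) {
  const scores = new Map();
  for (const list of rankedLists) {
    list.forEach((id, idx) => {
      // rank is 1-based, so the top hit contributes 1 / (k + 1)
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + idx + 1));
    });
  }
  return [...scores.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([id, score]) => ({ id, score }));
}

// Hypothetical wrapper: if embedding the query fails (for example because
// Ollama is unreachable), rank by FTS alone instead of failing the request.
async function hybridSearch(q, { ftsSearch, embedQuery, vectorSearch }) {
  const fts = await ftsSearch(q);
  let vec = [];
  try {
    vec = await vectorSearch(await embedQuery(q));
  } catch {
    // degrade to FTS-only
  }
  return rrfFuse(vec.length ? [fts, vec] : [fts]);
}
```

A document that ranks well in both lists outscores one that tops only a single list, which is why the fused order can differ from either input order.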
---
## Out of scope (deferred)
- Whisper transcription, Tesseract OCR, yt-dlp video ingestion, scanned-PDF OCR — Plan 4 (Python `void-workers` service).
- AI Space/Project suggestion on capture — capture takes explicit `space_id`.
- Embedding chunks table — Plan 3 ships whole-doc embedding per entity row.
- MCP server — Plan 5+.
## Conventions (apply to every task)
1. **TDD.** Write the failing test first. Run it red. Implement. Run it green. Commit. Match the Plan 2 cadence.
2. **No raw SQL in routes** — repos only.
3. **Mutations pass `req.actor`** to the repo.
4. **Throw `NotFoundError` / `ValidationError` / `ForbiddenError`** — the existing error middleware handles them.
5. **Status codes**: 201 create, 200 read/update, 202 enqueued, 204 delete, 400 validation, 401 unauth, 403 capability deny, 404 not found, 409 conflict.
6. **Workers** use a single shape: `async function handler(job) { ... }`. Throw on retryable failure, log + return on permanent failure.
7. **Commit per task** with the message at the end of that task.
8. **Test isolation:** the existing `resetDb` + `migrateUp` helpers stay in use. For pg-boss tests, add a `stopBoss()` helper that drops the `pgboss` schema between suites.
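Convention 6 can be sketched as a small wrapper. This is only an illustration: `PermanentError` and `asWorker` are hypothetical names that the plan does not define, and the logger defaults to `console` purely to keep the sketch self-contained.

```js
// Hypothetical marker for failures that retrying cannot fix (bad input, 404, ...).
class PermanentError extends Error {}

// Wraps a worker body so it follows convention 6:
//   - permanent failure: log and return a result, so the job is not retried
//   - anything else: rethrow, so the queue can retry it
function asWorker(body, log = console) {
  return async function handler(job) {
    try {
      return await body(job);
    } catch (err) {
      if (err instanceof PermanentError) {
        log.error({ err, jobId: job.id }, 'permanent failure, not retrying');
        return { ok: false, reason: err.message };
      }
      throw err;
    }
  };
}
```

A worker body would then throw `PermanentError` for inputs that can never succeed and let every other error propagate unchanged.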
---
## File structure (what gets created across Plan 3)
```
lib/
  jobs/
    queue.js  # pg-boss singleton: start, stop, enqueue, work
    index.js  # registers all worker handlers; called from server.js
    triggers.js  # repo-level "after write → enqueue embed.text"
    workers/
      echo.js  # trivial harness verification
      url.js  # ingest.url
      blob.js  # ingest.blob
      karakeep.js  # ingest.karakeep
      embed.js  # embed.text
  ingest/
    readability.js  # @mozilla/readability wrapper
    blob_store.js  # sha256 + content-addressed path resolution
  ai/
    ollama.js  # thin embed-text wrapper
  karakeep/
    client.js  # thin GET /api/v1/bookmarks/:id wrapper
  api/routes/
    jobs.js  # GET /api/jobs etc.
    capture.js  # POST /api/capture + POST /api/capture/upload
    ingest.js  # POST /api/ingest/karakeep (HMAC-verified)
  db/repos/
    jobs.js  # thin SELECTs over pg-boss tables (typed views)
    search.js  # REWRITTEN: hybrid FTS+vector RRF
public/
  views/jobs.js  # SPA Jobs panel
  components/dropzone.js  # drag-drop wrapper for /capture/upload
tests/
  jobs/queue.test.js
  jobs/workers/echo.test.js
  jobs/workers/url.test.js
  jobs/workers/blob.test.js
  jobs/workers/embed.test.js
  jobs/workers/karakeep.test.js
  jobs/triggers.test.js
  ai/ollama.test.js
  ingest/readability.test.js
  ingest/blob_store.test.js
  karakeep/client.test.js
  api/jobs.test.js
  api/capture.test.js
  api/ingest.test.js
  api/search.test.js  # extended
  repos/search.test.js  # extended
  integration/embed_live.test.js  # skip if Ollama unreachable
  helpers/boss.js  # new: stopBoss() + waitForJob() helpers
```
---
## Phase A — Queue harness + Jobs API
### Task A1: Add pg-boss dependency
**Files:**
- Modify: `package.json` — add `"pg-boss": "^10.3.2"`.
- [ ] **Step 1:** `cd /project/src/void-v2 && npm i pg-boss@^10`
- [ ] **Step 2:** Verify the dep landed:
```bash
grep '"pg-boss"' package.json
```
Expected: `"pg-boss": "^10.x.x"`
- [ ] **Step 3:** Run the existing test suite — must still pass.
```bash
npx vitest run
```
Expected: 185 tests pass.
- [ ] **Step 4:** Commit.
```bash
git add package.json package-lock.json
git commit -m "chore(deps): add pg-boss ^10"
```
### Task A2: Create `lib/jobs/queue.js` singleton client
**Files:**
- Create: `lib/jobs/queue.js`
- Create: `tests/helpers/boss.js`
- Create: `tests/jobs/queue.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/queue.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';

beforeEach(async () => { await resetDb(); await migrateUp(); });
afterEach(async () => { await stopBoss(); });

describe('jobs/queue', () => {
  it('starts, enqueues, and a worker receives the job', async () => {
    await queue.start();
    let resolve;
    const received = new Promise(r => { resolve = r; });
    // Await the registration so a failed subscribe fails the test instead of hanging it.
    await queue.subscribe('echo', async job => { resolve(job.data); });
    const jobId = await queue.enqueue('echo', { hello: 'void' });
    expect(jobId).toBeTruthy();
    const data = await received;
    expect(data).toEqual({ hello: 'void' });
  });
});
```
- [ ] **Step 2:** Create the helper:
```js
// tests/helpers/boss.js
import * as queue from '../../lib/jobs/queue.js';
import { pool } from '../../lib/db/pool.js';
export async function stopBoss() {
try { await queue.stop(); } catch { /* ignore */ }
await pool.query('DROP SCHEMA IF EXISTS pgboss CASCADE');
}
```
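The file structure lists a `waitForJob()` helper next to `stopBoss()`, while the tests in this phase inline a polling loop. A minimal sketch of such a helper follows, assuming it takes a `getJob` callback; that signature, the option names, and the defaults are hypothetical and not defined by the plan.

```js
// Hypothetical helper: poll getJob() until the job reaches `state`,
// or throw once `timeoutMs` has elapsed.
async function waitForJob(getJob, { state = 'completed', timeoutMs = 5000, intervalMs = 100 } = {}) {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const job = await getJob();
    if (job?.state === state) return job;
    await new Promise(r => setTimeout(r, intervalMs));
  }
  throw new Error(`job did not reach state "${state}" within ${timeoutMs}ms`);
}
```

A test would pass a closure that calls `getJobById` on `queue.instance()` and then assert on the returned job, which keeps each test body free of the retry loop.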
- [ ] **Step 3: Run the test red.**
```bash
npx vitest run tests/jobs/queue.test.js
```
Expected: FAIL — `lib/jobs/queue.js` does not exist.
- [ ] **Step 4: Implement `queue.js`.**
```js
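// lib/jobs/queue.js
import PgBoss from 'pg-boss';
import { log } from '../log.js';

let boss = null;
const queues = new Map(); // queue name -> in-flight or settled "ensure" promise

export async function start() {
  if (boss) return boss;
  boss = new PgBoss({
    connectionString: process.env.DATABASE_URL,
    archiveCompletedAfterSeconds: 86_400,
    deleteAfterDays: 7
  });
  boss.on('error', err => log.error({ err }, 'pg-boss error'));
  await boss.start();
  return boss;
}

export async function stop() {
  if (!boss) return;
  await boss.stop({ graceful: true, timeout: 5_000 });
  boss = null;
  queues.clear();
}

// pg-boss v10 requires a queue to exist before send() or work() can use it.
function ensureQueue(name) {
  if (!queues.has(name)) {
    const p = boss.getQueue(name).then(q => q ?? boss.createQueue(name));
    p.catch(() => queues.delete(name)); // allow a later attempt after a failed create
    queues.set(name, p);
  }
  return queues.get(name);
}

export async function enqueue(name, data, opts = {}) {
  if (!boss) throw new Error('queue not started');
  await ensureQueue(name);
  return await boss.send(name, data, opts);
}

export async function subscribe(name, handler, opts = {}) {
  if (!boss) throw new Error('queue not started');
  await ensureQueue(name);
  // v10 polls per worker (pollingIntervalSeconds) and always delivers an array of jobs.
  return await boss.work(name, { pollingIntervalSeconds: 2, ...opts }, async ([job]) => handler(job));
}

export function instance() { return boss; }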
```
- [ ] **Step 5: Run the test green.**
```bash
npx vitest run tests/jobs/queue.test.js
```
Expected: 1 passed.
- [ ] **Step 6: Commit.**
```bash
git add lib/jobs/queue.js tests/helpers/boss.js tests/jobs/queue.test.js
git commit -m "feat(jobs): pg-boss singleton client"
```
### Task A3: Trivial `echo` worker + bootstrap registration
**Files:**
- Create: `lib/jobs/workers/echo.js`
- Create: `lib/jobs/index.js`
- Modify: `server.js` (call `queue.start()` and `registerWorkers()` on boot, `queue.stop()` on SIGTERM).
- Create: `tests/jobs/workers/echo.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/workers/echo.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';

beforeEach(async () => { await resetDb(); await migrateUp(); await queue.start(); await registerWorkers(); });
afterEach(async () => { await stopBoss(); });

describe('echo worker', () => {
  it('completes successfully', async () => {
    const id = await queue.enqueue('echo', { ping: 1 });
    const boss = queue.instance();
    // poll the job state up to 5s (pg-boss v10 looks jobs up by queue name + id)
    for (let i = 0; i < 50; i++) {
      const j = await boss.getJobById('echo', id);
      if (j?.state === 'completed') { expect(j.output).toEqual({ pong: 1 }); return; }
      await new Promise(r => setTimeout(r, 100));
    }
    throw new Error('job did not complete');
  });
});
```
- [ ] **Step 2: Run red.**
```bash
npx vitest run tests/jobs/workers/echo.test.js
```
Expected: FAIL — `lib/jobs/index.js` not found.
- [ ] **Step 3: Implement the echo worker.**
```js
// lib/jobs/workers/echo.js
export const NAME = 'echo';
export async function handler(job) {
return { pong: job.data?.ping ?? 0 };
}
```
- [ ] **Step 4: Implement the registrar.**
```js
// lib/jobs/index.js
import * as queue from './queue.js';
import * as echo from './workers/echo.js';
const WORKERS = [echo];
export async function registerWorkers() {
for (const w of WORKERS) {
await queue.subscribe(w.NAME, w.handler, w.opts || {});
}
}
```
- [ ] **Step 5: Wire into server.js — at the CLI block, NOT inside createApp().**
```js
// server.js — additions
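import * as queue from './lib/jobs/queue.js';
import { registerWorkers } from './lib/jobs/index.js';

// Replace the existing `if (import.meta.url === ...)` block:
if (import.meta.url === `file://${process.argv[1]}`) {
  const port = process.env.PORT || 3000;
  const app = createApp();
  queue.start()
    .then(registerWorkers)
    .catch(err => log.error({ err }, 'queue boot failed'));
  const server = app.listen(port, () => log.info({ port }, 'void-server listening'));
  // On SIGTERM, stop accepting connections and let pg-boss drain in-flight jobs.
  process.once('SIGTERM', async () => {
    server.close();
    await queue.stop().catch(err => log.error({ err }, 'queue stop failed'));
    process.exit(0);
  });
}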
```
Tests construct the app via `createApp()` and manage the queue themselves through `queue.start()` / `stopBoss()`. Production path boots the queue once on startup.
- [ ] **Step 6: Run green.**
```bash
npx vitest run tests/jobs/workers/echo.test.js
```
Expected: 1 passed.
- [ ] **Step 7: Commit.**
```bash
git add lib/jobs/workers/echo.js lib/jobs/index.js server.js tests/jobs/workers/echo.test.js
git commit -m "feat(jobs): echo worker + bootstrap registration"
```
### Task A4: `lib/db/repos/jobs.js` thin SELECTs over pg-boss
**Files:**
- Create: `lib/db/repos/jobs.js`
- Create: `tests/repos/jobs.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/repos/jobs.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as jobs from '../../lib/db/repos/jobs.js';

beforeEach(async () => { await resetDb(); await migrateUp(); await queue.start(); await registerWorkers(); });
afterEach(async () => { await stopBoss(); });

describe('jobs repo', () => {
  it('list returns recent jobs across states', async () => {
    const id = await queue.enqueue('echo', { ping: 1 });
    // wait for completion (pg-boss v10 looks jobs up by queue name + id)
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('echo', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await jobs.list({ limit: 10 });
    expect(rows.find(r => r.id === id)).toBeTruthy();
  });

  it('getById returns null on unknown id', async () => {
    expect(await jobs.getById('00000000-0000-0000-0000-000000000000')).toBeNull();
  });
});
```
- [ ] **Step 2: Run red.**
```bash
npx vitest run tests/repos/jobs.test.js
```
Expected: FAIL — repo not found.
- [ ] **Step 3: Implement the repo.**
```js
// lib/db/repos/jobs.js
import { pool } from '../pool.js';

// pg-boss v10 stores jobs in pgboss.job (current) and pgboss.archive (finished).
// We expose a unified "list" that unions both, sorted by created.
// v10 columns are snake_case (retry_count, created_on, ...); list() aliases
// them to the flat names it returns.
export async function list({ state, name, limit = 50 } = {}) {
  const where = [];
  const args = [];
  let i = 1;
  if (state) { where.push(`state=$${i++}`); args.push(state); }
  if (name) { where.push(`name=$${i++}`); args.push(name); }
  args.push(limit);
  const w = where.length ? `WHERE ${where.join(' AND ')}` : '';
  const sql = `
    SELECT id, name, state, data, output, retrycount, createdon, startedon, completedon
    FROM (
      SELECT id, name, state, data, output, retry_count AS retrycount, created_on AS createdon, started_on AS startedon, completed_on AS completedon
      FROM pgboss.job
      UNION ALL
      SELECT id, name, state, data, output, retry_count AS retrycount, created_on AS createdon, started_on AS startedon, completed_on AS completedon
      FROM pgboss.archive
    ) u ${w}
    ORDER BY createdon DESC
    LIMIT $${i}
  `;
  const { rows } = await pool.query(sql, args);
  return rows;
}

export async function getById(id) {
  const sql = `
    SELECT id, name, state, data, output, retry_count, created_on, started_on, completed_on
    FROM pgboss.job WHERE id=$1
    UNION ALL
    SELECT id, name, state, data, output, retry_count, created_on, started_on, completed_on
    FROM pgboss.archive WHERE id=$1
    LIMIT 1
  `;
  const { rows: [r] } = await pool.query(sql, [id]);
  return r ?? null;
}
```
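The filter-building part of `list()` numbers its positional parameters as it goes, so the `LIMIT` placeholder always lands after whichever filters are present. The standalone copy below is for illustration only; `buildFilter` and the `limitParam` field are hypothetical names that are not part of the repo.

```js
// Standalone copy of list()'s filter-building logic, for illustration only.
function buildFilter({ state, name, limit = 50 } = {}) {
  const where = [];
  const args = [];
  let i = 1;
  if (state) { where.push(`state=$${i++}`); args.push(state); }
  if (name) { where.push(`name=$${i++}`); args.push(name); }
  args.push(limit);
  return {
    where: where.length ? `WHERE ${where.join(' AND ')}` : '',
    limitParam: `$${i}`,
    args
  };
}

const f = buildFilter({ state: 'failed', name: 'echo', limit: 10 });
// f.where      is 'WHERE state=$1 AND name=$2'
// f.limitParam is '$3'
// f.args       is ['failed', 'echo', 10]
```

With no filters, the clause is empty and the limit becomes `$1`, so the same SQL template works for every combination.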
- [ ] **Step 4: Run green.**
```bash
npx vitest run tests/repos/jobs.test.js
```
Expected: 2 passed.
- [ ] **Step 5: Commit.**
```bash
git add lib/db/repos/jobs.js tests/repos/jobs.test.js
git commit -m "feat(jobs): jobs repo (list + getById)"
```
### Task A5: `/api/jobs` routes
**Files:**
- Create: `lib/api/routes/jobs.js`
- Modify: `lib/api/index.js` — mount `/jobs`.
- Create: `tests/api/jobs.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/api/jobs.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import request from 'supertest';
import { setup } from './helpers.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';

let app, ownerHeaders;
beforeEach(async () => {
  ({ app, ownerHeaders } = await setup());
  await queue.start();
  await registerWorkers();
});
afterEach(async () => { await stopBoss(); });

describe('jobs api', () => {
  it('GET /api/jobs returns recent jobs', async () => {
    const id = await queue.enqueue('echo', { ping: 7 });
    // wait for completion (pg-boss v10 looks jobs up by queue name + id)
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('echo', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const res = await request(app).get('/api/jobs?limit=5').set(ownerHeaders);
    expect(res.status).toBe(200);
    expect(res.body.find(r => r.id === id)).toBeTruthy();
  });

  it('GET /api/jobs/:id 404 on unknown', async () => {
    const res = await request(app)
      .get('/api/jobs/00000000-0000-0000-0000-000000000000')
      .set(ownerHeaders);
    expect(res.status).toBe(404);
  });

  it('unrecognised bearer token → 401', async () => {
    // an unknown token never reaches the owner-only check, so the response is 401, not 403
    const headers = { Authorization: 'Bearer bad-agent' };
    const res = await request(app).get('/api/jobs').set(headers);
    expect(res.status).toBe(401);
  });
});
```
- [ ] **Step 2: Run red.**
```bash
npx vitest run tests/api/jobs.test.js
```
Expected: FAIL — route 404.
- [ ] **Step 3: Implement the route.**
```js
// lib/api/routes/jobs.js
import { Router } from 'express';
import { z } from 'zod';
import * as repo from '../../db/repos/jobs.js';
import { validate } from '../validate.js';
import { requireOwner } from '../cap.js';
import { NotFoundError, asyncWrap } from '../errors.js';
import { parsePagination } from '../pagination.js';
// pg-boss v10 job states; v10 folds the old 'expired' state into 'failed'.
const STATES = ['created','retry','active','completed','cancelled','failed'];
const listQuery = z.object({
state: z.enum(STATES).optional(),
name: z.string().optional(),
limit: z.string().optional(),
offset: z.string().optional()
});
const idParams = z.object({ id: z.string().uuid() });
export const router = Router();
router.use(requireOwner);
router.get('/',
validate({ query: listQuery }),
asyncWrap(async (req, res) => {
const { limit } = parsePagination(req);
res.json(await repo.list({
state: req.validatedQuery.state,
name: req.validatedQuery.name,
limit
}));
})
);
router.get('/:id',
validate({ params: idParams }),
asyncWrap(async (req, res) => {
const row = await repo.getById(req.params.id);
if (!row) throw new NotFoundError('job not found');
res.json(row);
})
);
```
- [ ] **Step 4: Mount in `lib/api/index.js`.**
```js
// additions
import { router as jobsRouter } from './routes/jobs.js';
// alongside other api.use:
api.use('/jobs', jobsRouter);
```
- [ ] **Step 5: Run green.**
```bash
npx vitest run tests/api/jobs.test.js
```
Expected: 3 passed.
- [ ] **Step 6: Commit.**
```bash
git add lib/api/routes/jobs.js lib/api/index.js tests/api/jobs.test.js
git commit -m "feat(api): jobs routes (list + get, owner-only)"
```
### Task A6: `POST /api/jobs/:id/retry` and `DELETE /api/jobs/:id`
**Files:**
- Modify: `lib/api/routes/jobs.js`
- Modify: `lib/db/repos/jobs.js`
- Modify: `tests/repos/jobs.test.js`, `tests/api/jobs.test.js`
- [ ] **Step 1: Extend the repo test.**
```js
// append to tests/repos/jobs.test.js
it('retry resubmits a failed job', async () => {
// enqueue a job that will fail, then retry
// (We use a synthetic failed row inserted directly for determinism.)
const id = await queue.enqueue('echo', { ping: 'x' });
// mark it failed manually:
const { pool } = await import('../../lib/db/pool.js');
await pool.query(`UPDATE pgboss.job SET state='failed' WHERE id=$1`, [id]);
const out = await jobs.retry(id);
expect(out?.state).toBe('retry');
});
it('remove deletes by id', async () => {
const id = await queue.enqueue('echo', { ping: 'rm' });
await jobs.remove(id);
expect(await jobs.getById(id)).toBeNull();
});
```
- [ ] **Step 2:** Run red — both new tests fail (no `retry` / `remove` exports).
- [ ] **Step 3: Add to the repo.**
```js
// lib/db/repos/jobs.js — append
export async function retry(id) {
  // Move row from failed/cancelled back to 'retry'. pg-boss exposes
  // .complete/.fail/.cancel but no direct resubmit; we update state in
  // place. The poller will pick it up.
  // Column names follow the pg-boss v10 schema (snake_case).
  const { rows: [r] } = await pool.query(
    `UPDATE pgboss.job SET state='retry', retry_count=retry_count+1
     WHERE id=$1 RETURNING *`,
    [id]
  );
  if (!r) {
    // try archive: copy the row back into the active table
    const { rows: [a] } = await pool.query(
      `INSERT INTO pgboss.job (id, name, data, retry_limit, retry_delay, retry_backoff, start_after, expire_in, state, retry_count)
       SELECT id, name, data, retry_limit, retry_delay, retry_backoff, now(), expire_in, 'retry', retry_count+1
       FROM pgboss.archive WHERE id=$1
       ON CONFLICT (name, id) DO UPDATE SET state='retry'
       RETURNING *`,
      [id]
    );
    return a ?? null;
  }
  return r;
}

export async function remove(id) {
  await pool.query(`DELETE FROM pgboss.job WHERE id=$1`, [id]);
  await pool.query(`DELETE FROM pgboss.archive WHERE id=$1`, [id]);
}
```
- [ ] **Step 4:** Run repo tests green.
```bash
npx vitest run tests/repos/jobs.test.js
```
Expected: 4 passed.
- [ ] **Step 5: Add the route handlers.**
```js
// lib/api/routes/jobs.js — append
router.post('/:id/retry',
validate({ params: idParams }),
asyncWrap(async (req, res) => {
const row = await repo.retry(req.params.id);
if (!row) throw new NotFoundError('job not found');
res.json(row);
})
);
router.delete('/:id',
validate({ params: idParams }),
asyncWrap(async (req, res) => {
await repo.remove(req.params.id);
res.status(204).end();
})
);
```
- [ ] **Step 6: Extend API tests.**
```js
// append to tests/api/jobs.test.js
it('POST :id/retry resubmits', async () => {
const id = await queue.enqueue('echo', { ping: 'r' });
const { pool } = await import('../../lib/db/pool.js');
await pool.query(`UPDATE pgboss.job SET state='failed' WHERE id=$1`, [id]);
const res = await request(app).post(`/api/jobs/${id}/retry`).set(ownerHeaders);
expect(res.status).toBe(200);
expect(res.body.state).toBe('retry');
});
it('DELETE :id removes', async () => {
const id = await queue.enqueue('echo', { ping: 'd' });
const res = await request(app).delete(`/api/jobs/${id}`).set(ownerHeaders);
expect(res.status).toBe(204);
});
```
- [ ] **Step 7:** Run green.
```bash
npx vitest run tests/api/jobs.test.js tests/repos/jobs.test.js
```
Expected: all pass.
- [ ] **Step 8: Commit.**
```bash
git add lib/db/repos/jobs.js lib/api/routes/jobs.js tests/repos/jobs.test.js tests/api/jobs.test.js
git commit -m "feat(api): jobs retry + delete"
```
### Task A7: Minimal Jobs view + sidebar entry
**Files:**
- Create: `public/views/jobs.js`
- Modify: `public/router.js`, `public/app.js`, `public/components/sidebar.js`
- [ ] **Step 1: Add the route to the hash router.**
```js
// public/router.js — add to ROUTES
{ name: 'jobs', re: /^\/jobs$/, keys: [] },
```
- [ ] **Step 2: Register the view.**
```js
// public/app.js — extend VIEWS
jobs: () => import('./views/jobs.js'),
```
- [ ] **Step 3: Write the minimal view.**
```js
// public/views/jobs.js
import { api } from '../api.js';
import { el, mount } from '../dom.js';
function row(j) {
return el('li', {},
el('span', { class: 'status idle' }, j.state),
' ', el('span', { style: { fontFamily: 'var(--font-mono)' } }, j.name),
' ', el('span', { class: 'muted' }, (j.id || '').slice(0, 8))
);
}
export async function render(main) {
const wrap = el('div');
mount(main,
el('h1', { class: 'view-h1' }, 'Jobs'),
el('p', { class: 'view-sub muted' }, 'pg-boss queue — recent jobs across states.'),
wrap
);
try {
const rows = await api.get('/api/jobs?limit=50');
if (!rows.length) mount(wrap, el('p', { class: 'muted' }, 'No jobs yet.'));
else mount(wrap, el('ul', { class: 'plain' }, rows.map(row)));
} catch (e) {
mount(wrap, el('p', { class: 'muted' }, 'Could not load: ' + e.message));
}
}
```
- [ ] **Step 4: Add the sidebar link.**
```js
// public/components/sidebar.js — add inside the Navigate section, after Inbox:
navItem('Jobs', '/jobs'),
```
- [ ] **Step 5:** Run the server tests; they should still pass, since this task only touches `public/`.
```bash
npx vitest run tests/server.test.js
```
Expected: 6/6 pass.
- [ ] **Step 6: Commit.**
```bash
git add public/router.js public/app.js public/views/jobs.js public/components/sidebar.js
git commit -m "feat(ui): jobs view stub + sidebar entry"
```
### Task A8: Phase A close — full suite + memory checkpoint
- [ ] **Step 1:** Run the full suite.
```bash
npx vitest run
```
Expected: all green (185 + new Phase A tests).
- [ ] **Step 2:** Update memory `project_void_v2_execution.md`: mark "Plan 3 Phase A complete: queue harness + Jobs API + view stub".
---
## Phase B — Capture API + URL worker + blob storage
### Task B1: Add ingest deps
**Files:**
- Modify: `package.json` — add `@mozilla/readability`, `jsdom`, `multer`.
- [ ] **Step 1:** `cd /project/src/void-v2 && npm i @mozilla/readability jsdom multer`
- [ ] **Step 2:** Verify deps:
```bash
grep -E '"(@mozilla/readability|jsdom|multer)"' package.json
```
Expected: three lines.
- [ ] **Step 3:** Full suite still green.
```bash
npx vitest run
```
- [ ] **Step 4: Commit.**
```bash
git add package.json package-lock.json
git commit -m "chore(deps): readability + jsdom + multer for ingest"
```
### Task B2: `lib/ingest/readability.js`
**Files:**
- Create: `lib/ingest/readability.js`
- Create: `tests/ingest/readability.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/ingest/readability.test.js
import { describe, it, expect } from 'vitest';
import { extract } from '../../lib/ingest/readability.js';
const HTML = `
<html><head><title>Blackflame Notes</title>
<meta property="og:site_name" content="Hynesy"/>
</head><body><article>
<h1>Blackflame Notes</h1>
<p>An essay on the Cradle aesthetic and the blackflame motif.</p>
</article></body></html>`;
describe('readability.extract', () => {
it('pulls title and text', () => {
const out = extract(HTML, 'https://example.com/x');
expect(out.title).toMatch(/Blackflame/);
expect(out.textContent).toMatch(/Cradle/);
expect(out.siteName).toBe('Hynesy');
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/ingest/readability.js
import { JSDOM } from 'jsdom';
import { Readability } from '@mozilla/readability';
export function extract(html, url) {
const dom = new JSDOM(html, { url });
const reader = new Readability(dom.window.document);
const a = reader.parse();
if (!a) return { title: null, textContent: '', excerpt: null, byline: null, siteName: null };
return {
title: a.title,
textContent: a.textContent.trim(),
excerpt: a.excerpt || null,
byline: a.byline || null,
siteName: a.siteName || null
};
}
```
- [ ] **Step 4:** Run green.
```bash
npx vitest run tests/ingest/readability.test.js
```
- [ ] **Step 5: Commit.**
```bash
git add lib/ingest/readability.js tests/ingest/readability.test.js
git commit -m "feat(ingest): readability wrapper"
```
### Task B3: `lib/ingest/blob_store.js`
**Files:**
- Create: `lib/ingest/blob_store.js`
- Create: `tests/ingest/blob_store.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/ingest/blob_store.test.js
import { describe, it, expect, beforeEach } from 'vitest';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import { BlobStore } from '../../lib/ingest/blob_store.js';
let root, store;
beforeEach(async () => {
root = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blob-'));
store = new BlobStore(root);
});
describe('blob_store', () => {
it('hashes content and resolves path', async () => {
const buf = Buffer.from('void');
const sha = await store.hash(buf);
expect(sha).toMatch(/^[0-9a-f]{64}$/);
const p = store.path(sha);
expect(p.startsWith(root)).toBe(true);
expect(p.includes(sha.slice(0, 2))).toBe(true);
});
it('write is idempotent (same content → same path)', async () => {
const buf = Buffer.from('void');
const a = await store.write(buf);
const b = await store.write(buf);
expect(a.path).toBe(b.path);
expect(a.sha).toBe(b.sha);
const onDisk = await fs.readFile(a.path);
expect(onDisk.equals(buf)).toBe(true);
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/ingest/blob_store.js
import crypto from 'node:crypto';
import fs from 'node:fs/promises';
import path from 'node:path';
export class BlobStore {
constructor(root) { this.root = root; }
async hash(buf) {
return crypto.createHash('sha256').update(buf).digest('hex');
}
path(sha) {
return path.join(this.root, sha.slice(0, 2), sha);
}
async write(buf) {
const sha = await this.hash(buf);
const dest = this.path(sha);
try {
await fs.access(dest);
} catch {
await fs.mkdir(path.dirname(dest), { recursive: true });
await fs.writeFile(dest, buf);
}
return { sha, path: dest };
}
}
let _default = null;
export function defaultStore() {
if (!_default) _default = new BlobStore(process.env.BLOB_ROOT || '/var/lib/void/blobs');
return _default;
}
```
- [ ] **Step 4:** Run green.
```bash
npx vitest run tests/ingest/blob_store.test.js
```
- [ ] **Step 5: Commit.**
```bash
git add lib/ingest/blob_store.js tests/ingest/blob_store.test.js
git commit -m "feat(ingest): content-addressed blob store"
```
### Task B4: `ingest.url` worker
**Files:**
- Create: `lib/jobs/workers/url.js`
- Modify: `lib/jobs/index.js` — register `url`.
- Create: `tests/jobs/workers/url.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/workers/url.test.js
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';

const HTML = `<html><head><title>Blackflame</title></head><body><article><p>Cradle</p></article></body></html>`;

beforeEach(async () => {
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
  // stubGlobal (unlike a bare assignment to global.fetch) is undone by vi.unstubAllGlobals().
  vi.stubGlobal('fetch', vi.fn(async () => new Response(HTML, { status: 200, headers: { 'content-type': 'text/html' } })));
});
afterEach(async () => { await stopBoss(); vi.unstubAllGlobals(); vi.restoreAllMocks(); });

describe('ingest.url worker', () => {
  it('creates a ref from a URL', async () => {
    const sp = await spaces.create({ slug: 's', name: 'S' }, { kind: 'user', id: null });
    const id = await queue.enqueue('ingest.url', { space_id: sp.id, url: 'https://example.com/a' });
    // pg-boss v10 looks jobs up by queue name + id
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('ingest.url', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].title).toMatch(/Blackflame/);
    expect(rows[0].external_id).toBeTruthy();
  });
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement the worker.**
```js
// lib/jobs/workers/url.js
import crypto from 'node:crypto';
import { extract } from '../../ingest/readability.js';
import * as refs from '../../db/repos/refs.js';
import { pool } from '../../db/pool.js';
export const NAME = 'ingest.url';
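// pg-boss v10 removed the v9 teamSize/teamConcurrency options from work();
// more parallelism would come from batchSize or from registering extra workers.
export const opts = { pollingIntervalSeconds: 2 };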
function key(space_id, url) {
return crypto.createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}
export async function handler(job) {
const { space_id, url } = job.data;
const idem = key(space_id, url);
// already ingested?
const { rows: [existing] } = await pool.query(
`SELECT id FROM refs WHERE source_kind='url' AND external_id=$1 LIMIT 1`,
[idem]
);
if (existing) return { ref_id: existing.id, idempotent: true };
const res = await fetch(url, {
headers: { 'User-Agent': 'void-ingest/2.0' },
signal: AbortSignal.timeout(15_000)
});
if (!res.ok) throw new Error(`fetch ${url} → ${res.status}`);
const html = await res.text();
const parsed = extract(html, url);
const row = await refs.create({
space_id,
kind: 'url',
source_url: url,
title: parsed.title || url,
summary: parsed.excerpt,
body_text: (parsed.textContent || '').slice(0, 200_000),
source_kind: 'url',
external_id: idem,
metadata: { site_name: parsed.siteName, byline: parsed.byline }
}, { kind: 'system', id: null });
return { ref_id: row.id };
}
```
- [ ] **Step 4: Register.**
```js
// lib/jobs/index.js — extend WORKERS
import * as url from './workers/url.js';
const WORKERS = [echo, url];
```
- [ ] **Step 5:** Run green.
```bash
npx vitest run tests/jobs/workers/url.test.js
```
- [ ] **Step 6: Commit.**
```bash
git add lib/jobs/workers/url.js lib/jobs/index.js tests/jobs/workers/url.test.js
git commit -m "feat(jobs): ingest.url worker"
```
### Task B5: `ingest.blob` worker
**Files:**
- Create: `lib/jobs/workers/blob.js`
- Modify: `lib/jobs/index.js`
- Create: `tests/jobs/workers/blob.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/workers/blob.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';

let tmpRoot;
beforeEach(async () => {
  tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blobs-'));
  process.env.BLOB_ROOT = tmpRoot;
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); });

describe('ingest.blob worker', () => {
  it('creates a ref pointing at the blob', async () => {
    const sp = await spaces.create({ slug: 'b', name: 'B' }, { kind: 'user', id: null });
    // pre-stage an upload tmp file
    const upTmp = path.join(tmpRoot, 'up.tmp');
    await fs.writeFile(upTmp, Buffer.from('hello blob'));
    const id = await queue.enqueue('ingest.blob', {
      space_id: sp.id, tmp_path: upTmp, filename: 'hello.txt', content_type: 'text/plain'
    });
    // pg-boss v10 looks jobs up by queue name + id
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('ingest.blob', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].kind).toBe('file');
    expect(rows[0].blob_path).toBeTruthy();
    expect(rows[0].title).toBe('hello.txt');
  });
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/jobs/workers/blob.js
import fs from 'node:fs/promises';
import * as refs from '../../db/repos/refs.js';
import { defaultStore } from '../../ingest/blob_store.js';
export const NAME = 'ingest.blob';
export const opts = { teamSize: 2, teamConcurrency: 2 };
function kindFor(content_type, filename) {
if (content_type?.startsWith('image/')) return 'image';
if (content_type === 'application/pdf' || filename?.toLowerCase().endsWith('.pdf')) return 'pdf';
return 'file';
}
export async function handler(job) {
const { space_id, tmp_path, filename, content_type, meta = {} } = job.data;
const buf = await fs.readFile(tmp_path);
const { sha, path } = await defaultStore().write(buf);
const kind = kindFor(content_type, filename);
const row = await refs.create({
space_id,
kind,
source_url: null,
title: meta.title || filename || sha.slice(0, 12),
summary: null,
body_text: null,
blob_path: path,
metadata: { sha, content_type, filename, size: buf.length, ...(meta.metadata || {}) }
}, { kind: 'system', id: null });
// Remove the temp file only after the ref row exists, so a retried job can still read it.
try { await fs.unlink(tmp_path); } catch { /* already removed */ }
return { ref_id: row.id, sha };
}
```
- [ ] **Step 4: Register.**
```js
// lib/jobs/index.js
import * as blob from './workers/blob.js';
const WORKERS = [echo, url, blob];
```
- [ ] **Step 5:** Run green.
```bash
npx vitest run tests/jobs/workers/blob.test.js
```
- [ ] **Step 6: Commit.**
```bash
git add lib/jobs/workers/blob.js lib/jobs/index.js tests/jobs/workers/blob.test.js
git commit -m "feat(jobs): ingest.blob worker"
```
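The worker tests in this plan poll `getJobById` up to 50 times and then carry on whether or not the job completed, so a stuck job shows up later as an unrelated-looking failure on `rows[0]`. A minimal sketch of a polling helper that fails loudly instead; `waitFor` and its option names are hypothetical and not part of the plan's `tests/helpers`.

```js
// Hypothetical test helper (an assumption, not an existing file in the plan):
// poll an async predicate until it returns a truthy value, or throw on timeout.
async function waitFor(check, { timeoutMs = 5_000, intervalMs = 100 } = {}) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const value = await check();
    if (value) return value;
    if (Date.now() >= deadline) {
      throw new Error(`waitFor: condition not met within ${timeoutMs}ms`);
    }
    await new Promise(r => setTimeout(r, intervalMs));
  }
}

// Possible use inside a worker test (queue and id as in the tests in this plan):
// await waitFor(async () => (await queue.instance().getJobById(id))?.state === 'completed');
```

With a helper like this, a job that never reaches `completed` fails the test at the wait itself rather than at a later assertion.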
### Task B6: `/api/capture` POST + `/api/capture/upload`
**Files:**
- Create: `lib/api/routes/capture.js`
- Modify: `lib/api/index.js`
- Create: `tests/api/capture.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/api/capture.test.js
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import request from 'supertest';
import { setup } from './helpers.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as spaces from '../../lib/db/repos/spaces.js';
let app, ownerHeaders, sp;
beforeEach(async () => {
({ app, ownerHeaders } = await setup());
sp = await spaces.create({ slug: 'c', name: 'C' }, { kind: 'user', id: null });
process.env.BLOB_ROOT = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blobs-'));
await queue.start(); await registerWorkers();
global.fetch = vi.fn(async () => new Response(
'<html><head><title>X</title></head><body><article><p>x</p></article></body></html>',
{ status: 200, headers: { 'content-type': 'text/html' }}
));
});
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
describe('capture api', () => {
it('POST /api/capture enqueues ingest.url', async () => {
const res = await request(app).post('/api/capture').set(ownerHeaders)
.send({ space_id: sp.id, url: 'https://example.com/a' });
expect(res.status).toBe(202);
expect(res.body.job_id).toBeTruthy();
expect(res.body.idempotency_key).toMatch(/^[0-9a-f]{64}$/);
});
it('POST /api/capture/upload enqueues ingest.blob', async () => {
const res = await request(app).post('/api/capture/upload').set(ownerHeaders)
.field('space_id', sp.id)
.attach('file', Buffer.from('hi'), { filename: 'a.txt', contentType: 'text/plain' });
expect(res.status).toBe(202);
expect(res.body.job_id).toBeTruthy();
});
it('POST /api/capture rejects missing url', async () => {
const res = await request(app).post('/api/capture').set(ownerHeaders)
.send({ space_id: sp.id });
expect(res.status).toBe(400);
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/api/routes/capture.js
import { Router } from 'express';
import { z } from 'zod';
import crypto from 'node:crypto';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import multer from 'multer';
import * as queue from '../../jobs/queue.js';
import { pool } from '../../db/pool.js';
import { validate } from '../validate.js';
import { requireWrite } from '../cap.js';
import { asyncWrap } from '../errors.js';
const captureBody = z.object({
space_id: z.string().uuid(),
url: z.string().url(),
hint: z.object({
project_id: z.string().uuid().optional(),
title: z.string().optional(),
tags: z.array(z.string()).optional()
}).optional()
});
const UPLOAD_TMP = process.env.UPLOAD_TMP || path.join(os.tmpdir(), 'void-uploads');
await fs.mkdir(UPLOAD_TMP, { recursive: true });
const upload = multer({ dest: UPLOAD_TMP, limits: { fileSize: 100 * 1024 * 1024 } });
function key(space_id, url) {
return crypto.createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}
export const router = Router();
router.post('/',
requireWrite('ref'),
validate({ body: captureBody }),
asyncWrap(async (req, res) => {
const { space_id, url } = req.body;
const idem = key(space_id, url);
const { rows: [existing] } = await pool.query(
`SELECT id FROM refs WHERE source_kind='url' AND external_id=$1 LIMIT 1`,
[idem]
);
if (existing) {
return res.status(202).json({
job_id: null, idempotency_key: idem, ref_id: existing.id
});
}
const job_id = await queue.enqueue('ingest.url', { space_id, url });
res.status(202).json({ job_id, idempotency_key: idem });
})
);
router.post('/upload',
requireWrite('ref'),
upload.single('file'),
asyncWrap(async (req, res) => {
if (!req.file) {
return res.status(400).json({ error: { code: 'validation_failed', message: 'file required' } });
}
const space_id = req.body.space_id;
if (!space_id) {
return res.status(400).json({ error: { code: 'validation_failed', message: 'space_id required' } });
}
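let meta = {};
if (req.body.meta) {
// Malformed client JSON is a 400, not an unhandled 500.
try { meta = JSON.parse(req.body.meta); }
catch { return res.status(400).json({ error: { code: 'validation_failed', message: 'meta must be valid JSON' } }); }
}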
const job_id = await queue.enqueue('ingest.blob', {
space_id,
tmp_path: req.file.path,
filename: req.file.originalname,
content_type: req.file.mimetype,
meta
});
res.status(202).json({ job_id });
})
);
```
- [ ] **Step 4: Mount.**
```js
// lib/api/index.js — additions
import { router as captureRouter } from './routes/capture.js';
api.use('/capture', captureRouter);
```
- [ ] **Step 5:** Run green.
```bash
npx vitest run tests/api/capture.test.js
```
- [ ] **Step 6: Commit.**
```bash
git add lib/api/routes/capture.js lib/api/index.js tests/api/capture.test.js
git commit -m "feat(api): capture POST + upload"
```
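The `key()` helper in `lib/api/routes/capture.js` joins `space_id` and `url` with a NUL byte before hashing. A small sketch of why a separator is worth having; `naiveKey` is a hypothetical counter-example, and the short ids are illustrative only (real `space_id` values are UUIDs, where the fixed length already prevents this particular collision).

```js
import crypto from 'node:crypto';

// Same derivation as key() in lib/api/routes/capture.js.
function key(space_id, url) {
  return crypto.createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}

// Hypothetical variant without a separator, for comparison only.
function naiveKey(space_id, url) {
  return crypto.createHash('sha256').update(space_id + url).digest('hex');
}

// Both pairs concatenate to "abc" without a separator.
console.log(naiveKey('ab', 'c') === naiveKey('a', 'bc')); // true
console.log(key('ab', 'c') === key('a', 'bc'));           // false
// The key is deterministic, so re-posting the same URL maps to the same ref.
console.log(key('ab', 'c') === key('ab', 'c'));           // true
```

Note that the key is computed over the URL string exactly as submitted, so `https://example.com/a` and `https://example.com/a/` produce different keys.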
### Task B7: Phase B close — full suite + memory checkpoint
- [ ] **Step 1:** `npx vitest run` — all green.
- [ ] **Step 2:** Update memory: "Plan 3 Phase B complete: capture API + URL + blob workers".
---
## Phase C — Embeddings + hybrid search
### Task C1: `lib/ai/ollama.js`
**Files:**
- Create: `lib/ai/ollama.js`
- Create: `tests/ai/ollama.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/ai/ollama.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { embedText } from '../../lib/ai/ollama.js';
beforeEach(() => {
global.fetch = vi.fn(async () => new Response(
JSON.stringify({ embedding: new Array(768).fill(0.1) }),
{ status: 200, headers: { 'content-type': 'application/json' }}
));
});
afterEach(() => vi.restoreAllMocks());
describe('ollama.embedText', () => {
it('returns 768-dim vector', async () => {
const v = await embedText('hello');
expect(v).toHaveLength(768);
expect(v[0]).toBeCloseTo(0.1, 5);
});
it('throws on non-200', async () => {
global.fetch = vi.fn(async () => new Response('boom', { status: 502 }));
await expect(embedText('x')).rejects.toThrow();
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/ai/ollama.js
export class OllamaError extends Error {
constructor(status, body) { super(`ollama ${status}: ${body}`); this.status = status; }
}
export async function embedText(text, { model = 'nomic-embed-text', timeoutMs = 60_000 } = {}) {
const url = (process.env.OLLAMA_URL || 'http://192.168.1.185:11434') + '/api/embeddings';
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ model, prompt: text }),
signal: AbortSignal.timeout(timeoutMs)
});
if (!res.ok) throw new OllamaError(res.status, await res.text());
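const j = await res.json();
if (!Array.isArray(j.embedding)) throw new OllamaError(res.status, 'response has no embedding array');
return j.embedding;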
}
export function padTo(vector, dim) {
if (vector.length === dim) return vector;
if (vector.length > dim) return vector.slice(0, dim);
const out = vector.slice();
while (out.length < dim) out.push(0);
return out;
}
```
- [ ] **Step 4:** Run green.
```bash
npx vitest run tests/ai/ollama.test.js
```
- [ ] **Step 5: Commit.**
```bash
git add lib/ai/ollama.js tests/ai/ollama.test.js
git commit -m "feat(ai): ollama embed-text wrapper"
```
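`padTo` exists because `nomic-embed-text` returns 768 dimensions while the worker pads to 1024 before storing. A short sketch of why zero-padding is harmless for cosine comparisons: the extra zeros contribute nothing to the dot product or to either norm. The `cosine` function and the sample vectors are illustrative assumptions.

```js
// Copy of padTo from lib/ai/ollama.js so the sketch runs standalone.
function padTo(vector, dim) {
  if (vector.length === dim) return vector;
  if (vector.length > dim) return vector.slice(0, dim);
  const out = vector.slice();
  while (out.length < dim) out.push(0);
  return out;
}

// Plain cosine similarity; assumes equal-length, non-zero vectors.
function cosine(a, b) {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / Math.sqrt(na * nb);
}

const a = [1, 2, 3];
const b = [3, 2, 1];
const before = cosine(a, b);
const after = cosine(padTo(a, 5), padTo(b, 5));
console.log(Math.abs(before - after) < 1e-12); // true
```

The truncating branch does not share this property: dropping dimensions changes both the dot product and the norms, so it is only safe when every stored vector and every query vector are truncated the same way.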
### Task C2: `embed.text` worker
**Files:**
- Create: `lib/jobs/workers/embed.js`
- Modify: `lib/jobs/index.js`
- Create: `tests/jobs/workers/embed.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/workers/embed.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import { pool } from '../../../lib/db/pool.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as pages from '../../../lib/db/repos/pages.js';
beforeEach(async () => {
await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
global.fetch = vi.fn(async () => new Response(
JSON.stringify({ embedding: new Array(768).fill(0.42) }),
{ status: 200, headers: { 'content-type': 'application/json' }}
));
});
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
describe('embed.text worker', () => {
it('writes embedding to the page row', async () => {
const sp = await spaces.create({ slug: 'e', name: 'E' }, { kind: 'user', id: null });
const pg = await pages.create({ space_id: sp.id, slug: 'p', title: 'P', body_md: 'body' }, { kind: 'user', id: null });
const id = await queue.enqueue('embed.text', { entity_type: 'page', entity_id: pg.id });
for (let i = 0; i < 50; i++) {
const j = await queue.instance().getJobById(id);
if (j?.state === 'completed') break;
await new Promise(r => setTimeout(r, 100));
}
const { rows: [row] } = await pool.query('SELECT embedding FROM pages WHERE id=$1', [pg.id]);
expect(row.embedding).toBeTruthy();
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/jobs/workers/embed.js
import { embedText, padTo } from '../../ai/ollama.js';
import { pool } from '../../db/pool.js';
import { recordAudit } from '../../db/repos/audit.js';
export const NAME = 'embed.text';
export const opts = { teamSize: 2, teamConcurrency: 2 };
const STRING_BUILDERS = {
page: row => `${row.title}\n\n${row.body_md || ''}`,
ref: row => `${row.title || ''}\n${row.summary || ''}\n${row.body_text || ''}`,
source_doc: row => `${row.name}\n${row.body_text || ''}`,
conversation: row => `${row.title || ''}\n${row.summary || ''}`
};
const TABLE = { page: 'pages', ref: 'refs', source_doc: 'source_docs', conversation: 'conversations' };
export async function handler(job) {
const { entity_type, entity_id } = job.data;
const table = TABLE[entity_type];
if (!table) throw new Error(`unknown entity_type: ${entity_type}`);
const { rows: [row] } = await pool.query(`SELECT * FROM ${table} WHERE id=$1`, [entity_id]);
if (!row) return { skipped: 'gone' };
const text = STRING_BUILDERS[entity_type](row).slice(0, 6_000);
const v = await embedText(text);
const padded = padTo(v, 1024);
const literal = '[' + padded.join(',') + ']';
await pool.query(`UPDATE ${table} SET embedding=$1::vector WHERE id=$2`, [literal, entity_id]);
await recordAudit({ kind: 'worker', id: null }, 'update', entity_type, entity_id, null, { embedding: 'updated' });
return { entity_id };
}
```
- [ ] **Step 4: Register.**
```js
// lib/jobs/index.js
import * as embed from './workers/embed.js';
const WORKERS = [echo, url, blob, embed];
```
- [ ] **Step 5:** Run green.
```bash
npx vitest run tests/jobs/workers/embed.test.js
```
- [ ] **Step 6: Commit.**
```bash
git add lib/jobs/workers/embed.js lib/jobs/index.js tests/jobs/workers/embed.test.js
git commit -m "feat(jobs): embed.text worker"
```
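The worker builds the pgvector text literal by joining the padded array directly. A hedged sketch of a stricter builder that rejects wrong-length or non-finite input before it reaches the `UPDATE`; `toVectorLiteral` is a hypothetical name, not something the plan defines.

```js
// Hypothetical helper: build a pgvector text literal such as "[0.5,1,-2]",
// refusing input that would produce an invalid or wrongly sized vector.
function toVectorLiteral(values, dim) {
  if (!Array.isArray(values) || values.length !== dim) {
    throw new RangeError(`expected an array of ${dim} numbers`);
  }
  for (const v of values) {
    if (typeof v !== 'number' || !Number.isFinite(v)) {
      throw new TypeError('vector components must be finite numbers');
    }
  }
  return '[' + values.join(',') + ']';
}

console.log(toVectorLiteral([0.5, 1, -2], 3)); // [0.5,1,-2]
```

In the worker this could stand in for the inline `'[' + padded.join(',') + ']'`, so a malformed embedding fails the job with a clear message instead of a Postgres cast error.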
### Task C3: Repo-level triggers
**Files:**
- Create: `lib/jobs/triggers.js`
- Modify: `lib/db/repos/pages.js`, `lib/db/repos/refs.js`, `lib/db/repos/source_docs.js` (call `triggerEmbed(...)` after create/update)
- Create: `tests/jobs/triggers.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/triggers.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as spaces from '../../lib/db/repos/spaces.js';
import * as pages from '../../lib/db/repos/pages.js';
import * as jobsRepo from '../../lib/db/repos/jobs.js';
beforeEach(async () => {
await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
global.fetch = vi.fn(async () => new Response(
JSON.stringify({ embedding: new Array(768).fill(0.1) }),
{ status: 200, headers: { 'content-type': 'application/json' }}
));
});
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
describe('triggers', () => {
it('pages.create enqueues embed.text', async () => {
const sp = await spaces.create({ slug: 's', name: 'S' }, { kind: 'user', id: null });
await pages.create({ space_id: sp.id, slug: 'p', title: 'P', body_md: 'b' }, { kind: 'user', id: null });
// give the queue a tick to land the job
await new Promise(r => setTimeout(r, 200));
const rows = await jobsRepo.list({ name: 'embed.text', limit: 5 });
expect(rows.length).toBeGreaterThan(0);
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement triggers.js.**
```js
// lib/jobs/triggers.js
import * as queue from './queue.js';
import { log } from '../log.js';
export async function triggerEmbed(entity_type, entity_id) {
try {
await queue.enqueue('embed.text',
{ entity_type, entity_id },
{ singletonKey: `${entity_type}:${entity_id}` }
);
} catch (e) {
// Queue not running (server tests) — never block the write.
log.debug({ err: e, entity_type, entity_id }, 'triggerEmbed skipped');
}
}
```
- [ ] **Step 4: Wire into pages.js.**
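In `lib/db/repos/pages.js`, import `triggerEmbed` from `../../jobs/triggers.js` and call `await triggerEmbed('page', r.id);` as the last statement before the `return` in both `create` and `update` (`r` here stands for whatever variable holds the written row). Do the same with `'ref'` in `lib/db/repos/refs.js` for create/update/upsertByExternal, and with `'source_doc'` in `lib/db/repos/source_docs.js` for create/update.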
- [ ] **Step 5:** Run green.
```bash
npx vitest run tests/jobs/triggers.test.js
```
- [ ] **Step 6:** Run the full suite — must still be green; triggers are no-op when the queue isn't running.
```bash
npx vitest run
```
- [ ] **Step 7: Commit.**
```bash
git add lib/jobs/triggers.js lib/db/repos/pages.js lib/db/repos/refs.js lib/db/repos/source_docs.js tests/jobs/triggers.test.js
git commit -m "feat(jobs): repo-level embed triggers"
```
### Task C4: Hybrid search with RRF + graceful FTS fallback
**Files:**
- Modify: `lib/db/repos/search.js` (rewrite)
- Modify: `tests/repos/search.test.js` — extend
- Modify: `tests/api/search.test.js` — extend
- [ ] **Step 1: Extend the repo test with vector-fixture rows.**
```js
// append to tests/repos/search.test.js
it('hybrid: vector-only hit (no FTS match) still ranks', async () => {
const sp = await spacesRepo.create({ slug: 'h', name: 'H' }, owner);
const page = await pagesRepo.create({ space_id: sp.id, slug: 'p', title: 'Unrelated', body_md: 'nothing matches FTS' }, owner);
// hand-craft a fixture vector that is close to the query embedding
const v = '[' + new Array(1024).fill(0.5).join(',') + ']';
const { pool } = await import('../../lib/db/pool.js');
await pool.query('UPDATE pages SET embedding=$1::vector WHERE id=$2', [v, page.id]);
// monkey-patch fetch so the query also embeds to the same vector
global.fetch = vi.fn(async () => new Response(
JSON.stringify({ embedding: new Array(768).fill(0.5) }),
{ status: 200, headers: { 'content-type': 'application/json' }}
));
const hits = await search.fts({ q: 'whatever' });
expect(hits.find(h => h.id === page.id)).toBeTruthy();
vi.restoreAllMocks();
});
it('hybrid: Ollama down → FTS-only fallback still returns FTS hits', async () => {
const sp = await spacesRepo.create({ slug: 'd', name: 'D' }, owner);
await pagesRepo.create({ space_id: sp.id, slug: 'p', title: 'blackflame palette', body_md: '' }, owner);
global.fetch = vi.fn(async () => { throw new Error('connect ECONNREFUSED'); });
const hits = await search.fts({ q: 'blackflame' });
expect(hits.length).toBeGreaterThan(0);
vi.restoreAllMocks();
});
```
(Add `import { vi } from 'vitest';` if missing.)
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Rewrite search repo with hybrid + RRF.**
```js
// lib/db/repos/search.js — REPLACE EXISTING
import { pool } from '../pool.js';
import { embedText, padTo } from '../../ai/ollama.js';
const PAGES_TSV = `to_tsvector('english', p.title || ' ' || coalesce(p.body_md,''))`;
const REFS_TSV = `to_tsvector('english', coalesce(r.title,'') || ' ' || coalesce(r.summary,'') || ' ' || coalesce(r.body_text,''))`;
const SD_TSV = `to_tsvector('english', sd.name || ' ' || coalesce(sd.body_text,''))`;
const MSG_TSV = `to_tsvector('english', m.body)`;
// --- FTS branch (unchanged behavior, returns flat rows) ---
function ftsBranches({ kinds, spaceFilterPresent }) {
const want = k => !kinds || kinds.includes(k);
const branches = [];
if (want('page')) branches.push(`
SELECT 'page'::text AS kind, p.id, p.space_id, p.title AS title_or_snippet,
ts_rank(${PAGES_TSV}, q.tsq) AS rank
FROM pages p, q
WHERE ${PAGES_TSV} @@ q.tsq
AND ($2::uuid IS NULL OR p.space_id = $2)`);
if (want('ref')) branches.push(`
SELECT 'ref', r.id, r.space_id,
coalesce(r.title, r.source_url, '(untitled)'),
ts_rank(${REFS_TSV}, q.tsq) FROM refs r, q
WHERE ${REFS_TSV} @@ q.tsq
AND ($2::uuid IS NULL OR r.space_id = $2)`);
if (want('source_doc')) branches.push(`
SELECT 'source_doc', sd.id, res.space_id, sd.name,
ts_rank(${SD_TSV}, q.tsq)
FROM source_docs sd JOIN resources res ON res.id = sd.resource_id, q
WHERE ${SD_TSV} @@ q.tsq
AND ($2::uuid IS NULL OR res.space_id = $2)`);
if (want('message') && !spaceFilterPresent) branches.push(`
SELECT 'message', m.id, NULL::uuid, substring(m.body, 1, 200),
ts_rank(${MSG_TSV}, q.tsq) FROM messages m, q
WHERE ${MSG_TSV} @@ q.tsq`);
return branches;
}
async function ftsRows({ q, space_id, kinds, limit }) {
const branches = ftsBranches({ kinds, spaceFilterPresent: space_id != null });
if (!branches.length) return [];
const sql = `WITH q AS (SELECT plainto_tsquery('english', $1) AS tsq)
SELECT * FROM (${branches.join('\n UNION ALL ')}) u
ORDER BY rank DESC LIMIT $3`;
const { rows } = await pool.query(sql, [q, space_id, limit * 3]);
return rows;
}
// --- Vector branch ---
async function vectorRows({ qvec, space_id, kinds, limit }) {
  const want = k => !kinds || kinds.includes(k);
  const literal = '[' + qvec.join(',') + ']';
  // Each part runs as its own query, so every SELECT must alias its own
  // columns; unaliased expressions would all come back as "?column?".
  const parts = [];
  if (want('page')) parts.push(`
    SELECT 'page'::text AS kind, p.id, p.space_id, p.title AS title_or_snippet,
           (p.embedding <=> $1::vector) AS dist
    FROM pages p
    WHERE p.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR p.space_id = $2)
    ORDER BY p.embedding <=> $1::vector LIMIT $3`);
  if (want('ref')) parts.push(`
    SELECT 'ref'::text AS kind, r.id, r.space_id,
           coalesce(r.title, r.source_url, '(untitled)') AS title_or_snippet,
           (r.embedding <=> $1::vector) AS dist
    FROM refs r
    WHERE r.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR r.space_id = $2)
    ORDER BY r.embedding <=> $1::vector LIMIT $3`);
  if (want('source_doc')) parts.push(`
    SELECT 'source_doc'::text AS kind, sd.id, res.space_id, sd.name AS title_or_snippet,
           (sd.embedding <=> $1::vector) AS dist
    FROM source_docs sd JOIN resources res ON res.id = sd.resource_id
    WHERE sd.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR res.space_id = $2)
    ORDER BY sd.embedding <=> $1::vector LIMIT $3`);
  // messages not embedded yet (Plan 5 may add); skip in vector branch.
  const all = [];
  for (const part of parts) {
    const { rows } = await pool.query(part, [literal, space_id, limit * 3]);
    all.push(...rows);
  }
  // fuse() ranks by array position, so merge the per-table results nearest-first.
  return all.sort((a, b) => a.dist - b.dist);
}
// RRF fusion. k=60 per Cormack et al.
function fuse(fts, vec, limit, offset) {
const K = 60;
const score = new Map(); // "kind:id" → { kind, id, space_id, title_or_snippet, rrf }
function add(rows, getRank) {
rows.forEach((r, idx) => {
const k = `${r.kind}:${r.id}`;
const cur = score.get(k) || { ...r, rrf: 0 };
cur.rrf += 1 / (K + getRank(idx, r));
score.set(k, cur);
});
}
add(fts, idx => idx + 1);
add(vec, idx => idx + 1);
const sorted = [...score.values()].sort((a, b) => b.rrf - a.rrf);
return sorted.slice(offset, offset + limit).map(({ rrf, ...rest }) => ({ ...rest, rank: rrf }));
}
export async function fts({ q, space_id = null, kinds = null, limit = 50, offset = 0 } = {}) {
  if (!q) return [];
  const normalizedKinds = Array.isArray(kinds) && kinds.length ? kinds : null;
  // Size the candidate pool for the requested page, not only the first one.
  const candidates = limit + offset;
  const vecP = embedText(q, { timeoutMs: 5_000 })
    .then(raw => vectorRows({ qvec: padTo(raw, 1024), space_id, kinds: normalizedKinds, limit: candidates }))
    .catch(() => []); // Ollama down / slow → fall back to FTS-only
  // Await both together so an FTS failure is never left as an unhandled rejection.
  const [ftsResult, vec] = await Promise.all([
    ftsRows({ q, space_id, kinds: normalizedKinds, limit: candidates }),
    vecP
  ]);
  return fuse(ftsResult, vec, limit, offset);
}
export async function hybrid(args) { return fts(args); }
```
- [ ] **Step 4:** Run green.
```bash
npx vitest run tests/repos/search.test.js tests/api/search.test.js
```
- [ ] **Step 5:** Run the full suite — all green.
```bash
npx vitest run
```
- [ ] **Step 6: Commit.**
```bash
git add lib/db/repos/search.js tests/repos/search.test.js tests/api/search.test.js
git commit -m "feat(search): hybrid FTS+vector with RRF + Ollama-down fallback"
```
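For intuition on how `fuse()` orders results, here is a stripped-down sketch of Reciprocal Rank Fusion over two ranked id lists, using the same `k = 60` and 1-based ranks. The `rrf` function and the sample lists are illustrative, not part of `lib/db/repos/search.js`.

```js
// Minimal RRF: each list contributes 1 / (k + rank) per item, rank starting at 1.
function rrf(lists, k = 60) {
  const score = new Map();
  for (const list of lists) {
    list.forEach((id, idx) => {
      score.set(id, (score.get(id) || 0) + 1 / (k + idx + 1));
    });
  }
  return [...score.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([id]) => id);
}

const ftsHits = ['A', 'B', 'C']; // FTS order, best first
const vecHits = ['B', 'D', 'A']; // vector order, nearest first
const fused = rrf([ftsHits, vecHits]);
console.log(fused); // [ 'B', 'A', 'D', 'C' ]
```

`B` (ranks 2 and 1) edges out `A` (ranks 1 and 3), and both beat `D` and `C`, which appear in only one list. Because only positions matter, the raw `ts_rank` and cosine-distance values never need to be put on a common scale.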
### Task C5: Integration test (skip if Ollama unreachable)
**Files:**
- Create: `tests/integration/embed_live.test.js`
- [ ] **Step 1: Write the gated test.**
```js
// tests/integration/embed_live.test.js
import { describe, it, expect } from 'vitest';
async function ollamaUp() {
try {
const res = await fetch((process.env.OLLAMA_URL || 'http://192.168.1.185:11434') + '/api/tags',
{ signal: AbortSignal.timeout(2_000) });
return res.ok;
} catch { return false; }
}
describe.runIf(await ollamaUp())('Ollama live embed', () => {
it('returns 768 dims for nomic-embed-text', async () => {
const { embedText } = await import('../../lib/ai/ollama.js');
const v = await embedText('the cradle aesthetic');
expect(v).toHaveLength(768);
});
});
```
- [ ] **Step 2:** Run — passes locally if CT 102 is up; auto-skips otherwise.
```bash
npx vitest run tests/integration/embed_live.test.js
```
- [ ] **Step 3: Commit.**
```bash
git add tests/integration/embed_live.test.js
git commit -m "test(ai): live ollama embed integration (gated)"
```
### Task C6: Phase C close — snapshot + memory checkpoint
- [ ] **Step 1:** Run full suite green.
- [ ] **Step 2:** Snapshot CT 310 + 311 (standing backup rule before the Phase C deploy lands):
```bash
ssh root@192.168.1.124 "pct snapshot 310 plan3_phase_c_$(date +%Y%m%d_%H%M) --description 'Plan 3 Phase C green'"
ssh root@192.168.1.124 "pct snapshot 311 plan3_phase_c_$(date +%Y%m%d_%H%M) --description 'Plan 3 Phase C green'"
```
- [ ] **Step 3:** Update memory: "Plan 3 Phase C complete: embed.text + hybrid search live".
---
## Phase D — Karakeep webhook + drag-drop UI + Jobs UI
### Task D1: Karakeep client
**Files:**
- Create: `lib/karakeep/client.js`
- Create: `tests/karakeep/client.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/karakeep/client.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { getBookmark } from '../../lib/karakeep/client.js';
beforeEach(() => {
global.fetch = vi.fn(async (url) => new Response(
JSON.stringify({ id: 'b-1', url: 'https://example.com', title: 'X', tags: [{ name: 'archive' }] }),
{ status: 200, headers: { 'content-type': 'application/json' }}
));
});
afterEach(() => vi.restoreAllMocks());
describe('karakeep.client', () => {
it('fetches a bookmark by id', async () => {
process.env.KARAKEEP_API_URL = 'https://karakeep.test';
process.env.KARAKEEP_API_TOKEN = 'tok';
const bm = await getBookmark('b-1');
expect(bm.url).toBe('https://example.com');
expect(global.fetch.mock.calls[0][1].headers.Authorization).toBe('Bearer tok');
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/karakeep/client.js
export async function getBookmark(id) {
const base = process.env.KARAKEEP_API_URL || 'https://karakeep.hynesy.com';
const tok = process.env.KARAKEEP_API_TOKEN || '';
const res = await fetch(`${base}/api/v1/bookmarks/${encodeURIComponent(id)}`, {
headers: { Authorization: 'Bearer ' + tok },
signal: AbortSignal.timeout(10_000)
});
if (res.status === 404) return null;
if (!res.ok) throw new Error(`karakeep ${res.status}`);
return res.json();
}
```
- [ ] **Step 4:** Run green.
- [ ] **Step 5: Commit.**
```bash
git add lib/karakeep/client.js tests/karakeep/client.test.js
git commit -m "feat(karakeep): thin bookmark fetch client"
```
### Task D2: `ingest.karakeep` worker
**Files:**
- Create: `lib/jobs/workers/karakeep.js`
- Modify: `lib/jobs/index.js`
- Create: `tests/jobs/workers/karakeep.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/jobs/workers/karakeep.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';
beforeEach(async () => {
await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); vi.restoreAllMocks(); });
describe('ingest.karakeep worker', () => {
it('fetches bookmark and creates ref', async () => {
const sp = await spaces.create({ slug: 'k', name: 'K' }, { kind: 'user', id: null });
// first response is karakeep API, second is URL HTML fetch
const responses = [
new Response(JSON.stringify({ id: 'b-1', url: 'https://example.com/a', title: 'A', tags: [] }),
{ status: 200, headers: { 'content-type': 'application/json' }}),
new Response('<html><head><title>A</title></head><body><article><p>x</p></article></body></html>',
{ status: 200, headers: { 'content-type': 'text/html' }})
];
global.fetch = vi.fn(async () => responses.shift());
const id = await queue.enqueue('ingest.karakeep', { bookmark_id: 'b-1', space_id: sp.id });
for (let i = 0; i < 50; i++) {
const j = await queue.instance().getJobById(id);
if (j?.state === 'completed') break;
await new Promise(r => setTimeout(r, 100));
}
const rows = await refs.list({ space_id: sp.id });
expect(rows[0].source_kind).toBe('karakeep');
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement.**
```js
// lib/jobs/workers/karakeep.js
import crypto from 'node:crypto';
import { getBookmark } from '../../karakeep/client.js';
import { extract } from '../../ingest/readability.js';
import * as refs from '../../db/repos/refs.js';
import { pool } from '../../db/pool.js';
export const NAME = 'ingest.karakeep';
export const opts = { teamSize: 4, teamConcurrency: 4 };
function key(space_id, bookmark_id) {
return crypto.createHash('sha256')
.update(space_id + '\x00karakeep:' + bookmark_id).digest('hex');
}
export async function handler(job) {
const { bookmark_id, space_id } = job.data;
const bm = await getBookmark(bookmark_id);
if (!bm) return { skipped: 'gone' };
const idem = key(space_id, bookmark_id);
const { rows: [existing] } = await pool.query(
`SELECT id FROM refs WHERE source_kind='karakeep' AND external_id=$1 LIMIT 1`,
[idem]
);
if (existing) return { ref_id: existing.id, idempotent: true };
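let html = bm.html_content;
if (!html) {
// Same checks as the ingest.url worker: fail the job on a non-2xx response.
const res = await fetch(bm.url, {
headers: { 'User-Agent': 'void-ingest/2.0' }, signal: AbortSignal.timeout(15_000)
});
if (!res.ok) throw new Error(`fetch ${bm.url} → ${res.status}`);
html = await res.text();
}
const parsed = extract(html || '', bm.url);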
const row = await refs.create({
space_id,
kind: 'url',
source_url: bm.url,
title: bm.title || parsed.title || bm.url,
summary: parsed.excerpt,
body_text: (parsed.textContent || '').slice(0, 200_000),
source_kind: 'karakeep',
external_id: idem,
metadata: { karakeep_id: bookmark_id, tags: (bm.tags || []).map(t => t.name) }
}, { kind: 'system', id: null });
return { ref_id: row.id };
}
```
- [ ] **Step 4: Register.**
```js
import * as karakeep from './workers/karakeep.js';
const WORKERS = [echo, url, blob, embed, karakeep];
```
- [ ] **Step 5:** Run green.
- [ ] **Step 6: Commit.**
```bash
git add lib/jobs/workers/karakeep.js lib/jobs/index.js tests/jobs/workers/karakeep.test.js
git commit -m "feat(jobs): ingest.karakeep worker"
```
### Task D3: `/api/ingest/karakeep` HMAC webhook
**Files:**
- Create: `lib/api/routes/ingest.js`
- Modify: `lib/api/index.js`
- Create: `tests/api/ingest.test.js`
- [ ] **Step 1: Write the failing test.**
```js
// tests/api/ingest.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import crypto from 'node:crypto';
import request from 'supertest';
import { setup } from './helpers.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as spaces from '../../lib/db/repos/spaces.js';
let app, sp;
const SECRET = 'test-karakeep-secret';
beforeEach(async () => {
({ app } = await setup());
process.env.KARAKEEP_WEBHOOK_SECRET = SECRET;
sp = await spaces.create({ slug: 'kw', name: 'KW' }, { kind: 'user', id: null });
process.env.KARAKEEP_DEFAULT_SPACE_ID = sp.id;
await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); });
function sign(body) {
return 'sha256=' + crypto.createHmac('sha256', SECRET).update(body).digest('hex');
}
describe('karakeep webhook', () => {
it('enqueues on valid signature', async () => {
const body = JSON.stringify({ event: 'bookmark.created', bookmark_id: 'b-1' });
const res = await request(app).post('/api/ingest/karakeep')
.set('X-Karakeep-Signature', sign(body))
.set('Content-Type', 'application/json')
.send(body);
expect(res.status).toBe(202);
expect(res.body.job_id).toBeTruthy();
});
it('401 on bad signature', async () => {
const res = await request(app).post('/api/ingest/karakeep')
.set('X-Karakeep-Signature', 'sha256=wrong')
.set('Content-Type', 'application/json')
.send('{"event":"bookmark.created","bookmark_id":"b-1"}');
expect(res.status).toBe(401);
});
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3: Implement the route with raw body capture.**
```js
// lib/api/routes/ingest.js
import { Router, raw } from 'express';
import crypto from 'node:crypto';
import * as queue from '../../jobs/queue.js';
import { asyncWrap } from '../errors.js';

function verify(rawBody, headerSig) {
  const secret = process.env.KARAKEEP_WEBHOOK_SECRET || '';
  if (!headerSig || !secret) return false;
  const expected = 'sha256=' +
    crypto.createHmac('sha256', secret).update(rawBody).digest('hex');
  try { return crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(headerSig)); }
  catch { return false; }
}

export const router = Router();

// Karakeep route bypasses agentOrOwner: HMAC is its own auth.
router.post('/karakeep',
  raw({ type: '*/*', limit: '1mb' }),
  asyncWrap(async (req, res) => {
    const sig = req.headers['x-karakeep-signature'];
    if (!verify(req.body, sig)) {
      return res.status(401).json({ error: { code: 'unauthorized', message: 'bad signature' } });
    }
    const payload = JSON.parse(req.body.toString('utf-8'));
    if (payload.event !== 'bookmark.created') return res.status(202).json({ skipped: true });
    const space_id = process.env.KARAKEEP_DEFAULT_SPACE_ID;
    if (!space_id) {
      return res.status(503).json({ error: { code: 'unconfigured', message: 'KARAKEEP_DEFAULT_SPACE_ID not set' } });
    }
    const job_id = await queue.enqueue('ingest.karakeep', {
      bookmark_id: payload.bookmark_id, space_id
    });
    res.status(202).json({ job_id });
  })
);
```
- [ ] **Step 4: Capture the raw body in the JSON parser and mount ahead of agentOrOwner.**
Two changes:
1. `server.js` — capture the raw body alongside the parsed body so HMAC verify still works:
```js
app.use(express.json({
  limit: '10mb',
  verify: (req, _res, buf) => { req.rawBody = buf; }
}));
```
2. Mount the ingest router ahead of the owner gate: either on `app` in `server.js` before `mountApi(app)`, or inside `mountApi` in `lib/api/index.js` before the `agentOrOwner` middleware. Either works; pick the former so the route is visibly outside the owner gate:
```js
// server.js — additions
import { router as ingestRouter } from './lib/api/routes/ingest.js';
// after app.use(express.json...):
app.use('/api/ingest', ingestRouter);
// THEN: mountApi(app);
```
And inside `lib/api/routes/ingest.js`, drop the per-route `raw()` middleware — use `req.rawBody` populated by the verify hook instead:
```js
// lib/api/routes/ingest.js (replace the route body)
import { Router } from 'express';
import crypto from 'node:crypto';
import * as queue from '../../jobs/queue.js';
import { asyncWrap } from '../errors.js';

function verify(rawBody, headerSig) {
  const secret = process.env.KARAKEEP_WEBHOOK_SECRET || '';
  // rawBody is missing when express.json() did not parse the request (e.g. wrong Content-Type).
  if (!headerSig || !secret || !rawBody) return false;
  const expected = 'sha256=' +
    crypto.createHmac('sha256', secret).update(rawBody).digest('hex');
  // timingSafeEqual throws when the buffers differ in length; treat that as a mismatch.
  try { return crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(headerSig)); }
  catch { return false; }
}

export const router = Router();

router.post('/karakeep', asyncWrap(async (req, res) => {
  const sig = req.headers['x-karakeep-signature'];
  if (!verify(req.rawBody, sig)) {
    return res.status(401).json({ error: { code: 'unauthorized', message: 'bad signature' } });
  }
  const payload = req.body;
  if (payload.event !== 'bookmark.created') return res.status(202).json({ skipped: true });
  const space_id = process.env.KARAKEEP_DEFAULT_SPACE_ID;
  if (!space_id) {
    return res.status(503).json({ error: { code: 'unconfigured', message: 'KARAKEEP_DEFAULT_SPACE_ID not set' } });
  }
  const job_id = await queue.enqueue('ingest.karakeep', {
    bookmark_id: payload.bookmark_id, space_id
  });
  res.status(202).json({ job_id });
}));
```
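The reason Step 4 keeps `req.rawBody` is that the HMAC covers the exact bytes the sender transmitted, so verifying against re-serialized JSON can fail even when the payload is semantically identical. The following is a minimal standalone sketch of that behaviour, assuming the `sha256=<hex>` header format used in the test above. The `sign` and `verify` functions here are illustrative copies, not the module's exports, and the secret is a placeholder.

```javascript
import crypto from 'node:crypto';

// Placeholder secret for illustration only.
const SECRET = 'test-karakeep-secret';

function sign(rawBody, secret = SECRET) {
  return 'sha256=' + crypto.createHmac('sha256', secret).update(rawBody).digest('hex');
}

function verify(rawBody, headerSig, secret = SECRET) {
  if (!headerSig || !secret || !rawBody) return false;
  const expected = Buffer.from(sign(rawBody, secret));
  const received = Buffer.from(String(headerSig));
  // timingSafeEqual throws on unequal lengths, so compare lengths first.
  return expected.length === received.length &&
    crypto.timingSafeEqual(expected, received);
}

// Bytes as they arrived on the wire (note the spaces after the colons).
const wire = Buffer.from('{"event": "bookmark.created", "bookmark_id": "b-1"}');
const header = sign(wire);

// Re-serializing the parsed body drops that whitespace, so the digest changes.
const reserialized = Buffer.from(JSON.stringify(JSON.parse(wire.toString('utf-8'))));

const results = {
  rawBytes: verify(wire, header),
  reserialized: verify(reserialized, header),
  wrongSig: verify(wire, 'sha256=wrong'),
  missingSig: verify(wire, undefined),
};
console.log(results);
```

Only the `rawBytes` check passes. This sketch compares lengths up front, whereas the route above catches the exception from `timingSafeEqual`; both reject a malformed header.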
- [ ] **Step 5:** Run green.
- [ ] **Step 6: Commit.**
```bash
git add server.js lib/api/routes/ingest.js lib/api/index.js tests/api/ingest.test.js
git commit -m "feat(api): karakeep webhook (HMAC)"
```
### Task D4: Drag-drop SPA component
**Files:**
- Create: `public/components/dropzone.js`
- Modify: `public/app.js` — initialize dropzone on `<main>`.
- [ ] **Step 1: Implement the component.**
```js
// public/components/dropzone.js
export function attachDropzone(target) {
  function highlight(on) { target.style.outline = on ? '2px dashed var(--accent)' : ''; }

  target.addEventListener('dragover', e => { e.preventDefault(); highlight(true); });
  target.addEventListener('dragleave', () => highlight(false));
  target.addEventListener('drop', async e => {
    e.preventDefault(); highlight(false);
    const files = [...e.dataTransfer.files];
    if (!files.length) return; // dragged text or a link, not files
    const space_id = localStorage.getItem('last_space_id');
    if (!space_id) { alert('Open a space first so we know where to drop these.'); return; }
    let queued = 0;
    for (const f of files) {
      const fd = new FormData();
      fd.append('file', f);
      fd.append('space_id', space_id);
      try {
        // No explicit Content-Type: the browser adds the multipart boundary for FormData bodies.
        const res = await fetch('/api/capture/upload', {
          method: 'POST',
          headers: { Authorization: 'Bearer ' + localStorage.getItem('void_token') },
          body: fd
        });
        if (res.ok) queued++;
      } catch { /* network error: counted as failed below */ }
    }
    const failed = files.length - queued;
    alert(failed ? `${queued} file(s) queued, ${failed} failed.` : `${queued} file(s) queued.`);
  });
}
```
- [ ] **Step 2: Wire into app.js.**
```js
// additions
import { attachDropzone } from './components/dropzone.js';
// inside init() after renderRightrail(...):
attachDropzone(document.getElementById('main'));
```
- [ ] **Step 3:** Run server tests — still green.
```bash
npx vitest run tests/server.test.js
```
- [ ] **Step 4: Commit.**
```bash
git add public/components/dropzone.js public/app.js
git commit -m "feat(ui): drag-drop capture"
```
### Task D5: Expand Jobs UI
**Files:**
- Modify: `public/views/jobs.js`
- [ ] **Step 1: Replace with the table view.**
```js
// public/views/jobs.js (REPLACE)
import { api } from '../api.js';
import { el, mount, clear } from '../dom.js';

function badge(state) {
  const cls = state === 'completed' ? 'ok'
    : state === 'failed' ? 'bad'
    : state === 'active' ? 'warn'
    : 'idle';
  return el('span', { class: 'status ' + cls }, state);
}

function row(j, onActed) {
  return el('li', {},
    badge(j.state), ' ',
    el('span', { style: { fontFamily: 'var(--font-mono)' } }, j.name), ' ',
    el('span', { class: 'muted' }, (j.id || '').slice(0, 8)), ' ',
    el('button', {
      class: 'ghost',
      onclick: async () => { await api.post(`/api/jobs/${j.id}/retry`); onActed(); }
    }, 'retry'),
    ' ',
    el('button', {
      class: 'ghost',
      onclick: async () => { await api.del(`/api/jobs/${j.id}`); onActed(); }
    }, 'delete')
  );
}

async function refresh(container) {
  const rows = await api.get('/api/jobs?limit=100');
  clear(container);
  if (!rows.length) { container.appendChild(el('p', { class: 'muted' }, 'No jobs.')); return; }
  // Group by state; Map keeps the order in which each state first appears.
  const byState = new Map();
  for (const r of rows) {
    if (!byState.has(r.state)) byState.set(r.state, []);
    byState.get(r.state).push(r);
  }
  for (const [state, items] of byState) {
    container.appendChild(el('div', { class: 'sb-title' }, `${state} (${items.length})`));
    container.appendChild(el('ul', { class: 'plain' }, items.map(j => row(j, () => refresh(container)))));
  }
}

export async function render(main) {
  const wrap = el('div');
  mount(main,
    el('h1', { class: 'view-h1' }, 'Jobs'),
    el('p', { class: 'view-sub muted' }, 'pg-boss queue; refreshes every 10 s.'),
    wrap
  );
  await refresh(wrap);
  // A failed poll should not surface as an unhandled rejection; the next tick retries.
  const handle = setInterval(
    () => refresh(wrap).catch(err => console.warn('jobs refresh failed', err)),
    10_000
  );
  // Stop polling when the view changes.
  window.addEventListener('hashchange', () => clearInterval(handle), { once: true });
}
```
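Because `refresh` groups with a `Map`, sections appear in whatever order each state first shows up in the API response, so the layout can shift between polls. If a stable order is wanted, the grouping step could be factored into a pure function like the sketch below. `groupByState` and `STATE_ORDER` are hypothetical names, and the order itself is an assumption, not something the spec prescribes.

```javascript
// Hypothetical display order; states not listed here sort last, in first-seen order.
const STATE_ORDER = ['active', 'created', 'failed', 'completed'];

function groupByState(rows, order = STATE_ORDER) {
  const byState = new Map();
  for (const r of rows) {
    if (!byState.has(r.state)) byState.set(r.state, []);
    byState.get(r.state).push(r);
  }
  const rank = s => {
    const i = order.indexOf(s);
    return i === -1 ? order.length : i;
  };
  // Array.prototype.sort is stable, so unlisted states keep their first-seen order.
  return [...byState.entries()].sort(([a], [b]) => rank(a) - rank(b));
}

const sample = [
  { id: '1', name: 'embed.text', state: 'completed' },
  { id: '2', name: 'ingest.url', state: 'active' },
  { id: '3', name: 'ingest.url', state: 'completed' },
  { id: '4', name: 'ingest.blob', state: 'failed' },
];

const grouped = groupByState(sample);
const headings = grouped.map(([state, items]) => `${state} (${items.length})`);
console.log(headings);
```

The returned `[state, items]` pairs can be iterated exactly like the `Map` in `refresh`, so swapping it in would not change the rendering loop.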
- [ ] **Step 2:** Run server tests.
```bash
npx vitest run tests/server.test.js
```
- [ ] **Step 3: Commit.**
```bash
git add public/views/jobs.js
git commit -m "feat(ui): Jobs panel with retry/delete + polling"
```
### Task D6: Phase D close — version bump, changelog, full suite
**Files:**
- Modify: `package.json`, `server.js`, `CHANGELOG.md`, `tests/server.test.js`
- [ ] **Step 1: Bump version.**
`package.json`: `"version": "2.0.0-alpha.3"`.
`server.js`: `const VERSION = '2.0.0-alpha.3';`.
`tests/server.test.js`: assertion to `'2.0.0-alpha.3'`.
- [ ] **Step 2: Append CHANGELOG entry.**
```md
## [2.0.0-alpha.3] — 2026-06-NN
### Added (Plan 3: Capture pipeline + hybrid search)
- pg-boss-backed job queue inside void-server. Owner-only /api/jobs.
- /api/capture POST (URL) + /api/capture/upload (multipart) enqueue ingest jobs.
- ingest.url worker: fetch + readability extract → refs row, idempotent by sha256(space+url).
- ingest.blob worker: sha256 + content-addressed blob store at /var/lib/void/blobs/.
- embed.text worker: Ollama nomic-embed-text (768 dims) zero-padded to 1024.
- Repo-level triggers fire embed.text after page / ref / source_doc create/update; singleton key coalesces rapid edits.
- Hybrid /api/search: FTS + vector via RRF (k=60). Graceful fallback to FTS-only when Ollama is down.
- Karakeep webhook /api/ingest/karakeep with HMAC verification.
- Drag-drop upload onto the main panel.
- Jobs view with state grouping, retry, delete, 10 s polling.
```
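Two of the changelog entries above compress an algorithm into a few words, so a short illustration may help when reviewing the wording. The sketch below shows reciprocal rank fusion with k=60 over two ranked id lists, and zero-padding a 768-dimension vector to 1024. It is an in-memory illustration only: `rrfMerge` and `zeroPad` are hypothetical names, and the real search may well do the fusion in SQL.

```javascript
// Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank), rank starting at 1.
function rrfMerge(rankedLists, k = 60) {
  const scores = new Map();
  for (const list of rankedLists) {
    list.forEach((id, i) => {
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + i + 1));
    });
  }
  return [...scores.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([id, score]) => ({ id, score }));
}

// Pad an embedding with trailing zeros. Padding both sides of a comparison
// this way leaves dot products and norms (hence cosine similarity) unchanged.
function zeroPad(vec, dims = 1024) {
  if (vec.length > dims) throw new RangeError(`vector has ${vec.length} dims, max ${dims}`);
  return [...vec, ...new Array(dims - vec.length).fill(0)];
}

const fts = ['p1', 'p2', 'p3']; // ids ranked by full-text search
const vec = ['p3', 'p1', 'p4']; // ids ranked by vector similarity

const hybrid = rrfMerge([fts, vec]).map(r => r.id);
// With no vector list available, fusion over one list keeps the FTS order.
const ftsOnly = rrfMerge([fts]).map(r => r.id);
const padded = zeroPad(new Array(768).fill(0.5));

console.log(hybrid, ftsOnly, padded.length);
```

Documents that appear in both lists (`p1`, `p3`) outrank those found by only one, which is the behaviour the hybrid search entry relies on.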
- [ ] **Step 3: Full suite green.**
```bash
npx vitest run
```
- [ ] **Step 4: Commit.**
```bash
git add package.json server.js CHANGELOG.md tests/server.test.js
git commit -m "chore: version 2.0.0-alpha.3 + changelog"
```
### Task D7: Plan 3 close — snapshot + completion doc + memory
- [ ] **Step 1: Snapshot CT 310 + 311** with name `plan3_complete_<timestamp>`.
- [ ] **Step 2:** Write `docs/plan-3-complete.md` mirroring `plan-2-complete.md`'s shape.
- [ ] **Step 3: Commit the completion doc.**
```bash
git add docs/plan-3-complete.md
git commit -m "docs: Plan 3 completion summary"
```
- [ ] **Step 4: Update memory** to mark Plan 3 complete, set Plan 4 (`void-workers` Python) as next.
---
## Spec coverage check
Every spec section maps to at least one task:
- pg-boss bootstrap + Jobs API → Phase A (A1–A7).
- /api/capture + URL worker + blob storage → Phase B (B1–B6).
- Embeddings + Ollama + RRF hybrid search → Phase C (C1–C5).
- Karakeep webhook + drag-drop + Jobs UI fill-in → Phase D (D1–D5).
- Version bump + changelog + completion doc → D6–D7.
- Standing backup rule honored: snapshots at end of Phase C and Plan 3.
## Type & name consistency
- Worker name strings used: `echo`, `ingest.url`, `ingest.blob`, `ingest.karakeep`, `embed.text` — same strings in `NAME`, in registration, in tests, and in `triggerEmbed` callers.
- Idempotency keys: `sha256(space_id + '\x00' + url)` for URL ingest and `sha256(space_id + '\x00karakeep:' + bookmark_id)` for Karakeep. Stored as `refs.external_id` with `source_kind` set to `'url'` or `'karakeep'`.
- Response shape from `/api/capture`: `{ job_id, idempotency_key, ref_id? }`, consistent across tasks B6 and A6 (`ref_id` is set only on an idempotent hit).
- `triggerEmbed(entity_type, entity_id)` signature is used identically in pages, refs, and source_docs repos.
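
The idempotency keys listed above can be sketched as follows, mainly to show why the `\x00` separator matters: without it, different `(space_id, url)` pairs can concatenate to the same string. The function names `urlKey` and `karakeepKey` and the hex encoding are assumptions for illustration. Any URL normalization the workers apply before hashing is not covered here.

```javascript
import crypto from 'node:crypto';

// sha256(space_id + '\x00' + url), hex-encoded (encoding is an assumption).
function urlKey(spaceId, url) {
  return crypto.createHash('sha256').update(spaceId + '\x00' + url).digest('hex');
}

// sha256(space_id + '\x00karakeep:' + bookmark_id), hex-encoded.
function karakeepKey(spaceId, bookmarkId) {
  return crypto.createHash('sha256').update(spaceId + '\x00karakeep:' + bookmarkId).digest('hex');
}

const a = urlKey('s1', 'https://example.com/a');
const b = urlKey('s1', 'https://example.com/a'); // same inputs, same key
const c = urlKey('s2', 'https://example.com/a'); // same URL, different space

// Without a separator these two pairs would hash the same string 'abc'.
const left = urlKey('ab', 'c');
const right = urlKey('a', 'bc');

// The 'karakeep:' prefix keeps bookmark keys apart from URL keys.
const k = karakeepKey('s1', 'b-1');
const u = urlKey('s1', 'b-1');

console.log({ same: a === b, perSpace: a !== c, separated: left !== right, namespaced: k !== u });
```

Since the space id is part of the hash input, capturing the same URL into two spaces yields two distinct `refs.external_id` values.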