# Void 2.0 — Plan 3: Capture pipeline + hybrid search

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Wire the Plan 2 SPA's stub Capture button to a real ingest pipeline. This plan adds:

- a pg-boss-backed job queue;
- three capture entry points (URL POST, Karakeep webhook, drag-drop attachment);
- workers that turn URLs and blobs into `refs`;
- an embeddings worker that fills the existing `embedding` columns via Ollama;
- a hybrid FTS+vector search using Reciprocal Rank Fusion (RRF), which replaces the Plan 2 FTS-only `/api/search`.
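
Reciprocal Rank Fusion merges the FTS ranking and the vector ranking without normalizing their scores, which are not comparable. Each list adds 1/(k + rank) for every id it contains.

The sketch below is minimal and rests on a few assumptions:

- Both retrievers return arrays of ref ids, ordered best-first.
- `rrf` is a hypothetical name.
- k = 60 is the constant commonly used with RRF, not a value the spec fixes.

Passing an empty vector list, which is what happens when Ollama is down, leaves the FTS order unchanged. That makes the FTS-only fallback a special case rather than a separate code path.

```js
// Reciprocal Rank Fusion over any number of best-first id lists.
// Rank starts at 1, so the top item of a list contributes 1 / (k + 1).
function rrf(rankedLists, k = 60) {
  const scores = new Map();
  for (const list of rankedLists) {
    list.forEach((id, index) => {
      scores.set(id, (scores.get(id) ?? 0) + 1 / (k + index + 1));
    });
  }
  // Highest fused score first; Array#sort is stable, so ties keep first-seen order.
  return [...scores.entries()]
    .sort((a, b) => b[1] - a[1])
    .map(([id, score]) => ({ id, score }));
}

// rrf([ftsIds, vectorIds]) → [{ id, score }, ...]
```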

**Architecture:** void-server stays a single Node process:

- **Queue.** pg-boss runs as an embedded client inside that process. Its tables live in the shared void2-db alongside Void's tables, in their own `pgboss` schema.
- **Workers.** They register as in-process pollers via `boss.work()`.
- **Embeddings.** They call Ollama at CT 102 over HTTP. If that call fails, search degrades gracefully to FTS-only.
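
One possible shape for the Ollama call and its failure path is sketched below. It is a sketch under several assumptions:

- It targets Ollama's `POST /api/embed` endpoint: the body is `{ model, input }` and the response is `{ embeddings: [[...]] }`.
- The base URL, model name and timeout are placeholders, not values from the spec.
- `embedText` and `tryEmbedQuery` are hypothetical names for what `lib/ai/ollama.js` might export.
- `fetch` is injected, so the sketch can be tested without a running Ollama.

```js
// Hypothetical lib/ai/ollama.js. URL, model and timeout defaults are assumptions.
async function embedText(text, {
  baseUrl = 'http://ollama.internal:11434', // placeholder for the CT 102 address
  model = 'nomic-embed-text',               // placeholder embedding model
  fetchImpl = globalThis.fetch,
  timeoutMs = 10_000
} = {}) {
  const res = await fetchImpl(`${baseUrl}/api/embed`, {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ model, input: text }),
    signal: AbortSignal.timeout(timeoutMs)
  });
  if (!res.ok) throw new Error(`ollama embed failed: HTTP ${res.status}`);
  const body = await res.json();
  const vector = body.embeddings?.[0];
  if (!Array.isArray(vector)) throw new Error('unexpected /api/embed response shape');
  return vector;
}

// Search-side wrapper: any failure (network, HTTP, bad payload) yields null,
// and the caller then ranks with FTS only.
async function tryEmbedQuery(text, opts) {
  try {
    return await embedText(text, opts);
  } catch {
    return null;
  }
}
```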

**Tech Stack:**

- Express 5
- pg-boss 10
- `@mozilla/readability` + `jsdom` for URL extraction
- `multer` for upload streaming
- native `fetch` for Ollama
- vitest and supertest for tests
- vanilla DOM (via `dom.js`) for SPA additions

**Spec:** `docs/superpowers/specs/2026-06-01-void-v2-plan3-capture.md`. Read it first; this plan inherits every decision documented there.

---

## Out of scope (deferred)

- Whisper transcription, Tesseract OCR, yt-dlp video ingestion and scanned-PDF OCR: Plan 4 (Python `void-workers` service).
- AI Space/Project suggestion on capture: capture takes an explicit `space_id`.
- Embedding chunks table: Plan 3 ships one whole-document embedding per entity row.
- MCP server: Plan 5+.

## Conventions (apply to every task)

1. **TDD.** Write the failing test first. Run it red. Implement. Run it green. Commit. Match the Plan 2 cadence.
2. **No raw SQL in routes:** repos only.
3. **Mutations pass `req.actor`** to the repo.
4. **Throw `NotFoundError` / `ValidationError` / `ForbiddenError`.** The existing error middleware handles them.
5. **Status codes:** 201 create, 200 read/update, 202 enqueued, 204 delete, 400 validation, 401 unauth, 403 capability deny, 404 not found, 409 conflict.
6. **Workers** use a single shape: `async function handler(job) { ... }`.
   - On a retryable failure, throw, so pg-boss can retry the job under its retry settings.
   - On a permanent failure, log and return, so the job completes without being retried.
7. **Commit per task** with the message at the end of that task.
8. **Test isolation:** the existing `resetDb` + `migrateUp` helpers stay in use. For pg-boss tests, add a `stopBoss()` helper that stops the queue and drops the `pgboss` schema after each test.
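
Convention 6 in practice: one way to keep the two failure modes apart is a dedicated error class for permanent failures. This sketch is illustrative only:

- `PermanentError`, `makeHandler` and the rule that 4xx means permanent are assumptions, not existing Void code.
- `fetchImpl` and `log` are injected so the handler can be exercised without network access.

```js
// Hypothetical illustration of "throw on retryable, log + return on permanent".
class PermanentError extends Error {}

function makeHandler({ fetchImpl, log = console }) {
  return async function handler(job) {
    try {
      const res = await fetchImpl(job.data.url);
      // 4xx: the request itself is bad, so a retry cannot help.
      if (res.status >= 400 && res.status < 500) {
        throw new PermanentError(`HTTP ${res.status} for ${job.data.url}`);
      }
      // 5xx and other non-OK statuses: probably transient.
      if (!res.ok) throw new Error(`HTTP ${res.status} (retryable)`);
      return { bytes: (await res.text()).length };
    } catch (err) {
      if (err instanceof PermanentError) {
        log.warn({ jobId: job.id, err: err.message }, 'permanent failure');
        return { skipped: true, reason: err.message }; // job completes, no retry
      }
      throw err; // network errors and 5xx propagate, so pg-boss retries the job
    }
  };
}
```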

---

## File structure (what gets created across Plan 3)

```
lib/
  jobs/
    queue.js              # pg-boss singleton: start, stop, enqueue, subscribe
    index.js              # registers all worker handlers; called from server.js
    triggers.js           # repo-level "after write → enqueue embed.text"
    workers/
      echo.js             # trivial harness verification
      url.js              # ingest.url
      blob.js             # ingest.blob
      karakeep.js         # ingest.karakeep
      embed.js            # embed.text
  ingest/
    readability.js        # @mozilla/readability wrapper
    blob_store.js         # sha256 + content-addressed path resolution
  ai/
    ollama.js             # thin embed-text wrapper
  karakeep/
    client.js             # thin GET /api/v1/bookmarks/:id wrapper
  api/routes/
    jobs.js               # GET /api/jobs etc.
    capture.js            # POST /api/capture + POST /api/capture/upload
    ingest.js             # POST /api/ingest/karakeep (HMAC-verified)
  db/repos/
    jobs.js               # thin SELECTs over pg-boss tables (typed views)
    search.js             # REWRITTEN: hybrid FTS+vector RRF

public/
  views/jobs.js           # SPA Jobs panel
  components/dropzone.js  # drag-drop wrapper for /capture/upload

tests/
  jobs/queue.test.js
  jobs/workers/echo.test.js
  jobs/workers/url.test.js
  jobs/workers/blob.test.js
  jobs/workers/embed.test.js
  jobs/workers/karakeep.test.js
  jobs/triggers.test.js
  ai/ollama.test.js
  ingest/readability.test.js
  ingest/blob_store.test.js
  karakeep/client.test.js
  api/jobs.test.js
  api/capture.test.js
  api/ingest.test.js
  api/search.test.js              # extended
  repos/jobs.test.js
  repos/search.test.js            # extended
  integration/embed_live.test.js  # skip if Ollama unreachable
  helpers/boss.js                 # new: stopBoss() + waitForJob() helpers
```
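
The file list describes `POST /api/ingest/karakeep` only as HMAC-verified. A minimal verification sketch follows, under these assumptions:

- The sender computes a hex HMAC-SHA256 of the raw request body with a shared secret and sends it in a header.
- The exact header name and signing scheme still need confirming against the spec and Karakeep's webhook documentation.
- `verifySignature` is a hypothetical name.

The route needs the raw bytes, for example via `express.raw({ type: 'application/json' })`, because re-serializing parsed JSON can change the bytes that were signed.

```js
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical helper: assumes hex-encoded HMAC-SHA256 over the raw body.
function verifySignature(rawBody, signatureHex, secret) {
  // Reject anything that is not exactly 32 bytes of hex before comparing.
  if (typeof signatureHex !== 'string' || !/^[0-9a-f]{64}$/i.test(signatureHex)) return false;
  const expected = createHmac('sha256', secret).update(rawBody).digest();
  const given = Buffer.from(signatureHex, 'hex');
  // Constant-time comparison; both buffers are 32 bytes here, so it cannot throw.
  return timingSafeEqual(expected, given);
}
```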

---

## Phase A — Queue harness + Jobs API

### Task A1: Add pg-boss dependency

**Files:**

- Modify: `package.json` (add `"pg-boss": "^10.3.2"`).

- [ ] **Step 1:** `cd /project/src/void-v2 && npm i pg-boss@^10`
- [ ] **Step 2:** Verify the dep landed:

```bash
grep '"pg-boss"' package.json
```

Expected: `"pg-boss": "^10.x.x"`

- [ ] **Step 3:** Run the existing test suite; it must still pass.

```bash
npx vitest run
```

Expected: 185 tests pass.

- [ ] **Step 4:** Commit.

```bash
git add package.json package-lock.json
git commit -m "chore(deps): add pg-boss ^10"
```

### Task A2: Create `lib/jobs/queue.js` singleton client

**Files:**

- Create: `lib/jobs/queue.js`
- Create: `tests/helpers/boss.js`
- Create: `tests/jobs/queue.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/queue.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';

beforeEach(async () => { await resetDb(); await migrateUp(); });
afterEach(async () => { await stopBoss(); });

describe('jobs/queue', () => {
  it('starts, enqueues, and a worker receives the job', async () => {
    await queue.start();
    let resolveJob;
    const received = new Promise(resolve => { resolveJob = resolve; });
    // Await the subscription so the worker is registered before we enqueue.
    await queue.subscribe('echo', async job => { resolveJob(job.data); });
    const jobId = await queue.enqueue('echo', { hello: 'void' });
    expect(jobId).toBeTruthy();
    const data = await received;
    expect(data).toEqual({ hello: 'void' });
  });
});
```

- [ ] **Step 2:** Create the helper:

```js
// tests/helpers/boss.js
import * as queue from '../../lib/jobs/queue.js';
import { pool } from '../../lib/db/pool.js';

export async function stopBoss() {
  try { await queue.stop(); } catch { /* ignore: the queue may never have started */ }
  await pool.query('DROP SCHEMA IF EXISTS pgboss CASCADE');
}
```

- [ ] **Step 3: Run the test red.**

```bash
npx vitest run tests/jobs/queue.test.js
```

Expected: FAIL, because `lib/jobs/queue.js` does not exist.

- [ ] **Step 4: Implement `queue.js`.**

pg-boss 10 requires a queue to exist before `send()` or `work()`, so the client creates each queue on first use.

```js
// lib/jobs/queue.js
import PgBoss from 'pg-boss';
import { log } from '../log.js';

let boss = null;
// Queue name → creation promise. Caching the promise also covers
// concurrent first calls for the same name.
const queues = new Map();

export async function start() {
  if (boss) return boss;
  boss = new PgBoss({
    connectionString: process.env.DATABASE_URL,
    archiveCompletedAfterSeconds: 86_400,
    deleteAfterDays: 7
  });
  boss.on('error', err => log.error({ err }, 'pg-boss error'));
  await boss.start();
  return boss;
}

export async function stop() {
  if (!boss) return;
  await boss.stop({ graceful: true, timeout: 5_000 });
  boss = null;
  queues.clear(); // tests drop the pgboss schema after stop(), so forget created queues
}

function ensureQueue(name) {
  if (!queues.has(name)) {
    const p = (async () => {
      if (!(await boss.getQueue(name))) await boss.createQueue(name);
    })().catch(err => { queues.delete(name); throw err; });
    queues.set(name, p);
  }
  return queues.get(name);
}

export async function enqueue(name, data, opts = {}) {
  if (!boss) throw new Error('queue not started');
  await ensureQueue(name);
  return await boss.send(name, data, opts);
}

export async function subscribe(name, handler, opts = {}) {
  if (!boss) throw new Error('queue not started');
  await ensureQueue(name);
  // pg-boss 10 passes the work callback an array of jobs (batch size 1 by default);
  // the handler's return value is stored as the job's output.
  return await boss.work(name, { pollingIntervalSeconds: 2, ...opts }, async ([job]) => handler(job));
}

export function instance() { return boss; }
```

- [ ] **Step 5: Run the test green.**

```bash
npx vitest run tests/jobs/queue.test.js
```

Expected: 1 passed.

- [ ] **Step 6: Commit.**

```bash
git add lib/jobs/queue.js tests/helpers/boss.js tests/jobs/queue.test.js
git commit -m "feat(jobs): pg-boss singleton client"
```
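
The file structure lists a `waitForJob()` helper in `tests/helpers/boss.js`, but no task here defines it. Instead, the 50 × 100 ms polling loop is repeated inline in later tests.

One possible shape is sketched below; its name and options are illustrative assumptions. The job lookup is injected, for example `() => queue.instance().getJobById('echo', id)`, so the helper does not depend on pg-boss directly.

```js
// Hypothetical tests/helpers/boss.js addition: poll until the job reaches one of
// `states`, or throw after `timeoutMs` with the last state seen.
async function waitForJob(getJob, { states = ['completed'], timeoutMs = 5_000, intervalMs = 100 } = {}) {
  const deadline = Date.now() + timeoutMs;
  for (;;) {
    const job = await getJob();
    if (job && states.includes(job.state)) return job;
    if (Date.now() >= deadline) {
      throw new Error(`job not in [${states}] after ${timeoutMs} ms (last state: ${job?.state ?? 'missing'})`);
    }
    await new Promise(r => setTimeout(r, intervalMs));
  }
}
```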

### Task A3: Trivial `echo` worker + bootstrap registration

**Files:**

- Create: `lib/jobs/workers/echo.js`
- Create: `lib/jobs/index.js`
- Modify: `server.js` (start the queue and register workers on boot, stop the queue on SIGTERM).
- Create: `tests/jobs/workers/echo.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/workers/echo.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';

beforeEach(async () => { await resetDb(); await migrateUp(); await queue.start(); await registerWorkers(); });
afterEach(async () => { await stopBoss(); });

describe('echo worker', () => {
  it('completes successfully', async () => {
    const id = await queue.enqueue('echo', { ping: 1 });
    const boss = queue.instance();
    // Poll the job state for up to 5 s. pg-boss 10 signature: getJobById(name, id).
    for (let i = 0; i < 50; i++) {
      const j = await boss.getJobById('echo', id);
      if (j?.state === 'completed') { expect(j.output).toEqual({ pong: 1 }); return; }
      await new Promise(r => setTimeout(r, 100));
    }
    throw new Error('job did not complete');
  });
});
```

- [ ] **Step 2: Run red.**

```bash
npx vitest run tests/jobs/workers/echo.test.js
```

Expected: FAIL, because `lib/jobs/index.js` is not found.

- [ ] **Step 3: Implement the echo worker.**

```js
// lib/jobs/workers/echo.js
export const NAME = 'echo';

export async function handler(job) {
  return { pong: job.data?.ping ?? 0 };
}
```

- [ ] **Step 4: Implement the registrar.**

```js
// lib/jobs/index.js
import * as queue from './queue.js';
import * as echo from './workers/echo.js';

const WORKERS = [echo];

export async function registerWorkers() {
  for (const w of WORKERS) {
    await queue.subscribe(w.NAME, w.handler, w.opts || {});
  }
}
```

- [ ] **Step 5: Wire into server.js at the CLI block, NOT inside `createApp()`.**

```js
// server.js: additions (`log` is assumed to be imported already in this file)
import * as queue from './lib/jobs/queue.js';
import { registerWorkers } from './lib/jobs/index.js';

// Replace the existing `if (import.meta.url === ...)` block:
if (import.meta.url === `file://${process.argv[1]}`) {
  const port = process.env.PORT || 3000;
  const app = createApp();
  queue.start()
    .then(registerWorkers)
    .catch(err => log.error({ err }, 'queue boot failed'));
  const server = app.listen(port, () => log.info({ port }, 'void-server listening'));

  // Stop accepting requests and let in-flight jobs finish before exiting.
  process.once('SIGTERM', async () => {
    server.close();
    await queue.stop();
    process.exit(0);
  });
}
```

Tests construct the app via `createApp()` and manage the queue themselves through `queue.start()` / `stopBoss()`. The production path boots the queue once on startup and stops it on SIGTERM.

- [ ] **Step 6: Run green.**

```bash
npx vitest run tests/jobs/workers/echo.test.js
```

Expected: 1 passed.

- [ ] **Step 7: Commit.**

```bash
git add lib/jobs/workers/echo.js lib/jobs/index.js server.js tests/jobs/workers/echo.test.js
git commit -m "feat(jobs): echo worker + bootstrap registration"
```

### Task A4: `lib/db/repos/jobs.js` thin SELECTs over pg-boss

**Files:**

- Create: `lib/db/repos/jobs.js`
- Create: `tests/repos/jobs.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/repos/jobs.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as jobs from '../../lib/db/repos/jobs.js';

beforeEach(async () => { await resetDb(); await migrateUp(); await queue.start(); await registerWorkers(); });
afterEach(async () => { await stopBoss(); });

describe('jobs repo', () => {
  it('list returns recent jobs across states', async () => {
    const id = await queue.enqueue('echo', { ping: 1 });
    // wait for completion (pg-boss 10: getJobById(name, id))
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('echo', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await jobs.list({ limit: 10 });
    expect(rows.find(r => r.id === id)).toBeTruthy();
  });

  it('getById returns null on unknown id', async () => {
    expect(await jobs.getById('00000000-0000-0000-0000-000000000000')).toBeNull();
  });
});
```

- [ ] **Step 2: Run red.**

```bash
npx vitest run tests/repos/jobs.test.js
```

Expected: FAIL, because the repo is not found.

- [ ] **Step 3: Implement the repo.**

```js
// lib/db/repos/jobs.js
import { pool } from '../pool.js';

// pg-boss 10 keeps live jobs in pgboss.job and finished ones in pgboss.archive,
// both with snake_case columns. list() and getById() alias them identically,
// so both return the same row shape.
const COLUMNS = `id, name, state, data, output, retry_count AS retrycount,
  created_on AS createdon, started_on AS startedon, completed_on AS completedon`;

export async function list({ state, name, limit = 50 } = {}) {
  const where = [];
  const args = [];
  let i = 1;
  if (state) { where.push(`state=$${i++}`); args.push(state); }
  if (name) { where.push(`name=$${i++}`); args.push(name); }
  args.push(limit);
  const w = where.length ? `WHERE ${where.join(' AND ')}` : '';
  const sql = `
    SELECT * FROM (
      SELECT ${COLUMNS} FROM pgboss.job
      UNION ALL
      SELECT ${COLUMNS} FROM pgboss.archive
    ) u ${w}
    ORDER BY createdon DESC
    LIMIT $${i}
  `;
  const { rows } = await pool.query(sql, args);
  return rows;
}

export async function getById(id) {
  const sql = `
    SELECT ${COLUMNS} FROM pgboss.job WHERE id=$1
    UNION ALL
    SELECT ${COLUMNS} FROM pgboss.archive WHERE id=$1
    LIMIT 1
  `;
  const { rows: [r] } = await pool.query(sql, [id]);
  return r ?? null;
}
```

- [ ] **Step 4: Run green.**

```bash
npx vitest run tests/repos/jobs.test.js
```

Expected: 2 passed.

- [ ] **Step 5: Commit.**

```bash
git add lib/db/repos/jobs.js tests/repos/jobs.test.js
git commit -m "feat(jobs): jobs repo (list + getById)"
```

### Task A5: `/api/jobs` routes

**Files:**

- Create: `lib/api/routes/jobs.js`
- Modify: `lib/api/index.js` (mount `/jobs`).
- Create: `tests/api/jobs.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/api/jobs.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import request from 'supertest';
import { setup } from './helpers.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';

let app, ownerHeaders;
beforeEach(async () => {
  ({ app, ownerHeaders } = await setup());
  await queue.start();
  await registerWorkers();
});
afterEach(async () => { await stopBoss(); });

describe('jobs api', () => {
  it('GET /api/jobs returns recent jobs', async () => {
    const id = await queue.enqueue('echo', { ping: 7 });
    // wait for completion (pg-boss 10: getJobById(name, id))
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('echo', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const res = await request(app).get('/api/jobs?limit=5').set(ownerHeaders);
    expect(res.status).toBe(200);
    expect(res.body.find(r => r.id === id)).toBeTruthy();
  });

  it('GET /api/jobs/:id 404 on unknown', async () => {
    const res = await request(app)
      .get('/api/jobs/00000000-0000-0000-0000-000000000000')
      .set(ownerHeaders);
    expect(res.status).toBe(404);
  });

  it('unknown bearer token → 401', async () => {
    // An unrecognized token fails authentication before requireOwner runs.
    const headers = { Authorization: 'Bearer bad-agent' };
    const res = await request(app).get('/api/jobs').set(headers);
    expect(res.status).toBe(401);
  });
});
```

- [ ] **Step 2: Run red.**

```bash
npx vitest run tests/api/jobs.test.js
```

Expected: FAIL, because the route returns 404.

- [ ] **Step 3: Implement the route.**

```js
// lib/api/routes/jobs.js
import { Router } from 'express';
import { z } from 'zod';
import * as repo from '../../db/repos/jobs.js';
import { validate } from '../validate.js';
import { requireOwner } from '../cap.js';
import { NotFoundError, asyncWrap } from '../errors.js';
import { parsePagination } from '../pagination.js';

const STATES = ['created', 'retry', 'active', 'completed', 'expired', 'cancelled', 'failed'];

const listQuery = z.object({
  state: z.enum(STATES).optional(),
  name: z.string().optional(),
  limit: z.string().optional(),
  offset: z.string().optional()
});

const idParams = z.object({ id: z.string().uuid() });

export const router = Router();
router.use(requireOwner);

router.get('/',
  validate({ query: listQuery }),
  asyncWrap(async (req, res) => {
    const { limit } = parsePagination(req);
    res.json(await repo.list({
      state: req.validatedQuery.state,
      name: req.validatedQuery.name,
      limit
    }));
  })
);

router.get('/:id',
  validate({ params: idParams }),
  asyncWrap(async (req, res) => {
    const row = await repo.getById(req.params.id);
    if (!row) throw new NotFoundError('job not found');
    res.json(row);
  })
);
```

- [ ] **Step 4: Mount in `lib/api/index.js`.**

```js
// lib/api/index.js: additions
import { router as jobsRouter } from './routes/jobs.js';
// alongside the other api.use() calls:
api.use('/jobs', jobsRouter);
```

- [ ] **Step 5: Run green.**

```bash
npx vitest run tests/api/jobs.test.js
```

Expected: 3 passed.

- [ ] **Step 6: Commit.**

```bash
git add lib/api/routes/jobs.js lib/api/index.js tests/api/jobs.test.js
git commit -m "feat(api): jobs routes (list + get, owner-only)"
```

### Task A6: `POST /api/jobs/:id/retry` and `DELETE /api/jobs/:id`

**Files:**

- Modify: `lib/api/routes/jobs.js`
- Modify: `lib/db/repos/jobs.js`
- Modify: `tests/repos/jobs.test.js`, `tests/api/jobs.test.js`

- [ ] **Step 1: Extend the repo test.**

```js
// tests/repos/jobs.test.js: append inside describe('jobs repo')
it('retry resubmits a failed job', async () => {
  // Enqueue a real job, then force its row to 'failed' directly, so the test
  // does not depend on a worker actually failing.
  const id = await queue.enqueue('echo', { ping: 'x' });
  const { pool } = await import('../../lib/db/pool.js');
  await pool.query(`UPDATE pgboss.job SET state='failed' WHERE id=$1`, [id]);
  const out = await jobs.retry(id);
  expect(out?.state).toBe('retry');
});

it('remove deletes by id', async () => {
  const id = await queue.enqueue('echo', { ping: 'rm' });
  await jobs.remove(id);
  expect(await jobs.getById(id)).toBeNull();
});
```

- [ ] **Step 2:** Run red. Both new tests fail because there are no `retry` / `remove` exports.

- [ ] **Step 3: Add to the repo.**

```js
// lib/db/repos/jobs.js: append
// Column names follow pg-boss 10's snake_case schema. Comparing `state::text`
// keeps the query valid even if a listed state is absent from the installed enum.
const RETRYABLE = ['failed', 'cancelled', 'expired'];

export async function retry(id) {
  // Move a failed/cancelled/expired row back to 'retry' in place;
  // the regular poller picks it up again.
  const { rows: [r] } = await pool.query(
    `UPDATE pgboss.job SET state='retry', retry_count = retry_count + 1
     WHERE id=$1 AND state::text = ANY($2)
     RETURNING *`,
    [id, RETRYABLE]
  );
  if (r) return r;
  // Not in the live table: move it out of the archive in one statement,
  // so the job never appears in both tables.
  const { rows: [a] } = await pool.query(
    `WITH moved AS (
       DELETE FROM pgboss.archive WHERE id=$1 AND state::text = ANY($2) RETURNING *
     )
     INSERT INTO pgboss.job (id, name, data, retry_limit, retry_delay, retry_backoff,
                             start_after, expire_in, state, retry_count)
     SELECT id, name, data, retry_limit, retry_delay, retry_backoff,
            now(), expire_in, 'retry', retry_count + 1
     FROM moved
     RETURNING *`,
    [id, RETRYABLE]
  );
  return a ?? null;
}

export async function remove(id) {
  await pool.query(`DELETE FROM pgboss.job WHERE id=$1`, [id]);
  await pool.query(`DELETE FROM pgboss.archive WHERE id=$1`, [id]);
}
```

- [ ] **Step 4:** Run the repo tests green.

```bash
npx vitest run tests/repos/jobs.test.js
```

Expected: 4 passed.

- [ ] **Step 5: Add the route handlers.**

```js
// lib/api/routes/jobs.js: append
router.post('/:id/retry',
  validate({ params: idParams }),
  asyncWrap(async (req, res) => {
    const row = await repo.retry(req.params.id);
    if (!row) throw new NotFoundError('job not found');
    res.json(row);
  })
);

router.delete('/:id',
  validate({ params: idParams }),
  asyncWrap(async (req, res) => {
    await repo.remove(req.params.id);
    res.status(204).end();
  })
);
```

- [ ] **Step 6: Extend the API tests.**

```js
// tests/api/jobs.test.js: append inside describe('jobs api')
it('POST :id/retry resubmits', async () => {
  const id = await queue.enqueue('echo', { ping: 'r' });
  const { pool } = await import('../../lib/db/pool.js');
  await pool.query(`UPDATE pgboss.job SET state='failed' WHERE id=$1`, [id]);
  const res = await request(app).post(`/api/jobs/${id}/retry`).set(ownerHeaders);
  expect(res.status).toBe(200);
  expect(res.body.state).toBe('retry');
});

it('DELETE :id removes', async () => {
  const id = await queue.enqueue('echo', { ping: 'd' });
  const res = await request(app).delete(`/api/jobs/${id}`).set(ownerHeaders);
  expect(res.status).toBe(204);
});
```

- [ ] **Step 7:** Run green.

```bash
npx vitest run tests/api/jobs.test.js tests/repos/jobs.test.js
```

Expected: all pass (5 API + 4 repo).

- [ ] **Step 8: Commit.**

```bash
git add lib/db/repos/jobs.js lib/api/routes/jobs.js tests/repos/jobs.test.js tests/api/jobs.test.js
git commit -m "feat(api): jobs retry + delete"
```

### Task A7: Minimal Jobs view + sidebar entry

**Files:**

- Create: `public/views/jobs.js`
- Modify: `public/router.js`, `public/app.js`, `public/components/sidebar.js`

- [ ] **Step 1: Add the route to the hash router.**

```js
// public/router.js: add to ROUTES
{ name: 'jobs', re: /^\/jobs$/, keys: [] },
```

- [ ] **Step 2: Register the view.**

```js
// public/app.js: extend VIEWS
jobs: () => import('./views/jobs.js'),
```

- [ ] **Step 3: Write the minimal view.**

```js
// public/views/jobs.js
import { api } from '../api.js';
import { el, mount } from '../dom.js';

function row(j) {
  return el('li', {},
    el('span', { class: 'status idle' }, j.state),
    ' ', el('span', { style: { fontFamily: 'var(--font-mono)' } }, j.name),
    ' ', el('span', { class: 'muted' }, (j.id || '').slice(0, 8))
  );
}

export async function render(main) {
  const wrap = el('div');
  mount(main,
    el('h1', { class: 'view-h1' }, 'Jobs'),
    el('p', { class: 'view-sub muted' }, 'pg-boss queue — recent jobs across states.'),
    wrap
  );
  try {
    const rows = await api.get('/api/jobs?limit=50');
    if (!rows.length) mount(wrap, el('p', { class: 'muted' }, 'No jobs yet.'));
    else mount(wrap, el('ul', { class: 'plain' }, rows.map(row)));
  } catch (e) {
    mount(wrap, el('p', { class: 'muted' }, 'Could not load: ' + e.message));
  }
}
```

- [ ] **Step 4: Add the sidebar link.**

```js
// public/components/sidebar.js: add inside the Navigate section, after Inbox
navItem('Jobs', '/jobs'),
```

- [ ] **Step 5:** Run the server tests; they should still pass.

```bash
npx vitest run tests/server.test.js
```

Expected: 6/6 pass.

- [ ] **Step 6: Commit.**

```bash
git add public/router.js public/app.js public/views/jobs.js public/components/sidebar.js
git commit -m "feat(ui): jobs view stub + sidebar entry"
```

### Task A8: Phase A close — full suite + memory checkpoint

- [ ] **Step 1:** Run the full suite.

```bash
npx vitest run
```

Expected: all green (185 + new Phase A tests).

- [ ] **Step 2:** Update memory `project_void_v2_execution.md`: mark "Plan 3 Phase A complete: queue harness + Jobs API + view stub".

---

## Phase B — Capture API + URL worker + blob storage

### Task B1: Add ingest deps

**Files:**

- Modify: `package.json` (add `@mozilla/readability`, `jsdom`, `multer`).

- [ ] **Step 1:** `cd /project/src/void-v2 && npm i @mozilla/readability jsdom multer`
- [ ] **Step 2:** Verify the deps:

```bash
grep -E '"(@mozilla/readability|jsdom|multer)"' package.json
```

Expected: three lines.

- [ ] **Step 3:** Full suite still green.

```bash
npx vitest run
```

- [ ] **Step 4: Commit.**

```bash
git add package.json package-lock.json
git commit -m "chore(deps): readability + jsdom + multer for ingest"
```

### Task B2: `lib/ingest/readability.js`

**Files:**

- Create: `lib/ingest/readability.js`
- Create: `tests/ingest/readability.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/ingest/readability.test.js
import { describe, it, expect } from 'vitest';
import { extract } from '../../lib/ingest/readability.js';

const HTML = `
<html><head><title>Blackflame Notes</title>
<meta property="og:site_name" content="Hynesy"/>
</head><body><article>
<h1>Blackflame Notes</h1>
<p>An essay on the Cradle aesthetic and the blackflame motif.</p>
</article></body></html>`;

describe('readability.extract', () => {
  it('pulls title and text', () => {
    const out = extract(HTML, 'https://example.com/x');
    expect(out.title).toMatch(/Blackflame/);
    expect(out.textContent).toMatch(/Cradle/);
    expect(out.siteName).toBe('Hynesy');
  });
});
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/ingest/readability.js
import { JSDOM } from 'jsdom';
import { Readability } from '@mozilla/readability';

export function extract(html, url) {
  const dom = new JSDOM(html, { url });
  const reader = new Readability(dom.window.document);
  const a = reader.parse();
  if (!a) return { title: null, textContent: '', excerpt: null, byline: null, siteName: null };
  return {
    title: a.title,
    textContent: a.textContent.trim(),
    excerpt: a.excerpt || null,
    byline: a.byline || null,
    siteName: a.siteName || null
  };
}
```

- [ ] **Step 4:** Run green.

```bash
npx vitest run tests/ingest/readability.test.js
```

- [ ] **Step 5: Commit.**

```bash
git add lib/ingest/readability.js tests/ingest/readability.test.js
git commit -m "feat(ingest): readability wrapper"
```
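
`extract()` takes an HTML string, so the `ingest.url` worker still has to fetch the page first. A hedged sketch of that fetch follows, with a timeout, a content-type check and a size limit. Its assumptions:

- `fetchHtml`, its option names and its defaults are hypothetical, not part of this plan.
- The size limit is checked after download, not while streaming.
- The body is decoded as UTF-8 regardless of the declared charset.

```js
// Hypothetical helper for lib/jobs/workers/url.js: fetch a page as HTML text.
async function fetchHtml(url, {
  fetchImpl = globalThis.fetch,
  timeoutMs = 15_000,
  maxBytes = 5 * 1024 * 1024 // illustrative cap, checked after the body is read
} = {}) {
  const res = await fetchImpl(url, {
    redirect: 'follow',
    signal: AbortSignal.timeout(timeoutMs),
    headers: { accept: 'text/html,application/xhtml+xml' }
  });
  if (!res.ok) throw new Error(`HTTP ${res.status} fetching ${url}`);
  const type = res.headers.get('content-type') ?? '';
  if (!/text\/html|application\/xhtml\+xml/i.test(type)) {
    throw new Error(`not HTML (${type || 'no content-type'}): ${url}`);
  }
  const buf = Buffer.from(await res.arrayBuffer());
  if (buf.length > maxBytes) throw new Error(`page larger than ${maxBytes} bytes: ${url}`);
  return buf.toString('utf8'); // assumes UTF-8; other charsets are not handled
}
```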

### Task B3: `lib/ingest/blob_store.js`

**Files:**

- Create: `lib/ingest/blob_store.js`
- Create: `tests/ingest/blob_store.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/ingest/blob_store.test.js
import { describe, it, expect, beforeEach } from 'vitest';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import { BlobStore } from '../../lib/ingest/blob_store.js';

let root, store;
beforeEach(async () => {
  root = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blob-'));
  store = new BlobStore(root);
});

describe('blob_store', () => {
  it('hashes content and resolves path', async () => {
    const buf = Buffer.from('void');
    const sha = await store.hash(buf);
    expect(sha).toMatch(/^[0-9a-f]{64}$/);
    const p = store.path(sha);
    expect(p.startsWith(root)).toBe(true);
    // sharded layout: <root>/<first two hex chars>/<sha>
    expect(path.basename(path.dirname(p))).toBe(sha.slice(0, 2));
    expect(path.basename(p)).toBe(sha);
  });

  it('write is idempotent (same content → same path)', async () => {
    const buf = Buffer.from('void');
    const a = await store.write(buf);
    const b = await store.write(buf);
    expect(a.path).toBe(b.path);
    expect(a.sha).toBe(b.sha);
    const onDisk = await fs.readFile(a.path);
    expect(onDisk.equals(buf)).toBe(true);
  });
});
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/ingest/blob_store.js
import crypto from 'node:crypto';
import fs from 'node:fs/promises';
import path from 'node:path';

export class BlobStore {
  constructor(root) { this.root = root; }

  async hash(buf) {
    return crypto.createHash('sha256').update(buf).digest('hex');
  }

  path(sha) {
    return path.join(this.root, sha.slice(0, 2), sha);
  }

  async write(buf) {
    const sha = await this.hash(buf);
    const dest = this.path(sha);
    try {
      await fs.access(dest); // already stored: content-addressed, so nothing to do
    } catch {
      await fs.mkdir(path.dirname(dest), { recursive: true });
      await fs.writeFile(dest, buf);
    }
    return { sha, path: dest };
  }
}

let _default = null;
export function defaultStore() {
  if (!_default) _default = new BlobStore(process.env.BLOB_ROOT || '/var/lib/void/blobs');
  return _default;
}
```

- [ ] **Step 4:** Run green.

```bash
npx vitest run tests/ingest/blob_store.test.js
```

- [ ] **Step 5: Commit.**

```bash
git add lib/ingest/blob_store.js tests/ingest/blob_store.test.js
git commit -m "feat(ingest): content-addressed blob store"
```
### Task B4: `ingest.url` worker

**Files:**
- Create: `lib/jobs/workers/url.js`
- Modify: `lib/jobs/index.js` — register `url`.
- Create: `tests/jobs/workers/url.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/workers/url.test.js
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';

const HTML = `<html><head><title>Blackflame</title></head><body><article><p>Cradle</p></article></body></html>`;

beforeEach(async () => {
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
  // vi.stubGlobal is undone by vi.unstubAllGlobals(); a plain `global.fetch = ...`
  // assignment is not reverted by vi.restoreAllMocks() and leaks into later tests.
  vi.stubGlobal('fetch', vi.fn(async () =>
    new Response(HTML, { status: 200, headers: { 'content-type': 'text/html' } })));
});

afterEach(async () => { await stopBoss(); vi.unstubAllGlobals(); vi.restoreAllMocks(); });

describe('ingest.url worker', () => {
  it('creates a ref from a URL', async () => {
    const sp = await spaces.create({ slug: 's', name: 'S' }, { kind: 'user', id: null });
    const id = await queue.enqueue('ingest.url', { space_id: sp.id, url: 'https://example.com/a' });
    // poll for up to ~5 s while the in-process worker runs
    for (let i = 0; i < 50; i++) {
      // pg-boss 10 looks jobs up by queue name + id
      const j = await queue.instance().getJobById('ingest.url', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].title).toMatch(/Blackflame/);
    expect(rows[0].external_id).toBeTruthy();
  });
});
```
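The worker tests below repeat the same poll-until-complete loop, and on timeout they fail later with an unhelpful `rows[0] is undefined`. Below is a minimal sketch of a shared helper. It assumes a hypothetical `tests/helpers/wait.js` that is not part of this plan's file list, and it assumes the job object exposes `state` and `output` as pg-boss job records do. `getJob` is any async lookup the test already has.

```javascript
// Hypothetical helper (would be exported from tests/helpers/wait.js).
// Polls getJob() until the job reports state 'completed'. Rejects on 'failed'
// or after `tries` polls, so a stuck job fails the test with a clear message.
async function waitForJob(getJob, { tries = 50, intervalMs = 100 } = {}) {
  let job = null;
  for (let i = 0; i < tries; i++) {
    job = await getJob();
    if (job?.state === 'completed') return job;
    if (job?.state === 'failed') {
      throw new Error(`job failed: ${JSON.stringify(job.output ?? null)}`);
    }
    await new Promise(r => setTimeout(r, intervalMs));
  }
  throw new Error(`job not completed after ${tries} polls (last state: ${job?.state ?? 'unknown'})`);
}
```

In a test, pass a closure over the same `getJobById` lookup the loop already uses, then assert on the refs afterwards.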
- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement the worker.**

```js
// lib/jobs/workers/url.js
import crypto from 'node:crypto';
import { extract } from '../../ingest/readability.js';
import * as refs from '../../db/repos/refs.js';
import { pool } from '../../db/pool.js';

export const NAME = 'ingest.url';
export const opts = { teamSize: 4, teamConcurrency: 4 };

// Idempotency key: sha256(space_id NUL url). Must match key() in lib/api/routes/capture.js.
function key(space_id, url) {
  return crypto.createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}

export async function handler(job) {
  const { space_id, url } = job.data;
  const idem = key(space_id, url);

  // Already ingested? This is a cheap pre-check, not a race-free guarantee:
  // two concurrent jobs for the same URL can both pass it.
  const { rows: [existing] } = await pool.query(
    `SELECT id FROM refs WHERE source_kind='url' AND external_id=$1 LIMIT 1`,
    [idem]
  );
  if (existing) return { ref_id: existing.id, idempotent: true };

  const res = await fetch(url, {
    headers: { 'User-Agent': 'void-ingest/2.0' },
    signal: AbortSignal.timeout(15_000)
  });
  // Throwing marks the job failed so pg-boss can retry it (if retries are configured).
  if (!res.ok) throw new Error(`fetch ${url} → ${res.status}`);
  const html = await res.text();

  // Readability yields no article for some pages; fall back to the URL as title.
  const parsed = extract(html, url) || {};
  const row = await refs.create({
    space_id,
    kind: 'url',
    source_url: url,
    title: parsed.title || url,
    summary: parsed.excerpt,
    body_text: (parsed.textContent || '').slice(0, 200_000), // cap stored text at 200k chars
    source_kind: 'url',
    external_id: idem,
    metadata: { site_name: parsed.siteName, byline: parsed.byline }
  }, { kind: 'system', id: null });

  return { ref_id: row.id };
}
```
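The idempotency key hashes `space_id + '\x00' + url` instead of the plain concatenation. A UUID never contains a NUL byte, so the first NUL in the hashed string always marks where `space_id` ends, and two distinct `(space_id, url)` pairs can never produce the same input. Below is a standalone copy of the worker's private `key()`, renamed `idempotencyKey` for illustration only, with a few checks.

```javascript
import { createHash } from 'node:crypto';

// Same construction as key() in the ingest.url worker (illustrative copy).
// The NUL separator makes the field boundary unambiguous before hashing.
function idempotencyKey(space_id, url) {
  return createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}

const space = '3f1c2a9e-0000-4000-8000-000000000001';
const a = idempotencyKey(space, 'https://example.com/a');

console.log(a.length);                                              // 64
console.log(a === idempotencyKey(space, 'https://example.com/a'));  // true: deterministic
console.log(a === idempotencyKey(space, 'https://example.com/a/')); // false: raw string, no URL normalization
```

The key is computed over the raw URL string, so trivially different spellings of the same page are captured twice. Whether to normalize URLs first is a separate design decision.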
- [ ] **Step 4: Register.**

```js
// lib/jobs/index.js — extend WORKERS
import * as url from './workers/url.js';
const WORKERS = [echo, url];
```

- [ ] **Step 5:** Run green.

```bash
npx vitest run tests/jobs/workers/url.test.js
```

- [ ] **Step 6: Commit.**

```bash
git add lib/jobs/workers/url.js lib/jobs/index.js tests/jobs/workers/url.test.js
git commit -m "feat(jobs): ingest.url worker"
```
### Task B5: `ingest.blob` worker

**Files:**
- Create: `lib/jobs/workers/blob.js`
- Modify: `lib/jobs/index.js`
- Create: `tests/jobs/workers/blob.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/workers/blob.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';

let tmpRoot;
beforeEach(async () => {
  tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blobs-'));
  // set before the worker first calls defaultStore(), which memoizes the root
  process.env.BLOB_ROOT = tmpRoot;
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
});

afterEach(async () => {
  await stopBoss();
  await fs.rm(tmpRoot, { recursive: true, force: true });
});

describe('ingest.blob worker', () => {
  it('creates a ref pointing at the blob', async () => {
    const sp = await spaces.create({ slug: 'b', name: 'B' }, { kind: 'user', id: null });
    // pre-stage an upload tmp file, as multer would
    const upTmp = path.join(tmpRoot, 'up.tmp');
    await fs.writeFile(upTmp, Buffer.from('hello blob'));
    const id = await queue.enqueue('ingest.blob', {
      space_id: sp.id, tmp_path: upTmp, filename: 'hello.txt', content_type: 'text/plain'
    });
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('ingest.blob', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].kind).toBe('file');
    expect(rows[0].blob_path).toBeTruthy();
    expect(rows[0].title).toBe('hello.txt');
  });
});
```
- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/jobs/workers/blob.js
import fs from 'node:fs/promises';
import * as refs from '../../db/repos/refs.js';
import { defaultStore } from '../../ingest/blob_store.js';

export const NAME = 'ingest.blob';
export const opts = { teamSize: 2, teamConcurrency: 2 };

function kindFor(content_type, filename) {
  if (content_type?.startsWith('image/')) return 'image';
  if (content_type === 'application/pdf' || filename?.toLowerCase().endsWith('.pdf')) return 'pdf';
  return 'file';
}

export async function handler(job) {
  const { space_id, tmp_path, filename, content_type, meta = {} } = job.data;
  const buf = await fs.readFile(tmp_path);
  const { sha, path: blob_path } = await defaultStore().write(buf);

  const row = await refs.create({
    space_id,
    kind: kindFor(content_type, filename),
    source_url: null,
    title: meta.title || filename || sha.slice(0, 12),
    summary: null,
    body_text: null,
    blob_path,
    // client-supplied metadata first, so server-derived fields always win
    metadata: { ...(meta.metadata || {}), sha, content_type, filename, size: buf.length }
  }, { kind: 'system', id: null });

  // Remove the upload temp file only once the ref exists: if refs.create throws,
  // a retry can still read tmp_path. A leftover temp file is harmless.
  try { await fs.unlink(tmp_path); } catch { /* best effort */ }

  return { ref_id: row.id, sha };
}
```
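A browser may send `application/octet-stream` or an empty type for files whose extension it does not recognize. In that case `kindFor` can only fall back to the `.pdf` extension check. Below is a sketch of an optional content check, using only well-known file signatures: `%PDF-` for PDF, plus the PNG, JPEG and GIF magic numbers. `sniffKind` is a hypothetical helper, not part of the plan.

```javascript
// Hypothetical helper: classify an upload by its leading bytes first, then
// fall back to the same content-type / filename rules as kindFor().
const SIGNATURES = [
  { kind: 'pdf',   bytes: [0x25, 0x50, 0x44, 0x46, 0x2d] },                   // "%PDF-"
  { kind: 'image', bytes: [0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a] }, // PNG
  { kind: 'image', bytes: [0xff, 0xd8, 0xff] },                               // JPEG
  { kind: 'image', bytes: [0x47, 0x49, 0x46, 0x38] }                          // "GIF8"
];

function hasPrefix(buf, bytes) {
  return buf.length >= bytes.length && bytes.every((b, i) => buf[i] === b);
}

function sniffKind(buf, content_type, filename) {
  for (const { kind, bytes } of SIGNATURES) {
    if (hasPrefix(buf, bytes)) return kind;
  }
  if (content_type?.startsWith('image/')) return 'image';
  if (content_type === 'application/pdf' || filename?.toLowerCase().endsWith('.pdf')) return 'pdf';
  return 'file';
}

console.log(sniffKind(Buffer.from('%PDF-1.7\n'), 'application/octet-stream', 'scan')); // pdf
```

The worker already holds `buf` in memory, so the switch would amount to calling `sniffKind(buf, content_type, filename)` in place of `kindFor(...)`.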
- [ ] **Step 4: Register.**

```js
// lib/jobs/index.js
import * as blob from './workers/blob.js';
const WORKERS = [echo, url, blob];
```

- [ ] **Step 5:** Run green.

```bash
npx vitest run tests/jobs/workers/blob.test.js
```

- [ ] **Step 6: Commit.**

```bash
git add lib/jobs/workers/blob.js lib/jobs/index.js tests/jobs/workers/blob.test.js
git commit -m "feat(jobs): ingest.blob worker"
```
### Task B6: `/api/capture` POST + `/api/capture/upload`

**Files:**
- Create: `lib/api/routes/capture.js`
- Modify: `lib/api/index.js`
- Create: `tests/api/capture.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/api/capture.test.js
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import request from 'supertest';
import { setup } from './helpers.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as spaces from '../../lib/db/repos/spaces.js';

let app, ownerHeaders, sp;

beforeEach(async () => {
  ({ app, ownerHeaders } = await setup());
  sp = await spaces.create({ slug: 'c', name: 'C' }, { kind: 'user', id: null });
  process.env.BLOB_ROOT = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blobs-'));
  await queue.start(); await registerWorkers();
  vi.stubGlobal('fetch', vi.fn(async () => new Response(
    '<html><head><title>X</title></head><body><article><p>x</p></article></body></html>',
    { status: 200, headers: { 'content-type': 'text/html' } }
  )));
});

afterEach(async () => { await stopBoss(); vi.unstubAllGlobals(); vi.restoreAllMocks(); });

describe('capture api', () => {
  it('POST /api/capture enqueues ingest.url', async () => {
    const res = await request(app).post('/api/capture').set(ownerHeaders)
      .send({ space_id: sp.id, url: 'https://example.com/a' });
    expect(res.status).toBe(202);
    expect(res.body.job_id).toBeTruthy();
    expect(res.body.idempotency_key).toMatch(/^[0-9a-f]{64}$/);
  });

  it('POST /api/capture/upload enqueues ingest.blob', async () => {
    const res = await request(app).post('/api/capture/upload').set(ownerHeaders)
      .field('space_id', sp.id)
      .attach('file', Buffer.from('hi'), { filename: 'a.txt', contentType: 'text/plain' });
    expect(res.status).toBe(202);
    expect(res.body.job_id).toBeTruthy();
  });

  it('POST /api/capture rejects missing url', async () => {
    const res = await request(app).post('/api/capture').set(ownerHeaders)
      .send({ space_id: sp.id });
    expect(res.status).toBe(400);
  });
});
```
- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.** The upload route validates `space_id` and `meta` itself, since they arrive as multipart fields rather than JSON. Because multer has already written the file to disk by that point, the route deletes it on any 400.

```js
// lib/api/routes/capture.js
import { Router } from 'express';
import { z } from 'zod';
import crypto from 'node:crypto';
import fs from 'node:fs/promises';
import path from 'node:path';
import os from 'node:os';
import multer from 'multer';
import * as queue from '../../jobs/queue.js';
import { pool } from '../../db/pool.js';
import { validate } from '../validate.js';
import { requireWrite } from '../cap.js';
import { asyncWrap } from '../errors.js';

const captureBody = z.object({
  space_id: z.string().uuid(),
  url: z.string().url(),
  hint: z.object({
    project_id: z.string().uuid().optional(),
    title: z.string().optional(),
    tags: z.array(z.string()).optional()
  }).optional()
});

const UPLOAD_TMP = process.env.UPLOAD_TMP || path.join(os.tmpdir(), 'void-uploads');
await fs.mkdir(UPLOAD_TMP, { recursive: true }); // top-level await (ESM module)
const upload = multer({ dest: UPLOAD_TMP, limits: { fileSize: 100 * 1024 * 1024 } }); // 100 MiB

// Must match key() in lib/jobs/workers/url.js.
function key(space_id, url) {
  return crypto.createHash('sha256').update(space_id + '\x00' + url).digest('hex');
}

function badRequest(res, message) {
  return res.status(400).json({ error: { code: 'validation_failed', message } });
}

export const router = Router();

router.post('/',
  requireWrite('ref'),
  validate({ body: captureBody }),
  asyncWrap(async (req, res) => {
    const { space_id, url } = req.body;
    const idem = key(space_id, url);
    // Already captured: report the existing ref instead of enqueueing a duplicate.
    const { rows: [existing] } = await pool.query(
      `SELECT id FROM refs WHERE source_kind='url' AND external_id=$1 LIMIT 1`,
      [idem]
    );
    if (existing) {
      return res.status(202).json({ job_id: null, idempotency_key: idem, ref_id: existing.id });
    }
    const job_id = await queue.enqueue('ingest.url', { space_id, url });
    res.status(202).json({ job_id, idempotency_key: idem });
  })
);

router.post('/upload',
  requireWrite('ref'),
  upload.single('file'),
  asyncWrap(async (req, res) => {
    if (!req.file) return badRequest(res, 'file required');
    // multer already stored the upload; drop it whenever the request is rejected
    const reject = async message => {
      await fs.rm(req.file.path, { force: true });
      return badRequest(res, message);
    };
    const space_id = req.body.space_id;
    if (!z.string().uuid().safeParse(space_id).success) return reject('space_id must be a uuid');
    let meta;
    try { meta = req.body.meta ? JSON.parse(req.body.meta) : {}; } catch { meta = null; }
    if (meta === null || typeof meta !== 'object' || Array.isArray(meta)) {
      return reject('meta must be a JSON object');
    }
    const job_id = await queue.enqueue('ingest.blob', {
      space_id,
      tmp_path: req.file.path,
      filename: req.file.originalname,
      content_type: req.file.mimetype,
      meta
    });
    res.status(202).json({ job_id });
  })
);
```
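On the client side, the drag-drop entry point posts `multipart/form-data` to `/api/capture/upload` with a `file` part, a `space_id` field and an optional JSON-encoded `meta` field. Below is a minimal browser-side sketch. It assumes the SPA uses plain `fetch` and `FormData`, which are also global in Node 18+. `buildUploadForm` is a hypothetical name.

```javascript
// Hypothetical SPA helper: build the multipart body /api/capture/upload expects.
// Field names match the route: 'space_id', 'meta' (JSON string), 'file' (upload.single('file')).
function buildUploadForm(spaceId, file, meta) {
  const form = new FormData();
  // Text fields go before the file, so they are already parsed by the time multer sees the file.
  form.append('space_id', spaceId);
  if (meta) form.append('meta', JSON.stringify(meta));
  // A dropped File carries its own name; a bare Blob needs an explicit filename.
  form.append('file', file, file.name ?? 'upload.bin');
  return form;
}

// In a drop handler (browser):
//   const res = await fetch('/api/capture/upload', { method: 'POST', body: buildUploadForm(spaceId, droppedFile) });
//   if (res.status === 202) { const { job_id } = await res.json(); }
```

Leave `Content-Type` unset on the request. `fetch` then generates the multipart boundary itself.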
- [ ] **Step 4: Mount.**

```js
// lib/api/index.js — additions
import { router as captureRouter } from './routes/capture.js';
api.use('/capture', captureRouter);
```

- [ ] **Step 5:** Run green.

```bash
npx vitest run tests/api/capture.test.js
```

- [ ] **Step 6: Commit.**

```bash
git add lib/api/routes/capture.js lib/api/index.js tests/api/capture.test.js
git commit -m "feat(api): capture POST + upload"
```
### Task B7: Phase B close — full suite + memory checkpoint

- [ ] **Step 1:** `npx vitest run` — all green.
- [ ] **Step 2:** Update memory: "Plan 3 Phase B complete: capture API + URL + blob workers".

---

## Phase C — Embeddings + hybrid search

### Task C1: `lib/ai/ollama.js`

**Files:**
- Create: `lib/ai/ollama.js`
- Create: `tests/ai/ollama.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/ai/ollama.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { embedText } from '../../lib/ai/ollama.js';

beforeEach(() => {
  vi.stubGlobal('fetch', vi.fn(async () => new Response(
    JSON.stringify({ embedding: new Array(768).fill(0.1) }),
    { status: 200, headers: { 'content-type': 'application/json' } }
  )));
});

afterEach(() => { vi.unstubAllGlobals(); });

describe('ollama.embedText', () => {
  it('returns 768-dim vector', async () => {
    const v = await embedText('hello');
    expect(v).toHaveLength(768);
    expect(v[0]).toBeCloseTo(0.1, 5);
  });

  it('throws on non-200', async () => {
    vi.stubGlobal('fetch', vi.fn(async () => new Response('boom', { status: 502 })));
    await expect(embedText('x')).rejects.toThrow();
  });
});
```
- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/ai/ollama.js
export class OllamaError extends Error {
  constructor(status, body) {
    super(`ollama ${status}: ${body}`);
    this.name = 'OllamaError';
    this.status = status;
  }
}

// POST /api/embeddings with { model, prompt } → { embedding: number[] }.
// nomic-embed-text returns 768 dimensions.
export async function embedText(text, { model = 'nomic-embed-text', timeoutMs = 60_000 } = {}) {
  const url = (process.env.OLLAMA_URL || 'http://192.168.1.185:11434') + '/api/embeddings';
  const res = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ model, prompt: text }),
    signal: AbortSignal.timeout(timeoutMs)
  });
  if (!res.ok) throw new OllamaError(res.status, await res.text());
  const j = await res.json();
  if (!Array.isArray(j.embedding) || j.embedding.length === 0) {
    throw new OllamaError(res.status, 'response contained no embedding');
  }
  return j.embedding;
}

// Fit a vector to a fixed column width: zero-pad short vectors, truncate long ones.
// For two vectors padded this way, cosine distance is the same as for the originals.
export function padTo(vector, dim) {
  if (vector.length >= dim) return vector.slice(0, dim);
  return [...vector, ...new Array(dim - vector.length).fill(0)];
}
```
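Zero-padding the 768-dim `nomic-embed-text` vectors up to the 1024-wide columns is safe for cosine search, because the appended zeros add nothing to the dot product or to either norm. Below is a quick numerical check in plain JavaScript, independent of pgvector. `cosineDistance` is written here only for the demonstration and computes the same quantity as pgvector's `<=>` operator.

```javascript
// Cosine distance = 1 - (a·b) / (|a| |b|), the quantity pgvector's <=> returns.
function cosineDistance(a, b) {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return 1 - dot / (Math.sqrt(na) * Math.sqrt(nb));
}

// Same padding rule as padTo() in lib/ai/ollama.js.
function padTo(vector, dim) {
  if (vector.length >= dim) return vector.slice(0, dim);
  return [...vector, ...new Array(dim - vector.length).fill(0)];
}

// Two arbitrary, deterministic 768-dim vectors.
const a = Array.from({ length: 768 }, (_, i) => Math.sin(i));
const b = Array.from({ length: 768 }, (_, i) => Math.cos(i * 0.5));

const before = cosineDistance(a, b);
const after = cosineDistance(padTo(a, 1024), padTo(b, 1024));
console.log(Math.abs(before - after) < 1e-12); // true
```

The reverse does not hold: truncating vectors longer than 1024 changes distances. The scheme is therefore only distance-preserving for models with at most 1024 dimensions.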
- [ ] **Step 4:** Run green.

```bash
npx vitest run tests/ai/ollama.test.js
```

- [ ] **Step 5: Commit.**

```bash
git add lib/ai/ollama.js tests/ai/ollama.test.js
git commit -m "feat(ai): ollama embed-text wrapper"
```
### Task C2: `embed.text` worker

**Files:**
- Create: `lib/jobs/workers/embed.js`
- Modify: `lib/jobs/index.js`
- Create: `tests/jobs/workers/embed.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/workers/embed.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import { pool } from '../../../lib/db/pool.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as pages from '../../../lib/db/repos/pages.js';

beforeEach(async () => {
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
  vi.stubGlobal('fetch', vi.fn(async () => new Response(
    JSON.stringify({ embedding: new Array(768).fill(0.42) }),
    { status: 200, headers: { 'content-type': 'application/json' } }
  )));
});

afterEach(async () => { await stopBoss(); vi.unstubAllGlobals(); });

describe('embed.text worker', () => {
  it('writes embedding to the page row', async () => {
    const sp = await spaces.create({ slug: 'e', name: 'E' }, { kind: 'user', id: null });
    const pg = await pages.create({ space_id: sp.id, slug: 'p', title: 'P', body_md: 'body' }, { kind: 'user', id: null });
    const id = await queue.enqueue('embed.text', { entity_type: 'page', entity_id: pg.id });
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('embed.text', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const { rows: [row] } = await pool.query('SELECT embedding FROM pages WHERE id=$1', [pg.id]);
    expect(row.embedding).toBeTruthy();
  });
});
```
- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/jobs/workers/embed.js
import { embedText, padTo } from '../../ai/ollama.js';
import { pool } from '../../db/pool.js';
import { recordAudit } from '../../db/repos/audit.js';

export const NAME = 'embed.text';
export const opts = { teamSize: 2, teamConcurrency: 2 };

// The text that gets embedded for each entity type.
const STRING_BUILDERS = {
  page: row => `${row.title}\n\n${row.body_md || ''}`,
  ref: row => `${row.title || ''}\n${row.summary || ''}\n${row.body_text || ''}`,
  source_doc: row => `${row.name}\n${row.body_text || ''}`,
  conversation: row => `${row.title || ''}\n${row.summary || ''}`
};

// Whitelist of entity_type → table. Table names are interpolated into SQL,
// so they must only ever come from this map.
const TABLE = { page: 'pages', ref: 'refs', source_doc: 'source_docs', conversation: 'conversations' };

export async function handler(job) {
  const { entity_type, entity_id } = job.data;
  const table = TABLE[entity_type];
  if (!table) throw new Error(`unknown entity_type: ${entity_type}`);

  const { rows: [row] } = await pool.query(`SELECT * FROM ${table} WHERE id=$1`, [entity_id]);
  if (!row) return { skipped: 'gone' }; // deleted before the job ran

  const text = STRING_BUILDERS[entity_type](row).slice(0, 6_000); // cap input at 6,000 chars
  const v = await embedText(text);
  const padded = padTo(v, 1024);                 // 768-dim vector → 1024-wide column
  const literal = '[' + padded.join(',') + ']';  // pgvector text input format

  // Plain UPDATE rather than the repo's update(), so this write does not
  // fire the repo-level embed trigger again.
  await pool.query(`UPDATE ${table} SET embedding=$1::vector WHERE id=$2`, [literal, entity_id]);
  await recordAudit({ kind: 'worker', id: null }, 'update', entity_type, entity_id, null, { embedding: 'updated' });
  return { entity_id };
}
```
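The worker hands the embedding to Postgres in pgvector's text form `'[x1,x2,...]'`, cast with `::vector`. Below is a slightly stricter sketch of that serialization. It assumes you want dimension mismatches and non-finite values (which pgvector rejects) reported in the worker rather than as a database error. `toVectorLiteral` is a hypothetical helper.

```javascript
// Hypothetical helper: serialize a JS number[] into pgvector's text input format.
function toVectorLiteral(values, dim) {
  if (values.length !== dim) {
    throw new RangeError(`expected ${dim} dimensions, got ${values.length}`);
  }
  for (const v of values) {
    if (!Number.isFinite(v)) throw new RangeError(`non-finite value in embedding: ${v}`);
  }
  // Array#join uses Number#toString, the shortest string that round-trips the value.
  return '[' + values.join(',') + ']';
}

console.log(toVectorLiteral([0.5, -1, 2e-7], 3)); // [0.5,-1,2e-7]
```

In the worker it would replace the inline join: `pool.query(..., [toVectorLiteral(padded, 1024), entity_id])`.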
- [ ] **Step 4: Register.**

```js
// lib/jobs/index.js
import * as embed from './workers/embed.js';
const WORKERS = [echo, url, blob, embed];
```

- [ ] **Step 5:** Run green.

```bash
npx vitest run tests/jobs/workers/embed.test.js
```

- [ ] **Step 6: Commit.**

```bash
git add lib/jobs/workers/embed.js lib/jobs/index.js tests/jobs/workers/embed.test.js
git commit -m "feat(jobs): embed.text worker"
```
### Task C3: Repo-level triggers

**Files:**
- Create: `lib/jobs/triggers.js`
- Modify: `lib/db/repos/pages.js`, `refs.js`, `source_docs.js` — call `triggerEmbed(...)` after create/update.
- Create: `tests/jobs/triggers.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/triggers.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as spaces from '../../lib/db/repos/spaces.js';
import * as pages from '../../lib/db/repos/pages.js';
import * as jobsRepo from '../../lib/db/repos/jobs.js';

beforeEach(async () => {
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
  vi.stubGlobal('fetch', vi.fn(async () => new Response(
    JSON.stringify({ embedding: new Array(768).fill(0.1) }),
    { status: 200, headers: { 'content-type': 'application/json' } }
  )));
});

afterEach(async () => { await stopBoss(); vi.unstubAllGlobals(); });

describe('triggers', () => {
  it('pages.create enqueues embed.text', async () => {
    const sp = await spaces.create({ slug: 's', name: 'S' }, { kind: 'user', id: null });
    await pages.create({ space_id: sp.id, slug: 'p', title: 'P', body_md: 'b' }, { kind: 'user', id: null });
    // give the queue a tick to land the job
    await new Promise(r => setTimeout(r, 200));
    const rows = await jobsRepo.list({ name: 'embed.text', limit: 5 });
    expect(rows.length).toBeGreaterThan(0);
  });
});
```
- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement triggers.js.**

```js
// lib/jobs/triggers.js
import * as queue from './queue.js';
import { log } from '../log.js';

export async function triggerEmbed(entity_type, entity_id) {
  try {
    await queue.enqueue('embed.text',
      { entity_type, entity_id },
      { singletonKey: `${entity_type}:${entity_id}` }
    );
  } catch (e) {
    // Queue not running (server tests): never block the write.
    log.debug({ err: e, entity_type, entity_id }, 'triggerEmbed skipped');
  }
}
```

- [ ] **Step 4: Wire into the repos.** In `lib/db/repos/pages.js`, import `triggerEmbed` from `../../jobs/triggers.js` and call `await triggerEmbed('page', r.id);` at the end of `create` and `update`, just before each returns its row (`r` stands for whichever variable that function returns). If a function writes inside a transaction, make the call after `COMMIT`, because the worker can pick the job up immediately and must see the committed row. Do the same in `lib/db/repos/refs.js` (entity type `'ref'`, in `create`, `update` and `upsertByExternal`) and in `lib/db/repos/source_docs.js` (entity type `'source_doc'`, in `create` and `update`).

- [ ] **Step 5:** Run green.

```bash
npx vitest run tests/jobs/triggers.test.js
```

- [ ] **Step 6:** Run the full suite. It must stay green: `triggerEmbed` catches enqueue errors, so it is a no-op when the queue isn't running.

```bash
npx vitest run
```

- [ ] **Step 7: Commit.**

```bash
git add lib/jobs/triggers.js lib/db/repos/pages.js lib/db/repos/refs.js lib/db/repos/source_docs.js tests/jobs/triggers.test.js
git commit -m "feat(jobs): repo-level embed triggers"
```
### Task C4: Hybrid search with RRF + graceful FTS fallback

**Files:**
- Modify: `lib/db/repos/search.js` (rewrite)
- Modify: `tests/repos/search.test.js` — extend
- Modify: `tests/api/search.test.js` — extend

- [ ] **Step 1: Extend the repo test with vector-fixture rows.**

```js
// append to tests/repos/search.test.js
it('hybrid: vector-only hit (no FTS match) still ranks', async () => {
  const sp = await spacesRepo.create({ slug: 'h', name: 'H' }, owner);
  const page = await pagesRepo.create({ space_id: sp.id, slug: 'p', title: 'Unrelated', body_md: 'nothing matches FTS' }, owner);
  // Fixture = the mocked query embedding (768 × 0.5) zero-padded to 1024,
  // i.e. exactly what padTo() produces for the stubbed Ollama response below.
  const v = '[' + [...new Array(768).fill(0.5), ...new Array(256).fill(0)].join(',') + ']';
  const { pool } = await import('../../lib/db/pool.js');
  await pool.query('UPDATE pages SET embedding=$1::vector WHERE id=$2', [v, page.id]);
  // stub fetch so the query embeds to the same vector
  vi.stubGlobal('fetch', vi.fn(async () => new Response(
    JSON.stringify({ embedding: new Array(768).fill(0.5) }),
    { status: 200, headers: { 'content-type': 'application/json' } }
  )));
  try {
    const hits = await search.fts({ q: 'whatever' });
    expect(hits.find(h => h.id === page.id)).toBeTruthy();
  } finally {
    vi.unstubAllGlobals();
  }
});

it('hybrid: Ollama down → FTS-only fallback still returns FTS hits', async () => {
  const sp = await spacesRepo.create({ slug: 'd', name: 'D' }, owner);
  await pagesRepo.create({ space_id: sp.id, slug: 'p', title: 'blackflame palette', body_md: '' }, owner);
  vi.stubGlobal('fetch', vi.fn(async () => { throw new Error('connect ECONNREFUSED'); }));
  try {
    const hits = await search.fts({ q: 'blackflame' });
    expect(hits.length).toBeGreaterThan(0);
  } finally {
    vi.unstubAllGlobals();
  }
});
```
(Add `vi` to the file's `vitest` import if it isn't there yet.)

- [ ] **Step 2:** Run red.

- [ ] **Step 3: Rewrite search repo with hybrid + RRF.** Every branch aliases its columns, because a `UNION` takes its column names from the first branch, and the vector branches run as separate queries. Vector hits are merged in distance order before fusion. The FTS and vector sides run concurrently, and any failure on the vector side degrades to FTS-only.

```js
// lib/db/repos/search.js — REPLACE EXISTING
import { pool } from '../pool.js';
import { embedText, padTo } from '../../ai/ollama.js';
import { log } from '../../log.js';

const PAGES_TSV = `to_tsvector('english', p.title || ' ' || coalesce(p.body_md,''))`;
const REFS_TSV = `to_tsvector('english', coalesce(r.title,'') || ' ' || coalesce(r.summary,'') || ' ' || coalesce(r.body_text,''))`;
const SD_TSV = `to_tsvector('english', sd.name || ' ' || coalesce(sd.body_text,''))`;
const MSG_TSV = `to_tsvector('english', m.body)`;

// --- FTS branch (Plan 2 behavior; returns flat rows) ---
// Columns: kind, id, space_id, title_or_snippet, rank
function ftsBranches({ kinds, spaceFilterPresent }) {
  const want = k => !kinds || kinds.includes(k);
  const branches = [];
  if (want('page')) branches.push(`
    SELECT 'page'::text AS kind, p.id, p.space_id, p.title AS title_or_snippet,
           ts_rank(${PAGES_TSV}, q.tsq) AS rank
    FROM pages p, q
    WHERE ${PAGES_TSV} @@ q.tsq
      AND ($2::uuid IS NULL OR p.space_id = $2)`);
  if (want('ref')) branches.push(`
    SELECT 'ref'::text AS kind, r.id, r.space_id,
           coalesce(r.title, r.source_url, '(untitled)') AS title_or_snippet,
           ts_rank(${REFS_TSV}, q.tsq) AS rank
    FROM refs r, q
    WHERE ${REFS_TSV} @@ q.tsq
      AND ($2::uuid IS NULL OR r.space_id = $2)`);
  if (want('source_doc')) branches.push(`
    SELECT 'source_doc'::text AS kind, sd.id, res.space_id, sd.name AS title_or_snippet,
           ts_rank(${SD_TSV}, q.tsq) AS rank
    FROM source_docs sd JOIN resources res ON res.id = sd.resource_id, q
    WHERE ${SD_TSV} @@ q.tsq
      AND ($2::uuid IS NULL OR res.space_id = $2)`);
  // Messages have no space, so they are only searched without a space filter.
  // "$2::uuid IS NULL" is always true here; it keeps $2 typed when this is the only branch
  // (Postgres rejects a parameter it cannot infer a type for).
  if (want('message') && !spaceFilterPresent) branches.push(`
    SELECT 'message'::text AS kind, m.id, NULL::uuid AS space_id,
           substring(m.body, 1, 200) AS title_or_snippet,
           ts_rank(${MSG_TSV}, q.tsq) AS rank
    FROM messages m, q
    WHERE ${MSG_TSV} @@ q.tsq
      AND $2::uuid IS NULL`);
  return branches;
}

async function ftsRows({ q, space_id, kinds, limit }) {
  const branches = ftsBranches({ kinds, spaceFilterPresent: space_id != null });
  if (!branches.length) return [];
  const sql = `WITH q AS (SELECT plainto_tsquery('english', $1) AS tsq)
    SELECT * FROM (${branches.join('\n    UNION ALL ')}) u
    ORDER BY rank DESC LIMIT $3`;
  // over-fetch (3×) so fusion has candidates beyond the final page
  const { rows } = await pool.query(sql, [q, space_id, limit * 3]);
  return rows;
}

// --- Vector branch ---
// Columns: kind, id, space_id, title_or_snippet, dist (cosine distance)
async function vectorRows({ qvec, space_id, kinds, limit }) {
  const want = k => !kinds || kinds.includes(k);
  const literal = '[' + qvec.join(',') + ']';
  const parts = [];
  if (want('page')) parts.push(`
    SELECT 'page'::text AS kind, p.id, p.space_id, p.title AS title_or_snippet,
           (p.embedding <=> $1::vector) AS dist
    FROM pages p
    WHERE p.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR p.space_id = $2)
    ORDER BY p.embedding <=> $1::vector LIMIT $3`);
  if (want('ref')) parts.push(`
    SELECT 'ref'::text AS kind, r.id, r.space_id,
           coalesce(r.title, r.source_url, '(untitled)') AS title_or_snippet,
           (r.embedding <=> $1::vector) AS dist
    FROM refs r
    WHERE r.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR r.space_id = $2)
    ORDER BY r.embedding <=> $1::vector LIMIT $3`);
  if (want('source_doc')) parts.push(`
    SELECT 'source_doc'::text AS kind, sd.id, res.space_id, sd.name AS title_or_snippet,
           (sd.embedding <=> $1::vector) AS dist
    FROM source_docs sd JOIN resources res ON res.id = sd.resource_id
    WHERE sd.embedding IS NOT NULL
      AND ($2::uuid IS NULL OR res.space_id = $2)
    ORDER BY sd.embedding <=> $1::vector LIMIT $3`);
  // messages are not embedded yet (Plan 5 may add them); no vector branch.
  if (!parts.length) return [];
  // One query per kind, so each keeps its own ORDER BY ... LIMIT.
  const results = await Promise.all(
    parts.map(sql => pool.query(sql, [literal, space_id, limit * 3]))
  );
  // Merge into one list ordered by distance, so a row's position (its RRF rank)
  // is comparable across kinds instead of all pages ranking ahead of all refs.
  return results.flatMap(r => r.rows).sort((a, b) => a.dist - b.dist);
}

// Reciprocal Rank Fusion, k = 60 per Cormack et al.:
// score(d) = sum over the lists containing d of 1 / (k + rank), ranks 1-based.
function fuse(fts, vec, limit, offset) {
  const K = 60;
  const score = new Map(); // "kind:id" → { kind, id, space_id, title_or_snippet, rrf }
  const add = rows => rows.forEach((r, idx) => {
    const k = `${r.kind}:${r.id}`;
    const cur = score.get(k) ??
      { kind: r.kind, id: r.id, space_id: r.space_id, title_or_snippet: r.title_or_snippet, rrf: 0 };
    cur.rrf += 1 / (K + idx + 1);
    score.set(k, cur);
  });
  add(fts);
  add(vec);
  return [...score.values()]
    .sort((a, b) => b.rrf - a.rrf)
    .slice(offset, offset + limit)
    .map(({ rrf, ...rest }) => ({ ...rest, rank: rrf }));
}

export async function fts({ q, space_id = null, kinds = null, limit = 50, offset = 0 } = {}) {
  if (!q) return [];
  const normalizedKinds = Array.isArray(kinds) && kinds.length ? kinds : null;
  const depth = offset + limit; // both candidate lists must reach past the requested page

  // Vector side: Ollama down or slow, or a vector query error → FTS-only.
  const vecP = embedText(q, { timeoutMs: 5_000 })
    .then(raw => vectorRows({ qvec: padTo(raw, 1024), space_id, kinds: normalizedKinds, limit: depth }))
    .catch(err => {
      log.debug({ err }, 'search: vector branch unavailable, using FTS only');
      return [];
    });

  // Await both together. An FTS error still rejects, but it can never sit
  // unhandled while the embedding call is in flight.
  const [ftsResult, vec] = await Promise.all([
    ftsRows({ q, space_id, kinds: normalizedKinds, limit: depth }),
    vecP
  ]);
  return fuse(ftsResult, vec, limit, offset);
}

// fts() now runs the hybrid pipeline; hybrid() is an alias with a clearer name.
export async function hybrid(args) { return fts(args); }
```
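A worked example of the fusion step. RRF scores each result as the sum of 1 / (60 + rank) over the ranked lists that contain it, with 1-based ranks. Because only positions matter, FTS `ts_rank` values and cosine distances never need to be put on a common scale. Below is a standalone copy of the `fuse()` logic, reduced to ids for illustration.

```javascript
// Standalone copy of fuse()'s scoring, reduced to ids.
function rrf(lists, k = 60) {
  const score = new Map();
  for (const list of lists) {
    list.forEach((id, idx) => {
      score.set(id, (score.get(id) ?? 0) + 1 / (k + idx + 1)); // ranks are 1-based
    });
  }
  return [...score.entries()].sort((a, b) => b[1] - a[1]);
}

const ftsList = ['page:A', 'ref:B', 'page:C']; // FTS order
const vecList = ['ref:B', 'page:D', 'page:A']; // vector order

for (const [id, s] of rrf([ftsList, vecList])) console.log(id, s.toFixed(5));
// ref:B 0.03252
// page:A 0.03227
// page:D 0.01613
// page:C 0.01587
```

B, found by both branches, overtakes A even though A was the top FTS hit. D, which only the vector branch found, edges out C, which only FTS found. The same mechanism lets the vector-only test in Step 1 surface a page with no FTS match.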
- [ ] **Step 4:** Run green.

```bash
npx vitest run tests/repos/search.test.js tests/api/search.test.js
```

- [ ] **Step 5:** Run the full suite — all green.

```bash
npx vitest run
```

- [ ] **Step 6: Commit.**

```bash
git add lib/db/repos/search.js tests/repos/search.test.js tests/api/search.test.js
git commit -m "feat(search): hybrid FTS+vector with RRF + Ollama-down fallback"
```
### Task C5: Integration test (skip if Ollama unreachable)

**Files:**
- Create: `tests/integration/embed_live.test.js`

- [ ] **Step 1: Write the gated test.**

```js
// tests/integration/embed_live.test.js
import { describe, it, expect } from 'vitest';

// Probe Ollama's /api/tags with a short timeout; any error counts as "down".
async function ollamaUp() {
  try {
    const res = await fetch((process.env.OLLAMA_URL || 'http://192.168.1.185:11434') + '/api/tags',
      { signal: AbortSignal.timeout(2_000) });
    return res.ok;
  } catch { return false; }
}

// Evaluated once at collection time (top-level await); the suite is skipped when Ollama is down.
describe.runIf(await ollamaUp())('Ollama live embed', () => {
  it('returns 768 dims for nomic-embed-text', async () => {
    const { embedText } = await import('../../lib/ai/ollama.js');
    const v = await embedText('the cradle aesthetic');
    expect(v).toHaveLength(768);
  });
});
```
|
||
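The assertion pins the raw model output at 768 dimensions, while the search path calls `padTo(raw, 1024)` before querying the 1024-dim `embedding` columns. The sketch below is a hypothetical equivalent of that helper (the real one is defined earlier in the plan; throwing on oversized input is an assumption here). It shows why zero-padding is safe: appended zeros add nothing to dot products or norms, so cosine similarity between two padded vectors equals that of the originals.

```js
// Hypothetical equivalent of the plan's padTo helper.
function padTo(vec, dims) {
  if (vec.length > dims) throw new RangeError(`vector has ${vec.length} dims, max ${dims}`);
  const out = new Array(dims).fill(0);
  for (let i = 0; i < vec.length; i++) out[i] = vec[i];
  return out;
}

// Plain cosine similarity, used only to compare before/after padding.
function cosine(a, b) {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

const a = [0.1, 0.4, -0.2];
const b = [0.3, 0.1, 0.5];
console.log(cosine(a, b) === cosine(padTo(a, 8), padTo(b, 8))); // true
```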
- [ ] **Step 2:** Run — passes locally if CT 102 is up; auto-skips otherwise.

```bash
npx vitest run tests/integration/embed_live.test.js
```

- [ ] **Step 3: Commit.**

```bash
git add tests/integration/embed_live.test.js
git commit -m "test(ai): live ollama embed integration (gated)"
```

### Task C6: Phase C close — snapshot + memory checkpoint

- [ ] **Step 1:** Run full suite green.
- [ ] **Step 2:** Snapshot CT 310 + 311 (standing backup rule before the Phase C deploy lands). Compute the timestamp once so both containers get the same snapshot name:

```bash
TS=$(date +%Y%m%d_%H%M)
ssh root@192.168.1.124 "pct snapshot 310 plan3_phase_c_$TS --description 'Plan 3 Phase C green'"
ssh root@192.168.1.124 "pct snapshot 311 plan3_phase_c_$TS --description 'Plan 3 Phase C green'"
```

- [ ] **Step 3:** Update memory: "Plan 3 Phase C complete: embed.text + hybrid search live".

---
## Phase D — Karakeep webhook + drag-drop UI + Jobs UI

### Task D1: Karakeep client

**Files:**
- Create: `lib/karakeep/client.js`
- Create: `tests/karakeep/client.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/karakeep/client.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { getBookmark } from '../../lib/karakeep/client.js';

beforeEach(() => {
  // stubGlobal (unlike assigning global.fetch directly) is undone by unstubAllGlobals.
  vi.stubGlobal('fetch', vi.fn(async () => new Response(
    JSON.stringify({ id: 'b-1', url: 'https://example.com', title: 'X', tags: [{ name: 'archive' }] }),
    { status: 200, headers: { 'content-type': 'application/json' } }
  )));
});
afterEach(() => { vi.unstubAllGlobals(); vi.unstubAllEnvs(); });

describe('karakeep.client', () => {
  it('fetches a bookmark by id', async () => {
    vi.stubEnv('KARAKEEP_API_URL', 'https://karakeep.test');
    vi.stubEnv('KARAKEEP_API_TOKEN', 'tok');
    const bm = await getBookmark('b-1');
    expect(bm.url).toBe('https://example.com');
    expect(fetch.mock.calls[0][0]).toBe('https://karakeep.test/api/v1/bookmarks/b-1');
    expect(fetch.mock.calls[0][1].headers.Authorization).toBe('Bearer tok');
  });
});
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/karakeep/client.js
export async function getBookmark(id) {
  const base = process.env.KARAKEEP_API_URL || 'https://karakeep.hynesy.com';
  const tok = process.env.KARAKEEP_API_TOKEN || '';
  const res = await fetch(`${base}/api/v1/bookmarks/${encodeURIComponent(id)}`, {
    headers: { Authorization: 'Bearer ' + tok },
    signal: AbortSignal.timeout(10_000)
  });
  if (res.status === 404) return null; // deleted in Karakeep; the worker skips it
  if (!res.ok) throw new Error(`karakeep ${res.status}`);
  return res.json();
}
```

- [ ] **Step 4:** Run green.

- [ ] **Step 5: Commit.**

```bash
git add lib/karakeep/client.js tests/karakeep/client.test.js
git commit -m "feat(karakeep): thin bookmark fetch client"
```
### Task D2: `ingest.karakeep` worker

**Files:**
- Create: `lib/jobs/workers/karakeep.js`
- Modify: `lib/jobs/index.js`
- Create: `tests/jobs/workers/karakeep.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/jobs/workers/karakeep.test.js
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../../helpers/db.js';
import { migrateUp } from '../../../lib/db/migrate.js';
import { stopBoss } from '../../helpers/boss.js';
import * as queue from '../../../lib/jobs/queue.js';
import { registerWorkers } from '../../../lib/jobs/index.js';
import * as spaces from '../../../lib/db/repos/spaces.js';
import * as refs from '../../../lib/db/repos/refs.js';

beforeEach(async () => {
  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); vi.unstubAllGlobals(); });

describe('ingest.karakeep worker', () => {
  it('fetches bookmark and creates ref', async () => {
    const sp = await spaces.create({ slug: 'k', name: 'K' }, { kind: 'user', id: null });
    // First fetch is the Karakeep API, second is the bookmarked page's HTML.
    const responses = [
      new Response(JSON.stringify({ id: 'b-1', url: 'https://example.com/a', title: 'A', tags: [] }),
        { status: 200, headers: { 'content-type': 'application/json' } }),
      new Response('<html><head><title>A</title></head><body><article><p>x</p></article></body></html>',
        { status: 200, headers: { 'content-type': 'text/html' } })
    ];
    vi.stubGlobal('fetch', vi.fn(async () => responses.shift()));
    const id = await queue.enqueue('ingest.karakeep', { bookmark_id: 'b-1', space_id: sp.id });
    // Poll until the job completes (up to ~5 s). pg-boss 10 looks jobs up by queue name + id.
    for (let i = 0; i < 50; i++) {
      const j = await queue.instance().getJobById('ingest.karakeep', id);
      if (j?.state === 'completed') break;
      await new Promise(r => setTimeout(r, 100));
    }
    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].source_kind).toBe('karakeep');
  });
});
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement.**

```js
// lib/jobs/workers/karakeep.js
import crypto from 'node:crypto';
import { getBookmark } from '../../karakeep/client.js';
import { extract } from '../../ingest/readability.js';
import * as refs from '../../db/repos/refs.js';
import { pool } from '../../db/pool.js';

export const NAME = 'ingest.karakeep';
export const opts = { teamSize: 4, teamConcurrency: 4 };

// Idempotency key: scoped to the space, so one bookmark can land in several spaces.
function key(space_id, bookmark_id) {
  return crypto.createHash('sha256')
    .update(space_id + '\x00karakeep:' + bookmark_id).digest('hex');
}

export async function handler(job) {
  const { bookmark_id, space_id } = job.data;
  const bm = await getBookmark(bookmark_id);
  if (!bm) return { skipped: 'gone' };

  const idem = key(space_id, bookmark_id);
  const { rows: [existing] } = await pool.query(
    `SELECT id FROM refs WHERE source_kind='karakeep' AND external_id=$1 LIMIT 1`,
    [idem]
  );
  if (existing) return { ref_id: existing.id, idempotent: true };

  // Prefer Karakeep's archived HTML; otherwise fetch the page ourselves.
  let html = bm.html_content;
  if (!html) {
    const res = await fetch(bm.url, {
      headers: { 'User-Agent': 'void-ingest/2.0' },
      signal: AbortSignal.timeout(15_000)
    });
    // Don't ingest an error page; fail the job so it surfaces in the Jobs view.
    if (!res.ok) throw new Error(`fetch ${bm.url}: ${res.status}`);
    html = await res.text();
  }
  const parsed = extract(html, bm.url);

  const row = await refs.create({
    space_id,
    kind: 'url',
    source_url: bm.url,
    title: bm.title || parsed.title || bm.url,
    summary: parsed.excerpt,
    body_text: (parsed.textContent || '').slice(0, 200_000),
    source_kind: 'karakeep',
    external_id: idem,
    metadata: { karakeep_id: bookmark_id, tags: (bm.tags || []).map(t => t.name) }
  }, { kind: 'system', id: null });

  return { ref_id: row.id };
}
```
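The idempotency key is deterministic in both inputs, which is what makes a redelivered webhook safe: the worker finds the existing `refs` row by `external_id` instead of inserting a duplicate. A standalone copy of the derivation (the `space-1` / `b-1` values are made up) shows the properties the worker relies on; the `\x00` separator keeps a space id from running into the bookmark id.

```js
import crypto from 'node:crypto';

// Same derivation as key() in lib/jobs/workers/karakeep.js, reproduced standalone.
function karakeepKey(space_id, bookmark_id) {
  return crypto.createHash('sha256')
    .update(space_id + '\x00karakeep:' + bookmark_id)
    .digest('hex');
}

const first = karakeepKey('space-1', 'b-1');
const again = karakeepKey('space-1', 'b-1');
const otherSpace = karakeepKey('space-2', 'b-1');

console.log(first === again);      // true: redelivery maps to the same ref
console.log(first === otherSpace); // false: the same bookmark can live in two spaces
console.log(first.length);         // 64 (hex-encoded SHA-256)
```

One caveat worth checking against the schema: the `SELECT`-then-`INSERT` lookup is not atomic, so with `teamConcurrency: 4` two deliveries of the same bookmark processed at the same moment could both miss it. A unique index on `(source_kind, external_id)`, if `refs` does not already have one, would close that gap.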
- [ ] **Step 4: Register the worker in `lib/jobs/index.js`.**

```js
// lib/jobs/index.js: add the import and append karakeep to WORKERS
import * as karakeep from './workers/karakeep.js';
const WORKERS = [echo, url, blob, embed, karakeep];
```

- [ ] **Step 5:** Run green.

- [ ] **Step 6: Commit.**

```bash
git add lib/jobs/workers/karakeep.js lib/jobs/index.js tests/jobs/workers/karakeep.test.js
git commit -m "feat(jobs): ingest.karakeep worker"
```
### Task D3: `/api/ingest/karakeep` HMAC webhook

**Files:**
- Create: `lib/api/routes/ingest.js`
- Modify: `server.js`
- Create: `tests/api/ingest.test.js`

- [ ] **Step 1: Write the failing test.**

```js
// tests/api/ingest.test.js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import crypto from 'node:crypto';
import request from 'supertest';
import { setup } from './helpers.js';
import { stopBoss } from '../helpers/boss.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import * as spaces from '../../lib/db/repos/spaces.js';

let app, sp;
const SECRET = 'test-karakeep-secret';

beforeEach(async () => {
  ({ app } = await setup());
  process.env.KARAKEEP_WEBHOOK_SECRET = SECRET;
  sp = await spaces.create({ slug: 'kw', name: 'KW' }, { kind: 'user', id: null });
  process.env.KARAKEEP_DEFAULT_SPACE_ID = sp.id;
  await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); });

function sign(body) {
  return 'sha256=' + crypto.createHmac('sha256', SECRET).update(body).digest('hex');
}

describe('karakeep webhook', () => {
  it('enqueues on valid signature', async () => {
    const body = JSON.stringify({ event: 'bookmark.created', bookmark_id: 'b-1' });
    const res = await request(app).post('/api/ingest/karakeep')
      .set('X-Karakeep-Signature', sign(body))
      .set('Content-Type', 'application/json')
      .send(body);
    expect(res.status).toBe(202);
    expect(res.body.job_id).toBeTruthy();
  });

  it('401 on bad signature', async () => {
    const res = await request(app).post('/api/ingest/karakeep')
      .set('X-Karakeep-Signature', 'sha256=wrong')
      .set('Content-Type', 'application/json')
      .send('{"event":"bookmark.created","bookmark_id":"b-1"}');
    expect(res.status).toBe(401);
  });
});
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3: Implement the route with raw body capture.** This first cut parses the raw body per route; Step 4 revises it once the router is mounted behind the global JSON parser.

```js
// lib/api/routes/ingest.js
import { Router, raw } from 'express';
import crypto from 'node:crypto';
import * as queue from '../../jobs/queue.js';
import { asyncWrap } from '../errors.js';

function verify(rawBody, headerSig) {
  const secret = process.env.KARAKEEP_WEBHOOK_SECRET || '';
  if (!headerSig || !secret) return false;
  const expected = 'sha256=' +
    crypto.createHmac('sha256', secret).update(rawBody).digest('hex');
  // timingSafeEqual throws on a length mismatch; treat that as a bad signature.
  try { return crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(headerSig)); }
  catch { return false; }
}

export const router = Router();

// Karakeep route bypasses agentOrOwner — HMAC is its own auth.
router.post('/karakeep',
  raw({ type: '*/*', limit: '1mb' }),
  asyncWrap(async (req, res) => {
    const sig = req.headers['x-karakeep-signature'];
    if (!verify(req.body, sig)) return res.status(401).json({ error: { code: 'unauthorized', message: 'bad signature' } });
    const payload = JSON.parse(req.body.toString('utf-8'));
    if (payload.event !== 'bookmark.created') return res.status(202).json({ skipped: true });
    const space_id = process.env.KARAKEEP_DEFAULT_SPACE_ID;
    if (!space_id) return res.status(503).json({ error: { code: 'unconfigured', message: 'KARAKEEP_DEFAULT_SPACE_ID not set' } });
    const job_id = await queue.enqueue('ingest.karakeep', {
      bookmark_id: payload.bookmark_id, space_id
    });
    res.status(202).json({ job_id });
  })
);
```
- [ ] **Step 4: Capture the raw body in the global JSON parser, and mount ahead of `agentOrOwner`.**

The global `express.json()` parser in `server.js` is registered first, so it consumes the request stream before the ingest router runs; the per-route `raw()` parser from Step 3 then never sees the bytes the signature was computed over. Two changes:

1. `server.js`: have the JSON parser keep a copy of the raw bytes alongside the parsed body, so HMAC verification still works:

```js
app.use(express.json({
  limit: '10mb',
  verify: (req, _res, buf) => { req.rawBody = buf; }
}));
```

2. `server.js`: mount the ingest router directly on `app`, after the JSON parser and ahead of `mountApi`. Mounting it inside `mountApi` in `lib/api/index.js`, before the `agentOrOwner` middleware, also works; mounting on `app` makes it obvious that this route skips the owner gate.

```js
// server.js — additions
import { router as ingestRouter } from './lib/api/routes/ingest.js';
// after app.use(express.json(...)):
app.use('/api/ingest', ingestRouter);
// THEN: mountApi(app);
```

Then, in `lib/api/routes/ingest.js`, drop the per-route `raw()` middleware and use the `req.rawBody` populated by the verify hook instead:

```js
// lib/api/routes/ingest.js — replace the route body
import { Router } from 'express';
import crypto from 'node:crypto';
import * as queue from '../../jobs/queue.js';
import { asyncWrap } from '../errors.js';

function verify(rawBody, headerSig) {
  const secret = process.env.KARAKEEP_WEBHOOK_SECRET || '';
  // rawBody is undefined when express.json skipped the request (non-JSON content type).
  if (!headerSig || !secret || !rawBody) return false;
  const expected = 'sha256=' +
    crypto.createHmac('sha256', secret).update(rawBody).digest('hex');
  // timingSafeEqual throws on a length mismatch; treat that as a bad signature.
  try { return crypto.timingSafeEqual(Buffer.from(expected), Buffer.from(headerSig)); }
  catch { return false; }
}

export const router = Router();

// Karakeep route bypasses agentOrOwner — HMAC is its own auth.
router.post('/karakeep', asyncWrap(async (req, res) => {
  const sig = req.headers['x-karakeep-signature'];
  if (!verify(req.rawBody, sig)) {
    return res.status(401).json({ error: { code: 'unauthorized', message: 'bad signature' } });
  }
  const payload = req.body;
  if (payload.event !== 'bookmark.created') return res.status(202).json({ skipped: true });
  const space_id = process.env.KARAKEEP_DEFAULT_SPACE_ID;
  if (!space_id) return res.status(503).json({ error: { code: 'unconfigured', message: 'KARAKEEP_DEFAULT_SPACE_ID not set' } });
  const job_id = await queue.enqueue('ingest.karakeep', {
    bookmark_id: payload.bookmark_id, space_id
  });
  res.status(202).json({ job_id });
}));
```
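For manual testing, or to check a sender against `verify()`, the signature can be produced and checked outside the server. This sketch mirrors the route's scheme (hex HMAC-SHA256 over the exact raw bytes, with a `sha256=` prefix); `signBody` and `verifySignature` are hypothetical names and the secret is a placeholder. It also shows why the raw bytes matter: re-serializing the parsed JSON with different whitespace yields a different signature.

```js
import crypto from 'node:crypto';

// Sender side: sign the exact bytes that will be sent as the request body.
function signBody(secret, body) {
  return 'sha256=' + crypto.createHmac('sha256', secret).update(body).digest('hex');
}

// Receiver side: same comparison as verify() in lib/api/routes/ingest.js.
function verifySignature(secret, rawBody, headerSig) {
  if (!headerSig || !secret || !rawBody) return false;
  const expected = Buffer.from(signBody(secret, rawBody));
  const given = Buffer.from(headerSig);
  // timingSafeEqual throws on unequal lengths, so compare lengths first.
  return expected.length === given.length && crypto.timingSafeEqual(expected, given);
}

const secret = 'example-secret';
const body = '{"event":"bookmark.created","bookmark_id":"b-1"}';
const sig = signBody(secret, body); // value for the X-Karakeep-Signature header

const reserialized = JSON.stringify(JSON.parse(body), null, 2); // same data, different bytes
console.log(verifySignature(secret, Buffer.from(body), sig));           // true
console.log(verifySignature(secret, Buffer.from(reserialized), sig));   // false
console.log(verifySignature(secret, Buffer.from(body), 'sha256=wrong')); // false
```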
- [ ] **Step 5:** Run green.

- [ ] **Step 6: Commit.**

```bash
git add lib/api/routes/ingest.js server.js tests/api/ingest.test.js
git commit -m "feat(api): karakeep webhook (HMAC)"
```
### Task D4: Drag-drop SPA component

**Files:**
- Create: `public/components/dropzone.js`
- Modify: `public/app.js` — initialize dropzone on `<main>`.

- [ ] **Step 1: Implement the component.**

```js
// public/components/dropzone.js
export function attachDropzone(target) {
  function highlight(on) { target.style.outline = on ? '2px dashed var(--accent)' : ''; }
  // preventDefault on dragover is what allows a drop on this element.
  target.addEventListener('dragover', e => { e.preventDefault(); highlight(true); });
  target.addEventListener('dragleave', () => highlight(false));
  target.addEventListener('drop', async e => {
    e.preventDefault(); highlight(false);
    const files = [...e.dataTransfer.files];
    if (!files.length) return; // e.g. a dragged link or text selection
    const space_id = localStorage.getItem('last_space_id');
    if (!space_id) { alert('Open a space first so we know where to drop these.'); return; }
    let queued = 0;
    for (const f of files) {
      const fd = new FormData();
      fd.append('file', f);
      fd.append('space_id', space_id);
      const res = await fetch('/api/capture/upload', {
        method: 'POST',
        headers: { Authorization: 'Bearer ' + localStorage.getItem('void_token') },
        body: fd
      }).catch(() => null); // a network error counts as a failed upload
      if (res?.ok) queued++;
    }
    alert(`${queued} of ${files.length} file(s) queued.`);
  });
}
```
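`dragleave` also fires whenever the pointer moves from `<main>` onto one of its child elements, so the outline can flicker during a drag. A common workaround is to count `dragenter` / `dragleave` events and only clear the highlight when the count returns to zero. A minimal, DOM-free sketch of that bookkeeping (`createDragDepth` is a hypothetical helper, not part of the plan):

```js
// Hypothetical helper: tracks nested dragenter/dragleave events so the
// highlight only turns off once the pointer has really left the drop target.
function createDragDepth(onChange) {
  let depth = 0;
  return {
    enter() { if (depth++ === 0) onChange(true); },
    leave() { if (depth > 0 && --depth === 0) onChange(false); },
    reset() { if (depth !== 0) { depth = 0; onChange(false); } } // call from the drop handler
  };
}

const log = [];
const drag = createDragDepth(on => log.push(on));
drag.enter(); // pointer enters <main>
drag.enter(); // pointer moves onto a child (its dragenter bubbles up first)
drag.leave(); // <main>'s own dragleave for that move: still inside
drag.leave(); // pointer leaves the child and <main>
console.log(log); // [ true, false ]
```

In `attachDropzone`, `enter` would hang off a `dragenter` listener, `leave` off `dragleave`, and `reset` would run in the `drop` handler; `dragover` still needs its `preventDefault()` to allow the drop.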
- [ ] **Step 2: Wire into `public/app.js`.**

```js
// public/app.js — additions
import { attachDropzone } from './components/dropzone.js';
// inside init(), after renderRightrail(...):
attachDropzone(document.getElementById('main'));
```

- [ ] **Step 3:** Run server tests — still green.

```bash
npx vitest run tests/server.test.js
```

- [ ] **Step 4: Commit.**

```bash
git add public/components/dropzone.js public/app.js
git commit -m "feat(ui): drag-drop capture"
```
### Task D5: Expand Jobs UI

**Files:**
- Modify: `public/views/jobs.js`

- [ ] **Step 1: Replace with the table view.**

```js
// public/views/jobs.js — REPLACE
import { api } from '../api.js';
import { el, mount, clear } from '../dom.js';

function badge(state) {
  const cls = state === 'completed' ? 'ok' : state === 'failed' ? 'bad'
    : state === 'active' ? 'warn' : 'idle';
  return el('span', { class: 'status ' + cls }, state);
}

function row(j, onActed) {
  return el('li', {},
    badge(j.state), ' ',
    el('span', { style: { fontFamily: 'var(--font-mono)' } }, j.name), ' ',
    el('span', { class: 'muted' }, (j.id || '').slice(0, 8)), ' ',
    el('button', {
      class: 'ghost',
      onclick: async () => { await api.post(`/api/jobs/${j.id}/retry`); onActed(); }
    }, 'retry'),
    ' ',
    el('button', {
      class: 'ghost',
      onclick: async () => { await api.del(`/api/jobs/${j.id}`); onActed(); }
    }, 'delete')
  );
}

async function refresh(container) {
  const rows = await api.get('/api/jobs?limit=100');
  clear(container);
  if (!rows.length) { container.appendChild(el('p', { class: 'muted' }, 'No jobs.')); return; }
  // Group by state; the Map keeps groups in first-seen order.
  const byState = new Map();
  for (const r of rows) {
    if (!byState.has(r.state)) byState.set(r.state, []);
    byState.get(r.state).push(r);
  }
  for (const [state, items] of byState) {
    container.appendChild(el('div', { class: 'sb-title' }, `${state} (${items.length})`));
    container.appendChild(el('ul', { class: 'plain' }, items.map(j => row(j, () => refresh(container)))));
  }
}

export async function render(main) {
  const wrap = el('div');
  mount(main,
    el('h1', { class: 'view-h1' }, 'Jobs'),
    el('p', { class: 'view-sub muted' }, 'pg-boss queue — polls every 10 s.'),
    wrap
  );
  await refresh(wrap);
  // Background refreshes log failures instead of raising unhandled rejections.
  const handle = setInterval(() => refresh(wrap).catch(console.error), 10_000);
  // Stop polling when the view changes.
  window.addEventListener('hashchange', () => clearInterval(handle), { once: true });
}
```
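Because `byState` is a `Map`, groups render in the order their states first appear in the `/api/jobs` response. If a fixed order is preferred (problems first), the grouping can be sorted against an explicit state list. A DOM-free sketch; `STATE_ORDER` is an assumption built from pg-boss's job states, and any state not listed sorts last:

```js
// Hypothetical display order; unlisted states are appended at the end.
const STATE_ORDER = ['failed', 'active', 'retry', 'created', 'completed', 'cancelled'];

function groupByState(rows, order = STATE_ORDER) {
  const groups = new Map();
  for (const r of rows) {
    if (!groups.has(r.state)) groups.set(r.state, []);
    groups.get(r.state).push(r);
  }
  const rank = s => (order.includes(s) ? order.indexOf(s) : order.length);
  return [...groups.entries()].sort(([a], [b]) => rank(a) - rank(b));
}

const rows = [
  { id: 'a', state: 'completed' },
  { id: 'b', state: 'failed' },
  { id: 'c', state: 'completed' },
  { id: 'd', state: 'active' }
];
console.log(groupByState(rows).map(([state, items]) => `${state} (${items.length})`));
// → [ 'failed (1)', 'active (1)', 'completed (2)' ]
```

`refresh()` could then iterate `groupByState(rows)` instead of `byState`; the rendering calls stay the same.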
- [ ] **Step 2:** Run server tests.

```bash
npx vitest run tests/server.test.js
```

- [ ] **Step 3: Commit.**

```bash
git add public/views/jobs.js
git commit -m "feat(ui): Jobs panel with retry/delete + polling"
```
### Task D6: Phase D close — version bump, changelog, full suite

**Files:**
- Modify: `package.json`, `server.js`, `CHANGELOG.md`, `tests/server.test.js`

- [ ] **Step 1: Bump version.**

- `package.json`: `"version": "2.0.0-alpha.3"`.
- `server.js`: `const VERSION = '2.0.0-alpha.3';`.
- `tests/server.test.js`: update the version assertion to `'2.0.0-alpha.3'`.

- [ ] **Step 2: Append CHANGELOG entry.**

```md
## [2.0.0-alpha.3] — 2026-06-NN

### Added (Plan 3: Capture pipeline + hybrid search)
- pg-boss-backed job queue inside void-server. Owner-only /api/jobs.
- /api/capture POST (URL) + /api/capture/upload (multipart) enqueue ingest jobs.
- ingest.url worker: fetch + readability extract → refs row, idempotent by sha256(space+url).
- ingest.blob worker: sha256 + content-addressed blob store at /var/lib/void/blobs/.
- embed.text worker: Ollama nomic-embed-text (768 dims) zero-padded to 1024.
- Repo-level triggers fire embed.text after page / ref / source_doc create/update; singleton key coalesces rapid edits.
- Hybrid /api/search: FTS + vector via RRF (k=60). Graceful fallback to FTS-only when Ollama is down.
- Karakeep webhook /api/ingest/karakeep with HMAC verification.
- Drag-drop upload onto the main panel.
- Jobs view with state grouping, retry, delete, 10 s polling.
```

- [ ] **Step 3: Full suite green.**

```bash
npx vitest run
```

- [ ] **Step 4: Commit.**

```bash
git add package.json server.js CHANGELOG.md tests/server.test.js
git commit -m "chore: version 2.0.0-alpha.3 + changelog"
```
### Task D7: Plan 3 close — snapshot + completion doc + memory

- [ ] **Step 1: Snapshot CT 310 + 311** with name `plan3_complete_<timestamp>` (same `pct snapshot` commands as Task C6).
- [ ] **Step 2:** Write `docs/plan-3-complete.md` mirroring the shape of `plan-2-complete.md`.
- [ ] **Step 3: Commit the completion doc.**

```bash
git add docs/plan-3-complete.md
git commit -m "docs: Plan 3 completion summary"
```

- [ ] **Step 4: Update memory** to mark Plan 3 complete and set Plan 4 (`void-workers` Python) as next.

---
## Spec coverage check

Every spec section maps to at least one task:
- pg-boss bootstrap + Jobs API → Phase A (A1–A7).
- /api/capture + URL worker + blob storage → Phase B (B1–B6).
- Embeddings + Ollama + RRF hybrid search → Phase C (C1–C5).
- Karakeep webhook + drag-drop + Jobs UI fill-in → Phase D (D1–D5).
- Version bump + changelog + completion doc → D6–D7.
- Standing backup rule honored: snapshots at end of Phase C and Plan 3.

## Type & name consistency

- Worker name strings used: `echo`, `ingest.url`, `ingest.blob`, `ingest.karakeep`, `embed.text` — same strings in `NAME`, in registration, in tests, and in `triggerEmbed` callers.
- Idempotency keys: `sha256(space_id + '\x00' + url)` for URL ingest and `sha256(space_id + '\x00karakeep:' + bookmark_id)` for Karakeep. Stored as `refs.external_id` with `source_kind` set to `'url'` or `'karakeep'`.
- Response shape from `/api/capture`: `{ job_id, idempotency_key, ref_id? }` — consistent across tasks B6 and A6 (`ref_id` only set when idempotent hit).
- `triggerEmbed(entity_type, entity_id)` signature is used identically in pages, refs, and source_docs repos.