From d1e986bc9cd351d1967a94bc3c018b62dac0636d Mon Sep 17 00:00:00 2001
From: root
Date: Mon, 1 Jun 2026 03:55:03 +1000
Subject: [PATCH] feat(jobs): ingest.karakeep worker

Co-Authored-By: Claude Opus 4.7
---
Review notes (git ignores text placed here, ahead of the first diff):
 * Line breaks were restored; the markup inside BM_RESPONSE.html_content was
   rebuilt around its surviving text and should be checked against the
   author's copy.
 * handler() now rejects a payload without bookmark_id/space_id and falls
   back to an empty parse when extract() returns nothing.
 * Tests stub fetch with vi.stubGlobal so afterEach can undo it; the blob ids
   on the "index" lines of the two new files predate these changes.

 lib/jobs/index.js                   |  3 +-
 lib/jobs/workers/karakeep.js        | 85 +++++++++++++++++++++++++++++
 tests/jobs/workers/karakeep.test.js | 64 ++++++++++++++++++++++
 3 files changed, 151 insertions(+), 1 deletion(-)
 create mode 100644 lib/jobs/workers/karakeep.js
 create mode 100644 tests/jobs/workers/karakeep.test.js

diff --git a/lib/jobs/index.js b/lib/jobs/index.js
index 6c087bf..8bc66b9 100644
--- a/lib/jobs/index.js
+++ b/lib/jobs/index.js
@@ -3,8 +3,9 @@ import * as echo from './workers/echo.js';
 import * as url from './workers/url.js';
 import * as blob from './workers/blob.js';
 import * as embed from './workers/embed.js';
+import * as karakeep from './workers/karakeep.js';
 
-const WORKERS = [echo, url, blob, embed];
+const WORKERS = [echo, url, blob, embed, karakeep];
 
 export async function registerWorkers() {
   for (const w of WORKERS) {
diff --git a/lib/jobs/workers/karakeep.js b/lib/jobs/workers/karakeep.js
new file mode 100644
index 0000000..7aa596a
--- /dev/null
+++ b/lib/jobs/workers/karakeep.js
@@ -0,0 +1,85 @@
+import crypto from 'node:crypto';
+import { getBookmark } from '../../karakeep/client.js';
+import { safeFetch } from '../../ingest/safe_fetch.js';
+import { extract } from '../../ingest/readability.js';
+import * as refs from '../../db/repos/refs.js';
+import { pool } from '../../db/pool.js';
+
+export const NAME = 'ingest.karakeep';
+
+// Cap on the number of characters of extracted text kept in body_text.
+const MAX_BODY_CHARS = 200_000;
+// Abort the page download after this many milliseconds.
+const FETCH_TIMEOUT_MS = 15_000;
+// Stand-in used when there is no HTML or nothing could be extracted.
+const EMPTY_PARSE = Object.freeze({ title: null, textContent: '', excerpt: null });
+
+/**
+ * Build the value stored in refs.external_id for a bookmark within a space.
+ * A NUL byte separates the space id from the rest of the hashed string.
+ *
+ * @param {string} space_id
+ * @param {string} bookmark_id
+ * @returns {string} hex-encoded SHA-256 digest
+ */
+function key(space_id, bookmark_id) {
+  return crypto.createHash('sha256')
+    .update(`${space_id}\x00karakeep:${bookmark_id}`).digest('hex');
+}
+
+/**
+ * Ingest one Karakeep bookmark into a space as a ref.
+ *
+ * @param {{ data: { bookmark_id: string, space_id: string } }} job
+ * @returns {Promise<object>} `{ skipped: 'gone' }` when getBookmark yields
+ *   nothing, `{ ref_id, idempotent: true }` when a matching ref already
+ *   exists, otherwise `{ ref_id }` for the ref that was created.
+ * @throws {TypeError} when job.data lacks bookmark_id or space_id
+ */
+export async function handler(job) {
+  const { bookmark_id, space_id } = job.data ?? {};
+  if (bookmark_id == null || space_id == null) {
+    // Fail loudly: hashing `undefined` would still yield a valid-looking key.
+    throw new TypeError('ingest.karakeep: job.data requires bookmark_id and space_id');
+  }
+
+  const bm = await getBookmark(bookmark_id);
+  if (!bm) return { skipped: 'gone' };
+
+  const idem = key(space_id, bookmark_id);
+  // NOTE(review): lookup and insert are separate statements, so concurrent
+  // jobs for one bookmark can both insert unless refs enforces uniqueness on
+  // (source_kind, external_id) - TODO confirm against the schema.
+  const { rows: [existing] } = await pool.query(
+    `SELECT id FROM refs WHERE source_kind='karakeep' AND external_id=$1 LIMIT 1`,
+    [idem]
+  );
+  if (existing) return { ref_id: existing.id, idempotent: true };
+
+  // Use the HTML on the bookmark when present; otherwise download the page.
+  let html = bm.html_content;
+  if (!html && bm.url) {
+    const res = await safeFetch(bm.url, {
+      headers: { 'User-Agent': 'void-ingest/2.0' },
+      signal: AbortSignal.timeout(FETCH_TIMEOUT_MS)
+    });
+    // A non-OK response is tolerated: the ref is then stored without text.
+    if (res.ok) html = await res.text();
+  }
+  // `?? EMPTY_PARSE` also covers extract() handing back null/undefined.
+  const parsed = (html ? extract(html, bm.url) : null) ?? EMPTY_PARSE;
+
+  const row = await refs.create({
+    space_id,
+    kind: 'url',
+    source_url: bm.url,
+    title: bm.title || parsed.title || bm.url,
+    summary: parsed.excerpt,
+    body_text: (parsed.textContent || '').slice(0, MAX_BODY_CHARS),
+    source_kind: 'karakeep',
+    external_id: idem,
+    metadata: { karakeep_id: bookmark_id, tags: (bm.tags || []).map(t => t.name) }
+  }, { kind: 'system', id: null });
+
+  return { ref_id: row.id };
+}
diff --git a/tests/jobs/workers/karakeep.test.js b/tests/jobs/workers/karakeep.test.js
new file mode 100644
index 0000000..6c46db0
--- /dev/null
+++ b/tests/jobs/workers/karakeep.test.js
@@ -0,0 +1,64 @@
+import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
+import { resetDb } from '../../helpers/db.js';
+import { migrateUp } from '../../../lib/db/migrate.js';
+import { stopBoss, waitForJob } from '../../helpers/boss.js';
+import * as queue from '../../../lib/jobs/queue.js';
+import { registerWorkers } from '../../../lib/jobs/index.js';
+import * as spaces from '../../../lib/db/repos/spaces.js';
+import * as refs from '../../../lib/db/repos/refs.js';
+
+// Bookmark payload returned by the stubbed fetch for every request.
+const BM_RESPONSE = JSON.stringify({
+  id: 'b-1', url: 'https://example.com/a', title: 'A',
+  html_content: '<html><head><title>A</title></head><body><article><p>Article body content long enough to satisfy readability heuristics.</p><p>Another paragraph.</p></article></body></html>',
+  tags: [{ name: 'archive' }, { name: 'inbox' }]
+});
+
+beforeEach(async () => {
+  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
+  // stubGlobal, unlike a bare assignment, is reverted by unstubAllGlobals.
+  vi.stubGlobal('fetch', vi.fn(async () => new Response(BM_RESPONSE, {
+    status: 200, headers: { 'content-type': 'application/json' }
+  })));
+});
+afterEach(async () => {
+  try {
+    await stopBoss();
+  } finally {
+    vi.unstubAllGlobals();
+    vi.restoreAllMocks();
+  }
+});
+
+describe('ingest.karakeep worker', () => {
+  it('creates a ref with source_kind=karakeep', async () => {
+    const sp = await spaces.create({ slug: 'k', name: 'K' }, { kind: 'user', id: null });
+    const id = await queue.enqueue('ingest.karakeep', { bookmark_id: 'b-1', space_id: sp.id });
+    const j = await waitForJob('ingest.karakeep', id, { timeoutMs: 10_000 });
+    expect(j.state).toBe('completed');
+    const rows = await refs.list({ space_id: sp.id });
+    expect(rows).toHaveLength(1);
+    expect(rows[0].source_kind).toBe('karakeep');
+    expect(rows[0].title).toBe('A');
+    expect(rows[0].metadata.tags).toEqual(['archive','inbox']);
+  });
+
+  it('idempotent on repeat ingest of the same bookmark_id', async () => {
+    const sp = await spaces.create({ slug: 'k2', name: 'K2' }, { kind: 'user', id: null });
+    const id1 = await queue.enqueue('ingest.karakeep', { bookmark_id: 'b-1', space_id: sp.id });
+    await waitForJob('ingest.karakeep', id1, { timeoutMs: 10_000 });
+    const id2 = await queue.enqueue('ingest.karakeep', { bookmark_id: 'b-1', space_id: sp.id });
+    const j2 = await waitForJob('ingest.karakeep', id2, { timeoutMs: 10_000 });
+    expect(j2.output.idempotent).toBe(true);
+    // The repeat run must not have added a second ref to the space.
+    expect(await refs.list({ space_id: sp.id })).toHaveLength(1);
+  });
+
+  it('skipped when bookmark is gone (Karakeep 404)', async () => {
+    vi.stubGlobal('fetch', vi.fn(async () => new Response('', { status: 404 })));
+    const sp = await spaces.create({ slug: 'k3', name: 'K3' }, { kind: 'user', id: null });
+    const id = await queue.enqueue('ingest.karakeep', { bookmark_id: 'gone', space_id: sp.id });
+    const j = await waitForJob('ingest.karakeep', id, { timeoutMs: 10_000 });
+    expect(j.output.skipped).toBe('gone');
+  });
+});