diff --git a/lib/jobs/index.js b/lib/jobs/index.js
index 0b28f47..e900f48 100644
--- a/lib/jobs/index.js
+++ b/lib/jobs/index.js
@@ -1,8 +1,9 @@
 import * as queue from './queue.js';
 import * as echo from './workers/echo.js';
 import * as url from './workers/url.js';
+import * as blob from './workers/blob.js';
 
-const WORKERS = [echo, url];
+const WORKERS = [echo, url, blob];
 
 export async function registerWorkers() {
   for (const w of WORKERS) {
diff --git a/lib/jobs/workers/blob.js b/lib/jobs/workers/blob.js
new file mode 100644
index 0000000..5571fdb
--- /dev/null
+++ b/lib/jobs/workers/blob.js
@@ -0,0 +1,64 @@
+import fs from 'node:fs/promises';
+import * as refs from '../../db/repos/refs.js';
+import { defaultStore } from '../../ingest/blob_store.js';
+
+export const NAME = 'ingest.blob';
+
+/**
+ * Map an upload's MIME type and filename to a ref kind.
+ *
+ * MIME types are case-insensitive and may carry parameters (for example
+ * `Application/PDF; qs=1`), so only the lower-cased type/subtype is compared.
+ * Non-string inputs are treated as absent.
+ *
+ * @param {string} [content_type] - MIME type reported for the upload.
+ * @param {string} [filename] - Original filename, used as a fallback for PDFs.
+ * @returns {'image' | 'pdf' | 'file'}
+ */
+function kindFor(content_type, filename) {
+  const mime = typeof content_type === 'string'
+    ? content_type.split(';', 1)[0].trim().toLowerCase()
+    : '';
+  const name = typeof filename === 'string' ? filename.toLowerCase() : '';
+  if (mime.startsWith('image/')) return 'image';
+  if (mime === 'application/pdf' || name.endsWith('.pdf')) return 'pdf';
+  return 'file';
+}
+
+/**
+ * Ingest an uploaded temp file: store its bytes with the default blob store,
+ * create a ref row pointing at the stored blob, then delete the temp file.
+ *
+ * @param {object} job - Queue job; `job.data` carries `space_id`, `tmp_path`,
+ *   and optionally `filename`, `content_type`, and `meta` (`title`, `metadata`).
+ * @returns {Promise<{ ref_id: *, sha: * }>} Id of the created ref and the blob's sha.
+ */
+export async function handler(job) {
+  const { space_id, tmp_path, filename, content_type } = job.data;
+  // A destructuring default only covers `undefined`; `??` also covers an explicit null.
+  const meta = job.data.meta ?? {};
+  const buf = await fs.readFile(tmp_path);
+  const { sha, path } = await defaultStore().write(buf);
+
+  const row = await refs.create({
+    space_id,
+    kind: kindFor(content_type, filename),
+    source_url: null,
+    title: meta.title || filename || sha.slice(0, 12),
+    summary: null,
+    body_text: null,
+    blob_path: path,
+    // Caller-supplied metadata is spread last, so it overrides the computed fields.
+    metadata: { sha, content_type, filename, size: buf.length, ...(meta.metadata || {}) }
+  }, { kind: 'system', id: null });
+
+  // Delete the upload only once the ref exists: if anything above throws, the
+  // temp file is still on disk should the queue run this job again.
+  try {
+    await fs.unlink(tmp_path);
+  } catch (err) {
+    // Best-effort cleanup: a leftover temp file must not fail a finished ingest.
+    console.warn(`${NAME}: could not remove ${tmp_path}: ${err.message}`);
+  }
+  return { ref_id: row.id, sha };
+}
diff --git a/tests/jobs/workers/blob.test.js b/tests/jobs/workers/blob.test.js
new file mode 100644
index 0000000..ae96a5f
--- /dev/null
+++ b/tests/jobs/workers/blob.test.js
@@ -0,0 +1,60 @@
+import { describe, it, expect, beforeEach, afterEach } from 'vitest';
+import fs from 'node:fs/promises';
+import path from 'node:path';
+import os from 'node:os';
+import { resetDb } from '../../helpers/db.js';
+import { migrateUp } from '../../../lib/db/migrate.js';
+import { stopBoss, waitForJob } from '../../helpers/boss.js';
+import * as queue from '../../../lib/jobs/queue.js';
+import { registerWorkers } from '../../../lib/jobs/index.js';
+import * as spaces from '../../../lib/db/repos/spaces.js';
+import * as refs from '../../../lib/db/repos/refs.js';
+
+let tmpRoot;
+beforeEach(async () => {
+  tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blobs-'));
+  process.env.BLOB_ROOT = tmpRoot;
+  await resetDb(); await migrateUp(); await queue.start(); await registerWorkers();
+});
+afterEach(async () => { await stopBoss(); await fs.rm(tmpRoot, { recursive: true, force: true }); });
+
+describe('ingest.blob worker', () => {
+  it('creates a ref pointing at the blob (kind=file)', async () => {
+    const sp = await spaces.create({ slug: 'b', name: 'B' }, { kind: 'user', id: null });
+    const upTmp = path.join(tmpRoot, 'up.tmp');
+    await fs.writeFile(upTmp, Buffer.from('hello blob'));
+    const id = await queue.enqueue('ingest.blob', {
+      space_id: sp.id, tmp_path: upTmp, filename: 'hello.txt', content_type: 'text/plain'
+    });
+    const j = await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
+    expect(j.state).toBe('completed');
+    const rows = await refs.list({ space_id: sp.id });
+    expect(rows[0].kind).toBe('file');
+    expect(rows[0].blob_path).toBeTruthy();
+    expect(rows[0].title).toBe('hello.txt');
+  });
+
+  it('classifies image content_type as image kind', async () => {
+    const sp = await spaces.create({ slug: 'b2', name: 'B2' }, { kind: 'user', id: null });
+    const upTmp = path.join(tmpRoot, 'pic.tmp');
+    await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47])); // PNG magic
+    const id = await queue.enqueue('ingest.blob', {
+      space_id: sp.id, tmp_path: upTmp, filename: 'x.png', content_type: 'image/png'
+    });
+    await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
+    const rows = await refs.list({ space_id: sp.id });
+    expect(rows[0].kind).toBe('image');
+  });
+
+  it('classifies MIME types case-insensitively, ignoring parameters', async () => {
+    const sp = await spaces.create({ slug: 'b3', name: 'B3' }, { kind: 'user', id: null });
+    const upTmp = path.join(tmpRoot, 'doc.tmp');
+    await fs.writeFile(upTmp, Buffer.from('%PDF-1.4'));
+    const id = await queue.enqueue('ingest.blob', {
+      space_id: sp.id, tmp_path: upTmp, filename: 'doc.bin', content_type: 'Application/PDF; qs=1'
+    });
+    await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
+    const rows = await refs.list({ space_id: sp.id });
+    expect(rows[0].kind).toBe('pdf');
+  });
+});