diff --git a/lib/jobs/workers/blob.js b/lib/jobs/workers/blob.js index 5571fdb..ad4fd89 100644 --- a/lib/jobs/workers/blob.js +++ b/lib/jobs/workers/blob.js @@ -1,6 +1,7 @@ import fs from 'node:fs/promises'; import * as refs from '../../db/repos/refs.js'; import { defaultStore } from '../../ingest/blob_store.js'; +import * as queue from '../queue.js'; export const NAME = 'ingest.blob'; @@ -27,5 +28,12 @@ export async function handler(job) { blob_path: path, metadata: { sha, content_type, filename, size: buf.length, ...(meta.metadata || {}) } }, { kind: 'system', id: null }); + + // Plan 4: hand off to the Python void-workers for OCR / extraction. + if (kind === 'pdf') { + await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path }); + } else if (kind === 'image') { + await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path }); + } return { ref_id: row.id, sha }; } diff --git a/tests/jobs/workers/blob.test.js b/tests/jobs/workers/blob.test.js index ae96a5f..cb977f1 100644 --- a/tests/jobs/workers/blob.test.js +++ b/tests/jobs/workers/blob.test.js @@ -9,6 +9,7 @@ import * as queue from '../../../lib/jobs/queue.js'; import { registerWorkers } from '../../../lib/jobs/index.js'; import * as spaces from '../../../lib/db/repos/spaces.js'; import * as refs from '../../../lib/db/repos/refs.js'; +import * as jobsRepo from '../../../lib/db/repos/jobs.js'; let tmpRoot; beforeEach(async () => { @@ -45,4 +46,28 @@ describe('ingest.blob worker', () => { const rows = await refs.list({ space_id: sp.id }); expect(rows[0].kind).toBe('image'); }); + + it('enqueues extract.pdf after creating a pdf ref', async () => { + const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null }); + const upTmp = path.join(tmpRoot, 'doc.tmp'); + await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...')); + const id = await queue.enqueue('ingest.blob', { + space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf' + }); + await waitForJob('ingest.blob', id, { timeoutMs: 10_000 }); + const pending = await jobsRepo.list({ name: 'extract.pdf' }); + expect(pending.length).toBeGreaterThan(0); + }); + + it('enqueues extract.image after creating an image ref', async () => { + const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null }); + const upTmp = path.join(tmpRoot, 'pic.tmp'); + await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47])); + const id = await queue.enqueue('ingest.blob', { + space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png' + }); + await waitForJob('ingest.blob', id, { timeoutMs: 10_000 }); + const pending = await jobsRepo.list({ name: 'extract.image' }); + expect(pending.length).toBeGreaterThan(0); + }); });