feat(jobs): blob worker fans out to extract.pdf / extract.image
After creating a ref, the Node-side ingest.blob worker enqueues a follow-up job for the Python void-workers (Plan 4) to OCR / extract text. Other kinds (file) get no follow-up. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import fs from 'node:fs/promises';
|
||||
import * as refs from '../../db/repos/refs.js';
|
||||
import { defaultStore } from '../../ingest/blob_store.js';
|
||||
import * as queue from '../queue.js';
|
||||
|
||||
export const NAME = 'ingest.blob';
|
||||
|
||||
@@ -27,5 +28,12 @@ export async function handler(job) {
|
||||
blob_path: path,
|
||||
metadata: { sha, content_type, filename, size: buf.length, ...(meta.metadata || {}) }
|
||||
}, { kind: 'system', id: null });
|
||||
|
||||
// Plan 4: hand off to the Python void-workers for OCR / extraction.
|
||||
if (kind === 'pdf') {
|
||||
await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path });
|
||||
} else if (kind === 'image') {
|
||||
await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path });
|
||||
}
|
||||
return { ref_id: row.id, sha };
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import * as queue from '../../../lib/jobs/queue.js';
|
||||
import { registerWorkers } from '../../../lib/jobs/index.js';
|
||||
import * as spaces from '../../../lib/db/repos/spaces.js';
|
||||
import * as refs from '../../../lib/db/repos/refs.js';
|
||||
import * as jobsRepo from '../../../lib/db/repos/jobs.js';
|
||||
|
||||
let tmpRoot;
|
||||
beforeEach(async () => {
|
||||
@@ -45,4 +46,28 @@ describe('ingest.blob worker', () => {
|
||||
const rows = await refs.list({ space_id: sp.id });
|
||||
expect(rows[0].kind).toBe('image');
|
||||
});
|
||||
|
||||
it('enqueues extract.pdf after creating a pdf ref', async () => {
|
||||
const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null });
|
||||
const upTmp = path.join(tmpRoot, 'doc.tmp');
|
||||
await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...'));
|
||||
const id = await queue.enqueue('ingest.blob', {
|
||||
space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf'
|
||||
});
|
||||
await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
|
||||
const pending = await jobsRepo.list({ name: 'extract.pdf' });
|
||||
expect(pending.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('enqueues extract.image after creating an image ref', async () => {
|
||||
const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null });
|
||||
const upTmp = path.join(tmpRoot, 'pic.tmp');
|
||||
await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47]));
|
||||
const id = await queue.enqueue('ingest.blob', {
|
||||
space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png'
|
||||
});
|
||||
await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
|
||||
const pending = await jobsRepo.list({ name: 'extract.image' });
|
||||
expect(pending.length).toBeGreaterThan(0);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user