feat(jobs): ingest.blob worker (content-addressed)
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
import * as queue from './queue.js';
|
||||
import * as echo from './workers/echo.js';
|
||||
import * as url from './workers/url.js';
|
||||
import * as blob from './workers/blob.js';
|
||||
|
||||
const WORKERS = [echo, url];
|
||||
const WORKERS = [echo, url, blob];
|
||||
|
||||
export async function registerWorkers() {
|
||||
for (const w of WORKERS) {
|
||||
|
||||
31
lib/jobs/workers/blob.js
Normal file
31
lib/jobs/workers/blob.js
Normal file
@@ -0,0 +1,31 @@
|
||||
import fs from 'node:fs/promises';
|
||||
import * as refs from '../../db/repos/refs.js';
|
||||
import { defaultStore } from '../../ingest/blob_store.js';
|
||||
|
||||
export const NAME = 'ingest.blob';
|
||||
|
||||
/**
 * Classify an upload into a ref kind from its MIME type and filename.
 *
 * The media type is compared case-insensitively and with any parameters
 * (`; charset=...`) stripped, so `IMAGE/PNG` and
 * `application/pdf; charset=binary` are recognised. Non-string arguments are
 * treated as absent instead of throwing.
 *
 * @param {string | null | undefined} content_type - MIME type reported for the upload.
 * @param {string | null | undefined} filename - Original filename; only consulted for the `.pdf` extension fallback.
 * @returns {'image' | 'pdf' | 'file'} `'image'` for any `image/*` type, `'pdf'` for a PDF media type or `.pdf` filename, otherwise `'file'`.
 */
function kindFor(content_type, filename) {
  // Keep only "type/subtype": drop parameters, surrounding whitespace and case.
  const mediaType = typeof content_type === 'string'
    ? content_type.split(';', 1)[0].trim().toLowerCase()
    : '';

  // Image detection runs first, so an image/* upload named "x.pdf" stays an image.
  if (mediaType.startsWith('image/')) return 'image';

  const hasPdfExtension = typeof filename === 'string'
    && filename.toLowerCase().endsWith('.pdf');
  if (mediaType === 'application/pdf' || hasPdfExtension) return 'pdf';

  return 'file';
}
|
||||
|
||||
/**
 * Worker for `ingest.blob` jobs: moves an uploaded temp file into the blob
 * store and records a ref row pointing at it.
 *
 * Steps: read `tmp_path` into memory, write the bytes through
 * `defaultStore().write`, create the ref via `refs.create` as the `system`
 * actor, then remove the temp file.
 *
 * The temp file is deleted only after the ref has been created. If the store
 * write or the insert throws, the upload is still on disk, so a retry of the
 * same job can read it again instead of failing with ENOENT.
 * NOTE(review): this assumes re-writing identical bytes to the store is
 * harmless (the commit describes it as content-addressed) — confirm.
 *
 * @param {{ data: { space_id: *, tmp_path: string, filename?: string, content_type?: string, meta?: { title?: string, metadata?: object } | null } }} job
 *   Queue job whose `data` carries the upload description.
 * @returns {Promise<{ ref_id: *, sha: string }>} Id of the created ref and the sha reported by the store.
 * @throws Propagates errors from reading the temp file, writing the blob, or creating the ref.
 */
export async function handler(job) {
  const { space_id, tmp_path, filename, content_type } = job.data;
  // `??` also covers an explicit `meta: null`, which a destructuring default would not.
  const meta = job.data.meta ?? {};

  const buf = await fs.readFile(tmp_path);
  const { sha, path } = await defaultStore().write(buf);

  const row = await refs.create({
    space_id,
    kind: kindFor(content_type, filename),
    source_url: null,
    // Prefer an explicit title, then the original filename, then a short sha prefix.
    title: meta.title || filename || sha.slice(0, 12),
    summary: null,
    body_text: null,
    blob_path: path,
    // Caller-supplied metadata is spread last, so it can override the computed keys.
    metadata: { sha, content_type, filename, size: buf.length, ...(meta.metadata || {}) }
  }, { kind: 'system', id: null });

  // Best-effort cleanup: the ref already exists, so a failed unlink must not fail the job.
  try {
    await fs.unlink(tmp_path);
  } catch {
    /* temp file already gone or not removable; nothing useful to do here */
  }

  return { ref_id: row.id, sha };
}
|
||||
48
tests/jobs/workers/blob.test.js
Normal file
48
tests/jobs/workers/blob.test.js
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
import os from 'node:os';
|
||||
import { resetDb } from '../../helpers/db.js';
|
||||
import { migrateUp } from '../../../lib/db/migrate.js';
|
||||
import { stopBoss, waitForJob } from '../../helpers/boss.js';
|
||||
import * as queue from '../../../lib/jobs/queue.js';
|
||||
import { registerWorkers } from '../../../lib/jobs/index.js';
|
||||
import * as spaces from '../../../lib/db/repos/spaces.js';
|
||||
import * as refs from '../../../lib/db/repos/refs.js';
|
||||
|
||||
// Per-test scratch directory; BLOB_ROOT is pointed at it and the tests also
// place their fake upload files inside it.
let tmpRoot;
// BLOB_ROOT value from before the test, restored in afterEach.
let previousBlobRoot;

beforeEach(async () => {
  previousBlobRoot = process.env.BLOB_ROOT;
  tmpRoot = await fs.mkdtemp(path.join(os.tmpdir(), 'void-blobs-'));
  process.env.BLOB_ROOT = tmpRoot;

  // Order matters: clean schema first, then the queue, then the workers on top of it.
  await resetDb();
  await migrateUp();
  await queue.start();
  await registerWorkers();
});

afterEach(async () => {
  try {
    await stopBoss();
  } finally {
    // Always undo the environment override and remove the scratch directory,
    // even if stopping the queue failed, so later tests start clean.
    if (previousBlobRoot === undefined) {
      delete process.env.BLOB_ROOT;
    } else {
      process.env.BLOB_ROOT = previousBlobRoot;
    }
    if (tmpRoot) {
      await fs.rm(tmpRoot, { recursive: true, force: true });
      tmpRoot = undefined;
    }
  }
});
|
||||
|
||||
// End-to-end checks: enqueue an `ingest.blob` job, wait for the worker to
// finish, then inspect the ref row it created in the target space.
describe('ingest.blob worker', () => {
  it('creates a ref pointing at the blob (kind=file)', async () => {
    const sp = await spaces.create({ slug: 'b', name: 'B' }, { kind: 'user', id: null });
    const upTmp = path.join(tmpRoot, 'up.tmp');
    await fs.writeFile(upTmp, Buffer.from('hello blob'));

    const id = await queue.enqueue('ingest.blob', {
      space_id: sp.id, tmp_path: upTmp, filename: 'hello.txt', content_type: 'text/plain'
    });
    const j = await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
    expect(j.state).toBe('completed');

    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].kind).toBe('file');
    expect(rows[0].blob_path).toBeTruthy();
    // No meta.title was supplied, so the title falls back to the filename.
    expect(rows[0].title).toBe('hello.txt');
  });

  it('classifies image content_type as image kind', async () => {
    const sp = await spaces.create({ slug: 'b2', name: 'B2' }, { kind: 'user', id: null });
    const upTmp = path.join(tmpRoot, 'pic.tmp');
    await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47])); // PNG magic

    const id = await queue.enqueue('ingest.blob', {
      space_id: sp.id, tmp_path: upTmp, filename: 'x.png', content_type: 'image/png'
    });
    const j = await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
    // Fail with a clear message if the job itself failed, rather than with a
    // TypeError on rows[0] below.
    expect(j.state).toBe('completed');

    const rows = await refs.list({ space_id: sp.id });
    expect(rows[0].kind).toBe('image');
  });
});
|
||||
Reference in New Issue
Block a user