From 1f0e9a5f1b2fb6b072950d8ef8056ed714e4597a Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 04:59:53 +1000 Subject: [PATCH] feat(workers): extract.pdf with Tesseract fallback pdftotext first; falls back to per-page pdftoppm rasterization + Tesseract OCR when the extracted text is < 200 chars. Updates refs.body_text + metadata.extract.{method,chars} via the repo shim; audit entry emitted with actor_kind='worker'. born_digital.pdf fixture padded so pdftotext yields > 200 chars and the test exercises the pdftotext path, not the OCR fallback. Co-Authored-By: Claude Opus 4.7 --- workers/tests/fixtures/born_digital.pdf | Bin 2471 -> 2749 bytes workers/tests/test_pdf.py | 58 +++++++++++++ workers/void_workers/handlers/__init__.py | 3 +- workers/void_workers/handlers/pdf.py | 47 ++++++++++ workers/void_workers/repo.py | 99 ++++++++++++++++++++++ 5 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 workers/tests/test_pdf.py create mode 100644 workers/void_workers/handlers/pdf.py create mode 100644 workers/void_workers/repo.py diff --git a/workers/tests/fixtures/born_digital.pdf b/workers/tests/fixtures/born_digital.pdf index f5c97ada6c6dd36a1e035e8239f3b318deca1bfa..3e722a9fa1500baddfdfed39e8bdfcbbad9059a5 100644 GIT binary patch delta 762 zcmZ23yjOI>+j`T$(|NZI1onK^eip{wA*-1&v0C2p>4o-N*1qfB`2Lu1a%uM8+Ub4V zLJC{w+?**p*WisCPwibl@v{xhmNP#_9erCMIA8tqhtHgk-F-G!viDb1U9nB*N-IA1 zW_x{c<>H`?Q|3Hh`|g18!Sw4&syFZYu3vUI{!E%x(b4bg&oAH(uU|Q#$5?b;_=$^L zV)ou*yEtO>TIaNcP7GQzx$67Dp84y1HQt@9{a@J+t9U*t9O!K|Mihxq0^i&@0C}3_F;lv}>cUj3=o(#qbsU6&d z)V!49lA_eaTrMChKPijLOu;|_L>pTeZ!Tb*z|3N4Wo$N?h1DFwuxCB2W|o+0oNQ>6 zq-$wvoTh7Nn3k+-00g?p21aR#rsk%J7Un7H`jh9fs%`dTJH)7JX>6u#XrOLjpsvZK z@0*|El30?e;bLWEU}R=sW?*PwVhU6-*_6Y~-on&G0c4bcLY@K_m|d5#bu^opa7&dj0H zFvQG_jVG&c*~B`TTe!M9Ia``J8n~EPSh`p^o4J{|nj0IMIhvZ7m>JmF5L6M%WoO4# ZT#{H+Qc;we#${w^Xu+kb>gw;t1pw|#eBA&5 diff --git a/workers/tests/test_pdf.py b/workers/tests/test_pdf.py new file mode 100644 index 0000000..f11670f --- /dev/null +++ b/workers/tests/test_pdf.py @@ -0,0 +1,58 @@ +import subprocess +from pathlib import Path +from void_workers.handlers.pdf import handle as handle_pdf + +FIXTURES = Path(__file__).parent / "fixtures" + + +def _run_node_migrations(): + subprocess.run( + ["node", "lib/db/migrate.js", "up"], + cwd="/project/src/void-v2", + check=True + ) + + +def _reset_void_schema(conn): + """Mirror tests/helpers/db.js::resetDb on the Node side.""" + conn.execute("DROP SCHEMA IF EXISTS public CASCADE") + conn.execute("CREATE SCHEMA public") + conn.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto") + conn.execute("CREATE EXTENSION IF NOT EXISTS vector") + + +def _seed_space_and_ref(conn, blob_path, kind="pdf"): + sp = conn.execute( + "INSERT INTO spaces(slug, name) VALUES('plan4-tests', 'P4') " + "ON CONFLICT (slug) DO UPDATE SET name=EXCLUDED.name RETURNING id" + ).fetchone()[0] + ref = conn.execute( + "INSERT INTO refs(space_id, kind, source_url, title, blob_path) " + "VALUES(%s, %s, NULL, 'fixture', %s) RETURNING id", + (sp, kind, str(blob_path)) + ).fetchone()[0] + return sp, ref + + +def test_pdf_born_digital_uses_pdftotext(conn): + _reset_void_schema(conn) + _run_node_migrations() + blob = FIXTURES / "born_digital.pdf" + sp, ref = _seed_space_and_ref(conn, blob) + out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)}) + assert out["ref_id"] == str(ref) + assert out["method"] == "pdftotext" + assert out["chars"] > 0 + row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone() + assert "void-workers" in (row[0] or "").lower() + + +def test_pdf_scanned_falls_back_to_tesseract(conn): + _reset_void_schema(conn) + _run_node_migrations() + blob = FIXTURES / "scanned.pdf" + sp, ref = _seed_space_and_ref(conn, blob) + out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)}) + assert out["method"] == "tesseract" + row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone() + assert "blackflame" in (row[0] or "").lower() diff --git a/workers/void_workers/handlers/__init__.py b/workers/void_workers/handlers/__init__.py index 462c509..258277d 100644 --- a/workers/void_workers/handlers/__init__.py +++ b/workers/void_workers/handlers/__init__.py @@ -1,5 +1,6 @@ -from . import echo +from . import echo, pdf REGISTRY = { echo.NAME: echo.handle, + pdf.NAME: pdf.handle, } diff --git a/workers/void_workers/handlers/pdf.py b/workers/void_workers/handlers/pdf.py new file mode 100644 index 0000000..71eaffc --- /dev/null +++ b/workers/void_workers/handlers/pdf.py @@ -0,0 +1,47 @@ +import subprocess +import tempfile +from pathlib import Path +from PIL import Image +import pytesseract +from .. import repo + +NAME = "extract.pdf" +FALLBACK_THRESHOLD = 200 # chars below which we OCR + + +def _pdftotext(blob_path): + return subprocess.check_output( + ["pdftotext", "-layout", blob_path, "-"], timeout=120 + ).decode("utf-8", errors="replace") + + +def _ocr_pdf(blob_path): + """Rasterize each page with pdftoppm, OCR each with Tesseract.""" + with tempfile.TemporaryDirectory() as tmp: + subprocess.run( + ["pdftoppm", "-r", "200", "-png", blob_path, f"{tmp}/p"], + check=True, timeout=300 + ) + pages = sorted(Path(tmp).glob("p-*.png")) + parts = [] + for p in pages: + img = Image.open(p) + parts.append(pytesseract.image_to_string(img, lang="eng")) + return "\n".join(parts) + + +def handle(job_data: dict) -> dict: + ref_id = job_data["ref_id"] + blob_path = job_data["blob_path"] + method = "pdftotext" + text = _pdftotext(blob_path).strip() + if len(text) < FALLBACK_THRESHOLD: + method = "tesseract" + text = _ocr_pdf(blob_path).strip() + body_text = text[:200_000] + repo.update_ref( + ref_id, + body_text=body_text, + metadata_patch={"extract": {"method": method, "chars": len(body_text)}} + ) + return {"ref_id": ref_id, "chars": len(body_text), "method": method} diff --git a/workers/void_workers/repo.py b/workers/void_workers/repo.py new file mode 100644 index 0000000..a35e790 --- /dev/null +++ b/workers/void_workers/repo.py @@ -0,0 +1,99 @@ +import json +import psycopg +from psycopg.rows import dict_row +from .config import DATABASE_URL + + +def _conn(): + return psycopg.connect( + DATABASE_URL, autocommit=True, row_factory=dict_row, client_encoding='UTF8' + ) + + +def _maybe_json(v): + return json.dumps(v) if isinstance(v, (dict, list)) else v + + +def update_ref(ref_id, *, body_text=None, metadata_patch=None): + """UPDATE refs ... and emit an audit_log row with actor_kind='worker'.""" + with _conn() as conn: + before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone() + sets, args = [], [] + if body_text is not None: + sets.append("body_text=%s"); args.append(body_text) + if metadata_patch is not None: + sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb") + args.append(json.dumps(metadata_patch)) + if not sets: + return before + sets.append("updated_at=now()") + args.append(ref_id) + after = conn.execute( + f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *", + args + ).fetchone() + diff = {} + if body_text is not None and (before or {}).get("body_text") != body_text: + diff["body_text"] = {"before_len": len((before or {}).get("body_text") or ""), + "after_len": len(body_text)} + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'ref', %s, 'update', %s) + """, (ref_id, json.dumps({"kind": "update", "changes": diff}))) + return after + + +def create_ref(input_): + """INSERT INTO refs(...) RETURNING id; emit an audit_log row.""" + fields = ["space_id", "kind", "source_url", "title", "summary", "body_text", + "blob_path", "metadata", "source_kind", "external_id", "captured_at"] + cols, vals = [], [] + for f in fields: + if f in input_: + cols.append(f) + vals.append(_maybe_json(input_[f])) + placeholders = ",".join("%s" for _ in cols) + sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id" + with _conn() as conn: + ref_id = conn.execute(sql, vals).fetchone()["id"] + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'ref', %s, 'create', %s) + """, (ref_id, json.dumps({"kind": "create"}))) + return str(ref_id) + + +def get_source_doc(source_doc_id): + with _conn() as conn: + return conn.execute( + "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,) + ).fetchone() + + +def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None): + with _conn() as conn: + before = conn.execute( + "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,) + ).fetchone() + sets, args = [], [] + if body_text is not None: + sets.append("body_text=%s"); args.append(body_text) + if last_synced is not None: + sets.append("last_synced=%s"); args.append(last_synced) + if metadata_patch is not None: + sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb") + args.append(json.dumps(metadata_patch)) + if not sets: + return before + sets.append("updated_at=now()") + args.append(source_doc_id) + after = conn.execute( + f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args + ).fetchone() + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'source_doc', %s, 'update', %s) + """, (source_doc_id, + json.dumps({"kind": "update", + "changes": {"body_text": "updated" if body_text is not None else None}}))) + return after