Files
Void-Homelab/workers/void_workers/repo.py
root 1f0e9a5f1b feat(workers): extract.pdf with Tesseract fallback
pdftotext first; falls back to per-page pdftoppm rasterization +
Tesseract OCR when the extracted text is < 200 chars. Updates
refs.body_text + metadata.extract.{method,chars} via the repo shim;
audit entry emitted with actor_kind='worker'.

born_digital.pdf fixture padded so pdftotext yields > 200 chars and
the test exercises the pdftotext path, not the OCR fallback.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:59:53 +10:00

100 lines
3.9 KiB
Python

import json
import psycopg
from psycopg.rows import dict_row
from .config import DATABASE_URL
def _conn():
    """Open a new psycopg connection to DATABASE_URL.

    The connection is created in autocommit mode, uses ``dict_row`` as its
    row factory, and sets the client encoding to UTF8.
    """
    options = {
        "autocommit": True,
        "row_factory": dict_row,
        "client_encoding": 'UTF8',
    }
    return psycopg.connect(DATABASE_URL, **options)
def _maybe_json(v):
return json.dumps(v) if isinstance(v, (dict, list)) else v
def update_ref(ref_id, *, body_text=None, metadata_patch=None):
    """UPDATE refs ... and emit an audit_log row with actor_kind='worker'.

    Args:
        ref_id: Value matched against ``refs.id``.
        body_text: New ``body_text``; ``None`` leaves the column untouched.
        metadata_patch: Value serialized with ``json.dumps`` and appended to
            ``metadata`` (NULL treated as ``{}``) with the jsonb ``||``
            operator; ``None`` leaves the column untouched.

    Returns:
        The updated row (``RETURNING *``); the current row, unchanged, when
        neither ``body_text`` nor ``metadata_patch`` is given; or ``None``
        when no row has this id.
    """
    with _conn() as conn:
        before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if metadata_patch is not None:
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            # Nothing to change: no UPDATE and no audit entry.
            return before
        sets.append("updated_at=now()")
        args.append(ref_id)
        after = conn.execute(
            f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *",
            args,
        ).fetchone()
        if after is None:
            # The UPDATE matched no row, so nothing changed; do not record
            # an 'update' audit entry for a ref that was never modified.
            return None
        # `before` may be None if the row appeared after the initial SELECT.
        previous_body = (before or {}).get("body_text")
        diff = {}
        if body_text is not None and previous_body != body_text:
            # Only lengths are recorded, not the text itself.
            diff["body_text"] = {
                "before_len": len(previous_body or ""),
                "after_len": len(body_text),
            }
        # NOTE: _conn() opens the connection with autocommit=True, so the
        # UPDATE above is already committed if this INSERT fails.
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'update', %s)
        """, (ref_id, json.dumps({"kind": "update", "changes": diff})))
        return after
def create_ref(input_):
    """INSERT INTO refs(...) RETURNING id; emit an audit_log row.

    Args:
        input_: Mapping of column name to value. Only the columns named in
            ``fields`` are inserted; any other keys are ignored. dict and
            list values are JSON-encoded before being bound.

    Returns:
        The id of the new row, converted to ``str``.

    Raises:
        ValueError: If ``input_`` contains none of the insertable columns
            (the statement would otherwise be ``INSERT INTO refs() VALUES()``,
            which is not valid SQL).
    """
    fields = ["space_id", "kind", "source_url", "title", "summary", "body_text",
              "blob_path", "metadata", "source_kind", "external_id", "captured_at"]
    # Column names come only from the fixed list above, never from caller
    # keys, so interpolating them into the statement is safe.
    cols = [f for f in fields if f in input_]
    if not cols:
        raise ValueError(
            "create_ref needs at least one of: " + ", ".join(fields)
        )
    vals = [_maybe_json(input_[f]) for f in cols]
    placeholders = ",".join(["%s"] * len(cols))
    sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
    with _conn() as conn:
        ref_id = conn.execute(sql, vals).fetchone()["id"]
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'create', %s)
        """, (ref_id, json.dumps({"kind": "create"})))
    return str(ref_id)
def get_source_doc(source_doc_id):
    """Fetch the source_docs row whose id equals *source_doc_id*.

    Returns whatever ``fetchone()`` yields for the lookup query.
    """
    query = "SELECT * FROM source_docs WHERE id=%s"
    with _conn() as conn:
        cursor = conn.execute(query, (source_doc_id,))
        return cursor.fetchone()
def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None):
    """Update a source_docs row and emit an audit_log row with actor_kind='worker'.

    Args:
        source_doc_id: Value matched against ``source_docs.id``.
        body_text: New ``body_text``; ``None`` leaves the column untouched.
        last_synced: New ``last_synced``; ``None`` leaves the column untouched.
        metadata_patch: Value serialized with ``json.dumps`` and appended to
            ``metadata`` (NULL treated as ``{}``) with the jsonb ``||``
            operator; ``None`` leaves the column untouched.

    Returns:
        The updated row (``RETURNING *``); the current row, unchanged, when
        no field is given; or ``None`` when no row has this id.
    """
    with _conn() as conn:
        before = conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if last_synced is not None:
            sets.append("last_synced=%s")
            args.append(last_synced)
        if metadata_patch is not None:
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            # Nothing to change: no UPDATE and no audit entry.
            return before
        sets.append("updated_at=now()")
        args.append(source_doc_id)
        after = conn.execute(
            f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args
        ).fetchone()
        if after is None:
            # The UPDATE matched no row, so nothing changed; do not record
            # an 'update' audit entry for a document that was never modified.
            return None
        # The audit payload only flags whether body_text was supplied.
        changes = {"body_text": "updated" if body_text is not None else None}
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'source_doc', %s, 'update', %s)
        """, (source_doc_id, json.dumps({"kind": "update", "changes": changes})))
        return after