diff --git a/docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md b/docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md new file mode 100644 index 0000000..fd70819 --- /dev/null +++ b/docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md @@ -0,0 +1,2121 @@ +# Void 2.0 — Plan 4: Python void-workers (heavy ML ingest) + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Stand up the Python `void-workers` service alongside the existing Node `void-server` on CT 311. Workers claim jobs from the same pg-boss queue via native SQL (`SELECT ... FOR UPDATE SKIP LOCKED`) and process four kinds: `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. Whisper detects CUDA at startup and falls back to CPU when the HA replica is running on Z3 (no GPU). + +**Architecture:** Single CT, two systemd units (`void-server.service` + `void-workers.service`). Shared blob store at `/var/lib/void/blobs`. Shared Postgres at CT 310. Python venv at `/opt/void-workers/venv`. Per-kind handlers run as threads inside one Python process; concurrency configured via env. Node-side hooks: capture.js routes YouTube URLs to `ingest.video`; blob worker enqueues `extract.pdf`/`extract.image` based on `kind`. + +**Tech Stack:** Python 3.12, psycopg 3, pdftotext + Tesseract + Poppler + Pillow, faster-whisper (CTranslate2 backend), yt-dlp, ffmpeg, structlog, pytest, vitest (existing), node-cron (new Node dep for Phase D). + +**Spec:** `docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md`. + +--- + +## Out of scope (Plan 5+) + +- MCP server surface, Companion chat (Plan 5). +- Sacred Valley widgets (Plan 6). +- Speaker diarization, per-language Tesseract data, larger Whisper models with proper GPU budgeting. +- "Re-transcribe" UI action. + +## Conventions + +1. 
**TDD.** Write the failing test first. Run it red. Implement. Run it green. Commit. +2. **Worker handler shape:** `def handler(job_data: dict) -> dict` — return the success output, raise for retry. +3. **Worker name strings:** `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. Exact strings — used in Node enqueue calls AND Python `SUBSCRIBE` registrations. +4. **All Python repo writes pass `actor={'kind': 'worker', 'id': None}`** when emitting audit_log rows. +5. **Commit per task.** +6. **Snapshot CT 310 + CT 311 before any phase that touches infra** (the standing rule). Phase A is code-only. Phase B installs system deps. Phase C resizes the CT + adds GPU passthrough — explicit snapshot before that one. + +--- + +## File structure (what this plan creates) + +``` +workers/ # NEW — Python service lives in same repo + pyproject.toml # Python 3.12 + deps (psycopg, structlog, ...) + README.md # short ops notes + void_workers/ + __init__.py + config.py # env loader + log.py # structlog setup + boss.py # claim / finish / fail / retry against pgboss tables + repo.py # psycopg shim for refs / source_docs updates + audit + safe_fetch.py # Python port of lib/ingest/safe_fetch.js + runner.py # main: load handlers, spawn loops, signal handlers + handlers/ + __init__.py + echo.py # trivial — proves harness end-to-end + pdf.py # extract.pdf (Phase B) + image.py # extract.image (Phase B) + video.py # ingest.video (Phase C) + sourcedoc.py # sync.source_doc (Phase D) + model.py # Whisper loader (CUDA detect + CPU fallback) — Phase C + tests/ + conftest.py # shared fixtures (DB url, pgboss schema reset, sample blobs) + fixtures/ + born_digital.pdf # tiny PDF with embedded text + scanned.pdf # tiny PDF that is a single page of an image + eng_text.png # PNG with known text baked in + test_boss.py + test_echo.py + test_pdf.py + test_image.py + test_video.py + test_sourcedoc.py + test_safe_fetch.py + +deploy/ + void-workers.service # NEW systemd unit + push-workers.sh # 
NEW — sibling to push.sh + README.md # extend: workers bootstrap section + +lib/ # Node-side changes (already exists) + jobs/workers/blob.js # MODIFY: after creating ref, enqueue extract.pdf/extract.image + api/routes/capture.js # MODIFY: detect YouTube/Vimeo, enqueue ingest.video + cron/sync_source_docs.js # NEW — node-cron, fires sync.source_doc daily + +package.json # ADD node-cron, bump version in Phase D +server.js # bump VERSION + boot the cron in Phase D +CHANGELOG.md # Phase D entry +tests/server.test.js # version assertion bump +docs/plan-4-complete.md # Phase D +``` + +--- + +## Phase A — Python harness + systemd unit + +### Task A1: Workers skeleton + venv install on the dev box + +**Files:** +- Create: `workers/pyproject.toml` +- Create: `workers/void_workers/__init__.py` +- Create: `workers/void_workers/config.py` +- Create: `workers/void_workers/log.py` +- Create: `workers/README.md` + +- [ ] **Step 1:** Create `workers/pyproject.toml`: + + ```toml + [project] + name = "void-workers" + version = "0.1.0" + requires-python = ">=3.12" + dependencies = [ + "psycopg[binary,pool]>=3.2", + "structlog>=24.1", + ] + + [project.optional-dependencies] + pdf = ["pdfplumber>=0.11", "pytesseract>=0.3.13", "pillow>=10.3"] + image = ["pytesseract>=0.3.13", "pillow>=10.3"] + video = ["yt-dlp>=2024.10.0", "faster-whisper>=1.0.3"] + test = ["pytest>=8.0", "pytest-asyncio>=0.23"] + all = ["void-workers[pdf,image,video,test]"] + + [tool.setuptools.packages.find] + where = ["."] + include = ["void_workers*"] + ``` + +- [ ] **Step 2:** Create `workers/void_workers/__init__.py`: + + ```python + __version__ = "0.1.0" + ``` + +- [ ] **Step 3:** Create `workers/void_workers/config.py`: + + ```python + import os + + def env(name, default=None, required=False): + v = os.environ.get(name, default) + if required and v is None: + raise RuntimeError(f"env {name} is required") + return v + + def env_int(name, default): + return int(os.environ.get(name, default)) + + DATABASE_URL = 
env("DATABASE_URL", required=True) + BLOB_ROOT = env("BLOB_ROOT", "/var/lib/void/blobs") + WHISPER_MODEL = env("WHISPER_MODEL", "small.en") + WHISPER_CACHE = env("WHISPER_CACHE", "/var/lib/void/whisper-models") + ALLOW_PRIVATE = env("VOID_INGEST_ALLOW_PRIVATE", "false") == "true" + + CONCURRENCY = { + "extract.pdf": env_int("VOID_CONCURRENCY_EXTRACT_PDF", 2), + "extract.image": env_int("VOID_CONCURRENCY_EXTRACT_IMAGE", 2), + "ingest.video": env_int("VOID_CONCURRENCY_INGEST_VIDEO", 1), + "sync.source_doc": env_int("VOID_CONCURRENCY_SYNC_SOURCE_DOC", 1), + "echo": env_int("VOID_CONCURRENCY_ECHO", 1), + } + + POLL_INTERVAL_MS = env_int("VOID_POLL_INTERVAL_MS", 1000) + ``` + +- [ ] **Step 4:** Create `workers/void_workers/log.py`: + + ```python + import logging + import structlog + + def init(): + structlog.configure( + processors=[ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.JSONRenderer(), + ], + wrapper_class=structlog.make_filtering_bound_logger(logging.INFO), + ) + return structlog.get_logger() + + log = init() + ``` + +- [ ] **Step 5:** Create `workers/README.md`: + + ```markdown + # void-workers + + Python ML ingest service alongside void-server (Node). Sibling of `lib/`. + + ## Local dev + + ```bash + cd workers + python3.12 -m venv .venv + . .venv/bin/activate + pip install -e ".[all]" + export DATABASE_URL="postgres://..." + python -m void_workers.runner + ``` + + ## Tests + + ```bash + pip install -e ".[test,all]" + pytest -v + ``` + + See `../docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md` for the full plan. + ``` + +- [ ] **Step 6:** Create the venv + install: + + ```bash + cd /project/src/void-v2/workers + python3.12 -m venv .venv + . .venv/bin/activate + pip install -e ".[all]" + python -c "from void_workers import __version__; print(__version__)" + ``` + Expected: `0.1.0`. 
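As an optional extra check after the install, the env helpers from Step 3 can be exercised without a database. The sketch below copies `env` and `env_int` from `config.py` into a standalone script, since importing `void_workers.config` itself requires `DATABASE_URL` to be set. The variable names `VOID_DEMO_CONCURRENCY` and `VOID_DEMO_REQUIRED` are made up for illustration and are not read by the workers.

```python
import os

def env(name, default=None, required=False):
    v = os.environ.get(name, default)
    if required and v is None:
        raise RuntimeError(f"env {name} is required")
    return v

def env_int(name, default):
    return int(os.environ.get(name, default))

# Hypothetical variable names, used only for this demo.
os.environ.pop("VOID_DEMO_CONCURRENCY", None)
os.environ.pop("VOID_DEMO_REQUIRED", None)

unset_value = env_int("VOID_DEMO_CONCURRENCY", 2)   # unset: falls back to the default
os.environ["VOID_DEMO_CONCURRENCY"] = "4"
set_value = env_int("VOID_DEMO_CONCURRENCY", 2)     # set: parsed from the string "4"

# A required variable with no default raises instead of returning None.
try:
    env("VOID_DEMO_REQUIRED", required=True)
    missing_raises = False
except RuntimeError:
    missing_raises = True

print(unset_value, set_value, missing_raises)
```

A non-numeric value such as `VOID_CONCURRENCY_EXTRACT_PDF=two` makes `env_int` raise `ValueError` at import time, so a bad `.env` stops the service at startup rather than partway through a job.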
+ +- [ ] **Step 7: Commit.** + + ```bash + cd /project/src/void-v2 + git add workers/pyproject.toml workers/README.md workers/void_workers/__init__.py workers/void_workers/config.py workers/void_workers/log.py + git commit -m "feat(workers): Python skeleton + config + structlog" + ``` + +### Task A2: pgboss claim/finish helpers in `boss.py` + +**Files:** +- Create: `workers/void_workers/boss.py` +- Create: `workers/tests/conftest.py` +- Create: `workers/tests/test_boss.py` + +- [ ] **Step 1:** Create `workers/tests/conftest.py`: + + ```python + import os + import pytest + import psycopg + + DB_URL = os.environ["DATABASE_URL"] + + @pytest.fixture + def conn(): + with psycopg.connect(DB_URL, autocommit=True) as c: + yield c + + @pytest.fixture(autouse=True) + def reset_pgboss(conn): + """Drop and recreate the pgboss schema before each test.""" + conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE") + # We do NOT recreate it — pg-boss's first start() makes the schema. + # Tests that need it call ensure_pgboss(conn). + yield + + def ensure_pgboss(conn): + """Bring pgboss schema up by shelling out to Node's queue.start().""" + # In tests we cannot rely on Node — so we create the bare minimum + # tables we touch. Real pgboss tables are richer; we only need the + # columns boss.py reads and writes. 
+ conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss") + conn.execute(""" + CREATE TABLE IF NOT EXISTS pgboss.job ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + name text NOT NULL, + priority int NOT NULL DEFAULT 0, + data jsonb, + state text NOT NULL DEFAULT 'created', + retry_limit int NOT NULL DEFAULT 5, + retry_count int NOT NULL DEFAULT 0, + retry_delay int NOT NULL DEFAULT 10, + retry_backoff boolean NOT NULL DEFAULT true, + start_after timestamptz NOT NULL DEFAULT now(), + started_on timestamptz, + completed_on timestamptz, + created_on timestamptz NOT NULL DEFAULT now(), + output jsonb + ) + """) + + @pytest.fixture + def boss_ready(conn): + ensure_pgboss(conn) + yield conn + ``` + +- [ ] **Step 2:** Write the failing test `workers/tests/test_boss.py`: + + ```python + import json + from void_workers.boss import Boss + + def test_claim_returns_none_when_no_jobs(boss_ready): + boss = Boss(dsn=__import__("void_workers.config", fromlist=["DATABASE_URL"]).DATABASE_URL) + assert boss.claim("echo") is None + + def test_claim_then_complete_round_trip(boss_ready): + boss_ready.execute( + "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)", + (json.dumps({"ping": 1}),) + ) + boss = Boss(dsn=__import__("void_workers.config", fromlist=["DATABASE_URL"]).DATABASE_URL) + job = boss.claim("echo") + assert job is not None + assert job["data"] == {"ping": 1} + boss.complete(job["id"], {"pong": 1}) + row = boss_ready.execute( + "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],) + ).fetchone() + assert row[0] == "completed" + assert row[1] == {"pong": 1} + + def test_fail_records_output_and_state(boss_ready): + boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')") + boss = Boss(dsn=__import__("void_workers.config", fromlist=["DATABASE_URL"]).DATABASE_URL) + job = boss.claim("echo") + boss.fail(job["id"], {"name": "BoomError", "message": "boom"}) + row = boss_ready.execute( + "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s", + 
(job["id"],) + ).fetchone() + # retry_limit defaults to 5 and retry_count was 0 → should move to 'retry' + assert row[0] == "retry" + assert row[2] == 1 + + def test_fail_past_retry_limit_marks_failed(boss_ready): + boss_ready.execute( + "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)" + ) + boss = Boss(dsn=__import__("void_workers.config", fromlist=["DATABASE_URL"]).DATABASE_URL) + job = boss.claim("echo") + boss.fail(job["id"], {"name": "Done"}) + row = boss_ready.execute( + "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],) + ).fetchone() + assert row[0] == "failed" + ``` + +- [ ] **Step 3:** Run red: + + ```bash + cd /project/src/void-v2/workers + . .venv/bin/activate + # The || sits inside the group, before cut: a pipeline's exit status is that of + # its last command, so a fallback placed after cut would never fire. + DATABASE_URL="$( { grep ^DATABASE_URL= ../.env 2>/dev/null || + echo "DATABASE_URL=postgres://void:$(ssh root@192.168.1.124 cat /root/void2-db-pass.txt | sed s/^DB_PASS=//)@192.168.1.215:5432/void"; } | cut -d= -f2- )" \ + pytest tests/test_boss.py -v + ``` + Expected: a collection error (`ModuleNotFoundError` for `void_workers.boss`, which does not exist yet), so none of the 4 tests run. + +- [ ] **Step 4:** Implement `workers/void_workers/boss.py`: + + ```python + import json + import psycopg + from psycopg.rows import dict_row + + class Boss: + def __init__(self, dsn): + self.dsn = dsn + + def _conn(self): + return psycopg.connect(self.dsn, autocommit=False, row_factory=dict_row) + + def claim(self, queue): + """Atomically claim one job for the given queue. 
Returns dict or None.""" + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT id, data, retry_count, retry_limit + FROM pgboss.job + WHERE name=%s + AND state IN ('created','retry') + AND start_after <= now() + ORDER BY priority DESC, created_on ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + """, (queue,)) + row = cur.fetchone() + if not row: return None + cur.execute( + "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s", + (row["id"],) + ) + conn.commit() + return row + + def complete(self, job_id, output): + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + UPDATE pgboss.job + SET state='completed', completed_on=now(), output=%s + WHERE id=%s + """, (json.dumps(output), job_id)) + conn.commit() + + def fail(self, job_id, output): + """Mark the job retry (if budget left) or failed (otherwise).""" + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT retry_count, retry_limit, retry_delay, retry_backoff " + "FROM pgboss.job WHERE id=%s FOR UPDATE", + (job_id,) + ) + row = cur.fetchone() + if not row: return + new_count = row["retry_count"] + 1 + if new_count > row["retry_limit"]: + cur.execute( + "UPDATE pgboss.job SET state='failed', output=%s, " + "completed_on=now() WHERE id=%s", + (json.dumps(output), job_id) + ) + else: + # exponential backoff: delay * 2^(count-1) + delay = row["retry_delay"] + if row["retry_backoff"]: + delay = delay * (2 ** (new_count - 1)) + cur.execute(""" + UPDATE pgboss.job + SET state='retry', + retry_count=%s, + start_after=now() + (%s || ' seconds')::interval, + output=%s + WHERE id=%s + """, (new_count, str(delay), json.dumps(output), job_id)) + conn.commit() + ``` + +- [ ] **Step 5:** Run green: + + ```bash + DATABASE_URL=... pytest tests/test_boss.py -v + ``` + Expected: 4 passed. 
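For reference, the retry schedule that `fail()` produces can be worked out by hand. The sketch below mirrors the `delay * 2 ** (new_count - 1)` arithmetic using the column defaults from the test table in `conftest.py` (`retry_delay=10`, `retry_limit=5`). `backoff_schedule` is a hypothetical helper for illustration, not part of `boss.py`.

```python
def backoff_schedule(retry_delay=10, retry_limit=5, retry_backoff=True):
    """Seconds added to start_after after each failure that still earns a retry."""
    delays = []
    # fail() retries while new_count <= retry_limit, so new_count runs 1..retry_limit.
    for new_count in range(1, retry_limit + 1):
        delay = retry_delay
        if retry_backoff:
            delay = delay * (2 ** (new_count - 1))
        delays.append(delay)
    return delays

print(backoff_schedule())                      # [10, 20, 40, 80, 160]
print(backoff_schedule(retry_backoff=False))   # [10, 10, 10, 10, 10]
```

With these defaults a job that keeps failing is retried five times, with 310 seconds of delay in total, before the sixth failure marks it `failed`.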
+ +- [ ] **Step 6: Commit.** + + ```bash + cd /project/src/void-v2 + git add workers/void_workers/boss.py workers/tests/conftest.py workers/tests/test_boss.py + git commit -m "feat(workers): pgboss claim/complete/fail via psycopg" + ``` + +### Task A3: Echo handler + runner loop + +**Files:** +- Create: `workers/void_workers/handlers/__init__.py` +- Create: `workers/void_workers/handlers/echo.py` +- Create: `workers/void_workers/runner.py` +- Create: `workers/tests/test_echo.py` + +- [ ] **Step 1:** Write failing test `workers/tests/test_echo.py`: + + ```python + import json + import threading + import time + from void_workers.boss import Boss + from void_workers.runner import Runner + from void_workers.config import DATABASE_URL + + def test_echo_handler_runs(boss_ready): + boss_ready.execute( + "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)", + (json.dumps({"ping": 42}),) + ) + runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}}) + t = threading.Thread(target=runner.run, kwargs={"once": True}, daemon=True) + t.start() + t.join(timeout=5) + row = boss_ready.execute( + "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1" + ).fetchone() + assert row[0] == "completed" + assert row[1] == {"pong": 42} + ``` + +- [ ] **Step 2:** Run red: + + ```bash + DATABASE_URL=... pytest tests/test_echo.py -v + ``` + Expected: ImportError / 1 failed. + +- [ ] **Step 3:** Implement `workers/void_workers/handlers/__init__.py`: + + ```python + from . 
import echo + + REGISTRY = { + echo.NAME: echo.handle, + } + ``` + +- [ ] **Step 4:** Implement `workers/void_workers/handlers/echo.py`: + + ```python + NAME = "echo" + + def handle(job_data: dict) -> dict: + return {"pong": job_data.get("ping", 0)} + ``` + +- [ ] **Step 5:** Implement `workers/void_workers/runner.py`: + + ```python + import signal + import threading + import time + from concurrent.futures import ThreadPoolExecutor + from .boss import Boss + from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL + from .handlers import REGISTRY + from .log import log + + class Runner: + def __init__(self, dsn=None, handlers=None): + self.boss = Boss(dsn or DATABASE_URL) + self.handlers = handlers or { + name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY + } + self._stop = threading.Event() + + def _process_one(self, queue): + job = self.boss.claim(queue) + if not job: return False + log.info("job_claimed", job_id=str(job["id"]), name=queue) + handler = REGISTRY[queue] + try: + out = handler(job["data"] or {}) + self.boss.complete(job["id"], out or {}) + log.info("job_completed", job_id=str(job["id"]), name=queue) + except Exception as e: + log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e)) + self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)}) + return True + + def _loop(self, queue): + interval = POLL_INTERVAL_MS / 1000.0 + while not self._stop.is_set(): + try: + did = self._process_one(queue) + except Exception as e: + log.error("loop_error", queue=queue, err=str(e)) + did = False + if not did: + self._stop.wait(interval) + + def run(self, once=False): + if once: + for q in self.handlers: + self._process_one(q) + return + executor = ThreadPoolExecutor(max_workers=sum(h["concurrency"] for h in self.handlers.values())) + for q, cfg in self.handlers.items(): + for _ in range(cfg["concurrency"]): + executor.submit(self._loop, q) + for sig in (signal.SIGTERM, signal.SIGINT): + signal.signal(sig, lambda 
*_: self._stop.set()) + while not self._stop.is_set(): + time.sleep(0.5) + executor.shutdown(wait=True) + + if __name__ == "__main__": + Runner().run() + ``` + +- [ ] **Step 6:** Run green: + + ```bash + DATABASE_URL=... pytest tests/test_echo.py -v + ``` + Expected: 1 passed. + +- [ ] **Step 7: Commit.** + + ```bash + cd /project/src/void-v2 + git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/echo.py workers/void_workers/runner.py workers/tests/test_echo.py + git commit -m "feat(workers): runner loop + echo handler" + ``` + +### Task A4: systemd unit + push-workers.sh + +**Files:** +- Create: `deploy/void-workers.service` +- Create: `deploy/push-workers.sh` +- Modify: `deploy/README.md` (append workers bootstrap section) + +- [ ] **Step 1:** Create `deploy/void-workers.service`: + + ```ini + [Unit] + Description=Void 2.0 workers + After=network-online.target void-server.service + Wants=network-online.target + + [Service] + Type=simple + User=voidworkers + WorkingDirectory=/opt/void-workers + EnvironmentFile=/opt/void-workers/.env + ExecStart=/opt/void-workers/venv/bin/python -m void_workers.runner + Restart=on-failure + RestartSec=5 + StandardOutput=journal + StandardError=journal + MemoryMax=6G + + [Install] + WantedBy=multi-user.target + ``` + +- [ ] **Step 2:** Create `deploy/push-workers.sh`: + + ```bash + #!/usr/bin/env bash + set -euo pipefail + + # Push Python void-workers source to CT 311 and restart the service. + # Run from /project/src/void-v2. + + TARGET=${TARGET:-root@192.168.1.13} + REMOTE_DIR=${REMOTE_DIR:-/opt/void-workers} + + # venv and .env exist only on the target; exclude them so that + # --delete does not remove them on every deploy. + rsync -avz --delete \ + --exclude .venv \ + --exclude venv \ + --exclude .env \ + --exclude __pycache__ \ + --exclude '*.egg-info' \ + --exclude tests \ + workers/ "$TARGET:$REMOTE_DIR/" + + ssh "$TARGET" " + cd $REMOTE_DIR + if [ ! -d venv ]; then + python3.12 -m venv venv + fi + . 
venv/bin/activate + pip install --quiet --upgrade pip + pip install --quiet -e '.[all]' + systemctl restart void-workers + " + echo "Workers deployed." 
+ ``` + + Make it executable: + ```bash + chmod +x deploy/push-workers.sh + ``` + +- [ ] **Step 3:** Extend `deploy/README.md` — append the following section: + + ```markdown + + ## Workers (Python void-workers — Plan 4+) + + Runs alongside void-server as a second systemd unit. + + One-time setup on CT 311: + + ```bash + apt install -y python3.12 python3.12-venv python3-pip \ + ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils + + useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers + mkdir -p /opt/void-workers /var/lib/void/whisper-models + chown voidworkers: /opt/void-workers + chown -R voidworkers: /var/lib/void/whisper-models + + # voidworkers needs to read the shared blob store + usermod -aG void voidworkers + + install -m 644 deploy/void-workers.service /etc/systemd/system/ + systemctl daemon-reload + systemctl enable void-workers + + # /opt/void-workers/.env — see plan §Deploy delta + ``` + + Deploy after edits: + + ```bash + cd /project/src/void-v2 + ./deploy/push-workers.sh + ``` + ``` + +- [ ] **Step 4:** Run the existing Node test suite — must still be green (we have not modified anything Node touches yet). + + ```bash + cd /project/src/void-v2 + npx vitest run + ``` + +- [ ] **Step 5: Commit.** + + ```bash + git add deploy/void-workers.service deploy/push-workers.sh deploy/README.md + git commit -m "feat(workers): systemd unit + push-workers.sh" + ``` + +### Task A5: Phase A close — memory checkpoint + +- [ ] **Step 1:** Update memory `project_void_v2_execution.md`: mark "Plan 4 Phase A complete: harness + echo running locally; void-workers.service installed on CT 311 pending real deps (Phase B+)". +- [ ] **Step 2:** Note the boss.py contract decision (psycopg + native SQL) so future plans don't re-litigate it. + +--- + +## Phase B — `extract.pdf` + `extract.image` + +### Task B1: Install system deps on the dev box + +These deps support local pytest runs. The same packages will be installed on CT 311 when Phase A's systemd unit lands there. 
+ +- [ ] **Step 1:** Verify deps locally (we are running on Ubuntu 24.04 LXC per CLAUDE.md): + + ```bash + command -v pdftotext && command -v tesseract && command -v ffmpeg || echo "missing deps" + apt list --installed 2>/dev/null | grep -E "poppler|tesseract|ffmpeg" | head -5 + ``` + +- [ ] **Step 2:** If any are missing, install: + + ```bash + apt install -y poppler-utils tesseract-ocr tesseract-ocr-eng ffmpeg + ``` + +- [ ] **Step 3:** Install pdf/image Python extras into the workers venv: + + ```bash + cd /project/src/void-v2/workers + . .venv/bin/activate + pip install -e '.[pdf,image,test]' + python -c "import pdfplumber, pytesseract; print('ok')" + ``` + +- [ ] **Step 4:** Add a `tests/fixtures/` README explaining how fixtures were generated (so we can regenerate them): + + Create `workers/tests/fixtures/README.md`: + + ```markdown + # Test fixtures + + - `born_digital.pdf` — generated by + `pdftotext -version 2>&1 | head -1 > /tmp/b.txt && enscript -p /tmp/b.ps /tmp/b.txt && ps2pdf /tmp/b.ps born_digital.pdf` + (or any tool that produces a PDF with embedded text). Must contain + the string "void-workers". + - `scanned.pdf` — `convert -density 200 eng_text.png scanned.pdf` (PDF + that is a single page of an image — `pdftotext` will yield nothing, + Tesseract fallback must pick it up). + - `eng_text.png` — a PNG with a clear English sentence on white + background. Must contain the string "blackflame". 
+ ``` + + Generate them now: + + ```bash + cd /project/src/void-v2/workers/tests/fixtures + # eng_text.png: render text with ImageMagick (apt: imagemagick) + apt install -y imagemagick ghostscript + convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 36 \ + -fill black -annotate +50+100 "blackflame palette" eng_text.png + # born_digital.pdf: simple TXT → PDF + echo "void-workers proof-of-life document" > /tmp/b.txt + enscript -p /tmp/b.ps /tmp/b.txt 2>/dev/null || echo "enscript missing — using ImageMagick" + if [ -f /tmp/b.ps ]; then + ps2pdf /tmp/b.ps born_digital.pdf + else + convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 24 \ + -fill black -annotate +50+100 "void-workers proof-of-life" born_digital.pdf + # NOTE: this is a single-image PDF too — pdftotext will return empty. + # Override: use poppler's pdftohtml tools or create with text. + fi + # scanned.pdf: PNG → PDF + convert -density 200 eng_text.png scanned.pdf + ls -la + ``` + + If `enscript` is not available, install or use another text-PDF tool. The key fixture invariant is that `pdftotext born_digital.pdf -` returns non-empty text. 
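The fixture invariant can also be checked from Python before committing. This is a minimal sketch, assuming `pdftotext` is on `PATH`; `embedded_text` and `fixture_ok` are hypothetical helpers, not part of the workers package.

```python
import shutil
import subprocess

def embedded_text(pdf_path):
    """Return pdftotext's output for pdf_path, or None when the binary is missing."""
    if shutil.which("pdftotext") is None:
        return None
    return subprocess.check_output(
        ["pdftotext", str(pdf_path), "-"], timeout=60
    ).decode("utf-8", errors="replace")

def fixture_ok(text, needle):
    """True when text was extracted and contains needle (case-insensitive)."""
    return text is not None and needle.lower() in text.lower()

# Intended use, run from workers/tests/fixtures:
#   fixture_ok(embedded_text("born_digital.pdf"), "void-workers")  -> should be True
#   fixture_ok(embedded_text("scanned.pdf"), "blackflame")         -> should be False
```

The second call is expected to be false because `scanned.pdf` carries its text only as pixels, which is what forces the Tesseract fallback in Task B3.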
+ +- [ ] **Step 5: Commit fixtures.** + + ```bash + cd /project/src/void-v2 + git add workers/tests/fixtures + git commit -m "test(workers): pdf/image test fixtures" + ``` + +### Task B2: `extract.pdf` worker — pdftotext path + +**Files:** +- Create: `workers/void_workers/handlers/pdf.py` +- Modify: `workers/void_workers/handlers/__init__.py` +- Create: `workers/void_workers/repo.py` +- Create: `workers/tests/test_pdf.py` + +- [ ] **Step 1:** Write the failing test: + + ```python + # workers/tests/test_pdf.py + import json + import os + import psycopg + from pathlib import Path + from void_workers.handlers.pdf import handle as handle_pdf + from void_workers.config import DATABASE_URL + + FIXTURES = Path(__file__).parent / "fixtures" + + def _seed_space_and_ref(conn, blob_path, kind="pdf"): + """Bring up the Void schema (matches migrate.js) just enough to insert a ref.""" + sp = conn.execute( + "INSERT INTO spaces(slug, name) VALUES('plan4-tests', 'P4') " + "ON CONFLICT (slug) DO UPDATE SET name=EXCLUDED.name RETURNING id" + ).fetchone()[0] + ref = conn.execute( + "INSERT INTO refs(space_id, kind, source_url, title, blob_path) " + "VALUES(%s, %s, NULL, 'fixture', %s) RETURNING id", + (sp, kind, str(blob_path)) + ).fetchone()[0] + return sp, ref + + def test_pdf_born_digital_uses_pdftotext(conn): + """A PDF with embedded text gets pdftotext-extracted body.""" + conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public") + # Apply Void migrations against the conn + _run_node_migrations() + blob = FIXTURES / "born_digital.pdf" + sp, ref = _seed_space_and_ref(conn, blob) + out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)}) + assert out["ref_id"] == str(ref) + assert out["chars"] > 0 + row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone() + assert "void-workers" in (row[0] or "").lower() + + def _run_node_migrations(): + """Shell to Node's migrate runner — same DB.""" + import subprocess + subprocess.run( + ["node", 
"lib/db/migrate.js", "up"], + cwd="/project/src/void-v2", + check=True + ) + ``` + +- [ ] **Step 2:** Run red: + + ```bash + cd /project/src/void-v2/workers + . .venv/bin/activate + DATABASE_URL=... pytest tests/test_pdf.py -v + ``` + Expected: ImportError on pdf module. + +- [ ] **Step 3:** Implement `workers/void_workers/repo.py`: + + ```python + import json + import psycopg + from psycopg.rows import dict_row + from .config import DATABASE_URL + + def _conn(): + return psycopg.connect(DATABASE_URL, autocommit=True, row_factory=dict_row) + + def update_ref(ref_id, *, body_text=None, metadata_patch=None): + """UPDATE refs ... + emit audit_log row with actor_kind='worker'.""" + with _conn() as conn: + before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone() + sets, args = [], [] + if body_text is not None: + sets.append("body_text=%s"); args.append(body_text) + if metadata_patch is not None: + sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb") + args.append(json.dumps(metadata_patch)) + if not sets: return before + sets.append("updated_at=now()") + args.append(ref_id) + after = conn.execute( + f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *", + args + ).fetchone() + # audit log — match Node's audit.js diff shape (simple) + diff = {} + if body_text is not None and (before or {}).get("body_text") != body_text: + diff["body_text"] = {"before": "(redacted)", "after_len": len(body_text)} + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'ref', %s, 'update', %s) + """, (ref_id, json.dumps({"kind": "update", "changes": diff}))) + return after + ``` + +- [ ] **Step 4:** Implement `workers/void_workers/handlers/pdf.py`: + + ```python + import subprocess + from .. 
import repo + + NAME = "extract.pdf" + + def _pdftotext(blob_path): + return subprocess.check_output( + ["pdftotext", "-layout", blob_path, "-"], + timeout=120 + ).decode("utf-8", errors="replace") + + def handle(job_data: dict) -> dict: + ref_id = job_data["ref_id"] + blob_path = job_data["blob_path"] + text = _pdftotext(blob_path).strip() + method = "pdftotext" + # Fallback (Tesseract OCR) lands in Task B3. + body_text = text[:200_000] + repo.update_ref(ref_id, body_text=body_text, + metadata_patch={"extract": {"method": method, "chars": len(body_text)}}) + return {"ref_id": ref_id, "chars": len(body_text), "method": method} + ``` + +- [ ] **Step 5:** Add `pdf.handle` to the registry — edit `workers/void_workers/handlers/__init__.py`: + + ```python + from . import echo, pdf + + REGISTRY = { + echo.NAME: echo.handle, + pdf.NAME: pdf.handle, + } + ``` + +- [ ] **Step 6:** Run green: + + ```bash + DATABASE_URL=... pytest tests/test_pdf.py::test_pdf_born_digital_uses_pdftotext -v + ``` + Expected: 1 passed. 
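One detail of `update_ref` worth keeping in mind: the `||` operator on two `jsonb` objects is a shallow merge, so a `metadata_patch` replaces whole top-level keys rather than merging nested ones. The sketch below models this in plain Python. `jsonb_concat` is a hypothetical stand-in for what Postgres does, and the sample metadata values are invented.

```python
def jsonb_concat(existing, patch):
    """Model of `coalesce(metadata, '{}'::jsonb) || patch` for JSON objects."""
    return {**(existing or {}), **patch}

before = {
    "source": "upload",
    "extract": {"method": "tesseract", "chars": 12, "note": "old"},
}
patch = {"extract": {"method": "pdftotext", "chars": 340}}
after = jsonb_concat(before, patch)

# "source" survives; the whole "extract" object is replaced, so "note" is gone.
print(after)
```

For the handlers in this plan that is the desired behavior: each run writes a complete `extract` object, so nothing stale from an earlier extraction is left behind.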
+ +- [ ] **Step 7: Commit.** + + ```bash + cd /project/src/void-v2 + git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/pdf.py workers/void_workers/repo.py workers/tests/test_pdf.py + git commit -m "feat(workers): extract.pdf via pdftotext" + ``` + +### Task B3: PDF Tesseract fallback for scanned pages + +**Files:** +- Modify: `workers/void_workers/handlers/pdf.py` +- Modify: `workers/tests/test_pdf.py` + +- [ ] **Step 1:** Add the failing test (append to `test_pdf.py`): + + ```python + def test_pdf_scanned_falls_back_to_tesseract(conn): + conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public") + _run_node_migrations() + blob = FIXTURES / "scanned.pdf" + sp, ref = _seed_space_and_ref(conn, blob) + out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)}) + assert out["method"] == "tesseract" + row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone() + assert "blackflame" in (row[0] or "").lower() + ``` + +- [ ] **Step 2:** Run red. + +- [ ] **Step 3:** Implement the fallback in `workers/void_workers/handlers/pdf.py`: + + ```python + import io + import subprocess + import tempfile + from pathlib import Path + from PIL import Image + import pytesseract + from .. 
import repo + + NAME = "extract.pdf" + FALLBACK_THRESHOLD = 200 # chars + + def _pdftotext(blob_path): + return subprocess.check_output( + ["pdftotext", "-layout", blob_path, "-"], timeout=120 + ).decode("utf-8", errors="replace") + + def _ocr_pdf(blob_path): + """Rasterize each page with pdftoppm, OCR each with Tesseract.""" + with tempfile.TemporaryDirectory() as tmp: + subprocess.run( + ["pdftoppm", "-r", "200", "-png", blob_path, f"{tmp}/p"], + check=True, timeout=300 + ) + pages = sorted(Path(tmp).glob("p-*.png")) + parts = [] + for p in pages: + img = Image.open(p) + parts.append(pytesseract.image_to_string(img, lang="eng")) + return "\n".join(parts) + + def handle(job_data: dict) -> dict: + ref_id = job_data["ref_id"] + blob_path = job_data["blob_path"] + method = "pdftotext" + text = _pdftotext(blob_path).strip() + if len(text) < FALLBACK_THRESHOLD: + method = "tesseract" + text = _ocr_pdf(blob_path).strip() + body_text = text[:200_000] + repo.update_ref(ref_id, body_text=body_text, + metadata_patch={"extract": {"method": method, "chars": len(body_text)}}) + return {"ref_id": ref_id, "chars": len(body_text), "method": method} + ``` + +- [ ] **Step 4:** Run green: + + ```bash + DATABASE_URL=... pytest tests/test_pdf.py -v + ``` + Expected: 2 passed. 
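The fallback rule is based purely on length, which has one consequence worth noting: a born-digital PDF whose extracted text is shorter than 200 characters also takes the OCR path. The sketch below isolates that decision; `choose_method` is a hypothetical helper that mirrors the `len(text) < FALLBACK_THRESHOLD` check in `handle`.

```python
FALLBACK_THRESHOLD = 200  # chars, same value as in pdf.py

def choose_method(pdftotext_output, threshold=FALLBACK_THRESHOLD):
    """Return which extractor's text the handler would keep."""
    text = pdftotext_output.strip()
    return "pdftotext" if len(text) >= threshold else "tesseract"

print(choose_method("void-workers proof-of-life document"))  # tesseract
print(choose_method("x" * 200))                               # pdftotext
```

This is why the born-digital test asserts on the extracted text rather than on `method`: a fixture as short as the one generated in Task B1 may legitimately report `tesseract` once the fallback is in place.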
+ +- [ ] **Step 5: Commit.** + + ```bash + git add workers/void_workers/handlers/pdf.py workers/tests/test_pdf.py + git commit -m "feat(workers): pdf Tesseract fallback for scanned pages" + ``` + +### Task B4: `extract.image` worker + +**Files:** +- Create: `workers/void_workers/handlers/image.py` +- Modify: `workers/void_workers/handlers/__init__.py` +- Create: `workers/tests/test_image.py` + +- [ ] **Step 1:** Write the failing test `workers/tests/test_image.py`: + + ```python + import os + from pathlib import Path + from void_workers.handlers.image import handle as handle_image + + FIXTURES = Path(__file__).parent / "fixtures" + + def test_image_ocr(conn): + conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public") + import subprocess + subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True) + sp = conn.execute( + "INSERT INTO spaces(slug, name) VALUES('plan4-img', 'I') RETURNING id" + ).fetchone()[0] + blob = FIXTURES / "eng_text.png" + ref = conn.execute( + "INSERT INTO refs(space_id, kind, blob_path) VALUES(%s, 'image', %s) RETURNING id", + (sp, str(blob)) + ).fetchone()[0] + out = handle_image({"ref_id": str(ref), "blob_path": str(blob)}) + assert out["chars"] > 0 + row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone() + assert "blackflame" in (row[0] or "").lower() + ``` + +- [ ] **Step 2:** Run red. + +- [ ] **Step 3:** Implement `workers/void_workers/handlers/image.py`: + + ```python + from PIL import Image + import pytesseract + from .. 
import repo + + NAME = "extract.image" + + def handle(job_data: dict) -> dict: + ref_id = job_data["ref_id"] + blob_path = job_data["blob_path"] + text = pytesseract.image_to_string(Image.open(blob_path), lang="eng").strip() + body_text = text[:200_000] + repo.update_ref(ref_id, body_text=body_text, + metadata_patch={"extract": {"method": "tesseract", "chars": len(body_text)}}) + return {"ref_id": ref_id, "chars": len(body_text)} + ``` + +- [ ] **Step 4:** Register — edit `workers/void_workers/handlers/__init__.py`: + + ```python + from . import echo, pdf, image + + REGISTRY = { + echo.NAME: echo.handle, + pdf.NAME: pdf.handle, + image.NAME: image.handle, + } + ``` + +- [ ] **Step 5:** Run green. + +- [ ] **Step 6: Commit.** + + ```bash + git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/image.py workers/tests/test_image.py + git commit -m "feat(workers): extract.image via Tesseract" + ``` + +### Task B5: Node side — blob worker enqueues extract jobs + +**Files:** +- Modify: `lib/jobs/workers/blob.js` +- Modify: `tests/jobs/workers/blob.test.js` + +- [ ] **Step 1:** Extend the existing blob.test.js with a new assertion. 
Append: + + ```js + it('enqueues extract.pdf after creating a pdf ref', async () => { + const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null }); + const upTmp = path.join(tmpRoot, 'doc.tmp'); + await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...')); + const id = await queue.enqueue('ingest.blob', { + space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf' + }); + await waitForJob('ingest.blob', id, { timeoutMs: 10_000 }); + const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js'); + const followUp = (await jobsRepo.list({ name: 'extract.pdf' })).find(j => j.data?.blob_path); + expect(followUp).toBeTruthy(); + }); + + it('enqueues extract.image after creating an image ref', async () => { + const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null }); + const upTmp = path.join(tmpRoot, 'pic.tmp'); + await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47])); + const id = await queue.enqueue('ingest.blob', { + space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png' + }); + await waitForJob('ingest.blob', id, { timeoutMs: 10_000 }); + const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js'); + const followUp = (await jobsRepo.list({ name: 'extract.image' })).find(j => j.data?.blob_path); + expect(followUp).toBeTruthy(); + }); + ``` + +- [ ] **Step 2:** Run red: + + ```bash + cd /project/src/void-v2 + npx vitest run tests/jobs/workers/blob.test.js + ``` + Expected: 2 new tests fail (no extract.* enqueue yet). + +- [ ] **Step 3:** Modify `lib/jobs/workers/blob.js` to enqueue the follow-up job: + + ```js + // existing imports + import * as queue from '../queue.js'; + + // ... 
inside handler, after `const row = await refs.create(...)`: + if (kind === 'pdf') { + await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path }); + } else if (kind === 'image') { + await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path }); + } + return { ref_id: row.id, sha }; + ``` + + (Apply the exact edit at the `return { ref_id: row.id, sha };` site.) + +- [ ] **Step 4:** Run green: + + ```bash + npx vitest run tests/jobs/workers/blob.test.js + ``` + Expected: 4 passed. + +- [ ] **Step 5:** Run the full suite — must still be green. + + ```bash + npx vitest run + ``` + +- [ ] **Step 6: Commit.** + + ```bash + git add lib/jobs/workers/blob.js tests/jobs/workers/blob.test.js + git commit -m "feat(jobs): blob worker fans out to extract.pdf / extract.image" + ``` + +### Task B6: Phase B close — memory + full suite + +- [ ] **Step 1:** `npx vitest run` — full Node suite green. +- [ ] **Step 2:** `cd workers && DATABASE_URL=... pytest -v` — full Python suite green. +- [ ] **Step 3:** Update memory: "Plan 4 Phase B complete: extract.pdf + extract.image workers; Node blob worker fans out." + +--- + +## Phase C — `ingest.video` + CT 311 resize + GPU passthrough + +### Task C1: Snapshot + CT 311 resize + GPU passthrough + +User pre-authorized this 2026-06-01. + +- [ ] **Step 1:** Snapshot CT 310 + CT 311. + + ```bash + SNAP=plan4_pre_resize_$(date +%Y%m%d_%H%M) + ssh root@192.168.1.124 "pct snapshot 310 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" & + ssh root@192.168.1.124 "pct snapshot 311 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" & + wait + ssh root@192.168.1.124 "pct listsnapshot 310 | tail -3; echo ---; pct listsnapshot 311 | tail -3" + ``` + +- [ ] **Step 2:** Resize CT 311. + + ```bash + ssh root@192.168.1.124 "pct set 311 -memory 8192 -cores 6" + ssh root@192.168.1.124 "pct config 311 | grep -E 'cores|memory'" + ``` + +- [ ] **Step 3:** Add GPU device-node passthrough lines (copy from CT 102). 
+ + ```bash + ssh root@192.168.1.124 " + pct set 311 -dev0 /dev/nvidia0,gid=44 + pct set 311 -dev1 /dev/nvidiactl,gid=44 + pct set 311 -dev2 /dev/nvidia-uvm,gid=44 + pct set 311 -dev3 /dev/nvidia-uvm-tools,gid=44 + pct set 311 -dev4 /dev/nvidia-caps/nvidia-cap1,gid=44 + pct set 311 -dev5 /dev/nvidia-caps/nvidia-cap2,gid=44 + pct config 311 | grep dev + " + ``` + +- [ ] **Step 4:** Restart CT 311. + + ```bash + ssh root@192.168.1.124 "pct reboot 311" + sleep 10 + ssh root@192.168.1.13 "ls -la /dev/nvidia* 2>&1 | head; nvidia-smi --query-gpu=name --format=csv,noheader 2>&1 || echo 'nvidia-smi missing (no driver in CT) — expected'" + curl -s http://192.168.1.13:3000/health + ``` + Expected: device nodes visible, health returns alpha-3. + +- [ ] **Step 5: Commit (config-as-comment).** No code change yet — we record the infra step in the next phase's commit message. + +### Task C2: Install ffmpeg + faster-whisper deps on CT 311 + +- [ ] **Step 1:** Install system deps: + + ```bash + ssh root@192.168.1.13 " + apt update + apt install -y python3.12 python3.12-venv python3-pip \ + ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils \ + nvidia-utils-535 nvidia-cuda-toolkit 2>&1 | tail -5 + python3.12 --version + ffmpeg -version | head -1 + tesseract --version 2>&1 | head -1 + " + ``` + + Note: `nvidia-cuda-toolkit` is for `ctranslate2`'s CUDA runtime; if you'd rather not pull the full toolkit (~1 GB), use the official `libcudnn8 libcublas12` runtime packages instead. 
+ +- [ ] **Step 2:** Create the voidworkers user + dirs on CT 311: + + ```bash + ssh root@192.168.1.13 " + id voidworkers >/dev/null 2>&1 || useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers + mkdir -p /opt/void-workers /var/lib/void/whisper-models + chown voidworkers: /opt/void-workers + chown -R voidworkers: /var/lib/void/whisper-models + usermod -aG void voidworkers + chmod -R g+rX /var/lib/void/blobs # ensure voidworkers (in void group) can read blobs + + install -m 644 /opt/void-workers/.env 2>/dev/null || true + cat > /opt/void-workers/.env < 0 + except Exception as e: + log.info("ctranslate2_cuda_probe_failed", err=str(e)) + return False + + def whisper_model(): + global _whisper_model + if _whisper_model is None: + from faster_whisper import WhisperModel + name = os.environ.get("WHISPER_MODEL", "small.en") + cache = os.environ.get("WHISPER_CACHE", "/var/lib/void/whisper-models") + device = "cuda" if cuda_available() else "cpu" + compute_type = "float16" if device == "cuda" else "int8" + log.info("whisper_loading", model=name, device=device, compute_type=compute_type, cache=cache) + _whisper_model = WhisperModel(name, device=device, compute_type=compute_type, download_root=cache) + return _whisper_model + + def whisper_transcribe(audio_path): + segments, _info = whisper_model().transcribe(audio_path, vad_filter=True) + return "\n".join(s.text.strip() for s in segments).strip() + ``` + +- [ ] **Step 4:** Run green: + + ```bash + pytest tests/test_model.py -v + ``` + Expected: 2 passed. 
+ +- [ ] **Step 5: Commit.** + + ```bash + git add workers/void_workers/model.py workers/tests/test_model.py + git commit -m "feat(workers): whisper loader with CUDA detect + CPU fallback" + ``` + +### Task C4: `ingest.video` worker (yt-dlp + transcribe + create ref) + +**Files:** +- Create: `workers/void_workers/handlers/video.py` +- Modify: `workers/void_workers/handlers/__init__.py` +- Create: `workers/tests/test_video.py` +- Modify: `workers/void_workers/repo.py` — add `create_ref()` + +- [ ] **Step 1:** Failing test (mocked yt-dlp + mocked Whisper): + + ```python + # workers/tests/test_video.py + from unittest.mock import patch, MagicMock + from void_workers.handlers.video import handle as handle_video + + def test_video_creates_ref_with_transcript_and_metadata(conn, monkeypatch): + conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public") + import subprocess + subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True) + sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('plan4-vid', 'V') RETURNING id").fetchone()[0] + + info = { + "title": "Sample video", + "description": "a description", + "duration": 90, + "uploader": "Channel", + "thumbnail": "https://i.ytimg.com/t.jpg" + } + with patch("void_workers.handlers.video._yt_dlp_info", return_value=info), \ + patch("void_workers.handlers.video._yt_dlp_audio", return_value="/tmp/fake.opus"), \ + patch("void_workers.handlers.video.whisper_transcribe", return_value="hello world transcript"), \ + patch("os.unlink"): + out = handle_video({"space_id": str(sp), "url": "https://youtu.be/abc"}) + + assert "ref_id" in out + row = conn.execute("SELECT title, body_text, source_kind FROM refs WHERE id=%s", (out["ref_id"],)).fetchone() + assert row[0] == "Sample video" + assert "hello world" in row[1] + assert row[2] == "youtube" + ``` + +- [ ] **Step 2:** Run red. 
+ +- [ ] **Step 3:** Implement `workers/void_workers/handlers/video.py`: + + ```python + import hashlib + import os + import subprocess + import tempfile + import json + from .. import repo + from ..model import whisper_transcribe + + NAME = "ingest.video" + + def _yt_dlp_info(url): + """Returns dict of metadata, or None if yt-dlp could not extract.""" + try: + out = subprocess.check_output( + ["yt-dlp", "-J", "--no-warnings", "--no-playlist", url], + timeout=60 + ) + return json.loads(out) + except subprocess.CalledProcessError as e: + return None + + def _yt_dlp_audio(url): + """Downloads bestaudio to a temp .opus and returns the path.""" + tmp_dir = tempfile.mkdtemp(prefix="void-yt-") + out_template = os.path.join(tmp_dir, "audio.%(ext)s") + subprocess.run( + ["yt-dlp", "-x", "--audio-format", "opus", "-o", out_template, + "--no-warnings", "--no-playlist", url], + check=True, timeout=600 + ) + for f in os.listdir(tmp_dir): + if f.startswith("audio."): + return os.path.join(tmp_dir, f) + raise RuntimeError("yt-dlp produced no audio file") + + def _idem(space_id, url): + return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest() + + def _kind(url): + return "youtube" if ("youtube.com" in url or "youtu.be" in url) else "video" + + def handle(job_data: dict) -> dict: + space_id = job_data["space_id"] + url = job_data["url"] + idem = _idem(space_id, url) + + info = _yt_dlp_info(url) + if info is None: + return {"skipped": "yt-dlp"} + + audio_path = _yt_dlp_audio(url) + try: + transcript = whisper_transcribe(audio_path) + finally: + try: os.unlink(audio_path) + except OSError: pass + + ref_id = repo.create_ref({ + "space_id": space_id, + "kind": "video", + "source_url": url, + "title": info.get("title") or url, + "summary": (info.get("description") or "")[:5000], + "body_text": transcript[:200_000], + "source_kind": _kind(url), + "external_id": idem, + "metadata": { + "duration_s": info.get("duration"), + "uploader": info.get("uploader"), + "thumbnail": 
info.get("thumbnail"), + "extract": {"method": "whisper", "chars": len(transcript)}, + } + }) + return {"ref_id": ref_id, "chars": len(transcript)} + ``` + +- [ ] **Step 4:** Extend `workers/void_workers/repo.py` with `create_ref`: + + ```python + def create_ref(input_): + """INSERT INTO refs(...) RETURNING id; emit audit_log.""" + fields = ["space_id","kind","source_url","title","summary","body_text", + "blob_path","metadata","source_kind","external_id","captured_at"] + cols, vals = [], [] + for f in fields: + if f in input_: + cols.append(f); vals.append(input_[f]) + placeholders = ",".join(f"%s" for _ in cols) + sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id" + with _conn() as conn: + ref_id = conn.execute(sql, vals).fetchone()["id"] + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'ref', %s, 'create', %s) + """, (ref_id, json.dumps({"kind": "create"}))) + return str(ref_id) + ``` + + (Make sure `json` import is at the top of `repo.py` if not already there. Also note `metadata` must be `json.dumps`'d before insertion because we're passing a dict; tweak the field handling to JSON-encode dict/list values.) 
+ + Patch `create_ref` to JSON-encode jsonb fields: + + ```python + def _maybe_json(v): + return json.dumps(v) if isinstance(v, (dict, list)) else v + + def create_ref(input_): + fields = ["space_id","kind","source_url","title","summary","body_text", + "blob_path","metadata","source_kind","external_id","captured_at"] + cols, vals = [], [] + for f in fields: + if f in input_: + cols.append(f); vals.append(_maybe_json(input_[f])) + placeholders = ",".join(f"%s" for _ in cols) + sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id" + with _conn() as conn: + ref_id = conn.execute(sql, vals).fetchone()["id"] + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'ref', %s, 'create', %s) + """, (ref_id, json.dumps({"kind": "create"}))) + return str(ref_id) + ``` + +- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`: + + ```python + from . import echo, pdf, image, video + REGISTRY = { + echo.NAME: echo.handle, + pdf.NAME: pdf.handle, + image.NAME: image.handle, + video.NAME: video.handle, + } + ``` + +- [ ] **Step 6:** Run green: + + ```bash + pytest tests/test_video.py -v + ``` + Expected: 1 passed. 
+ +- [ ] **Step 7: Commit.** + + ```bash + git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/video.py workers/void_workers/repo.py workers/tests/test_video.py + git commit -m "feat(workers): ingest.video via yt-dlp + Whisper" + ``` + +### Task C5: Node — capture.js routes YouTube to `ingest.video` + +**Files:** +- Modify: `lib/api/routes/capture.js` +- Modify: `tests/api/capture.test.js` + +- [ ] **Step 1:** Failing test — append to `tests/api/capture.test.js`: + + ```js + it('POST /api/capture with YouTube URL enqueues ingest.video, not ingest.url', async () => { + const res = await request(app).post('/api/capture').set(ownerHeaders) + .send({ space_id: sp.id, url: 'https://youtu.be/abc' }); + expect(res.status).toBe(202); + expect(res.body.job_id).toBeTruthy(); + const { default: jobsRepo } = await import('../../lib/db/repos/jobs.js'); + const rows = await jobsRepo.list({ name: 'ingest.video' }); + expect(rows.find(r => r.id === res.body.job_id)).toBeTruthy(); + const urlRows = await jobsRepo.list({ name: 'ingest.url' }); + expect(urlRows.find(r => r.id === res.body.job_id)).toBeFalsy(); + }); + ``` + +- [ ] **Step 2:** Run red. + +- [ ] **Step 3:** Modify `lib/api/routes/capture.js` — replace the `POST /` handler's enqueue logic: + + ```js + // helper near the top + const VIDEO_HOST_RE = /(^|\.)(youtube\.com|youtu\.be|vimeo\.com)$/i; + + function isVideoUrl(url) { + try { return VIDEO_HOST_RE.test(new URL(url).hostname); } + catch { return false; } + } + + // inside router.post('/'), after computing idem + the existing-ref check: + const isVideo = isVideoUrl(url); + const job_name = isVideo ? 'ingest.video' : 'ingest.url'; + const job_id = await queue.enqueue(job_name, { space_id, url }); + res.status(202).json({ job_id, idempotency_key: idem }); + ``` + +- [ ] **Step 4:** Run green: + + ```bash + npx vitest run tests/api/capture.test.js + ``` + Expected: existing + new tests pass. 
+ +- [ ] **Step 5: Commit.** + + ```bash + git add lib/api/routes/capture.js tests/api/capture.test.js + git commit -m "feat(api): capture routes YouTube/Vimeo URLs to ingest.video" + ``` + +### Task C6: Phase C close — snapshot + memory + +- [ ] **Step 1:** Run full Node + Python test suites green. + + ```bash + cd /project/src/void-v2 && npx vitest run + cd workers && DATABASE_URL=... pytest -v + ``` + +- [ ] **Step 2:** Snapshot CT 310 + 311 — `plan4_phase_c_`. + +- [ ] **Step 3:** Update memory: "Plan 4 Phase C complete: Whisper + yt-dlp live; CT 311 resized 6c/8G + GPU passthrough; void-workers.service running on CT 311." + +--- + +## Phase D — Source-doc sync + version bump + completion doc + +### Task D1: Python `safe_fetch` port + +**Files:** +- Create: `workers/void_workers/safe_fetch.py` +- Create: `workers/tests/test_safe_fetch.py` + +- [ ] **Step 1:** Failing test: + + ```python + # workers/tests/test_safe_fetch.py + import pytest + from void_workers.safe_fetch import safe_fetch, SafeFetchError + + def test_rejects_file_scheme(): + with pytest.raises(SafeFetchError): + safe_fetch("file:///etc/passwd") + + def test_rejects_loopback(): + with pytest.raises(SafeFetchError): + safe_fetch("http://127.0.0.1/x") + + def test_rejects_rfc1918(): + with pytest.raises(SafeFetchError): + safe_fetch("http://192.168.1.1/x") + + def test_rejects_metadata_endpoint(): + with pytest.raises(SafeFetchError): + safe_fetch("http://169.254.169.254/latest/") + ``` + +- [ ] **Step 2:** Run red. 
+ +- [ ] **Step 3:** Implement `workers/void_workers/safe_fetch.py`: + + ```python + import socket + import ipaddress + import urllib.error + import urllib.request + import os + from urllib.parse import urljoin, urlparse + + BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [ + "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8", + "172.16.0.0/12", "192.168.0.0/16", + "169.254.0.0/16", "100.64.0.0/10", + ]] + + class SafeFetchError(Exception): + pass + + class _NoRedirect(urllib.request.HTTPRedirectHandler): + """Surface 3xx responses as HTTPError so each hop is validated before it is followed.""" + def redirect_request(self, req, fp, code, msg, headers, newurl): + return None + + def _is_blocked(addr): + if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true": + return False + try: + ip = ipaddress.ip_address(addr) + except ValueError: + return True + if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified: + return True + if isinstance(ip, ipaddress.IPv4Address): + return any(ip in n for n in BLOCK_V4_NETS) + # IPv6: ULA + link-local + if ip in ipaddress.ip_network("fc00::/7") or ip in ipaddress.ip_network("fe80::/10"): + return True + return False + + def _resolve(host): + try: + infos = socket.getaddrinfo(host, None) + except socket.gaierror as e: + raise SafeFetchError(f"no DNS for {host}: {e}") + addrs = list({i[4][0] for i in infos}) + for a in addrs: + if _is_blocked(a): + raise SafeFetchError(f"{host} resolves to blocked address {a}") + if not addrs: + raise SafeFetchError(f"no addresses for {host}") + return addrs[0] + + def safe_fetch(url, *, headers=None, timeout=15, max_hops=5): + current = url + # The default opener follows redirects on its own, which would skip the per-hop checks below. 
+ opener = urllib.request.build_opener(_NoRedirect) + for _hop in range(max_hops + 1): + u = urlparse(current) + if u.scheme not in ("http", "https"): + raise SafeFetchError(f"unsupported scheme {u.scheme}") + host = u.hostname + # If host is a literal IP, validate; else resolve + validate + try: + ipaddress.ip_address(host) + if _is_blocked(host): + raise SafeFetchError(f"blocked literal IP {host}") + except ValueError: + _resolve(host) + req = urllib.request.Request(current, headers=headers or {}) + try: + with opener.open(req, timeout=timeout) as r: + return r.read() + except urllib.error.HTTPError as e: + if e.code in (301, 302, 303, 307, 308) and "Location" in e.headers: + # Location may be relative; resolve it against the current URL. + current = urljoin(current, e.headers["Location"]) + continue + raise + raise SafeFetchError(f"too many redirects ({max_hops})") + ``` + +- [ ] **Step 4:** Run green. + +- [ ] **Step 5: Commit.** + + ```bash + git add workers/void_workers/safe_fetch.py workers/tests/test_safe_fetch.py + git commit -m "feat(workers): safe_fetch (Python port)" + ``` + +### Task D2: `sync.source_doc` worker + +**Files:** +- Create: `workers/void_workers/handlers/sourcedoc.py` +- Modify: `workers/void_workers/handlers/__init__.py` +- Create: `workers/tests/test_sourcedoc.py` +- Modify: `workers/void_workers/repo.py` — add `get_source_doc`, `update_source_doc` + +- [ ] **Step 1:** Failing test: + + ```python + # workers/tests/test_sourcedoc.py + from unittest.mock import patch + from void_workers.handlers.sourcedoc import handle as handle_sd + + def test_sourcedoc_updates_body_text(conn): + conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public") + import subprocess + subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True) + sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('sd', 'SD') RETURNING id").fetchone()[0] + res = conn.execute( + "INSERT INTO resources(space_id, slug, name, runtime_type) " + 
"VALUES(%s, 'r', 'R', 'lxc') RETURNING id", (sp,) + ).fetchone()[0] + sd = conn.execute( + "INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) " + "VALUES(%s, 'doc', 'https://example.com/r', 'url') RETURNING id", + (res,) + ).fetchone()[0] + with patch("void_workers.handlers.sourcedoc.safe_fetch", + return_value=b"hello world doc body"): + out = handle_sd({"source_doc_id": str(sd)}) + assert out.get("updated") is True + row = conn.execute("SELECT body_text FROM source_docs WHERE id=%s", (sd,)).fetchone() + assert "hello world" in (row[0] or "") + ``` + +- [ ] **Step 2:** Run red. 
+ +- [ ] **Step 3:** Extend `workers/void_workers/repo.py` with the two helpers: + + ```python + def get_source_doc(source_doc_id): + with _conn() as conn: + return conn.execute( + "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,) + ).fetchone() + + def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None): + with _conn() as conn: + before = conn.execute( + "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,) + ).fetchone() + sets, args = [], [] + if body_text is not None: + sets.append("body_text=%s"); args.append(body_text) + if last_synced is not None: + sets.append("last_synced=%s"); args.append(last_synced) + if metadata_patch is not None: + sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb") + args.append(json.dumps(metadata_patch)) + if not sets: return before + sets.append("updated_at=now()") + args.append(source_doc_id) + after = conn.execute( + f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args + ).fetchone() + conn.execute(""" + INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff) + VALUES('worker', NULL, 'source_doc', %s, 'update', %s) + """, (source_doc_id, json.dumps({"kind":"update","changes":{"body_text":"updated" if body_text is not None else None}}))) + return after + ``` + +- [ ] **Step 4:** Implement `workers/void_workers/handlers/sourcedoc.py`: + + ```python + import hashlib + from datetime import datetime, timezone + from .. 
import repo + from ..safe_fetch import safe_fetch + + NAME = "sync.source_doc" + + def _sha(data): + return hashlib.sha256(data).hexdigest() + + def handle(job_data: dict) -> dict: + sd_id = job_data["source_doc_id"] + doc = repo.get_source_doc(sd_id) + if not doc: + return {"skipped": "gone"} + body = safe_fetch(doc["upstream_url"], headers={"User-Agent": "void-ingest/2.0"}) + new_sha = _sha(body) + old_sha = ((doc.get("metadata") or {}).get("body_sha")) + now = datetime.now(timezone.utc) + if new_sha == old_sha: + repo.update_source_doc(sd_id, last_synced=now) + return {"unchanged": True} + text = body.decode("utf-8", errors="replace")[:1_000_000] + repo.update_source_doc( + sd_id, + body_text=text, + last_synced=now, + metadata_patch={"body_sha": new_sha} + ) + return {"updated": True, "chars": len(text)} + ``` + +- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`: + + ```python + from . import echo, pdf, image, video, sourcedoc + REGISTRY = { + echo.NAME: echo.handle, + pdf.NAME: pdf.handle, + image.NAME: image.handle, + video.NAME: video.handle, + sourcedoc.NAME: sourcedoc.handle, + } + ``` + +- [ ] **Step 6:** Run green. + +- [ ] **Step 7: Commit.** + + ```bash + git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/sourcedoc.py workers/void_workers/repo.py workers/tests/test_sourcedoc.py + git commit -m "feat(workers): sync.source_doc with sha256 diff" + ``` + +### Task D3: Node-side cron — `lib/cron/sync_source_docs.js` + +**Files:** +- Modify: `package.json` — add `node-cron`. +- Create: `lib/cron/index.js` +- Create: `lib/cron/sync_source_docs.js` +- Modify: `server.js` — start the cron in the CLI gate. 
+- Create: `tests/cron/sync_source_docs.test.js` + +- [ ] **Step 1:** Add dep: + + ```bash + cd /project/src/void-v2 && npm i node-cron@^3 + ``` + +- [ ] **Step 2:** Failing test `tests/cron/sync_source_docs.test.js`: + + ```js + import { describe, it, expect, beforeEach, afterEach } from 'vitest'; + import { resetDb } from '../helpers/db.js'; + import { migrateUp } from '../../lib/db/migrate.js'; + import { stopBoss } from '../helpers/boss.js'; + import { pool } from '../../lib/db/pool.js'; + import * as queue from '../../lib/jobs/queue.js'; + import { registerWorkers } from '../../lib/jobs/index.js'; + import { runSync } from '../../lib/cron/sync_source_docs.js'; + import * as jobs from '../../lib/db/repos/jobs.js'; + + beforeEach(async () => { + await resetDb(); await migrateUp(); + await queue.start(); await registerWorkers(); + }); + afterEach(async () => { await stopBoss(); }); + + describe('cron/sync_source_docs.runSync', () => { + it('enqueues sync.source_doc for each url-synced row', async () => { + const sp = (await pool.query( + `INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id` + )).rows[0].id; + const res = (await pool.query( + `INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`, + [sp] + )).rows[0].id; + await pool.query( + `INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`, + [res] + ); + const enqueued = await runSync(); + expect(enqueued).toBe(1); + const queued = await jobs.list({ name: 'sync.source_doc' }); + expect(queued.length).toBe(1); + }); + }); + ``` + +- [ ] **Step 3:** Run red. 
+ +- [ ] **Step 4:** Implement `lib/cron/sync_source_docs.js`: + + ```js + import { pool } from '../db/pool.js'; + import * as queue from '../jobs/queue.js'; + import { log } from '../log.js'; + + export async function runSync() { + const { rows } = await pool.query( + `SELECT id FROM source_docs WHERE sync_source = 'url'` + ); + for (const r of rows) { + await queue.enqueue('sync.source_doc', { source_doc_id: r.id }); + } + return rows.length; + } + ``` + +- [ ] **Step 5:** Implement `lib/cron/index.js`: + + ```js + import cron from 'node-cron'; + import { runSync } from './sync_source_docs.js'; + import { log } from '../log.js'; + + export function startCron() { + // Daily at 03:00 local time + cron.schedule('0 3 * * *', async () => { + try { + const n = await runSync(); + log.info({ enqueued: n }, 'cron sync.source_doc complete'); + } catch (e) { + log.error({ err: e }, 'cron sync.source_doc failed'); + } + }); + log.info('cron started'); + } + ``` + +- [ ] **Step 6:** Wire into `server.js` — extend the CLI gate: + + ```js + // additions + import { startCron } from './lib/cron/index.js'; + + // Inside the `if (import.meta.url === ...)` block, after registerWorkers: + startCron(); + ``` + +- [ ] **Step 7:** Run green: + + ```bash + npx vitest run tests/cron/sync_source_docs.test.js + ``` + +- [ ] **Step 8: Commit.** + + ```bash + git add package.json package-lock.json lib/cron/index.js lib/cron/sync_source_docs.js server.js tests/cron/sync_source_docs.test.js + git commit -m "feat(cron): daily sync.source_doc enqueue" + ``` + +### Task D4: Version bump + CHANGELOG + completion doc + +**Files:** +- Modify: `package.json` → `2.0.0-alpha.4` +- Modify: `server.js` → `VERSION = '2.0.0-alpha.4'` +- Modify: `tests/server.test.js` → assert `2.0.0-alpha.4` +- Modify: `CHANGELOG.md` — append `[2.0.0-alpha.4]` entry +- Create: `docs/plan-4-complete.md` + +- [ ] **Step 1:** Bump version literals (three files). Use Edit. 
+ +- [ ] **Step 2:** Append CHANGELOG entry: + + ```markdown + ## [2.0.0-alpha.4] — 2026-06-NN + + ### Added (Plan 4: Python void-workers) + - `void-workers.service` — Python 3.12 service alongside `void-server` + on CT 311. psycopg-based pg-boss client matches Node's claim/finish + semantics via `SELECT ... FOR UPDATE SKIP LOCKED`. + - **`extract.pdf`** — pdftotext first; per-page Tesseract OCR fallback + when extraction yields < 200 chars. + - **`extract.image`** — Tesseract OCR with English data. + - **`ingest.video`** — yt-dlp metadata + audio extraction + faster-whisper + transcription. CUDA at startup, CPU fallback on HA failover to Z3. + YouTube / youtu.be / Vimeo URLs route through `/api/capture` to + `ingest.video` instead of `ingest.url`. + - **`sync.source_doc`** — fetch upstream + sha256 diff + update + `body_text`. Node cron enqueues daily at 03:00. + - **Node `blob.js`** fans out to `extract.pdf` / `extract.image` after + creating PDF / image refs. + - **CT 311 infrastructure**: resized to 6 cores / 8 GB RAM, GPU device + nodes passed through (shared with CT 102's Ollama). + ``` + +- [ ] **Step 3:** Create `docs/plan-4-complete.md` (use the Plan 3 doc as a template — sections: Date / Version / Tests / Commits / Snapshots / Scope per phase / Security findings handled / Open items / What's left after Plan 4). + +- [ ] **Step 4:** Run full test suites green (Node + Python). + + ```bash + cd /project/src/void-v2 && npx vitest run + cd workers && DATABASE_URL=... pytest -v + ``` + +- [ ] **Step 5: Commit.** + + ```bash + git add package.json server.js tests/server.test.js CHANGELOG.md docs/plan-4-complete.md + git commit -m "chore: version 2.0.0-alpha.4 + changelog + plan-4 completion doc" + ``` + +### Task D5: Plan 4 close — snapshot + memory + +- [ ] **Step 1:** Snapshot CT 310 + 311 — `plan4_complete_`. 
+- [ ] **Step 2:** Update memory `project_void_v2_execution.md` — mark Plan 4 complete; note alpha-4 source-built but not yet deployed (deploy is a separate user-OK decision per the standing rule). + +--- + +## Spec coverage check + +Every Plan 4 spec section maps to at least one task: + +| Spec § | Task(s) | +|---|---| +| Phase A harness | A1, A2, A3, A4 | +| Phase B PDF + image | B1, B2, B3, B4, B5 | +| Phase C Whisper + yt-dlp + GPU | C1, C2, C3, C4, C5 | +| Phase D source-doc sync + version | D1, D2, D3, D4 | +| `safe_fetch` Python port | D1 | +| Concurrency env config | A1 (`config.py`) — applies across handlers | +| systemd `MemoryMax=6G` | A4 (`void-workers.service`) | +| CT 311 resize + GPU passthrough | C1 | +| YouTube routing in capture.js | C5 | +| `blob.js` fans out | B5 | +| Repo audit shim (`actor_kind='worker'`) | B2 (`repo.update_ref`), C4 (`repo.create_ref`), D2 (`repo.update_source_doc`) | + +## Type & name consistency + +- Worker name strings: `echo`, `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc` — exact same strings used in `NAME` constants, registry keys, Node enqueue calls, and tests. +- Handler signature: `def handle(job_data: dict) -> dict` everywhere. +- Job payload shapes: + - `extract.pdf` / `extract.image`: `{ ref_id, blob_path }` (both producers Node-side use these keys; consumer reads same keys). + - `ingest.video`: `{ space_id, url }`. + - `sync.source_doc`: `{ source_doc_id }` (Node cron and Python worker both use this exact key). +- `repo.update_ref` / `repo.create_ref` / `repo.update_source_doc` / `repo.get_source_doc` — same names across all tasks that touch them.
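
The payload shapes listed above can be pinned down in one place so that a key drifting on either the Node or the Python side fails fast. The following is a minimal sketch, not part of the plan's file list: `REQUIRED_KEYS` and `missing_keys` are hypothetical names, and `echo` is left out because the section does not specify its payload.

```python
# Hypothetical helper (not in the plan): one table of the payload keys
# listed under "Type & name consistency", plus a check a test or the
# runner could call before dispatching a job to its handler.
REQUIRED_KEYS = {
    "extract.pdf": {"ref_id", "blob_path"},
    "extract.image": {"ref_id", "blob_path"},
    "ingest.video": {"space_id", "url"},
    "sync.source_doc": {"source_doc_id"},
}


def missing_keys(name: str, job_data: dict) -> set:
    """Return the required keys that are absent from job_data."""
    if name not in REQUIRED_KEYS:
        raise KeyError(f"unknown worker name: {name}")
    return REQUIRED_KEYS[name] - set(job_data)


if __name__ == "__main__":
    # A payload missing blob_path is reported; a complete one is not.
    print(missing_keys("extract.pdf", {"ref_id": "r1"}))
    print(missing_keys("ingest.video", {"space_id": "s1", "url": "https://youtu.be/abc"}))
```

If something like this were adopted, a parametrized pytest case over `REQUIRED_KEYS` could assert that every name also appears in the handler `REGISTRY`, which would cover the name-string half of the same consistency check.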