# Void 2.0, Plan 4: Python void-workers (heavy ML ingest)

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Stand up the Python `void-workers` service alongside the existing Node `void-server` on CT 311. Workers claim jobs from the same pg-boss queue via native SQL (`SELECT ... FOR UPDATE SKIP LOCKED`) and process four kinds: `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. Whisper detects CUDA at startup and falls back to CPU when the HA replica is running on Z3 (no GPU).

**Architecture:** Single CT, two systemd units (`void-server.service` + `void-workers.service`). Shared blob store at `/var/lib/void/blobs`. Shared Postgres at CT 310. Python venv at `/opt/void-workers/venv`. Per-kind handlers run as threads inside one Python process; concurrency is configured via env. Node-side hooks: capture.js routes YouTube URLs to `ingest.video`; the blob worker enqueues `extract.pdf`/`extract.image` based on `kind`.

**Tech Stack:** Python 3.12, psycopg 3, pdftotext + Tesseract + Poppler + Pillow, faster-whisper (CTranslate2 backend), yt-dlp, ffmpeg, structlog, pytest, vitest (existing), node-cron (new Node dep for Phase D).

**Spec:** `docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md`.

---

## Out of scope (Plan 5+)

- MCP server surface, Companion chat (Plan 5).
- Sacred Valley widgets (Plan 6).
- Speaker diarization, per-language Tesseract data, larger Whisper models with proper GPU budgeting.
- "Re-transcribe" UI action.

## Conventions

1. **TDD.** Write the failing test first. Run it red. Implement. Run it green. Commit.
2. **Worker handler shape:** `def handler(job_data: dict) -> dict`; return the success output, raise for retry.
3. **Worker name strings:** `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`.
   Exact strings, used in Node enqueue calls AND Python `SUBSCRIBE` registrations.
4. **All Python repo writes pass `actor={'kind': 'worker', 'id': None}`** when emitting audit_log rows.
5. **Commit per task.**
6. **Snapshot CT 310 + CT 311 before any phase that touches infra** (the standing rule). Phase A is code-only. Phase B installs system deps. Phase C resizes the CT and adds GPU passthrough, so take an explicit snapshot before that one.

---

## File structure (what this plan creates)

```
workers/                          # NEW: Python service lives in same repo
  pyproject.toml                  # Python 3.12 + deps (psycopg, structlog, ...)
  README.md                       # short ops notes
  void_workers/
    __init__.py
    config.py                     # env loader
    log.py                        # structlog setup
    boss.py                       # claim / finish / fail / retry against pgboss tables
    repo.py                       # psycopg shim for refs / source_docs updates + audit
    safe_fetch.py                 # Python port of lib/ingest/safe_fetch.js
    runner.py                     # main: load handlers, spawn loops, signal handlers
    handlers/
      __init__.py
      echo.py                     # trivial; proves harness end-to-end
      pdf.py                      # extract.pdf (Phase B)
      image.py                    # extract.image (Phase B)
      video.py                    # ingest.video (Phase C)
      sourcedoc.py                # sync.source_doc (Phase D)
    model.py                      # Whisper loader (CUDA detect + CPU fallback), Phase C
  tests/
    conftest.py                   # shared fixtures (DB url, pgboss schema reset, sample blobs)
    fixtures/
      born_digital.pdf            # tiny PDF with embedded text
      scanned.pdf                 # tiny PDF that is a single page of an image
      eng_text.png                # PNG with known text baked in
    test_boss.py
    test_echo.py
    test_pdf.py
    test_image.py
    test_video.py
    test_sourcedoc.py
    test_safe_fetch.py
deploy/
  void-workers.service            # NEW systemd unit
  push-workers.sh                 # NEW: sibling to push.sh
  README.md                       # extend: workers bootstrap section
lib/                              # Node-side changes (already exists)
  jobs/workers/blob.js            # MODIFY: after creating ref, enqueue extract.pdf/extract.image
  api/routes/capture.js           # MODIFY: detect YouTube/Vimeo, enqueue ingest.video
  cron/sync_source_docs.js        # NEW: node-cron, fires sync.source_doc daily
package.json                      # ADD node-cron,
                                  #   bump version in Phase D
server.js                         # bump VERSION + boot the cron in Phase D
CHANGELOG.md                      # Phase D entry
tests/server.test.js              # version assertion bump
docs/plan-4-complete.md           # Phase D
```

---

## Phase A: Python harness + systemd unit

### Task A1: Workers skeleton + venv install on the dev box

**Files:**
- Create: `workers/pyproject.toml`
- Create: `workers/void_workers/__init__.py`
- Create: `workers/void_workers/config.py`
- Create: `workers/void_workers/log.py`
- Create: `workers/README.md`

- [ ] **Step 1:** Create `workers/pyproject.toml`:

```toml
[project]
name = "void-workers"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
  "psycopg[binary,pool]>=3.2",
  "structlog>=24.1",
]

[project.optional-dependencies]
pdf = ["pdfplumber>=0.11", "pytesseract>=0.3.13", "pillow>=10.3"]
image = ["pytesseract>=0.3.13", "pillow>=10.3"]
video = ["yt-dlp>=2024.10.0", "faster-whisper>=1.0.3"]
test = ["pytest>=8.0", "pytest-asyncio>=0.23"]
all = ["void-workers[pdf,image,video,test]"]

[tool.setuptools.packages.find]
where = ["."]
include = ["void_workers*"]
```

- [ ] **Step 2:** Create `workers/void_workers/__init__.py`:

```python
__version__ = "0.1.0"
```

- [ ] **Step 3:** Create `workers/void_workers/config.py`:

```python
import os


def env(name, default=None, required=False):
    v = os.environ.get(name, default)
    if required and v is None:
        raise RuntimeError(f"env {name} is required")
    return v


def env_int(name, default):
    return int(os.environ.get(name, default))


DATABASE_URL = env("DATABASE_URL", required=True)
BLOB_ROOT = env("BLOB_ROOT", "/var/lib/void/blobs")
WHISPER_MODEL = env("WHISPER_MODEL", "small.en")
WHISPER_CACHE = env("WHISPER_CACHE", "/var/lib/void/whisper-models")
ALLOW_PRIVATE = env("VOID_INGEST_ALLOW_PRIVATE", "false") == "true"

CONCURRENCY = {
    "extract.pdf": env_int("VOID_CONCURRENCY_EXTRACT_PDF", 2),
    "extract.image": env_int("VOID_CONCURRENCY_EXTRACT_IMAGE", 2),
    "ingest.video": env_int("VOID_CONCURRENCY_INGEST_VIDEO", 1),
    "sync.source_doc":
        env_int("VOID_CONCURRENCY_SYNC_SOURCE_DOC", 1),
    "echo": env_int("VOID_CONCURRENCY_ECHO", 1),
}

POLL_INTERVAL_MS = env_int("VOID_POLL_INTERVAL_MS", 1000)
```

- [ ] **Step 4:** Create `workers/void_workers/log.py`:

```python
import logging

import structlog


def init():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    )
    return structlog.get_logger()


log = init()
```

- [ ] **Step 5:** Create `workers/README.md`:

````markdown
# void-workers

Python ML ingest service alongside void-server (Node). Sibling of `lib/`.

## Local dev

```bash
cd workers
python3.12 -m venv .venv
. .venv/bin/activate
pip install -e ".[all]"
export DATABASE_URL="postgres://..."
python -m void_workers.runner
```

## Tests

```bash
pip install -e ".[test,all]"
pytest -v
```

See `../docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md` for the full plan.
````

- [ ] **Step 6:** Create the venv + install:

```bash
cd /project/src/void-v2/workers
python3.12 -m venv .venv
. .venv/bin/activate
pip install -e ".[all]"
python -c "from void_workers import __version__; print(__version__)"
```

Expected: `0.1.0`.
- [ ] **Step 7: Commit.**

```bash
cd /project/src/void-v2
git add workers/pyproject.toml workers/README.md workers/void_workers/__init__.py workers/void_workers/config.py workers/void_workers/log.py
git commit -m "feat(workers): Python skeleton + config + structlog"
```

### Task A2: pgboss claim/finish helpers in `boss.py`

**Files:**
- Create: `workers/void_workers/boss.py`
- Create: `workers/tests/conftest.py`
- Create: `workers/tests/test_boss.py`

- [ ] **Step 1:** Create `workers/tests/conftest.py`:

```python
import os

import psycopg
import pytest

DB_URL = os.environ["DATABASE_URL"]


@pytest.fixture
def conn():
    with psycopg.connect(DB_URL, autocommit=True) as c:
        yield c


@pytest.fixture(autouse=True)
def reset_pgboss(conn):
    """Drop the pgboss schema before each test (it is not recreated here)."""
    conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE")
    # We do NOT recreate it: pg-boss's first start() makes the schema.
    # Tests that need it call ensure_pgboss(conn).
    yield


def ensure_pgboss(conn):
    """Create the minimal pgboss.job table that boss.py reads and writes."""
    # In tests we cannot rely on Node, so we create the bare minimum
    # tables we touch. Real pgboss tables are richer; we only need the
    # columns boss.py reads and writes.
    conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pgboss.job (
            id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
            name text NOT NULL,
            priority int NOT NULL DEFAULT 0,
            data jsonb,
            state text NOT NULL DEFAULT 'created',
            retry_limit int NOT NULL DEFAULT 5,
            retry_count int NOT NULL DEFAULT 0,
            retry_delay int NOT NULL DEFAULT 10,
            retry_backoff boolean NOT NULL DEFAULT true,
            start_after timestamptz NOT NULL DEFAULT now(),
            started_on timestamptz,
            completed_on timestamptz,
            created_on timestamptz NOT NULL DEFAULT now(),
            output jsonb
        )
    """)


@pytest.fixture
def boss_ready(conn):
    ensure_pgboss(conn)
    yield conn
```

- [ ] **Step 2:** Write the failing test `workers/tests/test_boss.py`:

```python
import json

from void_workers.boss import Boss
from void_workers.config import DATABASE_URL


def test_claim_returns_none_when_no_jobs(boss_ready):
    boss = Boss(dsn=DATABASE_URL)
    assert boss.claim("echo") is None


def test_claim_then_complete_round_trip(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (json.dumps({"ping": 1}),)
    )
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    assert job is not None
    assert job["data"] == {"ping": 1}
    boss.complete(job["id"], {"pong": 1})
    row = boss_ready.execute(
        "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    assert row[0] == "completed"
    assert row[1] == {"pong": 1}


def test_fail_records_output_and_state(boss_ready):
    boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
    row = boss_ready.execute(
        "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    # retry_limit defaults to 5 and retry_count was 0 → should move to 'retry'
    assert row[0] == "retry"
    assert row[2] == 1


def test_fail_past_retry_limit_marks_failed(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
    )
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    boss.fail(job["id"], {"name": "Done"})
    row = boss_ready.execute(
        "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    assert row[0] == "failed"
```

- [ ] **Step 3:** Run red:

```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
# Read DATABASE_URL from ../.env; otherwise build it from the DB password on the host.
# (A `grep | cut || echo` pipeline never reaches the fallback, because `cut` exits 0.)
DATABASE_URL="$(grep ^DATABASE_URL= ../.env 2>/dev/null | cut -d= -f2-)"
: "${DATABASE_URL:=postgres://void:$(ssh root@192.168.1.124 cat /root/void2-db-pass.txt | sed s/^DB_PASS=//)@192.168.1.215:5432/void}"
export DATABASE_URL
pytest tests/test_boss.py -v
```

Expected: 4 failed (boss.py does not exist).

- [ ] **Step 4:** Implement `workers/void_workers/boss.py`:

```python
import json

import psycopg
from psycopg.rows import dict_row


class Boss:
    def __init__(self, dsn):
        self.dsn = dsn

    def _conn(self):
        return psycopg.connect(self.dsn, autocommit=False, row_factory=dict_row)

    def claim(self, queue):
        """Atomically claim one job for the given queue.
        Returns dict or None."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT id, data, retry_count, retry_limit
                    FROM pgboss.job
                    WHERE name=%s AND state IN ('created','retry')
                      AND start_after <= now()
                    ORDER BY priority DESC, created_on ASC
                    FOR UPDATE SKIP LOCKED
                    LIMIT 1
                """, (queue,))
                row = cur.fetchone()
                if not row:
                    return None
                cur.execute(
                    "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
                    (row["id"],)
                )
            conn.commit()
            return row

    def complete(self, job_id, output):
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE pgboss.job
                    SET state='completed', completed_on=now(), output=%s
                    WHERE id=%s
                """, (json.dumps(output), job_id))
            conn.commit()

    def fail(self, job_id, output):
        """Mark the job retry (if budget left) or failed (otherwise)."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT retry_count, retry_limit, retry_delay, retry_backoff "
                    "FROM pgboss.job WHERE id=%s FOR UPDATE",
                    (job_id,)
                )
                row = cur.fetchone()
                if not row:
                    return
                new_count = row["retry_count"] + 1
                if new_count > row["retry_limit"]:
                    cur.execute(
                        "UPDATE pgboss.job SET state='failed', output=%s, "
                        "completed_on=now() WHERE id=%s",
                        (json.dumps(output), job_id)
                    )
                else:
                    # exponential backoff: delay * 2^(count-1)
                    delay = row["retry_delay"]
                    if row["retry_backoff"]:
                        delay = delay * (2 ** (new_count - 1))
                    cur.execute("""
                        UPDATE pgboss.job
                        SET state='retry', retry_count=%s,
                            start_after=now() + (%s || ' seconds')::interval,
                            output=%s
                        WHERE id=%s
                    """, (new_count, str(delay), json.dumps(output), job_id))
            conn.commit()
```

- [ ] **Step 5:** Run green:

```bash
DATABASE_URL=... pytest tests/test_boss.py -v
```

Expected: 4 passed.
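The retry arithmetic in `fail()` is easy to misread, so here is a small sketch of the delay schedule it produces with the defaults from the test table (`retry_delay=10`, `retry_limit=5`, `retry_backoff=true`). The helper name `backoff_schedule` is hypothetical and exists only for illustration; it mirrors the `delay * (2 ** (new_count - 1))` expression in `boss.py` and is not part of the plan's code.

```python
def backoff_schedule(retry_delay: int, retry_limit: int, backoff: bool = True) -> list[int]:
    """Seconds to wait before each retry, mirroring the arithmetic in Boss.fail().

    Entry i (0-based) is the delay applied when retry_count becomes i + 1.
    """
    delays = []
    for new_count in range(1, retry_limit + 1):
        delay = retry_delay
        if backoff:
            # Same expression as boss.py: delay * 2^(count-1)
            delay = delay * (2 ** (new_count - 1))
        delays.append(delay)
    return delays


if __name__ == "__main__":
    print(backoff_schedule(10, 5))         # [10, 20, 40, 80, 160]
    print(backoff_schedule(10, 5, False))  # [10, 10, 10, 10, 10]
```

Because `fail()` only marks a job `failed` once `new_count > retry_limit`, a job with `retry_limit=5` can be attempted up to six times: the first run plus five retries.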
- [ ] **Step 6: Commit.**

```bash
cd /project/src/void-v2
git add workers/void_workers/boss.py workers/tests/conftest.py workers/tests/test_boss.py
git commit -m "feat(workers): pgboss claim/complete/fail via psycopg"
```

### Task A3: Echo handler + runner loop

**Files:**
- Create: `workers/void_workers/handlers/__init__.py`
- Create: `workers/void_workers/handlers/echo.py`
- Create: `workers/void_workers/runner.py`
- Create: `workers/tests/test_echo.py`

- [ ] **Step 1:** Write failing test `workers/tests/test_echo.py`:

```python
import json
import threading
import time

from void_workers.boss import Boss
from void_workers.runner import Runner
from void_workers.config import DATABASE_URL


def test_echo_handler_runs(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (json.dumps({"ping": 42}),)
    )
    runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}})
    t = threading.Thread(target=runner.run, kwargs={"once": True}, daemon=True)
    t.start()
    t.join(timeout=5)
    row = boss_ready.execute(
        "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
    ).fetchone()
    assert row[0] == "completed"
    assert row[1] == {"pong": 42}
```

- [ ] **Step 2:** Run red:

```bash
DATABASE_URL=... pytest tests/test_echo.py -v
```

Expected: ImportError / 1 failed.

- [ ] **Step 3:** Implement `workers/void_workers/handlers/__init__.py`:

```python
from . import echo

REGISTRY = {
    echo.NAME: echo.handle,
}
```

- [ ] **Step 4:** Implement `workers/void_workers/handlers/echo.py`:

```python
NAME = "echo"


def handle(job_data: dict) -> dict:
    return {"pong": job_data.get("ping", 0)}
```

- [ ] **Step 5:** Implement `workers/void_workers/runner.py`:

```python
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from .boss import Boss
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
from .handlers import REGISTRY
from .log import log


class Runner:
    def __init__(self, dsn=None, handlers=None):
        self.boss = Boss(dsn or DATABASE_URL)
        self.handlers = handlers or {
            name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
        }
        self._stop = threading.Event()

    def _process_one(self, queue):
        job = self.boss.claim(queue)
        if not job:
            return False
        log.info("job_claimed", job_id=str(job["id"]), name=queue)
        handler = REGISTRY[queue]
        try:
            out = handler(job["data"] or {})
            self.boss.complete(job["id"], out or {})
            log.info("job_completed", job_id=str(job["id"]), name=queue)
        except Exception as e:
            log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e))
            self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)})
        return True

    def _loop(self, queue):
        interval = POLL_INTERVAL_MS / 1000.0
        while not self._stop.is_set():
            try:
                did = self._process_one(queue)
            except Exception as e:
                log.error("loop_error", queue=queue, err=str(e))
                did = False
            if not did:
                self._stop.wait(interval)

    def run(self, once=False):
        if once:
            for q in self.handlers:
                self._process_one(q)
            return
        executor = ThreadPoolExecutor(max_workers=sum(h["concurrency"] for h in self.handlers.values()))
        for q, cfg in self.handlers.items():
            for _ in range(cfg["concurrency"]):
                executor.submit(self._loop, q)
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self._stop.set())
        while not self._stop.is_set():
            time.sleep(0.5)
        executor.shutdown(wait=True)


if __name__ == "__main__":
    Runner().run()
```

- [ ] **Step 6:** Run green:

```bash
DATABASE_URL=... pytest tests/test_echo.py -v
```

Expected: 1 passed.

- [ ] **Step 7: Commit.**

```bash
cd /project/src/void-v2
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/echo.py workers/void_workers/runner.py workers/tests/test_echo.py
git commit -m "feat(workers): runner loop + echo handler"
```

### Task A4: systemd unit + push-workers.sh

**Files:**
- Create: `deploy/void-workers.service`
- Create: `deploy/push-workers.sh`
- Modify: `deploy/README.md` (append workers bootstrap section)

- [ ] **Step 1:** Create `deploy/void-workers.service`:

```ini
[Unit]
Description=Void 2.0 workers
After=network-online.target void-server.service
Wants=network-online.target

[Service]
Type=simple
User=voidworkers
WorkingDirectory=/opt/void-workers
EnvironmentFile=/opt/void-workers/.env
ExecStart=/opt/void-workers/venv/bin/python -m void_workers.runner
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal
MemoryMax=6G

[Install]
WantedBy=multi-user.target
```

- [ ] **Step 2:** Create `deploy/push-workers.sh`:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Push Python void-workers source to CT 311 and restart the service.
# Run from /project/src/void-v2.
TARGET=${TARGET:-root@192.168.1.13}
REMOTE_DIR=${REMOTE_DIR:-/opt/void-workers}

# venv/ and .env exist only on the target. Excluding them keeps
# --delete from removing them on every push.
rsync -avz --delete \
  --exclude .venv \
  --exclude venv \
  --exclude .env \
  --exclude __pycache__ \
  --exclude '*.egg-info' \
  --exclude tests \
  workers/ "$TARGET:$REMOTE_DIR/"

ssh "$TARGET" "
  cd $REMOTE_DIR
  if [ ! -d venv ]; then
    python3.12 -m venv venv
  fi
  . venv/bin/activate
  pip install --quiet --upgrade pip
  pip install --quiet -e '.[all]'
  systemctl restart void-workers
"

echo "Workers deployed."
```

Make it executable:

```bash
chmod +x deploy/push-workers.sh
```

- [ ] **Step 3:** Extend `deploy/README.md` by appending the following section:

```markdown
## Workers (Python void-workers, Plan 4+)

Runs alongside void-server as a second systemd unit.
One-time setup on CT 311:

~~~bash
apt install -y python3.12 python3.12-venv python3-pip \
  ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils
useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
mkdir -p /opt/void-workers /var/lib/void/whisper-models
chown voidworkers: /opt/void-workers
chown -R voidworkers: /var/lib/void/whisper-models
# voidworkers needs to read the shared blob store
usermod -aG void voidworkers
install -m 644 deploy/void-workers.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable void-workers
# /opt/void-workers/.env: see plan §Deploy delta
~~~

Deploy after edits:

~~~bash
cd /project/src/void-v2
./deploy/push-workers.sh
~~~
```

- [ ] **Step 4:** Run the existing Node test suite; it must still be green (we have not modified anything Node touches yet).

```bash
cd /project/src/void-v2
npx vitest run
```

- [ ] **Step 5: Commit.**

```bash
git add deploy/void-workers.service deploy/push-workers.sh deploy/README.md
git commit -m "feat(workers): systemd unit + push-workers.sh"
```

### Task A5: Phase A close: memory checkpoint

- [ ] **Step 1:** Update memory `project_void_v2_execution.md`: mark "Plan 4 Phase A complete: harness + echo running locally; void-workers.service installed on CT 311 pending real deps (Phase B+)".
- [ ] **Step 2:** Note the boss.py contract decision (psycopg + native SQL) so future plans don't re-litigate it.

---

## Phase B: `extract.pdf` + `extract.image`

### Task B1: Install system deps on the dev box

These deps support local pytest runs. The same packages will be installed on CT 311 when Phase A's systemd unit lands there.
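Since the Phase B tests shell out to external binaries, it can help to fail fast when one is missing. The following is a minimal preflight sketch, not part of the plan's code: the names `REQUIRED_BINARIES` and `missing_binaries` are hypothetical, and the list assumes the tools this plan uses (`pdftotext` and `pdftoppm` from poppler-utils, `tesseract`, `ffmpeg`).

```python
import shutil

# Assumed list of executables used by the Phase B/C handlers in this plan.
REQUIRED_BINARIES = ("pdftotext", "pdftoppm", "tesseract", "ffmpeg")


def missing_binaries(names=REQUIRED_BINARIES, which=shutil.which):
    """Return the names that cannot be found on PATH.

    `which` is injectable so the check can be unit-tested without
    depending on what is installed on the machine.
    """
    return [name for name in names if which(name) is None]


if __name__ == "__main__":
    missing = missing_binaries()
    if missing:
        raise SystemExit("missing deps: " + ", ".join(missing))
    print("all deps present")
```

One possible use is calling `missing_binaries()` from a pytest session fixture and skipping the OCR tests when the list is non-empty.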
- [ ] **Step 1:** Verify deps locally (we are running on Ubuntu 24.04 LXC per CLAUDE.md):

```bash
command -v pdftotext && command -v tesseract && command -v ffmpeg || echo "missing deps"
apt list --installed 2>/dev/null | grep -E "poppler|tesseract|ffmpeg" | head -5
```

- [ ] **Step 2:** If any are missing, install:

```bash
apt install -y poppler-utils tesseract-ocr tesseract-ocr-eng ffmpeg
```

- [ ] **Step 3:** Install pdf/image Python extras into the workers venv:

```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
pip install -e '.[pdf,image,test]'
python -c "import pdfplumber, pytesseract; print('ok')"
```

- [ ] **Step 4:** Add a `tests/fixtures/` README explaining how fixtures were generated (so we can regenerate them):

Create `workers/tests/fixtures/README.md`:

```markdown
# Test fixtures

- `born_digital.pdf`: generated by `pdftotext -version 2>&1 | head -1 > /tmp/b.txt && enscript -p /tmp/b.ps /tmp/b.txt && ps2pdf /tmp/b.ps born_digital.pdf` (or any tool that produces a PDF with embedded text). Must contain the string "void-workers".
- `scanned.pdf`: `convert -density 200 eng_text.png scanned.pdf` (PDF that is a single page of an image; `pdftotext` will yield nothing, Tesseract fallback must pick it up).
- `eng_text.png`: a PNG with a clear English sentence on white background. Must contain the string "blackflame".
```

Generate them now:

```bash
cd /project/src/void-v2/workers/tests/fixtures

# eng_text.png: render text with ImageMagick (apt: imagemagick)
apt install -y imagemagick ghostscript
convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 36 \
  -fill black -annotate +50+100 "blackflame palette" eng_text.png

# born_digital.pdf: simple TXT → PDF
echo "void-workers proof-of-life document" > /tmp/b.txt
enscript -p /tmp/b.ps /tmp/b.txt 2>/dev/null || echo "enscript missing; using ImageMagick"
if [ -f /tmp/b.ps ]; then
  ps2pdf /tmp/b.ps born_digital.pdf
else
  convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 24 \
    -fill black -annotate +50+100 "void-workers proof-of-life" born_digital.pdf
  # NOTE: this is a single-image PDF too; pdftotext will return empty.
  # Override: use poppler's pdftohtml tools or create with text.
fi

# scanned.pdf: PNG → PDF
convert -density 200 eng_text.png scanned.pdf
ls -la
```

If `enscript` is not available, install it or use another text-PDF tool. The key fixture invariant is that `pdftotext born_digital.pdf -` returns non-empty text.
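The fixture invariants can be checked mechanically before committing. The sketch below is illustrative only: `pdf_text` and `check_fixtures` are hypothetical helper names, and the expectations (the string "void-workers" in `born_digital.pdf`, no text layer in `scanned.pdf`) are taken from the fixtures README above.

```python
import subprocess
from pathlib import Path


def pdf_text(path, run=subprocess.check_output):
    """Return the text layer of a PDF via `pdftotext <path> -`."""
    out = run(["pdftotext", str(path), "-"], timeout=60)
    return out.decode("utf-8", errors="replace").strip()


def check_fixtures(fixtures_dir, extract=pdf_text):
    """Return a list of problems; an empty list means the invariants hold.

    `extract` is injectable so the logic can be tested without poppler.
    """
    root = Path(fixtures_dir)
    problems = []
    born = extract(root / "born_digital.pdf")
    if "void-workers" not in born.lower():
        problems.append("born_digital.pdf has no 'void-workers' text layer")
    scanned = extract(root / "scanned.pdf")
    if scanned:
        problems.append("scanned.pdf unexpectedly has a text layer")
    return problems


if __name__ == "__main__":
    for problem in check_fixtures("."):
        print("FIXTURE PROBLEM:", problem)
```

Run from `workers/tests/fixtures`, the script prints nothing when both fixtures are usable. It catches the ImageMagick fallback case, where `born_digital.pdf` ends up as an image-only PDF.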
- [ ] **Step 5: Commit fixtures.**

```bash
cd /project/src/void-v2
git add workers/tests/fixtures
git commit -m "test(workers): pdf/image test fixtures"
```

### Task B2: `extract.pdf` worker (pdftotext path)

**Files:**
- Create: `workers/void_workers/handlers/pdf.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/void_workers/repo.py`
- Create: `workers/tests/test_pdf.py`

- [ ] **Step 1:** Write the failing test:

```python
# workers/tests/test_pdf.py
import json
import os
import subprocess
from pathlib import Path

import psycopg

from void_workers.handlers.pdf import handle as handle_pdf
from void_workers.config import DATABASE_URL

FIXTURES = Path(__file__).parent / "fixtures"


def _seed_space_and_ref(conn, blob_path, kind="pdf"):
    """Bring up the Void schema (matches migrate.js) just enough to insert a ref."""
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-tests', 'P4') "
        "ON CONFLICT (slug) DO UPDATE SET name=EXCLUDED.name RETURNING id"
    ).fetchone()[0]
    ref = conn.execute(
        "INSERT INTO refs(space_id, kind, source_url, title, blob_path) "
        "VALUES(%s, %s, NULL, 'fixture', %s) RETURNING id",
        (sp, kind, str(blob_path))
    ).fetchone()[0]
    return sp, ref


def test_pdf_born_digital_uses_pdftotext(conn):
    """A PDF with embedded text gets pdftotext-extracted body."""
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    # Apply Void migrations against the conn
    _run_node_migrations()
    blob = FIXTURES / "born_digital.pdf"
    sp, ref = _seed_space_and_ref(conn, blob)
    out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["ref_id"] == str(ref)
    assert out["chars"] > 0
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "void-workers" in (row[0] or "").lower()


def _run_node_migrations():
    """Shell to Node's migrate runner (same DB)."""
    subprocess.run(
        ["node", "lib/db/migrate.js", "up"],
        cwd="/project/src/void-v2", check=True
    )
```

- [ ] **Step 2:** Run red:

```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
DATABASE_URL=... pytest tests/test_pdf.py -v
```

Expected: ImportError on pdf module.

- [ ] **Step 3:** Implement `workers/void_workers/repo.py`:

```python
import json

import psycopg
from psycopg.rows import dict_row

from .config import DATABASE_URL


def _conn():
    return psycopg.connect(DATABASE_URL, autocommit=True, row_factory=dict_row)


def update_ref(ref_id, *, body_text=None, metadata_patch=None):
    """UPDATE refs ... + emit audit_log row with actor_kind='worker'."""
    with _conn() as conn:
        before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if metadata_patch is not None:
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            return before
        sets.append("updated_at=now()")
        args.append(ref_id)
        after = conn.execute(
            f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *", args
        ).fetchone()
        # audit log: match Node's audit.js diff shape (simple)
        diff = {}
        if body_text is not None and (before or {}).get("body_text") != body_text:
            diff["body_text"] = {"before": "(redacted)", "after_len": len(body_text)}
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'update', %s)
        """, (ref_id, json.dumps({"kind": "update", "changes": diff})))
        return after
```

- [ ] **Step 4:** Implement `workers/void_workers/handlers/pdf.py`:

```python
import subprocess

from .. import repo

NAME = "extract.pdf"


def _pdftotext(blob_path):
    return subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"], timeout=120
    ).decode("utf-8", errors="replace")


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    text = _pdftotext(blob_path).strip()
    method = "pdftotext"
    # Fallback (Tesseract OCR) lands in Task B3.
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text), "method": method}
```

- [ ] **Step 5:** Add `pdf.handle` to the registry by editing `workers/void_workers/handlers/__init__.py`:

```python
from . import echo, pdf

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
}
```

- [ ] **Step 6:** Run green:

```bash
DATABASE_URL=... pytest tests/test_pdf.py::test_pdf_born_digital_uses_pdftotext -v
```

Expected: 1 passed.

- [ ] **Step 7: Commit.**

```bash
cd /project/src/void-v2
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/pdf.py workers/void_workers/repo.py workers/tests/test_pdf.py
git commit -m "feat(workers): extract.pdf via pdftotext"
```

### Task B3: PDF Tesseract fallback for scanned pages

**Files:**
- Modify: `workers/void_workers/handlers/pdf.py`
- Modify: `workers/tests/test_pdf.py`

- [ ] **Step 1:** Add the failing test (append to `test_pdf.py`):

```python
def test_pdf_scanned_falls_back_to_tesseract(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    _run_node_migrations()
    blob = FIXTURES / "scanned.pdf"
    sp, ref = _seed_space_and_ref(conn, blob)
    out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["method"] == "tesseract"
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "blackflame" in (row[0] or "").lower()
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3:** Implement the fallback in `workers/void_workers/handlers/pdf.py`:

```python
import subprocess
import tempfile
from pathlib import Path

from PIL import Image
import pytesseract

from .. import repo

NAME = "extract.pdf"
FALLBACK_THRESHOLD = 200  # chars


def _pdftotext(blob_path):
    return subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"], timeout=120
    ).decode("utf-8", errors="replace")


def _ocr_pdf(blob_path):
    """Rasterize each page with pdftoppm, OCR each with Tesseract."""
    with tempfile.TemporaryDirectory() as tmp:
        subprocess.run(
            ["pdftoppm", "-r", "200", "-png", blob_path, f"{tmp}/p"],
            check=True, timeout=300
        )
        pages = sorted(Path(tmp).glob("p-*.png"))
        parts = []
        for p in pages:
            # Close each page image before the temp dir is removed.
            with Image.open(p) as img:
                parts.append(pytesseract.image_to_string(img, lang="eng"))
        return "\n".join(parts)


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    method = "pdftotext"
    text = _pdftotext(blob_path).strip()
    if len(text) < FALLBACK_THRESHOLD:
        method = "tesseract"
        text = _ocr_pdf(blob_path).strip()
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text), "method": method}
```

- [ ] **Step 4:** Run green:

```bash
DATABASE_URL=... pytest tests/test_pdf.py -v
```

Expected: 2 passed.
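The method selection in `handle()` reduces to one length comparison, which is worth seeing in isolation. This is a sketch, not code from the plan: `choose_method` is a hypothetical helper that restates the `len(text) < FALLBACK_THRESHOLD` check with the same threshold of 200 characters.

```python
FALLBACK_THRESHOLD = 200  # chars, same value as in pdf.py


def choose_method(pdftotext_output: str, threshold: int = FALLBACK_THRESHOLD) -> str:
    """Return the extraction method pdf.handle() would pick for this output.

    Mirrors the handler: text is stripped first, and anything shorter
    than the threshold is treated as a scanned PDF and sent to OCR.
    """
    text = pdftotext_output.strip()
    return "tesseract" if len(text) < threshold else "pdftotext"


if __name__ == "__main__":
    print(choose_method(""))         # tesseract
    print(choose_method("x" * 199))  # tesseract
    print(choose_method("x" * 200))  # pdftotext
```

One consequence to keep in mind: a born-digital PDF whose text layer is shorter than 200 characters also takes the OCR path. The `born_digital.pdf` fixture contains only a short sentence, so it may be reported with `method == "tesseract"` once Task B3 lands. The existing test still passes in that case because it asserts on `body_text`, not on `method`.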
- [ ] **Step 5: Commit.**

```bash
git add workers/void_workers/handlers/pdf.py workers/tests/test_pdf.py
git commit -m "feat(workers): pdf Tesseract fallback for scanned pages"
```

### Task B4: `extract.image` worker

**Files:**
- Create: `workers/void_workers/handlers/image.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/tests/test_image.py`

- [ ] **Step 1:** Write the failing test `workers/tests/test_image.py`:

```python
import os
import subprocess
from pathlib import Path

from void_workers.handlers.image import handle as handle_image

FIXTURES = Path(__file__).parent / "fixtures"


def test_image_ocr(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    subprocess.run(["node", "lib/db/migrate.js", "up"],
                   cwd="/project/src/void-v2", check=True)
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-img', 'I') RETURNING id"
    ).fetchone()[0]
    blob = FIXTURES / "eng_text.png"
    ref = conn.execute(
        "INSERT INTO refs(space_id, kind, blob_path) VALUES(%s, 'image', %s) RETURNING id",
        (sp, str(blob))
    ).fetchone()[0]
    out = handle_image({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["chars"] > 0
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "blackflame" in (row[0] or "").lower()
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3:** Implement `workers/void_workers/handlers/image.py`:

```python
from PIL import Image
import pytesseract

from .. import repo

NAME = "extract.image"


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    text = pytesseract.image_to_string(Image.open(blob_path), lang="eng").strip()
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": "tesseract", "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text)}
```

- [ ] **Step 4:** Register it by editing `workers/void_workers/handlers/__init__.py`:

```python
from . import echo, pdf, image

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
    image.NAME: image.handle,
}
```

- [ ] **Step 5:** Run green.

- [ ] **Step 6: Commit.**

```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/image.py workers/tests/test_image.py
git commit -m "feat(workers): extract.image via Tesseract"
```

### Task B5: Node side: blob worker enqueues extract jobs

**Files:**
- Modify: `lib/jobs/workers/blob.js`
- Modify: `tests/jobs/workers/blob.test.js`

- [ ] **Step 1:** Extend the existing blob.test.js with two new tests. Append:

```js
it('enqueues extract.pdf after creating a pdf ref', async () => {
  const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null });
  const upTmp = path.join(tmpRoot, 'doc.tmp');
  await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...'));
  const id = await queue.enqueue('ingest.blob', {
    space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf'
  });
  await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
  const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
  const followUp = (await jobsRepo.list({ name: 'extract.pdf' })).find(j => j.data?.blob_path);
  expect(followUp).toBeTruthy();
});

it('enqueues extract.image after creating an image ref', async () => {
  const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null });
  const upTmp = path.join(tmpRoot, 'pic.tmp');
  await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47]));
  const id = await queue.enqueue('ingest.blob', {
    space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png'
  });
  await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
  const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
  const followUp = (await jobsRepo.list({ name: 'extract.image' })).find(j => j.data?.blob_path);
  expect(followUp).toBeTruthy();
});
```

- [ ] **Step 2:** Run red:

```bash
cd /project/src/void-v2
npx vitest run tests/jobs/workers/blob.test.js
```

Expected: 2 new tests fail (no extract.* enqueue yet).

- [ ] **Step 3:** Modify `lib/jobs/workers/blob.js` to enqueue the follow-up job:

```js
// existing imports
import * as queue from '../queue.js';

// ... inside handler, after `const row = await refs.create(...)`:
if (kind === 'pdf') {
  await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path });
} else if (kind === 'image') {
  await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path });
}
return { ref_id: row.id, sha };
```

(Apply the exact edit at the `return { ref_id: row.id, sha };` site.)

- [ ] **Step 4:** Run green:

```bash
npx vitest run tests/jobs/workers/blob.test.js
```

Expected: 4 passed.

- [ ] **Step 5:** Run the full suite; it must still be green.

```bash
npx vitest run
```

- [ ] **Step 6: Commit.**

```bash
git add lib/jobs/workers/blob.js tests/jobs/workers/blob.test.js
git commit -m "feat(jobs): blob worker fans out to extract.pdf / extract.image"
```

### Task B6: Phase B close: memory + full suite

- [ ] **Step 1:** `npx vitest run`: full Node suite green.
- [ ] **Step 2:** `cd workers && DATABASE_URL=... pytest -v`: full Python suite green.
- [ ] **Step 3:** Update memory: "Plan 4 Phase B complete: extract.pdf + extract.image workers; Node blob worker fans out."

---

## Phase C: `ingest.video` + CT 311 resize + GPU passthrough

### Task C1: Snapshot + CT 311 resize + GPU passthrough

User pre-authorized this on 2026-06-01.

- [ ] **Step 1:** Snapshot CT 310 + CT 311.

```bash
SNAP=plan4_pre_resize_$(date +%Y%m%d_%H%M)
ssh root@192.168.1.124 "pct snapshot 310 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
ssh root@192.168.1.124 "pct snapshot 311 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
wait
ssh root@192.168.1.124 "pct listsnapshot 310 | tail -3; echo ---; pct listsnapshot 311 | tail -3"
```

- [ ] **Step 2:** Resize CT 311.
```bash
ssh root@192.168.1.124 "pct set 311 -memory 8192 -cores 6"
ssh root@192.168.1.124 "pct config 311 | grep -E 'cores|memory'"
```

- [ ] **Step 3:** Add GPU device-node passthrough lines (copy from CT 102).

```bash
ssh root@192.168.1.124 "
  pct set 311 -dev0 /dev/nvidia0,gid=44
  pct set 311 -dev1 /dev/nvidiactl,gid=44
  pct set 311 -dev2 /dev/nvidia-uvm,gid=44
  pct set 311 -dev3 /dev/nvidia-uvm-tools,gid=44
  pct set 311 -dev4 /dev/nvidia-caps/nvidia-cap1,gid=44
  pct set 311 -dev5 /dev/nvidia-caps/nvidia-cap2,gid=44
  pct config 311 | grep dev
"
```

- [ ] **Step 4:** Restart CT 311.

```bash
ssh root@192.168.1.124 "pct reboot 311"
sleep 10
ssh root@192.168.1.13 "ls -la /dev/nvidia* 2>&1 | head; nvidia-smi --query-gpu=name --format=csv,noheader 2>&1 || echo 'nvidia-smi missing (no driver in CT) — expected'"
curl -s http://192.168.1.13:3000/health
```

Expected: device nodes visible, health returns alpha-3.

- [ ] **Step 5: Commit (config-as-comment).** No code change yet — we record the infra step in the next phase's commit message.

### Task C2: Install ffmpeg + faster-whisper deps on CT 311

- [ ] **Step 1:** Install system deps:

```bash
ssh root@192.168.1.13 "
  apt update
  apt install -y python3.12 python3.12-venv python3-pip \
    ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils \
    nvidia-utils-535 nvidia-cuda-toolkit 2>&1 | tail -5
  python3.12 --version
  ffmpeg -version | head -1
  tesseract --version 2>&1 | head -1
"
```

Note: `nvidia-cuda-toolkit` is for `ctranslate2`'s CUDA runtime; if you'd rather not pull the full toolkit (~1 GB), use the official `libcudnn8 libcublas12` runtime packages instead.
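After the install step, a quick check from Python can confirm that the binaries the handlers shell out to are actually on `PATH`. The following is a minimal sketch using only the standard library; `missing_tools` and `REQUIRED_TOOLS` are hypothetical names, not part of the plan, and the tool list is an assumption drawn from the apt packages named above.

```python
import shutil

# Assumed tool list: binaries provided by the apt packages installed above
# (ffmpeg, tesseract-ocr, poppler-utils). Adjust to match the real host.
REQUIRED_TOOLS = ("ffmpeg", "tesseract", "pdftotext")


def missing_tools(tools=REQUIRED_TOOLS, which=shutil.which):
    """Return the names in `tools` that cannot be found on PATH.

    `which` is injectable so the check can be unit-tested without
    depending on what is installed on the test machine.
    """
    return [name for name in tools if which(name) is None]
```

Calling `missing_tools()` on CT 311 and treating a non-empty result as a failed bootstrap would catch a partial install before the first job is claimed.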
- [ ] **Step 2:** Create the voidworkers user + dirs on CT 311: ```bash ssh root@192.168.1.13 " id voidworkers >/dev/null 2>&1 || useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers mkdir -p /opt/void-workers /var/lib/void/whisper-models chown voidworkers: /opt/void-workers chown -R voidworkers: /var/lib/void/whisper-models usermod -aG void voidworkers chmod -R g+rX /var/lib/void/blobs # ensure voidworkers (in void group) can read blobs install -m 644 /opt/void-workers/.env 2>/dev/null || true cat > /opt/void-workers/.env < 0 except Exception as e: log.info("ctranslate2_cuda_probe_failed", err=str(e)) return False def whisper_model(): global _whisper_model if _whisper_model is None: from faster_whisper import WhisperModel name = os.environ.get("WHISPER_MODEL", "small.en") cache = os.environ.get("WHISPER_CACHE", "/var/lib/void/whisper-models") device = "cuda" if cuda_available() else "cpu" compute_type = "float16" if device == "cuda" else "int8" log.info("whisper_loading", model=name, device=device, compute_type=compute_type, cache=cache) _whisper_model = WhisperModel(name, device=device, compute_type=compute_type, download_root=cache) return _whisper_model def whisper_transcribe(audio_path): segments, _info = whisper_model().transcribe(audio_path, vad_filter=True) return "\n".join(s.text.strip() for s in segments).strip() ``` - [ ] **Step 4:** Run green: ```bash pytest tests/test_model.py -v ``` Expected: 2 passed. 
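The device and compute-type choice in the loader above can also be isolated as a pure function, which makes the CPU fallback testable on a machine with no GPU. This is a sketch under that assumption; `pick_device` is a hypothetical helper, and the `float16` / `int8` pairing simply mirrors the values used in `whisper_model()`.

```python
from typing import Tuple


def pick_device(cuda_device_count: int) -> Tuple[str, str]:
    """Map a CUDA device count to a (device, compute_type) pair.

    Mirrors the loader's rule: any visible CUDA device selects GPU
    inference in float16; otherwise fall back to int8 on CPU.
    """
    if cuda_device_count > 0:
        return "cuda", "float16"
    return "cpu", "int8"
```

Feeding it the result of the CUDA probe keeps the probe (which touches the driver) separate from the decision (which does not).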
- [ ] **Step 5: Commit.**

```bash
git add workers/void_workers/model.py workers/tests/test_model.py
git commit -m "feat(workers): whisper loader with CUDA detect + CPU fallback"
```

### Task C4: `ingest.video` worker (yt-dlp + transcribe + create ref)

**Files:**
- Create: `workers/void_workers/handlers/video.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/tests/test_video.py`
- Modify: `workers/void_workers/repo.py` — add `create_ref()`

- [ ] **Step 1:** Failing test (mocked yt-dlp + mocked Whisper):

```python
# workers/tests/test_video.py
from unittest.mock import patch, MagicMock
from void_workers.handlers.video import handle as handle_video

def test_video_creates_ref_with_transcript_and_metadata(conn, monkeypatch):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('plan4-vid', 'V') RETURNING id").fetchone()[0]

    info = {
        "title": "Sample video", "description": "a description",
        "duration": 90, "uploader": "Channel", "thumbnail": "https://i.ytimg.com/t.jpg"
    }
    with patch("void_workers.handlers.video._yt_dlp_info", return_value=info), \
         patch("void_workers.handlers.video._yt_dlp_audio", return_value="/tmp/fake.opus"), \
         patch("void_workers.handlers.video.whisper_transcribe", return_value="hello world transcript"), \
         patch("os.unlink"):
        out = handle_video({"space_id": str(sp), "url": "https://youtu.be/abc"})
    assert "ref_id" in out
    row = conn.execute("SELECT title, body_text, source_kind FROM refs WHERE id=%s",
                       (out["ref_id"],)).fetchone()
    assert row[0] == "Sample video"
    assert "hello world" in row[1]
    assert row[2] == "youtube"
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3:** Implement `workers/void_workers/handlers/video.py`:

```python
import hashlib
import os
import subprocess
import tempfile
import json

from .. import repo
from ..model import whisper_transcribe

NAME = "ingest.video"

def _yt_dlp_info(url):
    """Returns dict of metadata, or None if yt-dlp could not extract."""
    try:
        out = subprocess.check_output(
            ["yt-dlp", "-J", "--no-warnings", "--no-playlist", url],
            timeout=60
        )
        return json.loads(out)
    except subprocess.CalledProcessError:
        return None

def _yt_dlp_audio(url):
    """Downloads bestaudio to a temp .opus and returns the path."""
    tmp_dir = tempfile.mkdtemp(prefix="void-yt-")
    out_template = os.path.join(tmp_dir, "audio.%(ext)s")
    subprocess.run(
        ["yt-dlp", "-x", "--audio-format", "opus", "-o", out_template,
         "--no-warnings", "--no-playlist", url],
        check=True, timeout=600
    )
    for f in os.listdir(tmp_dir):
        if f.startswith("audio."):
            return os.path.join(tmp_dir, f)
    raise RuntimeError("yt-dlp produced no audio file")

def _idem(space_id, url):
    return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest()

def _kind(url):
    return "youtube" if ("youtube.com" in url or "youtu.be" in url) else "video"

def handle(job_data: dict) -> dict:
    space_id = job_data["space_id"]
    url = job_data["url"]
    idem = _idem(space_id, url)
    info = _yt_dlp_info(url)
    if info is None:
        return {"skipped": "yt-dlp"}
    audio_path = _yt_dlp_audio(url)
    try:
        transcript = whisper_transcribe(audio_path)
    finally:
        try:
            os.unlink(audio_path)
        except OSError:
            pass
    ref_id = repo.create_ref({
        "space_id": space_id,
        "kind": "video",
        "source_url": url,
        "title": info.get("title") or url,
        "summary": (info.get("description") or "")[:5000],
        "body_text": transcript[:200_000],
        "source_kind": _kind(url),
        "external_id": idem,
        "metadata": {
            "duration_s": info.get("duration"),
            "uploader": info.get("uploader"),
            "thumbnail": info.get("thumbnail"),
            "extract": {"method": "whisper", "chars": len(transcript)},
        }
    })
    return {"ref_id": ref_id, "chars": len(transcript)}
```

- [ ] **Step 4:** Extend `workers/void_workers/repo.py` with `create_ref`:

```python
def create_ref(input_):
    """INSERT INTO refs(...) RETURNING id; emit audit_log."""
    fields = ["space_id","kind","source_url","title","summary","body_text",
              "blob_path","metadata","source_kind","external_id","captured_at"]
    cols, vals = [], []
    for f in fields:
        if f in input_:
            cols.append(f)
            vals.append(input_[f])
    placeholders = ",".join("%s" for _ in cols)
    sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
    with _conn() as conn:
        ref_id = conn.execute(sql, vals).fetchone()["id"]
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'create', %s)
        """, (ref_id, json.dumps({"kind": "create"})))
    return str(ref_id)
```

(Make sure `json` import is at the top of `repo.py` if not already there. Also note `metadata` must be `json.dumps`'d before insertion because we're passing a dict; tweak the field handling to JSON-encode dict/list values.)

Patch `create_ref` to JSON-encode jsonb fields:

```python
def _maybe_json(v):
    return json.dumps(v) if isinstance(v, (dict, list)) else v

def create_ref(input_):
    fields = ["space_id","kind","source_url","title","summary","body_text",
              "blob_path","metadata","source_kind","external_id","captured_at"]
    cols, vals = [], []
    for f in fields:
        if f in input_:
            cols.append(f)
            vals.append(_maybe_json(input_[f]))
    placeholders = ",".join("%s" for _ in cols)
    sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
    with _conn() as conn:
        ref_id = conn.execute(sql, vals).fetchone()["id"]
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'create', %s)
        """, (ref_id, json.dumps({"kind": "create"})))
    return str(ref_id)
```

- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`:

```python
from . import echo, pdf, image, video

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
    image.NAME: image.handle,
    video.NAME: video.handle,
}
```

- [ ] **Step 6:** Run green:

```bash
pytest tests/test_video.py -v
```

Expected: 1 passed.

- [ ] **Step 7: Commit.**

```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/video.py workers/void_workers/repo.py workers/tests/test_video.py
git commit -m "feat(workers): ingest.video via yt-dlp + Whisper"
```

### Task C5: Node — capture.js routes YouTube to `ingest.video`

**Files:**
- Modify: `lib/api/routes/capture.js`
- Modify: `tests/api/capture.test.js`

- [ ] **Step 1:** Failing test — append to `tests/api/capture.test.js`:

```js
it('POST /api/capture with YouTube URL enqueues ingest.video, not ingest.url', async () => {
  const res = await request(app).post('/api/capture').set(ownerHeaders)
    .send({ space_id: sp.id, url: 'https://youtu.be/abc' });
  expect(res.status).toBe(202);
  expect(res.body.job_id).toBeTruthy();
  const { default: jobsRepo } = await import('../../lib/db/repos/jobs.js');
  const rows = await jobsRepo.list({ name: 'ingest.video' });
  expect(rows.find(r => r.id === res.body.job_id)).toBeTruthy();
  const urlRows = await jobsRepo.list({ name: 'ingest.url' });
  expect(urlRows.find(r => r.id === res.body.job_id)).toBeFalsy();
});
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3:** Modify `lib/api/routes/capture.js` — replace the `POST /` handler's enqueue logic:

```js
// helper near the top
const VIDEO_HOST_RE = /(^|\.)(youtube\.com|youtu\.be|vimeo\.com)$/i;
function isVideoUrl(url) {
  try { return VIDEO_HOST_RE.test(new URL(url).hostname); }
  catch { return false; }
}

// inside router.post('/'), after computing idem + the existing-ref check:
const isVideo = isVideoUrl(url);
const job_name = isVideo ? 'ingest.video' : 'ingest.url';
const job_id = await queue.enqueue(job_name, { space_id, url });
res.status(202).json({ job_id, idempotency_key: idem });
```

- [ ] **Step 4:** Run green:

```bash
npx vitest run tests/api/capture.test.js
```

Expected: existing + new tests pass.

- [ ] **Step 5: Commit.**

```bash
git add lib/api/routes/capture.js tests/api/capture.test.js
git commit -m "feat(api): capture routes YouTube/Vimeo URLs to ingest.video"
```

### Task C6: Phase C close — snapshot + memory

- [ ] **Step 1:** Run full Node + Python test suites green.

```bash
cd /project/src/void-v2 && npx vitest run
cd workers && DATABASE_URL=... pytest -v
```

- [ ] **Step 2:** Snapshot CT 310 + 311 — `plan4_phase_c_`.
- [ ] **Step 3:** Update memory: "Plan 4 Phase C complete: Whisper + yt-dlp live; CT 311 resized 6c/8G + GPU passthrough; void-workers.service running on CT 311."

---

## Phase D — Source-doc sync + version bump + completion doc

### Task D1: Python `safe_fetch` port

**Files:**
- Create: `workers/void_workers/safe_fetch.py`
- Create: `workers/tests/test_safe_fetch.py`

- [ ] **Step 1:** Failing test:

```python
# workers/tests/test_safe_fetch.py
import pytest
from void_workers.safe_fetch import safe_fetch, SafeFetchError

def test_rejects_file_scheme():
    with pytest.raises(SafeFetchError):
        safe_fetch("file:///etc/passwd")

def test_rejects_loopback():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://127.0.0.1/x")

def test_rejects_rfc1918():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://192.168.1.1/x")

def test_rejects_metadata_endpoint():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://169.254.169.254/latest/")
```

- [ ] **Step 2:** Run red.
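The three IP literals rejected by the tests above fall into distinct address classes, and the standard `ipaddress` module can tell them apart without any network access. The sketch below is illustrative only: `classify` is a hypothetical helper, not the plan's `_is_blocked`, and `8.8.8.8` is included purely as a contrasting public address.

```python
import ipaddress


def classify(addr: str) -> str:
    """Label an IP literal with the coarse class an SSRF guard cares about."""
    ip = ipaddress.ip_address(addr)
    if ip.is_loopback:
        return "loopback"      # e.g. 127.0.0.0/8
    if ip.is_link_local:
        return "link-local"    # e.g. 169.254.0.0/16 (cloud metadata endpoints)
    if ip.is_private:
        return "private"       # e.g. RFC 1918 ranges
    return "public"


for literal in ("127.0.0.1", "192.168.1.1", "169.254.169.254", "8.8.8.8"):
    print(literal, classify(literal))
```

The order of the checks matters here because loopback and link-local addresses also report `is_private` as true; testing the narrower classes first keeps the labels distinct.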
- [ ] **Step 3:** Implement `workers/void_workers/safe_fetch.py`:

```python
import ipaddress
import os
import socket
import urllib.error
import urllib.request
from urllib.parse import urljoin, urlparse

BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [
    "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8", "172.16.0.0/12",
    "192.168.0.0/16", "169.254.0.0/16", "100.64.0.0/10",
]]

REDIRECT_CODES = (301, 302, 303, 307, 308)

class SafeFetchError(Exception):
    pass

class _NoRedirect(urllib.request.HTTPRedirectHandler):
    """Refuse automatic redirects so safe_fetch can re-validate every hop.

    The stock handler would follow a redirect to a private address
    before the checks below ever see the new host.
    """
    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None

def _is_blocked(addr):
    if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
        return False
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        return True
    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
        return True
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in n for n in BLOCK_V4_NETS)
    # IPv6: ULA + link-local
    if ip in ipaddress.ip_network("fc00::/7") or ip in ipaddress.ip_network("fe80::/10"):
        return True
    return False

def _resolve(host):
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror as e:
        raise SafeFetchError(f"no DNS for {host}: {e}")
    addrs = list({i[4][0] for i in infos})
    for a in addrs:
        if _is_blocked(a):
            raise SafeFetchError(f"{host} resolves to blocked address {a}")
    if not addrs:
        raise SafeFetchError(f"no addresses for {host}")
    return addrs[0]

def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
    current = url
    opener = urllib.request.build_opener(_NoRedirect)
    for _ in range(max_hops + 1):
        u = urlparse(current)
        if u.scheme not in ("http", "https"):
            raise SafeFetchError(f"unsupported scheme {u.scheme}")
        host = u.hostname
        if not host:
            raise SafeFetchError(f"no host in {current}")
        # If host is a literal IP, validate; else resolve + validate
        try:
            ipaddress.ip_address(host)
            if _is_blocked(host):
                raise SafeFetchError(f"blocked literal IP {host}")
        except ValueError:
            _resolve(host)
        req = urllib.request.Request(current, headers=headers or {})
        try:
            with opener.open(req, timeout=timeout) as r:
                return r.read()
        except urllib.error.HTTPError as e:
            location = e.headers.get("Location")
            if e.code not in REDIRECT_CODES or not location:
                raise
            # Resolve relative redirects, then loop to validate the new host.
            current = urljoin(current, location)
    raise SafeFetchError(f"too many redirects ({max_hops})")
```

- [ ] **Step 4:** Run green.

- [ ] **Step 5: Commit.**

```bash
git add workers/void_workers/safe_fetch.py workers/tests/test_safe_fetch.py
git commit -m "feat(workers): safe_fetch (Python port)"
```

### Task D2: `sync.source_doc` worker

**Files:**
- Create: `workers/void_workers/handlers/sourcedoc.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/tests/test_sourcedoc.py`
- Modify: `workers/void_workers/repo.py` — add `get_source_doc`, `update_source_doc`

- [ ] **Step 1:** Failing test:

```python
# workers/tests/test_sourcedoc.py
from unittest.mock import patch
from void_workers.handlers.sourcedoc import handle as handle_sd

def test_sourcedoc_updates_body_text(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('sd', 'SD') RETURNING id").fetchone()[0]
    res = conn.execute(
        "INSERT INTO resources(space_id, slug, name, runtime_type) "
        "VALUES(%s, 'r', 'R', 'lxc') RETURNING id", (sp,)
    ).fetchone()[0]
    sd = conn.execute(
        "INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) "
        "VALUES(%s, 'doc', 'https://example.com/r', 'url') RETURNING id", (res,)
    ).fetchone()[0]

    with patch("void_workers.handlers.sourcedoc.safe_fetch", return_value=b"hello world doc body"):
        out = handle_sd({"source_doc_id": str(sd)})
    assert out.get("updated") is True
    row = conn.execute("SELECT body_text FROM source_docs WHERE id=%s", (sd,)).fetchone()
    assert "hello world" in (row[0] or "")
```

- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Extend `workers/void_workers/repo.py` with the two helpers:

```python
def get_source_doc(source_doc_id):
    with _conn() as conn:
        return conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()

def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None):
    with _conn() as conn:
        before = conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if last_synced is not None:
            sets.append("last_synced=%s")
            args.append(last_synced)
        if metadata_patch is not None:
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            return before
        sets.append("updated_at=now()")
        args.append(source_doc_id)
        after = conn.execute(
            f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args
        ).fetchone()
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'source_doc', %s, 'update', %s)
        """, (source_doc_id, json.dumps({"kind":"update","changes":{"body_text":"updated" if body_text is not None else None}})))
        return after
```

- [ ] **Step 4:** Implement `workers/void_workers/handlers/sourcedoc.py`:

```python
import hashlib
from datetime import datetime, timezone

from .. import repo
from ..safe_fetch import safe_fetch

NAME = "sync.source_doc"

def _sha(data):
    return hashlib.sha256(data).hexdigest()

def handle(job_data: dict) -> dict:
    sd_id = job_data["source_doc_id"]
    doc = repo.get_source_doc(sd_id)
    if not doc:
        return {"skipped": "gone"}
    body = safe_fetch(doc["upstream_url"], headers={"User-Agent": "void-ingest/2.0"})
    new_sha = _sha(body)
    old_sha = ((doc.get("metadata") or {}).get("body_sha"))
    now = datetime.now(timezone.utc)
    if new_sha == old_sha:
        repo.update_source_doc(sd_id, last_synced=now)
        return {"unchanged": True}
    text = body.decode("utf-8", errors="replace")[:1_000_000]
    repo.update_source_doc(
        sd_id, body_text=text, last_synced=now,
        metadata_patch={"body_sha": new_sha}
    )
    return {"updated": True, "chars": len(text)}
```

- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`:

```python
from . import echo, pdf, image, video, sourcedoc

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
    image.NAME: image.handle,
    video.NAME: video.handle,
    sourcedoc.NAME: sourcedoc.handle,
}
```

- [ ] **Step 6:** Run green.

- [ ] **Step 7: Commit.**

```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/sourcedoc.py workers/void_workers/repo.py workers/tests/test_sourcedoc.py
git commit -m "feat(workers): sync.source_doc with sha256 diff"
```

### Task D3: Node-side cron — `lib/cron/sync_source_docs.js`

**Files:**
- Modify: `package.json` — add `node-cron`.
- Create: `lib/cron/index.js`
- Create: `lib/cron/sync_source_docs.js`
- Modify: `server.js` — start the cron in the CLI gate.
- Create: `tests/cron/sync_source_docs.test.js`

- [ ] **Step 1:** Add dep:

```bash
cd /project/src/void-v2 && npm i node-cron@^3
```

- [ ] **Step 2:** Failing test `tests/cron/sync_source_docs.test.js`:

```js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import { pool } from '../../lib/db/pool.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import { runSync } from '../../lib/cron/sync_source_docs.js';
import * as jobs from '../../lib/db/repos/jobs.js';

beforeEach(async () => {
  await resetDb();
  await migrateUp();
  await queue.start();
  await registerWorkers();
});
afterEach(async () => { await stopBoss(); });

describe('cron/sync_source_docs.runSync', () => {
  it('enqueues sync.source_doc for each url-synced row', async () => {
    const sp = (await pool.query(
      `INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id`
    )).rows[0].id;
    const res = (await pool.query(
      `INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`,
      [sp]
    )).rows[0].id;
    await pool.query(
      `INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`,
      [res]
    );
    const enqueued = await runSync();
    expect(enqueued).toBe(1);
    const queued = await jobs.list({ name: 'sync.source_doc' });
    expect(queued.length).toBe(1);
  });
});
```

- [ ] **Step 3:** Run red.
- [ ] **Step 4:** Implement `lib/cron/sync_source_docs.js`:

```js
import { pool } from '../db/pool.js';
import * as queue from '../jobs/queue.js';
import { log } from '../log.js';

export async function runSync() {
  const { rows } = await pool.query(
    `SELECT id FROM source_docs WHERE sync_source = 'url'`
  );
  for (const r of rows) {
    await queue.enqueue('sync.source_doc', { source_doc_id: r.id });
  }
  return rows.length;
}
```

- [ ] **Step 5:** Implement `lib/cron/index.js`:

```js
import cron from 'node-cron';
import { runSync } from './sync_source_docs.js';
import { log } from '../log.js';

export function startCron() {
  // Daily at 03:00 local time
  cron.schedule('0 3 * * *', async () => {
    try {
      const n = await runSync();
      log.info({ enqueued: n }, 'cron sync.source_doc complete');
    } catch (e) {
      log.error({ err: e }, 'cron sync.source_doc failed');
    }
  });
  log.info('cron started');
}
```

- [ ] **Step 6:** Wire into `server.js` — extend the CLI gate:

```js
// additions
import { startCron } from './lib/cron/index.js';

// Inside the `if (import.meta.url === ...)` block, after registerWorkers:
startCron();
```

- [ ] **Step 7:** Run green:

```bash
npx vitest run tests/cron/sync_source_docs.test.js
```

- [ ] **Step 8: Commit.**

```bash
git add package.json package-lock.json lib/cron/index.js lib/cron/sync_source_docs.js server.js tests/cron/sync_source_docs.test.js
git commit -m "feat(cron): daily sync.source_doc enqueue"
```

### Task D4: Version bump + CHANGELOG + completion doc

**Files:**
- Modify: `package.json` → `2.0.0-alpha.4`
- Modify: `server.js` → `VERSION = '2.0.0-alpha.4'`
- Modify: `tests/server.test.js` → assert `2.0.0-alpha.4`
- Modify: `CHANGELOG.md` — append `[2.0.0-alpha.4]` entry
- Create: `docs/plan-4-complete.md`

- [ ] **Step 1:** Bump version literals (three files). Use Edit.
- [ ] **Step 2:** Append CHANGELOG entry:

```markdown
## [2.0.0-alpha.4] — 2026-06-NN

### Added (Plan 4: Python void-workers)
- `void-workers.service` — Python 3.12 service alongside `void-server` on CT 311. psycopg-based pg-boss client matches Node's claim/finish semantics via `SELECT ... FOR UPDATE SKIP LOCKED`.
- **`extract.pdf`** — pdftotext first; per-page Tesseract OCR fallback when extraction yields < 200 chars.
- **`extract.image`** — Tesseract OCR with English data.
- **`ingest.video`** — yt-dlp metadata + audio extraction + faster-whisper transcription. CUDA at startup, CPU fallback on HA failover to Z3. YouTube / youtu.be / Vimeo URLs route through `/api/capture` to `ingest.video` instead of `ingest.url`.
- **`sync.source_doc`** — fetch upstream + sha256 diff + update `body_text`. Node cron enqueues daily at 03:00.
- **Node `blob.js`** fans out to `extract.pdf` / `extract.image` after creating PDF / image refs.
- **CT 311 infrastructure**: resized to 6 cores / 8 GB RAM, GPU device nodes passed through (shared with CT 102's Ollama).
```

- [ ] **Step 3:** Create `docs/plan-4-complete.md` (use the Plan 3 doc as a template — sections: Date / Version / Tests / Commits / Snapshots / Scope per phase / Security findings handled / Open items / What's left after Plan 4).

- [ ] **Step 4:** Run full test suites green (Node + Python).

```bash
cd /project/src/void-v2 && npx vitest run
cd workers && DATABASE_URL=... pytest -v
```

- [ ] **Step 5: Commit.**

```bash
git add package.json server.js tests/server.test.js CHANGELOG.md docs/plan-4-complete.md
git commit -m "chore: version 2.0.0-alpha.4 + changelog + plan-4 completion doc"
```

### Task D5: Plan 4 close — snapshot + memory

- [ ] **Step 1:** Snapshot CT 310 + 311 — `plan4_complete_`.
- [ ] **Step 2:** Update memory `project_void_v2_execution.md` — mark Plan 4 complete; note alpha-4 source-built but not yet deployed (deploy is a separate user-OK decision per the standing rule).
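The `sync.source_doc` behavior summarized in the changelog entry (fetch, sha256 diff, update `body_text`) hinges on one decision: has the upstream body changed since the stored hash? A minimal sketch of that decision as a pure function, with no database or network involved, might look like the following. `plan_update` is a hypothetical name, and the 1,000,000-character cap is taken from the handler in Task D2.

```python
import hashlib
from typing import Optional


def plan_update(body: bytes, old_sha: Optional[str], max_chars: int = 1_000_000) -> dict:
    """Decide whether a fetched body should overwrite the stored text.

    Returns {"unchanged": True} when the sha256 of `body` matches
    `old_sha`; otherwise returns the new hash and the decoded length.
    """
    new_sha = hashlib.sha256(body).hexdigest()
    if new_sha == old_sha:
        return {"unchanged": True}
    # Invalid UTF-8 is replaced rather than raised, matching the handler.
    text = body.decode("utf-8", errors="replace")[:max_chars]
    return {"updated": True, "body_sha": new_sha, "chars": len(text)}
```

Hashing the raw bytes rather than the decoded text means a change in encoding alone still counts as a change, which is the conservative choice for a sync job.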
---

## Spec coverage check

Every Plan 4 spec section maps to at least one task:

| Spec § | Task(s) |
|---|---|
| Phase A harness | A1, A2, A3, A4 |
| Phase B PDF + image | B1, B2, B3, B4, B5 |
| Phase C Whisper + yt-dlp + GPU | C1, C2, C3, C4, C5 |
| Phase D source-doc sync + version | D1, D2, D3, D4 |
| `safe_fetch` Python port | D1 |
| Concurrency env config | A1 (`config.py`) — applies across handlers |
| systemd `MemoryMax=6G` | A4 (`void-workers.service`) |
| CT 311 resize + GPU passthrough | C1 |
| YouTube routing in capture.js | C5 |
| `blob.js` fans out | B5 |
| Repo audit shim (`actor_kind='worker'`) | B2 (`repo.update_ref`), C4 (`repo.create_ref`), D2 (`repo.update_source_doc`) |

## Type & name consistency

- Worker name strings: `echo`, `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc` — exact same strings used in `NAME` constants, registry keys, Node enqueue calls, and tests.
- Handler signature: `def handle(job_data: dict) -> dict` everywhere.
- Job payload shapes:
  - `extract.pdf` / `extract.image`: `{ ref_id, blob_path }` (both producers Node-side use these keys; consumer reads same keys).
  - `ingest.video`: `{ space_id, url }`.
  - `sync.source_doc`: `{ source_doc_id }` (Node cron and Python worker both use this exact key).
- `repo.update_ref` / `repo.create_ref` / `repo.update_source_doc` / `repo.get_source_doc` — same names across all tasks that touch them.
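Because the payload keys are shared across the Node and Python sides, a small lookup table can catch a mismatched key before a handler raises `KeyError` deep inside its body. The sketch below is an assumption-laden illustration, not part of the plan: `REQUIRED_KEYS` and `missing_keys` are hypothetical names, and the key sets are copied from the payload shapes listed above.

```python
from typing import Set

# Required keys per job name, copied from the payload shapes listed above.
REQUIRED_KEYS = {
    "extract.pdf": {"ref_id", "blob_path"},
    "extract.image": {"ref_id", "blob_path"},
    "ingest.video": {"space_id", "url"},
    "sync.source_doc": {"source_doc_id"},
}


def missing_keys(name: str, job_data: dict) -> Set[str]:
    """Return the required keys absent from `job_data`.

    Raises KeyError for a job name that is not in REQUIRED_KEYS, so an
    unknown worker name fails loudly rather than passing validation.
    """
    return REQUIRED_KEYS[name] - set(job_data)
```

A runner could call `missing_keys(job_name, job_data)` right after claiming a job and fail it immediately with a readable message when the result is non-empty.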