From 7514d9bee6b996f5cd28df7ece864bef4d70160c Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 04:33:48 +1000 Subject: [PATCH] docs: Plan 4 design spec (Python void-workers) Co-Authored-By: Claude Opus 4.7 --- .../specs/2026-06-01-void-v2-plan4-workers.md | 384 ++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md diff --git a/docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md b/docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md new file mode 100644 index 0000000..60e8be2 --- /dev/null +++ b/docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md @@ -0,0 +1,384 @@ +# Void 2.0 — Plan 4 Design Spec: Python void-workers (heavy ML ingest) + +**Date:** 2026-06-01 +**Builds on:** Plan 1 (Foundation, complete), Plan 2 (API + UI shell, complete, alpha-2 deployed), Plan 3 (capture pipeline + hybrid search, complete, alpha-3 deployed live on CT 311). +**Master spec:** `docs/superpowers/specs/2026-05-31-void-v2-design.md`. + +## Goal + +Stand up the Python `void-workers` service that handles ML-heavy ingest jobs the Node service intentionally defers: PDF text extraction (born-digital + scanned), image OCR, YouTube/audio transcription via Whisper, and upstream source-doc sync. Workers claim jobs from the same pg-boss queue Node uses, so the existing Plan 3 capture surface and Jobs UI work unchanged. + +## Scope + +Plan 4 covers four worker kinds and the harness around them, structured as four phases each ending green and demonstrable: +- **Phase A** — psycopg-based pg-boss client + worker harness + systemd unit + fixture worker. +- **Phase B** — `extract.pdf` (pdftotext → Tesseract fallback) + `extract.image` (Tesseract OCR). CPU-only. +- **Phase C** — `ingest.video` (yt-dlp + ffmpeg + faster-whisper). GPU-when-available, CPU fallback. +- **Phase D** — `sync.source_doc` (poll + sha256-diff upstream URLs into source_docs.body_text) + version bump to `2.0.0-alpha.4` + CHANGELOG + completion doc. + +## Out of scope (Plan 5+) + +- MCP server surface (Plan 5). +- Companion chat in the right rail (Plan 5). +- Sacred Valley widgets ported from Void 1.x (Plan 6). +- Inbound email capture, Slack/Discord adapters, RSS firehose — different ingest shapes. +- Re-transcription with a different Whisper model from the SPA — operator-only via direct enqueue for now. +- Embedding chunks table — still whole-doc per Plan 3's decision. + +## Decisions locked by brainstorm + +| Question | Answer | +|---|---| +| Plan 4 slice | Full slice: PDFs + images + Whisper + yt-dlp + source-doc sync. | +| Host location | **Same CT 311** as Node, second systemd unit. No second LXC. | +| GPU/HA tension | Single CT 311 with GPU device-nodes passed through; faster-whisper detects CUDA at startup and falls back to CPU on HA failover to Z3 (no GPU). | +| Job-claim mechanism | **Native SQL via psycopg** against pg-boss tables. `SELECT … FOR UPDATE SKIP LOCKED LIMIT 1`. ~200 lines. No third-party pg-boss port. | +| Video ingest output | Transcript + yt-dlp metadata (title, description, duration, thumbnail) → `refs.body_text` + `refs.metadata`. Audio temp file deleted after transcription. | +| Whisper model | `small.en` by default. Env-configurable via `WHISPER_MODEL` (`tiny.en` / `base.en` / `small.en` / `medium.en` / `large-v3`). | +| PDF extract strategy | `pdftotext` first; if extracted text is empty or < 200 chars total, fall back to per-page Tesseract OCR. | +| Source-doc sync trigger | Cron in Node (`/lib/cron`), not a self-polling Python loop. Cron enqueues `sync.source_doc` jobs every 24 h for rows with `sync_source='url'`. | +| Sequencing | Phase A → B → C → D. Phase boundary = green tests + commit + memory checkpoint. | + +## Architecture + +``` + ┌──────────────────────────────────────────────┐ + │ CT 311 (192.168.1.13) — Z host │ + │ │ + /api/* ────▶ │ void-server.service │ + (Plan 3) │ (Node + pg-boss + Jobs UI) │ + │ │ │ + │ │ enqueue │ + │ ▼ │ + │ ┌──────────────────────────────┐ │ + │ │ Postgres (CT 310 @ .215) │ │ + │ │ pgboss.job / archive │ │ + │ └──────────────┬───────────────┘ │ + │ │ claim (FOR UPDATE SKIP │ + │ │ LOCKED) │ + │ ▼ │ + │ void-workers.service │ + │ (Python venv at /opt/void-workers/venv) │ + │ ├─ pdf.py (pdftotext + Tesseract) │ + │ ├─ image.py (Tesseract) │ + │ ├─ video.py (yt-dlp + faster-whisper) │ + │ └─ sourcedoc.py (fetch + sha256 diff) │ + │ │ + │ /var/lib/void/blobs/ (shared with Node) │ + └──────────────────────────────────────────────┘ + │ + ┌─────────────┴─────────────┐ + ▼ ▼ + Ollama on CT 102 Karakeep on CT 100 + (already wired) (already wired) +``` + +Both Node and Python processes run inside CT 311. They share: +- The blob store at `/var/lib/void/blobs/` (same disk, no NFS). +- The Postgres database via `DATABASE_URL`. +- The pg-boss schema. Python writes the same rows pg-boss writes. + +GPU device nodes (`/dev/nvidia0` etc.) are bind-mounted into CT 311 by the Proxmox host. The CT also gets a memory + core bump (4 GB / 4 cores → 8 GB / 6 cores). + +systemd memory limits (`MemoryMax=6G` on `void-workers.service`, `MemoryMax=1G` on `void-server.service`) keep one process from OOM-killing the other. + +## Process model + +Each worker runs as one OS process inside the same `void-workers.service`. Concurrency per kind is set at startup; the service supervisor spawns N worker greenlets/threads using `multiprocessing.dummy.Pool` (so we share the psycopg pool and Python imports): + +| Worker | Default concurrency | Reason | +|---|---|---| +| `extract.pdf` | 2 | Tesseract is CPU-bound on a 6-core CT, leave room for Node + image | +| `extract.image` | 2 | Same | +| `ingest.video` | 1 | Whisper is GPU-bound (or memory-bound on CPU); serialize | +| `sync.source_doc` | 1 | Don't hammer upstream hosts | + +Override via env `VOID_CONCURRENCY_` (e.g., `VOID_CONCURRENCY_EXTRACT_PDF=4`). + +## Job-claim contract (psycopg + pg-boss SQL) + +A Python worker loop, per queue name: + +```python +async def claim(queue): + async with pool.connection() as conn: + async with conn.transaction(): + row = await conn.execute(""" + SELECT id, data + FROM pgboss.job + WHERE name=%s + AND state IN ('created','retry') + AND start_after <= now() + ORDER BY priority DESC, created_on ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + """, (queue,)).fetchone() + if not row: return None + await conn.execute( + "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s", + (row.id,)) + return row +``` + +After the handler returns: + +- **Success:** `UPDATE pgboss.job SET state='completed', completed_on=now(), output=%s WHERE id=%s` with the handler's return dict. +- **Failure:** `UPDATE pgboss.job SET state='failed', output=%s WHERE id=%s` (truncated stack as the output payload). pg-boss's archive task moves failed rows out on its schedule. +- **Retry:** pg-boss's own retry semantics rely on `retry_count` + `retry_limit` + `retry_backoff`. We honor them: if `retry_count + 1 <= retry_limit`, set `state='retry'` and `start_after = now() + interval` (exponential per the existing retry_delay config). + +This intentionally matches what Node's pg-boss does so a job processed by either side looks identical to the Jobs UI. + +## Phase A — Python harness + systemd unit + +**New files (in the void-v2 repo at `workers/`):** + +``` +workers/ + pyproject.toml # pin Python 3.12, list deps + void_workers/ + __init__.py + boss.py # psycopg pool + claim/finish helpers + runner.py # main entrypoint; loads handlers, spawns loops + config.py # env-driven config (concurrency, model names, paths) + handlers/ + __init__.py + echo.py # trivial — same shape as Node's echo + tests/ + conftest.py # shared fixtures (DB url from env, fresh boss schema) + test_boss.py # claim → finish roundtrip + test_echo.py # enqueue from Node-side fixture, run echo handler +``` + +**Deploy artefacts:** + +- `deploy/void-workers.service` — systemd unit running `/opt/void-workers/venv/bin/python -m void_workers.runner`. EnvironmentFile = `/opt/void-workers/.env`. MemoryMax = 6G. Restart=on-failure. +- `deploy/push-workers.sh` — sibling to `push.sh`. Rsyncs `workers/` to `/opt/void-workers/`, installs venv if missing, `pip install -e .`, restarts the unit. + +**One-time setup on CT 311** (when Phase A first deploys): +- `apt install -y python3.12 python3.12-venv python3-pip ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils`. +- `useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers` (separate user from `void` so each systemd unit's filesystem ownership is distinct). +- `/opt/void-workers/.env` — `DATABASE_URL`, `WHISPER_MODEL=small.en`, `BLOB_ROOT=/var/lib/void/blobs`, optional `VOID_CONCURRENCY_*`. Grant `voidworkers` read access to `/var/lib/void/blobs` (add to `void` group, dataset mode 750). + +**Testing:** vitest can't reach Python; Python's pytest can't reach Node. We test the boundary by: + +1. **Python side:** pytest spawns a real Postgres connection (the same shared dev DB), creates the `pgboss` schema directly via `pg-boss` lib loaded transiently (or by inserting rows by hand matching pg-boss's columns), runs handlers, asserts state transitions. +2. **Node side:** an existing vitest test that enqueues an `echo` job and asserts a *handler in Python* completes it is impractical without a Python runtime in CI. We instead trust that both ends use the same SQL. + +Phase A commits land as `feat(workers): Python harness + boss claim/finish` and `feat(workers): echo handler + runner loop` and `chore(deploy): void-workers.service + push-workers.sh`. + +## Phase B — `extract.pdf` and `extract.image` + +**Trigger.** Node's `lib/jobs/workers/blob.js` (Plan 3) inspects `kind` after creating the ref: +- `kind === 'pdf'` → enqueue `extract.pdf` with `{ ref_id, blob_path }`. +- `kind === 'image'` → enqueue `extract.image` with same shape. + +That's a tiny Node-side change in the existing blob worker. Plan 3's whole-doc embed trigger still fires on `refs.update` later when Python writes `body_text` back, so the OCR'd text becomes searchable automatically. + +**`workers/void_workers/handlers/pdf.py`:** + +```python +def handle(job): + ref_id = job.data["ref_id"] + blob_path = job.data["blob_path"] + text = subprocess.check_output( + ["pdftotext", "-layout", blob_path, "-"], timeout=120 + ).decode("utf-8", errors="replace") + if len(text.strip()) < 200: + # Born-digital extraction yielded nothing → OCR each page. + text = ocr_pdf_pages(blob_path) + body_text = text[:200_000] + repo.update_ref(ref_id, body_text=body_text, + metadata_patch={"extract": {"method": ..., "chars": len(text)}}) + return {"ref_id": ref_id, "chars": len(body_text)} +``` + +OCR helper uses `pdftoppm` to rasterize each page → Pillow → `pytesseract.image_to_string`. + +**`workers/void_workers/handlers/image.py`:** + +```python +def handle(job): + ref_id = job.data["ref_id"] + blob_path = job.data["blob_path"] + text = pytesseract.image_to_string(Image.open(blob_path)).strip() + repo.update_ref(ref_id, body_text=text[:200_000]) + return {"ref_id": ref_id, "chars": len(text)} +``` + +**`workers/void_workers/repo.py`** — thin psycopg shim. `update_ref(id, body_text, metadata_patch)` performs the same `recordAudit` write Node's `repos/refs.js` would (so the audit log captures the worker's edit with `actor_kind='worker'`). + +**Tests:** + +- `tests/test_pdf.py` — vendored `tests/fixtures/born_digital.pdf` + `tests/fixtures/scanned.pdf`; assert body_text length thresholds. +- `tests/test_image.py` — small PNG with a known string baked in. + +## Phase C — `ingest.video` + +**Trigger.** The dispatch happens in `lib/api/routes/capture.js`, not the URL worker. After URL validation, detect a video host (`youtube.com / youtu.be / vimeo.com`) and enqueue `ingest.video` directly instead of `ingest.url`. Idempotency key + response shape stay the same. This way the URL worker is never invoked for video URLs (no half-written ref), and the video worker is the sole creator of the `refs` row for video captures. + +**`workers/void_workers/handlers/video.py`:** + +```python +def handle(job): + url = job.data["url"] + space_id = job.data["space_id"] + + # 1) metadata + info = yt_dlp_extract_info(url) # title, description, duration, thumbnail, uploader + if info is None: # 403/404/age-gated/etc. + return {"skipped": "yt-dlp"} + + # 2) audio + audio_path = yt_dlp_download_audio(url, fmt="bestaudio[ext=opus]/bestaudio") + try: + # 3) transcribe + transcript = whisper_transcribe(audio_path) # see model.py + finally: + os.unlink(audio_path) + + # 4) create ref + ref_id = repo.create_ref({ + "space_id": space_id, + "kind": "video", + "source_url": url, + "title": info["title"], + "summary": (info.get("description") or "")[:5000], + "body_text": transcript[:200_000], + "source_kind": "youtube" if "youtube" in url else "video", + "external_id": sha256(space_id + url), + "metadata": { + "duration_s": info.get("duration"), + "uploader": info.get("uploader"), + "thumbnail": info.get("thumbnail"), + }, + }) + return {"ref_id": ref_id, "chars": len(transcript)} +``` + +**`workers/void_workers/model.py`:** + +```python +_whisper_model = None +def whisper_model(): + global _whisper_model + if _whisper_model is None: + from faster_whisper import WhisperModel + device = "cuda" if cuda_available() else "cpu" + compute_type = "float16" if device == "cuda" else "int8" + name = os.environ.get("WHISPER_MODEL", "small.en") + _whisper_model = WhisperModel(name, device=device, compute_type=compute_type) + return _whisper_model + +def whisper_transcribe(path): + segments, _info = whisper_model().transcribe(path, vad_filter=True) + return "\n".join(s.text.strip() for s in segments).strip() + +def cuda_available(): + try: + import ctranslate2 + return ctranslate2.get_cuda_device_count() > 0 + except Exception: + return False +``` + +The model loads once on first job and stays warm. On HA failover to Z3 (no GPU), `cuda_available()` returns False; the next process restart reloads with CPU + int8. Job durations multiply by ~10x but jobs still complete. + +**Tests:** integration test gated on `WHISPER_TEST_AUDIO=path/to/clip.opus`; pure-unit tests mock `faster_whisper.WhisperModel`. + +## Phase D — `sync.source_doc` + version bump + +**Cron in Node.** Add `lib/cron/sync_source_docs.js` (Node) that runs every 24 h via a small node-cron schedule (`node-cron` dep) and enqueues `sync.source_doc` for every `source_docs` row with `sync_source='url'`. + +**`workers/void_workers/handlers/sourcedoc.py`:** + +```python +def handle(job): + doc_id = job.data["source_doc_id"] + doc = repo.get_source_doc(doc_id) + res = safe_fetch(doc.upstream_url, headers={...}, timeout=15) + if not res.ok: + return {"skipped": f"status={res.status}"} + body = res.text() + new_sha = sha256(body) + old_sha = (doc.metadata or {}).get("body_sha") + if new_sha == old_sha: + repo.update_source_doc(doc_id, last_synced=now()) + return {"unchanged": True} + repo.update_source_doc(doc_id, body_text=body[:1_000_000], + last_synced=now(), + metadata_patch={"body_sha": new_sha}) + return {"updated": True, "chars": len(body)} +``` + +`safe_fetch` here is a Python port of `lib/ingest/safe_fetch.js` — same blocklist, same scheme check, same redirect handling. Different language, same contract. + +**Version bump.** `package.json` + `server.js` → `2.0.0-alpha.4`. `tests/server.test.js` assertion bumped. `CHANGELOG.md` appends Plan 4 entry. + +**Completion doc.** `docs/plan-4-complete.md` mirrors the Plan 3 doc's shape. + +## Error handling + +- **Per-worker timeout** — every handler runs under a per-kind `signal.alarm` (or async timeout). Default: `pdf` 120 s, `image` 60 s, `video` 30 min, `sourcedoc` 30 s. +- **Retry semantics** mirror pg-boss's. Permanent errors (e.g., 403 from upstream) return a `{skipped: ...}` output and mark the job completed (no retry). Transient errors throw → state=retry → exponential backoff. +- **Memory budget.** Whisper `medium.en` on CPU peaks around 3 GB resident; `small.en` around 1 GB. systemd `MemoryMax=6G` on `void-workers.service` keeps headroom. If Whisper blows the limit, the unit restarts, the in-progress job retries. +- **Concurrent boss writes.** Both Node and Python may write `pgboss.job` rows for different queues. Their access never conflicts because each worker only mutates rows it claimed via `FOR UPDATE SKIP LOCKED`. + +## Observability + +- **Structured logs** via Python's `structlog`. JSON output to stderr — systemd routes to journald. +- **Log fields** match Node: `job_id`, `job_name`, `entity_type`, `entity_id`, `outcome`, `duration_ms`. +- **Jobs UI** (Plan 3) shows Python-side jobs without changes — both worker classes write to the same `pgboss.job` rows. +- **Per-handler counters** exposed at `GET /metrics` from a small Python aiohttp endpoint on `localhost:9090` — text/plain Prometheus format, ungated (LAN-only). Optional; ship in Phase D. + +## Testing strategy + +- **Unit:** pure Python tests with mocked subprocess + mocked Whisper. +- **Repo:** psycopg roundtrips against the same test DB Node uses. The shared `resetDb` test helper drops the `pgboss` schema between suites; Python tests rely on the same shape. +- **Integration:** gated on env (`WHISPER_TEST_AUDIO`, `INGEST_LIVE_PDF=path/...`). Skip if not present. +- **Cross-language:** one end-to-end test fixture lives in `tests/integration/python-claims-node-enqueued.py` — manually enqueue a Python-handled job from the Node fixture, run the Python worker once, assert side effects in the DB. Run on-demand, not in vitest. + +## Deploy delta + +`.env` on CT 311 already has the right base. **New env vars** that need adding before Phase C: + +``` +WHISPER_MODEL=small.en +WHISPER_CACHE=/var/lib/void/whisper-models +VOID_CONCURRENCY_EXTRACT_PDF=2 +VOID_CONCURRENCY_EXTRACT_IMAGE=2 +VOID_CONCURRENCY_INGEST_VIDEO=1 +VOID_CONCURRENCY_SYNC_SOURCE_DOC=1 +``` + +`deploy/README.md` extends with the void-workers bootstrap section (Python deps, voidworkers user, model cache dir). + +**Infrastructure changes (Phase C only — pre-authorized 2026-06-01):** +- `pct set 311 -memory 8192 -cores 6` (resize from 4 GB / 4 cores). +- Add device-node lines to CT 311's pct config (copy the six `dev0..dev5` lines from CT 102): + ``` + dev0: /dev/nvidia0,gid=44 + dev1: /dev/nvidiactl,gid=44 + dev2: /dev/nvidia-uvm,gid=44 + dev3: /dev/nvidia-uvm-tools,gid=44 + dev4: /dev/nvidia-caps/nvidia-cap1,gid=44 + dev5: /dev/nvidia-caps/nvidia-cap2,gid=44 + ``` +- One CT restart (~30 s downtime on void-server). +- Snapshot CT 311 before this change (standing rule). + +Phases A + B do not need any infra change — pure code + service install. + +## Known follow-ups (not Plan 4) + +- Larger Whisper models (medium / large) with proper GPU memory accounting. +- Per-language Tesseract data (currently English only). +- Speaker diarization in transcripts. +- "Re-transcribe" UI action. +- Move the blob store off CT 311's rootfs to a dedicated ZFS dataset (current /var/lib/void is on rootfs). +- pg-boss archive rotation policy tuning once we have transcript-sized rows in the queue. + +## Open items for the user + +- **alpha-4 deploy.** Standing rule as alpha-3 — won't deploy without your explicit OK; alpha-3 stays live until then. (The Phase C CT 311 resize + GPU passthrough is pre-authorized but is a separate decision from "promote alpha-4 to alpha-3's slot in prod".) +- **`WHISPER_MODEL`** default is `small.en`. You can bump it to `medium.en` once GPU is wired and you decide quality vs speed. +- **`yt-dlp`** can require cookies for age-gated content. Plan 4 ships without cookies; we can add an opt-in `YT_DLP_COOKIES_FILE` env later.