# Void 2.0 — Plan 4 Design Spec: Python void-workers (heavy ML ingest)
**Date:** 2026-06-01
**Builds on:** Plan 1 (Foundation, complete), Plan 2 (API + UI shell, complete, alpha-2 deployed), Plan 3 (capture pipeline + hybrid search, complete, alpha-3 deployed live on CT 311).
**Master spec:** `docs/superpowers/specs/2026-05-31-void-v2-design.md`.
## Goal
Stand up the Python `void-workers` service that handles ML-heavy ingest jobs the Node service intentionally defers: PDF text extraction (born-digital + scanned), image OCR, YouTube/audio transcription via Whisper, and upstream source-doc sync. Workers claim jobs from the same pg-boss queue Node uses, so the existing Plan 3 capture surface and Jobs UI work unchanged.
## Scope
Plan 4 covers four worker kinds and the harness around them, structured as four phases each ending green and demonstrable:
- **Phase A** — psycopg-based pg-boss client + worker harness + systemd unit + fixture worker.
- **Phase B** — `extract.pdf` (pdftotext → Tesseract fallback) + `extract.image` (Tesseract OCR). CPU-only.
- **Phase C** — `ingest.video` (yt-dlp + ffmpeg + faster-whisper). GPU-when-available, CPU fallback.
- **Phase D** — `sync.source_doc` (poll + sha256-diff upstream URLs into source_docs.body_text) + version bump to `2.0.0-alpha.4` + CHANGELOG + completion doc.
## Out of scope (Plan 5+)
- MCP server surface (Plan 5).
- Companion chat in the right rail (Plan 5).
- Sacred Valley widgets ported from Void 1.x (Plan 6).
- Inbound email capture, Slack/Discord adapters, RSS firehose — different ingest shapes.
- Re-transcription with a different Whisper model from the SPA — operator-only via direct enqueue for now.
- Embedding chunks table — still whole-doc per Plan 3's decision.
## Decisions locked by brainstorm
| Question | Answer |
|---|---|
| Plan 4 slice | Full slice: PDFs + images + Whisper + yt-dlp + source-doc sync. |
| Host location | **Same CT 311** as Node, second systemd unit. No second LXC. |
| GPU/HA tension | Single CT 311 with GPU device-nodes passed through; faster-whisper detects CUDA at startup and falls back to CPU on HA failover to Z3 (no GPU). |
| Job-claim mechanism | **Native SQL via psycopg** against pg-boss tables. `SELECT … FOR UPDATE SKIP LOCKED LIMIT 1`. ~200 lines. No third-party pg-boss port. |
| Video ingest output | Transcript + yt-dlp metadata (title, description, duration, thumbnail) → `refs.body_text` + `refs.metadata`. Audio temp file deleted after transcription. |
| Whisper model | `small.en` by default. Env-configurable via `WHISPER_MODEL` (`tiny.en` / `base.en` / `small.en` / `medium.en` / `large-v3`). |
| PDF extract strategy | `pdftotext` first; if extracted text is empty or < 200 chars total, fall back to per-page Tesseract OCR. |
| Source-doc sync trigger | Cron in Node (`/lib/cron`), not a self-polling Python loop. Cron enqueues `sync.source_doc` jobs every 24 h for rows with `sync_source='url'`. |
| Sequencing | Phase A → B → C → D. Phase boundary = green tests + commit + memory checkpoint. |
## Architecture
```
             ┌──────────────────────────────────────────────┐
             │ CT 311 (192.168.1.13) — Z host               │
             │                                              │
/api/* ────▶ │  void-server.service                         │
(Plan 3)     │  (Node + pg-boss + Jobs UI)                  │
             │        │                                     │
             │        │ enqueue                             │
             │        ▼                                     │
             │  ┌──────────────────────────────┐            │
             │  │ Postgres (CT 310 @ .215)     │            │
             │  │ pgboss.job / archive         │            │
             │  └──────────────┬───────────────┘            │
             │                 │ claim (FOR UPDATE SKIP     │
             │                 │ LOCKED)                    │
             │                 ▼                            │
             │  void-workers.service                        │
             │  (Python venv at /opt/void-workers/venv)     │
             │   ├─ pdf.py       (pdftotext + Tesseract)    │
             │   ├─ image.py     (Tesseract)                │
             │   ├─ video.py     (yt-dlp + faster-whisper)  │
             │   └─ sourcedoc.py (fetch + sha256 diff)      │
             │                                              │
             │  /var/lib/void/blobs/ (shared with Node)     │
             └──────────────────────────────────────────────┘
                      ┌─────────────┴─────────────┐
                      ▼                           ▼
              Ollama on CT 102           Karakeep on CT 100
               (already wired)             (already wired)
```
Both Node and Python processes run inside CT 311. They share:
- The blob store at `/var/lib/void/blobs/` (same disk, no NFS).
- The Postgres database via `DATABASE_URL`.
- The pg-boss schema. Python writes the same rows pg-boss writes.
GPU device nodes (`/dev/nvidia0` etc.) are bind-mounted into CT 311 by the Proxmox host. The CT also gets a memory + core bump (4 GB / 4 cores → 8 GB / 6 cores).
systemd memory limits (`MemoryMax=6G` on `void-workers.service`, `MemoryMax=1G` on `void-server.service`) keep one process from OOM-killing the other.
## Process model
All worker kinds run in a single OS process under `void-workers.service`. Concurrency per kind is set at startup; the runner spawns N worker threads per kind using `multiprocessing.dummy.Pool` (a thread pool, so workers share the psycopg pool and Python imports):
| Worker | Default concurrency | Reason |
|---|---|---|
| `extract.pdf` | 2 | Tesseract is CPU-bound on a 6-core CT, leave room for Node + image |
| `extract.image` | 2 | Same |
| `ingest.video` | 1 | Whisper is GPU-bound (or memory-bound on CPU); serialize |
| `sync.source_doc` | 1 | Don't hammer upstream hosts |
Override via env `VOID_CONCURRENCY_<NAME>` (e.g., `VOID_CONCURRENCY_EXTRACT_PDF=4`).
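As a rough illustration of the override rule, the queue name can be mapped to its environment variable by upper-casing it and replacing dots with underscores. The helper names (`env_name`, `concurrency_for`) and the validation behaviour are assumptions made for this sketch; only the defaults and the variable naming come from the table above.

```python
import os

# Defaults copied from the concurrency table above.
DEFAULT_CONCURRENCY = {
    "extract.pdf": 2,
    "extract.image": 2,
    "ingest.video": 1,
    "sync.source_doc": 1,
}


def env_name(queue):
    """Map a queue name to its override variable, e.g. extract.pdf -> VOID_CONCURRENCY_EXTRACT_PDF."""
    return "VOID_CONCURRENCY_" + queue.replace(".", "_").upper()


def concurrency_for(queue, environ=None):
    """Return the configured concurrency for `queue`, falling back to the default."""
    environ = os.environ if environ is None else environ
    raw = environ.get(env_name(queue))
    if raw is None:
        return DEFAULT_CONCURRENCY[queue]
    value = int(raw)  # raises ValueError on non-numeric input
    if value < 1:
        raise ValueError(f"{env_name(queue)} must be >= 1, got {value}")
    return value
```

Passing `environ` explicitly keeps the helper easy to unit-test without touching the real process environment.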
## Job-claim contract (psycopg + pg-boss SQL)
A Python worker loop, per queue name:
```python
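from psycopg.rows import namedtuple_row

async def claim(queue):
    """Claim one due job from `queue`; return the row, or None if none is ready."""
    # `pool` is the module's shared async psycopg connection pool.
    async with pool.connection() as conn:
        async with conn.transaction():
            # namedtuple_row makes row.id / row.data available by name.
            cur = conn.cursor(row_factory=namedtuple_row)
            await cur.execute("""
                SELECT id, data
                FROM pgboss.job
                WHERE name=%s
                  AND state IN ('created','retry')
                  AND start_after <= now()
                ORDER BY priority DESC, created_on ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
            """, (queue,))
            row = await cur.fetchone()
            if row is None:
                return None
            # Mark the row active inside the same transaction that holds the lock.
            await cur.execute(
                "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
                (row.id,))
            return row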
```
After the handler returns:
- **Success:** `UPDATE pgboss.job SET state='completed', completed_on=now(), output=%s WHERE id=%s` with the handler's return dict.
- **Failure:** `UPDATE pgboss.job SET state='failed', output=%s WHERE id=%s` (truncated stack as the output payload). pg-boss's archive task moves failed rows out on its schedule.
- **Retry:** pg-boss's own retry semantics rely on `retry_count` + `retry_limit` + `retry_backoff`. We honor them: if `retry_count + 1 <= retry_limit`, set `state='retry'` and `start_after = now() + interval` (exponential per the existing retry_delay config).
This intentionally matches what Node's pg-boss does so a job processed by either side looks identical to the Jobs UI.
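The failure and retry rules above can be sketched as a small pure function that decides the next state and `start_after` for a job whose handler raised. The function name `next_retry` and the doubling formula are assumptions for illustration; pg-boss's own backoff calculation may differ (for example by adding jitter), so the real implementation should be checked against the pg-boss version in use.

```python
from datetime import datetime, timedelta, timezone


def next_retry(retry_count, retry_limit, retry_delay_s, retry_backoff, now=None):
    """Return (state, start_after) for a job whose handler just raised.

    Mirrors the rule above: retry while retry_count + 1 <= retry_limit,
    otherwise mark the job failed.
    """
    now = now or datetime.now(timezone.utc)
    if retry_count + 1 > retry_limit:
        return "failed", None
    if retry_backoff:
        # Assumed exponential schedule: delay, 2*delay, 4*delay, ...
        delay = retry_delay_s * (2 ** retry_count)
    else:
        delay = retry_delay_s
    return "retry", now + timedelta(seconds=delay)
```

The returned pair maps directly onto the `state` and `start_after` columns in the `UPDATE pgboss.job` statement; the caller would also increment `retry_count` in the same statement.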
## Phase A — Python harness + systemd unit
**New files (in the void-v2 repo at `workers/`):**
```
workers/
pyproject.toml # pin Python 3.12, list deps
void_workers/
__init__.py
boss.py # psycopg pool + claim/finish helpers
runner.py # main entrypoint; loads handlers, spawns loops
config.py # env-driven config (concurrency, model names, paths)
handlers/
__init__.py
echo.py # trivial — same shape as Node's echo
tests/
conftest.py # shared fixtures (DB url from env, fresh boss schema)
test_boss.py # claim → finish roundtrip
test_echo.py # enqueue from Node-side fixture, run echo handler
```
**Deploy artefacts:**
- `deploy/void-workers.service` — systemd unit running `/opt/void-workers/venv/bin/python -m void_workers.runner`. EnvironmentFile = `/opt/void-workers/.env`. MemoryMax = 6G. Restart=on-failure.
- `deploy/push-workers.sh` — sibling to `push.sh`. Rsyncs `workers/` to `/opt/void-workers/`, installs venv if missing, `pip install -e .`, restarts the unit.
**One-time setup on CT 311** (when Phase A first deploys):
- `apt install -y python3.12 python3.12-venv python3-pip ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils`.
- `useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers` (separate user from `void` so each systemd unit's filesystem ownership is distinct).
- `/opt/void-workers/.env` with `DATABASE_URL`, `WHISPER_MODEL=small.en`, `BLOB_ROOT=/var/lib/void/blobs`, and optional `VOID_CONCURRENCY_*`. Grant `voidworkers` read access to `/var/lib/void/blobs` (add to `void` group, dataset mode 750).
**Testing:** vitest can't reach Python; Python's pytest can't reach Node. We test the boundary by:
1. **Python side:** pytest opens a real Postgres connection (the same shared dev DB), creates the `pgboss` schema directly via `pg-boss` lib loaded transiently (or by inserting rows by hand matching pg-boss's columns), runs handlers, asserts state transitions.
2. **Node side:** a vitest test that enqueues an `echo` job and asserts that a *handler in Python* completes it is impractical without a Python runtime in CI. Instead, we rely on both ends issuing the same SQL.
Phase A commits land as `feat(workers): Python harness + boss claim/finish` and `feat(workers): echo handler + runner loop` and `chore(deploy): void-workers.service + push-workers.sh`.
## Phase B — `extract.pdf` and `extract.image`
**Trigger.** Node's `lib/jobs/workers/blob.js` (Plan 3) inspects `kind` after creating the ref:
- `kind === 'pdf'` → enqueue `extract.pdf` with `{ ref_id, blob_path }`.
- `kind === 'image'` → enqueue `extract.image` with same shape.
That's a tiny Node-side change in the existing blob worker. Plan 3's whole-doc embed trigger still fires on `refs.update` later when Python writes `body_text` back, so the OCR'd text becomes searchable automatically.
**`workers/void_workers/handlers/pdf.py`:**
```python
import subprocess

def handle(job):
    ref_id = job.data["ref_id"]
    blob_path = job.data["blob_path"]
    # Fast path: born-digital PDFs carry a text layer that pdftotext can read.
    text = subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"], timeout=120
    ).decode("utf-8", errors="replace")
    if len(text.strip()) < 200:
        # Born-digital extraction yielded little or nothing → OCR each page.
        text = ocr_pdf_pages(blob_path)
    body_text = text[:200_000]  # cap the stored text at 200k characters
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": ..., "chars": len(text)}})
    return {"ref_id": ref_id, "chars": len(body_text)}
```
OCR helper uses `pdftoppm` to rasterize each page → Pillow → `pytesseract.image_to_string`.
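A minimal sketch of what `ocr_pdf_pages` could look like, assuming `pdftoppm` (from poppler-utils) writes one PNG per page into a temporary directory. The 300 DPI default, the `pdftoppm_command` helper, and the 120 s timeout are assumptions for illustration, not values fixed by this spec.

```python
import subprocess
import tempfile
from pathlib import Path


def pdftoppm_command(pdf_path, out_prefix, dpi=300):
    """Build the pdftoppm argv that rasterizes every page to PNG."""
    return ["pdftoppm", "-r", str(dpi), "-png", str(pdf_path), str(out_prefix)]


def ocr_pdf_pages(pdf_path, dpi=300, timeout=120):
    """Rasterize each page of `pdf_path` and OCR it; return the joined text."""
    # Third-party imports are local so the module imports without them installed.
    import pytesseract
    from PIL import Image

    with tempfile.TemporaryDirectory() as tmp:
        prefix = Path(tmp) / "page"
        subprocess.run(pdftoppm_command(pdf_path, prefix, dpi),
                       check=True, timeout=timeout)
        texts = []
        # pdftoppm zero-pads page numbers, so a lexicographic sort is page order.
        for page in sorted(Path(tmp).glob("page-*.png")):
            with Image.open(page) as img:
                texts.append(pytesseract.image_to_string(img))
    return "\n".join(texts)
```

Rasterizing into a `TemporaryDirectory` means page images are cleaned up even if Tesseract raises partway through a document.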
**`workers/void_workers/handlers/image.py`:**
```python
import pytesseract
from PIL import Image

def handle(job):
    ref_id = job.data["ref_id"]
    blob_path = job.data["blob_path"]
    # OCR the whole image in one pass; strip surrounding whitespace.
    text = pytesseract.image_to_string(Image.open(blob_path)).strip()
    repo.update_ref(ref_id, body_text=text[:200_000])
    return {"ref_id": ref_id, "chars": len(text)}
```
**`workers/void_workers/repo.py`** — thin psycopg shim. `update_ref(id, body_text, metadata_patch)` performs the same `recordAudit` write Node's `repos/refs.js` would (so the audit log captures the worker's edit with `actor_kind='worker'`).
**Tests:**
- `tests/test_pdf.py` — vendored `tests/fixtures/born_digital.pdf` + `tests/fixtures/scanned.pdf`; assert body_text length thresholds.
- `tests/test_image.py` — small PNG with a known string baked in.
## Phase C — `ingest.video`
**Trigger.** The dispatch happens in `lib/api/routes/capture.js`, not the URL worker. After URL validation, detect a video host (`youtube.com / youtu.be / vimeo.com`) and enqueue `ingest.video` directly instead of `ingest.url`. Idempotency key + response shape stay the same. This way the URL worker is never invoked for video URLs (no half-written ref), and the video worker is the sole creator of the `refs` row for video captures.
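The host check itself lives in Node, but the matching rule is easy to show in Python for reference. This is only a sketch of one reasonable rule (exact host or subdomain match on the three hosts listed above); the function name `is_video_url` and the subdomain handling are assumptions, and the actual `capture.js` logic may differ.

```python
from urllib.parse import urlsplit

# Hosts taken from the trigger description above.
VIDEO_HOSTS = {"youtube.com", "youtu.be", "vimeo.com"}


def is_video_url(url):
    """True if the URL's hostname is a listed video host or a subdomain of one."""
    host = (urlsplit(url).hostname or "").lower()
    return any(host == h or host.endswith("." + h) for h in VIDEO_HOSTS)
```

Matching on the parsed hostname rather than on a substring of the URL avoids false positives such as a query string that merely mentions `youtube.com`.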
**`workers/void_workers/handlers/video.py`:**
```python
import os

def handle(job):
    url = job.data["url"]
    space_id = job.data["space_id"]
    # 1) metadata
    info = yt_dlp_extract_info(url)  # title, description, duration, thumbnail, uploader
    if info is None:  # 403/404/age-gated/etc.
        return {"skipped": "yt-dlp"}
    # 2) audio
    audio_path = yt_dlp_download_audio(url, fmt="bestaudio[ext=opus]/bestaudio")
    try:
        # 3) transcribe
        transcript = whisper_transcribe(audio_path)  # see model.py
    finally:
        # Always delete the temp audio file, even if transcription fails.
        os.unlink(audio_path)
    # 4) create ref
    # youtu.be short links do not contain the substring "youtube", so check both.
    is_youtube = "youtube" in url or "youtu.be" in url
    ref_id = repo.create_ref({
        "space_id": space_id,
        "kind": "video",
        "source_url": url,
        "title": info["title"],
        "summary": (info.get("description") or "")[:5000],
        "body_text": transcript[:200_000],
        "source_kind": "youtube" if is_youtube else "video",
        "external_id": sha256(space_id + url),
        "metadata": {
            "duration_s": info.get("duration"),
            "uploader": info.get("uploader"),
            "thumbnail": info.get("thumbnail"),
        },
    })
    return {"ref_id": ref_id, "chars": len(transcript)}
```
**`workers/void_workers/model.py`:**
```python
import os

_whisper_model = None  # loaded lazily on the first job, then reused


def whisper_model():
    global _whisper_model
    if _whisper_model is None:
        from faster_whisper import WhisperModel
        device = "cuda" if cuda_available() else "cpu"
        compute_type = "float16" if device == "cuda" else "int8"
        name = os.environ.get("WHISPER_MODEL", "small.en")
        _whisper_model = WhisperModel(name, device=device, compute_type=compute_type)
    return _whisper_model


def whisper_transcribe(path):
    segments, _info = whisper_model().transcribe(path, vad_filter=True)
    return "\n".join(s.text.strip() for s in segments).strip()


def cuda_available():
    try:
        import ctranslate2
        return ctranslate2.get_cuda_device_count() > 0
    except Exception:
        return False
```
The model loads once on the first job and stays warm. On HA failover to Z3 (no GPU), `cuda_available()` returns False; the next process restart reloads the model with CPU + int8. Job durations grow roughly 10x, but jobs still complete.
**Tests:** integration test gated on `WHISPER_TEST_AUDIO=path/to/clip.opus`; pure-unit tests mock `faster_whisper.WhisperModel`.
## Phase D — `sync.source_doc` + version bump
**Cron in Node.** Add `lib/cron/sync_source_docs.js` (Node) that runs every 24 h via a small node-cron schedule (`node-cron` dep) and enqueues `sync.source_doc` for every `source_docs` row with `sync_source='url'`.
**`workers/void_workers/handlers/sourcedoc.py`:**
```python
def handle(job):
    doc_id = job.data["source_doc_id"]
    doc = repo.get_source_doc(doc_id)
    res = safe_fetch(doc.upstream_url, headers={...}, timeout=15)
    if not res.ok:
        return {"skipped": f"status={res.status}"}
    body = res.text()
    new_sha = sha256(body)
    old_sha = (doc.metadata or {}).get("body_sha")
    if new_sha == old_sha:
        repo.update_source_doc(doc_id, last_synced=now())
        return {"unchanged": True}
    repo.update_source_doc(doc_id, body_text=body[:1_000_000],
                           last_synced=now(),
                           metadata_patch={"body_sha": new_sha})
    return {"updated": True, "chars": len(body)}
```
`safe_fetch` here is a Python port of `lib/ingest/safe_fetch.js` — same blocklist, same scheme check, same redirect handling. Different language, same contract.
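As a hedged illustration of the scheme and blocklist checks, the pre-flight validation might look like the sketch below. The contents of `lib/ingest/safe_fetch.js` are not shown in this spec, so the allowed schemes, the blocked address classes, and the `check_url` name are all assumptions. A real port would also need to resolve hostnames and re-validate every redirect hop.

```python
import ipaddress
from urllib.parse import urlsplit

ALLOWED_SCHEMES = {"http", "https"}  # assumed; mirror the Node blocklist in practice


def is_blocked_ip(host):
    """True if `host` is an IP literal in a private or otherwise internal range."""
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False  # not an IP literal; a full port would resolve DNS first
    return (ip.is_private or ip.is_loopback or ip.is_link_local
            or ip.is_reserved or ip.is_multicast)


def check_url(url):
    """Return `url` unchanged if it passes the checks, else raise ValueError."""
    parts = urlsplit(url)
    if parts.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"scheme not allowed: {parts.scheme!r}")
    host = parts.hostname
    if not host:
        raise ValueError("URL has no host")
    if host == "localhost" or is_blocked_ip(host):
        raise ValueError(f"host not allowed: {host}")
    return url
```

Literal-IP checks alone do not stop a public hostname that resolves to an internal address, which is why the resolve-then-check step matters in the full implementation.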
**Version bump.** `package.json` + `server.js` → `2.0.0-alpha.4`. `tests/server.test.js` assertion bumped. `CHANGELOG.md` appends Plan 4 entry.
**Completion doc.** `docs/plan-4-complete.md` mirrors the Plan 3 doc's shape.
## Error handling
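- **Per-worker timeout.** Every handler runs under a per-kind timeout (`signal.alarm` or an async timeout). Defaults: `pdf` 120 s, `image` 60 s, `video` 30 min, `sourcedoc` 30 s. Python runs signal handlers only in the main thread, so handlers executing in pool threads need the async or another thread-safe variant.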
- **Retry semantics** mirror pg-boss's. Permanent errors (e.g., 403 from upstream) return a `{skipped: ...}` output and mark the job completed (no retry). Transient errors raise → state=retry → exponential backoff.
- **Memory budget.** Whisper `medium.en` on CPU peaks around 3 GB resident; `small.en` around 1 GB. systemd `MemoryMax=6G` on `void-workers.service` keeps headroom. If Whisper exceeds the limit, the unit restarts and the in-progress job retries.
- **Concurrent boss writes.** Both Node and Python may write `pgboss.job` rows for different queues. Their access never conflicts because each worker only mutates rows it claimed via `FOR UPDATE SKIP LOCKED`.
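One thread-safe way to enforce the per-kind timeouts is to run the handler in a helper thread and wait on its future. This is a sketch under stated assumptions: `run_with_timeout` is a hypothetical name, and the approach only stops waiting for the handler; it cannot forcibly kill a thread that is still running.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

# Per-kind defaults from the list above, in seconds.
HANDLER_TIMEOUT_S = {
    "extract.pdf": 120,
    "extract.image": 60,
    "ingest.video": 30 * 60,
    "sync.source_doc": 30,
}


def run_with_timeout(handler, job, timeout_s):
    """Run handler(job); raise TimeoutError if it takes longer than timeout_s."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(handler, job)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        raise TimeoutError(f"handler exceeded {timeout_s}s") from None
    finally:
        # Do not block on the (possibly still running) handler thread.
        executor.shutdown(wait=False)
```

Handlers that shell out (such as `pdftotext`) should still pass their own `timeout=` to `subprocess`, since that is what actually terminates the child process.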
## Observability
- **Structured logs** via Python's `structlog`. JSON output to stderr — systemd routes to journald.
- **Log fields** match Node: `job_id`, `job_name`, `entity_type`, `entity_id`, `outcome`, `duration_ms`.
- **Jobs UI** (Plan 3) shows Python-side jobs without changes — both worker classes write to the same `pgboss.job` rows.
- **Per-handler counters** exposed at `GET /metrics` from a small Python aiohttp endpoint on `localhost:9090` — text/plain Prometheus format, ungated (LAN-only). Optional; ship in Phase D.
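For the optional `/metrics` endpoint, the Prometheus text body can be rendered from a plain counter keyed by job name and outcome. The metric name `void_worker_jobs_total` and the `render_prometheus` helper are hypothetical; only the label names (`job_name`, `outcome`) come from the log fields listed above.

```python
from collections import Counter


def render_prometheus(counters, metric="void_worker_jobs_total"):
    """Render {(job_name, outcome): count} as Prometheus text exposition format."""
    lines = [f"# TYPE {metric} counter"]
    for (job_name, outcome), value in sorted(counters.items()):
        lines.append(
            f'{metric}{{job_name="{job_name}",outcome="{outcome}"}} {value}'
        )
    return "\n".join(lines) + "\n"


# Usage: handlers bump the counter, the HTTP endpoint returns the rendered text.
jobs = Counter()
jobs[("extract.pdf", "completed")] += 2
jobs[("ingest.video", "failed")] += 1
body = render_prometheus(jobs)
```

Keeping the rendering separate from the aiohttp route makes it testable without starting a server.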
## Testing strategy
- **Unit:** pure Python tests with mocked subprocess + mocked Whisper.
- **Repo:** psycopg roundtrips against the same test DB Node uses. The shared `resetDb` test helper drops the `pgboss` schema between suites; Python tests rely on the same shape.
- **Integration:** gated on env (`WHISPER_TEST_AUDIO`, `INGEST_LIVE_PDF=path/...`). Skip if not present.
- **Cross-language:** one end-to-end test fixture lives in `tests/integration/python-claims-node-enqueued.py` — manually enqueue a Python-handled job from the Node fixture, run the Python worker once, assert side effects in the DB. Run on-demand, not in vitest.
## Deploy delta
`.env` on CT 311 already has the right base. **New env vars** that need adding before Phase C:
```
WHISPER_MODEL=small.en
WHISPER_CACHE=/var/lib/void/whisper-models
VOID_CONCURRENCY_EXTRACT_PDF=2
VOID_CONCURRENCY_EXTRACT_IMAGE=2
VOID_CONCURRENCY_INGEST_VIDEO=1
VOID_CONCURRENCY_SYNC_SOURCE_DOC=1
```
`deploy/README.md` extends with the void-workers bootstrap section (Python deps, voidworkers user, model cache dir).
**Infrastructure changes (Phase C only — pre-authorized 2026-06-01):**
- `pct set 311 -memory 8192 -cores 6` (resize from 4 GB / 4 cores).
- Add device-node lines to CT 311's pct config (copy the six `dev0..dev5` lines from CT 102):
```
dev0: /dev/nvidia0,gid=44
dev1: /dev/nvidiactl,gid=44
dev2: /dev/nvidia-uvm,gid=44
dev3: /dev/nvidia-uvm-tools,gid=44
dev4: /dev/nvidia-caps/nvidia-cap1,gid=44
dev5: /dev/nvidia-caps/nvidia-cap2,gid=44
```
- One CT restart (~30 s downtime on void-server).
- Snapshot CT 311 before this change (standing rule).
Phases A + B do not need any infra change — pure code + service install.
## Known follow-ups (not Plan 4)
- Larger Whisper models (medium / large) with proper GPU memory accounting.
- Per-language Tesseract data (currently English only).
- Speaker diarization in transcripts.
- "Re-transcribe" UI action.
- Move the blob store off CT 311's rootfs to a dedicated ZFS dataset (current /var/lib/void is on rootfs).
- pg-boss archive rotation policy tuning once we have transcript-sized rows in the queue.
## Open items for the user
- **alpha-4 deploy.** Same standing rule as for alpha-3: no deploy without your explicit OK; alpha-3 stays live until then. (The Phase C CT 311 resize + GPU passthrough is pre-authorized but is a separate decision from "promote alpha-4 to alpha-3's slot in prod".)
- **`WHISPER_MODEL`** default is `small.en`. You can bump it to `medium.en` once GPU is wired and you decide quality vs speed.
- **`yt-dlp`** can require cookies for age-gated content. Plan 4 ships without cookies; we can add an opt-in `YT_DLP_COOKIES_FILE` env later.