Files
Void-Homelab/docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md
2026-06-01 04:33:48 +10:00

20 KiB

Void 2.0 — Plan 4 Design Spec: Python void-workers (heavy ML ingest)

Date: 2026-06-01 Builds on: Plan 1 (Foundation, complete), Plan 2 (API + UI shell, complete, alpha-2 deployed), Plan 3 (capture pipeline + hybrid search, complete, alpha-3 deployed live on CT 311). Master spec: docs/superpowers/specs/2026-05-31-void-v2-design.md.

Goal

Stand up the Python void-workers service that handles ML-heavy ingest jobs the Node service intentionally defers: PDF text extraction (born-digital + scanned), image OCR, YouTube/audio transcription via Whisper, and upstream source-doc sync. Workers claim jobs from the same pg-boss queue Node uses, so the existing Plan 3 capture surface and Jobs UI work unchanged.

Scope

Plan 4 covers four worker kinds and the harness around them, structured as four phases each ending green and demonstrable:

  • Phase A — psycopg-based pg-boss client + worker harness + systemd unit + fixture worker.
  • Phase Bextract.pdf (pdftotext → Tesseract fallback) + extract.image (Tesseract OCR). CPU-only.
  • Phase Cingest.video (yt-dlp + ffmpeg + faster-whisper). GPU-when-available, CPU fallback.
  • Phase Dsync.source_doc (poll + sha256-diff upstream URLs into source_docs.body_text) + version bump to 2.0.0-alpha.4 + CHANGELOG + completion doc.

Out of scope (Plan 5+)

  • MCP server surface (Plan 5).
  • Companion chat in the right rail (Plan 5).
  • Sacred Valley widgets ported from Void 1.x (Plan 6).
  • Inbound email capture, Slack/Discord adapters, RSS firehose — different ingest shapes.
  • Re-transcription with a different Whisper model from the SPA — operator-only via direct enqueue for now.
  • Embedding chunks table — still whole-doc per Plan 3's decision.

Decisions locked by brainstorm

Question Answer
Plan 4 slice Full slice: PDFs + images + Whisper + yt-dlp + source-doc sync.
Host location Same CT 311 as Node, second systemd unit. No second LXC.
GPU/HA tension Single CT 311 with GPU device-nodes passed through; faster-whisper detects CUDA at startup and falls back to CPU on HA failover to Z3 (no GPU).
Job-claim mechanism Native SQL via psycopg against pg-boss tables. SELECT … FOR UPDATE SKIP LOCKED LIMIT 1. ~200 lines. No third-party pg-boss port.
Video ingest output Transcript + yt-dlp metadata (title, description, duration, thumbnail) → refs.body_text + refs.metadata. Audio temp file deleted after transcription.
Whisper model small.en by default. Env-configurable via WHISPER_MODEL (tiny.en / base.en / small.en / medium.en / large-v3).
PDF extract strategy pdftotext first; if extracted text is empty or < 200 chars total, fall back to per-page Tesseract OCR.
Source-doc sync trigger Cron in Node (/lib/cron), not a self-polling Python loop. Cron enqueues sync.source_doc jobs every 24 h for rows with sync_source='url'.
Sequencing Phase A → B → C → D. Phase boundary = green tests + commit + memory checkpoint.

Architecture

                ┌──────────────────────────────────────────────┐
                │  CT 311 (192.168.1.13) — Z host              │
                │                                              │
   /api/* ────▶ │  void-server.service                         │
   (Plan 3)     │  (Node + pg-boss + Jobs UI)                  │
                │      │                                       │
                │      │   enqueue                             │
                │      ▼                                       │
                │  ┌──────────────────────────────┐            │
                │  │  Postgres (CT 310 @ .215)    │            │
                │  │  pgboss.job / archive        │            │
                │  └──────────────┬───────────────┘            │
                │                 │  claim (FOR UPDATE SKIP    │
                │                 │   LOCKED)                  │
                │                 ▼                            │
                │  void-workers.service                        │
                │  (Python venv at /opt/void-workers/venv)     │
                │   ├─ pdf.py        (pdftotext + Tesseract)   │
                │   ├─ image.py      (Tesseract)               │
                │   ├─ video.py      (yt-dlp + faster-whisper) │
                │   └─ sourcedoc.py  (fetch + sha256 diff)     │
                │                                              │
                │   /var/lib/void/blobs/  (shared with Node)   │
                └──────────────────────────────────────────────┘
                            │
              ┌─────────────┴─────────────┐
              ▼                           ▼
        Ollama on CT 102            Karakeep on CT 100
         (already wired)             (already wired)

Both Node and Python processes run inside CT 311. They share:

  • The blob store at /var/lib/void/blobs/ (same disk, no NFS).
  • The Postgres database via DATABASE_URL.
  • The pg-boss schema. Python writes the same rows pg-boss writes.

GPU device nodes (/dev/nvidia0 etc.) are bind-mounted into CT 311 by the Proxmox host. The CT also gets a memory + core bump (4 GB / 4 cores → 8 GB / 6 cores).

systemd memory limits (MemoryMax=6G on void-workers.service, MemoryMax=1G on void-server.service) keep one process from OOM-killing the other.

Process model

Each worker runs as one OS process inside the same void-workers.service. Concurrency per kind is set at startup; the service supervisor spawns N worker greenlets/threads using multiprocessing.dummy.Pool (so we share the psycopg pool and Python imports):

Worker Default concurrency Reason
extract.pdf 2 Tesseract is CPU-bound on a 6-core CT, leave room for Node + image
extract.image 2 Same
ingest.video 1 Whisper is GPU-bound (or memory-bound on CPU); serialize
sync.source_doc 1 Don't hammer upstream hosts

Override via env VOID_CONCURRENCY_<NAME> (e.g., VOID_CONCURRENCY_EXTRACT_PDF=4).

Job-claim contract (psycopg + pg-boss SQL)

A Python worker loop, per queue name:

async def claim(queue):
    async with pool.connection() as conn:
        async with conn.transaction():
            row = await conn.execute("""
              SELECT id, data
                FROM pgboss.job
               WHERE name=%s
                 AND state IN ('created','retry')
                 AND start_after <= now()
                 ORDER BY priority DESC, created_on ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
            """, (queue,)).fetchone()
            if not row: return None
            await conn.execute(
              "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
              (row.id,))
            return row

After the handler returns:

  • Success: UPDATE pgboss.job SET state='completed', completed_on=now(), output=%s WHERE id=%s with the handler's return dict.
  • Failure: UPDATE pgboss.job SET state='failed', output=%s WHERE id=%s (truncated stack as the output payload). pg-boss's archive task moves failed rows out on its schedule.
  • Retry: pg-boss's own retry semantics rely on retry_count + retry_limit + retry_backoff. We honor them: if retry_count + 1 <= retry_limit, set state='retry' and start_after = now() + interval (exponential per the existing retry_delay config).

This intentionally matches what Node's pg-boss does so a job processed by either side looks identical to the Jobs UI.

Phase A — Python harness + systemd unit

New files (in the void-v2 repo at workers/):

workers/
  pyproject.toml          # pin Python 3.12, list deps
  void_workers/
    __init__.py
    boss.py               # psycopg pool + claim/finish helpers
    runner.py             # main entrypoint; loads handlers, spawns loops
    config.py             # env-driven config (concurrency, model names, paths)
    handlers/
      __init__.py
      echo.py             # trivial — same shape as Node's echo
  tests/
    conftest.py           # shared fixtures (DB url from env, fresh boss schema)
    test_boss.py          # claim → finish roundtrip
    test_echo.py          # enqueue from Node-side fixture, run echo handler

Deploy artefacts:

  • deploy/void-workers.service — systemd unit running /opt/void-workers/venv/bin/python -m void_workers.runner. EnvironmentFile = /opt/void-workers/.env. MemoryMax = 6G. Restart=on-failure.
  • deploy/push-workers.sh — sibling to push.sh. Rsyncs workers/ to /opt/void-workers/, installs venv if missing, pip install -e ., restarts the unit.

One-time setup on CT 311 (when Phase A first deploys):

  • apt install -y python3.12 python3.12-venv python3-pip ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils.
  • useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers (separate user from void so each systemd unit's filesystem ownership is distinct).
  • /opt/void-workers/.envDATABASE_URL, WHISPER_MODEL=small.en, BLOB_ROOT=/var/lib/void/blobs, optional VOID_CONCURRENCY_*. Grant voidworkers read access to /var/lib/void/blobs (add to void group, dataset mode 750).

Testing: vitest can't reach Python; Python's pytest can't reach Node. We test the boundary by:

  1. Python side: pytest spawns a real Postgres connection (the same shared dev DB), creates the pgboss schema directly via pg-boss lib loaded transiently (or by inserting rows by hand matching pg-boss's columns), runs handlers, asserts state transitions.
  2. Node side: an existing vitest test that enqueues an echo job and asserts a handler in Python completes it is impractical without a Python runtime in CI. We instead trust that both ends use the same SQL.

Phase A commits land as feat(workers): Python harness + boss claim/finish and feat(workers): echo handler + runner loop and chore(deploy): void-workers.service + push-workers.sh.

Phase B — extract.pdf and extract.image

Trigger. Node's lib/jobs/workers/blob.js (Plan 3) inspects kind after creating the ref:

  • kind === 'pdf' → enqueue extract.pdf with { ref_id, blob_path }.
  • kind === 'image' → enqueue extract.image with same shape.

That's a tiny Node-side change in the existing blob worker. Plan 3's whole-doc embed trigger still fires on refs.update later when Python writes body_text back, so the OCR'd text becomes searchable automatically.

workers/void_workers/handlers/pdf.py:

def handle(job):
    ref_id = job.data["ref_id"]
    blob_path = job.data["blob_path"]
    text = subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"], timeout=120
    ).decode("utf-8", errors="replace")
    if len(text.strip()) < 200:
        # Born-digital extraction yielded nothing → OCR each page.
        text = ocr_pdf_pages(blob_path)
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": ..., "chars": len(text)}})
    return {"ref_id": ref_id, "chars": len(body_text)}

OCR helper uses pdftoppm to rasterize each page → Pillow → pytesseract.image_to_string.

workers/void_workers/handlers/image.py:

def handle(job):
    ref_id = job.data["ref_id"]
    blob_path = job.data["blob_path"]
    text = pytesseract.image_to_string(Image.open(blob_path)).strip()
    repo.update_ref(ref_id, body_text=text[:200_000])
    return {"ref_id": ref_id, "chars": len(text)}

workers/void_workers/repo.py — thin psycopg shim. update_ref(id, body_text, metadata_patch) performs the same recordAudit write Node's repos/refs.js would (so the audit log captures the worker's edit with actor_kind='worker').

Tests:

  • tests/test_pdf.py — vendored tests/fixtures/born_digital.pdf + tests/fixtures/scanned.pdf; assert body_text length thresholds.
  • tests/test_image.py — small PNG with a known string baked in.

Phase C — ingest.video

Trigger. The dispatch happens in lib/api/routes/capture.js, not the URL worker. After URL validation, detect a video host (youtube.com / youtu.be / vimeo.com) and enqueue ingest.video directly instead of ingest.url. Idempotency key + response shape stay the same. This way the URL worker is never invoked for video URLs (no half-written ref), and the video worker is the sole creator of the refs row for video captures.

workers/void_workers/handlers/video.py:

def handle(job):
    url = job.data["url"]
    space_id = job.data["space_id"]

    # 1) metadata
    info = yt_dlp_extract_info(url)        # title, description, duration, thumbnail, uploader
    if info is None:                       # 403/404/age-gated/etc.
        return {"skipped": "yt-dlp"}

    # 2) audio
    audio_path = yt_dlp_download_audio(url, fmt="bestaudio[ext=opus]/bestaudio")
    try:
        # 3) transcribe
        transcript = whisper_transcribe(audio_path)   # see model.py
    finally:
        os.unlink(audio_path)

    # 4) create ref
    ref_id = repo.create_ref({
        "space_id": space_id,
        "kind": "video",
        "source_url": url,
        "title": info["title"],
        "summary": (info.get("description") or "")[:5000],
        "body_text": transcript[:200_000],
        "source_kind": "youtube" if "youtube" in url else "video",
        "external_id": sha256(space_id + url),
        "metadata": {
            "duration_s": info.get("duration"),
            "uploader": info.get("uploader"),
            "thumbnail": info.get("thumbnail"),
        },
    })
    return {"ref_id": ref_id, "chars": len(transcript)}

workers/void_workers/model.py:

_whisper_model = None
def whisper_model():
    global _whisper_model
    if _whisper_model is None:
        from faster_whisper import WhisperModel
        device = "cuda" if cuda_available() else "cpu"
        compute_type = "float16" if device == "cuda" else "int8"
        name = os.environ.get("WHISPER_MODEL", "small.en")
        _whisper_model = WhisperModel(name, device=device, compute_type=compute_type)
    return _whisper_model

def whisper_transcribe(path):
    segments, _info = whisper_model().transcribe(path, vad_filter=True)
    return "\n".join(s.text.strip() for s in segments).strip()

def cuda_available():
    try:
        import ctranslate2
        return ctranslate2.get_cuda_device_count() > 0
    except Exception:
        return False

The model loads once on first job and stays warm. On HA failover to Z3 (no GPU), cuda_available() returns False; the next process restart reloads with CPU + int8. Job durations multiply by ~10x but jobs still complete.

Tests: integration test gated on WHISPER_TEST_AUDIO=path/to/clip.opus; pure-unit tests mock faster_whisper.WhisperModel.

Phase D — sync.source_doc + version bump

Cron in Node. Add lib/cron/sync_source_docs.js (Node) that runs every 24 h via a small node-cron schedule (node-cron dep) and enqueues sync.source_doc for every source_docs row with sync_source='url'.

workers/void_workers/handlers/sourcedoc.py:

def handle(job):
    doc_id = job.data["source_doc_id"]
    doc = repo.get_source_doc(doc_id)
    res = safe_fetch(doc.upstream_url, headers={...}, timeout=15)
    if not res.ok:
        return {"skipped": f"status={res.status}"}
    body = res.text()
    new_sha = sha256(body)
    old_sha = (doc.metadata or {}).get("body_sha")
    if new_sha == old_sha:
        repo.update_source_doc(doc_id, last_synced=now())
        return {"unchanged": True}
    repo.update_source_doc(doc_id, body_text=body[:1_000_000],
                          last_synced=now(),
                          metadata_patch={"body_sha": new_sha})
    return {"updated": True, "chars": len(body)}

safe_fetch here is a Python port of lib/ingest/safe_fetch.js — same blocklist, same scheme check, same redirect handling. Different language, same contract.

Version bump. package.json + server.js2.0.0-alpha.4. tests/server.test.js assertion bumped. CHANGELOG.md appends Plan 4 entry.

Completion doc. docs/plan-4-complete.md mirrors the Plan 3 doc's shape.

Error handling

  • Per-worker timeout — every handler runs under a per-kind signal.alarm (or async timeout). Default: pdf 120 s, image 60 s, video 30 min, sourcedoc 30 s.
  • Retry semantics mirror pg-boss's. Permanent errors (e.g., 403 from upstream) return a {skipped: ...} output and mark the job completed (no retry). Transient errors throw → state=retry → exponential backoff.
  • Memory budget. Whisper medium.en on CPU peaks around 3 GB resident; small.en around 1 GB. systemd MemoryMax=6G on void-workers.service keeps headroom. If Whisper blows the limit, the unit restarts, the in-progress job retries.
  • Concurrent boss writes. Both Node and Python may write pgboss.job rows for different queues. Their access never conflicts because each worker only mutates rows it claimed via FOR UPDATE SKIP LOCKED.

Observability

  • Structured logs via Python's structlog. JSON output to stderr — systemd routes to journald.
  • Log fields match Node: job_id, job_name, entity_type, entity_id, outcome, duration_ms.
  • Jobs UI (Plan 3) shows Python-side jobs without changes — both worker classes write to the same pgboss.job rows.
  • Per-handler counters exposed at GET /metrics from a small Python aiohttp endpoint on localhost:9090 — text/plain Prometheus format, ungated (LAN-only). Optional; ship in Phase D.

Testing strategy

  • Unit: pure Python tests with mocked subprocess + mocked Whisper.
  • Repo: psycopg roundtrips against the same test DB Node uses. The shared resetDb test helper drops the pgboss schema between suites; Python tests rely on the same shape.
  • Integration: gated on env (WHISPER_TEST_AUDIO, INGEST_LIVE_PDF=path/...). Skip if not present.
  • Cross-language: one end-to-end test fixture lives in tests/integration/python-claims-node-enqueued.py — manually enqueue a Python-handled job from the Node fixture, run the Python worker once, assert side effects in the DB. Run on-demand, not in vitest.

Deploy delta

.env on CT 311 already has the right base. New env vars that need adding before Phase C:

WHISPER_MODEL=small.en
WHISPER_CACHE=/var/lib/void/whisper-models
VOID_CONCURRENCY_EXTRACT_PDF=2
VOID_CONCURRENCY_EXTRACT_IMAGE=2
VOID_CONCURRENCY_INGEST_VIDEO=1
VOID_CONCURRENCY_SYNC_SOURCE_DOC=1

deploy/README.md extends with the void-workers bootstrap section (Python deps, voidworkers user, model cache dir).

Infrastructure changes (Phase C only — pre-authorized 2026-06-01):

  • pct set 311 -memory 8192 -cores 6 (resize from 4 GB / 4 cores).
  • Add device-node lines to CT 311's pct config (copy the six dev0..dev5 lines from CT 102):
    dev0: /dev/nvidia0,gid=44
    dev1: /dev/nvidiactl,gid=44
    dev2: /dev/nvidia-uvm,gid=44
    dev3: /dev/nvidia-uvm-tools,gid=44
    dev4: /dev/nvidia-caps/nvidia-cap1,gid=44
    dev5: /dev/nvidia-caps/nvidia-cap2,gid=44
    
  • One CT restart (~30 s downtime on void-server).
  • Snapshot CT 311 before this change (standing rule).

Phases A + B do not need any infra change — pure code + service install.

Known follow-ups (not Plan 4)

  • Larger Whisper models (medium / large) with proper GPU memory accounting.
  • Per-language Tesseract data (currently English only).
  • Speaker diarization in transcripts.
  • "Re-transcribe" UI action.
  • Move the blob store off CT 311's rootfs to a dedicated ZFS dataset (current /var/lib/void is on rootfs).
  • pg-boss archive rotation policy tuning once we have transcript-sized rows in the queue.

Open items for the user

  • alpha-4 deploy. Standing rule as alpha-3 — won't deploy without your explicit OK; alpha-3 stays live until then. (The Phase C CT 311 resize + GPU passthrough is pre-authorized but is a separate decision from "promote alpha-4 to alpha-3's slot in prod".)
  • WHISPER_MODEL default is small.en. You can bump it to medium.en once GPU is wired and you decide quality vs speed.
  • yt-dlp can require cookies for age-gated content. Plan 4 ships without cookies; we can add an opt-in YT_DLP_COOKIES_FILE env later.