Void-Homelab/docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md
root c4663992ec docs: Plan 4 implementation plan
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:39:55 +10:00


# Void 2.0 — Plan 4: Python void-workers (heavy ML ingest)
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Stand up the Python `void-workers` service alongside the existing Node `void-server` on CT 311. Workers claim jobs from the same pg-boss queue via native SQL (`SELECT ... FOR UPDATE SKIP LOCKED`) and process four kinds: `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. Whisper detects CUDA at startup and falls back to CPU when the HA replica is running on Z3 (no GPU).

**Architecture:** Single CT, two systemd units (`void-server.service` + `void-workers.service`). Shared blob store at `/var/lib/void/blobs`. Shared Postgres at CT 310. Python venv at `/opt/void-workers/venv`. Per-kind handlers run as threads inside one Python process; concurrency configured via env. Node-side hooks: capture.js routes YouTube URLs to `ingest.video`; blob worker enqueues `extract.pdf`/`extract.image` based on `kind`.

**Tech Stack:** Python 3.12, psycopg 3, pdftotext + Tesseract + Poppler + Pillow, faster-whisper (CTranslate2 backend), yt-dlp, ffmpeg, structlog, pytest, vitest (existing), node-cron (new Node dep for Phase D).

**Spec:** `docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md`.
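
The CUDA-or-CPU decision named in the goal can be made once at startup. The following is a minimal sketch, not the Phase C `model.py`: `detect_cuda_devices` and `pick_device` are hypothetical names, and the compute types (`float16` on GPU, `int8` on CPU) are assumptions rather than values from the spec. It relies on `ctranslate2.get_cuda_device_count()`, which ships with faster-whisper's backend. The resulting pair could then be passed to `WhisperModel(WHISPER_MODEL, device=..., compute_type=..., download_root=WHISPER_CACHE)`.

```python
def detect_cuda_devices() -> int:
    """Return how many CUDA devices CTranslate2 can see (0 if unavailable)."""
    try:
        import ctranslate2  # installed as a dependency of faster-whisper
        return int(ctranslate2.get_cuda_device_count())
    except Exception:
        # Missing library or broken driver: treat the host as CPU-only.
        return 0


def pick_device(cuda_devices: int) -> tuple[str, str]:
    """Map a device count to an assumed (device, compute_type) pair."""
    if cuda_devices > 0:
        return ("cuda", "float16")
    return ("cpu", "int8")


if __name__ == "__main__":
    device, compute_type = pick_device(detect_cuda_devices())
    print(device, compute_type)
```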
---
## Out of scope (Plan 5+)
- MCP server surface, Companion chat (Plan 5).
- Sacred Valley widgets (Plan 6).
- Speaker diarization, per-language Tesseract data, larger Whisper models with proper GPU budgeting.
- "Re-transcribe" UI action.
## Conventions
1. **TDD.** Write the failing test first. Run it red. Implement. Run it green. Commit.
2. **Worker handler shape:** `def handler(job_data: dict) -> dict` — return the success output, raise for retry.
3. **Worker name strings:** `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. These are exact strings, used in Node enqueue calls AND as the keys of the Python handler `REGISTRY`.
4. **All Python repo writes pass `actor={'kind': 'worker', 'id': None}`** when emitting audit_log rows.
5. **Commit per task.**
6. **Snapshot CT 310 + CT 311 before any phase that touches infra** (the standing rule). Phase A is code-only. Phase B installs system deps. Phase C resizes the CT + adds GPU passthrough — explicit snapshot before that one.
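
Convention 2 (return the output on success, raise to request a retry) can be illustrated with a small dispatcher. This is a sketch only: `run_handler`, `ok_handler`, and `bad_handler` are hypothetical names, and the `(state, output)` tuple is an illustration of the contract, not the runner's real return type.

```python
def run_handler(handler, job_data: dict) -> tuple[str, dict]:
    """Call a handler and translate its outcome into (state, output)."""
    try:
        out = handler(job_data)
        return ("completed", out or {})
    except Exception as exc:
        # Any exception means "this attempt failed"; the queue decides on retries.
        return ("failed", {"name": type(exc).__name__, "message": str(exc)})


def ok_handler(job_data: dict) -> dict:
    """Hypothetical handler that succeeds and reports the keys it saw."""
    return {"seen": sorted(job_data)}


def bad_handler(job_data: dict) -> dict:
    """Hypothetical handler that signals failure by raising."""
    raise ValueError("blob missing")


if __name__ == "__main__":
    print(run_handler(ok_handler, {"ref_id": "r1", "blob_path": "/tmp/x"}))
    print(run_handler(bad_handler, {}))
```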
---
## File structure (what this plan creates)
```
workers/                        # NEW: Python service lives in same repo
  pyproject.toml                # Python 3.12 + deps (psycopg, structlog, ...)
  README.md                     # short ops notes
  void_workers/
    __init__.py
    config.py                   # env loader
    log.py                      # structlog setup
    boss.py                     # claim / finish / fail / retry against pgboss tables
    repo.py                     # psycopg shim for refs / source_docs updates + audit
    safe_fetch.py               # Python port of lib/ingest/safe_fetch.js
    runner.py                   # main: load handlers, spawn loops, signal handlers
    handlers/
      __init__.py
      echo.py                   # trivial: proves harness end-to-end
      pdf.py                    # extract.pdf (Phase B)
      image.py                  # extract.image (Phase B)
      video.py                  # ingest.video (Phase C)
      sourcedoc.py              # sync.source_doc (Phase D)
    model.py                    # Whisper loader (CUDA detect + CPU fallback), Phase C
  tests/
    conftest.py                 # shared fixtures (DB url, pgboss schema reset, sample blobs)
    fixtures/
      born_digital.pdf          # tiny PDF with embedded text
      scanned.pdf               # tiny PDF that is a single page of an image
      eng_text.png              # PNG with known text baked in
    test_boss.py
    test_echo.py
    test_pdf.py
    test_image.py
    test_video.py
    test_sourcedoc.py
    test_safe_fetch.py
deploy/
  void-workers.service          # NEW systemd unit
  push-workers.sh               # NEW: sibling to push.sh
  README.md                     # extend: workers bootstrap section
lib/                            # Node-side changes (already exists)
  jobs/workers/blob.js          # MODIFY: after creating ref, enqueue extract.pdf/extract.image
  api/routes/capture.js         # MODIFY: detect YouTube/Vimeo, enqueue ingest.video
  cron/sync_source_docs.js      # NEW: node-cron, fires sync.source_doc daily
package.json                    # ADD node-cron, bump version in Phase D
server.js                       # bump VERSION + boot the cron in Phase D
CHANGELOG.md                    # Phase D entry
tests/server.test.js            # version assertion bump
docs/plan-4-complete.md         # Phase D
```
---
## Phase A — Python harness + systemd unit
### Task A1: Workers skeleton + venv install on the dev box
**Files:**
- Create: `workers/pyproject.toml`
- Create: `workers/void_workers/__init__.py`
- Create: `workers/void_workers/config.py`
- Create: `workers/void_workers/log.py`
- Create: `workers/README.md`
- [ ] **Step 1:** Create `workers/pyproject.toml`:
```toml
[project]
name = "void-workers"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"psycopg[binary,pool]>=3.2",
"structlog>=24.1",
]
[project.optional-dependencies]
pdf = ["pdfplumber>=0.11", "pytesseract>=0.3.13", "pillow>=10.3"]
image = ["pytesseract>=0.3.13", "pillow>=10.3"]
video = ["yt-dlp>=2024.10.0", "faster-whisper>=1.0.3"]
test = ["pytest>=8.0", "pytest-asyncio>=0.23"]
all = ["void-workers[pdf,image,video,test]"]
[tool.setuptools.packages.find]
where = ["."]
include = ["void_workers*"]
```
- [ ] **Step 2:** Create `workers/void_workers/__init__.py`:
```python
__version__ = "0.1.0"
```
- [ ] **Step 3:** Create `workers/void_workers/config.py`:
```python
import os


def env(name, default=None, required=False):
    """Read an environment variable; raise if it is required but unset."""
    v = os.environ.get(name, default)
    if required and v is None:
        raise RuntimeError(f"env {name} is required")
    return v


def env_int(name, default):
    """Read an integer environment variable, falling back to `default`."""
    return int(os.environ.get(name, default))


DATABASE_URL = env("DATABASE_URL", required=True)
BLOB_ROOT = env("BLOB_ROOT", "/var/lib/void/blobs")
WHISPER_MODEL = env("WHISPER_MODEL", "small.en")
WHISPER_CACHE = env("WHISPER_CACHE", "/var/lib/void/whisper-models")
ALLOW_PRIVATE = env("VOID_INGEST_ALLOW_PRIVATE", "false") == "true"

# Number of polling threads per queue.
CONCURRENCY = {
    "extract.pdf": env_int("VOID_CONCURRENCY_EXTRACT_PDF", 2),
    "extract.image": env_int("VOID_CONCURRENCY_EXTRACT_IMAGE", 2),
    "ingest.video": env_int("VOID_CONCURRENCY_INGEST_VIDEO", 1),
    "sync.source_doc": env_int("VOID_CONCURRENCY_SYNC_SOURCE_DOC", 1),
    "echo": env_int("VOID_CONCURRENCY_ECHO", 1),
}

POLL_INTERVAL_MS = env_int("VOID_POLL_INTERVAL_MS", 1000)
```
- [ ] **Step 4:** Create `workers/void_workers/log.py`:
```python
import logging

import structlog


def init():
    """Configure structlog for JSON output at INFO level and return a logger."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    )
    return structlog.get_logger()


log = init()
```
- [ ] **Step 5:** Create `workers/README.md`:
````markdown
# void-workers
Python ML ingest service alongside void-server (Node). Sibling of `lib/`.
## Local dev
```bash
cd workers
python3.12 -m venv .venv
. .venv/bin/activate
pip install -e ".[all]"
export DATABASE_URL="postgres://..."
python -m void_workers.runner
```
## Tests
```bash
pip install -e ".[test,all]"
pytest -v
```
See `../docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md` for the full plan.
````
- [ ] **Step 6:** Create the venv + install:
```bash
cd /project/src/void-v2/workers
python3.12 -m venv .venv
. .venv/bin/activate
pip install -e ".[all]"
python -c "from void_workers import __version__; print(__version__)"
```
Expected: `0.1.0`.
- [ ] **Step 7: Commit.**
```bash
cd /project/src/void-v2
git add workers/pyproject.toml workers/README.md workers/void_workers/__init__.py workers/void_workers/config.py workers/void_workers/log.py
git commit -m "feat(workers): Python skeleton + config + structlog"
```
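The `CONCURRENCY` table in `config.py` follows one naming rule: the queue name is upper-cased, dots become underscores, and `VOID_CONCURRENCY_` is prefixed. A minimal sketch of that rule, useful when adding a new queue; `concurrency_env_name` and `concurrency_for` are hypothetical helpers that are not part of the plan's `config.py`.

```python
import os


def concurrency_env_name(queue: str) -> str:
    """Derive the env var name for a queue, e.g. extract.pdf -> VOID_CONCURRENCY_EXTRACT_PDF."""
    return "VOID_CONCURRENCY_" + queue.replace(".", "_").upper()


def concurrency_for(queue: str, default: int = 1, environ=None) -> int:
    """Look up the per-queue thread count, falling back to `default`."""
    environ = os.environ if environ is None else environ
    return int(environ.get(concurrency_env_name(queue), default))


if __name__ == "__main__":
    for q in ("extract.pdf", "ingest.video", "sync.source_doc"):
        print(q, concurrency_env_name(q), concurrency_for(q))
```

Passing `environ` explicitly keeps the helper testable without touching the process environment.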
### Task A2: pgboss claim/finish helpers in `boss.py`
**Files:**
- Create: `workers/void_workers/boss.py`
- Create: `workers/tests/conftest.py`
- Create: `workers/tests/test_boss.py`
- [ ] **Step 1:** Create `workers/tests/conftest.py`:
```python
import os

import psycopg
import pytest

DB_URL = os.environ["DATABASE_URL"]


@pytest.fixture
def conn():
    with psycopg.connect(DB_URL, autocommit=True) as c:
        yield c


@pytest.fixture(autouse=True)
def reset_pgboss(conn):
    """Drop the pgboss schema before each test."""
    conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE")
    # We do NOT recreate it here: pg-boss's first start() makes the schema.
    # Tests that need it use the boss_ready fixture, which calls ensure_pgboss(conn).
    yield


def ensure_pgboss(conn):
    """Create the minimal pgboss.job table that boss.py reads and writes."""
    # In tests we cannot rely on Node, so we create the bare minimum
    # tables we touch. Real pgboss tables are richer; we only need the
    # columns boss.py reads and writes.
    conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pgboss.job (
            id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
            name text NOT NULL,
            priority int NOT NULL DEFAULT 0,
            data jsonb,
            state text NOT NULL DEFAULT 'created',
            retry_limit int NOT NULL DEFAULT 5,
            retry_count int NOT NULL DEFAULT 0,
            retry_delay int NOT NULL DEFAULT 10,
            retry_backoff boolean NOT NULL DEFAULT true,
            start_after timestamptz NOT NULL DEFAULT now(),
            started_on timestamptz,
            completed_on timestamptz,
            created_on timestamptz NOT NULL DEFAULT now(),
            output jsonb
        )
    """)


@pytest.fixture
def boss_ready(conn):
    ensure_pgboss(conn)
    yield conn
```
- [ ] **Step 2:** Write the failing test `workers/tests/test_boss.py`:
```python
import json

from void_workers.boss import Boss
from void_workers.config import DATABASE_URL


def test_claim_returns_none_when_no_jobs(boss_ready):
    boss = Boss(dsn=DATABASE_URL)
    assert boss.claim("echo") is None


def test_claim_then_complete_round_trip(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (json.dumps({"ping": 1}),)
    )
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    assert job is not None
    assert job["data"] == {"ping": 1}
    boss.complete(job["id"], {"pong": 1})
    row = boss_ready.execute(
        "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    assert row[0] == "completed"
    assert row[1] == {"pong": 1}


def test_fail_records_output_and_state(boss_ready):
    boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
    row = boss_ready.execute(
        "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s",
        (job["id"],)
    ).fetchone()
    # retry_limit defaults to 5 and retry_count was 0 → should move to 'retry'
    assert row[0] == "retry"
    assert row[1] == {"name": "BoomError", "message": "boom"}
    assert row[2] == 1


def test_fail_past_retry_limit_marks_failed(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
    )
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    boss.fail(job["id"], {"name": "Done"})
    row = boss_ready.execute(
        "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    assert row[0] == "failed"
```
- [ ] **Step 3:** Run red:
```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
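DATABASE_URL="$(grep ^DATABASE_URL= ../.env 2>/dev/null | cut -d= -f2-)"
# Fall back to the remote password file when ../.env has no DATABASE_URL line.
# (A trailing `|| echo ...` inside the pipeline never fires, because `cut` exits 0.)
: "${DATABASE_URL:=postgres://void:$(ssh root@192.168.1.124 cat /root/void2-db-pass.txt | sed s/^DB_PASS=//)@192.168.1.215:5432/void}"
export DATABASE_URL
pytest tests/test_boss.py -v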
```
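Expected: collection fails with an ImportError (`boss.py` does not exist yet), so pytest reports a collection error rather than four individual failures.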
- [ ] **Step 4:** Implement `workers/void_workers/boss.py`:
```python
import json

import psycopg
from psycopg.rows import dict_row


class Boss:
    def __init__(self, dsn):
        self.dsn = dsn

    def _conn(self):
        return psycopg.connect(self.dsn, autocommit=False, row_factory=dict_row)

    def claim(self, queue):
        """Atomically claim one job for the given queue. Returns dict or None."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                # SKIP LOCKED lets concurrent workers pass over rows another
                # transaction is already claiming instead of blocking on them.
                cur.execute("""
                    SELECT id, data, retry_count, retry_limit
                    FROM pgboss.job
                    WHERE name=%s
                      AND state IN ('created','retry')
                      AND start_after <= now()
                    ORDER BY priority DESC, created_on ASC
                    FOR UPDATE SKIP LOCKED
                    LIMIT 1
                """, (queue,))
                row = cur.fetchone()
                if not row:
                    return None
                cur.execute(
                    "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
                    (row["id"],)
                )
            conn.commit()
            return row

    def complete(self, job_id, output):
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE pgboss.job
                    SET state='completed', completed_on=now(), output=%s
                    WHERE id=%s
                """, (json.dumps(output), job_id))
            conn.commit()

    def fail(self, job_id, output):
        """Mark the job retry (if budget left) or failed (otherwise)."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT retry_count, retry_limit, retry_delay, retry_backoff "
                    "FROM pgboss.job WHERE id=%s FOR UPDATE",
                    (job_id,)
                )
                row = cur.fetchone()
                if not row:
                    return
                new_count = row["retry_count"] + 1
                if new_count > row["retry_limit"]:
                    cur.execute(
                        "UPDATE pgboss.job SET state='failed', output=%s, "
                        "completed_on=now() WHERE id=%s",
                        (json.dumps(output), job_id)
                    )
                else:
                    # exponential backoff: delay * 2^(count-1)
                    delay = row["retry_delay"]
                    if row["retry_backoff"]:
                        delay = delay * (2 ** (new_count - 1))
                    cur.execute("""
                        UPDATE pgboss.job
                        SET state='retry',
                            retry_count=%s,
                            start_after=now() + (%s || ' seconds')::interval,
                            output=%s
                        WHERE id=%s
                    """, (new_count, str(delay), json.dumps(output), job_id))
            conn.commit()
```
- [ ] **Step 5:** Run green:
```bash
DATABASE_URL=... pytest tests/test_boss.py -v
```
Expected: 4 passed.
- [ ] **Step 6: Commit.**
```bash
cd /project/src/void-v2
git add workers/void_workers/boss.py workers/tests/conftest.py workers/tests/test_boss.py
git commit -m "feat(workers): pgboss claim/complete/fail via psycopg"
```
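The backoff rule in `Boss.fail` (`delay * 2^(count-1)`) is easier to reason about as a concrete schedule. The sketch below reproduces only the arithmetic of that method as written in this plan; `retry_schedule` is a hypothetical helper, and the schedule is not claimed to match pg-boss's own retry timing.

```python
def retry_schedule(retry_delay: int, retry_limit: int, backoff: bool = True) -> list[int]:
    """Return the delay in seconds applied before each retry, in order."""
    delays = []
    # new_count runs 1..retry_limit; the next failure after that marks the job failed.
    for new_count in range(1, retry_limit + 1):
        delay = retry_delay
        if backoff:
            delay = retry_delay * (2 ** (new_count - 1))
        delays.append(delay)
    return delays


if __name__ == "__main__":
    schedule = retry_schedule(retry_delay=10, retry_limit=5)
    print(schedule, "total seconds:", sum(schedule))
```

With the test-schema defaults (`retry_delay=10`, `retry_limit=5`), a job that keeps failing is retried five times over a little more than five minutes of accumulated delay before it is marked `failed`.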
### Task A3: Echo handler + runner loop
**Files:**
- Create: `workers/void_workers/handlers/__init__.py`
- Create: `workers/void_workers/handlers/echo.py`
- Create: `workers/void_workers/runner.py`
- Create: `workers/tests/test_echo.py`
- [ ] **Step 1:** Write failing test `workers/tests/test_echo.py`:
```python
import json
import threading

from void_workers.runner import Runner
from void_workers.config import DATABASE_URL


def test_echo_handler_runs(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (json.dumps({"ping": 42}),)
    )
    runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}})
    t = threading.Thread(target=runner.run, kwargs={"once": True}, daemon=True)
    t.start()
    t.join(timeout=5)
    row = boss_ready.execute(
        "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
    ).fetchone()
    assert row[0] == "completed"
    assert row[1] == {"pong": 42}
```
- [ ] **Step 2:** Run red:
```bash
DATABASE_URL=... pytest tests/test_echo.py -v
```
Expected: ImportError / 1 failed.
- [ ] **Step 3:** Implement `workers/void_workers/handlers/__init__.py`:
```python
from . import echo

# Maps queue name -> handler callable.
REGISTRY = {
    echo.NAME: echo.handle,
}
```
- [ ] **Step 4:** Implement `workers/void_workers/handlers/echo.py`:
```python
NAME = "echo"


def handle(job_data: dict) -> dict:
    return {"pong": job_data.get("ping", 0)}
```
- [ ] **Step 5:** Implement `workers/void_workers/runner.py`:
```python
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from .boss import Boss
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
from .handlers import REGISTRY
from .log import log


class Runner:
    def __init__(self, dsn=None, handlers=None):
        self.boss = Boss(dsn or DATABASE_URL)
        self.handlers = handlers or {
            name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
        }
        self._stop = threading.Event()

    def _process_one(self, queue):
        """Claim and run at most one job. Returns True if a job was processed."""
        job = self.boss.claim(queue)
        if not job:
            return False
        log.info("job_claimed", job_id=str(job["id"]), name=queue)
        handler = REGISTRY[queue]
        try:
            out = handler(job["data"] or {})
            self.boss.complete(job["id"], out or {})
            log.info("job_completed", job_id=str(job["id"]), name=queue)
        except Exception as e:
            log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e))
            self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)})
        return True

    def _loop(self, queue):
        interval = POLL_INTERVAL_MS / 1000.0
        while not self._stop.is_set():
            try:
                did = self._process_one(queue)
            except Exception as e:
                log.error("loop_error", queue=queue, err=str(e))
                did = False
            if not did:
                # Idle: sleep for one poll interval, waking early on shutdown.
                self._stop.wait(interval)

    def run(self, once=False):
        if once:
            # Single pass over every queue; used by tests.
            for q in self.handlers:
                self._process_one(q)
            return
        executor = ThreadPoolExecutor(
            max_workers=sum(h["concurrency"] for h in self.handlers.values())
        )
        for q, cfg in self.handlers.items():
            for _ in range(cfg["concurrency"]):
                executor.submit(self._loop, q)
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self._stop.set())
        while not self._stop.is_set():
            time.sleep(0.5)
        executor.shutdown(wait=True)


if __name__ == "__main__":
    Runner().run()
```
- [ ] **Step 6:** Run green:
```bash
DATABASE_URL=... pytest tests/test_echo.py -v
```
Expected: 1 passed.
- [ ] **Step 7: Commit.**
```bash
cd /project/src/void-v2
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/echo.py workers/void_workers/runner.py workers/tests/test_echo.py
git commit -m "feat(workers): runner loop + echo handler"
```
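The runner's `_loop` sleeps with `self._stop.wait(interval)` rather than `time.sleep(interval)`, so a shutdown signal wakes idle loops immediately instead of after a full poll interval. A minimal, database-free sketch of that pattern follows; `poll_loop` and `demo` are hypothetical names, and the 60-second interval is chosen only to make the early wake-up visible.

```python
import threading


def poll_loop(stop: threading.Event, interval: float, ticks: list) -> None:
    """Idle polling loop: record a tick, then wait until the interval passes or stop is set."""
    while not stop.is_set():
        ticks.append(1)
        # Event.wait returns as soon as stop.set() is called, even mid-interval.
        stop.wait(interval)


def demo() -> bool:
    """Start a loop with a long interval, stop it, and report whether it exited promptly."""
    stop = threading.Event()
    ticks: list = []
    t = threading.Thread(target=poll_loop, args=(stop, 60.0, ticks))
    t.start()
    stop.set()
    t.join(timeout=2)
    return not t.is_alive()


if __name__ == "__main__":
    print("stopped promptly:", demo())
```

A loop that is in the middle of a handler call still finishes that job first; only the idle wait is interrupted.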
### Task A4: systemd unit + push-workers.sh
**Files:**
- Create: `deploy/void-workers.service`
- Create: `deploy/push-workers.sh`
- Modify: `deploy/README.md` (append workers bootstrap section)
- [ ] **Step 1:** Create `deploy/void-workers.service`:
```ini
[Unit]
Description=Void 2.0 workers
After=network-online.target void-server.service
Wants=network-online.target
[Service]
Type=simple
User=voidworkers
WorkingDirectory=/opt/void-workers
EnvironmentFile=/opt/void-workers/.env
ExecStart=/opt/void-workers/venv/bin/python -m void_workers.runner
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal
MemoryMax=6G
[Install]
WantedBy=multi-user.target
```
- [ ] **Step 2:** Create `deploy/push-workers.sh`:
```bash
#!/usr/bin/env bash
set -euo pipefail
# Push Python void-workers source to CT 311 and restart the service.
# Run from /project/src/void-v2.
TARGET=${TARGET:-root@192.168.1.13}
REMOTE_DIR=${REMOTE_DIR:-/opt/void-workers}
# venv and .env exist only on the target; exclude them so --delete leaves them alone.
rsync -avz --delete \
  --exclude .venv \
  --exclude venv \
  --exclude .env \
  --exclude __pycache__ \
  --exclude '*.egg-info' \
  --exclude tests \
  workers/ "$TARGET:$REMOTE_DIR/"
ssh "$TARGET" "
  cd $REMOTE_DIR
  if [ ! -d venv ]; then
    python3.12 -m venv venv
  fi
  . venv/bin/activate
  pip install --quiet --upgrade pip
  pip install --quiet -e '.[all]'
  systemctl restart void-workers
"
echo "Workers deployed."
```
Make it executable:
```bash
chmod +x deploy/push-workers.sh
```
- [ ] **Step 3:** Extend `deploy/README.md` — append the following section:
````markdown
## Workers (Python void-workers, Plan 4+)
Runs alongside void-server as a second systemd unit.
One-time setup on CT 311:
```bash
apt install -y python3.12 python3.12-venv python3-pip \
  ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils
useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
mkdir -p /opt/void-workers /var/lib/void/whisper-models
chown voidworkers: /opt/void-workers
chown -R voidworkers: /var/lib/void/whisper-models
# voidworkers needs to read the shared blob store
usermod -aG void voidworkers
install -m 644 deploy/void-workers.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable void-workers
# /opt/void-workers/.env: see plan §Deploy delta
```
Deploy after edits:
```bash
cd /project/src/void-v2
./deploy/push-workers.sh
```
````
- [ ] **Step 4:** Run the existing Node test suite — must still be green (we have not modified anything Node touches yet).
```bash
cd /project/src/void-v2
npx vitest run
```
- [ ] **Step 5: Commit.**
```bash
git add deploy/void-workers.service deploy/push-workers.sh deploy/README.md
git commit -m "feat(workers): systemd unit + push-workers.sh"
```
### Task A5: Phase A close — memory checkpoint
- [ ] **Step 1:** Update memory `project_void_v2_execution.md`: mark "Plan 4 Phase A complete: harness + echo running locally; void-workers.service installed on CT 311 pending real deps (Phase B+)".
- [ ] **Step 2:** Note the boss.py contract decision (psycopg + native SQL) so future plans don't re-litigate it.
---
## Phase B — `extract.pdf` + `extract.image`
### Task B1: Install system deps on the dev box
These deps support local pytest runs. The same packages will be installed on CT 311 when Phase A's systemd unit lands there.
- [ ] **Step 1:** Verify deps locally (we are running on Ubuntu 24.04 LXC per CLAUDE.md):
```bash
command -v pdftotext && command -v tesseract && command -v ffmpeg || echo "missing deps"
apt list --installed 2>/dev/null | grep -E "poppler|tesseract|ffmpeg" | head -5
```
- [ ] **Step 2:** If any are missing, install:
```bash
apt install -y poppler-utils tesseract-ocr tesseract-ocr-eng ffmpeg
```
- [ ] **Step 3:** Install pdf/image Python extras into the workers venv:
```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
pip install -e '.[pdf,image,test]'
python -c "import pdfplumber, pytesseract; print('ok')"
```
- [ ] **Step 4:** Add a `tests/fixtures/` README explaining how fixtures were generated (so we can regenerate them):
Create `workers/tests/fixtures/README.md`:
```markdown
# Test fixtures
- `born_digital.pdf`: generated by
  `echo "void-workers proof-of-life document" > /tmp/b.txt && enscript -p /tmp/b.ps /tmp/b.txt && ps2pdf /tmp/b.ps born_digital.pdf`
  (or any tool that produces a PDF with embedded text). Must contain
  the string "void-workers".
- `scanned.pdf`: `convert -density 200 eng_text.png scanned.pdf` (a PDF
  that is a single page of an image; `pdftotext` will yield nothing,
  so the Tesseract fallback must pick it up).
- `eng_text.png`: a PNG with a clear English sentence on a white
  background. Must contain the string "blackflame".
```
Generate them now:
```bash
cd /project/src/void-v2/workers/tests/fixtures
# eng_text.png: render text with ImageMagick (apt: imagemagick)
apt install -y imagemagick ghostscript
convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 36 \
-fill black -annotate +50+100 "blackflame palette" eng_text.png
# born_digital.pdf: simple TXT → PDF
echo "void-workers proof-of-life document" > /tmp/b.txt
enscript -p /tmp/b.ps /tmp/b.txt 2>/dev/null || echo "enscript missing, using ImageMagick"
if [ -f /tmp/b.ps ]; then
  ps2pdf /tmp/b.ps born_digital.pdf
else
  convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 24 \
    -fill black -annotate +50+100 "void-workers proof-of-life" born_digital.pdf
  # NOTE: this fallback is a single-image PDF too, so pdftotext will return empty.
  # It does not satisfy the born-digital invariant; regenerate the file with a
  # tool that writes real embedded text before relying on it.
fi
# scanned.pdf: PNG → PDF
convert -density 200 eng_text.png scanned.pdf
ls -la
```
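If `enscript` is not available, install it or use another tool that writes embedded text. The key fixture invariant is that `pdftotext born_digital.pdf -` returns non-empty text containing "void-workers". Note that the handler in Task B3 falls back to OCR whenever `pdftotext` yields fewer than 200 characters (`FALLBACK_THRESHOLD`), so a fixture this short takes the Tesseract path once that task lands.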
- [ ] **Step 5: Commit fixtures.**
```bash
cd /project/src/void-v2
git add workers/tests/fixtures
git commit -m "test(workers): pdf/image test fixtures"
```
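Before committing, the fixture invariants can be checked with a short script. This is a sketch under assumptions: `pdf_text` and `meets_invariant` are hypothetical helpers, not part of the plan's test suite, and `pdf_text` requires `pdftotext` from poppler-utils on the PATH.

```python
import shutil
import subprocess


def pdf_text(path: str) -> str:
    """Return the embedded text of a PDF via pdftotext (empty if there is none)."""
    if shutil.which("pdftotext") is None:
        raise RuntimeError("pdftotext not found; install poppler-utils")
    out = subprocess.check_output(["pdftotext", path, "-"], timeout=60)
    return out.decode("utf-8", errors="replace")


def meets_invariant(text: str, needle: str, min_chars: int = 1) -> bool:
    """True if the text is long enough and contains the expected marker string."""
    stripped = text.strip()
    return len(stripped) >= min_chars and needle.lower() in stripped.lower()


if __name__ == "__main__":
    import sys
    # Usage (hypothetical): python check_fixture.py born_digital.pdf void-workers
    path, needle = sys.argv[1], sys.argv[2]
    print("ok" if meets_invariant(pdf_text(path), needle) else "invariant violated")
```

Passing `min_chars=200` shows whether a born-digital fixture would stay on the `pdftotext` path once the OCR threshold from Task B3 is in place.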
### Task B2: `extract.pdf` worker — pdftotext path
**Files:**
- Create: `workers/void_workers/handlers/pdf.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/void_workers/repo.py`
- Create: `workers/tests/test_pdf.py`
- [ ] **Step 1:** Write the failing test:
```python
# workers/tests/test_pdf.py
import subprocess
from pathlib import Path

from void_workers.handlers.pdf import handle as handle_pdf

FIXTURES = Path(__file__).parent / "fixtures"


def _run_node_migrations():
    """Shell to Node's migrate runner, which targets the same DB."""
    subprocess.run(
        ["node", "lib/db/migrate.js", "up"],
        cwd="/project/src/void-v2",
        check=True
    )


def _seed_space_and_ref(conn, blob_path, kind="pdf"):
    """Insert a space and a ref pointing at the fixture blob (schema comes from migrate.js)."""
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-tests', 'P4') "
        "ON CONFLICT (slug) DO UPDATE SET name=EXCLUDED.name RETURNING id"
    ).fetchone()[0]
    ref = conn.execute(
        "INSERT INTO refs(space_id, kind, source_url, title, blob_path) "
        "VALUES(%s, %s, NULL, 'fixture', %s) RETURNING id",
        (sp, kind, str(blob_path))
    ).fetchone()[0]
    return sp, ref


def test_pdf_born_digital_uses_pdftotext(conn):
    """A PDF with embedded text gets pdftotext-extracted body."""
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    # Apply Void migrations against the same database
    _run_node_migrations()
    blob = FIXTURES / "born_digital.pdf"
    sp, ref = _seed_space_and_ref(conn, blob)
    out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["ref_id"] == str(ref)
    assert out["chars"] > 0
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "void-workers" in (row[0] or "").lower()
```
- [ ] **Step 2:** Run red:
```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
DATABASE_URL=... pytest tests/test_pdf.py -v
```
Expected: ImportError on pdf module.
- [ ] **Step 3:** Implement `workers/void_workers/repo.py`:
```python
import json

import psycopg
from psycopg.rows import dict_row

from .config import DATABASE_URL


def _conn():
    return psycopg.connect(DATABASE_URL, autocommit=True, row_factory=dict_row)


def update_ref(ref_id, *, body_text=None, metadata_patch=None):
    """UPDATE refs ... + emit audit_log row with actor_kind='worker'."""
    with _conn() as conn:
        before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if metadata_patch is not None:
            # jsonb || jsonb merges top-level keys; the patch wins on conflicts.
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            return before
        sets.append("updated_at=now()")
        args.append(ref_id)
        after = conn.execute(
            f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *",
            args
        ).fetchone()
        # audit log: match Node's audit.js diff shape (simple)
        diff = {}
        if body_text is not None and (before or {}).get("body_text") != body_text:
            diff["body_text"] = {"before": "(redacted)", "after_len": len(body_text)}
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'update', %s)
        """, (ref_id, json.dumps({"kind": "update", "changes": diff})))
        return after
```
- [ ] **Step 4:** Implement `workers/void_workers/handlers/pdf.py`:
```python
import subprocess

from .. import repo

NAME = "extract.pdf"


def _pdftotext(blob_path):
    return subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"],
        timeout=120
    ).decode("utf-8", errors="replace")


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    text = _pdftotext(blob_path).strip()
    method = "pdftotext"
    # Fallback (Tesseract OCR) lands in Task B3.
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text), "method": method}
```
- [ ] **Step 5:** Add `pdf.handle` to the registry — edit `workers/void_workers/handlers/__init__.py`:
```python
from . import echo, pdf

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
}
```
- [ ] **Step 6:** Run green:
```bash
DATABASE_URL=... pytest tests/test_pdf.py::test_pdf_born_digital_uses_pdftotext -v
```
Expected: 1 passed.
- [ ] **Step 7: Commit.**
```bash
cd /project/src/void-v2
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/pdf.py workers/void_workers/repo.py workers/tests/test_pdf.py
git commit -m "feat(workers): extract.pdf via pdftotext"
```
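`repo.update_ref` applies `metadata_patch` with Postgres's jsonb `||` operator, which merges top-level keys only: a key present in the patch replaces the stored value wholesale, nested objects included. The sketch below models that behaviour with plain dicts; `jsonb_concat` is a hypothetical stand-in for the SQL expression, and the `pages` and `source` keys are invented for illustration.

```python
def jsonb_concat(existing, patch: dict) -> dict:
    """Model `coalesce(metadata, '{}'::jsonb) || patch` for two JSON objects."""
    # Shallow merge: right-hand keys win, nested objects are not merged.
    return {**(existing or {}), **patch}


if __name__ == "__main__":
    stored = {"source": "upload", "extract": {"method": "pdftotext", "chars": 10, "pages": 3}}
    patch = {"extract": {"method": "tesseract", "chars": 42}}
    # The whole "extract" object is replaced, so "pages" disappears.
    print(jsonb_concat(stored, patch))
    print(jsonb_concat(None, patch))
```

Handlers that need to keep earlier keys under `extract` would have to read the existing value and send the combined object in the patch.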
### Task B3: PDF Tesseract fallback for scanned pages
**Files:**
- Modify: `workers/void_workers/handlers/pdf.py`
- Modify: `workers/tests/test_pdf.py`
- [ ] **Step 1:** Add the failing test (append to `test_pdf.py`):
```python
def test_pdf_scanned_falls_back_to_tesseract(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    _run_node_migrations()
    blob = FIXTURES / "scanned.pdf"
    sp, ref = _seed_space_and_ref(conn, blob)
    out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["method"] == "tesseract"
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "blackflame" in (row[0] or "").lower()
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Implement the fallback in `workers/void_workers/handlers/pdf.py`:
```python
import subprocess
import tempfile
from pathlib import Path

from PIL import Image
import pytesseract

from .. import repo

NAME = "extract.pdf"
FALLBACK_THRESHOLD = 200  # chars; below this, assume the PDF is scanned


def _pdftotext(blob_path):
    return subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"], timeout=120
    ).decode("utf-8", errors="replace")


def _ocr_pdf(blob_path):
    """Rasterize each page with pdftoppm, OCR each with Tesseract."""
    with tempfile.TemporaryDirectory() as tmp:
        subprocess.run(
            ["pdftoppm", "-r", "200", "-png", blob_path, f"{tmp}/p"],
            check=True, timeout=300
        )
        pages = sorted(Path(tmp).glob("p-*.png"))
        parts = []
        for p in pages:
            # Close each image before the temp directory is removed.
            with Image.open(p) as img:
                parts.append(pytesseract.image_to_string(img, lang="eng"))
        return "\n".join(parts)


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    method = "pdftotext"
    text = _pdftotext(blob_path).strip()
    if len(text) < FALLBACK_THRESHOLD:
        method = "tesseract"
        text = _ocr_pdf(blob_path).strip()
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text), "method": method}
```
- [ ] **Step 4:** Run green:
```bash
DATABASE_URL=... pytest tests/test_pdf.py -v
```
Expected: 2 passed.
- [ ] **Step 5: Commit.**
```bash
git add workers/void_workers/handlers/pdf.py workers/tests/test_pdf.py
git commit -m "feat(workers): pdf Tesseract fallback for scanned pages"
```
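`_ocr_pdf` relies on `sorted()` over the `p-*.png` names to restore page order, which works as long as the page numbers in those names are zero-padded to equal width. If you would rather not depend on the padding, the pages can be ordered by their parsed number instead. A minimal sketch; `page_number` and `sort_pages` are hypothetical helpers, not part of the handler above.

```python
import re
from pathlib import Path

# Matches the trailing "-<digits>.png" of a rasterized page file name.
_PAGE_RE = re.compile(r"-(\d+)\.png$")


def page_number(path: Path) -> int:
    """Extract the numeric page suffix from a name such as p-07.png."""
    m = _PAGE_RE.search(path.name)
    if m is None:
        raise ValueError(f"unexpected page file name: {path.name}")
    return int(m.group(1))


def sort_pages(paths):
    """Order page images numerically, regardless of zero-padding."""
    return sorted(paths, key=page_number)


if __name__ == "__main__":
    names = [Path("p-10.png"), Path("p-2.png"), Path("p-1.png")]
    print([p.name for p in sort_pages(names)])
```

With unpadded names a plain string sort would put `p-10.png` before `p-2.png`; the numeric key avoids that case.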
### Task B4: `extract.image` worker
**Files:**
- Create: `workers/void_workers/handlers/image.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/tests/test_image.py`
- [ ] **Step 1:** Write the failing test `workers/tests/test_image.py`:
```python
import subprocess
from pathlib import Path

from void_workers.handlers.image import handle as handle_image

FIXTURES = Path(__file__).parent / "fixtures"


def test_image_ocr(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-img', 'I') RETURNING id"
    ).fetchone()[0]
    blob = FIXTURES / "eng_text.png"
    ref = conn.execute(
        "INSERT INTO refs(space_id, kind, blob_path) VALUES(%s, 'image', %s) RETURNING id",
        (sp, str(blob))
    ).fetchone()[0]
    out = handle_image({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["chars"] > 0
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "blackflame" in (row[0] or "").lower()
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Implement `workers/void_workers/handlers/image.py`:
```python
from PIL import Image
import pytesseract

from .. import repo

NAME = "extract.image"


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    # Close the image file as soon as OCR is done.
    with Image.open(blob_path) as img:
        text = pytesseract.image_to_string(img, lang="eng").strip()
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": "tesseract", "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text)}
```
- [ ] **Step 4:** Register — edit `workers/void_workers/handlers/__init__.py`:
```python
from . import echo, pdf, image

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
    image.NAME: image.handle,
}
```
- [ ] **Step 5:** Run green.
- [ ] **Step 6: Commit.**
```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/image.py workers/tests/test_image.py
git commit -m "feat(workers): extract.image via Tesseract"
```
### Task B5: Node side — blob worker enqueues extract jobs
**Files:**
- Modify: `lib/jobs/workers/blob.js`
- Modify: `tests/jobs/workers/blob.test.js`
- [ ] **Step 1:** Extend the existing blob.test.js with a new assertion. Append:
```js
it('enqueues extract.pdf after creating a pdf ref', async () => {
const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null });
const upTmp = path.join(tmpRoot, 'doc.tmp');
await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...'));
const id = await queue.enqueue('ingest.blob', {
space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf'
});
await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
const followUp = (await jobsRepo.list({ name: 'extract.pdf' })).find(j => j.data?.blob_path);
expect(followUp).toBeTruthy();
});
it('enqueues extract.image after creating an image ref', async () => {
const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null });
const upTmp = path.join(tmpRoot, 'pic.tmp');
await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47]));
const id = await queue.enqueue('ingest.blob', {
space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png'
});
await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
const followUp = (await jobsRepo.list({ name: 'extract.image' })).find(j => j.data?.blob_path);
expect(followUp).toBeTruthy();
});
```
- [ ] **Step 2:** Run red:
```bash
cd /project/src/void-v2
npx vitest run tests/jobs/workers/blob.test.js
```
Expected: 2 new tests fail (no extract.* enqueue yet).
- [ ] **Step 3:** Modify `lib/jobs/workers/blob.js` to enqueue the follow-up job:
```js
// existing imports
import * as queue from '../queue.js';
// ... inside handler, after `const row = await refs.create(...)`:
if (kind === 'pdf') {
await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path });
} else if (kind === 'image') {
await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path });
}
return { ref_id: row.id, sha };
```
(Apply the exact edit at the `return { ref_id: row.id, sha };` site.)
- [ ] **Step 4:** Run green:
```bash
npx vitest run tests/jobs/workers/blob.test.js
```
Expected: 4 passed.
- [ ] **Step 5:** Run the full suite — must still be green.
```bash
npx vitest run
```
- [ ] **Step 6: Commit.**
```bash
git add lib/jobs/workers/blob.js tests/jobs/workers/blob.test.js
git commit -m "feat(jobs): blob worker fans out to extract.pdf / extract.image"
```
### Task B6: Phase B close — memory + full suite
- [ ] **Step 1:** `npx vitest run` — full Node suite green.
- [ ] **Step 2:** `cd workers && DATABASE_URL=... pytest -v` — full Python suite green.
- [ ] **Step 3:** Update memory: "Plan 4 Phase B complete: extract.pdf + extract.image workers; Node blob worker fans out."
---
## Phase C — `ingest.video` + CT 311 resize + GPU passthrough
### Task C1: Snapshot + CT 311 resize + GPU passthrough
User pre-authorized this 2026-06-01.
- [ ] **Step 1:** Snapshot CT 310 + CT 311.
```bash
SNAP=plan4_pre_resize_$(date +%Y%m%d_%H%M)
ssh root@192.168.1.124 "pct snapshot 310 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
ssh root@192.168.1.124 "pct snapshot 311 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
wait
ssh root@192.168.1.124 "pct listsnapshot 310 | tail -3; echo ---; pct listsnapshot 311 | tail -3"
```
- [ ] **Step 2:** Resize CT 311.
```bash
ssh root@192.168.1.124 "pct set 311 -memory 8192 -cores 6"
ssh root@192.168.1.124 "pct config 311 | grep -E 'cores|memory'"
```
- [ ] **Step 3:** Add GPU device-node passthrough lines (copy from CT 102).
```bash
ssh root@192.168.1.124 "
pct set 311 -dev0 /dev/nvidia0,gid=44
pct set 311 -dev1 /dev/nvidiactl,gid=44
pct set 311 -dev2 /dev/nvidia-uvm,gid=44
pct set 311 -dev3 /dev/nvidia-uvm-tools,gid=44
pct set 311 -dev4 /dev/nvidia-caps/nvidia-cap1,gid=44
pct set 311 -dev5 /dev/nvidia-caps/nvidia-cap2,gid=44
pct config 311 | grep dev
"
```
- [ ] **Step 4:** Restart CT 311.
```bash
ssh root@192.168.1.124 "pct reboot 311"
sleep 10
ssh root@192.168.1.13 "ls -la /dev/nvidia* 2>&1 | head; nvidia-smi --query-gpu=name --format=csv,noheader 2>&1 || echo 'nvidia-smi missing (no driver in CT) — expected'"
curl -s http://192.168.1.13:3000/health
```
Expected: device nodes visible, health returns alpha-3.
- [ ] **Step 5: Commit (config-as-comment).** No code change yet — we record the infra step in the next phase's commit message.
### Task C2: Install ffmpeg + faster-whisper deps on CT 311
- [ ] **Step 1:** Install system deps:
```bash
ssh root@192.168.1.13 "
apt update
apt install -y python3.12 python3.12-venv python3-pip \
ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils \
nvidia-utils-535 nvidia-cuda-toolkit 2>&1 | tail -5
python3.12 --version
ffmpeg -version | head -1
tesseract --version 2>&1 | head -1
"
```
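Note: `nvidia-cuda-toolkit` supplies the CUDA runtime that `ctranslate2` needs. If you'd rather not pull the full toolkit (~1 GB), install only the cuBLAS and cuDNN runtime packages instead (for example `libcudnn8` and `libcublas12`; exact package names depend on the distro and CUDA version). The userspace driver package (`nvidia-utils-535` here) has to match the NVIDIA driver version on the host, so confirm the host version before installing.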
- [ ] **Step 2:** Create the voidworkers user + dirs on CT 311:
```bash
ssh root@192.168.1.13 "
id voidworkers >/dev/null 2>&1 || useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
mkdir -p /opt/void-workers /var/lib/void/whisper-models
chown voidworkers: /opt/void-workers
chown -R voidworkers: /var/lib/void/whisper-models
usermod -aG void voidworkers
chmod -R g+rX /var/lib/void/blobs # ensure voidworkers (in void group) can read blobs
# The command substitution below is escaped so it runs on CT 311, not in the local shell.
cat > /opt/void-workers/.env <<EOF
DATABASE_URL=\$(grep ^DATABASE_URL= /opt/void-server/.env | cut -d= -f2-)
BLOB_ROOT=/var/lib/void/blobs
WHISPER_MODEL=small.en
WHISPER_CACHE=/var/lib/void/whisper-models
EOF
chmod 600 /opt/void-workers/.env
chown voidworkers: /opt/void-workers/.env
"
```
- [ ] **Step 3:** Copy the systemd unit:
```bash
scp /project/src/void-v2/deploy/void-workers.service root@192.168.1.13:/etc/systemd/system/void-workers.service
ssh root@192.168.1.13 "systemctl daemon-reload && systemctl enable void-workers"
```
(Don't start it yet — venv is not installed; `push-workers.sh` will set it up.)
- [ ] **Step 4: Initial push.**
```bash
cd /project/src/void-v2
./deploy/push-workers.sh
ssh root@192.168.1.13 "systemctl status void-workers --no-pager -n 5"
```
Expected: active (running), waiting for jobs.
- [ ] **Step 5:** Sanity — enqueue an echo via Node, observe Python claim it.
```bash
TOKEN=$(ssh root@192.168.1.13 "grep ^OWNER_TOKEN= /opt/void-server/.env | cut -d= -f2-")
# Manually insert an echo via psql since /api/capture doesn't enqueue 'echo':
DBPASS=$(ssh root@192.168.1.124 'cat /root/void2-db-pass.txt' | tr -d '\r\n' | sed 's/^DB_PASS=//')
PGPASSWORD="$DBPASS" psql -h 192.168.1.215 -U void -d void -c "INSERT INTO pgboss.job(name, data) VALUES('echo', '{\"ping\": 99}'::jsonb) RETURNING id"
sleep 3
PGPASSWORD="$DBPASS" psql -h 192.168.1.215 -U void -d void -c "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
```
Expected: `completed | {"pong": 99}`.
- [ ] **Step 6: Commit deploy-side changes (none in repo from this task, but mark the milestone).**
No commit — this task is operational on CT 311. Note in memory.
### Task C3: Whisper model loader (`model.py`)
**Files:**
- Create: `workers/void_workers/model.py`
- Create: `workers/tests/test_model.py`
- [ ] **Step 1:** Failing test (uses mock):
```python
# workers/tests/test_model.py
from unittest.mock import patch, MagicMock

from void_workers import model


def test_model_returns_singleton(monkeypatch):
    m = MagicMock()
    monkeypatch.setattr(model, "_whisper_model", None)
    with patch("void_workers.model.cuda_available", return_value=False):
        with patch("faster_whisper.WhisperModel", return_value=m):
            a = model.whisper_model()
            b = model.whisper_model()
    assert a is b


def test_transcribe_returns_joined_segments(monkeypatch):
    seg1 = MagicMock(text=" Hello world ")
    seg2 = MagicMock(text=" second line")
    fake_model = MagicMock()
    fake_model.transcribe.return_value = ([seg1, seg2], MagicMock())
    monkeypatch.setattr(model, "_whisper_model", fake_model)
    out = model.whisper_transcribe("/tmp/whatever.opus")
    assert "Hello world" in out
    assert "second line" in out
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Implement `workers/void_workers/model.py`:
```python
import os

from .log import log

_whisper_model = None


def cuda_available():
    try:
        import ctranslate2
        return ctranslate2.get_cuda_device_count() > 0
    except Exception as e:
        log.info("ctranslate2_cuda_probe_failed", err=str(e))
        return False


def whisper_model():
    global _whisper_model
    if _whisper_model is None:
        from faster_whisper import WhisperModel
        name = os.environ.get("WHISPER_MODEL", "small.en")
        cache = os.environ.get("WHISPER_CACHE", "/var/lib/void/whisper-models")
        device = "cuda" if cuda_available() else "cpu"
        compute_type = "float16" if device == "cuda" else "int8"
        log.info("whisper_loading", model=name, device=device, compute_type=compute_type, cache=cache)
        _whisper_model = WhisperModel(name, device=device, compute_type=compute_type, download_root=cache)
    return _whisper_model


def whisper_transcribe(audio_path):
    segments, _info = whisper_model().transcribe(audio_path, vad_filter=True)
    return "\n".join(s.text.strip() for s in segments).strip()
```
- [ ] **Step 4:** Run green:
```bash
pytest tests/test_model.py -v
```
Expected: 2 passed.
- [ ] **Step 5: Commit.**
```bash
git add workers/void_workers/model.py workers/tests/test_model.py
git commit -m "feat(workers): whisper loader with CUDA detect + CPU fallback"
```
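The device and compute-type rule inside `whisper_model()` can also be isolated as a pure function, which makes it checkable without faster-whisper or a GPU. This is a minimal sketch under that assumption; `pick_device` is a hypothetical helper name and is not part of the planned `model.py`.

```python
def pick_device(cuda_device_count: int) -> tuple[str, str]:
    """Map a CUDA device count to (device, compute_type).

    Mirrors the rule used in model.py: float16 on GPU, int8 on CPU.
    """
    if cuda_device_count > 0:
        return "cuda", "float16"
    return "cpu", "int8"
```

If adopted, `whisper_model()` would pass the result of `ctranslate2.get_cuda_device_count()` (or 0 when the probe fails) into this helper.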
### Task C4: `ingest.video` worker (yt-dlp + transcribe + create ref)
**Files:**
- Create: `workers/void_workers/handlers/video.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/tests/test_video.py`
- Modify: `workers/void_workers/repo.py` — add `create_ref()`
- [ ] **Step 1:** Failing test (mocked yt-dlp + mocked Whisper):
```python
# workers/tests/test_video.py
from unittest.mock import patch, MagicMock

from void_workers.handlers.video import handle as handle_video


def test_video_creates_ref_with_transcript_and_metadata(conn, monkeypatch):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('plan4-vid', 'V') RETURNING id").fetchone()[0]
    info = {
        "title": "Sample video",
        "description": "a description",
        "duration": 90,
        "uploader": "Channel",
        "thumbnail": "https://i.ytimg.com/t.jpg"
    }
    with patch("void_workers.handlers.video._yt_dlp_info", return_value=info), \
         patch("void_workers.handlers.video._yt_dlp_audio", return_value="/tmp/fake.opus"), \
         patch("void_workers.handlers.video.whisper_transcribe", return_value="hello world transcript"), \
         patch("os.unlink"):
        out = handle_video({"space_id": str(sp), "url": "https://youtu.be/abc"})
    assert "ref_id" in out
    row = conn.execute("SELECT title, body_text, source_kind FROM refs WHERE id=%s", (out["ref_id"],)).fetchone()
    assert row[0] == "Sample video"
    assert "hello world" in row[1]
    assert row[2] == "youtube"
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Implement `workers/void_workers/handlers/video.py`:
```python
import hashlib
import json
import os
import subprocess
import tempfile

from .. import repo
from ..model import whisper_transcribe

NAME = "ingest.video"


def _yt_dlp_info(url):
    """Returns dict of metadata, or None if yt-dlp could not extract."""
    try:
        out = subprocess.check_output(
            ["yt-dlp", "-J", "--no-warnings", "--no-playlist", url],
            timeout=60
        )
        return json.loads(out)
    except subprocess.CalledProcessError:
        # Non-zero exit: treat as "not extractable" and let the handler skip.
        # A timeout is not caught here, so it raises and the job is retried.
        return None


def _yt_dlp_audio(url):
    """Downloads bestaudio to a temp .opus and returns the path."""
    tmp_dir = tempfile.mkdtemp(prefix="void-yt-")
    out_template = os.path.join(tmp_dir, "audio.%(ext)s")
    subprocess.run(
        ["yt-dlp", "-x", "--audio-format", "opus", "-o", out_template,
         "--no-warnings", "--no-playlist", url],
        check=True, timeout=600
    )
    for f in os.listdir(tmp_dir):
        if f.startswith("audio."):
            return os.path.join(tmp_dir, f)
    raise RuntimeError("yt-dlp produced no audio file")


def _idem(space_id, url):
    # NUL separator keeps (space_id, url) pairs from colliding after concatenation
    return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest()


def _kind(url):
    return "youtube" if ("youtube.com" in url or "youtu.be" in url) else "video"


def handle(job_data: dict) -> dict:
    space_id = job_data["space_id"]
    url = job_data["url"]
    idem = _idem(space_id, url)
    info = _yt_dlp_info(url)
    if info is None:
        return {"skipped": "yt-dlp"}
    audio_path = _yt_dlp_audio(url)
    try:
        transcript = whisper_transcribe(audio_path)
    finally:
        # Always remove the downloaded audio, even if transcription raised
        try:
            os.unlink(audio_path)
        except OSError:
            pass
    ref_id = repo.create_ref({
        "space_id": space_id,
        "kind": "video",
        "source_url": url,
        "title": info.get("title") or url,
        "summary": (info.get("description") or "")[:5000],
        "body_text": transcript[:200_000],
        "source_kind": _kind(url),
        "external_id": idem,
        "metadata": {
            "duration_s": info.get("duration"),
            "uploader": info.get("uploader"),
            "thumbnail": info.get("thumbnail"),
            "extract": {"method": "whisper", "chars": len(transcript)},
        }
    })
    return {"ref_id": ref_id, "chars": len(transcript)}
```
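The NUL byte in `_idem` matters because plain concatenation lets two different `(space_id, url)` pairs hash to the same key. The following sketch illustrates that point; `idem` repeats the handler's formula, and `idem_naive` is a hypothetical contrast case that exists only for this example.

```python
import hashlib


def idem(space_id: str, url: str) -> str:
    """Same formula as _idem in video.py: a NUL byte separates the two parts."""
    return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest()


def idem_naive(space_id: str, url: str) -> str:
    """Contrast case (hypothetical): no separator, so part boundaries are lost."""
    return hashlib.sha256((space_id + url).encode()).hexdigest()


# ("ab", "c") and ("a", "bc") concatenate to the same string without a separator.
collides_naive = idem_naive("ab", "c") == idem_naive("a", "bc")
collides_sep = idem("ab", "c") == idem("a", "bc")
```

Real space IDs are presumably UUIDs of fixed length, in which case the collision is unlikely in practice; the separator just removes the need to rely on that.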
- [ ] **Step 4:** Extend `workers/void_workers/repo.py` with `create_ref`. `metadata` arrives as a dict, so dict/list values are JSON-encoded before insertion:
```python
def _maybe_json(v):
    # jsonb columns (e.g. metadata) need a JSON string, not a Python dict/list
    return json.dumps(v) if isinstance(v, (dict, list)) else v


def create_ref(input_):
    """INSERT INTO refs(...) RETURNING id; emit audit_log."""
    # Column names come only from this fixed list, never from the payload
    fields = ["space_id","kind","source_url","title","summary","body_text",
              "blob_path","metadata","source_kind","external_id","captured_at"]
    cols, vals = [], []
    for f in fields:
        if f in input_:
            cols.append(f)
            vals.append(_maybe_json(input_[f]))
    placeholders = ",".join("%s" for _ in cols)
    sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
    with _conn() as conn:
        ref_id = conn.execute(sql, vals).fetchone()["id"]
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'create', %s)
        """, (ref_id, json.dumps({"kind": "create"})))
    return str(ref_id)
```
(Make sure `json` is imported at the top of `repo.py` if it is not already.)
- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`:
```python
from . import echo, pdf, image, video
REGISTRY = {
echo.NAME: echo.handle,
pdf.NAME: pdf.handle,
image.NAME: image.handle,
video.NAME: video.handle,
}
```
- [ ] **Step 6:** Run green:
```bash
pytest tests/test_video.py -v
```
Expected: 1 passed.
- [ ] **Step 7: Commit.**
```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/video.py workers/void_workers/repo.py workers/tests/test_video.py
git commit -m "feat(workers): ingest.video via yt-dlp + Whisper"
```
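`_kind` in `video.py` matches on substrings of the whole URL, so a URL that only mentions `youtube.com` in its path or query string is also labelled `youtube`. A stricter variant could match on the parsed hostname, in the same spirit as the host-suffix check on the Node side in Task C5. The sketch below is one way that might look; `kind_for_url` and `YOUTUBE_HOSTS` are hypothetical names, not part of the plan.

```python
from urllib.parse import urlparse

YOUTUBE_HOSTS = ("youtube.com", "youtu.be")  # hypothetical constant for this sketch


def kind_for_url(url: str) -> str:
    """Return "youtube" only when the hostname is a YouTube domain or subdomain."""
    host = (urlparse(url).hostname or "").lower()
    for base in YOUTUBE_HOSTS:
        # Exact match or a dot-delimited subdomain, never a bare suffix
        if host == base or host.endswith("." + base):
            return "youtube"
    return "video"
```

Whether the looser substring match is acceptable depends on how `source_kind` is consumed downstream; the plan does not say.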
### Task C5: Node — capture.js routes YouTube to `ingest.video`
**Files:**
- Modify: `lib/api/routes/capture.js`
- Modify: `tests/api/capture.test.js`
- [ ] **Step 1:** Failing test — append to `tests/api/capture.test.js`:
```js
it('POST /api/capture with YouTube URL enqueues ingest.video, not ingest.url', async () => {
const res = await request(app).post('/api/capture').set(ownerHeaders)
.send({ space_id: sp.id, url: 'https://youtu.be/abc' });
expect(res.status).toBe(202);
expect(res.body.job_id).toBeTruthy();
const { default: jobsRepo } = await import('../../lib/db/repos/jobs.js');
const rows = await jobsRepo.list({ name: 'ingest.video' });
expect(rows.find(r => r.id === res.body.job_id)).toBeTruthy();
const urlRows = await jobsRepo.list({ name: 'ingest.url' });
expect(urlRows.find(r => r.id === res.body.job_id)).toBeFalsy();
});
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Modify `lib/api/routes/capture.js` — replace the `POST /` handler's enqueue logic:
```js
// helper near the top
const VIDEO_HOST_RE = /(^|\.)(youtube\.com|youtu\.be|vimeo\.com)$/i;
function isVideoUrl(url) {
try { return VIDEO_HOST_RE.test(new URL(url).hostname); }
catch { return false; }
}
// inside router.post('/'), after computing idem + the existing-ref check:
const isVideo = isVideoUrl(url);
const job_name = isVideo ? 'ingest.video' : 'ingest.url';
const job_id = await queue.enqueue(job_name, { space_id, url });
res.status(202).json({ job_id, idempotency_key: idem });
```
- [ ] **Step 4:** Run green:
```bash
npx vitest run tests/api/capture.test.js
```
Expected: existing + new tests pass.
- [ ] **Step 5: Commit.**
```bash
git add lib/api/routes/capture.js tests/api/capture.test.js
git commit -m "feat(api): capture routes YouTube/Vimeo URLs to ingest.video"
```
### Task C6: Phase C close — snapshot + memory
- [ ] **Step 1:** Run full Node + Python test suites green.
```bash
cd /project/src/void-v2 && npx vitest run
cd workers && DATABASE_URL=... pytest -v
```
- [ ] **Step 2:** Snapshot CT 310 + 311 — `plan4_phase_c_<timestamp>`.
- [ ] **Step 3:** Update memory: "Plan 4 Phase C complete: Whisper + yt-dlp live; CT 311 resized 6c/8G + GPU passthrough; void-workers.service running on CT 311."
---
## Phase D — Source-doc sync + version bump + completion doc
### Task D1: Python `safe_fetch` port
**Files:**
- Create: `workers/void_workers/safe_fetch.py`
- Create: `workers/tests/test_safe_fetch.py`
- [ ] **Step 1:** Failing test:
```python
# workers/tests/test_safe_fetch.py
import pytest

from void_workers.safe_fetch import safe_fetch, SafeFetchError


def test_rejects_file_scheme():
    with pytest.raises(SafeFetchError):
        safe_fetch("file:///etc/passwd")


def test_rejects_loopback():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://127.0.0.1/x")


def test_rejects_rfc1918():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://192.168.1.1/x")


def test_rejects_metadata_endpoint():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://169.254.169.254/latest/")
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Implement `workers/void_workers/safe_fetch.py`:
```python
import ipaddress
import os
import socket
import urllib.error
import urllib.request
from urllib.parse import urljoin, urlparse

BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [
    "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8",
    "172.16.0.0/12", "192.168.0.0/16",
    "169.254.0.0/16", "100.64.0.0/10",
]]


class SafeFetchError(Exception):
    pass


class _NoRedirect(urllib.request.HTTPRedirectHandler):
    """Turn 3xx responses into HTTPError so every hop is re-validated below.

    The stock HTTPRedirectHandler follows redirects on its own, which would
    bypass the per-hop address checks in safe_fetch().
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None


def _is_blocked(addr):
    if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
        return False
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        return True
    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
        return True
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in n for n in BLOCK_V4_NETS)
    # IPv6: ULA + link-local
    if ip in ipaddress.ip_network("fc00::/7") or ip in ipaddress.ip_network("fe80::/10"):
        return True
    return False


def _resolve(host):
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror as e:
        raise SafeFetchError(f"no DNS for {host}: {e}")
    addrs = list({i[4][0] for i in infos})
    for a in addrs:
        if _is_blocked(a):
            raise SafeFetchError(f"{host} resolves to blocked address {a}")
    if not addrs:
        raise SafeFetchError(f"no addresses for {host}")
    return addrs[0]


def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
    current = url
    opener = urllib.request.build_opener(_NoRedirect)
    for hop in range(max_hops + 1):
        u = urlparse(current)
        if u.scheme not in ("http", "https"):
            raise SafeFetchError(f"unsupported scheme {u.scheme}")
        host = u.hostname
        if not host:
            raise SafeFetchError(f"no host in {current}")
        # If host is a literal IP, validate; else resolve + validate
        try:
            ipaddress.ip_address(host)
            is_literal = True
        except ValueError:
            is_literal = False
        if is_literal:
            if _is_blocked(host):
                raise SafeFetchError(f"blocked literal IP {host}")
        else:
            _resolve(host)
        req = urllib.request.Request(current, headers=headers or {})
        try:
            with opener.open(req, timeout=timeout) as r:
                return r.read()
        except urllib.error.HTTPError as e:
            if e.code in (301, 302, 303, 307, 308) and "Location" in e.headers and hop < max_hops:
                # Location may be relative; resolve it against the current URL
                current = urljoin(current, e.headers["Location"])
                continue
            raise
    raise SafeFetchError(f"too many redirects ({max_hops})")
```
- [ ] **Step 4:** Run green.
- [ ] **Step 5: Commit.**
```bash
git add workers/void_workers/safe_fetch.py workers/tests/test_safe_fetch.py
git commit -m "feat(workers): safe_fetch (Python port)"
```
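One case worth a follow-up test: an IPv4-mapped IPv6 address such as `::ffff:192.168.1.1` parses as an `IPv6Address`, so a check that applies the IPv4 blocklist only to `IPv4Address` instances never sees it. A minimal sketch of unwrapping the mapped address first is below. `is_blocked_ip`, `PRIVATE_V4`, and `ULA` are hypothetical names, and the network list is abbreviated for illustration rather than copied from `BLOCK_V4_NETS`.

```python
import ipaddress

# Abbreviated for illustration; the module's own blocklist is longer.
PRIVATE_V4 = [ipaddress.ip_network(c) for c in (
    "127.0.0.0/8", "10.0.0.0/8", "172.16.0.0/12",
    "192.168.0.0/16", "169.254.0.0/16",
)]
ULA = ipaddress.ip_network("fc00::/7")


def is_blocked_ip(addr: str) -> bool:
    """Classify a literal IP, treating ::ffff:a.b.c.d as the IPv4 address a.b.c.d."""
    ip = ipaddress.ip_address(addr)
    # IPv6Address.ipv4_mapped is None unless the address is IPv4-mapped;
    # IPv4Address has no such attribute, hence getattr with a default.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in net for net in PRIVATE_V4)
    return ip.is_loopback or ip.is_link_local or ip in ULA
```

If this gap matters for the deployment, the same unwrapping step could be folded into `_is_blocked` and covered by a test that fetches `http://[::ffff:192.168.1.1]/x`.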
### Task D2: `sync.source_doc` worker
**Files:**
- Create: `workers/void_workers/handlers/sourcedoc.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/tests/test_sourcedoc.py`
- Modify: `workers/void_workers/repo.py` — add `get_source_doc`, `update_source_doc`
- [ ] **Step 1:** Failing test:
```python
# workers/tests/test_sourcedoc.py
from unittest.mock import patch

from void_workers.handlers.sourcedoc import handle as handle_sd


def test_sourcedoc_updates_body_text(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('sd', 'SD') RETURNING id").fetchone()[0]
    res = conn.execute(
        "INSERT INTO resources(space_id, slug, name, runtime_type) "
        "VALUES(%s, 'r', 'R', 'lxc') RETURNING id", (sp,)
    ).fetchone()[0]
    sd = conn.execute(
        "INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) "
        "VALUES(%s, 'doc', 'https://example.com/r', 'url') RETURNING id",
        (res,)
    ).fetchone()[0]
    with patch("void_workers.handlers.sourcedoc.safe_fetch",
               return_value=b"hello world doc body"):
        out = handle_sd({"source_doc_id": str(sd)})
    assert out.get("updated") is True
    row = conn.execute("SELECT body_text FROM source_docs WHERE id=%s", (sd,)).fetchone()
    assert "hello world" in (row[0] or "")
```
- [ ] **Step 2:** Run red.
- [ ] **Step 3:** Extend `workers/void_workers/repo.py` with the two helpers:
```python
def get_source_doc(source_doc_id):
    with _conn() as conn:
        return conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()


def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None):
    with _conn() as conn:
        before = conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if last_synced is not None:
            sets.append("last_synced=%s")
            args.append(last_synced)
        if metadata_patch is not None:
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            return before
        sets.append("updated_at=now()")
        args.append(source_doc_id)
        after = conn.execute(
            f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args
        ).fetchone()
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'source_doc', %s, 'update', %s)
        """, (source_doc_id, json.dumps({"kind":"update","changes":{"body_text":"updated" if body_text is not None else None}})))
        return after
```
- [ ] **Step 4:** Implement `workers/void_workers/handlers/sourcedoc.py`:
```python
import hashlib
from datetime import datetime, timezone

from .. import repo
from ..safe_fetch import safe_fetch

NAME = "sync.source_doc"


def _sha(data):
    return hashlib.sha256(data).hexdigest()


def handle(job_data: dict) -> dict:
    sd_id = job_data["source_doc_id"]
    doc = repo.get_source_doc(sd_id)
    if not doc:
        return {"skipped": "gone"}
    body = safe_fetch(doc["upstream_url"], headers={"User-Agent": "void-ingest/2.0"})
    new_sha = _sha(body)
    old_sha = ((doc.get("metadata") or {}).get("body_sha"))
    now = datetime.now(timezone.utc)
    if new_sha == old_sha:
        repo.update_source_doc(sd_id, last_synced=now)
        return {"unchanged": True}
    text = body.decode("utf-8", errors="replace")[:1_000_000]
    repo.update_source_doc(
        sd_id,
        body_text=text,
        last_synced=now,
        metadata_patch={"body_sha": new_sha}
    )
    return {"updated": True, "chars": len(text)}
```
- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`:
```python
from . import echo, pdf, image, video, sourcedoc
REGISTRY = {
echo.NAME: echo.handle,
pdf.NAME: pdf.handle,
image.NAME: image.handle,
video.NAME: video.handle,
sourcedoc.NAME: sourcedoc.handle,
}
```
- [ ] **Step 6:** Run green.
- [ ] **Step 7: Commit.**
```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/sourcedoc.py workers/void_workers/repo.py workers/tests/test_sourcedoc.py
git commit -m "feat(workers): sync.source_doc with sha256 diff"
```
### Task D3: Node-side cron — `lib/cron/sync_source_docs.js`
**Files:**
- Modify: `package.json` — add `node-cron`.
- Create: `lib/cron/index.js`
- Create: `lib/cron/sync_source_docs.js`
- Modify: `server.js` — start the cron in the CLI gate.
- Create: `tests/cron/sync_source_docs.test.js`
- [ ] **Step 1:** Add dep:
```bash
cd /project/src/void-v2 && npm i node-cron@^3
```
- [ ] **Step 2:** Failing test `tests/cron/sync_source_docs.test.js`:
```js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import { pool } from '../../lib/db/pool.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import { runSync } from '../../lib/cron/sync_source_docs.js';
import * as jobs from '../../lib/db/repos/jobs.js';
beforeEach(async () => {
await resetDb(); await migrateUp();
await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); });
describe('cron/sync_source_docs.runSync', () => {
it('enqueues sync.source_doc for each url-synced row', async () => {
const sp = (await pool.query(
`INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id`
)).rows[0].id;
const res = (await pool.query(
`INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`,
[sp]
)).rows[0].id;
await pool.query(
`INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`,
[res]
);
const enqueued = await runSync();
expect(enqueued).toBe(1);
const queued = await jobs.list({ name: 'sync.source_doc' });
expect(queued.length).toBe(1);
});
});
```
- [ ] **Step 3:** Run red.
- [ ] **Step 4:** Implement `lib/cron/sync_source_docs.js`:
```js
import { pool } from '../db/pool.js';
import * as queue from '../jobs/queue.js';
import { log } from '../log.js';
export async function runSync() {
const { rows } = await pool.query(
`SELECT id FROM source_docs WHERE sync_source = 'url'`
);
for (const r of rows) {
await queue.enqueue('sync.source_doc', { source_doc_id: r.id });
}
return rows.length;
}
```
- [ ] **Step 5:** Implement `lib/cron/index.js`:
```js
import cron from 'node-cron';
import { runSync } from './sync_source_docs.js';
import { log } from '../log.js';
export function startCron() {
// Daily at 03:00 local time
cron.schedule('0 3 * * *', async () => {
try {
const n = await runSync();
log.info({ enqueued: n }, 'cron sync.source_doc complete');
} catch (e) {
log.error({ err: e }, 'cron sync.source_doc failed');
}
});
log.info('cron started');
}
```
- [ ] **Step 6:** Wire into `server.js` — extend the CLI gate:
```js
// additions
import { startCron } from './lib/cron/index.js';
// Inside the `if (import.meta.url === ...)` block, after registerWorkers:
startCron();
```
- [ ] **Step 7:** Run green:
```bash
npx vitest run tests/cron/sync_source_docs.test.js
```
- [ ] **Step 8: Commit.**
```bash
git add package.json package-lock.json lib/cron/index.js lib/cron/sync_source_docs.js server.js tests/cron/sync_source_docs.test.js
git commit -m "feat(cron): daily sync.source_doc enqueue"
```
### Task D4: Version bump + CHANGELOG + completion doc
**Files:**
- Modify: `package.json` → `2.0.0-alpha.4`
- Modify: `server.js` → `VERSION = '2.0.0-alpha.4'`
- Modify: `tests/server.test.js` → assert `2.0.0-alpha.4`
- Modify: `CHANGELOG.md` — append `[2.0.0-alpha.4]` entry
- Create: `docs/plan-4-complete.md`
- [ ] **Step 1:** Bump version literals (three files). Use Edit.
- [ ] **Step 2:** Append CHANGELOG entry:
```markdown
## [2.0.0-alpha.4] — 2026-06-NN
### Added (Plan 4: Python void-workers)
- `void-workers.service` — Python 3.12 service alongside `void-server`
on CT 311. psycopg-based pg-boss client matches Node's claim/finish
semantics via `SELECT ... FOR UPDATE SKIP LOCKED`.
- **`extract.pdf`** — pdftotext first; per-page Tesseract OCR fallback
when extraction yields < 200 chars.
- **`extract.image`** — Tesseract OCR with English data.
- **`ingest.video`** — yt-dlp metadata + audio extraction + faster-whisper
transcription. CUDA at startup, CPU fallback on HA failover to Z3.
YouTube / youtu.be / Vimeo URLs route through `/api/capture` to
`ingest.video` instead of `ingest.url`.
- **`sync.source_doc`** — fetch upstream + sha256 diff + update
`body_text`. Node cron enqueues daily at 03:00.
- **Node `blob.js`** fans out to `extract.pdf` / `extract.image` after
creating PDF / image refs.
- **CT 311 infrastructure**: resized to 6 cores / 8 GB RAM, GPU device
nodes passed through (shared with CT 102's Ollama).
```
- [ ] **Step 3:** Create `docs/plan-4-complete.md` (use the Plan 3 doc as a template — sections: Date / Version / Tests / Commits / Snapshots / Scope per phase / Security findings handled / Open items / What's left after Plan 4).
- [ ] **Step 4:** Run full test suites green (Node + Python).
```bash
cd /project/src/void-v2 && npx vitest run
cd workers && DATABASE_URL=... pytest -v
```
- [ ] **Step 5: Commit.**
```bash
git add package.json server.js tests/server.test.js CHANGELOG.md docs/plan-4-complete.md
git commit -m "chore: version 2.0.0-alpha.4 + changelog + plan-4 completion doc"
```
### Task D5: Plan 4 close — snapshot + memory
- [ ] **Step 1:** Snapshot CT 310 + 311 — `plan4_complete_<timestamp>`.
- [ ] **Step 2:** Update memory `project_void_v2_execution.md` — mark Plan 4 complete; note alpha-4 source-built but not yet deployed (deploy is a separate user-OK decision per the standing rule).
---
## Spec coverage check
Every Plan 4 spec section maps to at least one task:
| Spec § | Task(s) |
|---|---|
| Phase A harness | A1, A2, A3, A4 |
| Phase B PDF + image | B1, B2, B3, B4, B5 |
| Phase C Whisper + yt-dlp + GPU | C1, C2, C3, C4, C5 |
| Phase D source-doc sync + version | D1, D2, D3, D4 |
| `safe_fetch` Python port | D1 |
| Concurrency env config | A1 (`config.py`) — applies across handlers |
| systemd `MemoryMax=6G` | A4 (`void-workers.service`) |
| CT 311 resize + GPU passthrough | C1 |
| YouTube routing in capture.js | C5 |
| `blob.js` fans out | B5 |
| Repo audit shim (`actor_kind='worker'`) | B2 (`repo.update_ref`), C4 (`repo.create_ref`), D2 (`repo.update_source_doc`) |
## Type & name consistency
- Worker name strings: `echo`, `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc` — exact same strings used in `NAME` constants, registry keys, Node enqueue calls, and tests.
- Handler signature: `def handle(job_data: dict) -> dict` everywhere.
- Job payload shapes:
- `extract.pdf` / `extract.image`: `{ ref_id, blob_path }` (both producers Node-side use these keys; consumer reads same keys).
- `ingest.video`: `{ space_id, url }`.
- `sync.source_doc`: `{ source_doc_id }` (Node cron and Python worker both use this exact key).
- `repo.update_ref` / `repo.create_ref` / `repo.update_source_doc` / `repo.get_source_doc` — same names across all tasks that touch them.
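
The payload shapes listed above could be enforced at claim time with a small lookup table, so a producer/consumer key mismatch fails with a clear message instead of a `KeyError` deep inside a handler. A minimal sketch; `REQUIRED_KEYS` and `missing_keys` are hypothetical names and the plan does not define such a check.

```python
# Required payload keys per job name, taken from the list above.
REQUIRED_KEYS = {
    "extract.pdf": {"ref_id", "blob_path"},
    "extract.image": {"ref_id", "blob_path"},
    "ingest.video": {"space_id", "url"},
    "sync.source_doc": {"source_doc_id"},
}


def missing_keys(name: str, job_data: dict) -> set[str]:
    """Return the required keys that are absent from job_data for this job name."""
    return REQUIRED_KEYS[name] - set(job_data)
```

A dispatcher could call `missing_keys(name, data)` before invoking the handler and fail the job immediately when the result is non-empty.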