Void-Homelab/docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md
root c4663992ec docs: Plan 4 implementation plan
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:39:55 +10:00


Void 2.0 — Plan 4: Python void-workers (heavy ML ingest)

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Stand up the Python void-workers service alongside the existing Node void-server on CT 311. Workers claim jobs from the same pg-boss queue via native SQL (SELECT ... FOR UPDATE SKIP LOCKED) and process four kinds: extract.pdf, extract.image, ingest.video, sync.source_doc. Whisper detects CUDA at startup and falls back to CPU when the HA replica is running on Z3 (no GPU).

Architecture: Single CT, two systemd units (void-server.service + void-workers.service). Shared blob store at /var/lib/void/blobs. Shared Postgres at CT 310. Python venv at /opt/void-workers/venv. Per-kind handlers run as threads inside one Python process; concurrency configured via env. Node-side hooks: capture.js routes YouTube URLs to ingest.video; blob worker enqueues extract.pdf/extract.image based on kind.

Tech Stack: Python 3.12, psycopg 3, Poppler (pdftotext) + Tesseract (via pytesseract) + pdfplumber + Pillow, faster-whisper (CTranslate2 backend), yt-dlp, ffmpeg, structlog, pytest, vitest (existing), node-cron (new Node dep for Phase D).

Spec: docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md.
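The Goal's CUDA-detect-with-CPU-fallback behavior belongs to model.py in Phase C and is not specified in this section. Here is a minimal sketch of how the selection could work. It assumes faster-whisper's `WhisperModel` constructor (`device`, `compute_type`, `download_root`) and CTranslate2's `get_cuda_device_count()`. The names `pick_device` and `load_model` and the float16/int8 compute types are illustrative assumptions, not decisions from the plan.

```python
"""Sketch of the Phase C Whisper device choice (assumed design, not the plan's code)."""


def pick_device(cuda_device_count: int) -> tuple[str, str]:
    """Map the number of visible CUDA devices to (device, compute_type).

    float16 on GPU and int8 on CPU are common faster-whisper settings;
    they are assumptions here, not values fixed by the plan.
    """
    if cuda_device_count > 0:
        return "cuda", "float16"
    return "cpu", "int8"  # e.g. the HA replica on Z3, which has no GPU


def load_model(model_name: str, cache_dir: str):
    """Load faster-whisper on CUDA when a GPU is visible, else on CPU."""
    # Imported lazily so pick_device() stays usable without the ML extras installed.
    import ctranslate2
    from faster_whisper import WhisperModel

    device, compute_type = pick_device(ctranslate2.get_cuda_device_count())
    return WhisperModel(
        model_name,
        device=device,
        compute_type=compute_type,
        download_root=cache_dir,
    )
```

It would be called once at startup, e.g. `load_model(WHISPER_MODEL, WHISPER_CACHE)` with the values from config.py. The same code would then pick the GPU where passthrough exists and fall back to the CPU on the Z3 replica. Keeping the decision in a pure function makes it testable without a GPU.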


Out of scope (Plan 5+)

  • MCP server surface, Companion chat (Plan 5).
  • Sacred Valley widgets (Plan 6).
  • Speaker diarization, per-language Tesseract data, larger Whisper models with proper GPU budgeting.
  • "Re-transcribe" UI action.

Conventions

  1. TDD. Write the failing test first. Run it red. Implement. Run it green. Commit.
  2. Worker handler shape: def handler(job_data: dict) -> dict — return the success output, raise for retry.
  3. Worker name strings: extract.pdf, extract.image, ingest.video, sync.source_doc. These exact strings are used both in the Node enqueue calls AND as the Python handler REGISTRY keys (each handler module's NAME).
  4. All Python repo writes pass actor={'kind': 'worker', 'id': None} when emitting audit_log rows.
  5. Commit per task.
  6. Snapshot CT 310 + CT 311 before any phase that touches infra (the standing rule). Phase A is code-only. Phase B installs system deps. Phase C resizes the CT + adds GPU passthrough — explicit snapshot before that one.

File structure (what this plan creates)

workers/                                # NEW — Python service lives in same repo
  pyproject.toml                        # Python 3.12 + deps (psycopg, structlog, ...)
  README.md                             # short ops notes
  void_workers/
    __init__.py
    config.py                           # env loader
    log.py                              # structlog setup
    boss.py                             # claim / finish / fail / retry against pgboss tables
    repo.py                             # psycopg shim for refs / source_docs updates + audit
    safe_fetch.py                       # Python port of lib/ingest/safe_fetch.js
    runner.py                           # main: load handlers, spawn loops, signal handlers
    handlers/
      __init__.py
      echo.py                           # trivial — proves harness end-to-end
      pdf.py                            # extract.pdf  (Phase B)
      image.py                          # extract.image (Phase B)
      video.py                          # ingest.video (Phase C)
      sourcedoc.py                      # sync.source_doc (Phase D)
    model.py                            # Whisper loader (CUDA detect + CPU fallback) — Phase C
  tests/
    conftest.py                         # shared fixtures (DB url, pgboss schema reset, sample blobs)
    fixtures/
      born_digital.pdf                  # tiny PDF with embedded text
      scanned.pdf                       # tiny PDF that is a single page of an image
      eng_text.png                      # PNG with known text baked in
    test_boss.py
    test_echo.py
    test_pdf.py
    test_image.py
    test_video.py
    test_sourcedoc.py
    test_safe_fetch.py

deploy/
  void-workers.service                  # NEW systemd unit
  push-workers.sh                       # NEW — sibling to push.sh
  README.md                             # extend: workers bootstrap section

lib/                                    # Node-side changes (already exists)
  jobs/workers/blob.js                  # MODIFY: after creating ref, enqueue extract.pdf/extract.image
  api/routes/capture.js                 # MODIFY: detect YouTube/Vimeo, enqueue ingest.video
  cron/sync_source_docs.js              # NEW — node-cron, fires sync.source_doc daily

package.json                            # ADD node-cron, bump version in Phase D
server.js                               # bump VERSION + boot the cron in Phase D
CHANGELOG.md                            # Phase D entry
tests/server.test.js                    # version assertion bump
docs/plan-4-complete.md                 # Phase D

Phase A — Python harness + systemd unit

Task A1: Workers skeleton + venv install on the dev box

Files:

  • Create: workers/pyproject.toml

  • Create: workers/void_workers/__init__.py

  • Create: workers/void_workers/config.py

  • Create: workers/void_workers/log.py

  • Create: workers/README.md

  • Step 1: Create workers/pyproject.toml:

    [build-system]
    requires = ["setuptools>=64"]
    build-backend = "setuptools.build_meta"
    
    [project]
    name = "void-workers"
    version = "0.1.0"
    requires-python = ">=3.12"
    dependencies = [
      "psycopg[binary,pool]>=3.2",
      "structlog>=24.1",
    ]
    
    [project.optional-dependencies]
    pdf = ["pdfplumber>=0.11", "pytesseract>=0.3.13", "pillow>=10.3"]
    image = ["pytesseract>=0.3.13", "pillow>=10.3"]
    video = ["yt-dlp>=2024.10.0", "faster-whisper>=1.0.3"]
    test = ["pytest>=8.0", "pytest-asyncio>=0.23"]
    all = ["void-workers[pdf,image,video,test]"]
    
    [tool.setuptools.packages.find]
    where = ["."]
    include = ["void_workers*"]
    
  • Step 2: Create workers/void_workers/__init__.py:

    __version__ = "0.1.0"
    
  • Step 3: Create workers/void_workers/config.py:

    import os
    
    def env(name, default=None, required=False):
        v = os.environ.get(name, default)
        if required and v is None:
            raise RuntimeError(f"env {name} is required")
        return v
    
    def env_int(name, default):
        return int(os.environ.get(name, default))
    
    DATABASE_URL = env("DATABASE_URL", required=True)
    BLOB_ROOT = env("BLOB_ROOT", "/var/lib/void/blobs")
    WHISPER_MODEL = env("WHISPER_MODEL", "small.en")
    WHISPER_CACHE = env("WHISPER_CACHE", "/var/lib/void/whisper-models")
    ALLOW_PRIVATE = env("VOID_INGEST_ALLOW_PRIVATE", "false") == "true"
    
    CONCURRENCY = {
        "extract.pdf":      env_int("VOID_CONCURRENCY_EXTRACT_PDF", 2),
        "extract.image":    env_int("VOID_CONCURRENCY_EXTRACT_IMAGE", 2),
        "ingest.video":     env_int("VOID_CONCURRENCY_INGEST_VIDEO", 1),
        "sync.source_doc":  env_int("VOID_CONCURRENCY_SYNC_SOURCE_DOC", 1),
        "echo":             env_int("VOID_CONCURRENCY_ECHO", 1),
    }
    
    POLL_INTERVAL_MS = env_int("VOID_POLL_INTERVAL_MS", 1000)
    
  • Step 4: Create workers/void_workers/log.py:

    import logging
    import structlog
    
    def init():
        structlog.configure(
            processors=[
                structlog.contextvars.merge_contextvars,
                structlog.processors.add_log_level,
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.JSONRenderer(),
            ],
            wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        )
        return structlog.get_logger()
    
    log = init()
    
  • Step 5: Create workers/README.md:

    # void-workers
    
    Python ML ingest service alongside void-server (Node). Sibling of `lib/`.
    
    ## Local dev
    
    ```bash
    cd workers
    python3.12 -m venv .venv
    . .venv/bin/activate
    pip install -e ".[all]"
    export DATABASE_URL="postgres://..."
    python -m void_workers.runner
    ```
    
    ## Tests
    
    ```bash
    pip install -e ".[test,all]"
    pytest -v
    ```
    
    See ../docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md for the full plan.
    
  • Step 6: Create the venv + install:

    cd /project/src/void-v2/workers
    python3.12 -m venv .venv
    . .venv/bin/activate
    pip install -e ".[all]"
    python -c "from void_workers import __version__; print(__version__)"
    

    Expected: 0.1.0.

  • Step 7: Commit.

    cd /project/src/void-v2
    git add workers/pyproject.toml workers/README.md workers/void_workers/__init__.py workers/void_workers/config.py workers/void_workers/log.py
    git commit -m "feat(workers): Python skeleton + config + structlog"
    

Task A2: pgboss claim/finish helpers in boss.py

Files:

  • Create: workers/void_workers/boss.py

  • Create: workers/tests/conftest.py

  • Create: workers/tests/test_boss.py

  • Step 1: Create workers/tests/conftest.py:

    import os
    import pytest
    import psycopg
    
    DB_URL = os.environ["DATABASE_URL"]
    
    @pytest.fixture
    def conn():
        with psycopg.connect(DB_URL, autocommit=True) as c:
            yield c
    
    @pytest.fixture(autouse=True)
    def reset_pgboss(conn):
        """Drop the pgboss schema before each test; boss_ready rebuilds a stand-in."""
        conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE")
        # We do NOT recreate it here. In production, pg-boss's first start() creates
        # the schema; tests that need it call ensure_pgboss(conn).
        yield
    
    def ensure_pgboss(conn):
        """Create a minimal stand-in for pg-boss's job table.
    
        Tests cannot rely on Node's queue.start(), so we create only the columns
        boss.py reads and writes. The real pgboss tables are richer.
        """
        conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss")
        conn.execute("""
          CREATE TABLE IF NOT EXISTS pgboss.job (
            id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
            name text NOT NULL,
            priority int NOT NULL DEFAULT 0,
            data jsonb,
            state text NOT NULL DEFAULT 'created',
            retry_limit int NOT NULL DEFAULT 5,
            retry_count int NOT NULL DEFAULT 0,
            retry_delay int NOT NULL DEFAULT 10,
            retry_backoff boolean NOT NULL DEFAULT true,
            start_after timestamptz NOT NULL DEFAULT now(),
            started_on timestamptz,
            completed_on timestamptz,
            created_on timestamptz NOT NULL DEFAULT now(),
            output jsonb
          )
        """)
    
    @pytest.fixture
    def boss_ready(conn):
        ensure_pgboss(conn)
        yield conn
    
  • Step 2: Write the failing test workers/tests/test_boss.py:

    import json
    from void_workers.boss import Boss
    from void_workers.config import DATABASE_URL
    
    def test_claim_returns_none_when_no_jobs(boss_ready):
        boss = Boss(dsn=DATABASE_URL)
        assert boss.claim("echo") is None
    
    def test_claim_then_complete_round_trip(boss_ready):
        boss_ready.execute(
            "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
            (json.dumps({"ping": 1}),)
        )
        boss = Boss(dsn=DATABASE_URL)
        job = boss.claim("echo")
        assert job is not None
        assert job["data"] == {"ping": 1}
        boss.complete(job["id"], {"pong": 1})
        row = boss_ready.execute(
            "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
        ).fetchone()
        assert row[0] == "completed"
        assert row[1] == {"pong": 1}
    
    def test_fail_records_output_and_state(boss_ready):
        boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
        boss = Boss(dsn=DATABASE_URL)
        job = boss.claim("echo")
        boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
        row = boss_ready.execute(
            "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s",
            (job["id"],)
        ).fetchone()
        # retry_limit defaults to 5 and retry_count was 0 → should move to 'retry'
        assert row[0] == "retry"
        assert row[1] == {"name": "BoomError", "message": "boom"}
        assert row[2] == 1
    
    def test_fail_past_retry_limit_marks_failed(boss_ready):
        boss_ready.execute(
            "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
        )
        boss = Boss(dsn=DATABASE_URL)
        job = boss.claim("echo")
        boss.fail(job["id"], {"name": "Done"})
        row = boss_ready.execute(
            "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
        ).fetchone()
        assert row[0] == "failed"
    
  • Step 3: Run red:

    cd /project/src/void-v2/workers
    . .venv/bin/activate
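    DATABASE_URL="$(grep '^DATABASE_URL=' ../.env 2>/dev/null | cut -d= -f2-)"
    # `cut` exits 0 even when grep matches nothing, so test for an empty value
    # instead of chaining the fallback with `||`.
    if [ -z "$DATABASE_URL" ]; then
      DATABASE_URL="postgres://void:$(ssh root@192.168.1.124 cat /root/void2-db-pass.txt | sed 's/^DB_PASS=//')@192.168.1.215:5432/void"
    fi
    export DATABASE_URL
    pytest tests/test_boss.py -v
    

    Expected: a collection error for tests/test_boss.py (ModuleNotFoundError: No module named 'void_workers.boss'), so all 4 tests are red.

    Caution: the autouse reset_pgboss fixture runs DROP SCHEMA IF EXISTS pgboss CASCADE, and test_pdf.py later drops the public schema. DATABASE_URL must therefore point at a database whose queue and data you can afford to lose.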

  • Step 4: Implement workers/void_workers/boss.py:

    import json
    import psycopg
    from psycopg.rows import dict_row
    
    class Boss:
        def __init__(self, dsn):
            self.dsn = dsn
    
        def _conn(self):
            return psycopg.connect(self.dsn, autocommit=False, row_factory=dict_row)
    
        def claim(self, queue):
            """Atomically claim one job for the given queue. Returns dict or None."""
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                      SELECT id, data, retry_count, retry_limit
                        FROM pgboss.job
                       WHERE name=%s
                         AND state IN ('created','retry')
                         AND start_after <= now()
                       ORDER BY priority DESC, created_on ASC
                       LIMIT 1
                       FOR UPDATE SKIP LOCKED
                    """, (queue,))
                    row = cur.fetchone()
                    if not row: return None
                    cur.execute(
                        "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
                        (row["id"],)
                    )
                    conn.commit()
                    return row
    
        def complete(self, job_id, output):
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                      UPDATE pgboss.job
                         SET state='completed', completed_on=now(), output=%s
                       WHERE id=%s
                    """, (json.dumps(output), job_id))
                conn.commit()
    
        def fail(self, job_id, output):
            """Mark the job retry (if budget left) or failed (otherwise)."""
            with self._conn() as conn:
                with conn.cursor() as cur:
                    cur.execute(
                        "SELECT retry_count, retry_limit, retry_delay, retry_backoff "
                        "FROM pgboss.job WHERE id=%s FOR UPDATE",
                        (job_id,)
                    )
                    row = cur.fetchone()
                    if not row: return
                    new_count = row["retry_count"] + 1
                    if new_count > row["retry_limit"]:
                        cur.execute(
                            "UPDATE pgboss.job SET state='failed', output=%s, "
                            "completed_on=now() WHERE id=%s",
                            (json.dumps(output), job_id)
                        )
                    else:
                        # exponential backoff: delay * 2^(count-1)
                        delay = row["retry_delay"]
                        if row["retry_backoff"]:
                            delay = delay * (2 ** (new_count - 1))
                        cur.execute("""
                          UPDATE pgboss.job
                             SET state='retry',
                                 retry_count=%s,
                                 start_after=now() + (%s || ' seconds')::interval,
                                 output=%s
                           WHERE id=%s
                        """, (new_count, str(delay), json.dumps(output), job_id))
                conn.commit()
    
  • Step 5: Run green:

    DATABASE_URL=... pytest tests/test_boss.py -v
    

    Expected: 4 passed.

  • Step 6: Commit.

    cd /project/src/void-v2
    git add workers/void_workers/boss.py workers/tests/conftest.py workers/tests/test_boss.py
    git commit -m "feat(workers): pgboss claim/complete/fail via psycopg"
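The conftest stand-in table uses these defaults: retry_delay 10, retry_backoff true, retry_limit 5. With those values, the arithmetic in Boss.fail() gives the retry schedule below. `retry_schedule` is a hypothetical helper that only mirrors that arithmetic; it is not part of boss.py.

```python
def retry_schedule(retry_delay: int, retry_limit: int, retry_backoff: bool = True) -> list[int]:
    """Delays (seconds) Boss.fail() applies after failures 1..retry_limit.

    Mirrors boss.py: failure n (1-based) is retried after
    retry_delay * 2**(n - 1) seconds when backoff is on. Failure
    retry_limit + 1 marks the job 'failed' instead of scheduling a retry.
    """
    delays = []
    for new_count in range(1, retry_limit + 1):
        delay = retry_delay
        if retry_backoff:
            delay = delay * (2 ** (new_count - 1))
        delays.append(delay)
    return delays


schedule = retry_schedule(10, 5)
print(schedule)       # [10, 20, 40, 80, 160]
print(sum(schedule))  # 310
```

A job that fails every time therefore runs six times (the first attempt plus five retries). It is marked failed roughly 310 s after its first failure, plus handler run time and up to one poll interval per attempt. This describes boss.py only; pg-boss's own retry timing may differ.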
    

Task A3: Echo handler + runner loop

Files:

  • Create: workers/void_workers/handlers/__init__.py

  • Create: workers/void_workers/handlers/echo.py

  • Create: workers/void_workers/runner.py

  • Create: workers/tests/test_echo.py

  • Step 1: Write failing test workers/tests/test_echo.py:

    import json
    import threading
    from void_workers.runner import Runner
    from void_workers.config import DATABASE_URL
    
    def test_echo_handler_runs(boss_ready):
        boss_ready.execute(
            "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
            (json.dumps({"ping": 42}),)
        )
        runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}})
        t = threading.Thread(target=runner.run, kwargs={"once": True}, daemon=True)
        t.start()
        t.join(timeout=5)
        row = boss_ready.execute(
            "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
        ).fetchone()
        assert row[0] == "completed"
        assert row[1] == {"pong": 42}
    
  • Step 2: Run red:

    DATABASE_URL=... pytest tests/test_echo.py -v
    

    Expected: a collection error (ModuleNotFoundError: No module named 'void_workers.runner').

  • Step 3: Implement workers/void_workers/handlers/__init__.py:

    from . import echo
    
    REGISTRY = {
        echo.NAME: echo.handle,
    }
    
  • Step 4: Implement workers/void_workers/handlers/echo.py:

    NAME = "echo"
    
    def handle(job_data: dict) -> dict:
        return {"pong": job_data.get("ping", 0)}
    
  • Step 5: Implement workers/void_workers/runner.py:

    import signal
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor
    from .boss import Boss
    from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
    from .handlers import REGISTRY
    from .log import log
    
    class Runner:
        def __init__(self, dsn=None, handlers=None):
            self.boss = Boss(dsn or DATABASE_URL)
            self.handlers = handlers or {
                name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
            }
            self._stop = threading.Event()
    
        def _process_one(self, queue):
            job = self.boss.claim(queue)
            if not job: return False
            log.info("job_claimed", job_id=str(job["id"]), name=queue)
            handler = REGISTRY[queue]
            try:
                out = handler(job["data"] or {})
                self.boss.complete(job["id"], out or {})
                log.info("job_completed", job_id=str(job["id"]), name=queue)
            except Exception as e:
                log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e))
                self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)})
            return True
    
        def _loop(self, queue):
            interval = POLL_INTERVAL_MS / 1000.0
            while not self._stop.is_set():
                try:
                    did = self._process_one(queue)
                except Exception as e:
                    log.error("loop_error", queue=queue, err=str(e))
                    did = False
                if not did:
                    self._stop.wait(interval)
    
        def run(self, once=False):
            if once:
                for q in self.handlers:
                    self._process_one(q)
                return
            executor = ThreadPoolExecutor(max_workers=sum(h["concurrency"] for h in self.handlers.values()))
            for q, cfg in self.handlers.items():
                for _ in range(cfg["concurrency"]):
                    executor.submit(self._loop, q)
            for sig in (signal.SIGTERM, signal.SIGINT):
                signal.signal(sig, lambda *_: self._stop.set())
            while not self._stop.is_set():
                time.sleep(0.5)
            executor.shutdown(wait=True)
    
    if __name__ == "__main__":
        Runner().run()
    
  • Step 6: Run green:

    DATABASE_URL=... pytest tests/test_echo.py -v
    

    Expected: 1 passed.

  • Step 7: Commit.

    cd /project/src/void-v2
    git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/echo.py workers/void_workers/runner.py workers/tests/test_echo.py
    git commit -m "feat(workers): runner loop + echo handler"
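Runner gives each queue `concurrency` threads. Each thread polls independently and waits on a shared `threading.Event` when idle, so a SIGTERM ends idle waits at once instead of after a full poll interval. The toy below models that shape without Postgres. `InMemoryBoss`, `poll_loop` and `run_slots` are hypothetical names, and `queue.Queue` only approximates what `FOR UPDATE SKIP LOCKED` guarantees: each job goes to exactly one claimer.

```python
import queue
import threading
import time


class InMemoryBoss:
    """Hypothetical stand-in for Boss: a thread-safe FIFO instead of pgboss.job."""

    def __init__(self, jobs):
        self._pending = queue.Queue()
        for job in jobs:
            self._pending.put(job)
        self._lock = threading.Lock()
        self.completed = {}

    def claim(self, name):
        # queue.Queue hands each job to exactly one caller (stand-in for SKIP LOCKED).
        try:
            return self._pending.get_nowait()
        except queue.Empty:
            return None

    def complete(self, job_id, output):
        with self._lock:
            self.completed[job_id] = output


def poll_loop(boss, name, handler, stop, interval):
    """Same shape as Runner._loop: claim and handle, or wait on the stop Event."""
    while not stop.is_set():
        job = boss.claim(name)
        if job is None:
            stop.wait(interval)  # returns immediately once stop.set() is called
            continue
        boss.complete(job["id"], handler(job["data"]))


def run_slots(jobs, handler, slots=2, interval=0.05, timeout=5.0):
    """Start `slots` polling threads, wait until every job completes, then stop them."""
    boss = InMemoryBoss(jobs)
    stop = threading.Event()
    threads = [
        threading.Thread(target=poll_loop, args=(boss, "echo", handler, stop, interval))
        for _ in range(slots)
    ]
    for t in threads:
        t.start()
    deadline = time.monotonic() + timeout
    while len(boss.completed) < len(jobs) and time.monotonic() < deadline:
        time.sleep(0.01)
    stop.set()  # what the SIGTERM/SIGINT handler does in Runner.run()
    for t in threads:
        t.join()
    return boss.completed


def echo(data):
    return {"pong": data.get("ping", 0)}


done = run_slots([{"id": i, "data": {"ping": i}} for i in range(5)], echo)
print(sorted(done.items()))
```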
    

Task A4: systemd unit + push-workers.sh

Files:

  • Create: deploy/void-workers.service

  • Create: deploy/push-workers.sh

  • Modify: deploy/README.md (append workers bootstrap section)

  • Step 1: Create deploy/void-workers.service:

    [Unit]
    Description=Void 2.0 workers
    After=network-online.target void-server.service
    Wants=network-online.target
    
    [Service]
    Type=simple
    User=voidworkers
    WorkingDirectory=/opt/void-workers
    EnvironmentFile=/opt/void-workers/.env
    ExecStart=/opt/void-workers/venv/bin/python -m void_workers.runner
    Restart=on-failure
    RestartSec=5
    StandardOutput=journal
    StandardError=journal
    MemoryMax=6G
    
    [Install]
    WantedBy=multi-user.target
    
  • Step 2: Create deploy/push-workers.sh:

    #!/usr/bin/env bash
    set -euo pipefail
    
    # Push Python void-workers source to CT 311 and restart the service.
    # Run from /project/src/void-v2.
    
    TARGET=${TARGET:-root@192.168.1.13}
    REMOTE_DIR=${REMOTE_DIR:-/opt/void-workers}
    
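    # --delete removes remote files that are missing from workers/. Excluded paths
    # are protected from that deletion, so the server-side venv and .env survive.
    rsync -avz --delete \
      --exclude .venv \
      --exclude venv \
      --exclude .env \
      --exclude __pycache__ \
      --exclude '*.egg-info' \
      --exclude tests \
      workers/ "$TARGET:$REMOTE_DIR/"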
    
    ssh "$TARGET" "
      cd $REMOTE_DIR
      if [ ! -d venv ]; then
        python3.12 -m venv venv
      fi
      . venv/bin/activate
      pip install --quiet --upgrade pip
      pip install --quiet -e '.[all]'
      systemctl restart void-workers
    "
    echo "Workers deployed."
    

    Make it executable:

    chmod +x deploy/push-workers.sh
    
  • Step 3: Extend deploy/README.md — append the following section:

    
    ## Workers (Python void-workers — Plan 4+)
    
    Runs alongside void-server as a second systemd unit.
    
    One-time setup on CT 311:
    
    ```bash
    apt install -y python3.12 python3.12-venv python3-pip \
                   ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils
    
    useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
    mkdir -p /opt/void-workers /var/lib/void/whisper-models
    chown voidworkers: /opt/void-workers
    chown -R voidworkers: /var/lib/void/whisper-models
    
    # voidworkers needs to read the shared blob store
    usermod -aG void voidworkers
    
    install -m 644 deploy/void-workers.service /etc/systemd/system/
    systemctl daemon-reload
    systemctl enable void-workers
    
    # /opt/void-workers/.env — see plan §Deploy delta
    ```
    
    Deploy after edits:
    
    ```bash
    cd /project/src/void-v2
    ./deploy/push-workers.sh
    ```
    
  • Step 4: Run the existing Node test suite — must still be green (we have not modified anything Node touches yet).

    cd /project/src/void-v2
    npx vitest run
    
  • Step 5: Commit.

    git add deploy/void-workers.service deploy/push-workers.sh deploy/README.md
    git commit -m "feat(workers): systemd unit + push-workers.sh"
    

Task A5: Phase A close — memory checkpoint

  • Step 1: Update memory project_void_v2_execution.md: mark "Plan 4 Phase A complete: harness + echo running locally; void-workers.service installed on CT 311 pending real deps (Phase B+)".
  • Step 2: Note the boss.py contract decision (psycopg + native SQL) so future plans don't re-litigate it.

Phase B — extract.pdf + extract.image

Task B1: Install system deps on the dev box

These deps support local pytest runs. CT 311 gets the same packages from the one-time setup in deploy/README.md (Task A4).

  • Step 1: Verify deps locally (we are running on Ubuntu 24.04 LXC per CLAUDE.md):

    command -v pdftotext && command -v tesseract && command -v ffmpeg || echo "missing deps"
    apt list --installed 2>/dev/null | grep -E "poppler|tesseract|ffmpeg" | head -5
    
  • Step 2: If any are missing, install:

    apt install -y poppler-utils tesseract-ocr tesseract-ocr-eng ffmpeg
    
  • Step 3: Install pdf/image Python extras into the workers venv:

    cd /project/src/void-v2/workers
    . .venv/bin/activate
    pip install -e '.[pdf,image,test]'
    python -c "import pdfplumber, pytesseract; print('ok')"
    
  • Step 4: Add a tests/fixtures/ README explaining how fixtures were generated (so we can regenerate them):

    Create workers/tests/fixtures/README.md:

    # Test fixtures
    
    - `born_digital.pdf`: generated by
      `echo "void-workers proof-of-life document" > /tmp/b.txt && enscript -p /tmp/b.ps /tmp/b.txt && ps2pdf /tmp/b.ps born_digital.pdf`
      (or any tool that produces a PDF with embedded text). Must contain
      the string "void-workers".
    - `scanned.pdf`: `convert -density 200 eng_text.png scanned.pdf` (a PDF
      that is a single page of an image; `pdftotext` yields nothing, so the
      Tesseract fallback must pick it up).
    - `eng_text.png`: a PNG with a clear English sentence on a white
      background. Must contain the string "blackflame".
    

    Generate them now:

    cd /project/src/void-v2/workers/tests/fixtures
    # eng_text.png: render text with ImageMagick (apt: imagemagick)
    apt install -y imagemagick ghostscript enscript
    convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 36 \
            -fill black -annotate +50+100 "blackflame palette" eng_text.png
    # born_digital.pdf: simple TXT → PS (enscript) → PDF (ps2pdf, from ghostscript)
    echo "void-workers proof-of-life document" > /tmp/b.txt
    enscript -p /tmp/b.ps /tmp/b.txt 2>/dev/null || echo "enscript missing — using ImageMagick"
    if [ -f /tmp/b.ps ]; then
      ps2pdf /tmp/b.ps born_digital.pdf
    else
      convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 24 \
              -fill black -annotate +50+100 "void-workers proof-of-life" born_digital.pdf
      # WARNING: this fallback produces a single-image PDF, so pdftotext returns
      # nothing and the fixture invariant below fails. Regenerate with a text-PDF tool.
    fi
    # scanned.pdf: PNG → PDF
    convert -density 200 eng_text.png scanned.pdf
    ls -la
    

    If enscript is not available, install or use another text-PDF tool. The key fixture invariant is that pdftotext born_digital.pdf - returns non-empty text.

  • Step 5: Commit fixtures.

    cd /project/src/void-v2
    git add workers/tests/fixtures
    git commit -m "test(workers): pdf/image test fixtures"
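Step 4's ImageMagick branch cannot satisfy the born_digital.pdf invariant. On Ubuntu, ImageMagick's default security policy may also refuse to write PDFs at all. If enscript is unavailable, a text-bearing fixture can be written from plain Python instead. This is a minimal sketch: `minimal_text_pdf` is a hypothetical helper, it accepts only ASCII text without parentheses or backslashes, and it draws the text with Helvetica, one of the standard PDF fonts that need not be embedded.

```python
def minimal_text_pdf(text: str) -> bytes:
    """Build a one-page PDF that shows `text` with a real text operator (Tj).

    Because the page contains text rather than an image, pdftotext should
    extract `text` from it. No string escaping is done in this sketch.
    """
    if not text.isascii() or any(c in text for c in "()\\"):
        raise ValueError("sketch only supports ASCII text without (, ) or \\")
    stream = f"BT /F1 24 Tf 72 720 Td ({text}) Tj ET".encode("ascii")
    objects = [
        b"<< /Type /Catalog /Pages 2 0 R >>",
        b"<< /Type /Pages /Kids [3 0 R] /Count 1 >>",
        b"<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] "
        b"/Contents 4 0 R /Resources << /Font << /F1 5 0 R >> >> >>",
        b"<< /Length %d >>\nstream\n" % len(stream) + stream + b"\nendstream",
        b"<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>",
    ]
    out = bytearray(b"%PDF-1.4\n")
    offsets = []
    for num, body in enumerate(objects, start=1):
        offsets.append(len(out))  # byte offset of "N 0 obj", needed by the xref table
        out += b"%d 0 obj\n" % num + body + b"\nendobj\n"
    xref_at = len(out)
    out += b"xref\n0 %d\n" % (len(objects) + 1)
    out += b"0000000000 65535 f \n"  # every xref entry is exactly 20 bytes
    for off in offsets:
        out += b"%010d 00000 n \n" % off
    out += b"trailer\n<< /Size %d /Root 1 0 R >>\n" % (len(objects) + 1)
    out += b"startxref\n%d\n%%%%EOF\n" % xref_at
    return bytes(out)
```

For example, run from workers/tests/fixtures (with `from pathlib import Path`): `Path("born_digital.pdf").write_bytes(minimal_text_pdf("void-workers proof-of-life document"))`. After that, `pdftotext born_digital.pdf -` should print the sentence.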
    

Task B2: extract.pdf worker — pdftotext path

Files:

  • Create: workers/void_workers/handlers/pdf.py

  • Modify: workers/void_workers/handlers/__init__.py

  • Create: workers/void_workers/repo.py

  • Create: workers/tests/test_pdf.py

  • Step 1: Write the failing test:

    # workers/tests/test_pdf.py
    import json
    import os
    import psycopg
    from pathlib import Path
    from void_workers.handlers.pdf import handle as handle_pdf
    from void_workers.config import DATABASE_URL
    
    FIXTURES = Path(__file__).parent / "fixtures"
    
    def _seed_space_and_ref(conn, blob_path, kind="pdf"):
        """Insert a space and a ref pointing at blob_path (tables come from migrate.js)."""
        sp = conn.execute(
            "INSERT INTO spaces(slug, name) VALUES('plan4-tests', 'P4') "
            "ON CONFLICT (slug) DO UPDATE SET name=EXCLUDED.name RETURNING id"
        ).fetchone()[0]
        ref = conn.execute(
            "INSERT INTO refs(space_id, kind, source_url, title, blob_path) "
            "VALUES(%s, %s, NULL, 'fixture', %s) RETURNING id",
            (sp, kind, str(blob_path))
        ).fetchone()[0]
        return sp, ref
    
    def test_pdf_born_digital_uses_pdftotext(conn):
        """A PDF with embedded text gets pdftotext-extracted body."""
        conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
        # Apply Void migrations against the conn
        _run_node_migrations()
        blob = FIXTURES / "born_digital.pdf"
        sp, ref = _seed_space_and_ref(conn, blob)
        out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
        assert out["ref_id"] == str(ref)
        assert out["chars"] > 0
        row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
        assert "void-workers" in (row[0] or "").lower()
    
    def _run_node_migrations():
        """Shell to Node's migrate runner — same DB."""
        import subprocess
        subprocess.run(
            ["node", "lib/db/migrate.js", "up"],
            cwd="/project/src/void-v2",
            check=True
        )
    
  • Step 2: Run red:

    cd /project/src/void-v2/workers
    . .venv/bin/activate
    DATABASE_URL=... pytest tests/test_pdf.py -v
    

    Expected: ImportError on pdf module.

  • Step 3: Implement workers/void_workers/repo.py:

    import json
    import psycopg
    from psycopg.rows import dict_row
    from .config import DATABASE_URL
    
    def _conn():
        return psycopg.connect(DATABASE_URL, autocommit=True, row_factory=dict_row)
    
    def update_ref(ref_id, *, body_text=None, metadata_patch=None):
        """UPDATE refs ... + emit audit_log row with actor_kind='worker'."""
        with _conn() as conn:
            before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone()
            sets, args = [], []
            if body_text is not None:
                sets.append("body_text=%s"); args.append(body_text)
            if metadata_patch is not None:
                sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
                args.append(json.dumps(metadata_patch))
            if not sets: return before
            sets.append("updated_at=now()")
            args.append(ref_id)
            after = conn.execute(
                f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *",
                args
            ).fetchone()
            # audit log — match Node's audit.js diff shape (simple)
            diff = {}
            if body_text is not None and (before or {}).get("body_text") != body_text:
                diff["body_text"] = {"before": "(redacted)", "after_len": len(body_text)}
            conn.execute("""
              INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
              VALUES('worker', NULL, 'ref', %s, 'update', %s)
            """, (ref_id, json.dumps({"kind": "update", "changes": diff})))
            return after
    
  • Step 4: Implement workers/void_workers/handlers/pdf.py:

    import subprocess
    from .. import repo
    
    NAME = "extract.pdf"
    
    def _pdftotext(blob_path):
        return subprocess.check_output(
            ["pdftotext", "-layout", blob_path, "-"],
            timeout=120
        ).decode("utf-8", errors="replace")
    
    def handle(job_data: dict) -> dict:
        ref_id = job_data["ref_id"]
        blob_path = job_data["blob_path"]
        text = _pdftotext(blob_path).strip()
        method = "pdftotext"
        # Fallback (Tesseract OCR) lands in Task B3.
        body_text = text[:200_000]
        repo.update_ref(ref_id, body_text=body_text,
                        metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
        return {"ref_id": ref_id, "chars": len(body_text), "method": method}
    
  • Step 5: Add pdf.handle to the registry — edit workers/void_workers/handlers/__init__.py:

    from . import echo, pdf
    
    REGISTRY = {
        echo.NAME: echo.handle,
        pdf.NAME: pdf.handle,
    }
    
  • Step 6: Run green:

    DATABASE_URL=... pytest tests/test_pdf.py::test_pdf_born_digital_uses_pdftotext -v
    

    Expected: 1 passed.

  • Step 7: Commit.

    cd /project/src/void-v2
    git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/pdf.py workers/void_workers/repo.py workers/tests/test_pdf.py
    git commit -m "feat(workers): extract.pdf via pdftotext"
    

Task B3: PDF Tesseract fallback for scanned pages

Files:

  • Modify: workers/void_workers/handlers/pdf.py

  • Modify: workers/tests/test_pdf.py

  • Step 1: Add the failing test (append to test_pdf.py):

    def test_pdf_scanned_falls_back_to_tesseract(conn):
        conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
        _run_node_migrations()
        blob = FIXTURES / "scanned.pdf"
        sp, ref = _seed_space_and_ref(conn, blob)
        out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
        assert out["method"] == "tesseract"
        row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
        assert "blackflame" in (row[0] or "").lower()
    
  • Step 2: Run red.

  • Step 3: Implement the fallback in workers/void_workers/handlers/pdf.py:

    import subprocess
    import tempfile
    from pathlib import Path
    from PIL import Image
    import pytesseract
    from .. import repo
    
    NAME = "extract.pdf"
    FALLBACK_THRESHOLD = 200   # chars
    
    def _pdftotext(blob_path):
        return subprocess.check_output(
            ["pdftotext", "-layout", blob_path, "-"], timeout=120
        ).decode("utf-8", errors="replace")
    
    def _ocr_pdf(blob_path):
        """Rasterize each page with pdftoppm, OCR each with Tesseract."""
        with tempfile.TemporaryDirectory() as tmp:
            subprocess.run(
                ["pdftoppm", "-r", "200", "-png", blob_path, f"{tmp}/p"],
                check=True, timeout=300
            )
            pages = sorted(Path(tmp).glob("p-*.png"))
            parts = []
            for p in pages:
                with Image.open(p) as img:   # close each page bitmap once it has been OCR'd
                    parts.append(pytesseract.image_to_string(img, lang="eng"))
            return "\n".join(parts)
    
    def handle(job_data: dict) -> dict:
        ref_id = job_data["ref_id"]
        blob_path = job_data["blob_path"]
        method = "pdftotext"
        text = _pdftotext(blob_path).strip()
        if len(text) < FALLBACK_THRESHOLD:
            method = "tesseract"
            text = _ocr_pdf(blob_path).strip()
        body_text = text[:200_000]
        repo.update_ref(ref_id, body_text=body_text,
                        metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
        return {"ref_id": ref_id, "chars": len(body_text), "method": method}
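    `sorted(Path(tmp).glob("p-*.png"))` relies on pdftoppm normally zero-padding page numbers to a common width (p-01.png through p-12.png for a 12-page scan). With unpadded names, string order would put p-10 before p-2. If you'd rather not depend on that naming detail, sort by the parsed page number. A sketch (`page_sort_key` is a hypothetical helper):

```python
import re
from pathlib import Path

# Matches the trailing "-<page>.png" that pdftoppm appends to the output prefix.
_PAGE_RE = re.compile(r"-(\d+)\.png$")


def page_sort_key(p: Path) -> int:
    """Return the page number encoded in a name like 'p-7.png' or 'p-07.png'."""
    m = _PAGE_RE.search(p.name)
    if m is None:
        raise ValueError(f"unexpected pdftoppm output name: {p.name}")
    return int(m.group(1))


names = [Path("p-10.png"), Path("p-2.png"), Path("p-1.png")]
print([p.name for p in sorted(names)])                     # ['p-1.png', 'p-10.png', 'p-2.png']
print([p.name for p in sorted(names, key=page_sort_key)])  # ['p-1.png', 'p-2.png', 'p-10.png']
```

    In _ocr_pdf this would become `pages = sorted(Path(tmp).glob("p-*.png"), key=page_sort_key)`.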
    
  • Step 4: Run green:

    DATABASE_URL=... pytest tests/test_pdf.py -v
    

    Expected: 2 passed.

  • Step 5: Commit.

    git add workers/void_workers/handlers/pdf.py workers/tests/test_pdf.py
    git commit -m "feat(workers): pdf Tesseract fallback for scanned pages"
    

Task B4: extract.image worker

Files:

  • Create: workers/void_workers/handlers/image.py

  • Modify: workers/void_workers/handlers/__init__.py

  • Create: workers/tests/test_image.py

  • Step 1: Write the failing test workers/tests/test_image.py:

    from pathlib import Path
    from void_workers.handlers.image import handle as handle_image
    
    FIXTURES = Path(__file__).parent / "fixtures"
    
    def test_image_ocr(conn):
        conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
        import subprocess
        subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
        sp = conn.execute(
            "INSERT INTO spaces(slug, name) VALUES('plan4-img', 'I') RETURNING id"
        ).fetchone()[0]
        blob = FIXTURES / "eng_text.png"
        ref = conn.execute(
            "INSERT INTO refs(space_id, kind, blob_path) VALUES(%s, 'image', %s) RETURNING id",
            (sp, str(blob))
        ).fetchone()[0]
        out = handle_image({"ref_id": str(ref), "blob_path": str(blob)})
        assert out["chars"] > 0
        row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
        assert "blackflame" in (row[0] or "").lower()
    
  • Step 2: Run red.

  • Step 3: Implement workers/void_workers/handlers/image.py:

    from PIL import Image
    import pytesseract
    from .. import repo
    
    NAME = "extract.image"
    
    def handle(job_data: dict) -> dict:
        ref_id = job_data["ref_id"]
        blob_path = job_data["blob_path"]
        with Image.open(blob_path) as img:   # release the file handle after OCR
            text = pytesseract.image_to_string(img, lang="eng").strip()
        body_text = text[:200_000]
        repo.update_ref(ref_id, body_text=body_text,
                        metadata_patch={"extract": {"method": "tesseract", "chars": len(body_text)}})
        return {"ref_id": ref_id, "chars": len(body_text)}
    
  • Step 4: Register — edit workers/void_workers/handlers/__init__.py:

    from . import echo, pdf, image
    
    REGISTRY = {
        echo.NAME: echo.handle,
        pdf.NAME: pdf.handle,
        image.NAME: image.handle,
    }
    
  • Step 5: Run green.

  • Step 6: Commit.

    git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/image.py workers/tests/test_image.py
    git commit -m "feat(workers): extract.image via Tesseract"
    

Task B5: Node side — blob worker enqueues extract jobs

Files:

  • Modify: lib/jobs/workers/blob.js

  • Modify: tests/jobs/workers/blob.test.js

  • Step 1: Extend the existing blob.test.js with a new assertion. Append:

    it('enqueues extract.pdf after creating a pdf ref', async () => {
      const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null });
      const upTmp = path.join(tmpRoot, 'doc.tmp');
      await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...'));
      const id = await queue.enqueue('ingest.blob', {
        space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf'
      });
      await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
      const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
      const followUp = (await jobsRepo.list({ name: 'extract.pdf' })).find(j => j.data?.blob_path);
      expect(followUp).toBeTruthy();
    });
    
    it('enqueues extract.image after creating an image ref', async () => {
      const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null });
      const upTmp = path.join(tmpRoot, 'pic.tmp');
      await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47]));
      const id = await queue.enqueue('ingest.blob', {
        space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png'
      });
      await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
      const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
      const followUp = (await jobsRepo.list({ name: 'extract.image' })).find(j => j.data?.blob_path);
      expect(followUp).toBeTruthy();
    });
    
  • Step 2: Run red:

    cd /project/src/void-v2
    npx vitest run tests/jobs/workers/blob.test.js
    

    Expected: 2 new tests fail (no extract.* enqueue yet).

  • Step 3: Modify lib/jobs/workers/blob.js to enqueue the follow-up job:

    // existing imports
    import * as queue from '../queue.js';
    
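    // ... inside handler, after `const row = await refs.create(...)`:
    // `kind` is the blob kind and `path` the final on-disk blob path; if blob.js also
    // imports node:path as `path`, pass whichever local holds the blob path instead.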
    if (kind === 'pdf') {
      await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path });
    } else if (kind === 'image') {
      await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path });
    }
    return { ref_id: row.id, sha };
    

    (The snippet ends with the handler's existing return { ref_id: row.id, sha }; statement. Insert the if/else block immediately before it.)

  • Step 4: Run green:

    npx vitest run tests/jobs/workers/blob.test.js
    

    Expected: 4 passed.

  • Step 5: Run the full suite — must still be green.

    npx vitest run
    
  • Step 6: Commit.

    git add lib/jobs/workers/blob.js tests/jobs/workers/blob.test.js
    git commit -m "feat(jobs): blob worker fans out to extract.pdf / extract.image"
    

Task B6: Phase B close — memory + full suite

  • Step 1: npx vitest run — full Node suite green.
  • Step 2: cd workers && DATABASE_URL=... pytest -v — full Python suite green.
  • Step 3: Update memory: "Plan 4 Phase B complete: extract.pdf + extract.image workers; Node blob worker fans out."

Phase C — ingest.video + CT 311 resize + GPU passthrough

Task C1: Snapshot + CT 311 resize + GPU passthrough

The user pre-authorized this work on 2026-06-01.

  • Step 1: Snapshot CT 310 + CT 311.

    SNAP=plan4_pre_resize_$(date +%Y%m%d_%H%M)
    ssh root@192.168.1.124 "pct snapshot 310 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
    ssh root@192.168.1.124 "pct snapshot 311 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
    wait
    ssh root@192.168.1.124 "pct listsnapshot 310 | tail -3; echo ---; pct listsnapshot 311 | tail -3"
    
  • Step 2: Resize CT 311.

    ssh root@192.168.1.124 "pct set 311 -memory 8192 -cores 6"
    ssh root@192.168.1.124 "pct config 311 | grep -E 'cores|memory'"
    
  • Step 3: Add GPU device-node passthrough lines (copy from CT 102).

    ssh root@192.168.1.124 "
      pct set 311 -dev0 /dev/nvidia0,gid=44
      pct set 311 -dev1 /dev/nvidiactl,gid=44
      pct set 311 -dev2 /dev/nvidia-uvm,gid=44
      pct set 311 -dev3 /dev/nvidia-uvm-tools,gid=44
      pct set 311 -dev4 /dev/nvidia-caps/nvidia-cap1,gid=44
      pct set 311 -dev5 /dev/nvidia-caps/nvidia-cap2,gid=44
      pct config 311 | grep dev
    "
    
  • Step 4: Restart CT 311.

    ssh root@192.168.1.124 "pct reboot 311"
    sleep 10
    ssh root@192.168.1.13 "ls -la /dev/nvidia* 2>&1 | head; nvidia-smi --query-gpu=name --format=csv,noheader 2>&1 || echo 'nvidia-smi missing (no driver in CT) — expected'"
    curl -s http://192.168.1.13:3000/health
    

    Expected: device nodes visible, health returns alpha-3.
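    If you'd rather have a pass/fail check than eyeball the ls output, the same test can be scripted in Python on CT 311. A minimal sketch (`missing_nodes` is a hypothetical helper; EXPECTED mirrors the dev0 to dev5 entries from Step 3):

```python
import os
import stat

# Device nodes passed through as dev0..dev5 in Step 3.
EXPECTED = [
    "/dev/nvidia0",
    "/dev/nvidiactl",
    "/dev/nvidia-uvm",
    "/dev/nvidia-uvm-tools",
    "/dev/nvidia-caps/nvidia-cap1",
    "/dev/nvidia-caps/nvidia-cap2",
]


def missing_nodes(paths):
    """Return the paths that are absent, unreadable via stat, or not character devices."""
    missing = []
    for p in paths:
        try:
            mode = os.stat(p).st_mode
        except OSError:  # FileNotFoundError, NotADirectoryError, ...
            missing.append(p)
            continue
        if not stat.S_ISCHR(mode):  # a regular file or dir is not a passthrough node
            missing.append(p)
    return missing
```

    After the reboot, `missing_nodes(EXPECTED)` should return an empty list; anything it lists points at the corresponding devN line in `pct config 311`.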

  • Step 5: Nothing to commit yet, since there is no code change. Record this infra step in the next phase's commit message.

Task C2: Install ffmpeg + faster-whisper deps on CT 311

  • Step 1: Install system deps:

    ssh root@192.168.1.13 "
      apt update
      apt install -y python3.12 python3.12-venv python3-pip \
                     ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils \
                     nvidia-utils-535 nvidia-cuda-toolkit 2>&1 | tail -5
      python3.12 --version
      ffmpeg -version | head -1
      tesseract --version 2>&1 | head -1
    "
    

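    Note: nvidia-cuda-toolkit provides the CUDA runtime (including cuBLAS) that ctranslate2, the faster-whisper backend, loads. It does not include cuDNN, which faster-whisper's GPU path also needs. If you'd rather not pull the full toolkit (~1 GB), install only the cuBLAS and cuDNN runtime libraries (faster-whisper's README also documents pip wheels such as nvidia-cublas-cu12 and nvidia-cudnn-cu12). Match versions to ctranslate2: recent releases target CUDA 12 with cuDNN 9, so a cuDNN 8 package like libcudnn8 needs an older, pinned ctranslate2. Likewise, nvidia-utils-535 must match the host's NVIDIA kernel driver version exactly; check nvidia-smi on the Proxmox host and adjust the package suffix if it differs.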

  • Step 2: Create the voidworkers user, dirs, and .env on CT 311. The remote script is passed as a quoted heredoc so the $(grep ^DATABASE_URL= ...) substitution runs on CT 311 (reading /opt/void-server/.env there), not on your workstation:

    ssh root@192.168.1.13 'bash -s' <<'REMOTE'
    set -e
    id voidworkers >/dev/null 2>&1 || useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
    mkdir -p /opt/void-workers /var/lib/void/whisper-models
    chown voidworkers: /opt/void-workers
    chown -R voidworkers: /var/lib/void/whisper-models
    usermod -aG void voidworkers
    chmod -R g+rX /var/lib/void/blobs   # ensure voidworkers (in void group) can read blobs
    
    cat > /opt/void-workers/.env <<EOF
    DATABASE_URL=$(grep ^DATABASE_URL= /opt/void-server/.env | cut -d= -f2-)
    BLOB_ROOT=/var/lib/void/blobs
    WHISPER_MODEL=small.en
    WHISPER_CACHE=/var/lib/void/whisper-models
    EOF
    chmod 600 /opt/void-workers/.env
    chown voidworkers: /opt/void-workers/.env
    REMOTE
    

  • Step 3: Copy the systemd unit:

    scp /project/src/void-v2/deploy/void-workers.service root@192.168.1.13:/etc/systemd/system/void-workers.service
    ssh root@192.168.1.13 "systemctl daemon-reload && systemctl enable void-workers"
    

    (Don't start it yet: the venv isn't installed until push-workers.sh runs in Step 4.)

  • Step 4: Initial push.

    cd /project/src/void-v2
    ./deploy/push-workers.sh
    ssh root@192.168.1.13 "systemctl status void-workers --no-pager -n 5"
    

    Expected: active (running), waiting for jobs.

  • Step 5: Sanity check: insert an echo job straight into pgboss.job and confirm the Python worker claims and completes it.

    # /api/capture doesn't enqueue 'echo', so insert the job with psql:
    DBPASS=$(ssh root@192.168.1.124 'cat /root/void2-db-pass.txt' | tr -d '\r\n' | sed 's/^DB_PASS=//')
    PGPASSWORD="$DBPASS" psql -h 192.168.1.215 -U void -d void -c "INSERT INTO pgboss.job(name, data) VALUES('echo', '{\"ping\": 99}'::jsonb) RETURNING id"
    sleep 3
    PGPASSWORD="$DBPASS" psql -h 192.168.1.215 -U void -d void -c "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
    

    Expected: completed | {"pong": 99}.

  • Step 6: No commit. This task is operational on CT 311 and changes nothing in the repo; record the milestone in memory instead.

Task C3: Whisper model loader (model.py)

Files:

  • Create: workers/void_workers/model.py

  • Create: workers/tests/test_model.py

  • Step 1: Failing test (uses mock):

    # workers/tests/test_model.py
    from unittest.mock import patch, MagicMock
    from void_workers import model
    
    def test_model_returns_singleton(monkeypatch):
        m = MagicMock()
        monkeypatch.setattr(model, "_whisper_model", None)
        with patch("void_workers.model.cuda_available", return_value=False):
            with patch("faster_whisper.WhisperModel", return_value=m):
                a = model.whisper_model()
                b = model.whisper_model()
                assert a is b
    
    def test_transcribe_returns_joined_segments(monkeypatch):
        seg1 = MagicMock(text=" Hello world ")
        seg2 = MagicMock(text=" second line")
        fake_model = MagicMock()
        fake_model.transcribe.return_value = ([seg1, seg2], MagicMock())
        monkeypatch.setattr(model, "_whisper_model", fake_model)
        out = model.whisper_transcribe("/tmp/whatever.opus")
        assert "Hello world" in out
        assert "second line" in out
    
  • Step 2: Run red.

  • Step 3: Implement workers/void_workers/model.py:

    import os
    import threading
    from .log import log
    
    _whisper_model = None
    _load_lock = threading.Lock()   # handlers run as threads; load the model at most once
    
    def cuda_available():
        try:
            import ctranslate2
            return ctranslate2.get_cuda_device_count() > 0
        except Exception as e:
            log.info("ctranslate2_cuda_probe_failed", err=str(e))
            return False
    
    def whisper_model():
        global _whisper_model
        if _whisper_model is None:
            with _load_lock:
                if _whisper_model is None:   # re-check: another thread may have loaded it meanwhile
                    from faster_whisper import WhisperModel
                    name = os.environ.get("WHISPER_MODEL", "small.en")
                    cache = os.environ.get("WHISPER_CACHE", "/var/lib/void/whisper-models")
                    device = "cuda" if cuda_available() else "cpu"
                    # float16 needs a GPU; int8 is the CPU-friendly quantization
                    compute_type = "float16" if device == "cuda" else "int8"
                    log.info("whisper_loading", model=name, device=device, compute_type=compute_type, cache=cache)
                    _whisper_model = WhisperModel(name, device=device, compute_type=compute_type, download_root=cache)
        return _whisper_model
    
    def whisper_transcribe(audio_path):
        segments, _info = whisper_model().transcribe(audio_path, vad_filter=True)
        return "\n".join(s.text.strip() for s in segments).strip()
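    The device and compute-type choice in whisper_model() can be pulled out as a pure function so the Z3 CPU fallback is unit-testable without ctranslate2 or a GPU. A sketch (`pick_device` and `DeviceChoice` are hypothetical names; whisper_model() would call `pick_device(ctranslate2.get_cuda_device_count())`, treating a failed probe as 0 devices):

```python
from typing import NamedTuple


class DeviceChoice(NamedTuple):
    device: str        # value for WhisperModel(device=...)
    compute_type: str  # value for WhisperModel(compute_type=...)


def pick_device(cuda_device_count: int) -> DeviceChoice:
    """Mirror whisper_model(): float16 on any visible CUDA device, int8 on CPU.

    On the HA replica on Z3 (no GPU) the count is 0, so the model loads on
    CPU instead of failing.
    """
    if cuda_device_count > 0:
        return DeviceChoice("cuda", "float16")
    return DeviceChoice("cpu", "int8")


print(pick_device(1))  # DeviceChoice(device='cuda', compute_type='float16')
print(pick_device(0))  # DeviceChoice(device='cpu', compute_type='int8')
```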
    
  • Step 4: Run green:

    pytest tests/test_model.py -v
    

    Expected: 2 passed.

  • Step 5: Commit.

    git add workers/void_workers/model.py workers/tests/test_model.py
    git commit -m "feat(workers): whisper loader with CUDA detect + CPU fallback"
    

Task C4: ingest.video worker (yt-dlp + transcribe + create ref)

Files:

  • Create: workers/void_workers/handlers/video.py

  • Modify: workers/void_workers/handlers/__init__.py

  • Create: workers/tests/test_video.py

  • Modify: workers/void_workers/repo.py — add create_ref()

  • Step 1: Failing test (mocked yt-dlp + mocked Whisper):

    # workers/tests/test_video.py
    from unittest.mock import patch, MagicMock
    from void_workers.handlers.video import handle as handle_video
    
    def test_video_creates_ref_with_transcript_and_metadata(conn, monkeypatch):
        conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
        import subprocess
        subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
        sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('plan4-vid', 'V') RETURNING id").fetchone()[0]
    
        info = {
            "title": "Sample video",
            "description": "a description",
            "duration": 90,
            "uploader": "Channel",
            "thumbnail": "https://i.ytimg.com/t.jpg"
        }
        with patch("void_workers.handlers.video._yt_dlp_info", return_value=info), \
             patch("void_workers.handlers.video._yt_dlp_audio", return_value="/tmp/fake.opus"), \
             patch("void_workers.handlers.video.whisper_transcribe", return_value="hello world transcript"), \
             patch("os.unlink"):
            out = handle_video({"space_id": str(sp), "url": "https://youtu.be/abc"})
    
        assert "ref_id" in out
        row = conn.execute("SELECT title, body_text, source_kind FROM refs WHERE id=%s", (out["ref_id"],)).fetchone()
        assert row[0] == "Sample video"
        assert "hello world" in row[1]
        assert row[2] == "youtube"
    
  • Step 2: Run red.

  • Step 3: Implement workers/void_workers/handlers/video.py:

    import hashlib
    import os
    import subprocess
    import tempfile
    import json
    from .. import repo
    from ..model import whisper_transcribe
    
    NAME = "ingest.video"
    
    def _yt_dlp_info(url):
        """Returns dict of metadata, or None if yt-dlp could not extract."""
        try:
            out = subprocess.check_output(
                ["yt-dlp", "-J", "--no-warnings", "--no-playlist", url],
                timeout=60
            )
            return json.loads(out)
        except subprocess.CalledProcessError:
            return None   # yt-dlp can't handle this URL: handle() skips it instead of retrying
    
    def _yt_dlp_audio(url, tmp_dir):
        """Downloads bestaudio into tmp_dir as .opus and returns the file path."""
        out_template = os.path.join(tmp_dir, "audio.%(ext)s")
        subprocess.run(
            ["yt-dlp", "-x", "--audio-format", "opus", "-o", out_template,
             "--no-warnings", "--no-playlist", url],
            check=True, timeout=600
        )
        for f in os.listdir(tmp_dir):
            if f.startswith("audio."):
                return os.path.join(tmp_dir, f)
        raise RuntimeError("yt-dlp produced no audio file")
    
    def _idem(space_id, url):
        return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest()
    
    def _kind(url):
        return "youtube" if ("youtube.com" in url or "youtu.be" in url) else "video"
    
    def handle(job_data: dict) -> dict:
        space_id = job_data["space_id"]
        url = job_data["url"]
        idem = _idem(space_id, url)
    
        info = _yt_dlp_info(url)
        if info is None:
            return {"skipped": "yt-dlp"}
    
        # The temp dir, audio file included, is removed on exit even if Whisper raises.
        with tempfile.TemporaryDirectory(prefix="void-yt-") as tmp_dir:
            audio_path = _yt_dlp_audio(url, tmp_dir)
            transcript = whisper_transcribe(audio_path)
    
        ref_id = repo.create_ref({
            "space_id": space_id,
            "kind": "video",
            "source_url": url,
            "title": info.get("title") or url,
            "summary": (info.get("description") or "")[:5000],
            "body_text": transcript[:200_000],
            "source_kind": _kind(url),
            "external_id": idem,
            "metadata": {
                "duration_s": info.get("duration"),
                "uploader": info.get("uploader"),
                "thumbnail": info.get("thumbnail"),
                "extract": {"method": "whisper", "chars": len(transcript)},
            }
        })
        return {"ref_id": ref_id, "chars": len(transcript)}
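    `_kind()` matches substrings, so a URL such as `https://notyoutube.com/x` is also labelled youtube. A hostname-based variant in the spirit of the `VIDEO_HOST_RE` check used on the Node side in Task C5 could look like this (a sketch; `source_kind_for` is a hypothetical name):

```python
from urllib.parse import urlparse

_YOUTUBE_HOSTS = ("youtube.com", "youtu.be")


def source_kind_for(url: str) -> str:
    """'youtube' for youtube.com / youtu.be and their subdomains, else 'video'."""
    host = (urlparse(url).hostname or "").lower()
    for base in _YOUTUBE_HOSTS:
        if host == base or host.endswith("." + base):
            return "youtube"
    return "video"


print("youtube.com" in "https://notyoutube.com/x")            # True: the substring pitfall
print(source_kind_for("https://www.youtube.com/watch?v=abc"))  # youtube
print(source_kind_for("https://youtu.be/abc"))                 # youtube
print(source_kind_for("https://notyoutube.com/x"))             # video
print(source_kind_for("https://vimeo.com/123"))                # video
```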
    
  • Step 4: Extend workers/void_workers/repo.py with create_ref. Dict and list values (e.g. metadata) must be JSON-encoded before they are bound to jsonb columns; json is already imported at the top of repo.py:

    def _maybe_json(v):
        """JSON-encode dict/list values bound for jsonb columns; pass anything else through."""
        return json.dumps(v) if isinstance(v, (dict, list)) else v
    
    def create_ref(input_):
        """INSERT INTO refs(...) RETURNING id; emit audit_log in the same transaction."""
        # Fixed column allowlist: input_ keys never reach the f-string SQL unchecked.
        fields = ["space_id","kind","source_url","title","summary","body_text",
                  "blob_path","metadata","source_kind","external_id","captured_at"]
        cols, vals = [], []
        for f in fields:
            if f in input_:
                cols.append(f); vals.append(_maybe_json(input_[f]))
        placeholders = ",".join("%s" for _ in cols)
        sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
        with _conn() as conn, conn.transaction():
            ref_id = conn.execute(sql, vals).fetchone()["id"]
            conn.execute("""
              INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
              VALUES('worker', NULL, 'ref', %s, 'create', %s)
            """, (ref_id, json.dumps({"kind": "create"})))
            return str(ref_id)
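    To see exactly what create_ref sends to Postgres without a database, the column and parameter assembly can be pulled into a pure function. A hedged sketch (`build_insert` is a hypothetical refactor, not part of the plan):

```python
import json

REF_FIELDS = ["space_id", "kind", "source_url", "title", "summary", "body_text",
              "blob_path", "metadata", "source_kind", "external_id", "captured_at"]


def maybe_json(v):
    """dict/list -> JSON text for a jsonb column; everything else passes through."""
    return json.dumps(v) if isinstance(v, (dict, list)) else v


def build_insert(input_: dict):
    """Return (sql, params) assembled the same way create_ref does.

    Columns come only from the allowlist, in allowlist order; unknown keys
    in input_ are ignored rather than interpolated into the SQL.
    """
    cols = [f for f in REF_FIELDS if f in input_]
    params = [maybe_json(input_[f]) for f in cols]
    placeholders = ",".join("%s" for _ in cols)
    sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
    return sql, params


sql, params = build_insert({"space_id": "s1", "kind": "video",
                            "metadata": {"duration_s": 90}, "bogus": 1})
print(sql)     # INSERT INTO refs(space_id,kind,metadata) VALUES(%s,%s,%s) RETURNING id
print(params)  # ['s1', 'video', '{"duration_s": 90}']
```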
    
  • Step 5: Register — edit workers/void_workers/handlers/__init__.py:

    from . import echo, pdf, image, video
    REGISTRY = {
        echo.NAME: echo.handle,
        pdf.NAME: pdf.handle,
        image.NAME: image.handle,
        video.NAME: video.handle,
    }
    
  • Step 6: Run green:

    pytest tests/test_video.py -v
    

    Expected: 1 passed.

  • Step 7: Commit.

    git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/video.py workers/void_workers/repo.py workers/tests/test_video.py
    git commit -m "feat(workers): ingest.video via yt-dlp + Whisper"
    

Task C5: Node — capture.js routes YouTube to ingest.video

Files:

  • Modify: lib/api/routes/capture.js

  • Modify: tests/api/capture.test.js

  • Step 1: Failing test — append to tests/api/capture.test.js:

    it('POST /api/capture with YouTube URL enqueues ingest.video, not ingest.url', async () => {
      const res = await request(app).post('/api/capture').set(ownerHeaders)
        .send({ space_id: sp.id, url: 'https://youtu.be/abc' });
      expect(res.status).toBe(202);
      expect(res.body.job_id).toBeTruthy();
      const { default: jobsRepo } = await import('../../lib/db/repos/jobs.js');
      const rows = await jobsRepo.list({ name: 'ingest.video' });
      expect(rows.find(r => r.id === res.body.job_id)).toBeTruthy();
      const urlRows = await jobsRepo.list({ name: 'ingest.url' });
      expect(urlRows.find(r => r.id === res.body.job_id)).toBeFalsy();
    });
    
  • Step 2: Run red.

  • Step 3: Modify lib/api/routes/capture.js — replace the POST / handler's enqueue logic:

    // helper near the top
    const VIDEO_HOST_RE = /(^|\.)(youtube\.com|youtu\.be|vimeo\.com)$/i;
    
    function isVideoUrl(url) {
      try { return VIDEO_HOST_RE.test(new URL(url).hostname); }
      catch { return false; }
    }
    
    // inside router.post('/'), after computing idem + the existing-ref check:
    const isVideo = isVideoUrl(url);
    const job_name = isVideo ? 'ingest.video' : 'ingest.url';
    const job_id = await queue.enqueue(job_name, { space_id, url });
    res.status(202).json({ job_id, idempotency_key: idem });
    
  • Step 4: Run green:

    npx vitest run tests/api/capture.test.js
    

    Expected: existing + new tests pass.

  • Step 5: Commit.

    git add lib/api/routes/capture.js tests/api/capture.test.js
    git commit -m "feat(api): capture routes YouTube/Vimeo URLs to ingest.video"
    

Task C6: Phase C close — snapshot + memory

  • Step 1: Run full Node + Python test suites green.

    cd /project/src/void-v2 && npx vitest run
    cd workers && DATABASE_URL=... pytest -v
    
  • Step 2: Snapshot CT 310 + 311 — plan4_phase_c_<timestamp>.

  • Step 3: Update memory: "Plan 4 Phase C complete: Whisper + yt-dlp live; CT 311 resized 6c/8G + GPU passthrough; void-workers.service running on CT 311."


Phase D — Source-doc sync + version bump + completion doc

Task D1: Python safe_fetch port

Files:

  • Create: workers/void_workers/safe_fetch.py

  • Create: workers/tests/test_safe_fetch.py

  • Step 1: Failing test:

    # workers/tests/test_safe_fetch.py
    import pytest
    from void_workers.safe_fetch import safe_fetch, SafeFetchError
    
    def test_rejects_file_scheme():
        with pytest.raises(SafeFetchError):
            safe_fetch("file:///etc/passwd")
    
    def test_rejects_loopback():
        with pytest.raises(SafeFetchError):
            safe_fetch("http://127.0.0.1/x")
    
    def test_rejects_rfc1918():
        with pytest.raises(SafeFetchError):
            safe_fetch("http://192.168.1.1/x")
    
    def test_rejects_metadata_endpoint():
        with pytest.raises(SafeFetchError):
            safe_fetch("http://169.254.169.254/latest/")
    
  • Step 2: Run red.

  • Step 3: Implement workers/void_workers/safe_fetch.py:

    import socket
    import ipaddress
    import urllib.error
    import urllib.request
    import os
    from urllib.parse import urljoin, urlparse
    
    BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [
        "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8",
        "172.16.0.0/12", "192.168.0.0/16",
        "169.254.0.0/16", "100.64.0.0/10",
    ]]
    BLOCK_V6_NETS = [ipaddress.ip_network("fc00::/7"),    # ULA
                     ipaddress.ip_network("fe80::/10")]   # link-local
    
    class SafeFetchError(Exception):
        pass
    
    class _NoRedirect(urllib.request.HTTPRedirectHandler):
        """Don't auto-follow: surface 3xx as HTTPError so every hop is re-validated below."""
        def redirect_request(self, req, fp, code, msg, headers, newurl):
            return None
    
    def _is_blocked(addr):
        if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
            return False
        try:
            ip = ipaddress.ip_address(addr)
        except ValueError:
            return True
        if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
            ip = ip.ipv4_mapped   # judge ::ffff:a.b.c.d exactly like a.b.c.d
        if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
            return True
        if isinstance(ip, ipaddress.IPv4Address):
            return any(ip in n for n in BLOCK_V4_NETS)
        return any(ip in n for n in BLOCK_V6_NETS)
    
    def _resolve(host):
        try:
            infos = socket.getaddrinfo(host, None)
        except socket.gaierror as e:
            raise SafeFetchError(f"no DNS for {host}: {e}")
        addrs = list({i[4][0] for i in infos})
        if not addrs:
            raise SafeFetchError(f"no addresses for {host}")
        for a in addrs:
            if _is_blocked(a):
                raise SafeFetchError(f"{host} resolves to blocked address {a}")
        return addrs[0]
    
    def _check_host(host):
        if not host:
            raise SafeFetchError("URL has no host")
        try:
            ipaddress.ip_address(host)
        except ValueError:
            _resolve(host)   # hostname: every resolved address must pass
            return
        if _is_blocked(host):
            raise SafeFetchError(f"blocked literal IP {host}")
    
    def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
        # urllib resolves the host again when connecting, so a DNS-rebinding
        # window remains between _check_host() and the actual request.
        current = url
        for hop in range(max_hops + 1):
            u = urlparse(current)
            if u.scheme not in ("http", "https"):
                raise SafeFetchError(f"unsupported scheme {u.scheme}")
            _check_host(u.hostname)
            req = urllib.request.Request(current, headers=headers or {})
            opener = urllib.request.build_opener(_NoRedirect)
            try:
                with opener.open(req, timeout=timeout) as r:
                    return r.read()
            except urllib.error.HTTPError as e:
                location = e.headers.get("Location")
                if e.code in (301, 302, 303, 307, 308) and location and hop < max_hops:
                    current = urljoin(current, location)   # Location may be relative
                    continue
                raise
        raise SafeFetchError(f"too many redirects ({max_hops})")
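    An IPv4-mapped IPv6 address such as ::ffff:192.168.1.1 is an IPv6Address to the ipaddress module, so it never reaches an IPv4 network check and is not inside fc00::/7 or fe80::/10; a blocklist has to unwrap it explicitly. A standalone sketch of the check with that unwrapping, handy as a table of cases for test_safe_fetch.py (`is_blocked_addr` is a hypothetical name; the networks copy BLOCK_V4_NETS, and the VOID_INGEST_ALLOW_PRIVATE override is left out):

```python
import ipaddress

BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [
    "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8",
    "172.16.0.0/12", "192.168.0.0/16",
    "169.254.0.0/16", "100.64.0.0/10",
]]
BLOCK_V6_NETS = [ipaddress.ip_network("fc00::/7"), ipaddress.ip_network("fe80::/10")]


def is_blocked_addr(addr: str) -> bool:
    """True if `addr` must not be fetched; unparseable input is treated as blocked."""
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        return True
    # Unwrap ::ffff:a.b.c.d so it is checked against the IPv4 blocklist.
    if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
        return True
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in n for n in BLOCK_V4_NETS)
    return any(ip in n for n in BLOCK_V6_NETS)


for a in ["8.8.8.8", "192.168.1.1", "::ffff:192.168.1.1", "::1", "fd00::1", "2606:4700::1111"]:
    print(a, is_blocked_addr(a))
# 8.8.8.8 False / 192.168.1.1 True / ::ffff:192.168.1.1 True
# ::1 True / fd00::1 True / 2606:4700::1111 False
```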
    
  • Step 4: Run green.

  • Step 5: Commit.

    git add workers/void_workers/safe_fetch.py workers/tests/test_safe_fetch.py
    git commit -m "feat(workers): safe_fetch (Python port)"
    

Task D2: sync.source_doc worker

Files:

  • Create: workers/void_workers/handlers/sourcedoc.py

  • Modify: workers/void_workers/handlers/__init__.py

  • Create: workers/tests/test_sourcedoc.py

  • Modify: workers/void_workers/repo.py — add get_source_doc, update_source_doc

  • Step 1: Failing test:

    # workers/tests/test_sourcedoc.py
    from unittest.mock import patch
    from void_workers.handlers.sourcedoc import handle as handle_sd
    
    def test_sourcedoc_updates_body_text(conn):
        conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
        import subprocess
        subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
        sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('sd', 'SD') RETURNING id").fetchone()[0]
        res = conn.execute(
            "INSERT INTO resources(space_id, slug, name, runtime_type) "
            "VALUES(%s, 'r', 'R', 'lxc') RETURNING id", (sp,)
        ).fetchone()[0]
        sd = conn.execute(
            "INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) "
            "VALUES(%s, 'doc', 'https://example.com/r', 'url') RETURNING id",
            (res,)
        ).fetchone()[0]
        with patch("void_workers.handlers.sourcedoc.safe_fetch",
                   return_value=b"hello world doc body"):
            out = handle_sd({"source_doc_id": str(sd)})
        assert out.get("updated") is True
        row = conn.execute("SELECT body_text FROM source_docs WHERE id=%s", (sd,)).fetchone()
        assert "hello world" in (row[0] or "")
    
  • Step 2: Run red.

  • Step 3: Extend workers/void_workers/repo.py with the two helpers:

    def get_source_doc(source_doc_id):
        with _conn() as conn:
            return conn.execute(
                "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
            ).fetchone()
    
    def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None):
        with _conn() as conn, conn.transaction():   # UPDATE + audit row commit together
            before = conn.execute(
                "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
            ).fetchone()
            sets, args = [], []
            if body_text is not None:
                sets.append("body_text=%s"); args.append(body_text)
            if last_synced is not None:
                sets.append("last_synced=%s"); args.append(last_synced)
            if metadata_patch is not None:
                sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
                args.append(json.dumps(metadata_patch))
            if not sets: return before
            sets.append("updated_at=now()")
            args.append(source_doc_id)
            after = conn.execute(
                f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args
            ).fetchone()
            conn.execute("""
              INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
              VALUES('worker', NULL, 'source_doc', %s, 'update', %s)
            """, (source_doc_id, json.dumps({"kind":"update","changes":{"body_text":"updated" if body_text is not None else None}})))
            return after
    
  • Step 4: Implement workers/void_workers/handlers/sourcedoc.py:

    import hashlib
    from datetime import datetime, timezone
    from .. import repo
    from ..safe_fetch import safe_fetch
    
    NAME = "sync.source_doc"
    MAX_BODY_CHARS = 1_000_000  # cap on stored body_text
    
    def _sha(data: bytes) -> str:
        return hashlib.sha256(data).hexdigest()
    
    def handle(job_data: dict) -> dict:
        sd_id = job_data["source_doc_id"]
        doc = repo.get_source_doc(sd_id)
        if not doc:
            # Row deleted after the job was enqueued: finish successfully, no retry.
            return {"skipped": "gone"}
        # If safe_fetch raises, the job fails and is retried (handler convention).
        body = safe_fetch(doc["upstream_url"], headers={"User-Agent": "void-ingest/2.0"})
        # Hash the raw bytes (before decoding or truncating) so any upstream change is seen.
        new_sha = _sha(body)
        old_sha = (doc.get("metadata") or {}).get("body_sha")
        now = datetime.now(timezone.utc)
        if new_sha == old_sha:
            # Unchanged upstream: only record that we checked.
            repo.update_source_doc(sd_id, last_synced=now)
            return {"unchanged": True}
        text = body.decode("utf-8", errors="replace")[:MAX_BODY_CHARS]
        repo.update_source_doc(
            sd_id,
            body_text=text,
            last_synced=now,
            metadata_patch={"body_sha": new_sha},
        )
        return {"updated": True, "chars": len(text)}
    
  • Step 5: Register — edit workers/void_workers/handlers/__init__.py:

    from . import echo, pdf, image, video, sourcedoc
    REGISTRY = {
        echo.NAME: echo.handle,
        pdf.NAME: pdf.handle,
        image.NAME: image.handle,
        video.NAME: video.handle,
        sourcedoc.NAME: sourcedoc.handle,
    }
    
  • Step 6: Run green.

  • Step 7: Commit.

    git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/sourcedoc.py workers/void_workers/repo.py workers/tests/test_sourcedoc.py
    git commit -m "feat(workers): sync.source_doc with sha256 diff"
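The change-detection rule in the handler can be pulled out into a pure function, so it can be unit-tested without Postgres or the network. The sketch below is minimal and assumes the same 1,000,000-character cap and the metadata body_sha key used above. plan_sync and its return shape are hypothetical; they are not part of the plan's modules.

```python
import hashlib
from typing import Optional

# Same cap the handler applies before storing body_text.
MAX_CHARS = 1_000_000


def plan_sync(body: bytes, metadata: Optional[dict]) -> dict:
    """Decide what a sync.source_doc run should write for a freshly fetched body.

    Hypothetical helper mirroring the handler. If the sha256 of the raw bytes
    equals metadata["body_sha"], only last_synced should move. Otherwise
    body_text and body_sha are refreshed too.
    """
    new_sha = hashlib.sha256(body).hexdigest()
    old_sha = (metadata or {}).get("body_sha")
    if new_sha == old_sha:
        return {"changed": False, "fields": {}}
    # Decode leniently and cap the stored text, as the handler does.
    text = body.decode("utf-8", errors="replace")[:MAX_CHARS]
    return {
        "changed": True,
        "fields": {"body_text": text, "metadata_patch": {"body_sha": new_sha}},
    }


if __name__ == "__main__":
    first = plan_sync(b"hello world doc body", None)
    print(first["changed"])  # True: no stored sha yet
    second = plan_sync(b"hello world doc body", first["fields"]["metadata_patch"])
    print(second["changed"])  # False: same bytes, same sha
```

Hashing the raw bytes before decoding and truncating has a useful consequence. An upstream edit past the character cap still counts as a change, even though the rewritten body_text would be identical.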
    

Task D3: Node-side cron — lib/cron/sync_source_docs.js

Files:

  • Modify: package.json — add node-cron.

  • Create: lib/cron/index.js

  • Create: lib/cron/sync_source_docs.js

  • Modify: server.js — start the cron in the CLI gate.

  • Create: tests/cron/sync_source_docs.test.js

  • Step 1: Add dep:

    cd /project/src/void-v2 && npm i node-cron@^3
    
  • Step 2: Failing test tests/cron/sync_source_docs.test.js:

    import { describe, it, expect, beforeEach, afterEach } from 'vitest';
    import { resetDb } from '../helpers/db.js';
    import { migrateUp } from '../../lib/db/migrate.js';
    import { stopBoss } from '../helpers/boss.js';
    import { pool } from '../../lib/db/pool.js';
    import * as queue from '../../lib/jobs/queue.js';
    import { registerWorkers } from '../../lib/jobs/index.js';
    import { runSync } from '../../lib/cron/sync_source_docs.js';
    import * as jobs from '../../lib/db/repos/jobs.js';
    
    beforeEach(async () => {
      await resetDb(); await migrateUp();
      await queue.start(); await registerWorkers();
    });
    afterEach(async () => { await stopBoss(); });
    
    describe('cron/sync_source_docs.runSync', () => {
      it('enqueues sync.source_doc for each url-synced row', async () => {
        const sp = (await pool.query(
          `INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id`
        )).rows[0].id;
        const res = (await pool.query(
          `INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`,
          [sp]
        )).rows[0].id;
        await pool.query(
          `INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`,
          [res]
        );
        const enqueued = await runSync();
        expect(enqueued).toBe(1);
        const queued = await jobs.list({ name: 'sync.source_doc' });
        expect(queued.length).toBe(1);
      });
    });
    
  • Step 3: Run red.

  • Step 4: Implement lib/cron/sync_source_docs.js:

    import { pool } from '../db/pool.js';
    import * as queue from '../jobs/queue.js';
    
    // Enqueue one sync.source_doc job per URL-backed doc. The Python worker
    // reads the same source_doc_id key. Returns the count for logging and tests.
    export async function runSync() {
        const { rows } = await pool.query(
            `SELECT id FROM source_docs WHERE sync_source = 'url'`
        );
        for (const r of rows) {
            await queue.enqueue('sync.source_doc', { source_doc_id: r.id });
        }
        return rows.length;
    }
    
  • Step 5: Implement lib/cron/index.js:

    import cron from 'node-cron';
    import { runSync } from './sync_source_docs.js';
    import { log } from '../log.js';
    
    export function startCron() {
        // Daily at 03:00 local time
        cron.schedule('0 3 * * *', async () => {
            try {
                const n = await runSync();
                log.info({ enqueued: n }, 'cron sync.source_doc complete');
            } catch (e) {
                log.error({ err: e }, 'cron sync.source_doc failed');
            }
        });
        log.info('cron started');
    }
    
  • Step 6: Wire into server.js — extend the CLI gate:

    // additions
    import { startCron } from './lib/cron/index.js';
    
    // Inside the `if (import.meta.url === ...)` block, after registerWorkers:
    startCron();
    
  • Step 7: Run green:

    npx vitest run tests/cron/sync_source_docs.test.js
    
  • Step 8: Commit.

    git add package.json package-lock.json lib/cron/index.js lib/cron/sync_source_docs.js server.js tests/cron/sync_source_docs.test.js
    git commit -m "feat(cron): daily sync.source_doc enqueue"
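node-cron evaluates '0 3 * * *' (minute 0, hour 3, every day) in the server's local timezone unless a timezone option is passed. So the first sync after a deploy fires at the next 03:00 wall-clock time. The small Python sketch below shows that next-fire rule, which helps when reasoning about when the first enqueue lands. next_daily_run is a hypothetical helper, not a node-cron API.

```python
from datetime import datetime, time, timedelta


def next_daily_run(now: datetime, at: time = time(3, 0)) -> datetime:
    """Next fire time of a daily '0 3 * * *' schedule, strictly after `now`.

    Hypothetical helper. It works on naive or aware datetimes and keeps
    whatever tzinfo `now` carries.
    """
    candidate = datetime.combine(now.date(), at, tzinfo=now.tzinfo)
    if candidate <= now:
        # Today's 03:00 has already passed (or is right now): use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    # A deploy at 04:39 runs its first sync the next morning.
    print(next_daily_run(datetime(2026, 6, 1, 4, 39)))  # 2026-06-02 03:00:00
```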
    

Task D4: Version bump + CHANGELOG + completion doc

Files:

  • Modify: package.json → 2.0.0-alpha.4

  • Modify: server.js → VERSION = '2.0.0-alpha.4'

  • Modify: tests/server.test.js → assert 2.0.0-alpha.4

  • Modify: CHANGELOG.md — append [2.0.0-alpha.4] entry

  • Create: docs/plan-4-complete.md

  • Step 1: Bump version literals (three files). Use Edit.

  • Step 2: Append CHANGELOG entry:

    ## [2.0.0-alpha.4] — 2026-06-NN
    
    ### Added (Plan 4: Python void-workers)
    - `void-workers.service` — Python 3.12 service alongside `void-server`
      on CT 311. psycopg-based pg-boss client matches Node's claim/finish
      semantics via `SELECT ... FOR UPDATE SKIP LOCKED`.
    - **`extract.pdf`** — pdftotext first; per-page Tesseract OCR fallback
      when extraction yields < 200 chars.
    - **`extract.image`** — Tesseract OCR with English data.
    - **`ingest.video`** — yt-dlp metadata + audio extraction + faster-whisper
      transcription. Detects CUDA at startup and falls back to CPU when the
      HA replica runs on Z3 (no GPU). YouTube / youtu.be / Vimeo URLs route
      through `/api/capture` to `ingest.video` instead of `ingest.url`.
    - **`sync.source_doc`** — fetch upstream + sha256 diff + update
      `body_text`. Node cron enqueues daily at 03:00.
    - **Node `blob.js`** fans out to `extract.pdf` / `extract.image` after
      creating PDF / image refs.
    - **CT 311 infrastructure**: resized to 6 cores / 8 GB RAM, GPU device
      nodes passed through (shared with CT 102's Ollama).
    
  • Step 3: Create docs/plan-4-complete.md (use the Plan 3 doc as a template — sections: Date / Version / Tests / Commits / Snapshots / Scope per phase / Security findings handled / Open items / What's left after Plan 4).

  • Step 4: Run full test suites green (Node + Python).

    cd /project/src/void-v2 && npx vitest run
    cd workers && DATABASE_URL=... pytest -v
    
  • Step 5: Commit.

    git add package.json server.js tests/server.test.js CHANGELOG.md docs/plan-4-complete.md
    git commit -m "chore: version 2.0.0-alpha.4 + changelog + plan-4 completion doc"
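Step 1 edits the same version literal in three files, and a literal that has drifted is easy to miss. Below is a minimal Python check. It assumes package.json carries a top-level "version" field and server.js declares VERSION = '2.0.0-alpha.4', as in the Files list above. version_mismatches is hypothetical. The exact assertion form in tests/server.test.js is not specified here, so for that file the check only confirms the expected string appears.

```python
import json
import re

# Matches a literal like: VERSION = '2.0.0-alpha.4'
_SERVER_VERSION = re.compile(r"""VERSION\s*=\s*['"]([^'"]+)['"]""")


def version_mismatches(package_json: str, server_js: str, server_test_js: str,
                       expected: str) -> list[str]:
    """Return the paths of files whose version literal is not `expected`.

    Hypothetical pre-commit check for Task D4, Step 1.
    """
    bad = []
    if json.loads(package_json).get("version") != expected:
        bad.append("package.json")
    match = _SERVER_VERSION.search(server_js)
    if match is None or match.group(1) != expected:
        bad.append("server.js")
    # The test's assertion form is not pinned down, so only require the
    # expected version string to appear somewhere in the file.
    if expected not in server_test_js:
        bad.append("tests/server.test.js")
    return bad
```

Before the Step 5 commit, feed it the three files' contents (for example via pathlib.Path.read_text). An empty list means all three agree.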
    

Task D5: Plan 4 close — snapshot + memory

  • Step 1: Snapshot CT 310 + 311 — plan4_complete_<timestamp>.
  • Step 2: Update memory project_void_v2_execution.md — mark Plan 4 complete; note that alpha-4 is built from source but not yet deployed (deploying is a separate decision that needs the user's OK, per the standing rule).

Spec coverage check

Every Plan 4 spec section maps to at least one task:

| Spec § | Task(s) |
|---|---|
| Phase A harness | A1, A2, A3, A4 |
| Phase B PDF + image | B1, B2, B3, B4, B5 |
| Phase C Whisper + yt-dlp + GPU | C1, C2, C3, C4, C5 |
| Phase D source-doc sync + version | D1, D2, D3, D4 |
| safe_fetch Python port | D1 |
| Concurrency env config | A1 (config.py); applies across handlers |
| systemd MemoryMax=6G | A4 (void-workers.service) |
| CT 311 resize + GPU passthrough | C1 |
| YouTube routing in capture.js | C5 |
| blob.js fans out | B5 |
| Repo audit shim (actor_kind='worker') | B2 (repo.update_ref), C4 (repo.create_ref), D2 (repo.update_source_doc) |

Type & name consistency

  • Worker name strings: echo, extract.pdf, extract.image, ingest.video, sync.source_doc — exact same strings used in NAME constants, registry keys, Node enqueue calls, and tests.
  • Handler signature: def handle(job_data: dict) -> dict everywhere.
  • Job payload shapes:
    • extract.pdf / extract.image: { ref_id, blob_path } (blob.js, the Node-side producer of both, enqueues with these keys; the Python consumers read the same keys).
    • ingest.video: { space_id, url }.
    • sync.source_doc: { source_doc_id } (Node cron and Python worker both use this exact key).
  • repo.update_ref / repo.create_ref / repo.update_source_doc / repo.get_source_doc — same names across all tasks that touch them.
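The payload shapes above can be enforced with a simple key check, at enqueue time or just before dispatch. The sketch below assumes exact key sets, so extra keys are rejected; that is a stricter choice than the plan states. PAYLOAD_KEYS, WORKER_NAMES and payload_problems are hypothetical names. echo is accepted without a shape check because no payload is listed for it.

```python
# Worker names and payload keys, transcribed from the consistency list above.
WORKER_NAMES = frozenset(
    {"echo", "extract.pdf", "extract.image", "ingest.video", "sync.source_doc"}
)
PAYLOAD_KEYS: dict[str, frozenset[str]] = {
    "extract.pdf": frozenset({"ref_id", "blob_path"}),
    "extract.image": frozenset({"ref_id", "blob_path"}),
    "ingest.video": frozenset({"space_id", "url"}),
    "sync.source_doc": frozenset({"source_doc_id"}),
}


def payload_problems(name: str, data: dict) -> list[str]:
    """List contract violations for a job payload (empty list means OK).

    Hypothetical helper. Exact key sets are enforced, so unexpected extra keys
    are reported too. Names without a documented shape (echo) pass unchecked.
    """
    if name not in WORKER_NAMES:
        return [f"unknown job name: {name}"]
    expected = PAYLOAD_KEYS.get(name)
    if expected is None:
        return []
    present = set(data)
    missing = sorted(expected - present)
    extra = sorted(present - expected)
    return [f"missing key: {k}" for k in missing] + [f"unexpected key: {k}" for k in extra]
```

In the Python worker it could run just before dispatch to REGISTRY. How to treat a violation (fail outright or retry) is left open here.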