# Void 2.0 — Plan 4: Python void-workers (heavy ML ingest)

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Stand up the Python `void-workers` service alongside the existing Node `void-server` on CT 311. Workers claim jobs from the same pg-boss queue via native SQL (`SELECT ... FOR UPDATE SKIP LOCKED`) and process four job kinds: `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. Whisper detects CUDA at startup and falls back to CPU when the HA replica is running on Z3, which has no GPU.

**Architecture:** Single CT, two systemd units (`void-server.service` + `void-workers.service`). Shared blob store at `/var/lib/void/blobs`. Shared Postgres at CT 310. Python venv at `/opt/void-workers/venv`. Per-kind handlers run as threads inside one Python process; concurrency is configured via env. Node-side hooks: `capture.js` routes YouTube URLs to `ingest.video`; the blob worker enqueues `extract.pdf`/`extract.image` based on `kind`.

**Tech Stack:** Python 3.12, psycopg 3, Poppler (`pdftotext`, `pdftoppm`) + Tesseract + Pillow, faster-whisper (CTranslate2 backend), yt-dlp, ffmpeg, structlog, pytest, vitest (existing), node-cron (new Node dep for Phase D).

**Spec:** `docs/superpowers/specs/2026-06-01-void-v2-plan4-workers.md`.

---

## Out of scope (Plan 5+)

- MCP server surface, Companion chat (Plan 5).
- Sacred Valley widgets (Plan 6).
- Speaker diarization, per-language Tesseract data, larger Whisper models with proper GPU budgeting.
- "Re-transcribe" UI action.

## Conventions

1. **TDD.** Write the failing test first. Run it red. Implement. Run it green. Commit.
2. **Worker handler shape:** `def handler(job_data: dict) -> dict`. Return the success output; raise to trigger a retry.
3. **Worker name strings:** `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc`. These are exact strings, used in Node enqueue calls AND in the Python handler registrations (each handler module's `NAME`).
4. **All Python repo writes pass `actor={'kind': 'worker', 'id': None}`** when emitting audit_log rows.
5. **Commit per task.**
6. **Snapshot CT 310 + CT 311 before any phase that touches infra** (the standing rule). Phase A is code-only. Phase B installs system deps. Phase C resizes the CT and adds GPU passthrough; take an explicit snapshot before that one.

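The handler contract in convention 2 can be sketched as follows. This is a minimal illustration, not the plan's code: `RetryableError`, the `text` field, and `run_once` are hypothetical names introduced here to show how "return means complete, raise means fail or retry" plays out.

```python
class RetryableError(Exception):
    """Hypothetical marker exception; any raised exception has the same effect."""


def handle(job_data: dict) -> dict:
    # A missing key raises KeyError, which also counts as a failure.
    ref_id = job_data["ref_id"]
    text = job_data.get("text", "")  # hypothetical payload field
    if not text:
        raise RetryableError(f"no text yet for ref {ref_id}")
    # The returned dict becomes the job's success output.
    return {"ref_id": ref_id, "chars": len(text)}


def run_once(handler, job_data: dict):
    """Toy stand-in for the runner: map return/raise onto complete/fail."""
    try:
        return ("complete", handler(job_data))
    except Exception as exc:
        return ("fail", {"name": type(exc).__name__, "message": str(exc)})
```

Under these assumptions, `run_once(handle, {"ref_id": "r1", "text": "abc"})` yields a `"complete"` result, while a payload without `text` yields a `"fail"` result carrying the exception name and message.
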
---

## File structure (what this plan creates)

```
workers/                          # NEW: Python service lives in same repo
  pyproject.toml                  # Python 3.12 + deps (psycopg, structlog, ...)
  README.md                       # short ops notes
  void_workers/
    __init__.py
    config.py                     # env loader
    log.py                        # structlog setup
    boss.py                       # claim / finish / fail / retry against pgboss tables
    repo.py                       # psycopg shim for refs / source_docs updates + audit
    safe_fetch.py                 # Python port of lib/ingest/safe_fetch.js
    runner.py                     # main: load handlers, spawn loops, signal handlers
    handlers/
      __init__.py
      echo.py                     # trivial; proves harness end-to-end
      pdf.py                      # extract.pdf (Phase B)
      image.py                    # extract.image (Phase B)
      video.py                    # ingest.video (Phase C)
      sourcedoc.py                # sync.source_doc (Phase D)
    model.py                      # Whisper loader (CUDA detect + CPU fallback), Phase C
  tests/
    conftest.py                   # shared fixtures (DB url, pgboss schema reset, sample blobs)
    fixtures/
      born_digital.pdf            # tiny PDF with embedded text
      scanned.pdf                 # tiny PDF that is a single page of an image
      eng_text.png                # PNG with known text baked in
    test_boss.py
    test_echo.py
    test_pdf.py
    test_image.py
    test_video.py
    test_sourcedoc.py
    test_safe_fetch.py

deploy/
  void-workers.service            # NEW systemd unit
  push-workers.sh                 # NEW, sibling to push.sh
  README.md                       # extend: workers bootstrap section

lib/                              # Node-side changes (already exists)
  jobs/workers/blob.js            # MODIFY: after creating ref, enqueue extract.pdf/extract.image
  api/routes/capture.js           # MODIFY: detect YouTube/Vimeo, enqueue ingest.video
  cron/sync_source_docs.js        # NEW: node-cron, fires sync.source_doc daily

package.json                      # ADD node-cron, bump version in Phase D
server.js                         # bump VERSION + boot the cron in Phase D
CHANGELOG.md                      # Phase D entry
tests/server.test.js              # version assertion bump
docs/plan-4-complete.md           # Phase D
```

---

## Phase A — Python harness + systemd unit

### Task A1: Workers skeleton + venv install on the dev box

**Files:**
- Create: `workers/pyproject.toml`
- Create: `workers/void_workers/__init__.py`
- Create: `workers/void_workers/config.py`
- Create: `workers/void_workers/log.py`
- Create: `workers/README.md`

- [ ] **Step 1:** Create `workers/pyproject.toml`:

```toml
[project]
name = "void-workers"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "psycopg[binary,pool]>=3.2",
    "structlog>=24.1",
]

[project.optional-dependencies]
pdf = ["pdfplumber>=0.11", "pytesseract>=0.3.13", "pillow>=10.3"]
image = ["pytesseract>=0.3.13", "pillow>=10.3"]
video = ["yt-dlp>=2024.10.0", "faster-whisper>=1.0.3"]
test = ["pytest>=8.0", "pytest-asyncio>=0.23"]
all = ["void-workers[pdf,image,video,test]"]

[tool.setuptools.packages.find]
where = ["."]
include = ["void_workers*"]
```

- [ ] **Step 2:** Create `workers/void_workers/__init__.py`:

```python
__version__ = "0.1.0"
```

- [ ] **Step 3:** Create `workers/void_workers/config.py`:

```python
import os


def env(name, default=None, required=False):
    v = os.environ.get(name, default)
    if required and v is None:
        raise RuntimeError(f"env {name} is required")
    return v


def env_int(name, default):
    return int(os.environ.get(name, default))


DATABASE_URL = env("DATABASE_URL", required=True)
BLOB_ROOT = env("BLOB_ROOT", "/var/lib/void/blobs")
WHISPER_MODEL = env("WHISPER_MODEL", "small.en")
WHISPER_CACHE = env("WHISPER_CACHE", "/var/lib/void/whisper-models")
ALLOW_PRIVATE = env("VOID_INGEST_ALLOW_PRIVATE", "false") == "true"

# Per-queue worker thread counts, keyed by the exact queue name.
CONCURRENCY = {
    "extract.pdf": env_int("VOID_CONCURRENCY_EXTRACT_PDF", 2),
    "extract.image": env_int("VOID_CONCURRENCY_EXTRACT_IMAGE", 2),
    "ingest.video": env_int("VOID_CONCURRENCY_INGEST_VIDEO", 1),
    "sync.source_doc": env_int("VOID_CONCURRENCY_SYNC_SOURCE_DOC", 1),
    "echo": env_int("VOID_CONCURRENCY_ECHO", 1),
}

POLL_INTERVAL_MS = env_int("VOID_POLL_INTERVAL_MS", 1000)
```

- [ ] **Step 4:** Create `workers/void_workers/log.py`:

```python
import logging
import structlog


def init():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    )
    return structlog.get_logger()


log = init()
```

- [ ] **Step 5:** Create `workers/README.md`:

````markdown
# void-workers

Python ML ingest service alongside void-server (Node). Sibling of `lib/`.

## Local dev

```bash
cd workers
python3.12 -m venv .venv
. .venv/bin/activate
pip install -e ".[all]"
export DATABASE_URL="postgres://..."
python -m void_workers.runner
```

## Tests

```bash
pip install -e ".[test,all]"
pytest -v
```

See `../docs/superpowers/plans/2026-06-01-void-v2-plan4-workers.md` for the full plan.
````

- [ ] **Step 6:** Create the venv + install:

```bash
cd /project/src/void-v2/workers
python3.12 -m venv .venv
. .venv/bin/activate
pip install -e ".[all]"
python -c "from void_workers import __version__; print(__version__)"
```

Expected: `0.1.0`.

- [ ] **Step 7: Commit.**

```bash
cd /project/src/void-v2
git add workers/pyproject.toml workers/README.md workers/void_workers/__init__.py workers/void_workers/config.py workers/void_workers/log.py
git commit -m "feat(workers): Python skeleton + config + structlog"
```

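The `CONCURRENCY` table in `config.py` follows a regular naming pattern: the queue name is upper-cased, dots become underscores, and `VOID_CONCURRENCY_` is prefixed. The sketch below spells that pattern out; `concurrency_env_name` and `concurrency_for` are hypothetical helpers for illustration and are not part of the plan's `config.py`.

```python
import os


def concurrency_env_name(queue: str) -> str:
    """Derive the env var name used for a queue's thread count."""
    return "VOID_CONCURRENCY_" + queue.replace(".", "_").upper()


def concurrency_for(queue: str, default: int = 1, environ=None) -> int:
    """Look up the thread count for a queue, falling back to a default."""
    environ = os.environ if environ is None else environ
    return int(environ.get(concurrency_env_name(queue), default))
```

For example, setting `VOID_CONCURRENCY_EXTRACT_PDF=4` in `/opt/void-workers/.env` would raise the `extract.pdf` thread count from its default of 2 to 4, while queues without an override keep their defaults.
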
### Task A2: pgboss claim/finish helpers in `boss.py`

**Files:**
- Create: `workers/void_workers/boss.py`
- Create: `workers/tests/conftest.py`
- Create: `workers/tests/test_boss.py`

- [ ] **Step 1:** Create `workers/tests/conftest.py`:

```python
import os
import pytest
import psycopg

DB_URL = os.environ["DATABASE_URL"]


@pytest.fixture
def conn():
    with psycopg.connect(DB_URL, autocommit=True) as c:
        yield c


@pytest.fixture(autouse=True)
def reset_pgboss(conn):
    """Drop the pgboss schema before each test."""
    conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE")
    # We do NOT recreate it here. In production, pg-boss's first start()
    # makes the schema. Tests that need it call ensure_pgboss(conn).
    yield


def ensure_pgboss(conn):
    """Create the minimal pgboss.job table that boss.py reads and writes."""
    # In tests we cannot rely on Node's queue.start(), so we create the bare
    # minimum tables we touch. Real pgboss tables are richer; we only need
    # the columns boss.py reads and writes.
    conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pgboss.job (
            id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
            name text NOT NULL,
            priority int NOT NULL DEFAULT 0,
            data jsonb,
            state text NOT NULL DEFAULT 'created',
            retry_limit int NOT NULL DEFAULT 5,
            retry_count int NOT NULL DEFAULT 0,
            retry_delay int NOT NULL DEFAULT 10,
            retry_backoff boolean NOT NULL DEFAULT true,
            start_after timestamptz NOT NULL DEFAULT now(),
            started_on timestamptz,
            completed_on timestamptz,
            created_on timestamptz NOT NULL DEFAULT now(),
            output jsonb
        )
    """)


@pytest.fixture
def boss_ready(conn):
    ensure_pgboss(conn)
    yield conn
```

- [ ] **Step 2:** Write the failing test `workers/tests/test_boss.py`:

```python
import json
from void_workers.boss import Boss
from void_workers.config import DATABASE_URL


def test_claim_returns_none_when_no_jobs(boss_ready):
    boss = Boss(dsn=DATABASE_URL)
    assert boss.claim("echo") is None


def test_claim_then_complete_round_trip(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (json.dumps({"ping": 1}),)
    )
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    assert job is not None
    assert job["data"] == {"ping": 1}
    boss.complete(job["id"], {"pong": 1})
    row = boss_ready.execute(
        "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    assert row[0] == "completed"
    assert row[1] == {"pong": 1}


def test_fail_records_output_and_state(boss_ready):
    boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
    row = boss_ready.execute(
        "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s",
        (job["id"],)
    ).fetchone()
    # retry_limit defaults to 5 and retry_count was 0, so it should move to 'retry'
    assert row[0] == "retry"
    assert row[2] == 1


def test_fail_past_retry_limit_marks_failed(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
    )
    boss = Boss(dsn=DATABASE_URL)
    job = boss.claim("echo")
    boss.fail(job["id"], {"name": "Done"})
    row = boss_ready.execute(
        "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
    ).fetchone()
    assert row[0] == "failed"
```

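- [ ] **Step 3:** Run red:

```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
# Prefer DATABASE_URL from ../.env. A `grep | cut || fallback` one-liner never
# reaches the fallback because the pipeline's status comes from cut, so test
# for an empty result explicitly.
DATABASE_URL="$(grep '^DATABASE_URL=' ../.env 2>/dev/null | cut -d= -f2-)"
if [ -z "$DATABASE_URL" ]; then
  DB_PASS="$(ssh root@192.168.1.124 cat /root/void2-db-pass.txt | sed 's/^DB_PASS=//')"
  DATABASE_URL="postgres://void:${DB_PASS}@192.168.1.215:5432/void"
fi
export DATABASE_URL
pytest tests/test_boss.py -v
```

Expected: the run fails with an import error at collection, because `void_workers/boss.py` does not exist yet.
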
- [ ] **Step 4:** Implement `workers/void_workers/boss.py`:

```python
import json
import psycopg
from psycopg.rows import dict_row


class Boss:
    def __init__(self, dsn):
        self.dsn = dsn

    def _conn(self):
        return psycopg.connect(self.dsn, autocommit=False, row_factory=dict_row)

    def claim(self, queue):
        """Atomically claim one job for the given queue. Returns dict or None."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                # SKIP LOCKED lets concurrent workers pass over rows that
                # another transaction is already claiming.
                cur.execute("""
                    SELECT id, data, retry_count, retry_limit
                    FROM pgboss.job
                    WHERE name=%s
                      AND state IN ('created','retry')
                      AND start_after <= now()
                    ORDER BY priority DESC, created_on ASC
                    FOR UPDATE SKIP LOCKED
                    LIMIT 1
                """, (queue,))
                row = cur.fetchone()
                if not row:
                    return None
                cur.execute(
                    "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
                    (row["id"],)
                )
            conn.commit()
            return row

    def complete(self, job_id, output):
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE pgboss.job
                    SET state='completed', completed_on=now(), output=%s
                    WHERE id=%s
                """, (json.dumps(output), job_id))
            conn.commit()

    def fail(self, job_id, output):
        """Mark the job retry (if budget left) or failed (otherwise)."""
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT retry_count, retry_limit, retry_delay, retry_backoff "
                    "FROM pgboss.job WHERE id=%s FOR UPDATE",
                    (job_id,)
                )
                row = cur.fetchone()
                if not row:
                    return
                new_count = row["retry_count"] + 1
                if new_count > row["retry_limit"]:
                    cur.execute(
                        "UPDATE pgboss.job SET state='failed', output=%s, "
                        "completed_on=now() WHERE id=%s",
                        (json.dumps(output), job_id)
                    )
                else:
                    # exponential backoff: delay * 2^(count-1)
                    delay = row["retry_delay"]
                    if row["retry_backoff"]:
                        delay = delay * (2 ** (new_count - 1))
                    cur.execute("""
                        UPDATE pgboss.job
                        SET state='retry',
                            retry_count=%s,
                            start_after=now() + (%s || ' seconds')::interval,
                            output=%s
                        WHERE id=%s
                    """, (new_count, str(delay), json.dumps(output), job_id))
            conn.commit()
```

- [ ] **Step 5:** Run green:

```bash
DATABASE_URL=... pytest tests/test_boss.py -v
```

Expected: 4 passed.

- [ ] **Step 6: Commit.**

```bash
cd /project/src/void-v2
git add workers/void_workers/boss.py workers/tests/conftest.py workers/tests/test_boss.py
git commit -m "feat(workers): pgboss claim/complete/fail via psycopg"
```

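The retry arithmetic in `Boss.fail` can be worked through in isolation. The sketch below reuses the same formula (`delay * 2^(count-1)`) with the defaults from the test table in `conftest.py` (`retry_delay=10`, `retry_limit=5`); `retry_schedule` is a hypothetical helper for illustration, not part of `boss.py`.

```python
def retry_schedule(retry_delay=10, retry_limit=5, retry_backoff=True):
    """Return the delay in seconds applied after each failure that still retries.

    Failure number n (1-based) is retried while n <= retry_limit; the next
    failure after that marks the job 'failed' and schedules nothing.
    """
    schedule = []
    for new_count in range(1, retry_limit + 1):
        delay = retry_delay
        if retry_backoff:
            delay = retry_delay * (2 ** (new_count - 1))
        schedule.append(delay)
    return schedule
```

With those defaults the delays are 10, 20, 40, 80 and 160 seconds, so a job that keeps failing reaches `failed` on its sixth failure, a little over five minutes of accumulated backoff after the first one (plus handler run time and polling latency).
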
### Task A3: Echo handler + runner loop

**Files:**
- Create: `workers/void_workers/handlers/__init__.py`
- Create: `workers/void_workers/handlers/echo.py`
- Create: `workers/void_workers/runner.py`
- Create: `workers/tests/test_echo.py`

- [ ] **Step 1:** Write failing test `workers/tests/test_echo.py`:

```python
import json
import threading
from void_workers.runner import Runner
from void_workers.config import DATABASE_URL


def test_echo_handler_runs(boss_ready):
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (json.dumps({"ping": 42}),)
    )
    runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}})
    # once=True processes at most one job per queue and returns.
    t = threading.Thread(target=runner.run, kwargs={"once": True}, daemon=True)
    t.start()
    t.join(timeout=5)
    row = boss_ready.execute(
        "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
    ).fetchone()
    assert row[0] == "completed"
    assert row[1] == {"pong": 42}
```

- [ ] **Step 2:** Run red:

```bash
DATABASE_URL=... pytest tests/test_echo.py -v
```

Expected: an import error at collection, because `void_workers/runner.py` does not exist yet.

- [ ] **Step 3:** Implement `workers/void_workers/handlers/__init__.py`:

```python
from . import echo

REGISTRY = {
    echo.NAME: echo.handle,
}
```

- [ ] **Step 4:** Implement `workers/void_workers/handlers/echo.py`:

```python
NAME = "echo"


def handle(job_data: dict) -> dict:
    return {"pong": job_data.get("ping", 0)}
```

- [ ] **Step 5:** Implement `workers/void_workers/runner.py`:

```python
import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from .boss import Boss
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
from .handlers import REGISTRY
from .log import log


class Runner:
    def __init__(self, dsn=None, handlers=None):
        self.boss = Boss(dsn or DATABASE_URL)
        self.handlers = handlers or {
            name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
        }
        self._stop = threading.Event()

    def _process_one(self, queue):
        """Claim and run one job. Returns True if a job was processed."""
        job = self.boss.claim(queue)
        if not job:
            return False
        log.info("job_claimed", job_id=str(job["id"]), name=queue)
        handler = REGISTRY[queue]
        try:
            out = handler(job["data"] or {})
            self.boss.complete(job["id"], out or {})
            log.info("job_completed", job_id=str(job["id"]), name=queue)
        except Exception as e:
            log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e))
            self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)})
        return True

    def _loop(self, queue):
        interval = POLL_INTERVAL_MS / 1000.0
        while not self._stop.is_set():
            try:
                did = self._process_one(queue)
            except Exception as e:
                log.error("loop_error", queue=queue, err=str(e))
                did = False
            if not did:
                # Idle: sleep for one poll interval, waking early on shutdown.
                self._stop.wait(interval)

    def run(self, once=False):
        if once:
            for q in self.handlers:
                self._process_one(q)
            return
        executor = ThreadPoolExecutor(max_workers=sum(h["concurrency"] for h in self.handlers.values()))
        for q, cfg in self.handlers.items():
            for _ in range(cfg["concurrency"]):
                executor.submit(self._loop, q)
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self._stop.set())
        while not self._stop.is_set():
            time.sleep(0.5)
        executor.shutdown(wait=True)


if __name__ == "__main__":
    Runner().run()
```

- [ ] **Step 6:** Run green:

```bash
DATABASE_URL=... pytest tests/test_echo.py -v
```

Expected: 1 passed.

- [ ] **Step 7: Commit.**

```bash
cd /project/src/void-v2
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/echo.py workers/void_workers/runner.py workers/tests/test_echo.py
git commit -m "feat(workers): runner loop + echo handler"
```

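The runner's polling loop relies on `threading.Event.wait(timeout)` rather than `time.sleep`, so an idle worker wakes as soon as shutdown is requested instead of finishing its sleep. A self-contained sketch of that pattern follows; `poll_loop` and `demo` are illustrative names, and the in-memory list stands in for the pg-boss queue.

```python
import threading
import time


def poll_loop(process_one, stop: threading.Event, interval: float) -> None:
    """Run process_one until stop is set; idle-wait only when no work was done."""
    while not stop.is_set():
        did_work = process_one()
        if not did_work:
            # Returns early the moment stop.set() is called.
            stop.wait(interval)


def demo():
    pending = ["a", "b", "c"]  # stand-in for queued jobs
    done = []

    def process_one():
        if not pending:
            return False
        done.append(pending.pop(0))
        return True

    stop = threading.Event()
    worker = threading.Thread(target=poll_loop, args=(process_one, stop, 30.0))
    worker.start()
    time.sleep(0.2)  # let the worker drain the list and go idle
    started = time.monotonic()
    stop.set()  # worker is inside a 30 s wait, yet should exit promptly
    worker.join(timeout=5)
    return done, worker.is_alive(), time.monotonic() - started
```

Even with a 30-second poll interval the worker in `demo()` exits almost immediately after `stop.set()`, which is the property that keeps `systemctl restart void-workers` fast while queues are idle. A job that is mid-handler still runs to completion before its thread exits.
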
### Task A4: systemd unit + push-workers.sh

**Files:**
- Create: `deploy/void-workers.service`
- Create: `deploy/push-workers.sh`
- Modify: `deploy/README.md` (append workers bootstrap section)

- [ ] **Step 1:** Create `deploy/void-workers.service`:

```ini
[Unit]
Description=Void 2.0 workers
After=network-online.target void-server.service
Wants=network-online.target

[Service]
Type=simple
User=voidworkers
WorkingDirectory=/opt/void-workers
EnvironmentFile=/opt/void-workers/.env
ExecStart=/opt/void-workers/venv/bin/python -m void_workers.runner
Restart=on-failure
RestartSec=5
StandardOutput=journal
StandardError=journal
MemoryMax=6G

[Install]
WantedBy=multi-user.target
```

- [ ] **Step 2:** Create `deploy/push-workers.sh`:

```bash
#!/usr/bin/env bash
set -euo pipefail

# Push Python void-workers source to CT 311 and restart the service.
# Run from /project/src/void-v2.

TARGET=${TARGET:-root@192.168.1.13}
REMOTE_DIR=${REMOTE_DIR:-/opt/void-workers}

# --delete removes remote files that are missing from the source tree, so the
# remote-only venv/ and .env must be excluded or every push would wipe them.
rsync -avz --delete \
  --exclude .venv \
  --exclude venv \
  --exclude .env \
  --exclude __pycache__ \
  --exclude '*.egg-info' \
  --exclude tests \
  workers/ "$TARGET:$REMOTE_DIR/"

ssh "$TARGET" "
  cd $REMOTE_DIR
  if [ ! -d venv ]; then
    python3.12 -m venv venv
  fi
  . venv/bin/activate
  pip install --quiet --upgrade pip
  pip install --quiet -e '.[all]'
  systemctl restart void-workers
"
echo "Workers deployed."
```

Make it executable:

```bash
chmod +x deploy/push-workers.sh
```

- [ ] **Step 3:** Extend `deploy/README.md` by appending the following section:

````markdown

## Workers (Python void-workers, Plan 4+)

Runs alongside void-server as a second systemd unit.

One-time setup on CT 311:

```bash
apt install -y python3.12 python3.12-venv python3-pip \
  ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils

useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
mkdir -p /opt/void-workers /var/lib/void/whisper-models
chown voidworkers: /opt/void-workers
chown -R voidworkers: /var/lib/void/whisper-models

# voidworkers needs to read the shared blob store
usermod -aG void voidworkers

install -m 644 deploy/void-workers.service /etc/systemd/system/
systemctl daemon-reload
systemctl enable void-workers

# /opt/void-workers/.env: see plan §Deploy delta
```

Deploy after edits:

```bash
cd /project/src/void-v2
./deploy/push-workers.sh
```
````

- [ ] **Step 4:** Run the existing Node test suite. It must still be green, since nothing Node touches has been modified yet.

```bash
cd /project/src/void-v2
npx vitest run
```

- [ ] **Step 5: Commit.**

```bash
git add deploy/void-workers.service deploy/push-workers.sh deploy/README.md
git commit -m "feat(workers): systemd unit + push-workers.sh"
```

### Task A5: Phase A close — memory checkpoint

- [ ] **Step 1:** Update memory `project_void_v2_execution.md`: mark "Plan 4 Phase A complete: harness + echo running locally; void-workers.service installed on CT 311 pending real deps (Phase B+)".
- [ ] **Step 2:** Note the boss.py contract decision (psycopg + native SQL) so future plans don't re-litigate it.

---

## Phase B — `extract.pdf` + `extract.image`

### Task B1: Install system deps on the dev box

These deps support local pytest runs. The same packages will be installed on CT 311 when Phase A's systemd unit lands there.

- [ ] **Step 1:** Verify deps locally (we are running on Ubuntu 24.04 LXC per CLAUDE.md):

```bash
command -v pdftotext && command -v tesseract && command -v ffmpeg || echo "missing deps"
apt list --installed 2>/dev/null | grep -E "poppler|tesseract|ffmpeg" | head -5
```

- [ ] **Step 2:** If any are missing, install:

```bash
apt install -y poppler-utils tesseract-ocr tesseract-ocr-eng ffmpeg
```

- [ ] **Step 3:** Install pdf/image Python extras into the workers venv:

```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
pip install -e '.[pdf,image,test]'
python -c "import pdfplumber, pytesseract; print('ok')"
```

- [ ] **Step 4:** Add a `tests/fixtures/` README explaining how fixtures were generated (so we can regenerate them):

Create `workers/tests/fixtures/README.md`:

```markdown
# Test fixtures

- `born_digital.pdf`: generated by
  `echo "void-workers proof-of-life document" > /tmp/b.txt && enscript -p /tmp/b.ps /tmp/b.txt && ps2pdf /tmp/b.ps born_digital.pdf`
  (or any tool that produces a PDF with embedded text). Must contain
  the string "void-workers".
- `scanned.pdf`: `convert -density 200 eng_text.png scanned.pdf` (PDF
  that is a single page of an image; `pdftotext` will yield nothing,
  Tesseract fallback must pick it up).
- `eng_text.png`: a PNG with a clear English sentence on white
  background. Must contain the string "blackflame".
```

Generate them now:

```bash
cd /project/src/void-v2/workers/tests/fixtures
# Tools: ImageMagick (convert), Ghostscript (ps2pdf), enscript (text to PostScript)
apt install -y imagemagick ghostscript enscript
# eng_text.png: render text with ImageMagick
convert -size 800x200 xc:white -font DejaVu-Sans -pointsize 36 \
  -fill black -annotate +50+100 "blackflame palette" eng_text.png
# born_digital.pdf: plain text -> PostScript -> PDF, which keeps embedded text
echo "void-workers proof-of-life document" > /tmp/b.txt
enscript -p /tmp/b.ps /tmp/b.txt
ps2pdf /tmp/b.ps born_digital.pdf
# scanned.pdf: PNG -> PDF (image only, no text layer)
convert -density 200 eng_text.png scanned.pdf
# Check the fixture invariant: the embedded text must be extractable
pdftotext born_digital.pdf - | grep -q "void-workers" && echo "born_digital.pdf ok"
ls -la
```

If `enscript` is not available, use another text-to-PDF tool. The key fixture invariant is that `pdftotext born_digital.pdf -` returns non-empty text. Do not build `born_digital.pdf` with ImageMagick's `convert`: that produces a single-image PDF, for which `pdftotext` returns nothing. Note also that Task B3 introduces a 200-character fallback threshold, so a fixture with less embedded text than that will be routed through Tesseract even though it is born-digital; lengthen the text file if the test should keep exercising the `pdftotext` path.

- [ ] **Step 5: Commit fixtures.**

```bash
cd /project/src/void-v2
git add workers/tests/fixtures
git commit -m "test(workers): pdf/image test fixtures"
```

### Task B2: `extract.pdf` worker — pdftotext path

**Files:**
- Create: `workers/void_workers/handlers/pdf.py`
- Modify: `workers/void_workers/handlers/__init__.py`
- Create: `workers/void_workers/repo.py`
- Create: `workers/tests/test_pdf.py`

- [ ] **Step 1:** Write the failing test:

```python
# workers/tests/test_pdf.py
import subprocess
from pathlib import Path
from void_workers.handlers.pdf import handle as handle_pdf

FIXTURES = Path(__file__).parent / "fixtures"


def _run_node_migrations():
    """Shell to Node's migrate runner, which targets the same DB."""
    subprocess.run(
        ["node", "lib/db/migrate.js", "up"],
        cwd="/project/src/void-v2",
        check=True
    )


def _seed_space_and_ref(conn, blob_path, kind="pdf"):
    """Insert a space and a ref into the migrated Void schema."""
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-tests', 'P4') "
        "ON CONFLICT (slug) DO UPDATE SET name=EXCLUDED.name RETURNING id"
    ).fetchone()[0]
    ref = conn.execute(
        "INSERT INTO refs(space_id, kind, source_url, title, blob_path) "
        "VALUES(%s, %s, NULL, 'fixture', %s) RETURNING id",
        (sp, kind, str(blob_path))
    ).fetchone()[0]
    return sp, ref


def test_pdf_born_digital_uses_pdftotext(conn):
    """A PDF with embedded text gets pdftotext-extracted body."""
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    # Apply Void migrations to the freshly reset schema
    _run_node_migrations()
    blob = FIXTURES / "born_digital.pdf"
    sp, ref = _seed_space_and_ref(conn, blob)
    out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["ref_id"] == str(ref)
    assert out["chars"] > 0
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "void-workers" in (row[0] or "").lower()
```

- [ ] **Step 2:** Run red:

```bash
cd /project/src/void-v2/workers
. .venv/bin/activate
DATABASE_URL=... pytest tests/test_pdf.py -v
```

Expected: ImportError on pdf module.

- [ ] **Step 3:** Implement `workers/void_workers/repo.py`:

```python
import json
import psycopg
from psycopg.rows import dict_row
from .config import DATABASE_URL


def _conn():
    return psycopg.connect(DATABASE_URL, autocommit=True, row_factory=dict_row)


def update_ref(ref_id, *, body_text=None, metadata_patch=None):
    """UPDATE refs ... + emit audit_log row with actor_kind='worker'."""
    with _conn() as conn:
        before = conn.execute("SELECT * FROM refs WHERE id=%s", (ref_id,)).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if metadata_patch is not None:
            # jsonb || jsonb merges top-level keys; the patch wins on conflict.
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            return before
        sets.append("updated_at=now()")
        args.append(ref_id)
        after = conn.execute(
            f"UPDATE refs SET {', '.join(sets)} WHERE id=%s RETURNING *",
            args
        ).fetchone()
        # audit log: match Node's audit.js diff shape (simple)
        diff = {}
        if body_text is not None and (before or {}).get("body_text") != body_text:
            diff["body_text"] = {"before": "(redacted)", "after_len": len(body_text)}
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'update', %s)
        """, (ref_id, json.dumps({"kind": "update", "changes": diff})))
        return after
```

- [ ] **Step 4:** Implement `workers/void_workers/handlers/pdf.py`:

```python
import subprocess
from .. import repo

NAME = "extract.pdf"


def _pdftotext(blob_path):
    return subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"],
        timeout=120
    ).decode("utf-8", errors="replace")


def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    text = _pdftotext(blob_path).strip()
    method = "pdftotext"
    # Fallback (Tesseract OCR) lands in Task B3.
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text), "method": method}
```

- [ ] **Step 5:** Add `pdf.handle` to the registry by editing `workers/void_workers/handlers/__init__.py`:

```python
from . import echo, pdf

REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
}
```

- [ ] **Step 6:** Run green:

```bash
DATABASE_URL=... pytest tests/test_pdf.py::test_pdf_born_digital_uses_pdftotext -v
```

Expected: 1 passed.

- [ ] **Step 7: Commit.**

```bash
cd /project/src/void-v2
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/pdf.py workers/void_workers/repo.py workers/tests/test_pdf.py
git commit -m "feat(workers): extract.pdf via pdftotext"
```

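One property of `update_ref` worth keeping in mind: the `metadata || patch` expression merges only top-level keys. PostgreSQL's `jsonb || jsonb` is not recursive, so a patch of `{"extract": {...}}` replaces any existing `extract` object wholesale. The sketch below mimics that behaviour with plain dicts; `jsonb_concat` is an illustrative stand-in, and the sample `source` and `pages` keys are hypothetical.

```python
def jsonb_concat(existing, patch):
    """Mimic `coalesce(existing, '{}') || patch` for two JSON objects.

    Only top-level keys are merged; on a key conflict the patch value wins
    and the old nested object is discarded, not deep-merged.
    """
    return {**(existing or {}), **patch}


# Hypothetical starting metadata for a ref
before = {
    "source": "upload",
    "extract": {"method": "pdftotext", "chars": 120, "pages": 3},
}
patch = {"extract": {"method": "tesseract", "chars": 98}}
after = jsonb_concat(before, patch)
```

Here `after["source"]` survives, but `after["extract"]` is exactly the patch value, so the earlier `pages` entry is gone. If later handlers need to add keys under `extract` without losing siblings, they would have to send the full nested object or use a different SQL expression.
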
### Task B3: PDF Tesseract fallback for scanned pages

**Files:**
- Modify: `workers/void_workers/handlers/pdf.py`
- Modify: `workers/tests/test_pdf.py`

- [ ] **Step 1:** Add the failing test (append to `test_pdf.py`):

```python
def test_pdf_scanned_falls_back_to_tesseract(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    _run_node_migrations()
    blob = FIXTURES / "scanned.pdf"
    sp, ref = _seed_space_and_ref(conn, blob)
    out = handle_pdf({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["method"] == "tesseract"
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "blackflame" in (row[0] or "").lower()
```

- [ ] **Step 2:** Run red.

- [ ] **Step 3:** Implement the fallback in `workers/void_workers/handlers/pdf.py`:
|
|
|
|
```python
import subprocess
import tempfile
from pathlib import Path
from PIL import Image
import pytesseract
from .. import repo

NAME = "extract.pdf"
FALLBACK_THRESHOLD = 200  # chars

def _pdftotext(blob_path):
    return subprocess.check_output(
        ["pdftotext", "-layout", blob_path, "-"], timeout=120
    ).decode("utf-8", errors="replace")

def _ocr_pdf(blob_path):
    """Rasterize each page with pdftoppm, OCR each with Tesseract."""
    with tempfile.TemporaryDirectory() as tmp:
        subprocess.run(
            ["pdftoppm", "-r", "200", "-png", blob_path, f"{tmp}/p"],
            check=True, timeout=300
        )
        pages = sorted(Path(tmp).glob("p-*.png"))
        parts = []
        for p in pages:
            # Close each page image promptly; large scans hold a lot of memory.
            with Image.open(p) as img:
                parts.append(pytesseract.image_to_string(img, lang="eng"))
    return "\n".join(parts)

def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    method = "pdftotext"
    text = _pdftotext(blob_path).strip()
    # A near-empty text layer usually means a scanned PDF: fall back to OCR.
    if len(text) < FALLBACK_THRESHOLD:
        method = "tesseract"
        text = _ocr_pdf(blob_path).strip()
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": method, "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text), "method": method}
```
|
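The fallback decision can also be isolated as a small pure function, which makes the threshold testable without a PDF fixture. This is an illustrative sketch only: `needs_ocr` is a hypothetical helper that is not part of the plan's `pdf.py`, and it counts non-whitespace characters, which is slightly stricter than the `len(text) < FALLBACK_THRESHOLD` check on stripped text.

```python
FALLBACK_THRESHOLD = 200  # chars, same value as in pdf.py


def needs_ocr(text: str, threshold: int = FALLBACK_THRESHOLD) -> bool:
    """Return True when the extracted text layer is too thin to trust.

    Hypothetical helper: whitespace (spaces, newlines, form feeds between
    pages) is ignored so layout padding cannot mask an empty text layer.
    """
    visible = sum(1 for ch in text if not ch.isspace())
    return visible < threshold
```

Counting only visible characters is a design choice for the sketch; the handler's simpler check on stripped text may be enough if born-digital PDFs always carry far more than 200 characters.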
|
|
|
- [ ] **Step 4:** Run green:
|
|
|
|
```bash
|
|
DATABASE_URL=... pytest tests/test_pdf.py -v
|
|
```
|
|
Expected: 2 passed.
|
|
|
|
- [ ] **Step 5: Commit.**
|
|
|
|
```bash
|
|
git add workers/void_workers/handlers/pdf.py workers/tests/test_pdf.py
|
|
git commit -m "feat(workers): pdf Tesseract fallback for scanned pages"
|
|
```
|
|
|
|
### Task B4: `extract.image` worker
|
|
|
|
**Files:**
|
|
- Create: `workers/void_workers/handlers/image.py`
|
|
- Modify: `workers/void_workers/handlers/__init__.py`
|
|
- Create: `workers/tests/test_image.py`
|
|
|
|
- [ ] **Step 1:** Write the failing test `workers/tests/test_image.py`:
|
|
|
|
```python
import os
from pathlib import Path
from void_workers.handlers.image import handle as handle_image

FIXTURES = Path(__file__).parent / "fixtures"

def test_image_ocr(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-img', 'I') RETURNING id"
    ).fetchone()[0]
    blob = FIXTURES / "eng_text.png"
    ref = conn.execute(
        "INSERT INTO refs(space_id, kind, blob_path) VALUES(%s, 'image', %s) RETURNING id",
        (sp, str(blob))
    ).fetchone()[0]
    out = handle_image({"ref_id": str(ref), "blob_path": str(blob)})
    assert out["chars"] > 0
    row = conn.execute("SELECT body_text FROM refs WHERE id=%s", (ref,)).fetchone()
    assert "blackflame" in (row[0] or "").lower()
```
|
|
|
|
- [ ] **Step 2:** Run red.
|
|
|
|
- [ ] **Step 3:** Implement `workers/void_workers/handlers/image.py`:
|
|
|
|
```python
from PIL import Image
import pytesseract
from .. import repo

NAME = "extract.image"

def handle(job_data: dict) -> dict:
    ref_id = job_data["ref_id"]
    blob_path = job_data["blob_path"]
    text = pytesseract.image_to_string(Image.open(blob_path), lang="eng").strip()
    body_text = text[:200_000]
    repo.update_ref(ref_id, body_text=body_text,
                    metadata_patch={"extract": {"method": "tesseract", "chars": len(body_text)}})
    return {"ref_id": ref_id, "chars": len(body_text)}
```
|
|
|
|
- [ ] **Step 4:** Register — edit `workers/void_workers/handlers/__init__.py`:
|
|
|
|
```python
|
|
from . import echo, pdf, image
|
|
|
|
REGISTRY = {
|
|
echo.NAME: echo.handle,
|
|
pdf.NAME: pdf.handle,
|
|
image.NAME: image.handle,
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 5:** Run green.
|
|
|
|
- [ ] **Step 6: Commit.**
|
|
|
|
```bash
|
|
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/image.py workers/tests/test_image.py
|
|
git commit -m "feat(workers): extract.image via Tesseract"
|
|
```
|
|
|
|
### Task B5: Node side — blob worker enqueues extract jobs
|
|
|
|
**Files:**
|
|
- Modify: `lib/jobs/workers/blob.js`
|
|
- Modify: `tests/jobs/workers/blob.test.js`
|
|
|
|
- [ ] **Step 1:** Extend the existing blob.test.js with a new assertion. Append:
|
|
|
|
```js
|
|
it('enqueues extract.pdf after creating a pdf ref', async () => {
|
|
const sp = await spaces.create({ slug: 'bpdf', name: 'BPdf' }, { kind: 'user', id: null });
|
|
const upTmp = path.join(tmpRoot, 'doc.tmp');
|
|
await fs.writeFile(upTmp, Buffer.from('%PDF-1.4 ...'));
|
|
const id = await queue.enqueue('ingest.blob', {
|
|
space_id: sp.id, tmp_path: upTmp, filename: 'a.pdf', content_type: 'application/pdf'
|
|
});
|
|
await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
|
|
const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
|
|
const followUp = (await jobsRepo.list({ name: 'extract.pdf' })).find(j => j.data?.blob_path);
|
|
expect(followUp).toBeTruthy();
|
|
});
|
|
|
|
it('enqueues extract.image after creating an image ref', async () => {
|
|
const sp = await spaces.create({ slug: 'bimg', name: 'BImg' }, { kind: 'user', id: null });
|
|
const upTmp = path.join(tmpRoot, 'pic.tmp');
|
|
await fs.writeFile(upTmp, Buffer.from([0x89, 0x50, 0x4e, 0x47]));
|
|
const id = await queue.enqueue('ingest.blob', {
|
|
space_id: sp.id, tmp_path: upTmp, filename: 'a.png', content_type: 'image/png'
|
|
});
|
|
await waitForJob('ingest.blob', id, { timeoutMs: 10_000 });
|
|
const { default: jobsRepo } = await import('../../../lib/db/repos/jobs.js');
|
|
const followUp = (await jobsRepo.list({ name: 'extract.image' })).find(j => j.data?.blob_path);
|
|
expect(followUp).toBeTruthy();
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2:** Run red:
|
|
|
|
```bash
|
|
cd /project/src/void-v2
|
|
npx vitest run tests/jobs/workers/blob.test.js
|
|
```
|
|
Expected: 2 new tests fail (no extract.* enqueue yet).
|
|
|
|
- [ ] **Step 3:** Modify `lib/jobs/workers/blob.js` to enqueue the follow-up job:
|
|
|
|
```js
|
|
// existing imports
|
|
import * as queue from '../queue.js';
|
|
|
|
// ... inside handler, after `const row = await refs.create(...)`:
|
|
if (kind === 'pdf') {
|
|
await queue.enqueue('extract.pdf', { ref_id: row.id, blob_path: path });
|
|
} else if (kind === 'image') {
|
|
await queue.enqueue('extract.image', { ref_id: row.id, blob_path: path });
|
|
}
|
|
return { ref_id: row.id, sha };
|
|
```
|
|
|
|
(Apply the exact edit at the `return { ref_id: row.id, sha };` site.)
|
|
|
|
- [ ] **Step 4:** Run green:
|
|
|
|
```bash
|
|
npx vitest run tests/jobs/workers/blob.test.js
|
|
```
|
|
Expected: 4 passed.
|
|
|
|
- [ ] **Step 5:** Run the full suite — must still be green.
|
|
|
|
```bash
|
|
npx vitest run
|
|
```
|
|
|
|
- [ ] **Step 6: Commit.**
|
|
|
|
```bash
|
|
git add lib/jobs/workers/blob.js tests/jobs/workers/blob.test.js
|
|
git commit -m "feat(jobs): blob worker fans out to extract.pdf / extract.image"
|
|
```
|
|
|
|
### Task B6: Phase B close — memory + full suite
|
|
|
|
- [ ] **Step 1:** `npx vitest run` — full Node suite green.
|
|
- [ ] **Step 2:** `cd workers && DATABASE_URL=... pytest -v` — full Python suite green.
|
|
- [ ] **Step 3:** Update memory: "Plan 4 Phase B complete: extract.pdf + extract.image workers; Node blob worker fans out."
|
|
|
|
---
|
|
|
|
## Phase C — `ingest.video` + CT 311 resize + GPU passthrough
|
|
|
|
### Task C1: Snapshot + CT 311 resize + GPU passthrough
|
|
|
|
User pre-authorized this 2026-06-01.
|
|
|
|
- [ ] **Step 1:** Snapshot CT 310 + CT 311.
|
|
|
|
```bash
|
|
SNAP=plan4_pre_resize_$(date +%Y%m%d_%H%M)
|
|
ssh root@192.168.1.124 "pct snapshot 310 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
|
|
ssh root@192.168.1.124 "pct snapshot 311 $SNAP --description 'Pre CT 311 resize + GPU passthrough'" &
|
|
wait
|
|
ssh root@192.168.1.124 "pct listsnapshot 310 | tail -3; echo ---; pct listsnapshot 311 | tail -3"
|
|
```
|
|
|
|
- [ ] **Step 2:** Resize CT 311.
|
|
|
|
```bash
|
|
ssh root@192.168.1.124 "pct set 311 -memory 8192 -cores 6"
|
|
ssh root@192.168.1.124 "pct config 311 | grep -E 'cores|memory'"
|
|
```
|
|
|
|
- [ ] **Step 3:** Add GPU device-node passthrough lines (copy from CT 102).
|
|
|
|
```bash
|
|
ssh root@192.168.1.124 "
|
|
pct set 311 -dev0 /dev/nvidia0,gid=44
|
|
pct set 311 -dev1 /dev/nvidiactl,gid=44
|
|
pct set 311 -dev2 /dev/nvidia-uvm,gid=44
|
|
pct set 311 -dev3 /dev/nvidia-uvm-tools,gid=44
|
|
pct set 311 -dev4 /dev/nvidia-caps/nvidia-cap1,gid=44
|
|
pct set 311 -dev5 /dev/nvidia-caps/nvidia-cap2,gid=44
|
|
pct config 311 | grep dev
|
|
"
|
|
```
|
|
|
|
- [ ] **Step 4:** Restart CT 311.
|
|
|
|
```bash
|
|
ssh root@192.168.1.124 "pct reboot 311"
|
|
sleep 10
|
|
ssh root@192.168.1.13 "ls -la /dev/nvidia* 2>&1 | head; nvidia-smi --query-gpu=name --format=csv,noheader 2>&1 || echo 'nvidia-smi missing (no driver in CT) — expected'"
|
|
curl -s http://192.168.1.13:3000/health
|
|
```
|
|
Expected: device nodes visible, health returns alpha-3.
|
|
|
|
- [ ] **Step 5: Commit (config-as-comment).** No code change yet — we record the infra step in the next phase's commit message.
|
|
|
|
### Task C2: Install ffmpeg + faster-whisper deps on CT 311
|
|
|
|
- [ ] **Step 1:** Install system deps:
|
|
|
|
```bash
|
|
ssh root@192.168.1.13 "
|
|
apt update
|
|
apt install -y python3.12 python3.12-venv python3-pip \
|
|
ffmpeg tesseract-ocr tesseract-ocr-eng poppler-utils \
|
|
nvidia-utils-535 nvidia-cuda-toolkit 2>&1 | tail -5
|
|
python3.12 --version
|
|
ffmpeg -version | head -1
|
|
tesseract --version 2>&1 | head -1
|
|
"
|
|
```
|
|
|
|
Note: `nvidia-cuda-toolkit` is for `ctranslate2`'s CUDA runtime; if you'd rather not pull the full toolkit (~1 GB), use the official `libcudnn8 libcublas12` runtime packages instead.
|
|
|
|
- [ ] **Step 2:** Create the voidworkers user + dirs on CT 311:
|
|
|
|
```bash
|
|
ssh root@192.168.1.13 "
|
|
id voidworkers >/dev/null 2>&1 || useradd -r -m -d /opt/void-workers -s /bin/bash voidworkers
|
|
mkdir -p /opt/void-workers /var/lib/void/whisper-models
|
|
chown voidworkers: /opt/void-workers
|
|
chown -R voidworkers: /var/lib/void/whisper-models
|
|
usermod -aG void voidworkers
|
|
chmod -R g+rX /var/lib/void/blobs # ensure voidworkers (in void group) can read blobs
|
|
|
|
install -m 600 -o voidworkers /dev/null /opt/void-workers/.env
|
|
cat > /opt/void-workers/.env <<EOF
|
|
DATABASE_URL=\$(grep ^DATABASE_URL= /opt/void-server/.env | cut -d= -f2-)
|
|
BLOB_ROOT=/var/lib/void/blobs
|
|
WHISPER_MODEL=small.en
|
|
WHISPER_CACHE=/var/lib/void/whisper-models
|
|
EOF
|
|
chmod 600 /opt/void-workers/.env
|
|
chown voidworkers: /opt/void-workers/.env
|
|
"
|
|
```
|
|
|
|
- [ ] **Step 3:** Copy the systemd unit:
|
|
|
|
```bash
|
|
scp /project/src/void-v2/deploy/void-workers.service root@192.168.1.13:/etc/systemd/system/void-workers.service
|
|
ssh root@192.168.1.13 "systemctl daemon-reload && systemctl enable void-workers"
|
|
```
|
|
|
|
(Don't start it yet — venv is not installed; `push-workers.sh` will set it up.)
|
|
|
|
- [ ] **Step 4: Initial push.**
|
|
|
|
```bash
|
|
cd /project/src/void-v2
|
|
./deploy/push-workers.sh
|
|
ssh root@192.168.1.13 "systemctl status void-workers --no-pager -n 5"
|
|
```
|
|
Expected: active (running), waiting for jobs.
|
|
|
|
- [ ] **Step 5:** Sanity — enqueue an echo via Node, observe Python claim it.
|
|
|
|
```bash
|
|
TOKEN=$(ssh root@192.168.1.13 "grep ^OWNER_TOKEN= /opt/void-server/.env | cut -d= -f2-")
|
|
# Manually insert an echo via psql since /api/capture doesn't enqueue 'echo':
|
|
DBPASS=$(ssh root@192.168.1.124 'cat /root/void2-db-pass.txt' | tr -d '\r\n' | sed 's/^DB_PASS=//')
|
|
PGPASSWORD="$DBPASS" psql -h 192.168.1.215 -U void -d void -c "INSERT INTO pgboss.job(name, data) VALUES('echo', '{\"ping\": 99}'::jsonb) RETURNING id"
|
|
sleep 3
|
|
PGPASSWORD="$DBPASS" psql -h 192.168.1.215 -U void -d void -c "SELECT state, output FROM pgboss.job WHERE name='echo' ORDER BY created_on DESC LIMIT 1"
|
|
```
|
|
Expected: `completed | {"pong": 99}`.
|
|
|
|
- [ ] **Step 6:** No commit. This task is operational on CT 311 and changes nothing in the repo; record the milestone in memory.
|
|
|
|
### Task C3: Whisper model loader (`model.py`)
|
|
|
|
**Files:**
|
|
- Create: `workers/void_workers/model.py`
|
|
- Create: `workers/tests/test_model.py`
|
|
|
|
- [ ] **Step 1:** Failing test (uses mock):
|
|
|
|
```python
# workers/tests/test_model.py
from unittest.mock import patch, MagicMock
from void_workers import model

def test_model_returns_singleton(monkeypatch):
    m = MagicMock()
    monkeypatch.setattr(model, "_whisper_model", None)
    with patch("void_workers.model.cuda_available", return_value=False):
        with patch("faster_whisper.WhisperModel", return_value=m):
            a = model.whisper_model()
            b = model.whisper_model()
    assert a is b

def test_transcribe_returns_joined_segments(monkeypatch):
    seg1 = MagicMock(text=" Hello world ")
    seg2 = MagicMock(text=" second line")
    fake_model = MagicMock()
    fake_model.transcribe.return_value = ([seg1, seg2], MagicMock())
    monkeypatch.setattr(model, "_whisper_model", fake_model)
    out = model.whisper_transcribe("/tmp/whatever.opus")
    assert "Hello world" in out
    assert "second line" in out
```
|
|
|
|
- [ ] **Step 2:** Run red.
|
|
|
|
- [ ] **Step 3:** Implement `workers/void_workers/model.py`:
|
|
|
|
```python
import os
from .log import log

_whisper_model = None

def cuda_available():
    try:
        import ctranslate2
        return ctranslate2.get_cuda_device_count() > 0
    except Exception as e:
        log.info("ctranslate2_cuda_probe_failed", err=str(e))
        return False

def whisper_model():
    global _whisper_model
    if _whisper_model is None:
        from faster_whisper import WhisperModel
        name = os.environ.get("WHISPER_MODEL", "small.en")
        cache = os.environ.get("WHISPER_CACHE", "/var/lib/void/whisper-models")
        device = "cuda" if cuda_available() else "cpu"
        compute_type = "float16" if device == "cuda" else "int8"
        log.info("whisper_loading", model=name, device=device, compute_type=compute_type, cache=cache)
        _whisper_model = WhisperModel(name, device=device, compute_type=compute_type, download_root=cache)
    return _whisper_model

def whisper_transcribe(audio_path):
    segments, _info = whisper_model().transcribe(audio_path, vad_filter=True)
    return "\n".join(s.text.strip() for s in segments).strip()
```
|
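Because handlers run as threads inside one process, two threads could in principle pass the `None` check in `whisper_model()` at the same moment and load the model twice. The following is a minimal sketch of a lock-guarded lazy singleton, assuming a generic `loader` callable; `get_model` and `loader` are hypothetical names for illustration and are not part of the plan's `model.py`.

```python
import threading

_lock = threading.Lock()
_instance = None


def get_model(loader):
    """Return the shared instance, calling loader() at most once."""
    global _instance
    if _instance is None:            # fast path: no lock once loaded
        with _lock:
            if _instance is None:    # re-check while holding the lock
                _instance = loader()
    return _instance
```

If `ingest.video` concurrency is configured to 1, the unguarded version is likely sufficient; the lock only matters once several threads can request the model concurrently.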
|
|
|
- [ ] **Step 4:** Run green:
|
|
|
|
```bash
|
|
pytest tests/test_model.py -v
|
|
```
|
|
Expected: 2 passed.
|
|
|
|
- [ ] **Step 5: Commit.**
|
|
|
|
```bash
|
|
git add workers/void_workers/model.py workers/tests/test_model.py
|
|
git commit -m "feat(workers): whisper loader with CUDA detect + CPU fallback"
|
|
```
|
|
|
|
### Task C4: `ingest.video` worker (yt-dlp + transcribe + create ref)
|
|
|
|
**Files:**
|
|
- Create: `workers/void_workers/handlers/video.py`
|
|
- Modify: `workers/void_workers/handlers/__init__.py`
|
|
- Create: `workers/tests/test_video.py`
|
|
- Modify: `workers/void_workers/repo.py` — add `create_ref()`
|
|
|
|
- [ ] **Step 1:** Failing test (mocked yt-dlp + mocked Whisper):
|
|
|
|
```python
# workers/tests/test_video.py
from unittest.mock import patch, MagicMock
from void_workers.handlers.video import handle as handle_video

def test_video_creates_ref_with_transcript_and_metadata(conn, monkeypatch):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('plan4-vid', 'V') RETURNING id").fetchone()[0]

    info = {
        "title": "Sample video",
        "description": "a description",
        "duration": 90,
        "uploader": "Channel",
        "thumbnail": "https://i.ytimg.com/t.jpg"
    }
    with patch("void_workers.handlers.video._yt_dlp_info", return_value=info), \
         patch("void_workers.handlers.video._yt_dlp_audio", return_value="/tmp/fake.opus"), \
         patch("void_workers.handlers.video.whisper_transcribe", return_value="hello world transcript"), \
         patch("os.unlink"):
        out = handle_video({"space_id": str(sp), "url": "https://youtu.be/abc"})

    assert "ref_id" in out
    row = conn.execute("SELECT title, body_text, source_kind FROM refs WHERE id=%s", (out["ref_id"],)).fetchone()
    assert row[0] == "Sample video"
    assert "hello world" in row[1]
    assert row[2] == "youtube"
```
|
|
|
|
- [ ] **Step 2:** Run red.
|
|
|
|
- [ ] **Step 3:** Implement `workers/void_workers/handlers/video.py`:
|
|
|
|
```python
import hashlib
import json
import os
import subprocess
import tempfile
from .. import repo
from ..model import whisper_transcribe

NAME = "ingest.video"

def _yt_dlp_info(url):
    """Returns dict of metadata, or None if yt-dlp could not extract."""
    try:
        out = subprocess.check_output(
            ["yt-dlp", "-J", "--no-warnings", "--no-playlist", url],
            timeout=60
        )
        return json.loads(out)
    except subprocess.CalledProcessError:
        return None

def _yt_dlp_audio(url, tmp_dir):
    """Downloads bestaudio into tmp_dir as .opus and returns the path."""
    out_template = os.path.join(tmp_dir, "audio.%(ext)s")
    subprocess.run(
        ["yt-dlp", "-x", "--audio-format", "opus", "-o", out_template,
         "--no-warnings", "--no-playlist", url],
        check=True, timeout=600
    )
    for f in os.listdir(tmp_dir):
        if f.startswith("audio."):
            return os.path.join(tmp_dir, f)
    raise RuntimeError("yt-dlp produced no audio file")

def _idem(space_id, url):
    return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest()

def _kind(url):
    return "youtube" if ("youtube.com" in url or "youtu.be" in url) else "video"

def handle(job_data: dict) -> dict:
    space_id = job_data["space_id"]
    url = job_data["url"]
    idem = _idem(space_id, url)

    info = _yt_dlp_info(url)
    if info is None:
        return {"skipped": "yt-dlp"}

    # The temp dir and the audio file inside it are removed when the block
    # exits, even if the download or the transcription raises.
    with tempfile.TemporaryDirectory(prefix="void-yt-") as tmp_dir:
        audio_path = _yt_dlp_audio(url, tmp_dir)
        transcript = whisper_transcribe(audio_path)

    ref_id = repo.create_ref({
        "space_id": space_id,
        "kind": "video",
        "source_url": url,
        "title": info.get("title") or url,
        "summary": (info.get("description") or "")[:5000],
        "body_text": transcript[:200_000],
        "source_kind": _kind(url),
        "external_id": idem,
        "metadata": {
            "duration_s": info.get("duration"),
            "uploader": info.get("uploader"),
            "thumbnail": info.get("thumbnail"),
            "extract": {"method": "whisper", "chars": len(transcript)},
        }
    })
    return {"ref_id": ref_id, "chars": len(transcript)}
```
|
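The `_idem` helper joins its two inputs with a NUL byte before hashing, so that different (space, URL) pairs cannot collapse into the same input string. The sketch below restates it under a hypothetical name, `idem_key`, to show that property. Whether a retried job actually deduplicates on `external_id` depends on a constraint or lookup that this section does not show.

```python
import hashlib


def idem_key(space_id: str, url: str) -> str:
    """SHA-256 hex digest over space_id, a NUL separator, and the URL.

    Without the separator, ("ab", "c") and ("a", "bc") would both hash
    the string "abc" and produce the same key.
    """
    return hashlib.sha256((space_id + "\x00" + url).encode()).hexdigest()
```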
|
|
|
- [ ] **Step 4:** Extend `workers/void_workers/repo.py` with `create_ref`. `metadata` arrives as a dict, so dict and list values are JSON-encoded before insertion. Make sure `import json` is at the top of `repo.py` if it is not already there.

```python
def _maybe_json(v):
    # jsonb columns (e.g. metadata) receive dict/list values as JSON text.
    return json.dumps(v) if isinstance(v, (dict, list)) else v

def create_ref(input_):
    """INSERT INTO refs(...) RETURNING id; emit audit_log."""
    fields = ["space_id", "kind", "source_url", "title", "summary", "body_text",
              "blob_path", "metadata", "source_kind", "external_id", "captured_at"]
    cols, vals = [], []
    for f in fields:
        if f in input_:
            cols.append(f)
            vals.append(_maybe_json(input_[f]))
    # Column names come only from the fixed list above, never from caller input.
    placeholders = ",".join(["%s"] * len(cols))
    sql = f"INSERT INTO refs({','.join(cols)}) VALUES({placeholders}) RETURNING id"
    with _conn() as conn:
        ref_id = conn.execute(sql, vals).fetchone()["id"]
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'ref', %s, 'create', %s)
        """, (ref_id, json.dumps({"kind": "create"})))
    return str(ref_id)
```
|
|
|
|
- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`:
|
|
|
|
```python
|
|
from . import echo, pdf, image, video
|
|
REGISTRY = {
|
|
echo.NAME: echo.handle,
|
|
pdf.NAME: pdf.handle,
|
|
image.NAME: image.handle,
|
|
video.NAME: video.handle,
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 6:** Run green:
|
|
|
|
```bash
|
|
pytest tests/test_video.py -v
|
|
```
|
|
Expected: 1 passed.
|
|
|
|
- [ ] **Step 7: Commit.**
|
|
|
|
```bash
|
|
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/video.py workers/void_workers/repo.py workers/tests/test_video.py
|
|
git commit -m "feat(workers): ingest.video via yt-dlp + Whisper"
|
|
```
|
|
|
|
### Task C5: Node — capture.js routes YouTube to `ingest.video`
|
|
|
|
**Files:**
|
|
- Modify: `lib/api/routes/capture.js`
|
|
- Modify: `tests/api/capture.test.js`
|
|
|
|
- [ ] **Step 1:** Failing test — append to `tests/api/capture.test.js`:
|
|
|
|
```js
|
|
it('POST /api/capture with YouTube URL enqueues ingest.video, not ingest.url', async () => {
|
|
const res = await request(app).post('/api/capture').set(ownerHeaders)
|
|
.send({ space_id: sp.id, url: 'https://youtu.be/abc' });
|
|
expect(res.status).toBe(202);
|
|
expect(res.body.job_id).toBeTruthy();
|
|
const { default: jobsRepo } = await import('../../lib/db/repos/jobs.js');
|
|
const rows = await jobsRepo.list({ name: 'ingest.video' });
|
|
expect(rows.find(r => r.id === res.body.job_id)).toBeTruthy();
|
|
const urlRows = await jobsRepo.list({ name: 'ingest.url' });
|
|
expect(urlRows.find(r => r.id === res.body.job_id)).toBeFalsy();
|
|
});
|
|
```
|
|
|
|
- [ ] **Step 2:** Run red.
|
|
|
|
- [ ] **Step 3:** Modify `lib/api/routes/capture.js` — replace the `POST /` handler's enqueue logic:
|
|
|
|
```js
|
|
// helper near the top
|
|
const VIDEO_HOST_RE = /(^|\.)(youtube\.com|youtu\.be|vimeo\.com)$/i;
|
|
|
|
function isVideoUrl(url) {
|
|
try { return VIDEO_HOST_RE.test(new URL(url).hostname); }
|
|
catch { return false; }
|
|
}
|
|
|
|
// inside router.post('/'), after computing idem + the existing-ref check:
|
|
const isVideo = isVideoUrl(url);
|
|
const job_name = isVideo ? 'ingest.video' : 'ingest.url';
|
|
const job_id = await queue.enqueue(job_name, { space_id, url });
|
|
res.status(202).json({ job_id, idempotency_key: idem });
|
|
```
|
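The Node route matches on the parsed hostname, while `_kind` in `video.py` uses a substring test on the whole URL, so a query string such as `?ref=youtube.com` would be classified differently on each side. A minimal Python sketch of the same hostname match follows; `is_video_url` is a hypothetical helper for illustration, not part of the plan's worker code.

```python
import re
from urllib.parse import urlparse

# Same pattern as VIDEO_HOST_RE on the Node side.
VIDEO_HOST_RE = re.compile(r"(^|\.)(youtube\.com|youtu\.be|vimeo\.com)$", re.IGNORECASE)


def is_video_url(url: str) -> bool:
    """True when the URL's hostname is a known video host or a subdomain of one."""
    try:
        host = urlparse(url).hostname
    except ValueError:
        return False
    return bool(host and VIDEO_HOST_RE.search(host))
```

Keeping the match on the hostname avoids false positives from paths and query strings; whether the worker needs this depends on how far `source_kind` is relied on downstream.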
|
|
|
- [ ] **Step 4:** Run green:
|
|
|
|
```bash
|
|
npx vitest run tests/api/capture.test.js
|
|
```
|
|
Expected: existing + new tests pass.
|
|
|
|
- [ ] **Step 5: Commit.**
|
|
|
|
```bash
|
|
git add lib/api/routes/capture.js tests/api/capture.test.js
|
|
git commit -m "feat(api): capture routes YouTube/Vimeo URLs to ingest.video"
|
|
```
|
|
|
|
### Task C6: Phase C close — snapshot + memory
|
|
|
|
- [ ] **Step 1:** Run full Node + Python test suites green.
|
|
|
|
```bash
|
|
cd /project/src/void-v2 && npx vitest run
|
|
cd workers && DATABASE_URL=... pytest -v
|
|
```
|
|
|
|
- [ ] **Step 2:** Snapshot CT 310 + 311 — `plan4_phase_c_<timestamp>`.
|
|
|
|
- [ ] **Step 3:** Update memory: "Plan 4 Phase C complete: Whisper + yt-dlp live; CT 311 resized 6c/8G + GPU passthrough; void-workers.service running on CT 311."
|
|
|
|
---
|
|
|
|
## Phase D — Source-doc sync + version bump + completion doc
|
|
|
|
### Task D1: Python `safe_fetch` port
|
|
|
|
**Files:**
|
|
- Create: `workers/void_workers/safe_fetch.py`
|
|
- Create: `workers/tests/test_safe_fetch.py`
|
|
|
|
- [ ] **Step 1:** Failing test:
|
|
|
|
```python
# workers/tests/test_safe_fetch.py
import pytest
from void_workers.safe_fetch import safe_fetch, SafeFetchError

def test_rejects_file_scheme():
    with pytest.raises(SafeFetchError):
        safe_fetch("file:///etc/passwd")

def test_rejects_loopback():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://127.0.0.1/x")

def test_rejects_rfc1918():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://192.168.1.1/x")

def test_rejects_metadata_endpoint():
    with pytest.raises(SafeFetchError):
        safe_fetch("http://169.254.169.254/latest/")
```
|
|
|
|
- [ ] **Step 2:** Run red.
|
|
|
|
- [ ] **Step 3:** Implement `workers/void_workers/safe_fetch.py`:
|
|
|
|
```python
import ipaddress
import os
import socket
import urllib.error
import urllib.request
from urllib.parse import urljoin, urlparse

BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [
    "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8",
    "172.16.0.0/12", "192.168.0.0/16",
    "169.254.0.0/16", "100.64.0.0/10",
]]
REDIRECT_CODES = (301, 302, 303, 307, 308)

class SafeFetchError(Exception):
    pass

class _NoRedirect(urllib.request.HTTPRedirectHandler):
    """Surface 3xx as HTTPError so safe_fetch validates every hop itself."""
    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None

def _is_blocked(addr):
    if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
        return False
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        return True
    # Judge IPv4-mapped IPv6 (::ffff:a.b.c.d) by the embedded IPv4 address.
    if ip.version == 6 and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
        return True
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in n for n in BLOCK_V4_NETS)
    # IPv6: ULA + link-local
    if ip in ipaddress.ip_network("fc00::/7") or ip in ipaddress.ip_network("fe80::/10"):
        return True
    return False

def _resolve(host):
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror as e:
        raise SafeFetchError(f"no DNS for {host}: {e}")
    addrs = list({i[4][0] for i in infos})
    for a in addrs:
        if _is_blocked(a):
            raise SafeFetchError(f"{host} resolves to blocked address {a}")
    if not addrs:
        raise SafeFetchError(f"no addresses for {host}")
    return addrs[0]

def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
    # Caveat: urllib resolves the hostname again when it connects, so this
    # pre-check alone does not defend against DNS rebinding.
    opener = urllib.request.build_opener(_NoRedirect)
    current = url
    for _hop in range(max_hops + 1):
        u = urlparse(current)
        if u.scheme not in ("http", "https"):
            raise SafeFetchError(f"unsupported scheme {u.scheme}")
        host = u.hostname
        if not host:
            raise SafeFetchError(f"no host in {current}")
        # If host is a literal IP, validate; else resolve + validate
        try:
            ipaddress.ip_address(host)
        except ValueError:
            _resolve(host)
        else:
            if _is_blocked(host):
                raise SafeFetchError(f"blocked literal IP {host}")
        req = urllib.request.Request(current, headers=headers or {})
        try:
            with opener.open(req, timeout=timeout) as r:
                return r.read()
        except urllib.error.HTTPError as e:
            location = e.headers.get("Location")
            if e.code in REDIRECT_CODES and location:
                # Location may be relative; the next iteration re-validates it.
                current = urljoin(current, location)
                continue
            raise
    raise SafeFetchError(f"too many redirects ({max_hops})")
```
|
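Redirect targets need the same scrutiny as the original URL, and a `Location` header may be relative rather than absolute. A minimal sketch of resolving a single hop with the standard library is shown here; `next_hop` is a hypothetical helper for illustration, and it raises `ValueError` where the worker would presumably raise `SafeFetchError`.

```python
from urllib.parse import urljoin, urlparse


def next_hop(current: str, location: str) -> str:
    """Resolve a Location header against the current URL and check its scheme."""
    target = urljoin(current, location)
    scheme = urlparse(target).scheme
    if scheme not in ("http", "https"):
        raise ValueError(f"unsupported redirect scheme {scheme!r}")
    return target
```

The resolved target would still have to go through the host and address checks before it is fetched; this sketch covers only the URL resolution step.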
|
|
|
- [ ] **Step 4:** Run green.
|
|
|
|
- [ ] **Step 5: Commit.**
|
|
|
|
```bash
|
|
git add workers/void_workers/safe_fetch.py workers/tests/test_safe_fetch.py
|
|
git commit -m "feat(workers): safe_fetch (Python port)"
|
|
```
|
|
|
|
### Task D2: `sync.source_doc` worker
|
|
|
|
**Files:**
|
|
- Create: `workers/void_workers/handlers/sourcedoc.py`
|
|
- Modify: `workers/void_workers/handlers/__init__.py`
|
|
- Create: `workers/tests/test_sourcedoc.py`
|
|
- Modify: `workers/void_workers/repo.py` — add `get_source_doc`, `update_source_doc`
|
|
|
|
- [ ] **Step 1:** Failing test:
|
|
|
|
```python
# workers/tests/test_sourcedoc.py
from unittest.mock import patch
from void_workers.handlers.sourcedoc import handle as handle_sd

def test_sourcedoc_updates_body_text(conn):
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public")
    import subprocess
    subprocess.run(["node", "lib/db/migrate.js", "up"], cwd="/project/src/void-v2", check=True)
    sp = conn.execute("INSERT INTO spaces(slug, name) VALUES('sd', 'SD') RETURNING id").fetchone()[0]
    res = conn.execute(
        "INSERT INTO resources(space_id, slug, name, runtime_type) "
        "VALUES(%s, 'r', 'R', 'lxc') RETURNING id", (sp,)
    ).fetchone()[0]
    sd = conn.execute(
        "INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) "
        "VALUES(%s, 'doc', 'https://example.com/r', 'url') RETURNING id",
        (res,)
    ).fetchone()[0]
    with patch("void_workers.handlers.sourcedoc.safe_fetch",
               return_value=b"hello world doc body"):
        out = handle_sd({"source_doc_id": str(sd)})
    assert out.get("updated") is True
    row = conn.execute("SELECT body_text FROM source_docs WHERE id=%s", (sd,)).fetchone()
    assert "hello world" in (row[0] or "")
```
|
|
|
|
- [ ] **Step 2:** Run red.
|
|
|
|
- [ ] **Step 3:** Extend `workers/void_workers/repo.py` with the two helpers:
|
|
|
|
```python
def get_source_doc(source_doc_id):
    with _conn() as conn:
        return conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()

def update_source_doc(source_doc_id, *, body_text=None, last_synced=None, metadata_patch=None):
    with _conn() as conn:
        before = conn.execute(
            "SELECT * FROM source_docs WHERE id=%s", (source_doc_id,)
        ).fetchone()
        sets, args = [], []
        if body_text is not None:
            sets.append("body_text=%s")
            args.append(body_text)
        if last_synced is not None:
            sets.append("last_synced=%s")
            args.append(last_synced)
        if metadata_patch is not None:
            sets.append("metadata = coalesce(metadata, '{}'::jsonb) || %s::jsonb")
            args.append(json.dumps(metadata_patch))
        if not sets:
            return before
        sets.append("updated_at=now()")
        args.append(source_doc_id)
        after = conn.execute(
            f"UPDATE source_docs SET {', '.join(sets)} WHERE id=%s RETURNING *", args
        ).fetchone()
        diff = {"kind": "update",
                "changes": {"body_text": "updated" if body_text is not None else None}}
        conn.execute("""
            INSERT INTO audit_log(actor_kind, actor_id, entity_type, entity_id, action, diff)
            VALUES('worker', NULL, 'source_doc', %s, 'update', %s)
        """, (source_doc_id, json.dumps(diff)))
        return after
```
|
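The `metadata_patch` update relies on the jsonb `||` operator, which merges objects at the top level only: a key present in the patch replaces the stored value wholesale, including any nested object. The sketch below mimics that behaviour in plain Python to make the consequence visible; `jsonb_concat` is a hypothetical stand-in for illustration, not a function the plan defines.

```python
from typing import Optional


def jsonb_concat(base: Optional[dict], patch: dict) -> dict:
    """Mimic jsonb `||` for two objects: top-level keys from patch win."""
    return {**(base or {}), **patch}


before = {"body_sha": "old", "extract": {"method": "pdftotext", "chars": 10}}
after = jsonb_concat(before, {"extract": {"method": "tesseract"}})
# "extract" is replaced as a whole, so "chars" is dropped; "body_sha" is kept.
```

Callers that patch a nested key therefore need to send the complete nested object, as the handlers in this plan do with `{"extract": {"method": ..., "chars": ...}}`.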
|
|
|
- [ ] **Step 4:** Implement `workers/void_workers/handlers/sourcedoc.py`:
|
|
|
|
```python
|
|
import hashlib
from datetime import datetime, timezone
from .. import repo
from ..safe_fetch import safe_fetch

NAME = "sync.source_doc"

def _sha(data):
    return hashlib.sha256(data).hexdigest()

def handle(job_data: dict) -> dict:
    sd_id = job_data["source_doc_id"]
    doc = repo.get_source_doc(sd_id)
    if not doc:
        # Row was deleted after the job was enqueued; nothing to sync.
        return {"skipped": "gone"}
    body = safe_fetch(doc["upstream_url"], headers={"User-Agent": "void-ingest/2.0"})
    new_sha = _sha(body)
    old_sha = ((doc.get("metadata") or {}).get("body_sha"))
    now = datetime.now(timezone.utc)
    if new_sha == old_sha:
        # Upstream bytes are identical: only refresh the sync timestamp.
        repo.update_source_doc(sd_id, last_synced=now)
        return {"unchanged": True}
    # Hash covers the full body; stored text is capped at 1,000,000 chars.
    text = body.decode("utf-8", errors="replace")[:1_000_000]
    repo.update_source_doc(
        sd_id,
        body_text=text,
        last_synced=now,
        metadata_patch={"body_sha": new_sha}
    )
    return {"updated": True, "chars": len(text)}
```
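
The change detection in `handle` can be isolated as a pure function for quick experiments. This is a minimal sketch under stated assumptions: `plan_sync` is a hypothetical name, and it returns only the keyword arguments the handler would pass to `repo.update_source_doc` (minus `last_synced`), without touching the network or the database:

```python
import hashlib


def plan_sync(body: bytes, stored_sha, max_chars=1_000_000):
    """Hypothetical pure function mirroring the handler's sha256 diff."""
    new_sha = hashlib.sha256(body).hexdigest()
    if new_sha == stored_sha:
        # Same bytes as the last sync: only the timestamp needs refreshing.
        return {}
    # The hash covers the full body; the text is truncated after decoding.
    text = body.decode("utf-8", errors="replace")[:max_chars]
    return {"body_text": text, "metadata_patch": {"body_sha": new_sha}}


first = plan_sync(b"hello", stored_sha=None)
second = plan_sync(b"hello", stored_sha=first["metadata_patch"]["body_sha"])
print(first["body_text"], second)
```

One consequence of hashing before truncating: if an upstream document longer than the cap changes only past the cap, the hash differs and `body_text` is rewritten with the same truncated text. That is harmless, but it shows up as an update rather than as unchanged.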

- [ ] **Step 5:** Register — edit `workers/void_workers/handlers/__init__.py`:

```python
from . import echo, pdf, image, video, sourcedoc
REGISTRY = {
    echo.NAME: echo.handle,
    pdf.NAME: pdf.handle,
    image.NAME: image.handle,
    video.NAME: video.handle,
    sourcedoc.NAME: sourcedoc.handle,
}
```
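
A dict literal silently keeps the last entry when two modules share a `NAME`. As an optional safeguard, the registry could be built by a small function that fails fast on duplicates. The sketch below is an assumption, not part of the plan: `build_registry` is a hypothetical name, and `SimpleNamespace` objects stand in for the real handler modules:

```python
from types import SimpleNamespace


def build_registry(modules):
    """Hypothetical alternative to the literal dict: reject duplicate names."""
    registry = {}
    for mod in modules:
        if mod.NAME in registry:
            raise ValueError(f"duplicate handler name: {mod.NAME!r}")
        registry[mod.NAME] = mod.handle
    return registry


# Stand-ins for the real handler modules, which expose NAME and handle.
echo = SimpleNamespace(NAME="echo", handle=lambda data: {"echo": data})
sourcedoc = SimpleNamespace(NAME="sync.source_doc", handle=lambda data: {"unchanged": True})

registry = build_registry([echo, sourcedoc])
print(sorted(registry))
print(registry["echo"]({"msg": "hi"}))
```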

- [ ] **Step 6:** Run green.

- [ ] **Step 7: Commit.**

```bash
git add workers/void_workers/handlers/__init__.py workers/void_workers/handlers/sourcedoc.py workers/void_workers/repo.py workers/tests/test_sourcedoc.py
git commit -m "feat(workers): sync.source_doc with sha256 diff"
```

### Task D3: Node-side cron — `lib/cron/sync_source_docs.js`

**Files:**
- Modify: `package.json` — add `node-cron`.
- Create: `lib/cron/index.js`
- Create: `lib/cron/sync_source_docs.js`
- Modify: `server.js` — start the cron in the CLI gate.
- Create: `tests/cron/sync_source_docs.test.js`

- [ ] **Step 1:** Add dep:

```bash
cd /project/src/void-v2 && npm i node-cron@^3
```

- [ ] **Step 2:** Failing test `tests/cron/sync_source_docs.test.js`:

```js
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { resetDb } from '../helpers/db.js';
import { migrateUp } from '../../lib/db/migrate.js';
import { stopBoss } from '../helpers/boss.js';
import { pool } from '../../lib/db/pool.js';
import * as queue from '../../lib/jobs/queue.js';
import { registerWorkers } from '../../lib/jobs/index.js';
import { runSync } from '../../lib/cron/sync_source_docs.js';
import * as jobs from '../../lib/db/repos/jobs.js';

beforeEach(async () => {
  await resetDb(); await migrateUp();
  await queue.start(); await registerWorkers();
});
afterEach(async () => { await stopBoss(); });

describe('cron/sync_source_docs.runSync', () => {
  it('enqueues sync.source_doc for each url-synced row', async () => {
    const sp = (await pool.query(
      `INSERT INTO spaces(slug, name) VALUES('s','S') RETURNING id`
    )).rows[0].id;
    const res = (await pool.query(
      `INSERT INTO resources(space_id, slug, name, runtime_type) VALUES($1,'r','R','lxc') RETURNING id`,
      [sp]
    )).rows[0].id;
    await pool.query(
      `INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) VALUES($1,'doc','https://example.com/r','url')`,
      [res]
    );
    const enqueued = await runSync();
    expect(enqueued).toBe(1);
    const queued = await jobs.list({ name: 'sync.source_doc' });
    expect(queued.length).toBe(1);
  });
});
```

- [ ] **Step 3:** Run red.

- [ ] **Step 4:** Implement `lib/cron/sync_source_docs.js`:

```js
import { pool } from '../db/pool.js';
import * as queue from '../jobs/queue.js';
import { log } from '../log.js';

export async function runSync() {
  const { rows } = await pool.query(
    `SELECT id FROM source_docs WHERE sync_source = 'url'`
  );
  for (const r of rows) {
    await queue.enqueue('sync.source_doc', { source_doc_id: r.id });
  }
  return rows.length;
}
```

- [ ] **Step 5:** Implement `lib/cron/index.js`:

```js
import cron from 'node-cron';
import { runSync } from './sync_source_docs.js';
import { log } from '../log.js';

export function startCron() {
  // Daily at 03:00 local time
  cron.schedule('0 3 * * *', async () => {
    try {
      const n = await runSync();
      log.info({ enqueued: n }, 'cron sync.source_doc complete');
    } catch (e) {
      log.error({ err: e }, 'cron sync.source_doc failed');
    }
  });
  log.info('cron started');
}
```
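
The expression `0 3 * * *` reads as minute 0, hour 3, every day of month, every month, every day of week. To reason about when the next enqueue will happen, the schedule can be modelled in a few lines. This is a simplified sketch, not how node-cron computes it: `next_daily_run` is a hypothetical helper and it ignores DST transitions:

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 3, minute: int = 0) -> datetime:
    """Next firing time of a daily `minute hour * * *` schedule after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed (or is firing now): use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2026, 6, 1, 2, 59)))
print(next_daily_run(datetime(2026, 6, 1, 3, 0)))
```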

- [ ] **Step 6:** Wire into `server.js` — extend the CLI gate:

```js
// additions
import { startCron } from './lib/cron/index.js';

// Inside the `if (import.meta.url === ...)` block, after registerWorkers:
startCron();
```

- [ ] **Step 7:** Run green:

```bash
npx vitest run tests/cron/sync_source_docs.test.js
```

- [ ] **Step 8: Commit.**

```bash
git add package.json package-lock.json lib/cron/index.js lib/cron/sync_source_docs.js server.js tests/cron/sync_source_docs.test.js
git commit -m "feat(cron): daily sync.source_doc enqueue"
```

### Task D4: Version bump + CHANGELOG + completion doc

**Files:**
- Modify: `package.json` → `2.0.0-alpha.4`
- Modify: `server.js` → `VERSION = '2.0.0-alpha.4'`
- Modify: `tests/server.test.js` → assert `2.0.0-alpha.4`
- Modify: `CHANGELOG.md` — append `[2.0.0-alpha.4]` entry
- Create: `docs/plan-4-complete.md`

- [ ] **Step 1:** Bump version literals (three files). Use Edit.

- [ ] **Step 2:** Append CHANGELOG entry:

```markdown
## [2.0.0-alpha.4] — 2026-06-NN

### Added (Plan 4: Python void-workers)
- `void-workers.service` — Python 3.12 service alongside `void-server`
  on CT 311. psycopg-based pg-boss client matches Node's claim/finish
  semantics via `SELECT ... FOR UPDATE SKIP LOCKED`.
- **`extract.pdf`** — pdftotext first; per-page Tesseract OCR fallback
  when extraction yields < 200 chars.
- **`extract.image`** — Tesseract OCR with English data.
- **`ingest.video`** — yt-dlp metadata + audio extraction + faster-whisper
  transcription. CUDA at startup, CPU fallback on HA failover to Z3.
  YouTube / youtu.be / Vimeo URLs route through `/api/capture` to
  `ingest.video` instead of `ingest.url`.
- **`sync.source_doc`** — fetch upstream + sha256 diff + update
  `body_text`. Node cron enqueues daily at 03:00.
- **Node `blob.js`** fans out to `extract.pdf` / `extract.image` after
  creating PDF / image refs.
- **CT 311 infrastructure**: resized to 6 cores / 8 GB RAM, GPU device
  nodes passed through (shared with CT 102's Ollama).
```

- [ ] **Step 3:** Create `docs/plan-4-complete.md` (use the Plan 3 doc as a template — sections: Date / Version / Tests / Commits / Snapshots / Scope per phase / Security findings handled / Open items / What's left after Plan 4).

- [ ] **Step 4:** Run full test suites green (Node + Python).

```bash
cd /project/src/void-v2 && npx vitest run
cd workers && DATABASE_URL=... pytest -v
```

- [ ] **Step 5: Commit.**

```bash
git add package.json server.js tests/server.test.js CHANGELOG.md docs/plan-4-complete.md
git commit -m "chore: version 2.0.0-alpha.4 + changelog + plan-4 completion doc"
```

### Task D5: Plan 4 close — snapshot + memory

- [ ] **Step 1:** Snapshot CT 310 + 311 — `plan4_complete_<timestamp>`.
- [ ] **Step 2:** Update memory `project_void_v2_execution.md` — mark Plan 4 complete; note alpha-4 source-built but not yet deployed (deploy is a separate user-OK decision per the standing rule).

---

## Spec coverage check

Every Plan 4 spec section maps to at least one task:

| Spec § | Task(s) |
|---|---|
| Phase A harness | A1, A2, A3, A4 |
| Phase B PDF + image | B1, B2, B3, B4, B5 |
| Phase C Whisper + yt-dlp + GPU | C1, C2, C3, C4, C5 |
| Phase D source-doc sync + version | D1, D2, D3, D4 |
| `safe_fetch` Python port | D1 |
| Concurrency env config | A1 (`config.py`) — applies across handlers |
| systemd `MemoryMax=6G` | A4 (`void-workers.service`) |
| CT 311 resize + GPU passthrough | C1 |
| YouTube routing in capture.js | C5 |
| `blob.js` fans out | B5 |
| Repo audit shim (`actor_kind='worker'`) | B2 (`repo.update_ref`), C4 (`repo.create_ref`), D2 (`repo.update_source_doc`) |

## Type & name consistency

- Worker name strings: `echo`, `extract.pdf`, `extract.image`, `ingest.video`, `sync.source_doc` — the exact same strings are used in `NAME` constants, registry keys, Node enqueue calls, and tests.
- Handler signature: `def handle(job_data: dict) -> dict` everywhere.
- Job payload shapes:
  - `extract.pdf` / `extract.image`: `{ ref_id, blob_path }` (both Node-side producers use these keys; the Python consumer reads the same keys).
  - `ingest.video`: `{ space_id, url }`.
  - `sync.source_doc`: `{ source_doc_id }` (Node cron and Python worker both use this exact key).
- `repo.update_ref` / `repo.create_ref` / `repo.update_source_doc` / `repo.get_source_doc` — same names across all tasks that touch them.
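
The payload shapes listed above can be pinned down as data so that a producer/consumer mismatch fails loudly. The sketch below is an assumption, not part of the plan: `REQUIRED_KEYS` and `missing_keys` are hypothetical names, and the key sets are copied from the list in this section:

```python
# Required keys per job name, copied from the payload shapes in this section.
REQUIRED_KEYS = {
    "extract.pdf": {"ref_id", "blob_path"},
    "extract.image": {"ref_id", "blob_path"},
    "ingest.video": {"space_id", "url"},
    "sync.source_doc": {"source_doc_id"},
}


def missing_keys(name: str, job_data: dict) -> set:
    """Hypothetical check: return the required keys absent from a payload."""
    return REQUIRED_KEYS[name] - set(job_data)


print(missing_keys("sync.source_doc", {"source_doc_id": 7}))
print(missing_keys("ingest.video", {"url": "https://example.com/v"}))
```

A handler could call such a check first and raise on a non-empty result, which under the plan's convention ("raise for retry") would surface the mismatch in the job's failure output.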