From 8fa7f716941a9ba0791472e69aadf3f59b728bf7 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 10:13:27 +1000 Subject: [PATCH] feat(workers): sync.source_doc with sha256 diff Fetches upstream URL via safe_fetch, sha256-diffs against the prior body_sha stored in metadata, updates body_text + last_synced only when content changed. Unchanged syncs just touch last_synced. Co-Authored-By: Claude Opus 4.7 --- workers/tests/test_sourcedoc.py | 70 ++++++++++++++++++++++ workers/void_workers/handlers/__init__.py | 3 +- workers/void_workers/handlers/sourcedoc.py | 32 ++++++++++ 3 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 workers/tests/test_sourcedoc.py create mode 100644 workers/void_workers/handlers/sourcedoc.py diff --git a/workers/tests/test_sourcedoc.py b/workers/tests/test_sourcedoc.py new file mode 100644 index 0000000..ffaeec5 --- /dev/null +++ b/workers/tests/test_sourcedoc.py @@ -0,0 +1,70 @@ +import subprocess +from unittest.mock import patch +from void_workers.handlers.sourcedoc import handle as handle_sd + + +def _reset_void_schema(conn): + conn.execute("DROP SCHEMA IF EXISTS public CASCADE") + conn.execute("CREATE SCHEMA public") + conn.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto") + conn.execute("CREATE EXTENSION IF NOT EXISTS vector") + + +def _run_node_migrations(): + subprocess.run( + ["node", "lib/db/migrate.js", "up"], + cwd="/project/src/void-v2", + check=True + ) + + +def _seed(conn, slug='sd'): + sp = conn.execute( + f"INSERT INTO spaces(slug, name) VALUES('{slug}', 'SD') RETURNING id" + ).fetchone()[0] + res = conn.execute( + "INSERT INTO resources(space_id, slug, name, runtime_type) " + "VALUES(%s, 'r', 'R', 'lxc') RETURNING id", (sp,) + ).fetchone()[0] + sd = conn.execute( + "INSERT INTO source_docs(resource_id, name, upstream_url, sync_source) " + "VALUES(%s, 'doc', 'https://example.com/r', 'url') RETURNING id", + (res,) + ).fetchone()[0] + return sp, res, sd + + +def test_sourcedoc_updates_body_text(conn): + _reset_void_schema(conn) + _run_node_migrations() + sp, res, sd = _seed(conn) + with patch("void_workers.handlers.sourcedoc.safe_fetch", + return_value=b"hello world doc body"): + out = handle_sd({"source_doc_id": str(sd)}) + assert out.get("updated") is True + row = conn.execute("SELECT body_text FROM source_docs WHERE id=%s", (sd,)).fetchone() + assert "hello world" in (row[0] or "") + + +def test_sourcedoc_unchanged_when_sha_matches(conn): + _reset_void_schema(conn) + _run_node_migrations() + sp, res, sd = _seed(conn, slug='sd2') + + # First sync writes body + sha into metadata + with patch("void_workers.handlers.sourcedoc.safe_fetch", + return_value=b"unchanged content"): + handle_sd({"source_doc_id": str(sd)}) + + # Second sync with same body + with patch("void_workers.handlers.sourcedoc.safe_fetch", + return_value=b"unchanged content"): + out = handle_sd({"source_doc_id": str(sd)}) + assert out.get("unchanged") is True + + +def test_sourcedoc_skipped_when_gone(conn): + _reset_void_schema(conn) + _run_node_migrations() + out = handle_sd({"source_doc_id": "00000000-0000-0000-0000-000000000000"}) + assert out.get("skipped") == "gone" diff --git a/workers/void_workers/handlers/__init__.py b/workers/void_workers/handlers/__init__.py index b15bce9..b048658 100644 --- a/workers/void_workers/handlers/__init__.py +++ b/workers/void_workers/handlers/__init__.py @@ -1,8 +1,9 @@ -from . import echo, pdf, image, video +from . import echo, pdf, image, video, sourcedoc REGISTRY = { echo.NAME: echo.handle, pdf.NAME: pdf.handle, image.NAME: image.handle, video.NAME: video.handle, + sourcedoc.NAME: sourcedoc.handle, } diff --git a/workers/void_workers/handlers/sourcedoc.py b/workers/void_workers/handlers/sourcedoc.py new file mode 100644 index 0000000..52db50c --- /dev/null +++ b/workers/void_workers/handlers/sourcedoc.py @@ -0,0 +1,32 @@ +import hashlib +from datetime import datetime, timezone +from .. import repo +from ..safe_fetch import safe_fetch + +NAME = "sync.source_doc" + + +def _sha(data): + return hashlib.sha256(data).hexdigest() + + +def handle(job_data: dict) -> dict: + sd_id = job_data["source_doc_id"] + doc = repo.get_source_doc(sd_id) + if not doc: + return {"skipped": "gone"} + body = safe_fetch(doc["upstream_url"], headers={"User-Agent": "void-ingest/2.0"}) + new_sha = _sha(body) + old_sha = ((doc.get("metadata") or {}).get("body_sha")) + now = datetime.now(timezone.utc) + if new_sha == old_sha: + repo.update_source_doc(sd_id, last_synced=now) + return {"unchanged": True} + text = body.decode("utf-8", errors="replace")[:1_000_000] + repo.update_source_doc( + sd_id, + body_text=text, + last_synced=now, + metadata_patch={"body_sha": new_sha} + ) + return {"updated": True, "chars": len(text)}