From fba1ce48e45b656363e9ddeb40d5f8f3bc05c5fc Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 04:43:52 +1000 Subject: [PATCH] feat(workers): runner loop + echo handler Co-Authored-By: Claude Opus 4.7 --- workers/tests/test_echo.py | 18 +++++++ workers/void_workers/handlers/__init__.py | 5 ++ workers/void_workers/handlers/echo.py | 5 ++ workers/void_workers/runner.py | 63 +++++++++++++++++++++++ 4 files changed, 91 insertions(+) create mode 100644 workers/tests/test_echo.py create mode 100644 workers/void_workers/handlers/__init__.py create mode 100644 workers/void_workers/handlers/echo.py create mode 100644 workers/void_workers/runner.py diff --git a/workers/tests/test_echo.py b/workers/tests/test_echo.py new file mode 100644 index 0000000..c0d6719 --- /dev/null +++ b/workers/tests/test_echo.py @@ -0,0 +1,18 @@ +import json +from void_workers.runner import Runner +from void_workers.config import DATABASE_URL + + +def test_echo_handler_runs(boss_ready): + boss_ready.execute( + "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)", + (json.dumps({"ping": 42}),) + ) + runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}}) + runner.run(once=True) + row = boss_ready.execute( + "SELECT state, output FROM pgboss.job WHERE name='echo' " + "ORDER BY created_on DESC LIMIT 1" + ).fetchone() + assert row[0] == "completed" + assert row[1] == {"pong": 42} diff --git a/workers/void_workers/handlers/__init__.py b/workers/void_workers/handlers/__init__.py new file mode 100644 index 0000000..462c509 --- /dev/null +++ b/workers/void_workers/handlers/__init__.py @@ -0,0 +1,5 @@ +from . import echo + +REGISTRY = { + echo.NAME: echo.handle, +} diff --git a/workers/void_workers/handlers/echo.py b/workers/void_workers/handlers/echo.py new file mode 100644 index 0000000..f3b08e0 --- /dev/null +++ b/workers/void_workers/handlers/echo.py @@ -0,0 +1,5 @@ +NAME = "echo" + + +def handle(job_data: dict) -> dict: + return {"pong": job_data.get("ping", 0)} diff --git a/workers/void_workers/runner.py b/workers/void_workers/runner.py new file mode 100644 index 0000000..6423d8b --- /dev/null +++ b/workers/void_workers/runner.py @@ -0,0 +1,63 @@ +import signal +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from .boss import Boss +from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL +from .handlers import REGISTRY +from .log import log + + +class Runner: + def __init__(self, dsn=None, handlers=None): + self.boss = Boss(dsn or DATABASE_URL) + self.handlers = handlers or { + name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY + } + self._stop = threading.Event() + + def _process_one(self, queue): + job = self.boss.claim(queue) + if not job: + return False + log.info("job_claimed", job_id=str(job["id"]), name=queue) + handler = REGISTRY[queue] + try: + out = handler(job["data"] or {}) + self.boss.complete(job["id"], out or {}) + log.info("job_completed", job_id=str(job["id"]), name=queue) + except Exception as e: + log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e)) + self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)}) + return True + + def _loop(self, queue): + interval = POLL_INTERVAL_MS / 1000.0 + while not self._stop.is_set(): + try: + did = self._process_one(queue) + except Exception as e: + log.error("loop_error", queue=queue, err=str(e)) + did = False + if not did: + self._stop.wait(interval) + + def run(self, once=False): + if once: + for q in self.handlers: + self._process_one(q) + return + max_workers = sum(h["concurrency"] for h in self.handlers.values()) + executor = ThreadPoolExecutor(max_workers=max_workers) + for q, cfg in self.handlers.items(): + for _ in range(cfg["concurrency"]): + executor.submit(self._loop, q) + for sig in (signal.SIGTERM, signal.SIGINT): + signal.signal(sig, lambda *_: self._stop.set()) + while not self._stop.is_set(): + time.sleep(0.5) + executor.shutdown(wait=True) + + +if __name__ == "__main__": + Runner().run()