feat(workers): runner loop + echo handler
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
18
workers/tests/test_echo.py
Normal file
18
workers/tests/test_echo.py
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
import json
|
||||||
|
from void_workers.runner import Runner
|
||||||
|
from void_workers.config import DATABASE_URL
|
||||||
|
|
||||||
|
|
||||||
|
def test_echo_handler_runs(boss_ready):
|
||||||
|
boss_ready.execute(
|
||||||
|
"INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
|
||||||
|
(json.dumps({"ping": 42}),)
|
||||||
|
)
|
||||||
|
runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}})
|
||||||
|
runner.run(once=True)
|
||||||
|
row = boss_ready.execute(
|
||||||
|
"SELECT state, output FROM pgboss.job WHERE name='echo' "
|
||||||
|
"ORDER BY created_on DESC LIMIT 1"
|
||||||
|
).fetchone()
|
||||||
|
assert row[0] == "completed"
|
||||||
|
assert row[1] == {"pong": 42}
|
||||||
5
workers/void_workers/handlers/__init__.py
Normal file
5
workers/void_workers/handlers/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
from . import echo
|
||||||
|
|
||||||
|
REGISTRY = {
|
||||||
|
echo.NAME: echo.handle,
|
||||||
|
}
|
||||||
5
workers/void_workers/handlers/echo.py
Normal file
5
workers/void_workers/handlers/echo.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
NAME = "echo"
|
||||||
|
|
||||||
|
|
||||||
|
def handle(job_data: dict) -> dict:
|
||||||
|
return {"pong": job_data.get("ping", 0)}
|
||||||
63
workers/void_workers/runner.py
Normal file
63
workers/void_workers/runner.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
import signal
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from .boss import Boss
|
||||||
|
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
|
||||||
|
from .handlers import REGISTRY
|
||||||
|
from .log import log
|
||||||
|
|
||||||
|
|
||||||
|
class Runner:
|
||||||
|
def __init__(self, dsn=None, handlers=None):
|
||||||
|
self.boss = Boss(dsn or DATABASE_URL)
|
||||||
|
self.handlers = handlers or {
|
||||||
|
name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
|
||||||
|
}
|
||||||
|
self._stop = threading.Event()
|
||||||
|
|
||||||
|
def _process_one(self, queue):
|
||||||
|
job = self.boss.claim(queue)
|
||||||
|
if not job:
|
||||||
|
return False
|
||||||
|
log.info("job_claimed", job_id=str(job["id"]), name=queue)
|
||||||
|
handler = REGISTRY[queue]
|
||||||
|
try:
|
||||||
|
out = handler(job["data"] or {})
|
||||||
|
self.boss.complete(job["id"], out or {})
|
||||||
|
log.info("job_completed", job_id=str(job["id"]), name=queue)
|
||||||
|
except Exception as e:
|
||||||
|
log.error("job_failed", job_id=str(job["id"]), name=queue, err=str(e))
|
||||||
|
self.boss.fail(job["id"], {"name": type(e).__name__, "message": str(e)})
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _loop(self, queue):
|
||||||
|
interval = POLL_INTERVAL_MS / 1000.0
|
||||||
|
while not self._stop.is_set():
|
||||||
|
try:
|
||||||
|
did = self._process_one(queue)
|
||||||
|
except Exception as e:
|
||||||
|
log.error("loop_error", queue=queue, err=str(e))
|
||||||
|
did = False
|
||||||
|
if not did:
|
||||||
|
self._stop.wait(interval)
|
||||||
|
|
||||||
|
def run(self, once=False):
|
||||||
|
if once:
|
||||||
|
for q in self.handlers:
|
||||||
|
self._process_one(q)
|
||||||
|
return
|
||||||
|
max_workers = sum(h["concurrency"] for h in self.handlers.values())
|
||||||
|
executor = ThreadPoolExecutor(max_workers=max_workers)
|
||||||
|
for q, cfg in self.handlers.items():
|
||||||
|
for _ in range(cfg["concurrency"]):
|
||||||
|
executor.submit(self._loop, q)
|
||||||
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||||
|
signal.signal(sig, lambda *_: self._stop.set())
|
||||||
|
while not self._stop.is_set():
|
||||||
|
time.sleep(0.5)
|
||||||
|
executor.shutdown(wait=True)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
Runner().run()
|
||||||
Reference in New Issue
Block a user