Files
Void-Homelab/workers/void_workers/runner.py
2026-06-01 04:43:52 +10:00

64 lines
2.1 KiB
Python

import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from .boss import Boss
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
from .handlers import REGISTRY
from .log import log
class Runner:
    """Poll job queues through a ``Boss`` and dispatch claimed jobs to handlers.

    Instance attributes (all assigned in ``__init__``):
      boss     -- ``Boss`` instance used to claim, complete and fail jobs.
      handlers -- mapping of queue name to a dict holding a "concurrency" count.
      _stop    -- ``threading.Event``; once set, every polling loop exits.
    """

    def __init__(self, dsn=None, handlers=None):
        """Create a runner.

        Args:
            dsn: Value passed to ``Boss``; ``DATABASE_URL`` is used when falsy.
            handlers: Mapping ``{queue_name: {"concurrency": int}}``. When
                falsy (``None`` or empty), one entry is built per name in
                ``REGISTRY`` with ``CONCURRENCY.get(name, 1)`` workers.
        """
        self.boss = Boss(dsn or DATABASE_URL)
        self.handlers = handlers or {
            name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
        }
        self._stop = threading.Event()

    def _process_one(self, queue):
        """Claim at most one job from *queue* and run its registered handler.

        Any exception raised while resolving or running the handler, or while
        marking the job complete, is logged and reported via ``boss.fail``.

        Returns:
            ``False`` when ``boss.claim`` returned nothing; ``True`` once a
            claimed job has been completed or failed.
        """
        job = self.boss.claim(queue)
        if not job:
            return False
        job_id = job["id"]
        log.info("job_claimed", job_id=str(job_id), name=queue)
        try:
            # The lookup lives inside the try on purpose: a queue with no
            # registered handler must fail the job that was just claimed
            # instead of raising and leaving it claimed but unresolved.
            handler = REGISTRY[queue]
            out = handler(job["data"] or {})
            self.boss.complete(job_id, out or {})
            log.info("job_completed", job_id=str(job_id), name=queue)
        except Exception as e:
            log.error("job_failed", job_id=str(job_id), name=queue, err=str(e))
            self.boss.fail(job_id, {"name": type(e).__name__, "message": str(e)})
        return True

    def _loop(self, queue):
        """Process jobs from *queue* until the stop event is set.

        When no job was processed (or processing raised), the loop waits
        ``POLL_INTERVAL_MS`` milliseconds on the stop event before polling
        again, so a stop request interrupts the wait immediately.
        """
        interval = POLL_INTERVAL_MS / 1000.0
        while not self._stop.is_set():
            try:
                did = self._process_one(queue)
            except Exception as e:
                log.error("loop_error", queue=queue, err=str(e))
                did = False
            if not did:
                self._stop.wait(interval)

    def run(self, once=False):
        """Run the worker.

        Args:
            once: When true, try to process a single job from each configured
                queue in the calling thread and return. Otherwise start
                ``concurrency`` polling loops per queue in a thread pool and
                block until SIGTERM or SIGINT sets the stop event.
        """
        if once:
            for q in self.handlers:
                self._process_one(q)
            return
        # Install the handlers before any worker starts: if installation
        # fails (signal.signal only works in the main thread) nothing has
        # been launched yet that would need stopping.
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self._stop.set())
        max_workers = sum(h["concurrency"] for h in self.handlers.values())
        executor = ThreadPoolExecutor(max_workers=max_workers)
        try:
            for q, cfg in self.handlers.items():
                for _ in range(cfg["concurrency"]):
                    executor.submit(self._loop, q)
            # Poll with sleep rather than Event.wait so the main thread never
            # holds the event's internal lock when a signal handler calls
            # set() on it.
            while not self._stop.is_set():
                time.sleep(0.5)
        finally:
            # Always release the worker loops, even when leaving through an
            # exception; otherwise shutdown(wait=True) would block forever.
            if not self._stop.is_set():
                self._stop.set()
            executor.shutdown(wait=True)
if __name__ == "__main__":
    # Executed as a script: build a Runner with its default arguments and
    # start it in the default (non-once) mode.
    worker = Runner()
    worker.run()