"""Polling job runner: claims jobs from a Boss queue and runs handlers."""

import signal
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from .boss import Boss
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
from .handlers import REGISTRY
from .log import log


class Runner:
    """Claim jobs per queue and dispatch them to the handlers in REGISTRY.

    Each queue gets ``concurrency`` polling loops running on a shared
    thread pool. All loops stop once the internal stop event is set,
    which happens on SIGTERM / SIGINT when ``run()`` is active.
    """

    def __init__(self, dsn=None, handlers=None):
        """Create a runner.

        Args:
            dsn: Connection string passed to ``Boss``; falls back to
                ``DATABASE_URL`` when falsy.
            handlers: Mapping of queue name to a config dict with a
                ``"concurrency"`` key. When falsy, one entry is built for
                every name in ``REGISTRY`` using ``CONCURRENCY`` (default 1).
        """
        self.boss = Boss(dsn or DATABASE_URL)
        self.handlers = handlers or {
            name: {"concurrency": CONCURRENCY.get(name, 1)}
            for name in REGISTRY
        }
        self._stop = threading.Event()

    def _process_one(self, queue):
        """Claim and run at most one job from *queue*.

        Returns:
            ``False`` when no job was available, ``True`` when a job was
            claimed (regardless of whether its handler succeeded).

        Raises:
            Whatever ``boss.claim`` / ``boss.fail`` or the logger raise;
            handler errors are reported through ``boss.fail`` instead.
        """
        job = self.boss.claim(queue)
        if not job:
            return False

        job_id = job["id"]
        log.info("job_claimed", job_id=str(job_id), name=queue)
        try:
            # Look the handler up inside the try: the job is already
            # claimed, so a missing handler must fail the job rather than
            # leave it claimed with no completion or failure recorded.
            handler = REGISTRY[queue]
            out = handler(job["data"] or {})
            self.boss.complete(job_id, out or {})
        except Exception as e:
            log.error("job_failed", job_id=str(job_id), name=queue, err=str(e))
            self.boss.fail(
                job_id, {"name": type(e).__name__, "message": str(e)}
            )
        else:
            # Logged outside the try so a logging error cannot mark an
            # already-completed job as failed.
            log.info("job_completed", job_id=str(job_id), name=queue)
        return True

    def _loop(self, queue):
        """Poll *queue* until the stop event is set.

        Processes jobs back to back while work is available; when the
        queue is empty or an iteration raises, waits ``POLL_INTERVAL_MS``
        (or until stop is requested) before trying again.
        """
        interval = POLL_INTERVAL_MS / 1000.0
        while not self._stop.is_set():
            try:
                did = self._process_one(queue)
            except Exception as e:
                log.error("loop_error", queue=queue, err=str(e))
                did = False
            if not did:
                # Event.wait doubles as an interruptible sleep.
                self._stop.wait(interval)

    def run(self, once=False):
        """Run the workers.

        Args:
            once: When true, process at most one job per configured queue
                in the calling thread and return. Otherwise start the
                polling loops and block until SIGTERM / SIGINT (or another
                caller setting the stop event) requests shutdown.

        Raises:
            ValueError: From ``signal.signal`` when called outside the main
                thread, or from ``ThreadPoolExecutor`` when the total
                configured concurrency is not positive.
        """
        if once:
            for q in self.handlers:
                self._process_one(q)
            return

        # Install the handlers before any worker starts, so a failure here
        # (e.g. not on the main thread) cannot leave polling threads behind
        # and an early signal is never missed.
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self._stop.set())

        max_workers = sum(h["concurrency"] for h in self.handlers.values())
        executor = ThreadPoolExecutor(max_workers=max_workers)
        try:
            for q, cfg in self.handlers.items():
                for _ in range(cfg["concurrency"]):
                    executor.submit(self._loop, q)
            # Wake every 0.5s so pending signal handlers get a chance to
            # run; returns immediately once the stop event is set.
            while not self._stop.wait(0.5):
                pass
        finally:
            # Always release the workers, even if the wait loop was left
            # through an exception; otherwise the pool threads keep polling.
            self._stop.set()
            executor.shutdown(wait=True)


if __name__ == "__main__":
    Runner().run()