64 lines
2.1 KiB
Python
64 lines
2.1 KiB
Python
import signal
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from .boss import Boss
|
|
from .config import CONCURRENCY, POLL_INTERVAL_MS, DATABASE_URL
|
|
from .handlers import REGISTRY
|
|
from .log import log
|
|
|
|
|
|
class Runner:
    """Poll job queues through a ``Boss`` and dispatch claimed jobs to handlers.

    For every configured queue, one polling loop is started per unit of
    concurrency. Each loop claims a job, runs the handler stored under the
    queue name in ``REGISTRY`` and reports the outcome back to the boss.
    """

    def __init__(self, dsn=None, handlers=None):
        """Create a runner.

        Args:
            dsn: Value handed to ``Boss``; ``DATABASE_URL`` is used when it
                is falsy.
            handlers: Mapping of queue name to a config dict holding a
                ``"concurrency"`` entry. When falsy, one entry is built for
                each name in ``REGISTRY`` with the concurrency taken from
                ``CONCURRENCY`` (defaulting to 1).
        """
        self.boss = Boss(dsn or DATABASE_URL)
        self.handlers = handlers or {
            name: {"concurrency": CONCURRENCY.get(name, 1)} for name in REGISTRY
        }
        # Set to ask every polling loop, and ``run`` itself, to stop.
        self._stop = threading.Event()

    def _process_one(self, queue):
        """Claim and process at most one job from ``queue``.

        Any ``Exception`` raised while resolving or running the handler, or
        while marking the job complete, is logged and reported through
        ``boss.fail``. Errors from ``boss.claim`` or ``boss.fail`` propagate.

        Args:
            queue: Name of the queue to claim from; also the ``REGISTRY`` key
                of the handler.

        Returns:
            ``True`` when a job was claimed (whatever its outcome), ``False``
            when there was nothing to claim.
        """
        job = self.boss.claim(queue)
        if not job:
            return False
        job_id = job["id"]
        log.info("job_claimed", job_id=str(job_id), name=queue)
        try:
            # The lookup lives inside the try on purpose: the job is already
            # claimed, so a missing handler must be reported as a failure
            # instead of leaving the job claimed with no outcome.
            handler = REGISTRY[queue]
            out = handler(job["data"] or {})
            self.boss.complete(job_id, out or {})
            log.info("job_completed", job_id=str(job_id), name=queue)
        except Exception as e:
            log.error("job_failed", job_id=str(job_id), name=queue, err=str(e))
            self.boss.fail(job_id, {"name": type(e).__name__, "message": str(e)})
        return True

    def _loop(self, queue):
        """Process jobs from ``queue`` until the stop event is set.

        When no job was processed, or processing raised, the loop waits
        ``POLL_INTERVAL_MS`` milliseconds (or until stop is requested)
        before trying again.
        """
        interval = POLL_INTERVAL_MS / 1000.0
        while not self._stop.is_set():
            try:
                did = self._process_one(queue)
            except Exception as e:
                log.error("loop_error", queue=queue, err=str(e))
                did = False
            if not did:
                # Waiting on the event (not sleeping) lets a stop request
                # end the pause immediately.
                self._stop.wait(interval)

    def run(self, once=False):
        """Process jobs until stopped, or make a single pass when ``once``.

        Args:
            once: When true, call ``_process_one`` once per configured queue
                in the calling thread and return.

        Without ``once``, SIGTERM/SIGINT handlers that set the stop event are
        installed, one polling loop per unit of concurrency is started on a
        thread pool, and the call blocks until the stop event is set and all
        loops have finished.

        Raises:
            ValueError: If the configured concurrency adds up to less than
                one worker, or (from ``signal.signal``) if called outside
                the main thread.
        """
        if once:
            for q in self.handlers:
                self._process_one(q)
            return

        max_workers = sum(h["concurrency"] for h in self.handlers.values())
        if max_workers < 1:
            # Fail up front with a clear message instead of the thread
            # pool's generic "max_workers must be greater than 0".
            raise ValueError(
                "no workers configured: handlers is empty or total "
                "concurrency is zero"
            )

        # Install the handlers before any worker starts: if signal.signal()
        # raises, no polling loop has been launched that would be left
        # running with nothing to stop it.
        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, lambda *_: self._stop.set())

        executor = ThreadPoolExecutor(max_workers=max_workers)
        try:
            for q, cfg in self.handlers.items():
                for _ in range(cfg["concurrency"]):
                    executor.submit(self._loop, q)
            # wait() returns True as soon as the event is set, so shutdown
            # starts promptly; the timeout keeps the main thread waking up
            # periodically.
            while not self._stop.wait(0.5):
                pass
        finally:
            # On any exit path, tell the loops to stop before joining them;
            # otherwise shutdown(wait=True) would block forever.
            self._stop.set()
            executor.shutdown(wait=True)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build a Runner with its default arguments and
    # start it.
    runner = Runner()
    runner.run()
|