import json

import psycopg
from psycopg.rows import dict_row


class Boss:
    """psycopg-based pg-boss-compatible job claimer.

    Matches the SQL semantics of pg-boss v10 close enough that the same
    pgboss.job partition can be operated on by either side. We use
    SELECT ... FOR UPDATE SKIP LOCKED to atomically claim work, then UPDATE
    state on completion/failure.

    Every public method opens its own connection via ``_conn()`` and does
    all of its work inside a single transaction on that connection.
    """

    # States fail() treats as final: a job found in one of these is left
    # untouched. NOTE(review): list assumed to cover the terminal labels of
    # the pgboss.job state enum -- confirm against the deployed schema.
    _TERMINAL_STATES = ("completed", "cancelled", "failed")

    def __init__(self, dsn: str) -> None:
        """Store the connection string; no connection is opened here."""
        self.dsn = dsn

    def _conn(self):
        """Open a new non-autocommit connection whose rows are dicts."""
        return psycopg.connect(
            self.dsn,
            autocommit=False,
            row_factory=dict_row,
            client_encoding='UTF8',
        )

    def claim(self, queue: str):
        """Atomically claim one job for the given queue.

        Picks the highest-priority, oldest due job in state ``created`` or
        ``retry``, locks it (skipping rows locked by concurrent claimers),
        and marks it ``active`` in the same transaction.

        Args:
            queue: Value matched against the job's ``name`` column.

        Returns:
            A dict with keys ``id``, ``data``, ``retry_count`` and
            ``retry_limit`` as read *before* the state change, or ``None``
            when no job is currently claimable.
        """
        with self._conn() as conn:
            with conn.cursor() as cur:
                # LIMIT precedes the locking clause, which is the clause
                # order shown in the PostgreSQL SELECT synopsis.
                cur.execute(
                    """
                    SELECT id, data, retry_count, retry_limit
                    FROM pgboss.job
                    WHERE name = %s
                      AND state IN ('created', 'retry')
                      AND start_after <= now()
                    ORDER BY priority DESC, created_on ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                    """,
                    (queue,),
                )
                row = cur.fetchone()
                if not row:
                    return None
                # The row lock taken above is held until commit, so no other
                # claimer can select this job before it becomes 'active'.
                cur.execute(
                    "UPDATE pgboss.job SET state='active', started_on=now() "
                    "WHERE id=%s",
                    (row["id"],),
                )
            conn.commit()
        return row

    def complete(self, job_id, output) -> None:
        """Mark an active job as completed and store its output.

        Only a job that is still ``active`` is updated. A job that has left
        that state in the meantime (failed, re-queued for retry, cancelled,
        ...) is left untouched so a late completion cannot overwrite it.

        Args:
            job_id: Value matched against the job's ``id`` column.
            output: JSON-serialisable result, stored in the ``output``
                column as a JSON document.

        Raises:
            TypeError: If ``output`` cannot be serialised by ``json.dumps``.
        """
        with self._conn() as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE pgboss.job
                    SET state = 'completed',
                        completed_on = now(),
                        output = %s
                    WHERE id = %s
                      AND state = 'active'
                    """,
                    (json.dumps(output), job_id),
                )
            conn.commit()

    def fail(self, job_id, output) -> None:
        """Mark the job retry (if budget left) or failed (otherwise).

        The job row is locked, its retry counter is advanced by one, and:

        * if the new count exceeds ``retry_limit`` the job becomes
          ``failed`` with ``completed_on`` set to now;
        * otherwise it becomes ``retry`` with ``start_after`` pushed
          ``retry_delay`` seconds into the future, doubled for every
          previous attempt when ``retry_backoff`` is set.

        Unknown job ids and jobs already in a terminal state are ignored.

        Args:
            job_id: Value matched against the job's ``id`` column.
            output: JSON-serialisable failure details, stored in the
                ``output`` column as a JSON document.

        Raises:
            TypeError: If ``output`` cannot be serialised by ``json.dumps``.
        """
        # Serialise once up front; both branches store the same payload.
        payload = json.dumps(output)
        with self._conn() as conn:
            with conn.cursor() as cur:
                # state is cast to text so it always arrives as a plain str,
                # independent of how the driver maps the column's own type.
                cur.execute(
                    "SELECT state::text AS state, retry_count, retry_limit, "
                    "retry_delay, retry_backoff "
                    "FROM pgboss.job WHERE id=%s FOR UPDATE",
                    (job_id,),
                )
                row = cur.fetchone()
                if row is None or row["state"] in self._TERMINAL_STATES:
                    # Nothing to do: either the job does not exist or it has
                    # already reached a final state that must not be undone.
                    return

                attempts = row["retry_count"] + 1
                if attempts > row["retry_limit"]:
                    cur.execute(
                        "UPDATE pgboss.job SET state='failed', output=%s, "
                        "completed_on=now() WHERE id=%s",
                        (payload, job_id),
                    )
                else:
                    delay = row["retry_delay"]
                    if row["retry_backoff"]:
                        # Exponential backoff: delay, 2*delay, 4*delay, ...
                        delay = delay * (2 ** (attempts - 1))
                    cur.execute(
                        """
                        UPDATE pgboss.job
                        SET state = 'retry',
                            retry_count = %s,
                            start_after = now() + (%s || ' seconds')::interval,
                            output = %s
                        WHERE id = %s
                        """,
                        (attempts, str(delay), payload, job_id),
                    )
            conn.commit()