From 3e1dcbb7f85bfdf2385c23c6dc5c4316861ee53b Mon Sep 17 00:00:00 2001 From: root Date: Mon, 1 Jun 2026 04:43:26 +1000 Subject: [PATCH] feat(workers): pgboss claim/complete/fail via psycopg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the Boss class — SELECT … FOR UPDATE SKIP LOCKED to atomically claim, UPDATE state on completion. Retry semantics match pg-boss: exponential backoff via retry_count / retry_delay / retry_backoff. Forces client_encoding=UTF8 on every connection. The void2-db cluster was initialized as SQL_ASCII so psycopg refuses to decode text by default; UTF8 client_encoding works because the data is already UTF-8. Node's pg lib is more forgiving and didn't surface this. Co-Authored-By: Claude Opus 4.7 --- workers/tests/conftest.py | 43 ++++++++++++++++++ workers/tests/test_boss.py | 48 ++++++++++++++++++++ workers/void_workers/boss.py | 88 ++++++++++++++++++++++++++++++++++++ 3 files changed, 179 insertions(+) create mode 100644 workers/tests/conftest.py create mode 100644 workers/tests/test_boss.py create mode 100644 workers/void_workers/boss.py diff --git a/workers/tests/conftest.py b/workers/tests/conftest.py new file mode 100644 index 0000000..a3a1cb8 --- /dev/null +++ b/workers/tests/conftest.py @@ -0,0 +1,43 @@ +import os +import pytest +import psycopg + +DB_URL = os.environ["DATABASE_URL"] + +@pytest.fixture +def conn(): + with psycopg.connect(DB_URL, autocommit=True, client_encoding='UTF8') as c: + yield c + +@pytest.fixture(autouse=True) +def reset_pgboss(conn): + """Drop the pgboss schema before each test. Tests that need it call ensure_pgboss(conn).""" + conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE") + yield + +def ensure_pgboss(conn): + """Bring up a minimal pgboss schema matching the columns boss.py reads/writes.""" + conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss") + conn.execute(""" + CREATE TABLE IF NOT EXISTS pgboss.job ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + name text NOT NULL, + priority int NOT NULL DEFAULT 0, + data jsonb, + state text NOT NULL DEFAULT 'created', + retry_limit int NOT NULL DEFAULT 5, + retry_count int NOT NULL DEFAULT 0, + retry_delay int NOT NULL DEFAULT 10, + retry_backoff boolean NOT NULL DEFAULT true, + start_after timestamptz NOT NULL DEFAULT now(), + started_on timestamptz, + completed_on timestamptz, + created_on timestamptz NOT NULL DEFAULT now(), + output jsonb + ) + """) + +@pytest.fixture +def boss_ready(conn): + ensure_pgboss(conn) + yield conn diff --git a/workers/tests/test_boss.py b/workers/tests/test_boss.py new file mode 100644 index 0000000..4dffc9d --- /dev/null +++ b/workers/tests/test_boss.py @@ -0,0 +1,48 @@ +import json +from void_workers.boss import Boss +from void_workers.config import DATABASE_URL + +def test_claim_returns_none_when_no_jobs(boss_ready): + boss = Boss(dsn=DATABASE_URL) + assert boss.claim("echo") is None + +def test_claim_then_complete_round_trip(boss_ready): + boss_ready.execute( + "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)", + (json.dumps({"ping": 1}),) + ) + boss = Boss(dsn=DATABASE_URL) + job = boss.claim("echo") + assert job is not None + assert job["data"] == {"ping": 1} + boss.complete(job["id"], {"pong": 1}) + row = boss_ready.execute( + "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],) + ).fetchone() + assert row[0] == "completed" + assert row[1] == {"pong": 1} + +def test_fail_records_output_and_state(boss_ready): + boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')") + boss = Boss(dsn=DATABASE_URL) + job = boss.claim("echo") + boss.fail(job["id"], {"name": "BoomError", "message": "boom"}) + row = boss_ready.execute( + "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s", + (job["id"],) + ).fetchone() + # retry_limit defaults to 5 and retry_count was 0 → should move to 'retry' + assert row[0] == "retry" + assert row[2] == 1 + +def test_fail_past_retry_limit_marks_failed(boss_ready): + boss_ready.execute( + "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)" + ) + boss = Boss(dsn=DATABASE_URL) + job = boss.claim("echo") + boss.fail(job["id"], {"name": "Done"}) + row = boss_ready.execute( + "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],) + ).fetchone() + assert row[0] == "failed" diff --git a/workers/void_workers/boss.py b/workers/void_workers/boss.py new file mode 100644 index 0000000..7dbfce6 --- /dev/null +++ b/workers/void_workers/boss.py @@ -0,0 +1,88 @@ +import json +import psycopg +from psycopg.rows import dict_row + + +class Boss: + """psycopg-based pg-boss-compatible job claimer. + + Matches the SQL semantics of pg-boss v10 close enough that the same + pgboss.job partition can be operated on by either side. We use + SELECT ... FOR UPDATE SKIP LOCKED to atomically claim work, then + UPDATE state on completion/failure. + """ + + def __init__(self, dsn): + self.dsn = dsn + + def _conn(self): + return psycopg.connect( + self.dsn, autocommit=False, row_factory=dict_row, client_encoding='UTF8' + ) + + def claim(self, queue): + """Atomically claim one job for the given queue. Returns dict or None.""" + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + SELECT id, data, retry_count, retry_limit + FROM pgboss.job + WHERE name=%s + AND state IN ('created','retry') + AND start_after <= now() + ORDER BY priority DESC, created_on ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + """, (queue,)) + row = cur.fetchone() + if not row: + return None + cur.execute( + "UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s", + (row["id"],) + ) + conn.commit() + return row + + def complete(self, job_id, output): + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute(""" + UPDATE pgboss.job + SET state='completed', completed_on=now(), output=%s + WHERE id=%s + """, (json.dumps(output), job_id)) + conn.commit() + + def fail(self, job_id, output): + """Mark the job retry (if budget left) or failed (otherwise).""" + with self._conn() as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT retry_count, retry_limit, retry_delay, retry_backoff " + "FROM pgboss.job WHERE id=%s FOR UPDATE", + (job_id,) + ) + row = cur.fetchone() + if not row: + return + new_count = row["retry_count"] + 1 + if new_count > row["retry_limit"]: + cur.execute( + "UPDATE pgboss.job SET state='failed', output=%s, " + "completed_on=now() WHERE id=%s", + (json.dumps(output), job_id) + ) + else: + delay = row["retry_delay"] + if row["retry_backoff"]: + delay = delay * (2 ** (new_count - 1)) + cur.execute(""" + UPDATE pgboss.job + SET state='retry', + retry_count=%s, + start_after=now() + (%s || ' seconds')::interval, + output=%s + WHERE id=%s + """, (new_count, str(delay), json.dumps(output), job_id)) + conn.commit()