feat(workers): pgboss claim/complete/fail via psycopg

Adds the Boss class — SELECT … FOR UPDATE SKIP LOCKED to atomically
claim, UPDATE state on completion. Retry semantics match pg-boss:
exponential backoff via retry_count / retry_delay / retry_backoff.

Forces client_encoding=UTF8 on every connection. The void2-db cluster
was initialized as SQL_ASCII so psycopg refuses to decode text by
default; UTF8 client_encoding works because the data is already UTF-8.
Node's pg lib is more forgiving and didn't surface this.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
root
2026-06-01 04:43:26 +10:00
parent 6e3798f6d1
commit 3e1dcbb7f8
3 changed files with 179 additions and 0 deletions

43
workers/tests/conftest.py Normal file
View File

@@ -0,0 +1,43 @@
import os
import pytest
import psycopg
DB_URL = os.environ["DATABASE_URL"]
@pytest.fixture
def conn():
with psycopg.connect(DB_URL, autocommit=True, client_encoding='UTF8') as c:
yield c
@pytest.fixture(autouse=True)
def reset_pgboss(conn):
"""Drop the pgboss schema before each test. Tests that need it call ensure_pgboss(conn)."""
conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE")
yield
def ensure_pgboss(conn):
"""Bring up a minimal pgboss schema matching the columns boss.py reads/writes."""
conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss")
conn.execute("""
CREATE TABLE IF NOT EXISTS pgboss.job (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
name text NOT NULL,
priority int NOT NULL DEFAULT 0,
data jsonb,
state text NOT NULL DEFAULT 'created',
retry_limit int NOT NULL DEFAULT 5,
retry_count int NOT NULL DEFAULT 0,
retry_delay int NOT NULL DEFAULT 10,
retry_backoff boolean NOT NULL DEFAULT true,
start_after timestamptz NOT NULL DEFAULT now(),
started_on timestamptz,
completed_on timestamptz,
created_on timestamptz NOT NULL DEFAULT now(),
output jsonb
)
""")
@pytest.fixture
def boss_ready(conn):
ensure_pgboss(conn)
yield conn

View File

@@ -0,0 +1,48 @@
import json
from void_workers.boss import Boss
from void_workers.config import DATABASE_URL
def test_claim_returns_none_when_no_jobs(boss_ready):
boss = Boss(dsn=DATABASE_URL)
assert boss.claim("echo") is None
def test_claim_then_complete_round_trip(boss_ready):
boss_ready.execute(
"INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
(json.dumps({"ping": 1}),)
)
boss = Boss(dsn=DATABASE_URL)
job = boss.claim("echo")
assert job is not None
assert job["data"] == {"ping": 1}
boss.complete(job["id"], {"pong": 1})
row = boss_ready.execute(
"SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
).fetchone()
assert row[0] == "completed"
assert row[1] == {"pong": 1}
def test_fail_records_output_and_state(boss_ready):
boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
boss = Boss(dsn=DATABASE_URL)
job = boss.claim("echo")
boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
row = boss_ready.execute(
"SELECT state, output, retry_count FROM pgboss.job WHERE id=%s",
(job["id"],)
).fetchone()
# retry_limit defaults to 5 and retry_count was 0 → should move to 'retry'
assert row[0] == "retry"
assert row[2] == 1
def test_fail_past_retry_limit_marks_failed(boss_ready):
boss_ready.execute(
"INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
)
boss = Boss(dsn=DATABASE_URL)
job = boss.claim("echo")
boss.fail(job["id"], {"name": "Done"})
row = boss_ready.execute(
"SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
).fetchone()
assert row[0] == "failed"

View File

@@ -0,0 +1,88 @@
import json
import psycopg
from psycopg.rows import dict_row
class Boss:
"""psycopg-based pg-boss-compatible job claimer.
Matches the SQL semantics of pg-boss v10 close enough that the same
pgboss.job partition can be operated on by either side. We use
SELECT ... FOR UPDATE SKIP LOCKED to atomically claim work, then
UPDATE state on completion/failure.
"""
def __init__(self, dsn):
self.dsn = dsn
def _conn(self):
return psycopg.connect(
self.dsn, autocommit=False, row_factory=dict_row, client_encoding='UTF8'
)
def claim(self, queue):
"""Atomically claim one job for the given queue. Returns dict or None."""
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT id, data, retry_count, retry_limit
FROM pgboss.job
WHERE name=%s
AND state IN ('created','retry')
AND start_after <= now()
ORDER BY priority DESC, created_on ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
""", (queue,))
row = cur.fetchone()
if not row:
return None
cur.execute(
"UPDATE pgboss.job SET state='active', started_on=now() WHERE id=%s",
(row["id"],)
)
conn.commit()
return row
def complete(self, job_id, output):
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute("""
UPDATE pgboss.job
SET state='completed', completed_on=now(), output=%s
WHERE id=%s
""", (json.dumps(output), job_id))
conn.commit()
def fail(self, job_id, output):
"""Mark the job retry (if budget left) or failed (otherwise)."""
with self._conn() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT retry_count, retry_limit, retry_delay, retry_backoff "
"FROM pgboss.job WHERE id=%s FOR UPDATE",
(job_id,)
)
row = cur.fetchone()
if not row:
return
new_count = row["retry_count"] + 1
if new_count > row["retry_limit"]:
cur.execute(
"UPDATE pgboss.job SET state='failed', output=%s, "
"completed_on=now() WHERE id=%s",
(json.dumps(output), job_id)
)
else:
delay = row["retry_delay"]
if row["retry_backoff"]:
delay = delay * (2 ** (new_count - 1))
cur.execute("""
UPDATE pgboss.job
SET state='retry',
retry_count=%s,
start_after=now() + (%s || ' seconds')::interval,
output=%s
WHERE id=%s
""", (new_count, str(delay), json.dumps(output), job_id))
conn.commit()