Files
Void-Homelab/workers/tests/test_boss.py
root 3e1dcbb7f8 feat(workers): pgboss claim/complete/fail via psycopg
Adds the Boss class — SELECT … FOR UPDATE SKIP LOCKED to atomically
claim, UPDATE state on completion. Retry semantics match pg-boss:
exponential backoff via retry_count / retry_delay / retry_backoff.

Forces client_encoding=UTF8 on every connection. The void2-db cluster
was initialized as SQL_ASCII so psycopg refuses to decode text by
default; UTF8 client_encoding works because the data is already UTF-8.
Node's pg lib is more forgiving and didn't surface this.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-01 04:43:26 +10:00

49 lines
1.7 KiB
Python

import json
from void_workers.boss import Boss
from void_workers.config import DATABASE_URL
def test_claim_returns_none_when_no_jobs(boss_ready):
boss = Boss(dsn=DATABASE_URL)
assert boss.claim("echo") is None
def test_claim_then_complete_round_trip(boss_ready):
boss_ready.execute(
"INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
(json.dumps({"ping": 1}),)
)
boss = Boss(dsn=DATABASE_URL)
job = boss.claim("echo")
assert job is not None
assert job["data"] == {"ping": 1}
boss.complete(job["id"], {"pong": 1})
row = boss_ready.execute(
"SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
).fetchone()
assert row[0] == "completed"
assert row[1] == {"pong": 1}
def test_fail_records_output_and_state(boss_ready):
boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
boss = Boss(dsn=DATABASE_URL)
job = boss.claim("echo")
boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
row = boss_ready.execute(
"SELECT state, output, retry_count FROM pgboss.job WHERE id=%s",
(job["id"],)
).fetchone()
# retry_limit defaults to 5 and retry_count was 0 → should move to 'retry'
assert row[0] == "retry"
assert row[2] == 1
def test_fail_past_retry_limit_marks_failed(boss_ready):
boss_ready.execute(
"INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
)
boss = Boss(dsn=DATABASE_URL)
job = boss.claim("echo")
boss.fail(job["id"], {"name": "Done"})
row = boss_ready.execute(
"SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
).fetchone()
assert row[0] == "failed"