Adds the Boss class — SELECT … FOR UPDATE SKIP LOCKED to atomically claim, UPDATE state on completion. Retry semantics match pg-boss: exponential backoff via retry_count / retry_delay / retry_backoff. Forces client_encoding=UTF8 on every connection. The void2-db cluster was initialized as SQL_ASCII so psycopg refuses to decode text by default; UTF8 client_encoding works because the data is already UTF-8. Node's pg lib is more forgiving and didn't surface this. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
49 lines
1.7 KiB
Python
49 lines
1.7 KiB
Python
import json
|
|
from void_workers.boss import Boss
|
|
from void_workers.config import DATABASE_URL
|
|
|
|
def test_claim_returns_none_when_no_jobs(boss_ready):
|
|
boss = Boss(dsn=DATABASE_URL)
|
|
assert boss.claim("echo") is None
|
|
|
|
def test_claim_then_complete_round_trip(boss_ready):
|
|
boss_ready.execute(
|
|
"INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
|
|
(json.dumps({"ping": 1}),)
|
|
)
|
|
boss = Boss(dsn=DATABASE_URL)
|
|
job = boss.claim("echo")
|
|
assert job is not None
|
|
assert job["data"] == {"ping": 1}
|
|
boss.complete(job["id"], {"pong": 1})
|
|
row = boss_ready.execute(
|
|
"SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],)
|
|
).fetchone()
|
|
assert row[0] == "completed"
|
|
assert row[1] == {"pong": 1}
|
|
|
|
def test_fail_records_output_and_state(boss_ready):
|
|
boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')")
|
|
boss = Boss(dsn=DATABASE_URL)
|
|
job = boss.claim("echo")
|
|
boss.fail(job["id"], {"name": "BoomError", "message": "boom"})
|
|
row = boss_ready.execute(
|
|
"SELECT state, output, retry_count FROM pgboss.job WHERE id=%s",
|
|
(job["id"],)
|
|
).fetchone()
|
|
# retry_limit defaults to 5 and retry_count was 0 → should move to 'retry'
|
|
assert row[0] == "retry"
|
|
assert row[2] == 1
|
|
|
|
def test_fail_past_retry_limit_marks_failed(boss_ready):
|
|
boss_ready.execute(
|
|
"INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)"
|
|
)
|
|
boss = Boss(dsn=DATABASE_URL)
|
|
job = boss.claim("echo")
|
|
boss.fail(job["id"], {"name": "Done"})
|
|
row = boss_ready.execute(
|
|
"SELECT state FROM pgboss.job WHERE id=%s", (job["id"],)
|
|
).fetchone()
|
|
assert row[0] == "failed"
|