import json from void_workers.boss import Boss from void_workers.config import DATABASE_URL def test_claim_returns_none_when_no_jobs(boss_ready): boss = Boss(dsn=DATABASE_URL) assert boss.claim("echo") is None def test_claim_then_complete_round_trip(boss_ready): boss_ready.execute( "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)", (json.dumps({"ping": 1}),) ) boss = Boss(dsn=DATABASE_URL) job = boss.claim("echo") assert job is not None assert job["data"] == {"ping": 1} boss.complete(job["id"], {"pong": 1}) row = boss_ready.execute( "SELECT state, output FROM pgboss.job WHERE id=%s", (job["id"],) ).fetchone() assert row[0] == "completed" assert row[1] == {"pong": 1} def test_fail_records_output_and_state(boss_ready): boss_ready.execute("INSERT INTO pgboss.job(name) VALUES('echo')") boss = Boss(dsn=DATABASE_URL) job = boss.claim("echo") boss.fail(job["id"], {"name": "BoomError", "message": "boom"}) row = boss_ready.execute( "SELECT state, output, retry_count FROM pgboss.job WHERE id=%s", (job["id"],) ).fetchone() # retry_limit defaults to 5 and retry_count was 0 → should move to 'retry' assert row[0] == "retry" assert row[2] == 1 def test_fail_past_retry_limit_marks_failed(boss_ready): boss_ready.execute( "INSERT INTO pgboss.job(name, retry_count, retry_limit) VALUES('echo', 5, 5)" ) boss = Boss(dsn=DATABASE_URL) job = boss.claim("echo") boss.fail(job["id"], {"name": "Done"}) row = boss_ready.execute( "SELECT state FROM pgboss.job WHERE id=%s", (job["id"],) ).fetchone() assert row[0] == "failed"