Files
Void-Homelab/workers/tests/test_echo.py
2026-06-01 04:43:52 +10:00

19 lines
595 B
Python

import json
from void_workers.runner import Runner
from void_workers.config import DATABASE_URL
def test_echo_handler_runs(boss_ready):
boss_ready.execute(
"INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
(json.dumps({"ping": 42}),)
)
runner = Runner(dsn=DATABASE_URL, handlers={"echo": {"concurrency": 1}})
runner.run(once=True)
row = boss_ready.execute(
"SELECT state, output FROM pgboss.job WHERE name='echo' "
"ORDER BY created_on DESC LIMIT 1"
).fetchone()
assert row[0] == "completed"
assert row[1] == {"pong": 42}