19 lines
595 B
Python
19 lines
595 B
Python
import json
|
|
from void_workers.runner import Runner
|
|
from void_workers.config import DATABASE_URL
|
|
|
|
|
|
def test_echo_handler_runs(boss_ready):
    """Check that a queued ``echo`` job ends up completed after one run.

    A row named ``echo`` with data ``{"ping": 42}`` is inserted into
    ``pgboss.job`` through the ``boss_ready`` fixture, a ``Runner`` is
    invoked with ``once=True``, and the most recently created ``echo`` row
    is then expected to have state ``completed`` and output
    ``{"pong": 42}``.
    """
    # Enqueue the job by writing directly to the job table; the payload is
    # passed as a bound parameter rather than spliced into the SQL text.
    payload = json.dumps({"ping": 42})
    boss_ready.execute(
        "INSERT INTO pgboss.job(name, data) VALUES('echo', %s)",
        (payload,),
    )

    handler_config = {"echo": {"concurrency": 1}}
    Runner(dsn=DATABASE_URL, handlers=handler_config).run(once=True)

    # Only the newest ``echo`` row is inspected, so rows left over from
    # other inserts into the same table do not affect the assertions.
    latest_job_query = (
        "SELECT state, output FROM pgboss.job WHERE name='echo' "
        "ORDER BY created_on DESC LIMIT 1"
    )
    cursor = boss_ready.execute(latest_job_query)
    latest = cursor.fetchone()

    assert latest[0] == "completed"
    assert latest[1] == {"pong": 42}
|