import os import pytest import psycopg DB_URL = os.environ["DATABASE_URL"] @pytest.fixture def conn(): with psycopg.connect(DB_URL, autocommit=True, client_encoding='UTF8') as c: yield c @pytest.fixture(autouse=True) def reset_pgboss(conn): """Drop the pgboss schema before each test. Tests that need it call ensure_pgboss(conn).""" conn.execute("DROP SCHEMA IF EXISTS pgboss CASCADE") yield def ensure_pgboss(conn): """Bring up a minimal pgboss schema matching the columns boss.py reads/writes.""" conn.execute("CREATE SCHEMA IF NOT EXISTS pgboss") conn.execute(""" CREATE TABLE IF NOT EXISTS pgboss.job ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), name text NOT NULL, priority int NOT NULL DEFAULT 0, data jsonb, state text NOT NULL DEFAULT 'created', retry_limit int NOT NULL DEFAULT 5, retry_count int NOT NULL DEFAULT 0, retry_delay int NOT NULL DEFAULT 10, retry_backoff boolean NOT NULL DEFAULT true, start_after timestamptz NOT NULL DEFAULT now(), started_on timestamptz, completed_on timestamptz, created_on timestamptz NOT NULL DEFAULT now(), output jsonb ) """) @pytest.fixture def boss_ready(conn): ensure_pgboss(conn) yield conn