"""Integration tests for the video ingestion handler.

Each test rebuilds the ``public`` schema, re-applies the Node migrations,
then calls ``handle_video`` with the yt-dlp / whisper helpers mocked out.
"""

import subprocess
from unittest.mock import patch

from void_workers.handlers.video import handle as handle_video

# Directory from which the Node migration script is launched.
_VOID_V2_DIR = "/project/src/void-v2"


def _reset_void_schema(conn):
    """Drop and recreate the ``public`` schema and ensure required extensions.

    Args:
        conn: Database connection exposing ``execute(sql)``. It is supplied by
            a fixture that is not visible in this file (presumably a
            psycopg-style connection -- confirm against conftest).
    """
    conn.execute("DROP SCHEMA IF EXISTS public CASCADE")
    conn.execute("CREATE SCHEMA public")
    conn.execute("CREATE EXTENSION IF NOT EXISTS pgcrypto")
    conn.execute("CREATE EXTENSION IF NOT EXISTS vector")


def _run_node_migrations():
    """Run ``node lib/db/migrate.js up`` from the void-v2 project directory.

    Raises:
        subprocess.CalledProcessError: If the migration process exits with a
            non-zero status (``check=True``).
    """
    subprocess.run(
        ["node", "lib/db/migrate.js", "up"],
        cwd=_VOID_V2_DIR,
        check=True,
    )


def _prepare_database(conn):
    """Give a test a freshly reset schema with all migrations applied."""
    _reset_void_schema(conn)
    _run_node_migrations()


def test_video_creates_ref_with_transcript_and_metadata(conn):
    """A successful run stores a ref with the video title and transcript."""
    _prepare_database(conn)
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-vid', 'V') RETURNING id"
    ).fetchone()[0]
    info = {
        "title": "Sample video",
        "description": "a description",
        "duration": 90,
        "uploader": "Channel",
        "thumbnail": "https://i.ytimg.com/t.jpg",
    }

    # Replace the metadata lookup, audio download and transcription helpers
    # with canned values, and stub out os.unlink because the fake audio path
    # does not exist on disk.
    with patch("void_workers.handlers.video._yt_dlp_info", return_value=info), \
            patch("void_workers.handlers.video._yt_dlp_audio",
                  return_value="/tmp/fake.opus"), \
            patch("void_workers.handlers.video.whisper_transcribe",
                  return_value="hello world transcript"), \
            patch("os.unlink"):
        out = handle_video({"space_id": str(sp), "url": "https://youtu.be/abc"})

    assert "ref_id" in out
    row = conn.execute(
        "SELECT title, body_text, source_kind FROM refs WHERE id=%s",
        (out["ref_id"],),
    ).fetchone()
    assert row[0] == "Sample video"
    assert "hello world" in row[1]
    assert row[2] == "youtube"


def test_video_skipped_when_yt_dlp_fails(conn):
    """When the metadata lookup yields ``None`` the handler reports a skip."""
    _prepare_database(conn)
    sp = conn.execute(
        "INSERT INTO spaces(slug, name) VALUES('plan4-vid2', 'V2') RETURNING id"
    ).fetchone()[0]

    with patch("void_workers.handlers.video._yt_dlp_info", return_value=None):
        out = handle_video({"space_id": str(sp), "url": "https://youtu.be/gone"})

    assert out.get("skipped") == "yt-dlp"