diff --git a/workers/tests/test_safe_fetch.py b/workers/tests/test_safe_fetch.py
new file mode 100644
index 0000000..a9cb95b
--- /dev/null
+++ b/workers/tests/test_safe_fetch.py
@@ -0,0 +1,53 @@
+"""SSRF guard tests for safe_fetch; none of them touch the network."""
+import pytest
+
+from void_workers.safe_fetch import SafeFetchError, safe_fetch
+
+
+@pytest.fixture(autouse=True)
+def _guard_enabled(monkeypatch):
+    # VOID_INGEST_ALLOW_PRIVATE=true switches every address check off, so a
+    # stray value in the developer's environment would mask real failures.
+    monkeypatch.delenv("VOID_INGEST_ALLOW_PRIVATE", raising=False)
+
+
+def test_rejects_file_scheme():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("file:///etc/passwd")
+
+
+def test_rejects_loopback():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http://127.0.0.1/x")
+
+
+def test_rejects_rfc1918():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http://192.168.1.1/x")
+
+
+def test_rejects_metadata_endpoint():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http://169.254.169.254/latest/")
+
+
+def test_rejects_cgnat():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http://100.64.0.1/x")
+
+
+def test_rejects_ipv6_loopback():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http://[::1]/x")
+
+
+def test_rejects_ipv4_mapped_ipv6():
+    # ::ffff:a.b.c.d reaches the embedded IPv4 host, so it has to be judged
+    # against the IPv4 block list as well.
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http://[::ffff:192.168.1.1]/x")
+
+
+def test_rejects_missing_host():
+    with pytest.raises(SafeFetchError):
+        safe_fetch("http:///x")
diff --git a/workers/void_workers/safe_fetch.py b/workers/void_workers/safe_fetch.py
new file mode 100644
index 0000000..c7bc89f
--- /dev/null
+++ b/workers/void_workers/safe_fetch.py
@@ -0,0 +1,142 @@
+"""Python port of lib/ingest/safe_fetch.js.
+
+Same SSRF mitigations the Node side ships:
+- http/https only
+- DNS-resolved hostnames checked against loopback / RFC1918 /
+  link-local / CGNAT / IPv6 ULA + link-local
+- Redirects followed manually with the same checks on each hop
+- VOID_INGEST_ALLOW_PRIVATE=true gate for offline-fixture tests
+
+Known limitation: the address check and the real connection do separate DNS
+lookups, so a hostile resolver can still change its answer between the two
+(DNS rebinding). Closing that gap requires connecting to the vetted IP.
+"""
+import ipaddress
+import os
+import socket
+import urllib.error
+import urllib.request
+from urllib.parse import urljoin, urlparse
+
+BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [
+    "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8",
+    "172.16.0.0/12", "192.168.0.0/16",
+    "169.254.0.0/16", "100.64.0.0/10",
+]]
+# IPv6 unique-local (ULA) and link-local ranges.
+BLOCK_V6_NETS = [ipaddress.ip_network(c) for c in ["fc00::/7", "fe80::/10"]]
+
+REDIRECT_CODES = (301, 302, 303, 307, 308)
+
+
+class SafeFetchError(Exception):
+    """Raised when the SSRF guard refuses a URL or cannot resolve its host."""
+
+
+class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
+    """Keep urllib from following redirects on its own.
+
+    The stock handler chases Location headers without running the address
+    checks, so every hop has to come back through safe_fetch() instead.
+    """
+
+    def redirect_request(self, req, fp, code, msg, headers, newurl):
+        # Returning None makes urllib surface the 3xx as an HTTPError.
+        return None
+
+
+def _is_blocked(addr):
+    """Return True if *addr*, an IP literal string, must not be contacted.
+
+    Unparseable input counts as blocked. VOID_INGEST_ALLOW_PRIVATE=true
+    disables every check.
+    """
+    if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
+        return False
+    try:
+        ip = ipaddress.ip_address(addr)
+    except ValueError:
+        return True
+    # ::ffff:a.b.c.d connects to the embedded IPv4 host, so judge it as IPv4;
+    # otherwise it would slip past BLOCK_V4_NETS.
+    mapped = getattr(ip, "ipv4_mapped", None)
+    if mapped is not None:
+        ip = mapped
+    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
+        return True
+    nets = BLOCK_V4_NETS if ip.version == 4 else BLOCK_V6_NETS
+    return any(ip in net for net in nets)
+
+
+def _resolve(host):
+    """Resolve *host* and return its first address, refusing blocked ones.
+
+    Every returned address is checked, so a name that mixes public and
+    private records is rejected outright.
+
+    Raises:
+        SafeFetchError: lookup failed, returned nothing, or returned a
+            blocked address.
+    """
+    try:
+        infos = socket.getaddrinfo(host, None)
+    except (socket.gaierror, UnicodeError) as e:
+        raise SafeFetchError(f"no DNS for {host}: {e}") from e
+    # dict.fromkeys de-duplicates while keeping resolver order, so the address
+    # returned below is deterministic (iterating a set is not).
+    addrs = list(dict.fromkeys(i[4][0] for i in infos))
+    if not addrs:
+        raise SafeFetchError(f"no addresses for {host}")
+    for a in addrs:
+        if _is_blocked(a):
+            raise SafeFetchError(f"{host} resolves to blocked address {a}")
+    return addrs[0]
+
+
+def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
+    """GET *url* and return the response body as bytes.
+
+    Args:
+        url: Absolute http(s) URL.
+        headers: Optional request headers, sent again on every hop.
+        timeout: Socket timeout in seconds for each request.
+        max_hops: Maximum number of redirects to follow.
+
+    Raises:
+        SafeFetchError: malformed URL, unsupported scheme, missing host,
+            blocked or unresolvable address, or more than *max_hops*
+            redirects.
+        urllib.error.HTTPError: any non-redirect HTTP error status.
+        urllib.error.URLError: connection-level failure.
+    """
+    opener = urllib.request.build_opener(_NoRedirectHandler)
+    current = url
+    for _hop in range(max_hops + 1):
+        try:
+            u = urlparse(current)
+            host = u.hostname
+        except ValueError as e:
+            raise SafeFetchError(f"invalid URL {current}: {e}") from e
+        if u.scheme not in ("http", "https"):
+            raise SafeFetchError(f"unsupported scheme {u.scheme}")
+        if not host:
+            raise SafeFetchError(f"no host in {current}")
+        try:
+            ipaddress.ip_address(host)
+        except ValueError:
+            _resolve(host)
+        else:
+            if _is_blocked(host):
+                raise SafeFetchError(f"blocked literal IP {host}")
+        req = urllib.request.Request(current, headers=headers or {})
+        try:
+            with opener.open(req, timeout=timeout) as r:
+                return r.read()
+        except urllib.error.HTTPError as e:
+            location = e.headers.get("Location") if e.headers else None
+            if e.code not in REDIRECT_CODES or not location:
+                raise
+            e.close()
+            # Location may be relative; resolve it against the current URL.
+            current = urljoin(current, location)
+    raise SafeFetchError(f"too many redirects ({max_hops})")