"""Python port of lib/ingest/safe_fetch.js. Same SSRF mitigations the Node side ships: - http/https only - DNS-resolved hostnames checked against loopback / RFC1918 / link-local / CGNAT / IPv6 ULA + link-local - Redirects followed manually with the same checks on each hop - VOID_INGEST_ALLOW_PRIVATE=true gate for offline-fixture tests """ import socket import ipaddress import urllib.request import urllib.error import os from urllib.parse import urlparse BLOCK_V4_NETS = [ipaddress.ip_network(c) for c in [ "0.0.0.0/8", "127.0.0.0/8", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "169.254.0.0/16", "100.64.0.0/10", ]] class SafeFetchError(Exception): pass def _is_blocked(addr): if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true": return False try: ip = ipaddress.ip_address(addr) except ValueError: return True if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified: return True if isinstance(ip, ipaddress.IPv4Address): return any(ip in n for n in BLOCK_V4_NETS) # IPv6: ULA + link-local if ip in ipaddress.ip_network("fc00::/7") or ip in ipaddress.ip_network("fe80::/10"): return True return False def _resolve(host): try: infos = socket.getaddrinfo(host, None) except socket.gaierror as e: raise SafeFetchError(f"no DNS for {host}: {e}") addrs = list({i[4][0] for i in infos}) for a in addrs: if _is_blocked(a): raise SafeFetchError(f"{host} resolves to blocked address {a}") if not addrs: raise SafeFetchError(f"no addresses for {host}") return addrs[0] def safe_fetch(url, *, headers=None, timeout=15, max_hops=5): current = url for hop in range(max_hops + 1): u = urlparse(current) if u.scheme not in ("http", "https"): raise SafeFetchError(f"unsupported scheme {u.scheme}") host = u.hostname try: ipaddress.ip_address(host) if _is_blocked(host): raise SafeFetchError(f"blocked literal IP {host}") except ValueError: _resolve(host) req = urllib.request.Request(current, headers=headers or {}) try: opener = urllib.request.build_opener() with opener.open(req, timeout=timeout) as r: return r.read() except urllib.error.HTTPError as e: if e.code in (301, 302, 303, 307, 308) and "Location" in e.headers and hop < max_hops: current = e.headers["Location"] continue raise raise SafeFetchError(f"too many redirects ({max_hops})")