Mirrors lib/ingest/safe_fetch.js. Same scheme + IP-range checks and VOID_INGEST_ALLOW_PRIVATE env gate. Used by sync.source_doc and any future Python workers that fetch user-controlled URLs. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
"""Python port of lib/ingest/safe_fetch.js.
|
|
|
|
Same SSRF mitigations the Node side ships:
|
|
- http/https only
|
|
- DNS-resolved hostnames checked against loopback / RFC1918 /
|
|
link-local / CGNAT / IPv6 ULA + link-local
|
|
- Redirects followed manually with the same checks on each hop
|
|
- VOID_INGEST_ALLOW_PRIVATE=true gate for offline-fixture tests
|
|
"""
|
|
import socket
|
|
import ipaddress
|
|
import urllib.request
|
|
import urllib.error
|
|
import os
|
|
from urllib.parse import urlparse
|
|
|
|
# IPv4 ranges that must never be fetched. Kept as a list of
# ipaddress.IPv4Network objects so membership is a simple `ip in net` test.
BLOCK_V4_NETS = list(map(ipaddress.ip_network, (
    "0.0.0.0/8",       # "this network"
    "127.0.0.0/8",     # loopback
    "10.0.0.0/8",      # RFC1918 private
    "172.16.0.0/12",   # RFC1918 private
    "192.168.0.0/16",  # RFC1918 private
    "169.254.0.0/16",  # link-local
    "100.64.0.0/10",   # CGNAT shared address space
)))
|
|
|
|
|
|
class SafeFetchError(Exception):
    """Raised when a URL is refused or cannot be fetched safely.

    Covers unsupported schemes, hosts that fail DNS resolution, hosts that
    resolve to a blocked address, and exhausted redirect budgets.
    """
|
|
|
|
|
|
# IPv6 ranges refused in addition to the is_* property checks:
# unique-local (ULA) and link-local.
_BLOCK_V6_NETS = (
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
)


def _is_blocked(addr):
    """Return True when *addr* must not be contacted.

    Args:
        addr: An IPv4 or IPv6 address in textual form.

    Returns:
        False unconditionally when the VOID_INGEST_ALLOW_PRIVATE environment
        variable equals "true". Otherwise True if *addr* is not a parseable
        IP address, or is loopback / link-local / multicast / unspecified,
        or falls in BLOCK_V4_NETS (IPv4) or the ULA / link-local ranges
        (IPv6); False for everything else.
    """
    if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
        return False
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        # Anything we cannot classify is refused rather than trusted.
        return True

    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) is an IPv6Address object,
    # so it would bypass the IPv4 blocklist below even though connecting to
    # it reaches the embedded IPv4 host. Classify the embedded address.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped

    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
        return True
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in net for net in BLOCK_V4_NETS)
    return any(ip in net for net in _BLOCK_V6_NETS)
|
|
|
|
|
|
def _resolve(host):
    """Resolve *host* and refuse it if any resolved address is blocked.

    Args:
        host: Hostname to look up via ``socket.getaddrinfo``.

    Returns:
        The first resolved address, in the order the resolver returned them.

    Raises:
        SafeFetchError: If the lookup fails, yields no addresses, or any
            resolved address is rejected by ``_is_blocked``.
    """
    try:
        infos = socket.getaddrinfo(host, None)
    except (socket.gaierror, UnicodeError) as e:
        # UnicodeError comes from the IDNA codec for hostnames with an empty
        # or over-long label; treat it like any other failed lookup instead
        # of leaking a non-SafeFetchError to the caller.
        raise SafeFetchError(f"no DNS for {host}: {e}") from e

    # De-duplicate while keeping resolver order so the return value is
    # deterministic (a set would make addrs[0] arbitrary).
    addrs = list(dict.fromkeys(info[4][0] for info in infos))
    if not addrs:
        raise SafeFetchError(f"no addresses for {host}")

    # Every address must pass: a single blocked record is enough to refuse.
    for a in addrs:
        if _is_blocked(a):
            raise SafeFetchError(f"{host} resolves to blocked address {a}")
    return addrs[0]
|
|
|
|
|
|
class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that never follows, so 3xx responses raise HTTPError."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Returning None tells urllib not to follow; the default error
        # handler then raises HTTPError, which safe_fetch inspects itself.
        return None


_REDIRECT_CODES = (301, 302, 303, 307, 308)


def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
    """Fetch *url* over http/https, validating every hop against the blocklist.

    Args:
        url: Absolute URL to fetch.
        headers: Optional mapping of request headers sent on every hop.
        timeout: Timeout in seconds passed to the opener for each request.
        max_hops: Maximum number of redirects to follow.

    Returns:
        The response body as bytes.

    Raises:
        SafeFetchError: Unsupported scheme, missing host, blocked or
            unresolvable host, or more than *max_hops* redirects.
        urllib.error.HTTPError: Any non-redirect HTTP error status.

    NOTE(review): the host is validated with one DNS lookup and urllib then
    resolves it again when connecting, so a DNS answer that changes between
    the two lookups is not covered by this check.
    """
    # The default opener follows redirects internally, which would skip the
    # per-hop checks below. Install a handler that refuses to follow.
    opener = urllib.request.build_opener(_NoRedirectHandler)
    current = url
    for _ in range(max_hops + 1):
        u = urlparse(current)
        if u.scheme not in ("http", "https"):
            raise SafeFetchError(f"unsupported scheme {u.scheme}")
        host = u.hostname
        if not host:
            raise SafeFetchError(f"no host in URL {current}")

        try:
            ipaddress.ip_address(host)
        except ValueError:
            # Not an IP literal: resolve it and vet every address.
            _resolve(host)
        else:
            if _is_blocked(host):
                raise SafeFetchError(f"blocked literal IP {host}")

        req = urllib.request.Request(current, headers=headers or {})
        try:
            with opener.open(req, timeout=timeout) as r:
                return r.read()
        except urllib.error.HTTPError as e:
            location = e.headers.get("Location") if e.code in _REDIRECT_CODES else None
            if location is None:
                raise
            e.close()  # release the redirect response before the next hop
            # Location may be relative; resolve it against the current URL.
            current = urllib.parse.urljoin(current, location)
    raise SafeFetchError(f"too many redirects ({max_hops})")
|