feat(workers): safe_fetch Python port

Mirrors lib/ingest/safe_fetch.js. Same scheme + IP-range checks and
VOID_INGEST_ALLOW_PRIVATE env gate. Used by sync.source_doc and any
future Python workers that fetch user-controlled URLs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
root
2026-06-01 10:12:47 +10:00
parent 65fd71dc0d
commit cd1d69c689
2 changed files with 109 additions and 0 deletions

View File

@@ -0,0 +1,27 @@
import pytest
from void_workers.safe_fetch import safe_fetch, SafeFetchError
def test_rejects_file_scheme():
    """A file:// URL must be refused by safe_fetch."""
    target = "file:///etc/passwd"
    with pytest.raises(SafeFetchError):
        safe_fetch(target)
def test_rejects_loopback(monkeypatch):
    """A literal loopback IP must be refused by safe_fetch."""
    # VOID_INGEST_ALLOW_PRIVATE=true switches the address checks off; clear it
    # so an ambient setting cannot turn this test into a real network call.
    monkeypatch.delenv("VOID_INGEST_ALLOW_PRIVATE", raising=False)
    with pytest.raises(SafeFetchError):
        safe_fetch("http://127.0.0.1/x")
def test_rejects_rfc1918(monkeypatch):
    """A literal RFC 1918 private IP must be refused by safe_fetch."""
    # VOID_INGEST_ALLOW_PRIVATE=true switches the address checks off; clear it
    # so an ambient setting cannot turn this test into a real network call.
    monkeypatch.delenv("VOID_INGEST_ALLOW_PRIVATE", raising=False)
    with pytest.raises(SafeFetchError):
        safe_fetch("http://192.168.1.1/x")
def test_rejects_metadata_endpoint(monkeypatch):
    """The link-local metadata address 169.254.169.254 must be refused."""
    # VOID_INGEST_ALLOW_PRIVATE=true switches the address checks off; clear it
    # so an ambient setting cannot turn this test into a real network call.
    monkeypatch.delenv("VOID_INGEST_ALLOW_PRIVATE", raising=False)
    with pytest.raises(SafeFetchError):
        safe_fetch("http://169.254.169.254/latest/")
def test_rejects_cgnat(monkeypatch):
    """A literal CGNAT-range (100.64.0.0/10) IP must be refused by safe_fetch."""
    # VOID_INGEST_ALLOW_PRIVATE=true switches the address checks off; clear it
    # so an ambient setting cannot turn this test into a real network call.
    monkeypatch.delenv("VOID_INGEST_ALLOW_PRIVATE", raising=False)
    with pytest.raises(SafeFetchError):
        safe_fetch("http://100.64.0.1/x")

View File

@@ -0,0 +1,82 @@
"""Python port of lib/ingest/safe_fetch.js.
Same SSRF mitigations the Node side ships:
- http/https only
- DNS-resolved hostnames checked against loopback / RFC1918 /
link-local / CGNAT / IPv6 ULA + link-local
- Redirects followed manually with the same checks on each hop
- VOID_INGEST_ALLOW_PRIVATE=true disables the address checks (the scheme check still applies); meant for offline-fixture tests
"""
import socket
import ipaddress
import urllib.request
import urllib.error
import os
from urllib.parse import urlparse
# IPv4 networks that safe_fetch refuses to contact.
BLOCK_V4_NETS = list(map(ipaddress.ip_network, (
    "0.0.0.0/8",
    "127.0.0.0/8",       # loopback
    "10.0.0.0/8",        # RFC 1918
    "172.16.0.0/12",     # RFC 1918
    "192.168.0.0/16",    # RFC 1918
    "169.254.0.0/16",    # link-local
    "100.64.0.0/10",     # CGNAT
)))
class SafeFetchError(Exception):
    """Raised when safe_fetch refuses a URL or cannot resolve its host."""
# IPv6 ranges refused in addition to the property checks below:
# unique-local (fc00::/7) and link-local (fe80::/10). Built once at import
# instead of on every call.
_BLOCK_V6_NETS = (
    ipaddress.ip_network("fc00::/7"),
    ipaddress.ip_network("fe80::/10"),
)


def _is_blocked(addr):
    """Return True if *addr* must not be contacted.

    Args:
        addr: textual IPv4 or IPv6 address.

    Returns:
        False unconditionally when VOID_INGEST_ALLOW_PRIVATE is exactly
        "true". Otherwise True for anything that does not parse as an IP
        address, for loopback / link-local / multicast / unspecified
        addresses, for IPv4 addresses inside BLOCK_V4_NETS, and for IPv6
        unique-local or link-local addresses; False for everything else.
    """
    if os.environ.get("VOID_INGEST_ALLOW_PRIVATE") == "true":
        return False
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        # Fail closed: something we cannot parse is never allowed through.
        return True
    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) is a second spelling of an
    # IPv4 address. Judge it by the embedded IPv4 address; otherwise
    # ::ffff:10.0.0.1 or ::ffff:127.0.0.1 would skip the IPv4 blocklist.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    if ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_unspecified:
        return True
    if isinstance(ip, ipaddress.IPv4Address):
        return any(ip in net for net in BLOCK_V4_NETS)
    return any(ip in net for net in _BLOCK_V6_NETS)
def _resolve(host):
    """Resolve *host* and refuse it if any resolved address is blocked.

    Args:
        host: hostname to look up with socket.getaddrinfo.

    Returns:
        The first resolved address, in the order getaddrinfo reported it.

    Raises:
        SafeFetchError: the lookup failed, returned no addresses, or at
            least one returned address is rejected by _is_blocked.
    """
    try:
        infos = socket.getaddrinfo(host, None)
    except (socket.gaierror, UnicodeError) as e:
        # getaddrinfo IDNA-encodes the name first; a malformed label (empty
        # or over-long) raises UnicodeError rather than gaierror.
        raise SafeFetchError(f"no DNS for {host}: {e}") from e
    # De-duplicate while keeping resolver order, so the returned address is
    # deterministic (a set would not be).
    addrs = list(dict.fromkeys(info[4][0] for info in infos))
    if not addrs:
        raise SafeFetchError(f"no addresses for {host}")
    # Every address must pass: one blocked record is enough to refuse the
    # host, since the connection could land on any of them.
    for a in addrs:
        if _is_blocked(a):
            raise SafeFetchError(f"{host} resolves to blocked address {a}")
    return addrs[0]
# Status codes treated as redirects by safe_fetch.
_REDIRECT_CODES = frozenset((301, 302, 303, 307, 308))


class _NoRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that never follows a redirect itself.

    The default opener follows 3xx responses internally, which would skip
    the per-hop checks in safe_fetch. Returning None here makes the 3xx
    surface as an HTTPError so safe_fetch can validate the target first.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        return None


def _check_url(url):
    """Validate the scheme and host of one hop; raise SafeFetchError if unsafe."""
    parts = urlparse(url)
    if parts.scheme not in ("http", "https"):
        raise SafeFetchError(f"unsupported scheme {parts.scheme}")
    host = parts.hostname
    if not host:
        raise SafeFetchError(f"no host in URL {url!r}")
    try:
        ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: resolve it and check every resolved address.
        _resolve(host)
    else:
        if _is_blocked(host):
            raise SafeFetchError(f"blocked literal IP {host}")


def safe_fetch(url, *, headers=None, timeout=15, max_hops=5):
    """Fetch *url* over http/https, refusing blocked destinations.

    Redirects are followed manually so that every hop goes through the same
    scheme and address checks as the initial URL.

    Args:
        url: URL to fetch.
        headers: optional mapping of request headers; sent on every hop.
        timeout: timeout in seconds passed to the opener for each request.
        max_hops: maximum number of redirects to follow.

    Returns:
        The response body as returned by ``read()`` on the final response.

    Raises:
        SafeFetchError: unsupported scheme, missing host, blocked or
            unresolvable host on any hop, or more than *max_hops* redirects.
        urllib.error.HTTPError: a non-redirect HTTP error status, or a
            redirect status without a Location header.

    NOTE(review): the host is validated via its own DNS lookup, but the
    opener resolves the name again when it connects, so a DNS answer that
    changes between the two lookups is not caught here.
    """
    from urllib.parse import urljoin

    # Passing a HTTPRedirectHandler subclass replaces the default one, so
    # urllib no longer follows redirects behind our back.
    opener = urllib.request.build_opener(_NoRedirectHandler())
    current = url
    for _ in range(max_hops + 1):
        _check_url(current)
        req = urllib.request.Request(current, headers=headers or {})
        try:
            with opener.open(req, timeout=timeout) as r:
                return r.read()
        except urllib.error.HTTPError as e:
            location = e.headers.get("Location") if e.headers else None
            if e.code not in _REDIRECT_CODES or not location:
                raise
            # Release the redirect response before issuing the next request.
            e.close()
            # Location may be relative; resolve it against the current URL.
            current = urljoin(current, location)
    raise SafeFetchError(f"too many redirects ({max_hops})")