Two real findings from the security reviewer:

1. urllib auto-follows 3xx redirects via the default HTTPRedirectHandler. The previous code's hop loop never ran — urllib silently followed the redirects itself. Replaced with http.client + a manual hop loop. Every hop re-runs _validate_url, so an open redirect to 127.0.0.1 / RFC1918 / metadata gets caught on the second hop.

2. DNS TOCTOU — _resolve() validated the address, but urllib.request re-resolved it on connect. Now the connection is pinned to the validated IP via a PinnedHTTPConn / PinnedHTTPSConn subclass that overrides connect() to bind socket.create_connection to (addr, port). For HTTPS, TLS server_hostname is set to the original host, so SNI + cert verification still work against the named host while the TCP destination is the pinned IP.

Tests added: redirect-to-loopback short-circuits at validation; too-many-redirects exhausts max_hops; 2xx returns body; non-2xx raises.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
92 lines
3.2 KiB
Python
92 lines
3.2 KiB
Python
import pytest
|
|
from unittest.mock import patch, MagicMock
|
|
from void_workers.safe_fetch import safe_fetch, SafeFetchError, _validate_url
|
|
|
|
|
|
def test_rejects_file_scheme():
    """safe_fetch must raise SafeFetchError when handed a file:// URL."""
    target = "file:///etc/passwd"
    with pytest.raises(SafeFetchError):
        safe_fetch(target)
|
|
|
|
|
|
def test_rejects_loopback():
    """safe_fetch must raise SafeFetchError for a literal loopback address."""
    target = "http://127.0.0.1/x"
    with pytest.raises(SafeFetchError):
        safe_fetch(target)
|
|
|
|
|
|
def test_rejects_rfc1918():
    """safe_fetch must raise SafeFetchError for an RFC 1918 private address."""
    target = "http://192.168.1.1/x"
    with pytest.raises(SafeFetchError):
        safe_fetch(target)
|
|
|
|
|
|
def test_rejects_metadata_endpoint():
    """safe_fetch must raise SafeFetchError for the 169.254.169.254 endpoint."""
    target = "http://169.254.169.254/latest/"
    with pytest.raises(SafeFetchError):
        safe_fetch(target)
|
|
|
|
|
|
def test_rejects_cgnat():
    """safe_fetch must raise SafeFetchError for a 100.64.0.0/10 (CGNAT) address."""
    target = "http://100.64.0.1/x"
    with pytest.raises(SafeFetchError):
        safe_fetch(target)
|
|
|
|
|
|
def test_redirect_to_loopback_is_rejected():
    """Open-redirect attack: a public URL 302s to http://127.0.0.1/.

    Only the first hop's network I/O is stubbed: the first call to
    ``_request_one`` returns a canned 302 whose headers' ``get()`` yields
    ``http://127.0.0.1/admin``. Any later call is delegated to the real
    ``_request_one``, so the redirect target goes through the module's real
    validation, and the test expects that to surface as a
    ``SafeFetchError`` matching ``"blocked"``.

    NOTE(review): if validation were ever bypassed, the delegated call would
    presumably attempt a real connection instead of failing fast — confirm
    whether an extra network guard is wanted here.
    """
    # Imported locally so the module object itself can be patched; the
    # file-level import only binds individual names.
    import void_workers.safe_fetch as sf

    redirect_headers = MagicMock()
    redirect_headers.get.return_value = "http://127.0.0.1/admin"

    # Capture the genuine implementation before patch.object swaps it out.
    real_request_one = sf._request_one
    hops = []

    def first_hop_stubbed(url, **kw):
        """Return a canned 302 for the first call; delegate all later calls."""
        hops.append(url)
        if len(hops) == 1:
            # First hop: 302 to a blocked address, no network I/O.
            return (302, redirect_headers, b"")
        # Later hops: real code path, including the real validation.
        return real_request_one(url, **kw)

    with patch.object(sf, "_request_one", side_effect=first_hop_stubbed):
        with pytest.raises(SafeFetchError, match="blocked"):
            safe_fetch("http://example.com/")

    # Without this, a "blocked" error raised before the first hop was ever
    # issued would make the test pass without exercising the redirect path.
    assert hops, "stubbed first hop was never issued"
|
|
|
|
|
|
def test_too_many_redirects():
    """An endless redirect chain must end in SafeFetchError("too many redirects").

    ``_request_one`` is stubbed to answer every call with a 302 whose
    headers' ``get()`` always returns ``http://example.com/loop``, and
    ``safe_fetch`` is called with ``max_hops=2``.
    """
    looping_headers = MagicMock()
    looping_headers.get.return_value = "http://example.com/loop"
    always_redirect = (302, looping_headers, b"")

    stub = patch(
        "void_workers.safe_fetch._request_one",
        return_value=always_redirect,
    )
    expect_failure = pytest.raises(SafeFetchError, match="too many redirects")
    with stub, expect_failure:
        safe_fetch("http://example.com/loop", max_hops=2)
|
|
|
|
|
|
def test_validate_url_returns_pinned_address_for_literal_public_ip():
    """_validate_url on a literal public IP reports that IP as host and addr.

    The result is unpacked into six fields; only host, addr and port are
    checked here.
    """
    parts = _validate_url("http://8.8.8.8:80/x")
    _scheme, host, port, _path, addr, _family = parts
    assert (host, addr, port) == ("8.8.8.8", "8.8.8.8", 80)
|
|
|
|
|
|
def test_2xx_returns_body():
    """With _request_one stubbed to return a 200, safe_fetch returns the body."""
    canned = (200, MagicMock(), b"hello")
    with patch("void_workers.safe_fetch._request_one", return_value=canned):
        body = safe_fetch("http://example.com/x")
    assert body == b"hello"
|
|
|
|
|
|
def test_non_2xx_raises():
    """With _request_one stubbed to return a 500, safe_fetch raises "http 500"."""
    canned = (500, MagicMock(), b"err")
    stub = patch("void_workers.safe_fetch._request_one", return_value=canned)
    with stub, pytest.raises(SafeFetchError, match="http 500"):
        safe_fetch("http://example.com/x")
|