|
| 1 | +"""Shared ``api_url`` validation used by every SDK config boundary. |
| 2 | +
|
| 3 | +Centralizes the rule that an ``api_url`` must be an http(s) URL with a |
| 4 | +host, and adds SSRF defense: link-local / cloud-metadata addresses |
| 5 | +(notably the ``169.254.169.254`` IMDS endpoint) are always rejected. |
| 6 | +Loopback / private / reserved IP literals are *allowed by default* — the |
| 7 | +SDK routinely connects to local and self-hosted cores — and only rejected |
| 8 | +when the caller opts into strict mode via ``allow_private_networks=False``. |
| 9 | +This mirrors the Node SDK's posture for cross-SDK parity. |
| 10 | +
|
| 11 | +Hostnames are intentionally NOT resolved here. Config-time DNS resolution |
| 12 | +would be slow, racy, and still bypassable via DNS rebinding, so a literal |
| 13 | +hostname (including ``localhost`` and ``metadata.google.internal``) passes |
| 14 | +the scheme/host checks. Deployments that must defend against |
| 15 | +hostname-based metadata access should pin ``api_url`` to a vetted host. |
| 16 | +""" |
| 17 | + |
| 18 | +from __future__ import annotations |
| 19 | + |
| 20 | +import ipaddress |
| 21 | +import socket |
| 22 | +from urllib.parse import urlparse |
| 23 | + |
# URL schemes accepted for ``api_url``. ``validate_api_url`` compares the
# scheme extracted by ``urlparse`` against this set and rejects anything else.
_ALLOWED_SCHEMES = frozenset({"http", "https"})
| 25 | + |
| 26 | + |
def _parse_ip(host: str) -> ipaddress.IPv4Address | ipaddress.IPv6Address | None:
    """Interpret ``host`` as an IP literal when possible.

    Two parsers are tried in order:

    1. The strict :mod:`ipaddress` parser, which understands canonical IPv4
       and IPv6 text. Its result is passed through ``_collapse_mapped`` so an
       IPv4-mapped IPv6 literal is classified as the IPv4 it embeds.
    2. ``socket.inet_aton``, which additionally accepts the legacy IPv4
       spellings the C resolver still honours: a single decimal integer
       (``2852039166``), hex (``0xA9FEA9FE``), octal
       (``0251.0376.0251.0376``) and shortened forms (``127.1``).

    The second step matters for SSRF defence: if those spellings were treated
    as ordinary hostnames they would skip the IP checks, yet the HTTP client
    would still resolve them to the real address (for example
    ``http://2852039166/`` reaches ``169.254.169.254``).

    Args:
        host: The host component of a URL.

    Returns:
        The parsed (and, for IPv4-mapped IPv6, collapsed) address, or
        ``None`` when neither parser accepts ``host`` — i.e. it is a genuine
        non-numeric hostname.
    """
    # Stage 1: canonical IPv4 / IPv6 literal.
    try:
        canonical = ipaddress.ip_address(host)
    except ValueError:
        canonical = None
    if canonical is not None:
        return _collapse_mapped(canonical)

    # Stage 2: legacy numeric IPv4 encodings. ``inet_aton`` signals rejection
    # with ``OSError``; ``ValueError`` is caught as well so that any host it
    # cannot even process is likewise treated as "not an IP literal".
    try:
        legacy = ipaddress.IPv4Address(socket.inet_aton(host))
    except (OSError, ValueError):
        legacy = None
    return legacy
| 52 | + |
| 53 | + |
def _collapse_mapped(
    ip: ipaddress.IPv4Address | ipaddress.IPv6Address,
) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
    """Return the embedded IPv4 of an IPv4-mapped IPv6 address.

    An address of the form ``::ffff:a.b.c.d`` is replaced by the plain
    ``a.b.c.d``; every other address is handed back untouched.

    Only newer CPython releases make ``IPv6Address.is_link_local`` look at
    the embedded IPv4, so on Python 3.10/3.11 a literal such as
    ``::ffff:169.254.169.254`` would otherwise be classified as an ordinary
    global IPv6 address and slip past the metadata block. Unwrapping it up
    front keeps classification identical on every supported interpreter and
    in line with the Node SDK.

    Args:
        ip: A parsed IP literal.

    Returns:
        The embedded IPv4 address when ``ip`` is IPv4-mapped, else ``ip``
        itself.
    """
    # IPv4Address objects may lack ``ipv4_mapped``; the getattr default
    # treats a missing attribute the same as "not a mapped address".
    embedded = getattr(ip, "ipv4_mapped", None)
    if embedded is None:
        return ip
    return embedded
| 73 | + |
| 74 | + |
# Cloud-metadata endpoints that live outside the link-local ranges and are
# therefore invisible to ``is_link_local``. Stored in packed form so a match
# does not depend on textual spelling or on an IPv6 zone identifier.
_METADATA_IP_LITERALS = frozenset(
    ipaddress.ip_address(literal).packed
    for literal in (
        "fd00:ec2::254",  # AWS EC2 instance metadata service over IPv6
        "fd20:ce::254",  # Google Cloud metadata server over IPv6
        "100.100.100.200",  # Alibaba Cloud ECS metadata service
    )
)


def validate_api_url(value: str, *, allow_private_networks: bool = True) -> str:
    """Validate and normalize an ``api_url``, guarding against SSRF.

    The URL must use an http(s) scheme and carry a host. When the host is an
    IP literal (in any encoding ``_parse_ip`` recognises) it is additionally
    screened: link-local addresses and known cloud-metadata endpoints are
    always refused, and the broader loopback / private / reserved classes are
    refused only in strict mode. Non-numeric hostnames are not resolved and
    pass once the scheme/host checks succeed.

    Args:
        value: The candidate URL.
        allow_private_networks: Defaults to ``True`` — loopback / private /
            reserved IP literals are permitted because the SDK routinely
            connects to local and self-hosted cores. Pass ``False`` to reject
            those too (hardened multi-tenant deployments). Link-local /
            cloud-metadata addresses are rejected regardless of this flag.

    Returns:
        The whitespace-stripped URL.

    Raises:
        ValueError: If the scheme is not http(s), the host is missing, or
            the host is a disallowed IP literal.
    """
    stripped = value.strip()
    parsed = urlparse(stripped)
    if parsed.scheme not in _ALLOWED_SCHEMES or not parsed.netloc:
        raise ValueError("api_url must be an http(s) URL")
    host = parsed.hostname
    if not host:
        raise ValueError("api_url must include a host")

    ip = _parse_ip(host)
    if ip is None:
        # Genuine hostname: intentionally not resolved at config time.
        return stripped

    # ``is_link_local`` covers 169.254.0.0/16 and fe80::/10, but some
    # providers expose their metadata service on addresses outside those
    # ranges (e.g. a unique-local IPv6 address), so those are matched
    # explicitly. This check must run before the opt-in strict check so it
    # applies even when private networks are allowed.
    if ip.is_link_local or ip.packed in _METADATA_IP_LITERALS:
        raise ValueError(
            "api_url must not target a link-local or cloud-metadata address"
        )
    if not allow_private_networks and (
        ip.is_loopback
        or ip.is_private
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
    ):
        raise ValueError(
            "api_url must not target a loopback, private, or reserved address; "
            "set allow_private_networks=True to permit it"
        )
    return stripped
0 commit comments