diff --git a/datadog_checks_base/changelog.d/22676.added b/datadog_checks_base/changelog.d/22676.added index ff63715e3c69d..8125821a3d951 100644 --- a/datadog_checks_base/changelog.d/22676.added +++ b/datadog_checks_base/changelog.d/22676.added @@ -1 +1 @@ -Add library-agnostic HTTP mocks/proto/exceptions and migrate intg tests. +Add library-agnostic HTTP mocks/proto/exceptions and migrate intg tests. Add ``HTTPXWrapper``, an httpx2-backed HTTP client opt-in via the ``use_httpx`` instance config flag. The default remains the existing requests-based client. diff --git a/datadog_checks_base/datadog_checks/base/checks/base.py b/datadog_checks_base/datadog_checks/base/checks/base.py index 13d107b5cea73..9ec690ab2432b 100644 --- a/datadog_checks_base/datadog_checks/base/checks/base.py +++ b/datadog_checks_base/datadog_checks/base/checks/base.py @@ -412,10 +412,16 @@ def http(self) -> HTTPClientProtocol: Only new checks or checks on Agent 6.13+ can and should use this for HTTP requests. """ if not hasattr(self, '_http'): - # See Performance Optimizations in this package's README.md. - from datadog_checks.base.utils.http import RequestsWrapper + instance = self.instance or {} + if is_affirmative(instance.get('use_httpx', False)): + from datadog_checks.base.utils.http_httpx import HTTPXWrapper + + self._http = HTTPXWrapper(instance, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log) + else: + # See Performance Optimizations in this package's README.md. + from datadog_checks.base.utils.http import RequestsWrapper - self._http = RequestsWrapper(self.instance or {}, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log) + self._http = RequestsWrapper(instance, self.init_config, self.HTTP_CONFIG_REMAPPER, self.log) return self._http diff --git a/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper/base_scraper.py b/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper/base_scraper.py index 24719c329efef..a3211f6c39405 100644 --- a/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper/base_scraper.py +++ b/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/scraper/base_scraper.py @@ -23,6 +23,7 @@ from datadog_checks.base.constants import ServiceCheck from datadog_checks.base.errors import ConfigurationError from datadog_checks.base.utils.functions import no_op, return_true +from datadog_checks.base.utils.http_exceptions import HTTPConnectionError class OpenMetricsScraper: @@ -405,11 +406,11 @@ def stream_connection_lines(self): self._content_type = connection.headers.get('Content-Type', '') for line in connection.iter_lines(decode_unicode=True): yield line - except ConnectionError as e: + except (ConnectionError, HTTPConnectionError) as e: if self.ignore_connection_errors: - self.log.warning("OpenMetrics endpoint %s is not accessible", self.endpoint) + self.log.warning("OpenMetrics endpoint %s is not accessible: %s", self.endpoint, e) else: - raise e + raise def filter_connection_lines(self, line_streamer): """ diff --git a/datadog_checks_base/datadog_checks/base/utils/http_httpx.py b/datadog_checks_base/datadog_checks/base/utils/http_httpx.py new file mode 100644 index 0000000000000..a91b7c37acb74 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/http_httpx.py @@ -0,0 +1,424 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from __future__ import annotations + +import logging +from collections.abc import Iterator, Mapping +from datetime import timedelta +from typing import Any + +import httpx2 as httpx +from binary import KIBIBYTE + +from datadog_checks.base.config import is_affirmative + +from .headers import get_default_headers, update_headers +from .http_exceptions import ( + HTTPConnectionError, + HTTPError, + HTTPInvalidURLError, + HTTPRequestError, + HTTPStatusError, + HTTPTimeoutError, +) + +LOGGER = logging.getLogger(__name__) + +DEFAULT_TIMEOUT = 10 +# Matches the effective chunk size used by RequestsWrapper.iter_content (http.py:415 multiplies by KIBIBYTE). +DEFAULT_CHUNK_SIZE = 16 * KIBIBYTE + +STANDARD_FIELDS = { + 'allow_redirects': True, + 'connect_timeout': None, + 'extra_headers': None, + 'headers': None, + 'log_requests': False, + 'password': None, + 'read_timeout': None, + 'timeout': DEFAULT_TIMEOUT, + 'tls_ca_cert': None, + 'tls_cert': None, + 'tls_private_key': None, + 'tls_verify': True, + 'username': None, +} + +DEFAULT_REMAPPED_FIELDS: dict[str, dict[str, Any]] = {} + +REQUEST_KWARGS = frozenset( + { + 'params', + 'json', + 'data', + 'content', + 'files', + 'cookies', + 'headers', + 'extra_headers', + 'timeout', + 'follow_redirects', + } +) + + +def _make_timeout(connect: float, read: float) -> httpx.Timeout: + return httpx.Timeout(connect=connect, read=read, write=None, pool=None) + + +def _build_basic_auth(config: dict[str, Any]) -> httpx.BasicAuth | None: + if config['username'] is not None and config['password'] is not None: + return httpx.BasicAuth(config['username'], config['password']) + return None + + +def _build_verify(config: dict[str, Any]) -> bool | str: + if isinstance(config['tls_ca_cert'], str): + return config['tls_ca_cert'] + if not is_affirmative(config['tls_verify']): + return False + return True + + +def _build_cert(config: dict[str, Any]) -> str | tuple[str, str] | None: + cert = config['tls_cert'] + if not isinstance(cert, str): + return None + private_key = config['tls_private_key'] + if isinstance(private_key, str): + return (cert, private_key) + return cert + + +def _build_timeout(config: dict[str, Any]) -> tuple[float, float]: + base = float(config['timeout']) + connect = float(config['connect_timeout']) if config['connect_timeout'] is not None else base + read = float(config['read_timeout']) if config['read_timeout'] is not None else base + return connect, read + + +def _map_httpx_exception(exc: httpx.HTTPError | httpx.InvalidURL) -> HTTPError: + """Translate an httpx2 exception into the library-agnostic equivalent.""" + # ConnectError -> HTTPConnectionError pairs 1:1 with the Step 3a widening (PR #22864). + # Mid-stream NetworkError/ReadError/WriteError stay HTTPRequestError on purpose. + if isinstance(exc, httpx.InvalidURL): + return HTTPInvalidURLError(str(exc) or exc.__class__.__name__, request=getattr(exc, 'request', None)) + if isinstance(exc, httpx.TimeoutException): + return HTTPTimeoutError(str(exc) or exc.__class__.__name__, request=getattr(exc, 'request', None)) + if isinstance(exc, httpx.ConnectError): + return HTTPConnectionError(str(exc) or exc.__class__.__name__, request=getattr(exc, 'request', None)) + if isinstance(exc, httpx.HTTPStatusError): + return HTTPStatusError( + str(exc) or exc.__class__.__name__, + request=getattr(exc, 'request', None), + response=getattr(exc, 'response', None), + ) + if isinstance(exc, httpx.RequestError): + return HTTPRequestError(str(exc) or exc.__class__.__name__, request=getattr(exc, 'request', None)) + return HTTPError(str(exc) or exc.__class__.__name__) + + +class HTTPXResponseAdapter: + """Wraps an httpx2.Response to satisfy HTTPResponseProtocol.""" + + __slots__ = ('_response',) + + def __init__(self, response: httpx.Response) -> None: + self._response = response + + @property + def status_code(self) -> int: + return self._response.status_code + + @property + def content(self) -> bytes: + return self._response.read() + + @property + def text(self) -> str: + self._response.read() + return self._response.text + + @property + def headers(self) -> Mapping[str, str]: + return self._response.headers + + @property + def ok(self) -> bool: + return self._response.status_code < 400 + + @property + def reason(self) -> str: + return self._response.reason_phrase + + @property + def encoding(self) -> str | None: + return self._response.encoding + + @encoding.setter + def encoding(self, value: str | None) -> None: + self._response.encoding = value + + @property + def url(self) -> str: + return str(self._response.url) + + @property + def cookies(self) -> httpx.Cookies: + return self._response.cookies + + @property + def elapsed(self) -> timedelta: + try: + return self._response.elapsed + except RuntimeError: + LOGGER.debug('elapsed unavailable for response from %s', self._response.url) + return timedelta(0) + + def json(self, **kwargs: Any) -> Any: + self._response.read() + return self._response.json(**kwargs) + + def raise_for_status(self) -> None: + # Mirror requests semantics (4xx/5xx only); httpx2 also raises on 3xx. + if self._response.status_code < 400: + return + try: + self._response.raise_for_status() + except httpx.HTTPStatusError as exc: + raise _map_httpx_exception(exc) from exc + + def close(self) -> None: + self._response.close() + + def iter_content(self, chunk_size: int | None = None, decode_unicode: bool = False) -> Iterator[bytes | str]: + effective_size = chunk_size if chunk_size is not None else DEFAULT_CHUNK_SIZE + encoding = self._response.encoding or 'utf-8' + for chunk in self._response.iter_bytes(chunk_size=effective_size): + yield chunk.decode(encoding) if decode_unicode else chunk + + def iter_lines( + self, + chunk_size: int | None = None, # noqa: ARG002 - httpx2 buffers lines internally; kept for HTTPResponseProtocol parity + decode_unicode: bool = False, + delimiter: bytes | str | None = None, + ) -> Iterator[bytes | str]: + if delimiter is not None: + raise NotImplementedError("HTTPXResponseAdapter.iter_lines does not support custom delimiters") + encoding = self._response.encoding or 'utf-8' + for line in self._response.iter_lines(): + yield line if decode_unicode else line.encode(encoding) + + def __enter__(self) -> 'HTTPXResponseAdapter': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool | None: + self.close() + return None + + +class HTTPXWrapper: + """Implements HTTPClientProtocol using a single shared httpx2.Client per wrapper.""" + + __slots__ = ( + '_client', + '_log_requests', + 'logger', + 'options', + ) + + def __init__( + self, + instance: dict[str, Any], + init_config: dict[str, Any] | None = None, + remapper: dict[str, dict[str, Any]] | None = None, + logger: logging.Logger | None = None, + transport: httpx.BaseTransport | None = None, + ) -> None: + self.logger = logger or LOGGER + init_config = init_config or {} + + config = self._resolve_config(instance, init_config, remapper) + + headers = get_default_headers() + if config['headers']: + headers.clear() + update_headers(headers, config['headers']) + if config['extra_headers']: + update_headers(headers, config['extra_headers']) + + auth = _build_basic_auth(config) + verify = _build_verify(config) + cert = _build_cert(config) + timeout = _build_timeout(config) + allow_redirects = is_affirmative(config['allow_redirects']) + + # proxies=None mirrors RequestsWrapper.options for consumers (e.g. http_check). Wiring is Phase 3. + self.options: dict[str, Any] = { + 'auth': auth, + 'cert': cert, + 'headers': headers, + 'proxies': None, + 'timeout': timeout, + 'verify': verify, + 'allow_redirects': allow_redirects, + } + + self._log_requests = is_affirmative(config['log_requests']) + self._client = self._build_client(transport) + + @staticmethod + def _resolve_config( + instance: dict[str, Any], + init_config: dict[str, Any], + remapper: dict[str, dict[str, Any]] | None, + ) -> dict[str, Any]: + default_fields = dict(STANDARD_FIELDS) + default_fields['log_requests'] = init_config.get('log_requests', default_fields['log_requests']) + default_fields['timeout'] = init_config.get('timeout', default_fields['timeout']) + + config = {field: instance.get(field, value) for field, value in default_fields.items()} + + remapper = dict(remapper) if remapper else {} + remapper.update(DEFAULT_REMAPPED_FIELDS) + + for remapped_field, data in remapper.items(): + field = data.get('name') + if field not in STANDARD_FIELDS: + continue + if field in instance: + continue + + default = default_fields[field] + if data.get('invert'): + default = not default + + value = instance.get(remapped_field, data.get('default', default)) + if data.get('invert'): + value = not is_affirmative(value) + + config[field] = value + return config + + def _build_client(self, transport: httpx.BaseTransport | None) -> httpx.Client: + kwargs: dict[str, Any] = { + 'headers': self.options['headers'], + 'timeout': _make_timeout(self.options['timeout'][0], self.options['timeout'][1]), + 'follow_redirects': self.options['allow_redirects'], + 'verify': self.options['verify'], + } + if self.options['cert'] is not None: + kwargs['cert'] = self.options['cert'] + if self.options['auth'] is not None: + kwargs['auth'] = self.options['auth'] + if transport is not None: + kwargs['transport'] = transport + return httpx.Client(**kwargs) + + def get_header(self, name: str, default: str | None = None) -> str | None: + for key, value in self.options['headers'].items(): + if key.lower() == name.lower(): + return value + return default + + def set_header(self, name: str, value: str) -> None: + # Mirror into both stores: options['headers'] is the public shape, _client.headers is what httpx2 sends. + for key in list(self.options['headers']): + if key.lower() == name.lower(): + self.options['headers'][key] = value + self._client.headers[key] = value + return + self.options['headers'][name] = value + self._client.headers[name] = value + + def get(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('GET', url, options) + + def post(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('POST', url, options) + + def put(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('PUT', url, options) + + def delete(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('DELETE', url, options) + + def head(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('HEAD', url, options) + + def patch(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('PATCH', url, options) + + def options_method(self, url: str, **options: Any) -> HTTPXResponseAdapter: + return self._request('OPTIONS', url, options) + + def _request(self, method: str, url: str, options: dict[str, Any]) -> HTTPXResponseAdapter: + if self._log_requests: + self.logger.debug('Sending %s request to %s', method, url) + + request_kwargs = self._build_request_kwargs(options) + follow_redirects = request_kwargs.pop('follow_redirects', httpx.USE_CLIENT_DEFAULT) + try: + request = self._client.build_request(method, url, **request_kwargs) + response = self._client.send(request, stream=True, follow_redirects=follow_redirects) + except (httpx.HTTPError, httpx.InvalidURL) as exc: + raise _map_httpx_exception(exc) from exc + return HTTPXResponseAdapter(response) + + def _build_request_kwargs(self, options: dict[str, Any]) -> dict[str, Any]: + """Translate call-site options to httpx2.Client.request kwargs.""" + # OM v2 scraper injects stream=True unconditionally (base_scraper.py:459). The wrapper + # always streams internally, so drop the kwarg silently rather than raising on it. + options = {k: v for k, v in options.items() if k != 'stream'} + + unknown = set(options) - REQUEST_KWARGS + if unknown: + raise TypeError(f"HTTPXWrapper does not support per-request kwargs: {sorted(unknown)}") + kwargs: dict[str, Any] = {} + passthrough = ('params', 'json', 'data', 'content', 'files', 'cookies') + for key in passthrough: + if key in options: + kwargs[key] = options[key] + + extra_headers = options.get('extra_headers') + headers = options.get('headers') + merged_headers: dict[str, str] | None = None + if headers is not None or extra_headers is not None: + merged_headers = {} + if headers is not None: + merged_headers.update(headers) + if extra_headers is not None: + merged_headers.update(extra_headers) + if merged_headers is not None: + kwargs['headers'] = merged_headers + + if 'timeout' in options: + timeout_value = options['timeout'] + if isinstance(timeout_value, (tuple, list)) and len(timeout_value) == 2: + kwargs['timeout'] = _make_timeout(float(timeout_value[0]), float(timeout_value[1])) + else: + kwargs['timeout'] = float(timeout_value) # type: ignore[arg-type] + + if 'follow_redirects' in options: + kwargs['follow_redirects'] = bool(options['follow_redirects']) + + return kwargs + + def close(self) -> None: + client = getattr(self, '_client', None) + if client is not None: + client.close() + + def __enter__(self) -> 'HTTPXWrapper': + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool | None: + self.close() + return None + + def __del__(self) -> None: # no cov + try: + self.close() + except AttributeError: + pass diff --git a/datadog_checks_base/pyproject.toml b/datadog_checks_base/pyproject.toml index d52b26f0a731a..7e0d41fba94c6 100644 --- a/datadog_checks_base/pyproject.toml +++ b/datadog_checks_base/pyproject.toml @@ -38,6 +38,7 @@ deps = [ "cachetools==7.0.5", "cryptography==46.0.7", "ddtrace==3.19.5", + "httpx2==2.2.0", "jellyfish==1.2.1", "lazy-loader==0.5", "prometheus-client==0.24.1", diff --git a/datadog_checks_base/tests/base/utils/http_httpx/__init__.py b/datadog_checks_base/tests/base/utils/http_httpx/__init__.py new file mode 100644 index 0000000000000..75c6647cb9233 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/datadog_checks_base/tests/base/utils/http_httpx/common.py b/datadog_checks_base/tests/base/utils/http_httpx/common.py new file mode 100644 index 0000000000000..d63c9b58cb84a --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/common.py @@ -0,0 +1,12 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import base64 + + +def parse_basic_auth(header_value: str) -> tuple[str, str]: + scheme, _, b64 = header_value.partition(' ') + assert scheme.lower() == 'basic' + user_pass = base64.b64decode(b64).decode('utf-8') + user, _, password = user_pass.partition(':') + return user, password diff --git a/datadog_checks_base/tests/base/utils/http_httpx/conftest.py b/datadog_checks_base/tests/base/utils/http_httpx/conftest.py new file mode 100644 index 0000000000000..272e7b1bc1228 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/conftest.py @@ -0,0 +1,46 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from collections.abc import Callable + +import httpx2 as httpx +import pytest + + +@pytest.fixture +def status_transport_factory() -> Callable[[int, bytes | str], httpx.MockTransport]: + def _factory(status_code: int, body: bytes | str = b''): + def handler(_request: httpx.Request) -> httpx.Response: + if isinstance(body, str): + return httpx.Response(status_code, text=body) + return httpx.Response(status_code, content=body) + + return httpx.MockTransport(handler) + + return _factory + + +@pytest.fixture +def raising_transport_factory() -> Callable[[Exception], httpx.MockTransport]: + def _factory(exc: Exception): + def handler(_request: httpx.Request) -> httpx.Response: + raise exc + + return httpx.MockTransport(handler) + + return _factory + + +@pytest.fixture +def captured_requests() -> list[httpx.Request]: + return [] + + +@pytest.fixture +def capturing_transport(captured_requests: list[httpx.Request]) -> httpx.MockTransport: + def handler(request: httpx.Request) -> httpx.Response: + _ = request.content + captured_requests.append(request) + return httpx.Response(200, json={'ok': True}) + + return httpx.MockTransport(handler) diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_auth_basic.py b/datadog_checks_base/tests/base/utils/http_httpx/test_auth_basic.py new file mode 100644 index 0000000000000..23081a10a34b1 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_auth_basic.py @@ -0,0 +1,34 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import pytest + +from datadog_checks.base.utils.http_httpx import HTTPXWrapper + +from .common import parse_basic_auth + + +def test_basic_auth_sent_on_request(capturing_transport, captured_requests): + http = HTTPXWrapper( + {'username': 'alice', 'password': 'secret'}, + {}, + transport=capturing_transport, + ) + http.get('http://example.test/') + user, password = parse_basic_auth(captured_requests[0].headers['authorization']) + assert user == 'alice' + assert password == 'secret' + + +@pytest.mark.parametrize( + 'instance', + [ + pytest.param({}, id='no-credentials'), + pytest.param({'username': 'alice'}, id='username-only'), + pytest.param({'password': 'secret'}, id='password-only'), + ], +) +def test_no_authorization_header_set(instance, capturing_transport, captured_requests): + http = HTTPXWrapper(instance, {}, transport=capturing_transport) + http.get('http://example.test/') + assert 'authorization' not in captured_requests[0].headers diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_config.py b/datadog_checks_base/tests/base/utils/http_httpx/test_config.py new file mode 100644 index 0000000000000..e83e74cfb95e2 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_config.py @@ -0,0 +1,136 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import pytest + +from datadog_checks.base.utils.http_httpx import HTTPXWrapper + + +def test_default_headers_include_user_agent(capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + assert any(key.lower() == 'user-agent' for key in http.options['headers']) + + +def test_extra_headers_merge(capturing_transport, captured_requests): + http = HTTPXWrapper({'extra_headers': {'X-Extra': 'value'}}, {}, transport=capturing_transport) + http.get('http://example.test/') + assert captured_requests[0].headers['x-extra'] == 'value' + + +def test_headers_override_defaults(capturing_transport, captured_requests): + http = HTTPXWrapper({'headers': {'User-Agent': 'custom-agent/1.0'}}, {}, transport=capturing_transport) + http.get('http://example.test/') + assert captured_requests[0].headers['user-agent'] == 'custom-agent/1.0' + + +def test_per_request_headers_merge_into_request(capturing_transport, captured_requests): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + http.get('http://example.test/', headers={'X-Per-Request': 'yes'}) + assert captured_requests[0].headers['x-per-request'] == 'yes' + + +def test_timeout_from_instance(capturing_transport): + http = HTTPXWrapper({'timeout': 25}, {}, transport=capturing_transport) + connect, read = http.options['timeout'] + assert connect == 25.0 + assert read == 25.0 + + +def test_connect_and_read_timeout_split(capturing_transport): + http = HTTPXWrapper({'connect_timeout': 5, 'read_timeout': 30}, {}, transport=capturing_transport) + connect, read = http.options['timeout'] + assert connect == 5.0 + assert read == 30.0 + + +def test_verify_defaults_to_true(capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + assert http.options['verify'] is True + + +def test_verify_false_when_tls_verify_off(capturing_transport): + http = HTTPXWrapper({'tls_verify': False}, {}, transport=capturing_transport) + assert http.options['verify'] is False + + +def test_tls_ca_cert_uses_path(capturing_transport): + http = HTTPXWrapper({'tls_ca_cert': '/etc/ssl/ca.pem'}, {}, transport=capturing_transport) + assert http.options['verify'] == '/etc/ssl/ca.pem' + + +def test_tls_client_cert_string(capturing_transport): + http = HTTPXWrapper({'tls_cert': '/etc/ssl/client.pem'}, {}, transport=capturing_transport) + assert http.options['cert'] == '/etc/ssl/client.pem' + + +def test_tls_client_cert_with_key(capturing_transport): + http = HTTPXWrapper( + {'tls_cert': '/etc/ssl/client.pem', 'tls_private_key': '/etc/ssl/client.key'}, + {}, + transport=capturing_transport, + ) + assert http.options['cert'] == ('/etc/ssl/client.pem', '/etc/ssl/client.key') + + +def test_tls_no_cert_when_not_configured(capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + assert http.options['cert'] is None + + +def test_options_proxies_is_none(capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + assert http.options['proxies'] is None + + +@pytest.mark.parametrize( + 'lookup_name,default,expected', + [ + pytest.param('x-foo', None, 'bar', id='lowercase-lookup'), + pytest.param('X-FOO', None, 'bar', id='uppercase-lookup'), + pytest.param('missing', None, None, id='missing-no-default'), + pytest.param('missing', 'fallback', 'fallback', id='missing-with-default'), + ], +) +def test_get_header(capturing_transport, lookup_name, default, expected): + http = HTTPXWrapper({'extra_headers': {'X-Foo': 'bar'}}, {}, transport=capturing_transport) + assert http.get_header(lookup_name, default=default) == expected + + +def test_set_header_overrides_existing(capturing_transport): + http = HTTPXWrapper({'extra_headers': {'X-Foo': 'bar'}}, {}, transport=capturing_transport) + http.set_header('X-FOO', 'new') + assert http.get_header('x-foo') == 'new' + + +def test_set_header_propagates_to_outgoing_request(capturing_transport, captured_requests): + http = HTTPXWrapper({'extra_headers': {'X-Foo': 'bar'}}, {}, transport=capturing_transport) + http.set_header('X-FOO', 'updated') + http.get('http://example.test/') + assert captured_requests[0].headers['x-foo'] == 'updated' + + +def test_set_header_adds_new_header(capturing_transport, captured_requests): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + http.set_header('X-New', 'value') + assert http.get_header('x-new') == 'value' + http.get('http://example.test/') + assert captured_requests[0].headers['x-new'] == 'value' + + +def test_remapper_renames_field(capturing_transport): + remapper = {'ssl_validation': {'name': 'tls_verify'}} + http = HTTPXWrapper({'ssl_validation': False}, {}, remapper=remapper, transport=capturing_transport) + assert http.options['verify'] is False + + +@pytest.mark.parametrize( + 'kwarg,value', + [ + pytest.param('proxies', {'http': 'http://proxy:8080'}, id='proxies'), + pytest.param('allow_redirects', False, id='allow-redirects-uses-httpx-name'), + ], +) +def test_request_rejects_unknown_kwarg(capturing_transport, kwarg, value): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + with pytest.raises(TypeError, match=kwarg): + http.get('http://example.test/', **{kwarg: value}) diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_exceptions.py b/datadog_checks_base/tests/base/utils/http_httpx/test_exceptions.py new file mode 100644 index 0000000000000..7ac2ca585ba57 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_exceptions.py @@ -0,0 +1,44 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import httpx2 as httpx +import pytest + +from datadog_checks.base.utils.http_exceptions import ( + HTTPConnectionError, + HTTPInvalidURLError, + HTTPRequestError, + HTTPTimeoutError, +) +from datadog_checks.base.utils.http_httpx import HTTPXWrapper, _map_httpx_exception + + +@pytest.mark.parametrize( + 'raised,expected', + [ + pytest.param(httpx.ConnectTimeout('boom'), HTTPTimeoutError, id='connect-timeout'), + pytest.param(httpx.ReadTimeout('slow'), HTTPTimeoutError, id='read-timeout'), + pytest.param(httpx.PoolTimeout('pool'), HTTPTimeoutError, id='pool-timeout'), + pytest.param(httpx.ConnectError('refused'), HTTPConnectionError, id='connect-error'), + pytest.param(httpx.ReadError('mid-stream'), HTTPRequestError, id='read-error'), + pytest.param(httpx.LocalProtocolError('bad'), HTTPRequestError, id='local-protocol-error'), + pytest.param(httpx.RequestError('generic'), HTTPRequestError, id='request-error'), + ], +) +def test_request_exception_mapping(raising_transport_factory, raised, expected): + transport = raising_transport_factory(raised) + http = HTTPXWrapper({}, {}, transport=transport) + with pytest.raises(expected): + http.get('http://example.test/') + + +def test_map_httpx_exception_routes_invalid_url(): + mapped = _map_httpx_exception(httpx.InvalidURL('bad url')) + assert isinstance(mapped, HTTPInvalidURLError) + + +def test_request_raises_invalid_url_error(raising_transport_factory): + transport = raising_transport_factory(httpx.InvalidURL('bad url')) + http = HTTPXWrapper({}, {}, transport=transport) + with pytest.raises(HTTPInvalidURLError): + http.get('http://example.test/') diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_lifecycle.py b/datadog_checks_base/tests/base/utils/http_httpx/test_lifecycle.py new file mode 100644 index 0000000000000..db9b5ba319a06 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_lifecycle.py @@ -0,0 +1,47 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import sys + +import pytest + +from datadog_checks.base.utils.http_httpx import HTTPXWrapper + + +def test_close_is_idempotent(capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + http.close() + http.close() + + +def test_context_manager(capturing_transport): + with HTTPXWrapper({}, {}, transport=capturing_transport) as http: + response = http.get('http://example.test/') + assert response.status_code == 200 + + +def test_module_import_fails_without_httpx2(monkeypatch): + monkeypatch.setitem(sys.modules, 'httpx2', None) + monkeypatch.delitem(sys.modules, 'datadog_checks.base.utils.http_httpx', raising=False) + with pytest.raises(ImportError): + import datadog_checks.base.utils.http_httpx # noqa: F401 + + +@pytest.mark.parametrize( + 'instance,expected_cls_name', + [ + pytest.param({'use_httpx': True}, 'HTTPXWrapper', id='opt-in'), + pytest.param({'use_httpx': False}, 'RequestsWrapper', id='explicit-default'), + pytest.param({}, 'RequestsWrapper', id='unset-default'), + ], +) +def test_agentcheck_http_dispatch(instance, expected_cls_name): + from datadog_checks.base import AgentCheck + + check = AgentCheck('test', {}, [instance]) + try: + assert type(check.http).__name__ == expected_cls_name + finally: + close = getattr(check.http, 'close', None) + if close is not None: + close() diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_methods.py b/datadog_checks_base/tests/base/utils/http_httpx/test_methods.py new file mode 100644 index 0000000000000..55da483fc74ff --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_methods.py @@ -0,0 +1,42 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import json + +import pytest + +from datadog_checks.base.utils.http_httpx import HTTPXWrapper + +HTTP_VERBS = { + 'get': 'GET', + 'post': 'POST', + 'put': 'PUT', + 'delete': 'DELETE', + 'head': 'HEAD', + 'patch': 'PATCH', + 'options_method': 'OPTIONS', +} + + +@pytest.mark.parametrize('method,verb', HTTP_VERBS.items()) +def test_method_dispatches_with_correct_verb(method, verb, captured_requests, capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + fn = getattr(http, method) + response = fn('http://example.test/path', headers={'X-Test': '1'}) + + assert response.status_code == 200 + assert captured_requests[0].method == verb + + +def test_post_json_body_is_serialized(capturing_transport, captured_requests): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + http.post('http://example.test/path', json={'a': 1, 'b': 'two'}) + req = captured_requests[0] + assert req.headers['content-type'] == 'application/json' + assert json.loads(req.content) == {'a': 1, 'b': 'two'} + + +def test_request_accepts_stream_kwarg(capturing_transport): + http = HTTPXWrapper({}, {}, transport=capturing_transport) + response = http.get('http://example.test/', stream=True) + assert response.status_code == 200 diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_response.py b/datadog_checks_base/tests/base/utils/http_httpx/test_response.py new file mode 100644 index 0000000000000..d3e7700e46670 --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_response.py @@ -0,0 +1,198 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import logging +from datetime import timedelta + +import httpx2 as httpx +import pytest + +from datadog_checks.base.utils.http_exceptions import HTTPStatusError +from datadog_checks.base.utils.http_httpx import DEFAULT_CHUNK_SIZE, HTTPXResponseAdapter, HTTPXWrapper + + +@pytest.mark.parametrize('status_code', [404, 500]) +def test_response_raise_for_status_raises_on_error_codes(status_transport_factory, status_code): + transport = status_transport_factory(status_code, b'') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + with pytest.raises(HTTPStatusError): + response.raise_for_status() + + +def test_response_iter_content_bytes(status_transport_factory): + transport = status_transport_factory(200, b'abcdef') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + chunks = list(response.iter_content(chunk_size=2)) + assert b''.join(chunks) == b'abcdef' + + +def test_response_iter_content_decode_unicode(status_transport_factory): + transport = status_transport_factory(200, b'abcdef') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + chunks = list(response.iter_content(chunk_size=3, decode_unicode=True)) + assert ''.join(chunks) == 'abcdef' + + +@pytest.mark.parametrize( + 'decode_unicode,expected', + [ + pytest.param(False, [b'a', b'b', b'c'], id='bytes'), + pytest.param(True, ['a', 'b', 'c'], id='decoded-unicode'), + ], +) +def test_response_iter_lines(status_transport_factory, decode_unicode, expected): + transport = status_transport_factory(200, b'a\nb\nc') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert list(response.iter_lines(decode_unicode=decode_unicode)) == expected + + +def test_response_iter_content_empty_body_yields_nothing(status_transport_factory): + transport = status_transport_factory(200, b'') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert list(response.iter_content()) == [] + + +def test_response_iter_content_default_chunk_size_uses_default(status_transport_factory): + body = b'X' * (DEFAULT_CHUNK_SIZE * 3 + 5) + transport = status_transport_factory(200, body) + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + chunks = list(response.iter_content()) + assert b''.join(chunks) == body + assert all(len(chunk) <= DEFAULT_CHUNK_SIZE for chunk in chunks) + assert any(len(chunk) == DEFAULT_CHUNK_SIZE for chunk in chunks) + + +@pytest.mark.parametrize( + 'charset,raw,expected', + [ + pytest.param('utf-8', 'café'.encode('utf-8'), 'café', id='utf-8'), + pytest.param('iso-8859-1', 'café'.encode('iso-8859-1'), 'café', id='iso-8859-1'), + ], +) +def test_response_iter_content_decode_uses_response_encoding(charset, raw, expected): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, content=raw, headers={'Content-Type': f'text/plain; charset={charset}'}) + + http = HTTPXWrapper({}, {}, transport=httpx.MockTransport(handler)) + response = http.get('http://example.test/') + chunks = list(response.iter_content(chunk_size=64, decode_unicode=True)) + assert ''.join(chunks) == expected + + +@pytest.mark.parametrize( + 'charset,line', + [ + pytest.param('utf-8', 'café', id='utf-8'), + pytest.param('iso-8859-1', 'café', id='iso-8859-1'), + ], +) +def test_response_iter_lines_decode_uses_response_encoding(charset, line): + raw = (line + '\n' + line).encode(charset) + + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, content=raw, headers={'Content-Type': f'text/plain; charset={charset}'}) + + http = HTTPXWrapper({}, {}, transport=httpx.MockTransport(handler)) + response = http.get('http://example.test/') + encoded_lines = list(response.iter_lines(decode_unicode=False)) + assert encoded_lines == [line.encode(charset), line.encode(charset)] + + +def test_response_iter_lines_rejects_delimiter(status_transport_factory): + transport = status_transport_factory(200, b'a\nb\n') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + with pytest.raises(NotImplementedError): + list(response.iter_lines(delimiter=b'|')) + + +def test_response_elapsed_returns_zero_on_runtime_error(caplog): + class _FakeResponse: + url = 'http://example.test/' + + @property + def elapsed(self): + raise RuntimeError('not measured') + + adapter = HTTPXResponseAdapter(_FakeResponse()) # type: ignore[arg-type] + with caplog.at_level(logging.DEBUG, logger='datadog_checks.base.utils.http_httpx'): + assert adapter.elapsed == timedelta(0) + assert any('elapsed unavailable' in record.message for record in caplog.records) + + +@pytest.mark.parametrize('status_code,expected_ok', [(200, True), (204, True), (301, True), (400, False), (500, False)]) +def test_response_ok_property(status_transport_factory, status_code, expected_ok): + transport = status_transport_factory(status_code, b'') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert response.ok is expected_ok + + +def test_response_reason_from_httpx_response(status_transport_factory): + transport = status_transport_factory(200, b'') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert response.reason == 'OK' + + +def test_response_text_decodes_body(status_transport_factory): + transport = status_transport_factory(200, b'hello') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert response.text == 'hello' + + +def test_response_json_returns_decoded_object(): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, json={'a': 1}) + + http = HTTPXWrapper({}, {}, transport=httpx.MockTransport(handler)) + response = http.get('http://example.test/') + assert response.json() == {'a': 1} + + +def test_response_content_returns_raw_bytes(status_transport_factory): + transport = status_transport_factory(200, b'\x00\x01\x02') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert response.content == b'\x00\x01\x02' + + +def test_response_headers_exposed(): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, headers={'X-Custom': 'value'}, content=b'') + + http = HTTPXWrapper({}, {}, transport=httpx.MockTransport(handler)) + response = http.get('http://example.test/') + assert response.headers['X-Custom'] == 'value' + + +def test_response_url_reflects_request_url(status_transport_factory): + transport = status_transport_factory(200, b'') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/path') + assert str(response.url) == 'http://example.test/path' + + +def test_response_encoding_property_exposed(status_transport_factory): + transport = status_transport_factory(200, b'hello') + http = HTTPXWrapper({}, {}, transport=transport) + response = http.get('http://example.test/') + assert hasattr(response, 'encoding') + encoding = response.encoding + assert encoding is None or isinstance(encoding, str) + + +def test_response_cookies_exposed(): + def handler(_request: httpx.Request) -> httpx.Response: + return httpx.Response(200, headers={'Set-Cookie': 'session=abc123'}, content=b'') + + http = HTTPXWrapper({}, {}, transport=httpx.MockTransport(handler)) + response = http.get('http://example.test/') + assert response.cookies['session'] == 'abc123' diff --git a/datadog_checks_base/tests/base/utils/http_httpx/test_stream_connection_lines.py b/datadog_checks_base/tests/base/utils/http_httpx/test_stream_connection_lines.py new file mode 100644 index 0000000000000..fb27824db248c --- /dev/null +++ b/datadog_checks_base/tests/base/utils/http_httpx/test_stream_connection_lines.py @@ -0,0 +1,53 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import logging +from unittest import mock + +import pytest +from requests.exceptions import ConnectionError as RequestsConnectionError + +from datadog_checks.base.checks.openmetrics.v2.scraper.base_scraper import OpenMetricsScraper +from datadog_checks.base.utils.http_exceptions import HTTPConnectionError + + +def _scraper_with_connection_error(exception, *, ignore_connection_errors): + scraper = OpenMetricsScraper.__new__(OpenMetricsScraper) + scraper.endpoint = 'http://example.test/metrics' + scraper.ignore_connection_errors = ignore_connection_errors + scraper.log = logging.getLogger('test_stream_connection_lines') + scraper.get_connection = mock.Mock(side_effect=exception) + return scraper + + +def test_connection_error_warning_path(caplog): + scraper = _scraper_with_connection_error( + HTTPConnectionError('refused'), + ignore_connection_errors=True, + ) + with caplog.at_level(logging.WARNING, logger='test_stream_connection_lines'): + assert list(scraper.stream_connection_lines()) == [] + assert any( + 'OpenMetrics endpoint http://example.test/metrics is not accessible' in record.message + for record in caplog.records + ) + assert any('refused' in record.message for record in caplog.records) + + +def test_connection_error_reraises_when_not_ignored(): + scraper = _scraper_with_connection_error( + HTTPConnectionError('refused'), + ignore_connection_errors=False, + ) + with pytest.raises(HTTPConnectionError): + list(scraper.stream_connection_lines()) + + +def test_requests_connection_error_still_handled(caplog): + scraper = _scraper_with_connection_error( + RequestsConnectionError('boom'), + ignore_connection_errors=True, + ) + with caplog.at_level(logging.WARNING, logger='test_stream_connection_lines'): + assert list(scraper.stream_connection_lines()) == [] + assert any('boom' in record.message for record in caplog.records)