diff --git a/CHANGES/lightwell-network-feature-guard.bugfix b/CHANGES/lightwell-network-feature-guard.bugfix new file mode 100644 index 00000000..78548c70 --- /dev/null +++ b/CHANGES/lightwell-network-feature-guard.bugfix @@ -0,0 +1 @@ +Fixed ``DomainBasedPermission`` allowing any authenticated user to read the ``lightwell`` domain's PyPI views (simple API, package metadata) without a subscription check. Reading these views now requires either a ``DomainOrg`` association with the domain or the ``lightwell-network`` feature entitlement (checked against the Features Service, using the same cache as ``FeatureContentGuard``); ``public-`` prefixed domains, other domains' PyPI views, and write operations are unaffected. diff --git a/pulp_service/pulp_service/app/authorization.py b/pulp_service/pulp_service/app/authorization.py index f8ae8319..c29a17f5 100644 --- a/pulp_service/pulp_service/app/authorization.py +++ b/pulp_service/pulp_service/app/authorization.py @@ -11,7 +11,7 @@ from pulpcore.plugin.models import Domain from pulpcore.plugin.util import extract_pk, get_domain_pk -from pulp_service.app.models import DomainOrg +from pulp_service.app.models import DomainOrg, FeatureContentGuard _logger = logging.getLogger(__name__) org_id_var = ContextVar("org_id") @@ -20,6 +20,14 @@ user_id_var = ContextVar("user_id") group_var = ContextVar("group") +# The domain whose PyPI views require the lightwell-network feature entitlement (see +# has_permission()). Other domains' PyPI views keep the pre-existing "any SAFE_METHOD +# request is allowed" behavior. +LIGHTWELL_DOMAIN_NAME = "lightwell" +# Feature entitlement required to read the lightwell domain's PyPI views (simple API, etc.) +# for orgs that don't have a DomainOrg association with the domain. See has_permission(). +LIGHTWELL_NETWORK_FEATURE = "lightwell-network" + class DomainBasedPermission(BasePermission): """ @@ -41,23 +49,76 @@ def _has_domain_access(self, domain_pk, org_id, user): return DomainOrg.objects.filter(query).exists() - def has_permission(self, request, view): + def _has_pypi_read_access(self, request, domain): + """ + Checks SAFE_METHOD access to the lightwell domain's PyPI views. + + Users with a DomainOrg association bypass the feature check entirely (existing + permission model). Everyone else -- including unauthenticated users -- must belong + to an org that has the lightwell-network feature entitlement. + """ + user = request.user + domain_pk = domain.pk if domain is not None else get_domain_pk() + + decoded_header_content = self.get_decoded_identity_header(request) + org_id = self.get_org_id(decoded_header_content) + + if user.is_authenticated and self._has_domain_access(domain_pk, org_id, user): + return True + + if org_id is None: + return False + + return self._has_lightwell_network_feature(org_id) + + def _has_lightwell_network_feature(self, org_id): + """ + Checks (with caching) whether `org_id` has the lightwell-network feature, via the + same cache/lookup mechanism as FeatureContentGuard. + """ + guard = FeatureContentGuard(features=[LIGHTWELL_NETWORK_FEATURE], pulp_domain=None) + try: + return guard.check_feature(org_id) + except PermissionError: + return False + + def _check_pypi_safe_method_access(self, request, view, domain): + """ + Returns True/False for a SAFE_METHOD request to a PyPI `view`, or None if `view` + isn't a PyPI view (the caller should then fall through to the standard + DomainOrg-based checks below). + + Only the lightwell domain's PyPI views require a DomainOrg association or the + lightwell-network feature entitlement. Other domains' PyPI views keep the + pre-existing "any SAFE_METHOD request is allowed" behavior; non-PyPI endpoints + (Maven, repository listing, Pulp REST API, ...) never hit this method. + """ + from pulp_python.app.pypi.views import PyPIMixin + + if not isinstance(view, PyPIMixin): + return None + + if domain and domain.name == LIGHTWELL_DOMAIN_NAME: + return self._has_pypi_read_access(request, domain) + return True + + def has_permission(self, request, view): # noqa: PLR0911 # Admins have all permissions if request.user.is_superuser: return True user = request.user - # Allow safe requests on PyPI views and public domains for all users + # Allow safe requests on public domains for all users, authenticated or not if request.method in SAFE_METHODS: - from pulp_python.app.pypi.views import PyPIMixin - - if isinstance(view, PyPIMixin): - return True domain = getattr(request, "pulp_domain", None) - if domain and "public-" in domain.name: + if domain and domain.name.startswith("public-"): return True + pypi_access = self._check_pypi_safe_method_access(request, view, domain) + if pypi_access is not None: + return pypi_access + if not user.is_authenticated: return False diff --git a/pulp_service/pulp_service/app/models.py b/pulp_service/pulp_service/app/models.py index 5a02372f..dc13b32c 100755 --- a/pulp_service/pulp_service/app/models.py +++ b/pulp_service/pulp_service/app/models.py @@ -164,6 +164,30 @@ def _set_cached_result(feature_cache, key, allowed): entry = {"allowed": allowed, "expires_at": time.time() + feature_cache.default_expires_ttl} feature_cache.set(key, json.dumps(entry), expires=feature_cache.default_expires_ttl) + def check_feature(self, account_id): + """ + Returns whether `account_id` has all of `self.features`, per the Features Service. + + Reuses the same `FeatureContentGuardCache` cache key scheme as `permit()` so that + callers checking the same (account_id, features) pair -- e.g. `DomainBasedPermission` + checking the `lightwell-network` feature -- share cache entries with this guard and + avoid redundant Features Service calls. May raise `PermissionError` if the Features + Service call fails (see `_check_for_feature`). + """ + cache_key = f"{account_id}-{','.join(self.features)}" + cache_key_digest = sha256(bytes(cache_key, "utf8")).hexdigest() + feature_cache = FeatureContentGuardCache() + account_allowed = self._get_cached_result(feature_cache, cache_key_digest) + + if account_allowed is None: + _logger.debug("Feature cache MISS for key %s", cache_key_digest) + account_allowed = self._check_for_feature(account_id) + self._set_cached_result(feature_cache, cache_key_digest, account_allowed) + else: + _logger.debug("Feature cache HIT for key %s", cache_key_digest) + + return account_allowed + def permit(self, request): try: header_content = request.headers[self.header_name] @@ -190,17 +214,7 @@ def permit(self, request): _logger.exception("Access not allowed - Invalid JSON or Path not found.") raise PermissionError(_("Access denied.")) from exc - cache_key = f"{header_value}-{','.join(self.features)}" - cache_key_digest = sha256(bytes(cache_key, "utf8")).hexdigest() - feature_cache = FeatureContentGuardCache() - account_allowed = self._get_cached_result(feature_cache, cache_key_digest) - - if account_allowed is None: - _logger.debug("Feature cache MISS for key %s", cache_key_digest) - account_allowed = self._check_for_feature(header_value) - self._set_cached_result(feature_cache, cache_key_digest, account_allowed) - else: - _logger.debug("Feature cache HIT for key %s", cache_key_digest) + account_allowed = self.check_feature(header_value) if not account_allowed: _logger.warning("Access not allowed - Features not available for the user.") diff --git a/pulp_service/pulp_service/tests/functional/constants.py b/pulp_service/pulp_service/tests/functional/constants.py index 746f5f51..a4202f40 100644 --- a/pulp_service/pulp_service/tests/functional/constants.py +++ b/pulp_service/pulp_service/tests/functional/constants.py @@ -38,6 +38,13 @@ CONTENT_GUARD_FEATURES_NOT_SUBSCRIBED = ["rhods"] CONTENT_GUARD_FILTER = ".identity.org_id" +# LIGHTWELL-NETWORK FEATURE CONSTANTS +# Used to test the lightwell-network feature check enforced by DomainBasedPermission on the +# "lightwell" domain's PyPI views. These are real staging Features Service accounts. +LIGHTWELL_NETWORK_FEATURE = "lightwell-network" +LIGHTWELL_ENTITLED_ORG_ID = "20368420" # has the lightwell-network feature +LIGHTWELL_NOT_ENTITLED_ORG_ID = "1979710" # does not have the lightwell-network feature + # VULNERABILITY REPORT CONSTANTS # NPM NPM_REMOTE_REGISTRY = "https://registry.npmjs.org/" diff --git a/pulp_service/pulp_service/tests/functional/test_authentication.py b/pulp_service/pulp_service/tests/functional/test_authentication.py index 35ab3c6f..e3e3bee2 100644 --- a/pulp_service/pulp_service/tests/functional/test_authentication.py +++ b/pulp_service/pulp_service/tests/functional/test_authentication.py @@ -120,7 +120,10 @@ def test_get_requests_without_auth_to_simple_api( python_bindings, gen_object_with_cleanup, ): - """Test that all domains allow GET requests without authentication but block other methods.""" + """Test that all domains (other than "lightwell") allow GET requests without + authentication but block other methods. The "lightwell" domain is excluded from this + behavior -- see test_lightwell_feature_permission.py. + """ # Create a user with credentials to set up the domain setup_user = { "identity": {"org_id": 33333, "internal": {"org_id": 33333}, "user": {"username": "publicdomainuser"}} diff --git a/pulp_service/pulp_service/tests/functional/test_lightwell_feature_permission.py b/pulp_service/pulp_service/tests/functional/test_lightwell_feature_permission.py new file mode 100644 index 00000000..b445f4bd --- /dev/null +++ b/pulp_service/pulp_service/tests/functional/test_lightwell_feature_permission.py @@ -0,0 +1,200 @@ +""" +Functional tests for the lightwell-network feature check enforced by DomainBasedPermission, +scoped specifically to the "lightwell" domain's PyPI views (simple API, package metadata, +etc.). Other domains' PyPI views, and all non-PyPI endpoints (even within the lightwell +domain), keep the pre-existing permission model unaffected by this feature check. + +These follow the pattern used in test_feature_service.py: they exercise the real Features +Service (no mocking) using known staging accounts. Org LIGHTWELL_ENTITLED_ORG_ID has the +lightwell-network feature; org LIGHTWELL_NOT_ENTITLED_ORG_ID does not. + +NOTE: the feature check is keyed off the literal domain name "lightwell" (see +pulp_service.app.authorization.LIGHTWELL_DOMAIN_NAME), so the domain created here can't use +a random per-test suffix like most other functional tests in this suite. These tests assume +they run against an ephemeral Pulp instance where no "lightwell" domain already exists. +""" + +import json +from base64 import b64encode +from urllib.parse import urljoin +from uuid import uuid4 + +import pytest +import requests + +from pulp_service.tests.functional.constants import ( + LIGHTWELL_ENTITLED_ORG_ID, + LIGHTWELL_NOT_ENTITLED_ORG_ID, +) + +LIGHTWELL_DOMAIN_NAME = "lightwell" + +# An org with no DomainOrg association with the test domains and no lightwell-network +# feature entitlement; only used to own the test domain/repo/distribution. +DOMAIN_OWNER_ORG_ID = "555555555" + + +def _identity_header(org_id, username): + identity = { + "identity": { + "org_id": org_id, + "internal": {"org_id": org_id}, + "user": {"username": username}, + } + } + return b64encode(json.dumps(identity).encode()).decode() + + +@pytest.fixture +def configure_pypi_distribution( + anonymous_user, + gen_object_with_cleanup, + pulpcore_bindings, + python_bindings, + bindings_cfg, +): + """ + Creates a domain owned by DOMAIN_OWNER_ORG_ID, with a Python repository and a PyPI + distribution. + + Returns a (domain_name, pypi_simple_url, repos_url, owner_header) tuple. + """ + owner_header = _identity_header(DOMAIN_OWNER_ORG_ID, "lightwell-test-owner") + + def _configure(domain_name): + with anonymous_user: + pulpcore_bindings.DomainsApi.api_client.default_headers["x-rh-identity"] = owner_header + + gen_object_with_cleanup( + pulpcore_bindings.DomainsApi, + { + "name": domain_name, + "storage_class": "pulpcore.app.models.storage.FileSystem", + "storage_settings": {"MEDIA_ROOT": "/var/lib/pulp/media/"}, + }, + ) + + python_bindings.RepositoriesPythonApi.api_client.default_headers["x-rh-identity"] = owner_header + repo = gen_object_with_cleanup( + python_bindings.RepositoriesPythonApi, {"name": str(uuid4())}, pulp_domain=domain_name + ) + + python_bindings.DistributionsPypiApi.api_client.default_headers["x-rh-identity"] = owner_header + base_path = str(uuid4()) + gen_object_with_cleanup( + python_bindings.DistributionsPypiApi, + {"name": str(uuid4()), "base_path": base_path, "repository": repo.pulp_href}, + pulp_domain=domain_name, + ) + + pypi_url = urljoin(bindings_cfg.host, f"/api/pypi/{domain_name}/{base_path}/simple/") + repos_url = urljoin(bindings_cfg.host, f"/api/pulp/{domain_name}/api/v3/repositories/python/python/") + return domain_name, pypi_url, repos_url, owner_header + + yield _configure + + pulpcore_bindings.DomainsApi.api_client.default_headers.pop("x-rh-identity", None) + python_bindings.RepositoriesPythonApi.api_client.default_headers.pop("x-rh-identity", None) + python_bindings.DistributionsPypiApi.api_client.default_headers.pop("x-rh-identity", None) + + +@pytest.fixture +def configure_lightwell_pypi_distribution(configure_pypi_distribution): + """Same as configure_pypi_distribution, but uses the literal "lightwell" domain name that + DomainBasedPermission gates behind the lightwell-network feature.""" + + def _configure(): + return configure_pypi_distribution(LIGHTWELL_DOMAIN_NAME) + + return _configure + + +def test_org_without_feature_denied_on_lightwell_pypi_simple_api(configure_lightwell_pypi_distribution): + """A user whose org doesn't have the lightwell-network feature and has no DomainOrg + association gets 403 on the lightwell domain's PyPI simple API.""" + _, pypi_url, _, _ = configure_lightwell_pypi_distribution() + headers = {"x-rh-identity": _identity_header(LIGHTWELL_NOT_ENTITLED_ORG_ID, "not-entitled-user")} + + response = requests.get(pypi_url, headers=headers, timeout=30) + + assert response.status_code == 403 + + +def test_org_with_feature_allowed_on_lightwell_pypi_simple_api(configure_lightwell_pypi_distribution): + """A user whose org has the lightwell-network feature can read the lightwell domain's + PyPI simple API, even without a DomainOrg association.""" + _, pypi_url, _, _ = configure_lightwell_pypi_distribution() + headers = {"x-rh-identity": _identity_header(LIGHTWELL_ENTITLED_ORG_ID, "entitled-user")} + + response = requests.get(pypi_url, headers=headers, timeout=30) + + assert response.status_code == 200 + + +def test_domain_org_association_bypasses_feature_check(configure_lightwell_pypi_distribution): + """The domain owner (has a DomainOrg association) can read the lightwell domain's PyPI + simple API regardless of the lightwell-network feature.""" + _, pypi_url, _, owner_header = configure_lightwell_pypi_distribution() + headers = {"x-rh-identity": owner_header} + + response = requests.get(pypi_url, headers=headers, timeout=30) + + assert response.status_code == 200 + + +def test_unauthenticated_denied_on_lightwell_pypi_simple_api(configure_lightwell_pypi_distribution): + """Without any identity at all (no org_id to check a feature for), the lightwell domain's + PyPI simple API must not be readable.""" + _, pypi_url, _, _ = configure_lightwell_pypi_distribution() + + response = requests.get(pypi_url, timeout=30) + + assert response.status_code in (401, 403) + + +def test_write_operations_unaffected_by_feature_check(configure_lightwell_pypi_distribution): + """The lightwell-network feature only grants read access -- an entitled org with no + DomainOrg association must still be denied write access to the lightwell domain's PyPI + views.""" + _, pypi_url, _, _ = configure_lightwell_pypi_distribution() + headers = {"x-rh-identity": _identity_header(LIGHTWELL_ENTITLED_ORG_ID, "entitled-write-user")} + + response = requests.post(pypi_url, headers=headers, data={}, timeout=30) + + assert response.status_code in (401, 403) + + +def test_non_pypi_endpoints_unaffected_by_feature_check(configure_lightwell_pypi_distribution): + """Non-PyPI endpoints (here, the Pulp REST API's repository listing) must keep using the + existing DomainOrg-based permission model, even within the lightwell domain: the + lightwell-network feature grants no access to them.""" + _, _, repos_url, _ = configure_lightwell_pypi_distribution() + headers = {"x-rh-identity": _identity_header(LIGHTWELL_ENTITLED_ORG_ID, "entitled-rest-user")} + + response = requests.get(repos_url, headers=headers, timeout=30) + + assert response.status_code == 403 + + +def test_other_domains_pypi_simple_api_unaffected_by_feature_check(configure_pypi_distribution): + """Domains other than "lightwell" must keep the pre-existing behavior: any SAFE_METHOD + request -- even unauthenticated -- can read the PyPI simple API, regardless of the + lightwell-network feature.""" + domain_name = f"not-lightwell-{uuid4()}" + _, pypi_url, _, _ = configure_pypi_distribution(domain_name) + + response = requests.get(pypi_url, timeout=30) + + assert response.status_code == 200 + + +def test_public_domain_allows_unauthenticated_pypi_access(configure_pypi_distribution): + """A public- domain's PyPI simple API stays open to unauthenticated SAFE_METHOD + requests -- unaffected by the lightwell-network feature check (which never applies to + public- domains, or to non-"lightwell" domains in general).""" + domain_name = f"public-{uuid4()}" + _, pypi_url, _, _ = configure_pypi_distribution(domain_name) + + response = requests.get(pypi_url, timeout=30) + + assert response.status_code == 200 diff --git a/pulp_service/pulp_service/tests/unit/test_domain_based_permission.py b/pulp_service/pulp_service/tests/unit/test_domain_based_permission.py index 7e4fe1c6..4ca4c9f5 100644 --- a/pulp_service/pulp_service/tests/unit/test_domain_based_permission.py +++ b/pulp_service/pulp_service/tests/unit/test_domain_based_permission.py @@ -2,17 +2,26 @@ Unit tests for DomainBasedPermission.has_permission(). These tests mock Django ORM and Pulp internals so they can run without -a live Pulp stack. They cover the safe-method bypass for PyPI views -and public domains, for both anonymous and authenticated users. +a live Pulp stack. They cover the safe-method bypass for public domains and +for PyPI views on domains other than "lightwell", the lightwell-network +feature check specifically on the lightwell domain's PyPI views, the +DomainOrg bypass of that feature check, and unsafe-method handling. """ +import json +from base64 import b64encode from types import SimpleNamespace from unittest.mock import MagicMock, patch from pulp_service.app.authorization import DomainBasedPermission -def _make_request(method="GET", user=None, domain=None, view_name="pypi-metadata"): +def _encode_identity_header(org_id): + identity = {"identity": {"internal": {"org_id": org_id}}} + return b64encode(json.dumps(identity).encode()).decode() + + +def _make_request(method="GET", user=None, domain=None, view_name="pypi-metadata", org_id=None): """Build a mock DRF request.""" request = MagicMock() request.method = method @@ -21,6 +30,8 @@ def _make_request(method="GET", user=None, domain=None, view_name="pypi-metadata request.resolver_match = MagicMock() request.resolver_match.view_name = view_name request.META = {"REQUEST_METHOD": method, "PATH_INFO": "/api/pypi/test/main/simple/"} + if org_id is not None: + request.META["HTTP_X_RH_IDENTITY"] = _encode_identity_header(org_id) return request @@ -61,32 +72,37 @@ def _make_regular_view(): class TestSafeMethodBypass: - """Verify that safe methods on PyPI views and public domains are allowed for all users.""" + """Verify that safe methods on public domains are allowed for all users, and PyPI views + on public domains bypass the lightwell-network feature check entirely.""" - def test_anonymous_get_pypi_view_allowed(self): + def test_anonymous_get_pypi_view_public_domain_allowed(self): permission = DomainBasedPermission() - request = _make_request(method="GET", user=_make_anonymous_user()) + domain = _make_domain("public-trusted-libraries") + request = _make_request(method="GET", user=_make_anonymous_user(), domain=domain) view = _make_pypi_view() assert permission.has_permission(request, view) is True - def test_authenticated_get_pypi_view_allowed(self): + def test_authenticated_get_pypi_view_public_domain_allowed(self): permission = DomainBasedPermission() - request = _make_request(method="GET", user=_make_authenticated_user()) + domain = _make_domain("public-trusted-libraries") + request = _make_request(method="GET", user=_make_authenticated_user(), domain=domain) view = _make_pypi_view() assert permission.has_permission(request, view) is True - def test_anonymous_head_pypi_view_allowed(self): + def test_anonymous_head_pypi_view_public_domain_allowed(self): permission = DomainBasedPermission() - request = _make_request(method="HEAD", user=_make_anonymous_user()) + domain = _make_domain("public-trusted-libraries") + request = _make_request(method="HEAD", user=_make_anonymous_user(), domain=domain) view = _make_pypi_view() assert permission.has_permission(request, view) is True - def test_authenticated_head_pypi_view_allowed(self): + def test_authenticated_head_pypi_view_public_domain_allowed(self): permission = DomainBasedPermission() - request = _make_request(method="HEAD", user=_make_authenticated_user()) + domain = _make_domain("public-trusted-libraries") + request = _make_request(method="HEAD", user=_make_authenticated_user(), domain=domain) view = _make_pypi_view() assert permission.has_permission(request, view) is True @@ -107,6 +123,127 @@ def test_authenticated_get_public_domain_allowed(self): assert permission.has_permission(request, view) is True + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + def test_public_domain_pypi_view_never_checks_feature(self, mock_feature_check): + permission = DomainBasedPermission() + domain = _make_domain("public-something") + request = _make_request(method="GET", user=_make_anonymous_user(), domain=domain) + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + mock_feature_check.assert_not_called() + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + def test_anonymous_get_pypi_view_non_lightwell_domain_allowed(self, mock_feature_check): + """Domains other than "lightwell" keep the pre-existing behavior: any SAFE_METHOD + request to a PyPI view is allowed, without any feature check.""" + permission = DomainBasedPermission() + domain = _make_domain("some-other-domain") + request = _make_request(method="GET", user=_make_anonymous_user(), domain=domain) + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + mock_feature_check.assert_not_called() + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + def test_authenticated_get_pypi_view_non_lightwell_domain_allowed(self, mock_feature_check): + permission = DomainBasedPermission() + domain = _make_domain("some-other-domain") + request = _make_request(method="GET", user=_make_authenticated_user(), domain=domain) + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + mock_feature_check.assert_not_called() + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + def test_anonymous_get_pypi_view_no_domain_allowed(self, mock_feature_check): + """No domain resolved on the request (shouldn't happen in practice for PyPI views, + but matches the pre-existing default-allow behavior).""" + permission = DomainBasedPermission() + request = _make_request(method="GET", user=_make_anonymous_user(), domain=None) + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + mock_feature_check.assert_not_called() + + +class TestLightwellDomainPyPIFeatureCheck: + """Verify the lightwell-network feature check, scoped specifically to the "lightwell" + domain's PyPI views.""" + + def test_anonymous_no_org_id_denied(self): + """Anonymous requests carry no org_id, so there's nothing to check the feature for.""" + permission = DomainBasedPermission() + domain = _make_domain("lightwell") + request = _make_request(method="GET", user=_make_anonymous_user(), domain=domain) + view = _make_pypi_view() + + assert permission.has_permission(request, view) is False + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + @patch("pulp_service.app.authorization.DomainOrg.objects") + def test_authenticated_with_feature_allowed(self, mock_domain_org, mock_feature_check): + mock_domain_org.filter.return_value.exists.return_value = False + mock_feature_check.return_value = True + permission = DomainBasedPermission() + domain = _make_domain("lightwell") + request = _make_request(method="GET", user=_make_authenticated_user(), domain=domain, org_id="20368420") + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + mock_feature_check.assert_called_once_with("20368420") + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + @patch("pulp_service.app.authorization.DomainOrg.objects") + def test_authenticated_without_feature_denied(self, mock_domain_org, mock_feature_check): + mock_domain_org.filter.return_value.exists.return_value = False + mock_feature_check.return_value = False + permission = DomainBasedPermission() + domain = _make_domain("lightwell") + request = _make_request(method="GET", user=_make_authenticated_user(), domain=domain, org_id="1979710") + view = _make_pypi_view() + + assert permission.has_permission(request, view) is False + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + def test_unauthenticated_with_org_id_and_feature_allowed(self, mock_feature_check): + """An org_id can be present even without a fully authenticated user; the feature + check still applies (and still gates access) in that case.""" + mock_feature_check.return_value = True + permission = DomainBasedPermission() + domain = _make_domain("lightwell") + request = _make_request(method="GET", user=_make_anonymous_user(), domain=domain, org_id="20368420") + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + + @patch("pulp_service.app.authorization.DomainBasedPermission._has_lightwell_network_feature") + @patch("pulp_service.app.authorization.DomainOrg.objects") + def test_domain_org_association_bypasses_feature_check(self, mock_domain_org, mock_feature_check): + """Users with a DomainOrg association must not be denied by (or even trigger) the + feature check.""" + mock_domain_org.filter.return_value.exists.return_value = True + permission = DomainBasedPermission() + domain = _make_domain("lightwell") + request = _make_request(method="GET", user=_make_authenticated_user(), domain=domain, org_id="1979710") + view = _make_pypi_view() + + assert permission.has_permission(request, view) is True + mock_feature_check.assert_not_called() + + @patch("pulp_service.app.models.FeatureContentGuard._get_cached_result", return_value=None) + @patch("pulp_service.app.models.FeatureContentGuard._check_for_feature", side_effect=PermissionError) + @patch("pulp_service.app.authorization.DomainOrg.objects") + def test_feature_service_failure_fails_closed(self, mock_domain_org, mock_check_feature, mock_cache_result): + """If the Features Service call fails, access must be denied, not silently allowed.""" + mock_domain_org.filter.return_value.exists.return_value = False + permission = DomainBasedPermission() + domain = _make_domain("lightwell") + request = _make_request(method="GET", user=_make_authenticated_user(), domain=domain, org_id="1979710") + view = _make_pypi_view() + + assert permission.has_permission(request, view) is False + class TestSafeMethodDenied: """Verify that safe methods are denied when they should be.""" diff --git a/pulp_service/pulp_service/tests/unit/test_feature_content_guard.py b/pulp_service/pulp_service/tests/unit/test_feature_content_guard.py index ee5b859c..16e90757 100644 --- a/pulp_service/pulp_service/tests/unit/test_feature_content_guard.py +++ b/pulp_service/pulp_service/tests/unit/test_feature_content_guard.py @@ -256,3 +256,81 @@ def test_session_is_a_process_wide_singleton(self): assert first is second mock_session_cls.assert_called_once() + + +class TestFeatureContentGuardCheckFeature: + """Unit tests for `check_feature()`, the caching entry point reused by `permit()` and by + `DomainBasedPermission` (for the lightwell-network feature check on PyPI views).""" + + @patch.object(FeatureContentGuardCache, "set") + @patch.object(FeatureContentGuardCache, "get") + def test_cache_hit_skips_feature_service_call(self, mock_get, mock_set): + entry = json.dumps({"allowed": True, "expires_at": time.time() + 3600}) + mock_get.return_value = entry.encode() + guard = _make_guard() + guard._get_session = MagicMock() + + assert guard.check_feature(ACCOUNT_ID) is True + + guard._get_session.assert_not_called() + mock_set.assert_not_called() + + @patch.object(FeatureContentGuardCache, "set") + @patch.object(FeatureContentGuardCache, "get") + def test_cache_miss_calls_feature_service_and_populates_cache(self, mock_get, mock_set): + mock_get.return_value = None + guard = _make_guard() + session = MagicMock() + session.get.return_value = _feature_response(FEATURES) + guard._get_session = MagicMock(return_value=session) + + assert guard.check_feature(ACCOUNT_ID) is True + + session.get.assert_called_once() + mock_set.assert_called_once() + + @patch.object(FeatureContentGuardCache, "set") + @patch.object(FeatureContentGuardCache, "get") + def test_account_without_feature_returns_false_without_raising(self, mock_get, mock_set): + mock_get.return_value = None + guard = _make_guard() + session = MagicMock() + session.get.return_value = _feature_response(["some-other-feature"]) + guard._get_session = MagicMock(return_value=session) + + assert guard.check_feature(ACCOUNT_ID) is False + + @patch.object(FeatureContentGuardCache, "set") + @patch.object(FeatureContentGuardCache, "get") + def test_feature_service_failure_raises_permission_error(self, mock_get, mock_set): + mock_get.return_value = None + guard = _make_guard() + session = MagicMock() + session.get.side_effect = requests.Timeout("Features Service did not respond") + guard._get_session = MagicMock(return_value=session) + + with pytest.raises(PermissionError): + guard.check_feature(ACCOUNT_ID) + + mock_set.assert_not_called() + + @patch.object(FeatureContentGuardCache, "set") + @patch.object(FeatureContentGuardCache, "get") + def test_permit_and_check_feature_share_the_same_cache_key(self, mock_get, mock_set): + """`permit()` and `check_feature()` must compute identical cache keys for the same + (account_id, features) pair, so a lookup made through one path is reused by the + other and the Features Service isn't hit twice for the same account/feature.""" + mock_get.return_value = None + guard = _make_guard() + session = MagicMock() + session.get.return_value = _feature_response(FEATURES) + guard._get_session = MagicMock(return_value=session) + + guard.permit(_make_request(account_id=ACCOUNT_ID)) + permit_cache_key = mock_set.call_args.args[0] + + mock_set.reset_mock() + assert guard.check_feature(ACCOUNT_ID) is True + check_feature_cache_key = mock_set.call_args.args[0] + + assert permit_cache_key == check_feature_cache_key