Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/lightwell-network-feature-guard.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed ``DomainBasedPermission`` allowing any authenticated user to read the ``lightwell`` domain's PyPI views (simple API, package metadata) without a subscription check. Reading these views now requires either a ``DomainOrg`` association with the domain or the ``lightwell-network`` feature entitlement (checked against the Features Service, using the same cache as ``FeatureContentGuard``); ``public-`` prefixed domains, other domains' PyPI views, and write operations are unaffected.
77 changes: 69 additions & 8 deletions pulp_service/pulp_service/app/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pulpcore.plugin.models import Domain
from pulpcore.plugin.util import extract_pk, get_domain_pk

from pulp_service.app.models import DomainOrg
from pulp_service.app.models import DomainOrg, FeatureContentGuard

_logger = logging.getLogger(__name__)
org_id_var = ContextVar("org_id")
Expand All @@ -20,6 +20,14 @@
user_id_var = ContextVar("user_id")
group_var = ContextVar("group")

# The domain whose PyPI views require the lightwell-network feature entitlement (see
# has_permission()). Other domains' PyPI views keep the pre-existing "any SAFE_METHOD
# request is allowed" behavior.
LIGHTWELL_DOMAIN_NAME = "lightwell"
# Feature entitlement required to read the lightwell domain's PyPI views (simple API, etc.)
# for orgs that don't have a DomainOrg association with the domain. See has_permission().
LIGHTWELL_NETWORK_FEATURE = "lightwell-network"


class DomainBasedPermission(BasePermission):
"""
Expand All @@ -41,23 +49,76 @@ def _has_domain_access(self, domain_pk, org_id, user):

return DomainOrg.objects.filter(query).exists()

def has_permission(self, request, view):
def _has_pypi_read_access(self, request, domain):
"""
Checks SAFE_METHOD access to the lightwell domain's PyPI views.

Users with a DomainOrg association bypass the feature check entirely (existing
permission model). Everyone else -- including unauthenticated users -- must belong
to an org that has the lightwell-network feature entitlement.
"""
user = request.user
domain_pk = domain.pk if domain is not None else get_domain_pk()

decoded_header_content = self.get_decoded_identity_header(request)
org_id = self.get_org_id(decoded_header_content)

if user.is_authenticated and self._has_domain_access(domain_pk, org_id, user):
return True

if org_id is None:
return False

return self._has_lightwell_network_feature(org_id)

def _has_lightwell_network_feature(self, org_id):
"""
Checks (with caching) whether `org_id` has the lightwell-network feature, via the
same cache/lookup mechanism as FeatureContentGuard.
"""
guard = FeatureContentGuard(features=[LIGHTWELL_NETWORK_FEATURE], pulp_domain=None)
try:
return guard.check_feature(org_id)
except PermissionError:
return False

def _check_pypi_safe_method_access(self, request, view, domain):
"""
Returns True/False for a SAFE_METHOD request to a PyPI `view`, or None if `view`
isn't a PyPI view (the caller should then fall through to the standard
DomainOrg-based checks below).

Only the lightwell domain's PyPI views require a DomainOrg association or the
lightwell-network feature entitlement. Other domains' PyPI views keep the
pre-existing "any SAFE_METHOD request is allowed" behavior; non-PyPI endpoints
(Maven, repository listing, Pulp REST API, ...) never hit this method.
"""
from pulp_python.app.pypi.views import PyPIMixin

if not isinstance(view, PyPIMixin):
return None

if domain and domain.name == LIGHTWELL_DOMAIN_NAME:
return self._has_pypi_read_access(request, domain)
return True

def has_permission(self, request, view): # noqa: PLR0911
# Admins have all permissions
if request.user.is_superuser:
return True

user = request.user

# Allow safe requests on PyPI views and public domains for all users
# Allow safe requests on public domains for all users, authenticated or not
if request.method in SAFE_METHODS:
from pulp_python.app.pypi.views import PyPIMixin

if isinstance(view, PyPIMixin):
return True
domain = getattr(request, "pulp_domain", None)
if domain and "public-" in domain.name:
if domain and domain.name.startswith("public-"):
return True

pypi_access = self._check_pypi_safe_method_access(request, view, domain)
if pypi_access is not None:
return pypi_access

if not user.is_authenticated:
return False

Expand Down
36 changes: 25 additions & 11 deletions pulp_service/pulp_service/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,30 @@ def _set_cached_result(feature_cache, key, allowed):
entry = {"allowed": allowed, "expires_at": time.time() + feature_cache.default_expires_ttl}
feature_cache.set(key, json.dumps(entry), expires=feature_cache.default_expires_ttl)

def check_feature(self, account_id):
"""
Returns whether `account_id` has all of `self.features`, per the Features Service.

Reuses the same `FeatureContentGuardCache` cache key scheme as `permit()` so that
callers checking the same (account_id, features) pair -- e.g. `DomainBasedPermission`
checking the `lightwell-network` feature -- share cache entries with this guard and
avoid redundant Features Service calls. May raise `PermissionError` if the Features
Service call fails (see `_check_for_feature`).
"""
cache_key = f"{account_id}-{','.join(self.features)}"
cache_key_digest = sha256(bytes(cache_key, "utf8")).hexdigest()
feature_cache = FeatureContentGuardCache()
account_allowed = self._get_cached_result(feature_cache, cache_key_digest)

if account_allowed is None:
_logger.debug("Feature cache MISS for key %s", cache_key_digest)
account_allowed = self._check_for_feature(account_id)
self._set_cached_result(feature_cache, cache_key_digest, account_allowed)
else:
_logger.debug("Feature cache HIT for key %s", cache_key_digest)

return account_allowed

def permit(self, request):
try:
header_content = request.headers[self.header_name]
Expand All @@ -190,17 +214,7 @@ def permit(self, request):
_logger.exception("Access not allowed - Invalid JSON or Path not found.")
raise PermissionError(_("Access denied.")) from exc

cache_key = f"{header_value}-{','.join(self.features)}"
cache_key_digest = sha256(bytes(cache_key, "utf8")).hexdigest()
feature_cache = FeatureContentGuardCache()
account_allowed = self._get_cached_result(feature_cache, cache_key_digest)

if account_allowed is None:
_logger.debug("Feature cache MISS for key %s", cache_key_digest)
account_allowed = self._check_for_feature(header_value)
self._set_cached_result(feature_cache, cache_key_digest, account_allowed)
else:
_logger.debug("Feature cache HIT for key %s", cache_key_digest)
account_allowed = self.check_feature(header_value)

if not account_allowed:
_logger.warning("Access not allowed - Features not available for the user.")
Expand Down
7 changes: 7 additions & 0 deletions pulp_service/pulp_service/tests/functional/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
CONTENT_GUARD_FEATURES_NOT_SUBSCRIBED = ["rhods"]
CONTENT_GUARD_FILTER = ".identity.org_id"

# LIGHTWELL-NETWORK FEATURE CONSTANTS
# Used to test the lightwell-network feature check enforced by DomainBasedPermission on the
# "lightwell" domain's PyPI views. These are real staging Features Service accounts.
LIGHTWELL_NETWORK_FEATURE = "lightwell-network"
LIGHTWELL_ENTITLED_ORG_ID = "20368420" # has the lightwell-network feature
LIGHTWELL_NOT_ENTITLED_ORG_ID = "1979710" # does not have the lightwell-network feature

# VULNERABILITY REPORT CONSTANTS
# NPM
NPM_REMOTE_REGISTRY = "https://registry.npmjs.org/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ def test_get_requests_without_auth_to_simple_api(
python_bindings,
gen_object_with_cleanup,
):
"""Test that all domains allow GET requests without authentication but block other methods."""
"""Test that all domains (other than "lightwell") allow GET requests without
authentication but block other methods. The "lightwell" domain is excluded from this
behavior -- see test_lightwell_feature_permission.py.
"""
# Create a user with credentials to set up the domain
setup_user = {
"identity": {"org_id": 33333, "internal": {"org_id": 33333}, "user": {"username": "publicdomainuser"}}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
"""
Functional tests for the lightwell-network feature check enforced by DomainBasedPermission,
scoped specifically to the "lightwell" domain's PyPI views (simple API, package metadata,
etc.). Other domains' PyPI views, and all non-PyPI endpoints (even within the lightwell
domain), keep the pre-existing permission model unaffected by this feature check.

These follow the pattern used in test_feature_service.py: they exercise the real Features
Service (no mocking) using known staging accounts. Org LIGHTWELL_ENTITLED_ORG_ID has the
lightwell-network feature; org LIGHTWELL_NOT_ENTITLED_ORG_ID does not.

NOTE: the feature check is keyed off the literal domain name "lightwell" (see
pulp_service.app.authorization.LIGHTWELL_DOMAIN_NAME), so the domain created here can't use
a random per-test suffix like most other functional tests in this suite. These tests assume
they run against an ephemeral Pulp instance where no "lightwell" domain already exists.
"""

import json
from base64 import b64encode
from urllib.parse import urljoin
from uuid import uuid4

import pytest
import requests

from pulp_service.tests.functional.constants import (
LIGHTWELL_ENTITLED_ORG_ID,
LIGHTWELL_NOT_ENTITLED_ORG_ID,
)

LIGHTWELL_DOMAIN_NAME = "lightwell"

# An org with no DomainOrg association with the test domains and no lightwell-network
# feature entitlement; only used to own the test domain/repo/distribution.
DOMAIN_OWNER_ORG_ID = "555555555"


def _identity_header(org_id, username):
identity = {
"identity": {
"org_id": org_id,
"internal": {"org_id": org_id},
"user": {"username": username},
}
}
return b64encode(json.dumps(identity).encode()).decode()


@pytest.fixture
def configure_pypi_distribution(
anonymous_user,
gen_object_with_cleanup,
pulpcore_bindings,
python_bindings,
bindings_cfg,
):
"""
Creates a domain owned by DOMAIN_OWNER_ORG_ID, with a Python repository and a PyPI
distribution.

Returns a (domain_name, pypi_simple_url, repos_url, owner_header) tuple.
"""
owner_header = _identity_header(DOMAIN_OWNER_ORG_ID, "lightwell-test-owner")

def _configure(domain_name):
with anonymous_user:
pulpcore_bindings.DomainsApi.api_client.default_headers["x-rh-identity"] = owner_header

gen_object_with_cleanup(
pulpcore_bindings.DomainsApi,
{
"name": domain_name,
"storage_class": "pulpcore.app.models.storage.FileSystem",
"storage_settings": {"MEDIA_ROOT": "/var/lib/pulp/media/"},
},
)

python_bindings.RepositoriesPythonApi.api_client.default_headers["x-rh-identity"] = owner_header
repo = gen_object_with_cleanup(
python_bindings.RepositoriesPythonApi, {"name": str(uuid4())}, pulp_domain=domain_name
)

python_bindings.DistributionsPypiApi.api_client.default_headers["x-rh-identity"] = owner_header
base_path = str(uuid4())
gen_object_with_cleanup(
python_bindings.DistributionsPypiApi,
{"name": str(uuid4()), "base_path": base_path, "repository": repo.pulp_href},
pulp_domain=domain_name,
)

pypi_url = urljoin(bindings_cfg.host, f"/api/pypi/{domain_name}/{base_path}/simple/")
repos_url = urljoin(bindings_cfg.host, f"/api/pulp/{domain_name}/api/v3/repositories/python/python/")
return domain_name, pypi_url, repos_url, owner_header

yield _configure

pulpcore_bindings.DomainsApi.api_client.default_headers.pop("x-rh-identity", None)
python_bindings.RepositoriesPythonApi.api_client.default_headers.pop("x-rh-identity", None)
python_bindings.DistributionsPypiApi.api_client.default_headers.pop("x-rh-identity", None)


@pytest.fixture
def configure_lightwell_pypi_distribution(configure_pypi_distribution):
"""Same as configure_pypi_distribution, but uses the literal "lightwell" domain name that
DomainBasedPermission gates behind the lightwell-network feature."""

def _configure():
return configure_pypi_distribution(LIGHTWELL_DOMAIN_NAME)

return _configure


def test_org_without_feature_denied_on_lightwell_pypi_simple_api(configure_lightwell_pypi_distribution):
"""A user whose org doesn't have the lightwell-network feature and has no DomainOrg
association gets 403 on the lightwell domain's PyPI simple API."""
_, pypi_url, _, _ = configure_lightwell_pypi_distribution()
headers = {"x-rh-identity": _identity_header(LIGHTWELL_NOT_ENTITLED_ORG_ID, "not-entitled-user")}

response = requests.get(pypi_url, headers=headers, timeout=30)

assert response.status_code == 403


def test_org_with_feature_allowed_on_lightwell_pypi_simple_api(configure_lightwell_pypi_distribution):
"""A user whose org has the lightwell-network feature can read the lightwell domain's
PyPI simple API, even without a DomainOrg association."""
_, pypi_url, _, _ = configure_lightwell_pypi_distribution()
headers = {"x-rh-identity": _identity_header(LIGHTWELL_ENTITLED_ORG_ID, "entitled-user")}

response = requests.get(pypi_url, headers=headers, timeout=30)

assert response.status_code == 200


def test_domain_org_association_bypasses_feature_check(configure_lightwell_pypi_distribution):
"""The domain owner (has a DomainOrg association) can read the lightwell domain's PyPI
simple API regardless of the lightwell-network feature."""
_, pypi_url, _, owner_header = configure_lightwell_pypi_distribution()
headers = {"x-rh-identity": owner_header}

response = requests.get(pypi_url, headers=headers, timeout=30)

assert response.status_code == 200


def test_unauthenticated_denied_on_lightwell_pypi_simple_api(configure_lightwell_pypi_distribution):
"""Without any identity at all (no org_id to check a feature for), the lightwell domain's
PyPI simple API must not be readable."""
_, pypi_url, _, _ = configure_lightwell_pypi_distribution()

response = requests.get(pypi_url, timeout=30)

assert response.status_code in (401, 403)


def test_write_operations_unaffected_by_feature_check(configure_lightwell_pypi_distribution):
"""The lightwell-network feature only grants read access -- an entitled org with no
DomainOrg association must still be denied write access to the lightwell domain's PyPI
views."""
_, pypi_url, _, _ = configure_lightwell_pypi_distribution()
headers = {"x-rh-identity": _identity_header(LIGHTWELL_ENTITLED_ORG_ID, "entitled-write-user")}

response = requests.post(pypi_url, headers=headers, data={}, timeout=30)

assert response.status_code in (401, 403)


def test_non_pypi_endpoints_unaffected_by_feature_check(configure_lightwell_pypi_distribution):
"""Non-PyPI endpoints (here, the Pulp REST API's repository listing) must keep using the
existing DomainOrg-based permission model, even within the lightwell domain: the
lightwell-network feature grants no access to them."""
_, _, repos_url, _ = configure_lightwell_pypi_distribution()
headers = {"x-rh-identity": _identity_header(LIGHTWELL_ENTITLED_ORG_ID, "entitled-rest-user")}

response = requests.get(repos_url, headers=headers, timeout=30)

assert response.status_code == 403


def test_other_domains_pypi_simple_api_unaffected_by_feature_check(configure_pypi_distribution):
"""Domains other than "lightwell" must keep the pre-existing behavior: any SAFE_METHOD
request -- even unauthenticated -- can read the PyPI simple API, regardless of the
lightwell-network feature."""
domain_name = f"not-lightwell-{uuid4()}"
_, pypi_url, _, _ = configure_pypi_distribution(domain_name)

response = requests.get(pypi_url, timeout=30)

assert response.status_code == 200


def test_public_domain_allows_unauthenticated_pypi_access(configure_pypi_distribution):
"""A public- domain's PyPI simple API stays open to unauthenticated SAFE_METHOD
requests -- unaffected by the lightwell-network feature check (which never applies to
public- domains, or to non-"lightwell" domains in general)."""
domain_name = f"public-{uuid4()}"
_, pypi_url, _, _ = configure_pypi_distribution(domain_name)

response = requests.get(pypi_url, timeout=30)

assert response.status_code == 200
Loading
Loading