Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/workflows/tests/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,53 @@

from __future__ import annotations

import json
import os
import unittest
from typing import Any

import requests
import urllib3
from requests.auth import HTTPBasicAuth

# Default timeout for HTTP calls to the gateway (self-signed TLS, CI).
KNOX_REQUEST_TIMEOUT = 30

HSTS_HEADER_NAME = "Strict-Transport-Security"
HSTS_EXPECTED_VALUE = "max-age=300; includeSubDomains"

# Top-level keys in /gateway/{topology}/health/v1/metrics JSON (see Java GatewayHealthFuncTest).
METRICS_TOP_LEVEL_KEYS = frozenset(
{"timers", "histograms", "counters", "gauges", "version", "meters"}
)

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def knox_ldap_guest_auth() -> HTTPBasicAuth:
"""HTTP Basic auth for the demo LDAP `guest` user (CI Docker topologies)."""

return HTTPBasicAuth("guest", "guest-password")


def knox_ldap_admin_auth() -> HTTPBasicAuth:
"""HTTP Basic auth for the demo LDAP `admin` user (CI Docker topologies)."""

return HTTPBasicAuth("admin", "admin-password")


def health_metrics_pretty_dict(gateway_base: str) -> dict[str, Any]:
"""
GET health metrics with ?pretty=true; raise AssertionError if not HTTP 200 or invalid JSON.
gateway_base must include a trailing slash (see gateway_base_url()).
"""

r = knox_get(gateway_base + "gateway/health/v1/metrics?pretty=true")
if r.status_code != 200:
raise AssertionError(f"health metrics expected 200, got {r.status_code}")
return json.loads(r.text)


def gateway_base_url() -> str:
"""Return KNOX_GATEWAY_URL with a trailing slash."""
url = os.environ.get("KNOX_GATEWAY_URL", "https://localhost:8443/")
Expand Down
155 changes: 113 additions & 42 deletions .github/workflows/tests/test_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@

import requests

from common_utils import assert_hsts_header, gateway_base_url, knox_get
from common_utils import (
METRICS_TOP_LEVEL_KEYS,
assert_hsts_header,
gateway_base_url,
health_metrics_pretty_dict,
knox_get,
knox_post,
)


class TestKnoxHealth(unittest.TestCase):
"""Integration checks for the gateway health REST API (ping and metrics)."""

# Top-level keys expected in /metrics JSON (aligned with Java GatewayHealthFuncTest).
_METRICS_TOP_LEVEL_KEYS = frozenset(
{"timers", "histograms", "counters", "gauges", "version", "meters"}
)

def setUp(self):
self.base_url = gateway_base_url()

Expand All @@ -40,58 +42,127 @@ def test_health_ping_ok_and_hsts(self):
print(f"Received status code: {response.status_code}")
self.assertEqual(response.status_code, 200)
self.assertEqual(response.text.strip(), "OK")
self.assertIn("text/plain", response.headers.get("Content-Type", ""))

assert_hsts_header(self, response)
except requests.exceptions.ConnectionError:
self.fail("Failed to connect to Knox on port 8443 - Connection refused")
except Exception as e:
self.fail(f"Health check failed with unexpected error: {e}")

def test_health_metrics_returns_json(self):
"""Metrics with pretty=true returns 200 and a JSON object with application/json content type."""
url = self.base_url + "gateway/health/v1/metrics?pretty=true"
response = knox_get(url)
self.assertEqual(response.status_code, 200)

content_type = response.headers.get("Content-Type", "")
self.assertIn("application/json", content_type)
class TestHealthGatewayExtended(unittest.TestCase):
"""Anonymous HEALTH topology: gateway-status, ping variants, metrics keys, routing."""

payload = json.loads(response.text)
self.assertIsInstance(payload, dict)
def setUp(self):
self.base_url = gateway_base_url()

def test_health_gateway_status_returns_ok_or_pending_plain_text(self):
"""gateway-status is 200 text/plain with body OK or PENDING."""
url = self.base_url + "gateway/health/v1/gateway-status"
r = knox_get(url)
self.assertEqual(r.status_code, 200)
self.assertIn("text/plain", r.headers.get("Content-Type", ""))
self.assertIn(r.text.strip(), ("OK", "PENDING"))

def test_health_metrics_contains_core_fields(self):
"""Metrics JSON should expose the same top-level keys as the Java GatewayHealthFuncTest."""
def test_health_ping_post_returns_ok(self):
"""POST /v1/ping matches GET semantics for the health service."""
url = self.base_url + "gateway/health/v1/ping"
r = knox_post(url)
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text.strip(), "OK")

def test_health_ping_sets_cache_control_no_store(self):
"""Ping uses must-revalidate,no-cache,no-store (see PingResource)."""
url = self.base_url + "gateway/health/v1/ping"
r = knox_get(url)
self.assertEqual(r.status_code, 200)
cc = r.headers.get("Cache-Control", "")
self.assertIn("no-store", cc)
self.assertIn("no-cache", cc)

def test_health_metrics_pretty_includes_all_core_top_level_keys(self):
"""Pretty metrics JSON is application/json with timers/histograms/counters/gauges/version/meters."""
url = self.base_url + "gateway/health/v1/metrics?pretty=true"
response = knox_get(url)
self.assertEqual(response.status_code, 200)
payload = json.loads(response.text)
r = knox_get(url)
self.assertEqual(r.status_code, 200)
self.assertIn("application/json", r.headers.get("Content-Type", ""))
payload = json.loads(r.text)
self.assertIsInstance(payload, dict)
self.assertTrue(
self._METRICS_TOP_LEVEL_KEYS.issubset(payload.keys()),
msg=f"Missing keys: {self._METRICS_TOP_LEVEL_KEYS - set(payload.keys())}",
METRICS_TOP_LEVEL_KEYS.issubset(payload.keys()),
msg=f"Missing keys: {METRICS_TOP_LEVEL_KEYS - set(payload.keys())}",
)

def test_health_metrics_without_pretty_returns_json(self):
"""Metrics without pretty still returns 200, parseable JSON, and the same top-level keys as pretty."""
def test_health_metrics_without_pretty_includes_same_top_level_keys(self):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are test duplications with this new PR. For example we already have test_health_metrics_without_pretty_returns_json which does the same.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @hanicz can you review it once more, thanks

"""Metrics without ?pretty= returns application/json with the same registry sections."""
url = self.base_url + "gateway/health/v1/metrics"
response = knox_get(url)
self.assertEqual(response.status_code, 200)
self.assertIn("application/json", response.headers.get("Content-Type", ""))
payload = json.loads(response.text)
r = knox_get(url)
self.assertEqual(r.status_code, 200)
self.assertIn("application/json", r.headers.get("Content-Type", ""))
payload = json.loads(r.text)
self.assertIsInstance(payload, dict)
self.assertTrue(
self._METRICS_TOP_LEVEL_KEYS.issubset(payload.keys()),
msg=f"Missing keys: {self._METRICS_TOP_LEVEL_KEYS - set(payload.keys())}",
)
self.assertTrue(METRICS_TOP_LEVEL_KEYS.issubset(payload.keys()))

def test_health_metrics_version_value_is_non_empty_string(self):
"""The version entry in metrics JSON is a string."""
payload = health_metrics_pretty_dict(self.base_url)
ver = payload.get("version")
self.assertIsInstance(ver, str)
self.assertTrue(len(ver) > 0)

def test_unknown_topology_returns_404(self):
"""Requests to an undeployed topology name fail with 404."""
url = self.base_url + "gateway/not-a-deployed-topology/v1/ping"
r = knox_get(url)
self.assertEqual(r.status_code, 404)

def test_health_gateway_status_includes_hsts(self):
"""gateway-status uses the same global Strict-Transport-Security as other gateway responses."""
url = self.base_url + "gateway/health/v1/gateway-status"
r = knox_get(url)
self.assertEqual(r.status_code, 200)
assert_hsts_header(self, r)

def test_health_metrics_includes_hsts(self):
"""Metrics JSON responses include the expected HSTS header."""
url = self.base_url + "gateway/health/v1/metrics?pretty=true"
r = knox_get(url)
self.assertEqual(r.status_code, 200)
assert_hsts_header(self, r)

def test_health_ping_content_type_is_plain_text(self):
"""Ping response declares text/plain Content-Type and body OK."""
url = self.base_url + "gateway/health/v1/ping"
response = knox_get(url)
self.assertEqual(response.status_code, 200)
self.assertEqual(response.text.strip(), "OK")
content_type = response.headers.get("Content-Type", "")
self.assertIn("text/plain", content_type)
def test_health_gateway_status_cache_control_no_store(self):
"""gateway-status sets Cache-Control with no-cache/no-store like ping."""
url = self.base_url + "gateway/health/v1/gateway-status"
r = knox_get(url)
self.assertEqual(r.status_code, 200)
cc = r.headers.get("Cache-Control", "")
self.assertIn("no-store", cc)
self.assertIn("no-cache", cc)

if __name__ == '__main__':
unittest.main()

class TestHealthMetricsSectionShapes(unittest.TestCase):
"""Dropwizard metric registry JSON: each major section is an object."""

@classmethod
def setUpClass(cls):
cls._payload = health_metrics_pretty_dict(gateway_base_url())

def test_metrics_timers_section_is_dict(self):
self.assertIsInstance(self._payload["timers"], dict)

def test_metrics_histograms_section_is_dict(self):
self.assertIsInstance(self._payload["histograms"], dict)

def test_metrics_counters_section_is_dict(self):
self.assertIsInstance(self._payload["counters"], dict)

def test_metrics_gauges_section_is_dict(self):
self.assertIsInstance(self._payload["gauges"], dict)

def test_metrics_meters_section_is_dict(self):
self.assertIsInstance(self._payload["meters"], dict)


if __name__ == "__main__":
unittest.main()
85 changes: 81 additions & 4 deletions .github/workflows/tests/test_knox_auth_service_and_LDAP.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from requests.auth import HTTPBasicAuth

from common_utils import collect_actor_group_values, gateway_base_url, knox_get
from common_utils import (
collect_actor_group_values,
gateway_base_url,
knox_get,
knox_ldap_admin_auth,
knox_ldap_guest_auth,
)

########################################################
# This test is verifying the behavior of the Knox Auth Service + LDAP authentication.
Expand All @@ -40,7 +46,7 @@ def test_auth_service_guest(self):
print(f"\nTesting guest authentication against {self.topology_url}")
response = knox_get(
self.topology_url,
auth=HTTPBasicAuth('guest', 'guest-password'),
auth=knox_ldap_guest_auth(),
)

print(f"Status Code: {response.status_code}")
Expand All @@ -60,7 +66,7 @@ def test_auth_service_admin_groups(self):
print(f"\nTesting admin authentication against {self.topology_url}")
response = knox_get(
self.topology_url,
auth=HTTPBasicAuth('admin', 'admin-password'),
auth=knox_ldap_admin_auth(),
)

print(f"Status Code: {response.status_code}")
Expand All @@ -85,6 +91,77 @@ def test_auth_service_admin_groups(self):
for group in expected_groups:
self.assertIn(group, all_groups)


class TestKnoxLdapKnoxToken(unittest.TestCase):
"""KNOXTOKEN + JWKS under knoxldap topology (Shiro + LDAP)."""

def setUp(self):
self.base_url = gateway_base_url()
self._token_prefix = self.base_url + "gateway/knoxldap/knoxtoken/api"

def test_knoxldap_jwks_with_guest_returns_json_with_keys_array(self):
"""JWKS document is JSON with a keys array (may be empty if no RSA key)."""
url = self._token_prefix + "/v1/jwks.json"
r = knox_get(url, auth=knox_ldap_guest_auth())
self.assertEqual(r.status_code, 200)
self.assertIn("application/json", r.headers.get("Content-Type", ""))
body = json.loads(r.text)
self.assertIn("keys", body)
self.assertIsInstance(body["keys"], list)

def test_knoxldap_jwks_without_credentials_returns_401(self):
"""Shiro requires BASIC auth for knoxldap paths including JWKS."""
url = self._token_prefix + "/v1/jwks.json"
r = knox_get(url)
self.assertEqual(r.status_code, 401)

def test_knoxldap_token_v1_get_returns_access_token_json(self):
"""GET knoxtoken v1 returns a JWT access_token for a valid LDAP user."""
url = self._token_prefix + "/v1/token"
r = knox_get(url, auth=knox_ldap_guest_auth())
self.assertEqual(r.status_code, 200)
self.assertIn("application/json", r.headers.get("Content-Type", ""))
body = json.loads(r.text)
self.assertIn("access_token", body)
token = body["access_token"]
self.assertIsInstance(token, str)
self.assertTrue(len(token) > 0)
parts = token.split(".")
self.assertGreaterEqual(len(parts), 3, msg="access_token should look like a JWT")

def test_knoxldap_token_v2_get_returns_access_token_json(self):
"""GET knoxtoken v2/token exposes access_token for basic acquisition."""
url = self._token_prefix + "/v2/token"
r = knox_get(url, auth=knox_ldap_guest_auth())
self.assertEqual(r.status_code, 200)
body = json.loads(r.text)
self.assertIn("access_token", body)

def test_knoxldap_token_v1_without_credentials_returns_401(self):
url = self._token_prefix + "/v1/token"
r = knox_get(url)
self.assertEqual(r.status_code, 401)

def test_knoxldap_token_v2_without_credentials_returns_401(self):
"""Shiro requires auth for v2 token the same as v1."""
url = self._token_prefix + "/v2/token"
r = knox_get(url)
self.assertEqual(r.status_code, 401)


class TestKnoxLdapExtAuthzAuthn(unittest.TestCase):
"""Unauthenticated access to knoxldap extauthz (Shiro)."""

def setUp(self):
self.base_url = gateway_base_url()
self.extauthz = self.base_url + "gateway/knoxldap/auth/api/v1/extauthz"

def test_knoxldap_extauthz_without_credentials_returns_401(self):
"""extauthz requires BASIC auth like other knoxldap paths."""
r = knox_get(self.extauthz)
self.assertEqual(r.status_code, 401)


if __name__ == '__main__':
unittest.main()

Loading