Skip to content

Commit aea685f

Browse files
committed
Test certificate pinning
1 parent 8adb04c commit aea685f

1 file changed

Lines changed: 185 additions & 0 deletions

File tree

tests/test_certificate_pinning.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
"""Integration test for TLS certificate pinning with a real HTTPS server."""
2+
3+
from collections.abc import AsyncGenerator
4+
import datetime
5+
import ipaddress
6+
import json
7+
import pathlib
8+
import ssl
9+
10+
import aiohttp
11+
from aiohttp import web
12+
from cryptography import x509
13+
from cryptography.hazmat.primitives import hashes, serialization
14+
from cryptography.hazmat.primitives.asymmetric import rsa
15+
from cryptography.x509.oid import NameOID
16+
import pytest
17+
18+
from nanokvm.client import NanoKVMClient
19+
from nanokvm.utils import async_fetch_remote_fingerprint
20+
21+
22+
def generate_nanokvm_cert() -> tuple[bytes, bytes]:
23+
"""Generate a self-signed certificate matching NanoKVM's cert.go parameters.
24+
25+
RSA 2048, CN=localhost, SAN: localhost + 127.0.0.1 + ::1, valid 10 years,
26+
KeyUsage: keyEncipherment | digitalSignature, ExtKeyUsage: serverAuth.
27+
"""
28+
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
29+
30+
subject = issuer = x509.Name(
31+
[
32+
x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
33+
]
34+
)
35+
36+
cert = (
37+
x509.CertificateBuilder()
38+
.subject_name(subject)
39+
.issuer_name(issuer)
40+
.public_key(key.public_key())
41+
.serial_number(x509.random_serial_number())
42+
.not_valid_before(datetime.datetime.now(datetime.timezone.utc))
43+
.not_valid_after(
44+
datetime.datetime.now(datetime.timezone.utc)
45+
+ datetime.timedelta(days=365 * 10)
46+
)
47+
.add_extension(
48+
x509.SubjectAlternativeName(
49+
[
50+
x509.DNSName("localhost"),
51+
x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")),
52+
x509.IPAddress(ipaddress.IPv6Address("::1")),
53+
]
54+
),
55+
critical=False,
56+
)
57+
.add_extension(
58+
x509.KeyUsage(
59+
digital_signature=True,
60+
key_encipherment=True,
61+
content_commitment=False,
62+
data_encipherment=False,
63+
key_agreement=False,
64+
key_cert_sign=False,
65+
crl_sign=False,
66+
encipher_only=False,
67+
decipher_only=False,
68+
),
69+
critical=True,
70+
)
71+
.add_extension(
72+
x509.ExtendedKeyUsage([x509.oid.ExtendedKeyUsageOID.SERVER_AUTH]),
73+
critical=False,
74+
)
75+
.add_extension(
76+
x509.BasicConstraints(ca=False, path_length=None),
77+
critical=True,
78+
)
79+
.sign(key, hashes.SHA256())
80+
)
81+
82+
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
83+
key_pem = key.private_bytes(
84+
serialization.Encoding.PEM,
85+
serialization.PrivateFormat.PKCS8,
86+
serialization.NoEncryption(),
87+
)
88+
89+
return cert_pem, key_pem
90+
91+
92+
async def _handle_login(request: web.Request) -> web.Response:
93+
body = await request.json()
94+
95+
if body["username"] == "admin" and body["password"] == "test":
96+
return web.Response(
97+
text=json.dumps(
98+
{
99+
"code": 0,
100+
"msg": "success",
101+
"data": {"token": "fake-token-123"},
102+
}
103+
),
104+
content_type="application/json",
105+
)
106+
107+
return web.Response(
108+
text=json.dumps(
109+
{
110+
"code": -2,
111+
"msg": "invalid username or password",
112+
"data": None,
113+
}
114+
),
115+
content_type="application/json",
116+
)
117+
118+
119+
@pytest.fixture
120+
async def nanokvm_https_server(tmp_path: pathlib.Path) -> AsyncGenerator[str, None]:
121+
"""Spin up a minimal HTTPS server mimicking a NanoKVM device."""
122+
cert_pem, key_pem = generate_nanokvm_cert()
123+
124+
cert_file = tmp_path / "server.crt"
125+
key_file = tmp_path / "server.key"
126+
cert_file.write_bytes(cert_pem)
127+
key_file.write_bytes(key_pem)
128+
129+
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
130+
ssl_ctx.load_cert_chain(str(cert_file), str(key_file))
131+
132+
app = web.Application()
133+
app.router.add_post("/api/auth/login", _handle_login)
134+
135+
runner = web.AppRunner(app)
136+
await runner.setup()
137+
138+
try:
139+
site = web.TCPSite(runner, "127.0.0.1", 0, ssl_context=ssl_ctx)
140+
await site.start()
141+
142+
yield site.name + "api/"
143+
finally:
144+
await runner.cleanup()
145+
146+
147+
async def test_certificate_pinning(nanokvm_https_server: str) -> None:
148+
"""Test the full certificate pinning flow against a real HTTPS server.
149+
150+
1. Connecting with default SSL verification fails (self-signed cert).
151+
2. Fetch the server's certificate fingerprint.
152+
3. Connecting with the pinned fingerprint succeeds.
153+
"""
154+
url = nanokvm_https_server
155+
156+
# Step 1: default SSL verification rejects the self-signed certificate
157+
async with NanoKVMClient(url, use_password_obfuscation=False) as client:
158+
with pytest.raises(aiohttp.ClientConnectorCertificateError):
159+
await client.authenticate("admin", "test")
160+
161+
# Step 2: fetch the remote certificate fingerprint
162+
fingerprint = await async_fetch_remote_fingerprint(url)
163+
assert len(fingerprint) == 64 # SHA-256 hex string
164+
165+
# Step 3: pinned fingerprint allows the connection to succeed
166+
async with NanoKVMClient(
167+
url,
168+
pinned_ca_cert_hash=fingerprint,
169+
use_password_obfuscation=False,
170+
) as client:
171+
await client.authenticate("admin", "test")
172+
assert client.token == "fake-token-123"
173+
174+
175+
async def test_certificate_pinning_wrong_hash(nanokvm_https_server: str) -> None:
176+
"""Test that a wrong pinned hash is rejected."""
177+
url = nanokvm_https_server
178+
179+
async with NanoKVMClient(
180+
url,
181+
pinned_ca_cert_hash="AB" * 32,
182+
use_password_obfuscation=False,
183+
) as client:
184+
with pytest.raises(aiohttp.ServerFingerprintMismatch):
185+
await client.authenticate("admin", "test")

0 commit comments

Comments
 (0)