Skip to content

Commit b84b3ab

Browse files
committed
Use cryptography.x509.verification for certification verifier.
1 parent 17d3b8f commit b84b3ab

5 files changed

Lines changed: 35 additions & 66 deletions

File tree

cwt/algs/asymmetric.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from typing import Any, Dict, List
22

3-
from certvalidator import CertificateValidator, ValidationContext
3+
from cryptography.x509 import Certificate, DNSName, load_der_x509_certificate
4+
from cryptography.x509.oid import NameOID
5+
from cryptography.x509.verification import PolicyBuilder, Store
46

57
from ..cose_key_interface import COSEKeyInterface
68
from ..exceptions import VerifyError
@@ -11,28 +13,32 @@ def __init__(self, params: Dict[int, Any]):
1113
super().__init__(params)
1214

1315
self._key: Any = b""
14-
self._cert = b""
15-
self._intermediates = []
16+
self._cert: Certificate = None
17+
self._intermediates: List[Certificate] = []
1618

1719
if 33 in params:
1820
if not isinstance(params[33], (bytes, list)):
1921
raise ValueError("x5c(33) should be bytes(bstr) or list.")
2022
certs = [params[33]] if isinstance(params[33], bytes) else params[33]
21-
self._cert = certs[0]
23+
self._cert = load_der_x509_certificate(certs[0])
2224
if len(certs) > 1:
23-
self._intermediates = certs[1:]
25+
for c in certs[1:]:
26+
self._intermediates.append(load_der_x509_certificate(c))
2427
return
2528

26-
def validate_certificate(self, ca_certs: List[bytes]) -> bool:
29+
def validate_certificate(self, ca_certs: List[Certificate]) -> bool:
2730
if not ca_certs:
2831
raise ValueError("ca_certs should be set.")
2932
if not self._cert:
3033
return False
3134

32-
ctx = ValidationContext(trust_roots=ca_certs)
35+
store = Store(ca_certs)
36+
builder = PolicyBuilder().store(store)
37+
verifier = builder.build_server_verifier(
38+
DNSName(self._cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value)
39+
)
3340
try:
34-
validator = CertificateValidator(self._cert, self._intermediates, validation_context=ctx)
35-
validator.validate_usage(set(["digital_signature"]), extended_optional=True)
41+
verifier.verify(self._cert, self._intermediates)
3642
except Exception as err:
3743
raise VerifyError("Failed to validate the certificate bound to the key.") from err
3844
return True

cwt/cose.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Any, Dict, List, Optional, Tuple, Union
22

3-
from asn1crypto import pem
43
from cbor2 import CBORTag
4+
from cryptography.x509 import load_pem_x509_certificates
55

66
from .cbor_processor import CBORProcessor
77
from .const import (
@@ -54,11 +54,8 @@ def __init__(
5454
if ca_certs:
5555
if not isinstance(ca_certs, str):
5656
raise ValueError("ca_certs should be str.")
57-
self._trust_roots: List[bytes] = []
5857
with open(ca_certs, "rb") as f:
59-
for _, _, der_bytes in pem.unarmor(f.read(), multiple=True):
60-
self._ca_certs.append(der_bytes)
61-
58+
self._ca_certs = load_pem_x509_certificates(f.read())
6259
if not isinstance(deterministic_header, bool):
6360
raise ValueError("deterministic_header should be bool.")
6461
self._deterministic_header = deterministic_header

poetry.lock

Lines changed: 4 additions & 44 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@ exclude = [
2626

2727
[tool.poetry.dependencies]
2828
python = "^3.9.2,<4.0"
29-
asn1crypto = "^1.4.0"
3029
cbor2 = "^5.4.2"
31-
certvalidator = "^0.11.1"
3230
cryptography = ">=42.0.1,<45"
3331
Sphinx = {version = ">=7.1,<8", optional = true, extras = ["docs"]}
3432
sphinx-autodoc-typehints = {version = ">=1.25.2,<3.0.0", optional = true, extras = ["docs"]}

tests/test_cwt_sample.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def test_sample_readme_signed_cwt_es256(self):
175175
decoded = cwt.decode(token, public_key)
176176
assert 1 in decoded and decoded[1] == "coaps://as.example"
177177

178-
def test_sample_readme_signed_cwt_es256_with_cert(self):
178+
def test_sample_readme_signed_cwt_es256_with_cert_missing_required_extension(self):
179179
# with open(key_path("cacert.pem")) as f:
180180
# k1 = x509.load_pem_x509_certificate(f.read().encode("utf-8"))
181181

@@ -196,10 +196,14 @@ def test_sample_readme_signed_cwt_es256_with_cert(self):
196196
token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key)
197197

198198
decoder = CWT.new(ca_certs=key_path("cacert.pem"))
199-
decoded = decoder.decode(token, public_key)
200-
assert 1 in decoded and decoded[1] == "coaps://as.example"
199+
with pytest.raises(VerifyError) as err:
200+
decoder.decode(token, public_key)
201+
pytest.fail("decode() should fail.")
202+
assert "Failed to validate the certificate bound to the key." in str(err.value)
203+
# decoded = decoder.decode(token, public_key)
204+
# assert 1 in decoded and decoded[1] == "coaps://as.example"
201205

202-
def test_sample_readme_signed_cwt_es256_with_cert_without_intermediates(self):
206+
def test_sample_readme_signed_cwt_es256_with_cert_missing_required_extension_without_intermediates(self):
203207
with open(key_path("private_key_cert_es256.pem")) as f:
204208
private_key = COSEKey.from_pem(f.read(), kid="P-256-01")
205209

@@ -209,8 +213,12 @@ def test_sample_readme_signed_cwt_es256_with_cert_without_intermediates(self):
209213
token = cwt.encode({"iss": "coaps://as.example", "sub": "dajiaji", "cti": "123"}, private_key)
210214

211215
decoder = CWT.new(ca_certs=key_path("cacert.pem"))
212-
decoded = decoder.decode(token, public_key)
213-
assert 1 in decoded and decoded[1] == "coaps://as.example"
216+
with pytest.raises(VerifyError) as err:
217+
decoder.decode(token, public_key)
218+
pytest.fail("decode() should fail.")
219+
assert "Failed to validate the certificate bound to the key." in str(err.value)
220+
# decoded = decoder.decode(token, public_key)
221+
# assert 1 in decoded and decoded[1] == "coaps://as.example"
214222

215223
def test_sample_readme_signed_cwt_es256_with_another_ca_cert(self):
216224
with open(key_path("private_key_cert_es256.pem")) as f:

0 commit comments

Comments
 (0)