Skip to content

Commit 83c3be4

Browse files
authored
Bump pyhpke to v0.4.0.
Bump pyhpke to v0.4.0.
2 parents f68a8a1 + 72ed1b3 commit 83c3be4

12 files changed

Lines changed: 309 additions & 919 deletions

cwt/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from .encrypted_cose_key import EncryptedCOSEKey
1414
from .exceptions import CWTError, DecodeError, EncodeError, VerifyError
1515
from .helpers.hcert import load_pem_hcert_dsc
16-
from .hpke_cipher_suite import HPKECipherSuite
1716
from .recipient import Recipient
1817
from .signer import Signer
1918

cwt/algs/ec2.py

Lines changed: 2 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
from typing import Any, Dict, List, Optional, Tuple, Union
1+
from typing import Any, Dict, List, Optional, Union
22

33
import cryptography
4-
import pyhpke
54
from cryptography.hazmat.primitives import hashes
65
from cryptography.hazmat.primitives.asymmetric import ec
76
from cryptography.hazmat.primitives.asymmetric.ec import (
@@ -24,8 +23,7 @@
2423
COSE_KEY_TYPES,
2524
)
2625
from ..cose_key_interface import COSEKeyInterface
27-
from ..exceptions import DecodeError, EncodeError, VerifyError
28-
from ..hpke_cipher_suite import HPKECipherSuite
26+
from ..exceptions import EncodeError, VerifyError
2927
from ..utils import i2osp, os2ip, to_cis
3028
from .asymmetric import AsymmetricKey
3129
from .symmetric import AESCCMKey, AESGCMKey, ChaCha20Key, HMACKey
@@ -275,28 +273,6 @@ def verify(self, msg: bytes, sig: bytes):
275273
except ValueError as err:
276274
raise VerifyError("Invalid signature.") from err
277275

278-
def seal(self, suite: HPKECipherSuite, msg: bytes, aad: bytes = b"") -> Tuple[bytes, bytes]:
279-
# if self._alg != -1:
280-
# raise ValueError("alg should be HPKE(-1).");
281-
if self._private_key is not None:
282-
raise ValueError("Private key cannot be used for HPKE encryption.")
283-
284-
ctx = self._create_hpke_context(suite)
285-
enc, sender = ctx.setup_send(self._public_key, b"") # TODO: Add support for info
286-
return enc, sender.aead.seal(aad, msg)
287-
288-
def open(self, suite: HPKECipherSuite, enc: bytes, msg: bytes, aad: bytes = b"") -> bytes:
289-
# if self._alg != -1:
290-
# raise ValueError("alg should be HPKE(-1).");
291-
if self._private_key is None:
292-
raise ValueError("Public key cannot be used for HPKE decryption.")
293-
ctx = self._create_hpke_context(suite)
294-
try:
295-
recipient = ctx.setup_recv(enc, self._private_key, b"") # TODO: Add support for info
296-
return recipient.aead.open(aad, msg)
297-
except Exception as err:
298-
raise DecodeError("Failed to decrypt.") from err
299-
300276
def derive_key(
301277
self,
302278
context: Union[List[Any], Dict[str, Any]],
@@ -355,26 +331,3 @@ def _os_to_der(self, key_size: int, sig: bytes) -> bytes:
355331
r = os2ip(sig[:num_bytes])
356332
s = os2ip(sig[num_bytes:])
357333
return encode_dss_signature(r, s)
358-
359-
def _create_hpke_context(self, suite: HPKECipherSuite) -> Any:
360-
if self._crv == 1:
361-
if suite.kem != 0x0010:
362-
raise ValueError("KEM id should be 0x0010.")
363-
if suite.kdf == 0x0001 and suite.aead == 0x0001:
364-
return pyhpke.Suite__DHKEM_P256_HKDF_SHA256__HKDF_SHA256__AES_128_GCM
365-
if suite.kdf == 0x0001 and suite.aead == 0x0003:
366-
return pyhpke.Suite__DHKEM_P256_HKDF_SHA256__HKDF_SHA256__ChaCha20Poly1305
367-
if suite.kdf == 0x0003 and suite.aead == 0x0001:
368-
return pyhpke.Suite__DHKEM_P256_HKDF_SHA256__HKDF_SHA512__AES_128_GCM
369-
raise ValueError("Unsupported kdf/aead combination.")
370-
elif self._crv == 2:
371-
if suite.kem != 0x0011:
372-
raise ValueError("KEM id should be 0x0011.")
373-
raise ValueError("Unsupported kdf/aead combination.")
374-
elif self._crv == 3:
375-
if suite.kem != 0x0012:
376-
raise ValueError("KEM id should be 0x0012.")
377-
if suite.kdf == 0x0001 and suite.aead == 0x0001:
378-
return pyhpke.Suite__DHKEM_P521_HKDF_SHA512__HKDF_SHA512__AES_256_GCM
379-
raise ValueError("Unsupported kdf/aead combination.")
380-
raise ValueError("Invalid crv for HPKE.")

cwt/cose_key_interface.py

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
from typing import Any, Dict, List, Optional, Tuple, Union
1+
from typing import Any, Dict, List, Optional, Union
22

33
from .cbor_processor import CBORProcessor
44
from .const import (
55
COSE_KEY_OPERATION_VALUES,
66
COSE_KEY_TYPES,
77
COSE_NAMED_ALGORITHMS_SUPPORTED,
88
)
9-
from .hpke_cipher_suite import HPKECipherSuite
109

1110

1211
class COSEKeyInterface(CBORProcessor):
@@ -227,39 +226,6 @@ def decrypt(self, msg: bytes, nonce: bytes, aad: bytes) -> bytes:
227226
"""
228227
raise NotImplementedError
229228

230-
def seal(self, suite: HPKECipherSuite, msg: bytes, aad: bytes = b"") -> Tuple[bytes, bytes]:
231-
"""
232-
Encrypts the specified message with HPKE.
233-
234-
Args:
235-
msg (bytes): A message to be encrypted.
236-
aad (bytes): Additional authenticated data.
237-
Returns:
238-
Tuple[bytes, bytes]: The encapsulation key and the ciphertext respectively.
239-
Raises:
240-
NotImplementedError: Not implemented.
241-
ValueError: Invalid arguments.
242-
EncodeError: Failed to encrypt the message.
243-
"""
244-
raise NotImplementedError
245-
246-
def open(self, suite: HPKECipherSuite, enc: bytes, msg: bytes, aad: bytes = b"") -> bytes:
247-
"""
248-
Decrypts the specified message with HPKE.
249-
250-
Args:
251-
enc (bytes): An encapsulated key.
252-
msg (bytes): An encrypted message.
253-
aad (bytes): Additional authenticated data.
254-
Returns:
255-
bytes: The byte string of the decrypted data.
256-
Raises:
257-
NotImplementedError: Not implemented.
258-
ValueError: Invalid arguments.
259-
DecodeError: Failed to decrypt the message.
260-
"""
261-
raise NotImplementedError
262-
263229
def wrap_key(self, key_to_wrap: bytes) -> bytes:
264230
"""
265231
Wraps a key.

cwt/hpke_cipher_suite.py

Lines changed: 0 additions & 57 deletions
This file was deleted.

cwt/recipient_algs/hpke.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from typing import Any, Dict, List, Optional, Union
22

3+
from pyhpke import AEADId, CipherSuite, KDFId, KEMId, KEMKey, KEMKeyInterface
4+
35
from ..cose_key_interface import COSEKeyInterface
4-
from ..hpke_cipher_suite import HPKECipherSuite
6+
from ..exceptions import DecodeError, EncodeError
57
from ..recipient_interface import RecipientInterface
68

79

@@ -25,7 +27,7 @@ def __init__(
2527
raise ValueError("kdf id(2) not found in HPKE sender information(-4).")
2628
if 3 not in unprotected[-4]:
2729
raise ValueError("aead id(3) not found in HPKE sender information(-4).")
28-
self._suite = HPKECipherSuite(unprotected[-4][1], unprotected[-4][2], unprotected[-4][3])
30+
self._suite = CipherSuite.new(KEMId(unprotected[-4][1]), KDFId(unprotected[-4][2]), AEADId(unprotected[-4][3]))
2931
return
3032

3133
def apply(
@@ -39,14 +41,19 @@ def apply(
3941
raise ValueError("recipient_key should be set.")
4042

4143
self._recipient_key = recipient_key
44+
self._kem_key = self._to_kem_key(recipient_key)
4245
return self._recipient_key
4346

4447
def to_list(self, payload: bytes = b"", external_aad: bytes = b"", aad_context: str = "Enc_Recipient") -> List[Any]:
4548
enc_structure = [aad_context, self._dumps(self._protected), external_aad]
4649
aad = self._dumps(enc_structure)
47-
enc, self._ciphertext = self._recipient_key.seal(self._suite, payload, aad)
50+
enc, sender = self._suite.create_sender_context(self._kem_key)
4851
self._unprotected[-4][4] = enc
49-
return super().to_list(payload, external_aad, aad_context)
52+
try:
53+
self._ciphertext = sender.seal(payload, aad=aad)
54+
return super().to_list(payload, external_aad, aad_context)
55+
except Exception as err:
56+
raise EncodeError("Failed to seal.") from err
5057

5158
def decrypt(
5259
self,
@@ -61,4 +68,11 @@ def decrypt(
6168
) -> bytes:
6269
enc_structure = [aad_context, self._dumps(self._protected), external_aad]
6370
aad = self._dumps(enc_structure)
64-
return key.open(self._suite, self._unprotected[-4][4], self._ciphertext, aad)
71+
recipient = self._suite.create_recipient_context(self._unprotected[-4][4], self._to_kem_key(key))
72+
try:
73+
return recipient.open(self._ciphertext, aad=aad)
74+
except Exception as err:
75+
raise DecodeError("Failed to open.") from err
76+
77+
def _to_kem_key(self, src: COSEKeyInterface) -> KEMKeyInterface:
78+
return KEMKey.from_pyca_cryptography_key(src.key)

0 commit comments

Comments
 (0)