Skip to content

Commit 88a085a

Browse files
authored
Add support for HPKE key wrapping.
Add support for HPKE key wrapping.
2 parents e51f001 + fec2e3a commit 88a085a

12 files changed

Lines changed: 121 additions & 38 deletions

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ Create a COSE-HPKE Encrypt message and decrypt it as follows:
623623
from cwt import COSE, COSEKey, Recipient
624624

625625
# The sender side:
626+
enc_key = COSEKey.from_symmetric_key(alg="A128GCM")
626627
rpk = COSEKey.from_jwk(
627628
{
628629
"kty": "EC",
@@ -645,10 +646,13 @@ r = Recipient.new(
645646
},
646647
},
647648
)
648-
r.apply(recipient_key=rpk)
649+
r.encode(enc_key.key, recipient_key=rpk)
649650
sender = COSE.new()
650651
encoded = sender.encode_and_encrypt(
651652
b"This is the content.",
653+
protected={
654+
1: 1, # alg: "A128GCM"
655+
},
652656
recipients=[r],
653657
)
654658

cwt/cose.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ def encode_and_encrypt(
169169
if not recipients:
170170
if 1 in p and p[1] == -1: # HPKE
171171
hpke = HPKE(p, u)
172-
hpke.apply(recipient_key=key)
173-
res = CBORTag(16, hpke.to_list(payload, external_aad, "Encrypt0"))
172+
hpke.encode(payload, recipient_key=key, external_aad=external_aad, aad_context="Encrypt0")
173+
res = CBORTag(16, hpke.to_list())
174174
return res if out == "cbor2/CBORTag" else self._dumps(res)
175175
if key is None:
176176
raise ValueError("key should be set.")
@@ -412,7 +412,7 @@ def decode(
412412
try:
413413
if not isinstance(protected, bytes) and alg == -1: # HPKE
414414
hpke = HPKE(protected, unprotected, data.value[2])
415-
return hpke.decrypt(k, external_aad=external_aad, aad_context="Encrypt0")
415+
return hpke.decode(k, external_aad=external_aad, aad_context="Encrypt0")
416416
return k.decrypt(data.value[2], nonce, aad)
417417
except Exception as e:
418418
err = e

cwt/recipient_algs/aes_key_wrap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ def apply(
4141
recipient_key: Optional[COSEKeyInterface] = None,
4242
salt: Optional[bytes] = None,
4343
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
44+
external_aad: bytes = b"",
45+
aad_context: str = "Enc_Recipient",
4446
) -> COSEKeyInterface:
4547
if not key:
4648
raise ValueError("key should be set.")

cwt/recipient_algs/direct_hkdf.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ def apply(
8888
recipient_key: Optional[COSEKeyInterface] = None,
8989
salt: Optional[bytes] = None,
9090
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
91+
external_aad: bytes = b"",
92+
aad_context: str = "Enc_Recipient",
9193
) -> COSEKeyInterface:
9294

9395
if not key:

cwt/recipient_algs/direct_key.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ def apply(
2323
recipient_key: Optional[COSEKeyInterface] = None,
2424
salt: Optional[bytes] = None,
2525
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
26+
external_aad: bytes = b"",
27+
aad_context: str = "Enc_Recipient",
2628
) -> COSEKeyInterface:
2729
if not key:
2830
raise ValueError("key should be set.")

cwt/recipient_algs/ecdh_aes_key_wrap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ def apply(
5555
recipient_key: Optional[COSEKeyInterface] = None,
5656
salt: Optional[bytes] = None,
5757
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
58+
external_aad: bytes = b"",
59+
aad_context: str = "Enc_Recipient",
5860
) -> COSEKeyInterface:
5961

6062
if not key:

cwt/recipient_algs/ecdh_direct_hkdf.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ def apply(
6464
recipient_key: Optional[COSEKeyInterface] = None,
6565
salt: Optional[bytes] = None,
6666
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
67+
external_aad: bytes = b"",
68+
aad_context: str = "Enc_Recipient",
6769
) -> COSEKeyInterface:
6870

6971
if not self._sender_key:

cwt/recipient_algs/hpke.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pyhpke import AEADId, CipherSuite, KDFId, KEMId, KEMKey, KEMKeyInterface
44

5+
from ..cose_key import COSEKey
56
from ..cose_key_interface import COSEKeyInterface
67
from ..exceptions import DecodeError, EncodeError
78
from ..recipient_interface import RecipientInterface
@@ -30,49 +31,58 @@ def __init__(
3031
self._suite = CipherSuite.new(KEMId(unprotected[-4][1]), KDFId(unprotected[-4][2]), AEADId(unprotected[-4][3]))
3132
return
3233

33-
def apply(
34+
def encode(
3435
self,
35-
key: Optional[COSEKeyInterface] = None,
36+
plaintext: bytes,
3637
recipient_key: Optional[COSEKeyInterface] = None,
3738
salt: Optional[bytes] = None,
3839
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
39-
) -> COSEKeyInterface:
40+
external_aad: bytes = b"",
41+
aad_context: str = "Enc_Recipient",
42+
) -> Optional[COSEKeyInterface]:
4043
if not recipient_key:
4144
raise ValueError("recipient_key should be set.")
42-
4345
self._recipient_key = recipient_key
4446
self._kem_key = self._to_kem_key(recipient_key)
45-
return self._recipient_key
46-
47-
def to_list(self, payload: bytes = b"", external_aad: bytes = b"", aad_context: str = "Enc_Recipient") -> List[Any]:
4847
enc_structure = [aad_context, self._dumps(self._protected), external_aad]
4948
aad = self._dumps(enc_structure)
50-
enc, sender = self._suite.create_sender_context(self._kem_key)
51-
self._unprotected[-4][4] = enc
5249
try:
53-
self._ciphertext = sender.seal(payload, aad=aad)
54-
return super().to_list(payload, external_aad, aad_context)
50+
enc, ctx = self._suite.create_sender_context(self._kem_key)
51+
self._unprotected[-4][4] = enc
52+
self._ciphertext = ctx.seal(plaintext, aad=aad)
5553
except Exception as err:
5654
raise EncodeError("Failed to seal.") from err
55+
return None
5756

58-
def decrypt(
57+
def decode(
5958
self,
6059
key: COSEKeyInterface,
61-
alg: Optional[int] = None,
6260
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
63-
payload: bytes = b"",
64-
nonce: bytes = b"",
65-
aad: bytes = b"",
6661
external_aad: bytes = b"",
6762
aad_context: str = "Enc_Recipient",
6863
) -> bytes:
6964
enc_structure = [aad_context, self._dumps(self._protected), external_aad]
7065
aad = self._dumps(enc_structure)
71-
recipient = self._suite.create_recipient_context(self._unprotected[-4][4], self._to_kem_key(key))
7266
try:
73-
return recipient.open(self._ciphertext, aad=aad)
67+
ctx = self._suite.create_recipient_context(self._unprotected[-4][4], self._to_kem_key(key))
68+
return ctx.open(self._ciphertext, aad=aad)
7469
except Exception as err:
7570
raise DecodeError("Failed to open.") from err
7671

72+
def decrypt(
73+
self,
74+
key: COSEKeyInterface,
75+
alg: Optional[int] = None,
76+
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
77+
payload: bytes = b"",
78+
nonce: bytes = b"",
79+
aad: bytes = b"",
80+
external_aad: bytes = b"",
81+
aad_context: str = "Enc_Recipient",
82+
) -> bytes:
83+
alg = alg if isinstance(alg, int) else 0
84+
raw = self.decode(key, context, external_aad, aad_context)
85+
return COSEKey.from_symmetric_key(raw, alg=alg, kid=self._kid).decrypt(payload, nonce, aad)
86+
7787
def _to_kem_key(self, src: COSEKeyInterface) -> KEMKeyInterface:
7888
return KEMKey.from_pyca_cryptography_key(src.key)

cwt/recipient_interface.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,56 @@ def to_list(self, payload: bytes = b"", external_aad: bytes = b"", aad_context:
154154
res.append(children)
155155
return res
156156

157+
def encode(
158+
self,
159+
plaintext: bytes,
160+
recipient_key: Optional[COSEKeyInterface] = None,
161+
salt: Optional[bytes] = None,
162+
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
163+
external_aad: bytes = b"",
164+
aad_context: str = "Enc_Recipient",
165+
) -> Optional[COSEKeyInterface]:
166+
"""
167+
Encodes a specified plaintext to a ciphertext with the recipient-specific
168+
method (e.g., key wrapping, key agreement, or the combination of them)
169+
and sets up the related information (context information or ciphertext)
170+
in the recipient structure.
171+
Therefore, it will be used by the sender of the recipient information
172+
before calling COSE.encode_* functions with the Recipient object. The
173+
key generated through this function will be set to ``key`` parameter
174+
of COSE.encode_* functions.
175+
176+
Args:
177+
plaintext (bytes): A plaing text to be encrypted. In most of the cases,
178+
the plaintext is a byte string of a content encryption key.
179+
recipient_key (Optional[COSEKeyInterface]): The external public
180+
key provided by the recipient used for ECDH key agreement, HPKE, etc.
181+
salt (Optional[bytes]): A salt used for deriving a key.
182+
context (Optional[Union[List[Any], Dict[str, Any]]]): Context
183+
information structure.
184+
external_aad (bytes): External additional authenticated data for AEAD.
185+
aad_context (bytes): An additional authenticated data context to build
186+
an Enc_structure internally.
187+
Returns:
188+
Optional[COSEKeyInterface]: A generated key or passed-through key
189+
which is used as ``key`` parameter of COSE.encode_* functions.
190+
Raises:
191+
ValueError: Invalid arguments.
192+
EncodeError: Failed to encode(e.g., wrap, derive) the key.
193+
"""
194+
raise NotImplementedError
195+
157196
def apply(
158197
self,
159198
key: Optional[COSEKeyInterface] = None,
160199
recipient_key: Optional[COSEKeyInterface] = None,
161200
salt: Optional[bytes] = None,
162201
context: Optional[Union[List[Any], Dict[str, Any]]] = None,
202+
external_aad: bytes = b"",
203+
aad_context: str = "Enc_Recipient",
163204
) -> COSEKeyInterface:
164205
"""
165-
Applies a COSEKey as a material to prepare a MAC/encryption key with
206+
[DEPRECATED] Applies a COSEKey as a material to prepare a MAC/encryption key with
166207
the recipient-specific method (e.g., key wrapping, key agreement,
167208
or the combination of them) and sets up the related information
168209
(context information or ciphertext) in the recipient structure.
@@ -179,6 +220,9 @@ def apply(
179220
salt (Optional[bytes]): A salt used for deriving a key.
180221
context (Optional[Union[List[Any], Dict[str, Any]]]): Context
181222
information structure.
223+
external_aad (bytes): External additional authenticated data for AEAD.
224+
aad_context (bytes): An additional authenticated data context to build
225+
an Enc_structure internally.
182226
Returns:
183227
COSEKeyInterface: A generated key or passed-throug key which is used
184228
as ``key`` parameter of COSE.encode_* functions.
@@ -233,7 +277,9 @@ def decrypt(
233277
key (COSEKeyInterface): The external key to be used for extracting the key.
234278
alg (Optional[int]): The algorithm of the key extracted.
235279
context (Optional[Union[List[Any], Dict[str, Any]]]): Context information structure.
236-
external_aad (bytes): External additional authenticated data.
280+
external_aad (bytes): External additional authenticated data for AEAD.
281+
aad_context (bytes): An additional authenticated data context to build
282+
an Enc_structure internally.
237283
Returns:
238284
bytes: The decrypted plain text.
239285
Raises:

tests/test_cose_sample.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ def test_cose_usage_examples_cose_encrypt(self):
421421
def test_cose_usage_examples_cose_encrypt_hpke(self):
422422

423423
# The sender side:
424+
enc_key = COSEKey.from_symmetric_key(alg="A128GCM")
424425
rpk = COSEKey.from_jwk(
425426
{
426427
"kty": "EC",
@@ -443,10 +444,11 @@ def test_cose_usage_examples_cose_encrypt_hpke(self):
443444
},
444445
},
445446
)
446-
r.apply(recipient_key=rpk)
447-
sender = COSE.new()
447+
r.encode(enc_key.key, recipient_key=rpk)
448+
sender = COSE.new(alg_auto_inclusion=True)
448449
encoded = sender.encode_and_encrypt(
449450
b"This is the content.",
451+
key=enc_key,
450452
recipients=[r],
451453
)
452454

@@ -469,6 +471,7 @@ def test_cose_usage_examples_cose_encrypt_hpke(self):
469471
def test_cose_usage_examples_cose_encrypt_hpke_with_1st_layer_hpke(self):
470472

471473
# The sender side:
474+
enc_key = COSEKey.from_symmetric_key(alg="A128GCM")
472475
rpk = COSEKey.from_jwk(
473476
{
474477
"kty": "EC",
@@ -491,7 +494,7 @@ def test_cose_usage_examples_cose_encrypt_hpke_with_1st_layer_hpke(self):
491494
},
492495
},
493496
)
494-
r.apply(recipient_key=rpk)
497+
r.encode(enc_key.key, recipient_key=rpk)
495498
sender = COSE.new()
496499
with pytest.raises(ValueError) as err:
497500
sender.encode_and_encrypt(
@@ -515,6 +518,7 @@ def test_cose_usage_examples_cose_encrypt_hpke_with_1st_layer_hpke(self):
515518
def test_cose_usage_examples_cose_encrypt_hpke_with_nonce(self):
516519

517520
# The sender side:
521+
enc_key = COSEKey.from_symmetric_key(alg="A128GCM")
518522
rpk = COSEKey.from_jwk(
519523
{
520524
"kty": "EC",
@@ -537,7 +541,7 @@ def test_cose_usage_examples_cose_encrypt_hpke_with_nonce(self):
537541
},
538542
},
539543
)
540-
r.apply(recipient_key=rpk)
544+
r.encode(enc_key.key, recipient_key=rpk)
541545
sender = COSE.new()
542546
with pytest.raises(ValueError) as err:
543547
sender.encode_and_encrypt(

0 commit comments

Comments
 (0)