Skip to content

Commit 241b4cd

Browse files
committed
add: enable_non_aead=False to prevent unintended use of non-AEAD
1 parent 5025acb commit 241b4cd

2 files changed

Lines changed: 61 additions & 10 deletions

File tree

cwt/cose.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ def encode(
133133
signers: List[Signer] = [],
134134
external_aad: bytes = b"",
135135
out: str = "",
136+
enable_non_aead: bool = False,
136137
) -> bytes:
137138
"""
138139
Encodes COSE message with MAC, signing and encryption.
@@ -152,14 +153,21 @@ def encode(
152153
data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
153154
``CBORTag`` object. If any other value is specified, it will return
154155
encoded data as bytes.
156+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
157+
(False = disabled by default). Before enable non-AEAD ciphers,
158+
read and understand Security considerations of RFC 9459 carefully.
159+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
160+
of decrypted message, make sure to deliver the encoded COSE message
161+
in conjunction with an authentication and integrity mechanisms,
162+
such as a digital signature.
155163
Returns:
156164
Union[bytes, CBORTag]: A byte string of the encoded COSE or a
157165
cbor2.CBORTag object.
158166
Raises:
159167
ValueError: Invalid arguments.
160168
EncodeError: Failed to encode data.
161169
"""
162-
p, u = self._encode_headers(key, protected, unprotected)
170+
p, u = self._encode_headers(key, protected, unprotected, enable_non_aead)
163171
typ = self._validate_cose_message(key, p, u, recipients, signers)
164172
if typ == 0:
165173
return self._encode_and_encrypt(payload, key, p, u, recipients, external_aad, out)
@@ -177,6 +185,7 @@ def encode_and_encrypt(
177185
recipients: List[RecipientInterface] = [],
178186
external_aad: bytes = b"",
179187
out: str = "",
188+
enable_non_aead: bool = False,
180189
) -> bytes:
181190
"""
182191
Encodes data with encryption.
@@ -194,14 +203,21 @@ def encode_and_encrypt(
194203
data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
195204
``CBORTag`` object. If any other value is specified, it will return
196205
encoded data as bytes.
206+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
207+
(False = disabled by default). Before enable non-AEAD ciphers,
208+
read and understand Security considerations of RFC 9459 carefully.
209+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
210+
of decrypted message, make sure to deliver the encoded COSE message
211+
in conjunction with an authentication and integrity mechanisms,
212+
such as a digital signature.
197213
Returns:
198214
Union[bytes, CBORTag]: A byte string of the encoded COSE or a
199215
cbor2.CBORTag object.
200216
Raises:
201217
ValueError: Invalid arguments.
202218
EncodeError: Failed to encode data.
203219
"""
204-
p, u = self._encode_headers(key, protected, unprotected)
220+
p, u = self._encode_headers(key, protected, unprotected, enable_non_aead)
205221
typ = self._validate_cose_message(key, p, u, recipients, [])
206222
if typ != 0:
207223
raise ValueError("The COSE message is not suitable for COSE Encrypt0/Encrypt.")
@@ -237,7 +253,7 @@ def encode_and_mac(
237253
ValueError: Invalid arguments.
238254
EncodeError: Failed to encode data.
239255
"""
240-
p, u = self._encode_headers(key, protected, unprotected)
256+
p, u = self._encode_headers(key, protected, unprotected, False)
241257
typ = self._validate_cose_message(key, p, u, recipients, [])
242258
if typ != 1:
243259
raise ValueError("The COSE message is not suitable for COSE MAC0/MAC.")
@@ -279,7 +295,7 @@ def encode_and_sign(
279295
ValueError: Invalid arguments.
280296
EncodeError: Failed to encode data.
281297
"""
282-
p, u = self._encode_headers(key, protected, unprotected)
298+
p, u = self._encode_headers(key, protected, unprotected, False)
283299
typ = self._validate_cose_message(key, p, u, [], signers)
284300
if typ != 2:
285301
raise ValueError("The COSE message is not suitable for COSE Sign0/Sign.")
@@ -292,6 +308,7 @@ def decode(
292308
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
293309
external_aad: bytes = b"",
294310
detached_payload: Optional[bytes] = None,
311+
enable_non_aead: bool = False,
295312
) -> bytes:
296313
"""
297314
Verifies and decodes COSE data, and returns only payload.
@@ -306,14 +323,19 @@ def decode(
306323
external_aad(bytes): External additional authenticated data supplied by
307324
application.
308325
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
326+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
327+
(False = disabled by default). Before enable non-AEAD ciphers,
328+
read and understand Security considerations of RFC 9459 carefully.
329+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
330+
of decrypted message, make sure to validate them outside of this library.
309331
Returns:
310332
bytes: A byte string of decoded payload.
311333
Raises:
312334
ValueError: Invalid arguments.
313335
DecodeError: Failed to decode data.
314336
VerifyError: Failed to verify data.
315337
"""
316-
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload)
338+
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload, enable_non_aead)
317339
return res
318340

319341
def decode_with_headers(
@@ -323,6 +345,7 @@ def decode_with_headers(
323345
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
324346
external_aad: bytes = b"",
325347
detached_payload: Optional[bytes] = None,
348+
enable_non_aead: bool = False,
326349
) -> Tuple[Dict[int, Any], Dict[int, Any], bytes]:
327350
"""
328351
Verifies and decodes COSE data, and returns protected headers, unprotected headers and payload.
@@ -337,6 +360,11 @@ def decode_with_headers(
337360
external_aad(bytes): External additional authenticated data supplied by
338361
application.
339362
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
363+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
364+
(False = disabled by default). Before enable non-AEAD ciphers,
365+
read and understand Security considerations of RFC 9459 carefully.
366+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
367+
of decrypted message, make sure to validate them outside of this library.
340368
Returns:
341369
Tuple[Dict[int, Any], Dict[int, Any], bytes]: A dictionary data of decoded protected headers, and a dictionary data of unprotected headers, and a byte string of decoded payload.
342370
Raises:
@@ -395,6 +423,8 @@ def decode_with_headers(
395423
# raise ValueError("unprotected header should be dict.")
396424
p, u = self._decode_headers(data.value[0], data.value[1])
397425
alg = p[1] if 1 in p else u.get(1, 0)
426+
if enable_non_aead is False and alg in COSE_ALGORITHMS_CEK_NON_AEAD.values():
427+
raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.")
398428

399429
# Local variable `protected` is byte encoded protected header
400430
# Sender is allowed to encode empty protected header into a bstr-wrapped zero-length map << {} >> (0x40A0)
@@ -549,6 +579,7 @@ def _encode_headers(
549579
key: Optional[COSEKeyInterface],
550580
protected: Optional[dict],
551581
unprotected: Optional[dict],
582+
enable_non_aead: bool,
552583
) -> Tuple[Dict[int, Any], Dict[int, Any]]:
553584
p = to_cose_header(protected)
554585
u = to_cose_header(unprotected)
@@ -564,8 +595,11 @@ def _encode_headers(
564595
# Check the protected header is empty if the algorithm is non AEAD (AES-CBC or AES-CTR)
565596
# because section 4 of RFC9459 says "The 'protected' header MUST be a zero-length byte string."
566597
alg = p[1] if 1 in p else u.get(1, 0)
567-
if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values() and len(p) > 0:
568-
raise ValueError("protected header MUST be zero-length")
598+
if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values():
599+
if enable_non_aead is False:
600+
raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.")
601+
if len(p) > 0:
602+
raise ValueError("protected header MUST be zero-length")
569603
return p, u
570604

571605
def _decode_headers(self, protected: Any, unprotected: Any) -> Tuple[Dict[int, Any], Dict[int, Any]]:

tests/test_recipient.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,19 @@ def test_recipients_aes(self, kw_alg, enc_alg):
797797
kw_key = COSEKey.from_symmetric_key(alg=kw_alg)
798798
enc_key = COSEKey.from_symmetric_key(alg=enc_alg)
799799

800+
# The sender side (must fail):
801+
r = Recipient.new(unprotected={"alg": kw_alg}, sender_key=kw_key)
802+
sender = COSE.new(alg_auto_inclusion=True)
803+
with pytest.raises(ValueError) as err:
804+
encoded = sender.encode_and_encrypt(
805+
b"Hello world!",
806+
enc_key,
807+
recipients=[r],
808+
enable_non_aead=False,
809+
)
810+
pytest.fail("encode_and_encrypt() should fail.")
811+
assert "Deprecated non-AEAD algorithm" in str(err.value)
812+
800813
# The sender side (must fail):
801814
with pytest.raises(ValueError) as err:
802815
r = Recipient.new(protected={"alg": kw_alg}, sender_key=kw_key)
@@ -812,6 +825,7 @@ def test_recipients_aes(self, kw_alg, enc_alg):
812825
enc_key,
813826
protected={"kid": "actually-not-protected"},
814827
recipients=[r],
828+
enable_non_aead=True,
815829
)
816830
pytest.fail("encode_and_encrypt() should fail.")
817831
assert "protected header MUST be zero-length" in str(err.value)
@@ -823,11 +837,12 @@ def test_recipients_aes(self, kw_alg, enc_alg):
823837
b"Hello world!",
824838
enc_key,
825839
recipients=[r],
840+
enable_non_aead=True,
826841
)
827842

828843
# The recipient side:
829844
recipient = COSE.new()
830-
assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key])
845+
assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key], enable_non_aead=True)
831846

832847
@pytest.mark.parametrize(
833848
"enc_alg",
@@ -859,9 +874,10 @@ def test_recipients_hpke(self, rsk1, rsk2, enc_alg):
859874
enc_key,
860875
unprotected={"alg": enc_alg},
861876
recipients=[r],
877+
enable_non_aead=True,
862878
)
863879
recipient = COSE.new()
864-
assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2])
880+
assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2], enable_non_aead=True)
865881

866882
@pytest.mark.parametrize(
867883
"key_agreement_alg, key_agreement_alg_id, kw_alg, enc_alg",
@@ -915,6 +931,7 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al
915931
protected={},
916932
unprotected={"alg": enc_alg, "iv": nonce},
917933
recipients=[r],
934+
enable_non_aead=True,
918935
)
919936

920937
# The recipient side:
@@ -930,4 +947,4 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al
930947
}
931948
)
932949
recipient = COSE.new()
933-
assert b"Hello world!" == recipient.decode(encoded, rsk2, context)
950+
assert b"Hello world!" == recipient.decode(encoded, rsk2, context, enable_non_aead=True)

0 commit comments

Comments
 (0)