Skip to content

Commit 0452202

Browse files
authored
Merge pull request #575 from dajiaji/disable-non-aead-by-default
Add enable_non_aead=False option for encode & decode
2 parents 6ced08f + 4c03145 commit 0452202

2 files changed

Lines changed: 68 additions & 10 deletions

File tree

cwt/cose.py

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ def encode(
141141
signers: List[Signer] = [],
142142
external_aad: bytes = b"",
143143
out: str = "",
144+
enable_non_aead: bool = False,
144145
) -> bytes:
145146
"""
146147
Encodes COSE message with MAC, signing and encryption.
@@ -160,14 +161,21 @@ def encode(
160161
data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
161162
``CBORTag`` object. If any other value is specified, it will return
162163
encoded data as bytes.
164+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
165+
(False = disabled by default). Before enable non-AEAD ciphers,
166+
read and understand Security considerations of RFC 9459 carefully.
167+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
168+
of decrypted message, make sure to deliver the encoded COSE message
169+
in conjunction with an authentication and integrity mechanisms,
170+
such as a digital signature.
163171
Returns:
164172
Union[bytes, CBORTag]: A byte string of the encoded COSE or a
165173
cbor2.CBORTag object.
166174
Raises:
167175
ValueError: Invalid arguments.
168176
EncodeError: Failed to encode data.
169177
"""
170-
p, u = self._encode_headers(key, protected, unprotected)
178+
p, u = self._encode_headers(key, protected, unprotected, enable_non_aead)
171179
typ = self._validate_cose_message(key, p, u, recipients, signers)
172180
if typ == 0:
173181
return self._encode_and_encrypt(payload, key, p, u, recipients, external_aad, out)
@@ -185,6 +193,7 @@ def encode_and_encrypt(
185193
recipients: List[RecipientInterface] = [],
186194
external_aad: bytes = b"",
187195
out: str = "",
196+
enable_non_aead: bool = False,
188197
) -> bytes:
189198
"""
190199
Encodes data with encryption.
@@ -202,14 +211,21 @@ def encode_and_encrypt(
202211
data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s
203212
``CBORTag`` object. If any other value is specified, it will return
204213
encoded data as bytes.
214+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
215+
(False = disabled by default). Before enable non-AEAD ciphers,
216+
read and understand Security considerations of RFC 9459 carefully.
217+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
218+
of decrypted message, make sure to deliver the encoded COSE message
219+
in conjunction with an authentication and integrity mechanisms,
220+
such as a digital signature.
205221
Returns:
206222
Union[bytes, CBORTag]: A byte string of the encoded COSE or a
207223
cbor2.CBORTag object.
208224
Raises:
209225
ValueError: Invalid arguments.
210226
EncodeError: Failed to encode data.
211227
"""
212-
p, u = self._encode_headers(key, protected, unprotected)
228+
p, u = self._encode_headers(key, protected, unprotected, enable_non_aead)
213229
typ = self._validate_cose_message(key, p, u, recipients, [])
214230
if typ != 0:
215231
raise ValueError("The COSE message is not suitable for COSE Encrypt0/Encrypt.")
@@ -245,7 +261,7 @@ def encode_and_mac(
245261
ValueError: Invalid arguments.
246262
EncodeError: Failed to encode data.
247263
"""
248-
p, u = self._encode_headers(key, protected, unprotected)
264+
p, u = self._encode_headers(key, protected, unprotected, False)
249265
typ = self._validate_cose_message(key, p, u, recipients, [])
250266
if typ != 1:
251267
raise ValueError("The COSE message is not suitable for COSE MAC0/MAC.")
@@ -287,7 +303,7 @@ def encode_and_sign(
287303
ValueError: Invalid arguments.
288304
EncodeError: Failed to encode data.
289305
"""
290-
p, u = self._encode_headers(key, protected, unprotected)
306+
p, u = self._encode_headers(key, protected, unprotected, False)
291307
typ = self._validate_cose_message(key, p, u, [], signers)
292308
if typ != 2:
293309
raise ValueError("The COSE message is not suitable for COSE Sign0/Sign.")
@@ -300,6 +316,7 @@ def decode(
300316
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
301317
external_aad: bytes = b"",
302318
detached_payload: Optional[bytes] = None,
319+
enable_non_aead: bool = False,
303320
) -> bytes:
304321
"""
305322
Verifies and decodes COSE data, and returns only payload.
@@ -314,14 +331,19 @@ def decode(
314331
external_aad(bytes): External additional authenticated data supplied by
315332
application.
316333
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
334+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
335+
(False = disabled by default). Before enable non-AEAD ciphers,
336+
read and understand Security considerations of RFC 9459 carefully.
337+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
338+
of decrypted message, make sure to validate them outside of this library.
317339
Returns:
318340
bytes: A byte string of decoded payload.
319341
Raises:
320342
ValueError: Invalid arguments.
321343
DecodeError: Failed to decode data.
322344
VerifyError: Failed to verify data.
323345
"""
324-
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload)
346+
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload, enable_non_aead)
325347
return res
326348

327349
def decode_with_headers(
@@ -331,6 +353,7 @@ def decode_with_headers(
331353
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
332354
external_aad: bytes = b"",
333355
detached_payload: Optional[bytes] = None,
356+
enable_non_aead: bool = False,
334357
) -> Tuple[Dict[int, Any], Dict[int, Any], bytes]:
335358
"""
336359
Verifies and decodes COSE data, and returns protected headers, unprotected headers and payload.
@@ -345,6 +368,11 @@ def decode_with_headers(
345368
external_aad(bytes): External additional authenticated data supplied by
346369
application.
347370
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
371+
enable_non_aead (bool): Enable non-AEAD content ecnryption algorithms
372+
(False = disabled by default). Before enable non-AEAD ciphers,
373+
read and understand Security considerations of RFC 9459 carefully.
374+
Since non-AEAD ciphers DO NOT provide neither authentication nor integrity
375+
of decrypted message, make sure to validate them outside of this library.
348376
Returns:
349377
Tuple[Dict[int, Any], Dict[int, Any], bytes]: A dictionary data of decoded protected headers, and a dictionary data of unprotected headers, and a byte string of decoded payload.
350378
Raises:
@@ -403,6 +431,8 @@ def decode_with_headers(
403431
# raise ValueError("unprotected header should be dict.")
404432
p, u = self._decode_headers(data.value[0], data.value[1])
405433
alg = p[1] if 1 in p else u.get(1, 0)
434+
if enable_non_aead is False and alg in COSE_ALGORITHMS_CEK_NON_AEAD.values():
435+
raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.")
406436

407437
# Local variable `protected` is byte encoded protected header
408438
# Sender is allowed to encode empty protected header into a bstr-wrapped zero-length map << {} >> (0x40A0)
@@ -557,6 +587,7 @@ def _encode_headers(
557587
key: Optional[COSEKeyInterface],
558588
protected: Optional[dict],
559589
unprotected: Optional[dict],
590+
enable_non_aead: bool,
560591
) -> Tuple[Dict[int, Any], Dict[int, Any]]:
561592
p = to_cose_header(protected)
562593
u = to_cose_header(unprotected)
@@ -577,8 +608,11 @@ def _encode_headers(
577608
# Check the protected header is empty if the algorithm is non AEAD (AES-CBC or AES-CTR)
578609
# because section 4 of RFC9459 says "The 'protected' header MUST be a zero-length byte string."
579610
alg = p[1] if 1 in p else u.get(1, 0)
580-
if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values() and len(p) > 0:
581-
raise ValueError("protected header MUST be zero-length")
611+
if alg in COSE_ALGORITHMS_CEK_NON_AEAD.values():
612+
if enable_non_aead is False:
613+
raise ValueError(f"Deprecated non-AEAD algorithm: {alg}.")
614+
if len(p) > 0:
615+
raise ValueError("protected header MUST be zero-length")
582616
return p, u
583617

584618
def _decode_headers(self, protected: Any, unprotected: Any) -> Tuple[Dict[int, Any], Dict[int, Any]]:

tests/test_recipient.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,19 @@ def test_recipients_aes(self, kw_alg, enc_alg):
797797
kw_key = COSEKey.from_symmetric_key(alg=kw_alg)
798798
enc_key = COSEKey.from_symmetric_key(alg=enc_alg)
799799

800+
# The sender side (must fail):
801+
r = Recipient.new(unprotected={"alg": kw_alg}, sender_key=kw_key)
802+
sender = COSE.new(alg_auto_inclusion=True)
803+
with pytest.raises(ValueError) as err:
804+
encoded = sender.encode_and_encrypt(
805+
b"Hello world!",
806+
enc_key,
807+
recipients=[r],
808+
enable_non_aead=False,
809+
)
810+
pytest.fail("encode_and_encrypt() should fail.")
811+
assert "Deprecated non-AEAD algorithm" in str(err.value)
812+
800813
# The sender side (must fail):
801814
with pytest.raises(ValueError) as err:
802815
r = Recipient.new(protected={"alg": kw_alg}, sender_key=kw_key)
@@ -812,6 +825,7 @@ def test_recipients_aes(self, kw_alg, enc_alg):
812825
enc_key,
813826
protected={"kid": "actually-not-protected"},
814827
recipients=[r],
828+
enable_non_aead=True,
815829
)
816830
pytest.fail("encode_and_encrypt() should fail.")
817831
assert "protected header MUST be zero-length" in str(err.value)
@@ -823,11 +837,19 @@ def test_recipients_aes(self, kw_alg, enc_alg):
823837
b"Hello world!",
824838
enc_key,
825839
recipients=[r],
840+
enable_non_aead=True,
826841
)
827842

843+
# The recipient side (must fail):
844+
recipient = COSE.new()
845+
with pytest.raises(ValueError) as err:
846+
_ = recipient.decode(encoded, keys=[kw_key]) # the option enable_non_aead=False by default
847+
pytest.fail("decode() should fail for non-AEAD without enable_non_aead=True.")
848+
assert f"Deprecated non-AEAD algorithm: {enc_key._alg}." == str(err.value)
849+
828850
# The recipient side:
829851
recipient = COSE.new()
830-
assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key])
852+
assert b"Hello world!" == recipient.decode(encoded, keys=[kw_key], enable_non_aead=True)
831853

832854
@pytest.mark.parametrize(
833855
"enc_alg",
@@ -859,9 +881,10 @@ def test_recipients_hpke(self, rsk1, rsk2, enc_alg):
859881
enc_key,
860882
unprotected={"alg": enc_alg},
861883
recipients=[r],
884+
enable_non_aead=True,
862885
)
863886
recipient = COSE.new()
864-
assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2])
887+
assert b"This is the content." == recipient.decode(encoded, [rsk1, rsk2], enable_non_aead=True)
865888

866889
@pytest.mark.parametrize(
867890
"key_agreement_alg, key_agreement_alg_id, kw_alg, enc_alg",
@@ -915,6 +938,7 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al
915938
protected={},
916939
unprotected={"alg": enc_alg, "iv": nonce},
917940
recipients=[r],
941+
enable_non_aead=True,
918942
)
919943

920944
# The recipient side:
@@ -930,4 +954,4 @@ def test_recipients_ecdh_es(self, key_agreement_alg, key_agreement_alg_id, kw_al
930954
}
931955
)
932956
recipient = COSE.new()
933-
assert b"Hello world!" == recipient.decode(encoded, rsk2, context)
957+
assert b"Hello world!" == recipient.decode(encoded, rsk2, context, enable_non_aead=True)

0 commit comments

Comments
 (0)