Skip to content

Commit afc9fbc

Browse files
authored
Merge pull request #451 from dajiaji/refine-handling-detached-payload
Add detached_payload parameter to sign/verify funcs.
2 parents a158c97 + 8782938 commit afc9fbc

4 files changed

Lines changed: 282 additions & 42 deletions

File tree

cwt/cose.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ def decode(
290290
keys: Union[COSEKeyInterface, List[COSEKeyInterface]],
291291
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
292292
external_aad: bytes = b"",
293+
detached_payload: Optional[bytes] = None,
293294
) -> bytes:
294295
"""
295296
Verifies and decodes COSE data, and returns only payload.
@@ -303,14 +304,15 @@ def decode(
303304
structure for key deriviation functions.
304305
external_aad(bytes): External additional authenticated data supplied by
305306
application.
307+
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
306308
Returns:
307309
bytes: A byte string of decoded payload.
308310
Raises:
309311
ValueError: Invalid arguments.
310312
DecodeError: Failed to decode data.
311313
VerifyError: Failed to verify data.
312314
"""
313-
_, _, res = self.decode_with_headers(data, keys, context, external_aad)
315+
_, _, res = self.decode_with_headers(data, keys, context, external_aad, detached_payload)
314316
return res
315317

316318
def decode_with_headers(
@@ -319,6 +321,7 @@ def decode_with_headers(
319321
keys: Union[COSEKeyInterface, List[COSEKeyInterface]],
320322
context: Optional[Union[Dict[str, Any], List[Any]]] = None,
321323
external_aad: bytes = b"",
324+
detached_payload: Optional[bytes] = None,
322325
) -> Tuple[Dict[int, Any], Dict[int, Any], bytes]:
323326
"""
324327
Verifies and decodes COSE data, and returns protected headers, unprotected headers and payload.
@@ -332,6 +335,7 @@ def decode_with_headers(
332335
structure for key deriviation functions.
333336
external_aad(bytes): External additional authenticated data supplied by
334337
application.
338+
detached_payload (Optional[bytes]): The detached payload that should be verified with data.
335339
Returns:
336340
Tuple[Dict[int, Any], Dict[int, Any], bytes]: A dictionary data of decoded protected headers, and a dictionary data of unprotected headers, and a byte string of decoded payload.
337341
Raises:
@@ -376,6 +380,14 @@ def decode_with_headers(
376380
else:
377381
raise ValueError(f"Unsupported or unknown CBOR tag({data.tag}).")
378382

383+
payload = data.value[2]
384+
if detached_payload is not None:
385+
if data.value[2] is not None:
386+
raise ValueError("The payload already exists.")
387+
payload = detached_payload
388+
if payload is None:
389+
raise ValueError("detached_payload should be set.")
390+
379391
# protected: Union[Dict[int, Any], bytes] = self._loads(data.value[0]) if data.value[0] else b""
380392
# unprotected = data.value[1]
381393
# if not isinstance(unprotected, dict):
@@ -396,18 +408,18 @@ def decode_with_headers(
396408
continue
397409
try:
398410
if not isinstance(p, bytes) and alg in COSE_ALGORITHMS_HPKE.values(): # HPKE
399-
hpke = HPKE(p, u, data.value[2])
411+
hpke = HPKE(p, u, payload)
400412
res = hpke.decode(k, aad)
401413
if not isinstance(res, bytes):
402414
raise TypeError("Internal type error.")
403415
return p, u, res
404-
return p, u, k.decrypt(data.value[2], nonce, aad)
416+
return p, u, k.decrypt(payload, nonce, aad)
405417
except Exception as e:
406418
err = e
407419
raise err
408420
for _, k in enumerate(keys):
409421
try:
410-
return p, u, k.decrypt(data.value[2], nonce, aad)
422+
return p, u, k.decrypt(payload, nonce, aad)
411423
except Exception as e:
412424
err = e
413425
raise err
@@ -418,42 +430,42 @@ def decode_with_headers(
418430
nonce = u.get(5, b"")
419431
enc_key = rs.derive_key(keys, alg, external_aad, "Enc_Recipient")
420432
aad = self._dumps(["Encrypt", data.value[0], external_aad])
421-
return p, u, enc_key.decrypt(data.value[2], nonce, aad)
433+
return p, u, enc_key.decrypt(payload, nonce, aad)
422434

423435
# MAC0
424436
if data.tag == 17:
425437
kid = self._get_kid(p, u)
426-
msg = self._dumps(["MAC0", data.value[0], external_aad, data.value[2]])
438+
msg = self._dumps(["MAC0", data.value[0], external_aad, payload])
427439
if kid:
428440
for _, k in enumerate(keys):
429441
if k.kid != kid:
430442
continue
431443
try:
432444
k.verify(msg, data.value[3])
433-
return p, u, data.value[2]
445+
return p, u, payload
434446
except Exception as e:
435447
err = e
436448
raise err
437449
for _, k in enumerate(keys):
438450
try:
439451
k.verify(msg, data.value[3])
440-
return p, u, data.value[2]
452+
return p, u, payload
441453
except Exception as e:
442454
err = e
443455
raise err
444456

445457
# MAC
446458
if data.tag == 97:
447-
to_be_maced = self._dumps(["MAC", data.value[0], external_aad, data.value[2]])
459+
to_be_maced = self._dumps(["MAC", data.value[0], external_aad, payload])
448460
rs = Recipients.from_list(data.value[4], self._verify_kid, context)
449461
mac_auth_key = rs.derive_key(keys, alg, external_aad, "Mac_Recipient")
450462
mac_auth_key.verify(to_be_maced, data.value[3])
451-
return p, u, data.value[2]
463+
return p, u, payload
452464

453465
# Signature1
454466
if data.tag == 18:
455467
kid = self._get_kid(p, u)
456-
to_be_signed = self._dumps(["Signature1", data.value[0], external_aad, data.value[2]])
468+
to_be_signed = self._dumps(["Signature1", data.value[0], external_aad, payload])
457469
if kid:
458470
for _, k in enumerate(keys):
459471
if k.kid != kid:
@@ -462,7 +474,7 @@ def decode_with_headers(
462474
if self._ca_certs:
463475
k.validate_certificate(self._ca_certs)
464476
k.verify(to_be_signed, data.value[3])
465-
return p, u, data.value[2]
477+
return p, u, payload
466478
except Exception as e:
467479
err = e
468480
raise err
@@ -471,7 +483,7 @@ def decode_with_headers(
471483
if self._ca_certs:
472484
k.validate_certificate(self._ca_certs)
473485
k.verify(to_be_signed, data.value[3])
474-
return p, u, data.value[2]
486+
return p, u, payload
475487
except Exception as e:
476488
err = e
477489
raise err
@@ -501,11 +513,11 @@ def decode_with_headers(
501513
data.value[0],
502514
sig[0],
503515
external_aad,
504-
data.value[2],
516+
payload,
505517
]
506518
)
507519
k.verify(to_be_signed, sig[2])
508-
return p, u, data.value[2]
520+
return p, u, payload
509521
except Exception as e:
510522
err = e
511523
continue
@@ -517,11 +529,11 @@ def decode_with_headers(
517529
data.value[0],
518530
sig[0],
519531
external_aad,
520-
data.value[2],
532+
payload,
521533
]
522534
)
523535
k.verify(to_be_signed, sig[2])
524-
return p, u, data.value[2]
536+
return p, u, payload
525537
except Exception as e:
526538
err = e
527539
raise err

cwt/cose_message.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__(self, type: COSETypes, msg: List[Any]):
2929
self._type = type
3030
self._protected = msg[0]
3131
self._unprotected = msg[1]
32-
self._payload = msg[2]
32+
# self._payload = msg[2] # msg[2] is mutable and has no readable alias to avoid complexity.
3333
self._other_fields: List[bytes] = []
3434
self._recipients: List[List[Any]] = []
3535
self._signatures: List[List[Any]] = []
@@ -150,7 +150,7 @@ def payload(self) -> bytes:
150150
"""
151151
The payload of the COSE message.
152152
"""
153-
return self._payload
153+
return self._msg[2]
154154

155155
@property
156156
def other_fields(self) -> List[bytes]:
@@ -180,7 +180,14 @@ def dumps(self) -> bytes:
180180
tag = COSE_TYPE_TO_TAG.get(self._type, -1)
181181
return self._dumps(CBORTag(tag, self._msg)) if tag > 0 else self._dumps(self._msg)
182182

183-
def countersign(self, signer: Signer, aad: bytes = b"", abbreviated: bool = False, tagged: bool = False) -> COSEMessage:
183+
def countersign(
184+
self,
185+
signer: Signer,
186+
aad: bytes = b"",
187+
abbreviated: bool = False,
188+
tagged: bool = False,
189+
detached_payload: Optional[bytes] = None,
190+
) -> COSEMessage:
184191
"""
185192
Countersigns to the COSE message with the signer specified.
186193
@@ -189,21 +196,29 @@ def countersign(self, signer: Signer, aad: bytes = b"", abbreviated: bool = Fals
189196
aad (bytes): The application supplied additional authenticated data.
190197
abbreviated(bool): The type of the countersignature (abbreviated or not).
191198
tagged(bool): The indicator whether the countersignature is tagged or not.
199+
detached_payload (Optional[bytes]): The detached payload that should be
200+
countersigned with the COSEMessage.
192201
Returns:
193202
COSEMessage: The COSE message (self).
194203
Raises:
195204
ValueError: Invalid arguments.
196205
EncodeError: Failed to countersign.
197206
"""
207+
payload = self._msg[2]
208+
if detached_payload is not None:
209+
if self._msg[2] is not None:
210+
raise ValueError("The payload already exists.")
211+
payload = detached_payload
212+
198213
if abbreviated:
199-
to_be_signed = ["CounterSignature0V2", self._protected, aad, self._payload]
214+
to_be_signed = ["CounterSignature0V2", self._protected, aad, payload]
200215
for other_field in self._other_fields:
201216
to_be_signed.append(other_field)
202217
signer.sign(self._dumps(to_be_signed))
203218
self._unprotected[12] = signer.signature
204219
return self
205220

206-
to_be_signed = ["CounterSignatureV2", self._protected, signer.protected, aad, self._payload]
221+
to_be_signed = ["CounterSignatureV2", self._protected, signer.protected, aad, payload]
207222
for other_field in self._other_fields:
208223
to_be_signed.append(other_field)
209224
signer.sign(self._dumps(to_be_signed))
@@ -217,23 +232,36 @@ def countersign(self, signer: Signer, aad: bytes = b"", abbreviated: bool = Fals
217232
self._unprotected[11].append([signer.protected, signer.unprotected, signer.signature])
218233
return self
219234

220-
def counterverify(self, key: COSEKeyInterface, aad: bytes = b"") -> Optional[List[Any]]:
235+
def counterverify(
236+
self,
237+
key: COSEKeyInterface,
238+
aad: bytes = b"",
239+
detached_payload: Optional[bytes] = None,
240+
) -> Optional[List[Any]]:
221241
"""
222242
Verifies a countersignature in the COSE message with the verification key specified.
223243
224244
Args:
225245
key(COSEKeyInterface): A COSEKey that is used to verify a signature in the COSE message.
226246
aad (bytes): The application supplied additional authenticated data.
247+
detached_payload (Optional[bytes]): The detached payload that should be
248+
counterverified with the COSEMessage.
227249
Returns:
228250
Optional[List[Any]]: The COSE signature verified.
229251
Raises:
230252
ValueError: Invalid arguments.
231253
VerifyError: Failed to verify.
232254
"""
255+
payload = self._msg[2]
256+
if detached_payload is not None:
257+
if self._msg[2] is not None:
258+
raise ValueError("The payload already exists.")
259+
payload = detached_payload
260+
233261
err: Exception = ValueError("Countersignature not found.")
234262
acs = self._unprotected.get(12, None)
235263
if acs:
236-
to_be_signed = ["CounterSignature0V2", self._protected, aad, self._payload]
264+
to_be_signed = ["CounterSignature0V2", self._protected, aad, payload]
237265
for other_field in self._other_fields:
238266
to_be_signed.append(other_field)
239267
try:
@@ -245,7 +273,7 @@ def counterverify(self, key: COSEKeyInterface, aad: bytes = b"") -> Optional[Lis
245273
cs = self._unprotected.get(11, None)
246274
if not cs:
247275
raise err
248-
to_be_signed = ["CounterSignatureV2", self._protected, b"", aad, self._payload]
276+
to_be_signed = ["CounterSignatureV2", self._protected, b"", aad, payload]
249277
for other_field in self._other_fields:
250278
to_be_signed.append(other_field)
251279
if isinstance(cs[0], bytes):
@@ -310,8 +338,9 @@ def detach_payload(self) -> Tuple[COSEMessage, bytes]:
310338
if self._msg[2] is None:
311339
raise ValueError("The payload does not exist.")
312340

341+
payload = self._msg[2]
313342
self._msg[2] = None
314-
return self, self._payload
343+
return self, payload
315344

316345
def attach_payload(self, payload: bytes) -> COSEMessage:
317346
"""
@@ -326,7 +355,6 @@ def attach_payload(self, payload: bytes) -> COSEMessage:
326355
"""
327356

328357
if self._msg[2] is not None:
329-
raise ValueError("The payload already exist.")
330-
self._payload = payload
331-
358+
raise ValueError("The payload already exists.")
359+
self._msg[2] = payload
332360
return self

0 commit comments

Comments
 (0)