|
1 | | -from typing import Any, Dict, List, Optional, Union |
| 1 | +from typing import Any, Dict, List, Optional, Tuple, Union |
2 | 2 |
|
3 | 3 | from asn1crypto import pem |
4 | 4 | from cbor2 import CBORTag |
@@ -112,6 +112,63 @@ def verify_kid(self, verify_kid: bool): |
112 | 112 | self._verify_kid = verify_kid |
113 | 113 | return |
114 | 114 |
|
| 115 | + # def encode( |
| 116 | + # self, |
| 117 | + # payload: bytes, |
| 118 | + # key: Optional[COSEKeyInterface] = None, |
| 119 | + # protected: Optional[dict] = None, |
| 120 | + # unprotected: Optional[dict] = None, |
| 121 | + # recipients: Optional[List[RecipientInterface]] = None, |
| 122 | + # signers: List[Signer] = [], |
| 123 | + # external_aad: bytes = b"", |
| 124 | + # out: str = "", |
| 125 | + # ) -> bytes: |
| 126 | + # """ |
| 127 | + # Encodes COSE message with MAC, signing and encryption. |
| 128 | + |
| 129 | + # Args: |
| 130 | + # payload (bytes): A content to be MACed, signed or encrypted. |
| 131 | + # key (Optional[COSEKeyInterface]): A content encryption key as COSEKey. |
| 132 | + # protected (Optional[dict]): Parameters that are to be cryptographically protected. |
| 133 | + # unprotected (Optional[dict]): Parameters that are not cryptographically protected. |
| 134 | + # nonce (bytes): A nonce for encryption. |
| 135 | + # recipients (Optional[List[RecipientInterface]]): A list of recipient |
| 136 | + # information structures. |
| 137 | + # signers (List[Signer]): A list of signer information objects for |
| 138 | + # multiple signer cases. |
| 139 | + # external_aad(bytes): External additional authenticated data supplied |
| 140 | + # by application. |
| 141 | + # out(str): An output format. Only ``"cbor2/CBORTag"`` can be used. If |
| 142 | + # ``"cbor2/CBORTag"`` is specified. This function will return encoded |
| 143 | + # data as `cbor2 <https://cbor2.readthedocs.io/en/stable/>`_'s |
| 144 | + # ``CBORTag`` object. If any other value is specified, it will return |
| 145 | + # encoded data as bytes. |
| 146 | + # Returns: |
| 147 | + # Union[bytes, CBORTag]: A byte string of the encoded COSE or a |
| 148 | + # cbor2.CBORTag object. |
| 149 | + # Raises: |
| 150 | + # ValueError: Invalid arguments. |
| 151 | + # EncodeError: Failed to encode data. |
| 152 | + # """ |
| 153 | + # p = to_cose_header(protected) |
| 154 | + # u = to_cose_header(unprotected) |
| 155 | + # if key is not None: |
| 156 | + # if self._alg_auto_inclusion: |
| 157 | + # p[1] = key.alg |
| 158 | + # if self._kid_auto_inclusion and key.kid: |
| 159 | + # u[4] = key.kid |
| 160 | + |
| 161 | + # if 1 in p and 1 in u: |
| 162 | + # raise ValueError("alg appear both in protected and unprotected.") |
| 163 | + # alg = u[1] if 1 in u else p.get(1, 0) |
| 164 | + # if is_cose_enc(alg): |
| 165 | + # return self._encode_and_encrypt(payload, key, p, u, b"", recipients, external_aad, out) |
| 166 | + # if is_cose_mac(alg): |
| 167 | + # return self._encode_and_mac(payload, key, p, u, recipients, external_aad, out) |
| 168 | + # if is_cose_sign(alg): |
| 169 | + # return self._encode_and_sign(payload, key, p, u, signers, external_aad, out) |
| 170 | + # raise ValueError(f"Unsupported or unknown alg: {alg}.") |
| 171 | + |
115 | 172 | def encode_and_encrypt( |
116 | 173 | self, |
117 | 174 | payload: bytes, |
@@ -148,65 +205,8 @@ def encode_and_encrypt( |
148 | 205 | ValueError: Invalid arguments. |
149 | 206 | EncodeError: Failed to encode data. |
150 | 207 | """ |
151 | | - p = to_cose_header(protected) |
152 | | - u = to_cose_header(unprotected) |
153 | | - if key is not None: |
154 | | - if self._alg_auto_inclusion: |
155 | | - p[1] = key.alg |
156 | | - if self._kid_auto_inclusion and key.kid: |
157 | | - u[4] = key.kid |
158 | | - b_protected = self._dumps(p) if p else b"" |
159 | | - ciphertext: bytes = b"" |
160 | | - |
161 | | - # Encrypt0 |
162 | | - if not recipients: |
163 | | - enc_structure = ["Encrypt0", b_protected, external_aad] |
164 | | - aad = self._dumps(enc_structure) |
165 | | - if 1 in p and p[1] == -1: # HPKE |
166 | | - hpke = HPKE(p, u, recipient_key=key) |
167 | | - encoded, _ = hpke.encode(payload, aad) |
168 | | - res = CBORTag(16, encoded) |
169 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
170 | | - if key is None: |
171 | | - raise ValueError("key should be set.") |
172 | | - if not nonce: |
173 | | - try: |
174 | | - nonce = key.generate_nonce() |
175 | | - except NotImplementedError: |
176 | | - raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") |
177 | | - u[5] = nonce |
178 | | - ciphertext = key.encrypt(payload, nonce, aad) |
179 | | - res = CBORTag(16, [b_protected, u, ciphertext]) |
180 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
181 | | - |
182 | | - # Encrypt |
183 | | - if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values(): |
184 | | - raise NotImplementedError("Algorithms other than direct are not supported for recipients.") |
185 | | - |
186 | | - recs = [] |
187 | | - b_key = key.to_bytes() if isinstance(key, COSEKeyInterface) else b"" |
188 | | - cek: Optional[COSEKeyInterface] = None |
189 | | - for rec in recipients: |
190 | | - aad = self._dumps(["Enc_Recipient", self._dumps(rec.protected), external_aad]) |
191 | | - encoded, derived_key = rec.encode(b_key, aad) |
192 | | - cek = derived_key if derived_key else key |
193 | | - recs.append(encoded) |
194 | | - |
195 | | - if cek is None: |
196 | | - raise ValueError("key should be set.") |
197 | | - if not nonce: |
198 | | - try: |
199 | | - nonce = cek.generate_nonce() |
200 | | - except NotImplementedError: |
201 | | - raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") |
202 | | - u[5] = nonce |
203 | | - enc_structure = ["Encrypt", b_protected, external_aad] |
204 | | - aad = self._dumps(enc_structure) |
205 | | - ciphertext = cek.encrypt(payload, nonce, aad) |
206 | | - cose_enc: List[Any] = [b_protected, u, ciphertext] |
207 | | - cose_enc.append(recs) |
208 | | - res = CBORTag(96, cose_enc) |
209 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 208 | + p, u = self._build_headers(key, protected, unprotected) |
| 209 | + return self._encode_and_encrypt(payload, key, p, u, nonce, recipients, external_aad, out) |
210 | 210 |
|
211 | 211 | def encode_and_mac( |
212 | 212 | self, |
@@ -238,45 +238,8 @@ def encode_and_mac( |
238 | 238 | ValueError: Invalid arguments. |
239 | 239 | EncodeError: Failed to encode data. |
240 | 240 | """ |
241 | | - p = to_cose_header(protected) |
242 | | - u = to_cose_header(unprotected) |
243 | | - if key is not None: |
244 | | - if self._alg_auto_inclusion: |
245 | | - p[1] = key.alg |
246 | | - if self._kid_auto_inclusion and key.kid: |
247 | | - u[4] = key.kid |
248 | | - b_protected = self._dumps(p) if p else b"" |
249 | | - |
250 | | - # MAC0 |
251 | | - if not recipients: |
252 | | - if key is None: |
253 | | - raise ValueError("key should be set.") |
254 | | - mac_structure = ["MAC0", b_protected, external_aad, payload] |
255 | | - tag = key.sign(self._dumps(mac_structure)) |
256 | | - res = CBORTag(17, [b_protected, u, payload, tag]) |
257 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
258 | | - |
259 | | - # MAC |
260 | | - if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values(): |
261 | | - raise NotImplementedError("Algorithms other than direct are not supported for recipients.") |
262 | | - |
263 | | - mac_structure = ["MAC", b_protected, external_aad, payload] |
264 | | - |
265 | | - recs = [] |
266 | | - b_key = key.to_bytes() if isinstance(key, COSEKeyInterface) else b"" |
267 | | - for rec in recipients: |
268 | | - aad = self._dumps(["Mac_Recipient", self._dumps(rec.protected), external_aad]) |
269 | | - encoded, derived_key = rec.encode(b_key, aad) |
270 | | - key = derived_key if derived_key else key |
271 | | - recs.append(encoded) |
272 | | - |
273 | | - if key is None: |
274 | | - raise ValueError("key should be set.") |
275 | | - tag = key.sign(self._dumps(mac_structure)) |
276 | | - cose_mac: List[Any] = [b_protected, u, payload, tag] |
277 | | - cose_mac.append(recs) |
278 | | - res = CBORTag(97, cose_mac) |
279 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 241 | + p, u = self._build_headers(key, protected, unprotected) |
| 242 | + return self._encode_and_mac(payload, key, p, u, recipients, external_aad, out) |
280 | 243 |
|
281 | 244 | def encode_and_sign( |
282 | 245 | self, |
@@ -314,30 +277,8 @@ def encode_and_sign( |
314 | 277 | ValueError: Invalid arguments. |
315 | 278 | EncodeError: Failed to encode data. |
316 | 279 | """ |
317 | | - p = to_cose_header(protected) |
318 | | - u = to_cose_header(unprotected) |
319 | | - if key is not None: |
320 | | - if self._alg_auto_inclusion: |
321 | | - p[1] = key.alg |
322 | | - if self._kid_auto_inclusion and key.kid: |
323 | | - u[4] = key.kid |
324 | | - b_protected = self._dumps(p) if p else b"" |
325 | | - |
326 | | - # Signature1 |
327 | | - if not signers and key is not None: |
328 | | - sig_structure = ["Signature1", b_protected, external_aad, payload] |
329 | | - sig = key.sign(self._dumps(sig_structure)) |
330 | | - res = CBORTag(18, [b_protected, u, payload, sig]) |
331 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
332 | | - |
333 | | - # Signature |
334 | | - sigs = [] |
335 | | - for s in signers: |
336 | | - sig_structure = ["Signature", b_protected, s.protected, external_aad, payload] |
337 | | - s.sign(self._dumps(sig_structure)) |
338 | | - sigs.append([s.protected, s.unprotected, s.signature]) |
339 | | - res = CBORTag(98, [b_protected, u, payload, sigs]) |
340 | | - return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 280 | + p, u = self._build_headers(key, protected, unprotected) |
| 281 | + return self._encode_and_sign(payload, key, p, u, signers, external_aad, out) |
341 | 282 |
|
342 | 283 | def decode( |
343 | 284 | self, |
@@ -552,6 +493,159 @@ def decode( |
552 | 493 | err = e |
553 | 494 | raise err |
554 | 495 |
|
| 496 | + def _encode_and_encrypt( |
| 497 | + self, |
| 498 | + payload: bytes, |
| 499 | + key: Optional[COSEKeyInterface], |
| 500 | + p: Dict[int, Any], |
| 501 | + u: Dict[int, Any], |
| 502 | + nonce: bytes, |
| 503 | + recipients: Optional[List[RecipientInterface]], |
| 504 | + external_aad: bytes, |
| 505 | + out: str, |
| 506 | + ) -> bytes: |
| 507 | + |
| 508 | + b_protected = self._dumps(p) if p else b"" |
| 509 | + ciphertext: bytes = b"" |
| 510 | + |
| 511 | + # Encrypt0 |
| 512 | + if not recipients: |
| 513 | + enc_structure = ["Encrypt0", b_protected, external_aad] |
| 514 | + aad = self._dumps(enc_structure) |
| 515 | + if 1 in p and p[1] == -1: # HPKE |
| 516 | + hpke = HPKE(p, u, recipient_key=key) |
| 517 | + encoded, _ = hpke.encode(payload, aad) |
| 518 | + res = CBORTag(16, encoded) |
| 519 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 520 | + if key is None: |
| 521 | + raise ValueError("key should be set.") |
| 522 | + if not nonce: |
| 523 | + try: |
| 524 | + nonce = key.generate_nonce() |
| 525 | + except NotImplementedError: |
| 526 | + raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") |
| 527 | + u[5] = nonce |
| 528 | + ciphertext = key.encrypt(payload, nonce, aad) |
| 529 | + res = CBORTag(16, [b_protected, u, ciphertext]) |
| 530 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 531 | + |
| 532 | + # Encrypt |
| 533 | + if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values(): |
| 534 | + raise NotImplementedError("Algorithms other than direct are not supported for recipients.") |
| 535 | + |
| 536 | + recs = [] |
| 537 | + b_key = key.to_bytes() if isinstance(key, COSEKeyInterface) else b"" |
| 538 | + cek: Optional[COSEKeyInterface] = None |
| 539 | + for rec in recipients: |
| 540 | + aad = self._dumps(["Enc_Recipient", self._dumps(rec.protected), external_aad]) |
| 541 | + encoded, derived_key = rec.encode(b_key, aad) |
| 542 | + cek = derived_key if derived_key else key |
| 543 | + recs.append(encoded) |
| 544 | + |
| 545 | + if cek is None: |
| 546 | + raise ValueError("key should be set.") |
| 547 | + if not nonce: |
| 548 | + try: |
| 549 | + nonce = cek.generate_nonce() |
| 550 | + except NotImplementedError: |
| 551 | + raise ValueError("Nonce generation is not supported for the key. Set a nonce explicitly.") |
| 552 | + u[5] = nonce |
| 553 | + enc_structure = ["Encrypt", b_protected, external_aad] |
| 554 | + aad = self._dumps(enc_structure) |
| 555 | + ciphertext = cek.encrypt(payload, nonce, aad) |
| 556 | + cose_enc: List[Any] = [b_protected, u, ciphertext] |
| 557 | + cose_enc.append(recs) |
| 558 | + res = CBORTag(96, cose_enc) |
| 559 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 560 | + |
| 561 | + def _build_headers( |
| 562 | + self, |
| 563 | + key: Optional[COSEKeyInterface], |
| 564 | + protected: Optional[dict], |
| 565 | + unprotected: Optional[dict], |
| 566 | + ) -> Tuple[Dict[int, Any], Dict[int, Any]]: |
| 567 | + p = to_cose_header(protected) |
| 568 | + u = to_cose_header(unprotected) |
| 569 | + if key is not None: |
| 570 | + if self._alg_auto_inclusion: |
| 571 | + p[1] = key.alg |
| 572 | + if self._kid_auto_inclusion and key.kid: |
| 573 | + u[4] = key.kid |
| 574 | + return p, u |
| 575 | + |
| 576 | + def _encode_and_mac( |
| 577 | + self, |
| 578 | + payload: bytes, |
| 579 | + key: Optional[COSEKeyInterface], |
| 580 | + p: Dict[int, Any], |
| 581 | + u: Dict[int, Any], |
| 582 | + recipients: Optional[List[RecipientInterface]], |
| 583 | + external_aad: bytes, |
| 584 | + out: str, |
| 585 | + ) -> Union[bytes, CBORTag]: |
| 586 | + |
| 587 | + b_protected = self._dumps(p) if p else b"" |
| 588 | + |
| 589 | + # MAC0 |
| 590 | + if not recipients: |
| 591 | + if key is None: |
| 592 | + raise ValueError("key should be set.") |
| 593 | + mac_structure = ["MAC0", b_protected, external_aad, payload] |
| 594 | + tag = key.sign(self._dumps(mac_structure)) |
| 595 | + res = CBORTag(17, [b_protected, u, payload, tag]) |
| 596 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 597 | + |
| 598 | + # MAC |
| 599 | + if recipients[0].alg not in COSE_ALGORITHMS_RECIPIENT.values(): |
| 600 | + raise NotImplementedError("Algorithms other than direct are not supported for recipients.") |
| 601 | + |
| 602 | + mac_structure = ["MAC", b_protected, external_aad, payload] |
| 603 | + |
| 604 | + recs = [] |
| 605 | + b_key = key.to_bytes() if isinstance(key, COSEKeyInterface) else b"" |
| 606 | + for rec in recipients: |
| 607 | + aad = self._dumps(["Mac_Recipient", self._dumps(rec.protected), external_aad]) |
| 608 | + encoded, derived_key = rec.encode(b_key, aad) |
| 609 | + key = derived_key if derived_key else key |
| 610 | + recs.append(encoded) |
| 611 | + |
| 612 | + if key is None: |
| 613 | + raise ValueError("key should be set.") |
| 614 | + tag = key.sign(self._dumps(mac_structure)) |
| 615 | + cose_mac: List[Any] = [b_protected, u, payload, tag] |
| 616 | + cose_mac.append(recs) |
| 617 | + res = CBORTag(97, cose_mac) |
| 618 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 619 | + |
| 620 | + def _encode_and_sign( |
| 621 | + self, |
| 622 | + payload: bytes, |
| 623 | + key: Optional[COSEKeyInterface], |
| 624 | + p: Dict[int, Any], |
| 625 | + u: Dict[int, Any], |
| 626 | + signers: List[Signer], |
| 627 | + external_aad: bytes, |
| 628 | + out: str, |
| 629 | + ) -> Union[bytes, CBORTag]: |
| 630 | + |
| 631 | + b_protected = self._dumps(p) if p else b"" |
| 632 | + |
| 633 | + # Signature1 |
| 634 | + if not signers and key is not None: |
| 635 | + sig_structure = ["Signature1", b_protected, external_aad, payload] |
| 636 | + sig = key.sign(self._dumps(sig_structure)) |
| 637 | + res = CBORTag(18, [b_protected, u, payload, sig]) |
| 638 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 639 | + |
| 640 | + # Signature |
| 641 | + sigs = [] |
| 642 | + for s in signers: |
| 643 | + sig_structure = ["Signature", b_protected, s.protected, external_aad, payload] |
| 644 | + s.sign(self._dumps(sig_structure)) |
| 645 | + sigs.append([s.protected, s.unprotected, s.signature]) |
| 646 | + res = CBORTag(98, [b_protected, u, payload, sigs]) |
| 647 | + return res if out == "cbor2/CBORTag" else self._dumps(res) |
| 648 | + |
555 | 649 | def _filter_by_key_ops(self, keys: List[COSEKeyInterface], op: int) -> List[COSEKeyInterface]: |
556 | 650 | res: List[COSEKeyInterface] = [] |
557 | 651 | for k in keys: |
|
0 commit comments