|
2 | 2 | from secrets import token_bytes |
3 | 3 | from typing import Any, Dict, List, Optional, Union |
4 | 4 |
|
| 5 | +from ..algs.ec2 import EC2Key |
| 6 | +from ..algs.okp import OKPKey |
5 | 7 | from ..const import COSE_KEY_OPERATION_VALUES |
6 | 8 | from ..cose_key import COSEKey |
7 | 9 | from ..cose_key_interface import COSEKeyInterface |
@@ -58,6 +60,66 @@ def __init__( |
58 | 60 | else: |
59 | 61 | raise ValueError(f"Unknown alg(1) for ECDH with HKDF: {self._alg}.") |
60 | 62 |
|
| 63 | + def encode( |
| 64 | + self, |
| 65 | + plaintext: bytes = b"", |
| 66 | + recipient_key: Optional[COSEKeyInterface] = None, |
| 67 | + salt: Optional[bytes] = None, |
| 68 | + context: Optional[Union[List[Any], Dict[str, Any]]] = None, |
| 69 | + external_aad: bytes = b"", |
| 70 | + aad_context: str = "Enc_Recipient", |
| 71 | + ) -> Optional[COSEKeyInterface]: |
| 72 | + |
| 73 | + if not recipient_key: |
| 74 | + raise ValueError("recipient_key should be set in advance.") |
| 75 | + if not context: |
| 76 | + raise ValueError("context should be set.") |
| 77 | + ctx: list |
| 78 | + if isinstance(context, dict): |
| 79 | + alg = self._alg if isinstance(self._alg, int) else 0 |
| 80 | + ctx = to_cis(context, alg) |
| 81 | + else: |
| 82 | + self._validate_context(context) |
| 83 | + ctx = context |
| 84 | + self._applied_ctx = self._apply_context(ctx) |
| 85 | + |
| 86 | + # Generate a salt automatically if both of a salt and a PartyU nonce are not specified. |
| 87 | + if self._alg in [-27, -28]: # ECDH-SS |
| 88 | + if not salt and not self._salt and not self._applied_ctx[1][1]: |
| 89 | + self._salt = token_bytes(32) if self._alg == -27 else token_bytes(64) |
| 90 | + self._unprotected[-20] = self._salt |
| 91 | + elif salt: |
| 92 | + self._salt = salt |
| 93 | + self._unprotected[-20] = self._salt |
| 94 | + |
| 95 | + # PartyU nonce |
| 96 | + if self._applied_ctx[1][1]: |
| 97 | + self._unprotected[-22] = self._applied_ctx[1][1] |
| 98 | + # PartyV nonce |
| 99 | + if self._applied_ctx[2][1]: |
| 100 | + self._unprotected[-25] = self._applied_ctx[2][1] |
| 101 | + |
| 102 | + # Derive key. |
| 103 | + if self._alg in [-25, -26]: |
| 104 | + # ECDH-ES |
| 105 | + if recipient_key.kty == 2: |
| 106 | + self._sender_key = EC2Key({1: 2, -1: recipient_key.crv, 3: self._alg}) |
| 107 | + else: |
| 108 | + # should drop this support. |
| 109 | + self._sender_key = OKPKey({1: 1, -1: recipient_key.crv, 3: self._alg}) |
| 110 | + else: |
| 111 | + # ECDH-SS (alg=-27 or -28) |
| 112 | + if not self._sender_key: |
| 113 | + raise ValueError("sender_key should be set in advance.") |
| 114 | + derived_key = self._sender_key.derive_key(self._applied_ctx, public_key=recipient_key) |
| 115 | + if self._alg in [-25, -26]: |
| 116 | + # ECDH-ES |
| 117 | + self._unprotected[-1] = self._to_cose_key(self._sender_key.key.public_key()) |
| 118 | + else: |
| 119 | + # ECDH-SS (alg=-27 or -28) |
| 120 | + self._unprotected[-2] = self._to_cose_key(self._sender_key.key.public_key()) |
| 121 | + return derived_key |
| 122 | + |
61 | 123 | def apply( |
62 | 124 | self, |
63 | 125 | key: Optional[COSEKeyInterface] = None, |
|
0 commit comments