|
2 | 2 |
|
3 | 3 | from pyhpke import AEADId, CipherSuite, KDFId, KEMId, KEMKey, KEMKeyInterface |
4 | 4 |
|
| 5 | +from ..cose_key import COSEKey |
5 | 6 | from ..cose_key_interface import COSEKeyInterface |
6 | 7 | from ..exceptions import DecodeError, EncodeError |
7 | 8 | from ..recipient_interface import RecipientInterface |
@@ -30,62 +31,58 @@ def __init__( |
30 | 31 | self._suite = CipherSuite.new(KEMId(unprotected[-4][1]), KDFId(unprotected[-4][2]), AEADId(unprotected[-4][3])) |
31 | 32 | return |
32 | 33 |
|
33 | | - def apply( |
| 34 | + def encode( |
34 | 35 | self, |
35 | | - key: Optional[COSEKeyInterface] = None, |
| 36 | + plaintext: bytes, |
36 | 37 | recipient_key: Optional[COSEKeyInterface] = None, |
37 | 38 | salt: Optional[bytes] = None, |
38 | 39 | context: Optional[Union[List[Any], Dict[str, Any]]] = None, |
39 | 40 | external_aad: bytes = b"", |
40 | 41 | aad_context: str = "Enc_Recipient", |
41 | | - ) -> COSEKeyInterface: |
42 | | - # if not key: |
43 | | - # raise ValueError("key should be set.") |
| 42 | + ) -> Optional[COSEKeyInterface]: |
44 | 43 | if not recipient_key: |
45 | 44 | raise ValueError("recipient_key should be set.") |
46 | | - # if recipient_key.kid: |
47 | | - # self._protected[4] = key.kid |
48 | 45 | self._recipient_key = recipient_key |
49 | 46 | self._kem_key = self._to_kem_key(recipient_key) |
50 | | - # enc_structure = ["Enc_Recipient", self._dumps(self._protected), external_aad] |
51 | | - # aad = self._dumps(enc_structure) |
52 | | - # enc, sender = self._suite.create_sender_context(self._kem_key) |
53 | | - # self._unprotected[-4][4] = enc |
54 | | - # try: |
55 | | - # self._ciphertext = sender.seal(key.key, aad=aad) |
56 | | - # except Exception as err: |
57 | | - # raise EncodeError("Failed to seal.") from err |
58 | | - return self._recipient_key |
59 | | - |
60 | | - def to_list(self, payload: bytes = b"", external_aad: bytes = b"", aad_context: str = "Enc_Recipient") -> List[Any]: |
61 | 47 | enc_structure = [aad_context, self._dumps(self._protected), external_aad] |
62 | 48 | aad = self._dumps(enc_structure) |
63 | | - enc, sender = self._suite.create_sender_context(self._kem_key) |
64 | | - self._unprotected[-4][4] = enc |
65 | 49 | try: |
66 | | - self._ciphertext = sender.seal(payload, aad=aad) |
67 | | - return super().to_list(payload, external_aad, aad_context) |
| 50 | + enc, ctx = self._suite.create_sender_context(self._kem_key) |
| 51 | + self._unprotected[-4][4] = enc |
| 52 | + self._ciphertext = ctx.seal(plaintext, aad=aad) |
68 | 53 | except Exception as err: |
69 | 54 | raise EncodeError("Failed to seal.") from err |
| 55 | + return None |
70 | 56 |
|
71 | | - def decrypt( |
| 57 | + def decode( |
72 | 58 | self, |
73 | 59 | key: COSEKeyInterface, |
74 | | - alg: Optional[int] = None, |
75 | 60 | context: Optional[Union[List[Any], Dict[str, Any]]] = None, |
76 | | - payload: bytes = b"", |
77 | | - nonce: bytes = b"", |
78 | | - aad: bytes = b"", |
79 | 61 | external_aad: bytes = b"", |
80 | 62 | aad_context: str = "Enc_Recipient", |
81 | 63 | ) -> bytes: |
82 | 64 | enc_structure = [aad_context, self._dumps(self._protected), external_aad] |
83 | 65 | aad = self._dumps(enc_structure) |
84 | | - recipient = self._suite.create_recipient_context(self._unprotected[-4][4], self._to_kem_key(key)) |
85 | 66 | try: |
86 | | - return recipient.open(self._ciphertext, aad=aad) |
| 67 | + ctx = self._suite.create_recipient_context(self._unprotected[-4][4], self._to_kem_key(key)) |
| 68 | + return ctx.open(self._ciphertext, aad=aad) |
87 | 69 | except Exception as err: |
88 | 70 | raise DecodeError("Failed to open.") from err |
89 | 71 |
|
| 72 | + def decrypt( |
| 73 | + self, |
| 74 | + key: COSEKeyInterface, |
| 75 | + alg: Optional[int] = None, |
| 76 | + context: Optional[Union[List[Any], Dict[str, Any]]] = None, |
| 77 | + payload: bytes = b"", |
| 78 | + nonce: bytes = b"", |
| 79 | + aad: bytes = b"", |
| 80 | + external_aad: bytes = b"", |
| 81 | + aad_context: str = "Enc_Recipient", |
| 82 | + ) -> bytes: |
| 83 | + alg = alg if isinstance(alg, int) else 0 |
| 84 | + raw = self.decode(key, context, external_aad, aad_context) |
| 85 | + return COSEKey.from_symmetric_key(raw, alg=alg, kid=self._kid).decrypt(payload, nonce, aad) |
| 86 | + |
90 | 87 | def _to_kem_key(self, src: COSEKeyInterface) -> KEMKeyInterface: |
91 | 88 | return KEMKey.from_pyca_cryptography_key(src.key) |
0 commit comments