Skip to content

Commit fefb7ab

Browse files
committed
implement PASETO Version4 protocol
1 parent a9b8dd0 commit fefb7ab

3 files changed

Lines changed: 355 additions & 0 deletions

File tree

paseto/protocol/version4.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
"""
2+
This module contains Version4 implementation of the Paseto protocol.
3+
4+
https://github.com/paseto-standard/paseto-spec/blob/master/docs/01-Protocol-Versions/Version4.md
5+
"""
6+
7+
import hashlib
8+
import hmac
9+
import os
10+
from typing import Tuple
11+
12+
from paseto.crypto import libsodium_wrapper, primitives
13+
from paseto.exceptions import InvalidKey, InvalidMac
14+
from paseto.paserk.keys import (
15+
_TYPE_LOCAL,
16+
_TYPE_PUBLIC,
17+
_TYPE_SECRET,
18+
_create_asymmetric_key,
19+
_create_symmetric_key,
20+
_deserialize_key,
21+
)
22+
from paseto.paserk.keys import _verify_key as _generic_verify_key
23+
from paseto.protocol.common import check_footer, check_header, decode_message
24+
from paseto.protocol.util import b64, pae
25+
26+
HEADER_LOCAL = b"v4.local."
27+
HEADER_PUBLIC = b"v4.public."
28+
29+
NONCE_SIZE = 32
30+
ENCRYPTION_KEY_LENGTH = 32
31+
AUTHENTICATION_KEY_LENGTH = 32
32+
MAC_SIZE = 32
33+
34+
INFO_ENCRYPTION = b"paseto-encryption-key"
35+
INFO_AUTHENTICATION = b"paseto-auth-key-for-aead"
36+
37+
38+
def encrypt(
39+
message: bytes, key: bytes, footer: bytes = b"", implicit_assertion: bytes = b""
40+
) -> bytes:
41+
"""PASETO Version4 encrypt function."""
42+
43+
# verify that key is intended for use with this function
44+
_verify_key(key, _TYPE_LOCAL)
45+
raw_key: bytes = _deserialize_key(key)
46+
47+
# Step 1
48+
header: bytes = HEADER_LOCAL
49+
50+
# Step 2
51+
nonce: bytes = os.urandom(NONCE_SIZE)
52+
53+
# Step 3
54+
encryption_key: bytes
55+
authentication_key: bytes
56+
nonce2: bytes
57+
encryption_key, authentication_key, nonce2 = _split_key(raw_key, nonce)
58+
59+
# Step 4
60+
ciphertext: bytes = libsodium_wrapper.crypto_stream_xchacha20_xor(
61+
message=message, nonce=nonce2, key=encryption_key
62+
)
63+
64+
# Step 5
65+
pre_auth: bytes = pae([header, nonce, ciphertext, footer, implicit_assertion])
66+
67+
# Step 6
68+
message_authentication_code: bytes = hashlib.blake2b(
69+
pre_auth, key=authentication_key, digest_size=32
70+
).digest()
71+
72+
# Step 7
73+
ret: bytes = header + b64(nonce + ciphertext + message_authentication_code)
74+
if footer:
75+
ret += b"." + b64(footer)
76+
return ret
77+
78+
79+
def decrypt(
80+
message: bytes, key: bytes, footer: bytes = b"", implicit_assertion: bytes = b""
81+
) -> bytes:
82+
"""PASETO Version4 decrypt function."""
83+
84+
# verify that key is intended for use with this function
85+
_verify_key(key, _TYPE_LOCAL)
86+
raw_key: bytes = _deserialize_key(key)
87+
88+
# Step 1
89+
check_footer(message, footer)
90+
91+
# Step 2
92+
header: bytes = HEADER_LOCAL
93+
check_header(message, header)
94+
95+
# Step 3
96+
decoded: bytes = decode_message(message, len(header))
97+
nonce: bytes = decoded[:NONCE_SIZE]
98+
ciphertext: bytes = decoded[NONCE_SIZE:-MAC_SIZE]
99+
100+
# Step 4
101+
encryption_key, authentication_key, nonce2 = _split_key(raw_key, nonce)
102+
103+
# Step 5
104+
pre_auth: bytes = pae([header, nonce, ciphertext, footer, implicit_assertion])
105+
106+
# Step 6
107+
computed_mac: bytes = hashlib.blake2b(
108+
pre_auth, key=authentication_key, digest_size=MAC_SIZE
109+
).digest()
110+
111+
# Step 7
112+
mac_in_message: bytes = decoded[-MAC_SIZE:]
113+
if not hmac.compare_digest(mac_in_message, computed_mac):
114+
raise InvalidMac("Invalid MAC for given ciphertext")
115+
116+
# Steps 8 and 9
117+
return libsodium_wrapper.crypto_stream_xchacha20_xor(
118+
message=ciphertext, nonce=nonce2, key=encryption_key
119+
)
120+
121+
122+
def sign(
123+
message: bytes,
124+
secret_key: bytes,
125+
footer: bytes = b"",
126+
implicit_assertion: bytes = b"",
127+
) -> bytes:
128+
"""Sign message and return token which can then be used with verify()."""
129+
130+
# verify that key is intended for use with this function
131+
_verify_key(secret_key, _TYPE_SECRET)
132+
raw_secret_key: bytes = _deserialize_key(secret_key)
133+
134+
# Step 1
135+
header = HEADER_PUBLIC
136+
137+
# Step 2
138+
message2 = pae([header, message, footer, implicit_assertion])
139+
140+
# Step 3
141+
signature = primitives.sign(message2, raw_secret_key)
142+
143+
# Step 4
144+
ret = header + b64(message + signature)
145+
if footer:
146+
ret += b"." + b64(footer)
147+
148+
return ret
149+
150+
151+
def verify(
152+
signed_message: bytes,
153+
public_key: bytes,
154+
footer: bytes = b"",
155+
implicit_assertion: bytes = b"",
156+
) -> bytes:
157+
"""Verify signature and return message. Raises exception if signature is invalid."""
158+
159+
# verify that key is intended for use with this function
160+
_verify_key(public_key, _TYPE_PUBLIC)
161+
raw_public_key: bytes = _deserialize_key(public_key)
162+
163+
# Step 1
164+
check_footer(signed_message, footer)
165+
166+
# Step 2
167+
header = HEADER_PUBLIC
168+
check_header(signed_message, header)
169+
170+
# Step 3
171+
raw_inner_message: bytes = decode_message(signed_message, len(header))
172+
signature = raw_inner_message[-64:]
173+
message = raw_inner_message[:-64]
174+
175+
# Step 4
176+
message2 = pae([header, message, footer, implicit_assertion])
177+
178+
# Steps 5 and 6
179+
primitives.verify(signature, message2, raw_public_key)
180+
return message
181+
182+
183+
def _split_key(key: bytes, nonce: bytes) -> Tuple[bytes, bytes, bytes]:
184+
185+
hashed: bytes = hashlib.blake2b(
186+
INFO_ENCRYPTION + nonce, key=key, digest_size=56
187+
).digest()
188+
encryption_key: bytes = hashed[:ENCRYPTION_KEY_LENGTH]
189+
nonce2: bytes = hashed[ENCRYPTION_KEY_LENGTH:]
190+
authentication_key: bytes = hashlib.blake2b(
191+
INFO_AUTHENTICATION + nonce, key=key, digest_size=AUTHENTICATION_KEY_LENGTH
192+
).digest()
193+
194+
return encryption_key, authentication_key, nonce2
195+
196+
197+
def _verify_key(key: bytes, key_type: bytes) -> None:
198+
if not _generic_verify_key(key, 4, key_type):
199+
raise InvalidKey
200+
201+
202+
def create_symmetric_key() -> bytes:
203+
"""Return key for use with encrypt() and decrypt()."""
204+
return _create_symmetric_key(4)
205+
206+
207+
def create_asymmetric_key() -> Tuple[bytes, bytes]:
208+
"""Return key pair for use with sign() and verify()."""
209+
return _create_asymmetric_key(4)
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""
2+
This module contains tests for official test vectors.
3+
4+
Test vectors: https://github.com/paseto-standard/test-vectors
5+
Docs: https://github.com/paseto-standard/paseto-spec
6+
"""
7+
8+
import os
9+
from unittest.mock import MagicMock, patch
10+
11+
import pytest
12+
13+
from paseto.paserk.keys import _create_asymmetric_key, _create_symmetric_key
14+
from paseto.protocol import version4
15+
from tests.conftest import get_test_vector
16+
from tests.util import (
17+
transform_test_case_for_v4_local,
18+
transform_test_case_for_v4_public,
19+
)
20+
21+
22+
def get_test_cases(name: str):
23+
"""Return test cases filtered by name."""
24+
return [
25+
test_case
26+
for test_case in get_test_vector("v4")["tests"]
27+
if test_case["name"].startswith(name)
28+
]
29+
30+
31+
# use a test nonce for reproducible tests
32+
@patch.object(os, "urandom")
33+
# pylint: disable=too-many-arguments
34+
@pytest.mark.parametrize(
35+
"test_name,nonce,raw_key_material,test_token,payload,footer,implicit_assertion",
36+
[
37+
transform_test_case_for_v4_local(test_case)
38+
for test_case in get_test_cases("4-E")
39+
],
40+
)
41+
def test_v4_local(
42+
mock: MagicMock,
43+
test_name: str,
44+
nonce: bytes,
45+
raw_key_material: bytes,
46+
test_token: bytes,
47+
payload: bytes,
48+
footer: bytes,
49+
implicit_assertion: bytes,
50+
) -> None:
51+
"""Tests for v4.local (Shared-Key Encryption)."""
52+
53+
# use non random nonce for reproducible tests
54+
mock.return_value = nonce
55+
key = _create_symmetric_key(4, raw_key_material)
56+
57+
# verify that encrypt produces expected token
58+
assert test_token == version4.encrypt(
59+
payload, key, footer, implicit_assertion
60+
), test_name
61+
62+
# verify that decrypt produces expected payload
63+
assert payload == version4.decrypt(
64+
test_token, key, footer, implicit_assertion
65+
), test_name
66+
67+
68+
# pylint: disable=too-many-arguments
69+
@pytest.mark.parametrize(
70+
"test_name,raw_public_key,raw_secret_key,test_token,payload,footer,implicit_assertion",
71+
[
72+
transform_test_case_for_v4_public(test_case)
73+
for test_case in get_test_cases("4-S")
74+
],
75+
)
76+
def test_v4_public(
77+
test_name: str,
78+
raw_public_key: bytes,
79+
raw_secret_key: bytes,
80+
test_token: bytes,
81+
payload: bytes,
82+
footer: bytes,
83+
implicit_assertion: bytes,
84+
) -> None:
85+
"""Tests for v4.public"""
86+
87+
public_key, secret_key = _create_asymmetric_key(4, raw_public_key, raw_secret_key)
88+
89+
# verify that sign produces expected token
90+
assert test_token == version4.sign(
91+
payload, secret_key, footer, implicit_assertion
92+
), test_name
93+
94+
# verify that token contains expected payload
95+
assert payload == version4.verify(
96+
test_token, public_key, footer, implicit_assertion
97+
), test_name

tests/protocol/test_version4.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
"""This module contains test for version4.py"""
2+
3+
import pytest
4+
5+
from paseto.exceptions import InvalidKey, InvalidMac
6+
from paseto.paserk.keys import _create_symmetric_key
7+
from paseto.protocol import version4
8+
from paseto.protocol.version4 import _verify_key
9+
10+
11+
def test_encrypt_decrypt() -> None:
12+
"""Test that decrypt() reverses encrypt()."""
13+
message: bytes = b"foo"
14+
key: bytes = version4.create_symmetric_key()
15+
16+
token: bytes = version4.encrypt(message, key)
17+
plain_text: bytes = version4.decrypt(token, key)
18+
19+
assert plain_text == message
20+
21+
22+
@pytest.mark.parametrize(
23+
"footer,implicit_assertion", [(b"", b""), (b"some footer", b"some assertion")]
24+
)
25+
def test_sign_verify(footer: bytes, implicit_assertion: bytes) -> None:
26+
"""Check that verify() reverses sign()."""
27+
public_key, secret_key = version4.create_asymmetric_key()
28+
message = b"foo"
29+
30+
signed = version4.sign(message, secret_key, footer, implicit_assertion)
31+
assert version4.verify(signed, public_key, footer, implicit_assertion) == message
32+
33+
34+
def test_decrypt_invalid_mac() -> None:
35+
"""Test that exception is raised when mac is not valid."""
36+
message: bytes = b"foo"
37+
key: bytes = _create_symmetric_key(4, b"0" * 32)
38+
39+
token: bytes = version4.encrypt(message, key)
40+
# tamper with mac
41+
token_with_invalid_mac = token[:40] + b"0" + token[41:]
42+
with pytest.raises(InvalidMac):
43+
version4.decrypt(token_with_invalid_mac, key)
44+
45+
46+
def test_verify_key() -> None:
47+
"""Test that exception is raised when key is not verified."""
48+
with pytest.raises(InvalidKey):
49+
_verify_key(b"", b"some type")

0 commit comments

Comments
 (0)