-
Notifications
You must be signed in to change notification settings - Fork 114
Expand file tree
/
Copy pathtest_access_token.py
More file actions
101 lines (81 loc) · 3.09 KB
/
test_access_token.py
File metadata and controls
101 lines (81 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import datetime
import pytest # type: ignore
from livekit.api import AccessToken, TokenVerifier, VideoGrants, SIPGrants
from livekit.protocol.room import RoomConfiguration
from livekit.protocol.agent_dispatch import RoomAgentDispatch
# Placeholder API credentials shared by the tests in this module: tokens are
# signed with them via AccessToken and checked with them via TokenVerifier.
TEST_API_KEY = "myapikey"
TEST_API_SECRET = "thiskeyistotallyunsafe"
def test_verify_token():
    """Round-trip a fully populated token through ``TokenVerifier``.

    Builds a token carrying an identity, metadata, video grants, SIP grants
    and attributes, then checks that every one of those values is present on
    the claims returned by ``verify``.
    """
    video_grants = VideoGrants(room_join=True, room="test_room")
    sip_grants = SIPGrants(admin=True)

    # Build the token step by step instead of in a single fluent chain.
    builder = AccessToken(TEST_API_KEY, TEST_API_SECRET)
    builder = builder.with_identity("test_identity")
    builder = builder.with_metadata("test_metadata")
    builder = builder.with_grants(video_grants)
    builder = builder.with_sip_grants(sip_grants)
    builder = builder.with_attributes({"key1": "value1", "key2": "value2"})
    jwt_token = builder.to_jwt()

    verified = TokenVerifier(TEST_API_KEY, TEST_API_SECRET).verify(jwt_token)

    assert verified.identity == "test_identity"
    assert verified.metadata == "test_metadata"
    assert verified.video == video_grants
    assert verified.sip == sip_grants
    assert verified.attributes["key1"] == "value1"
    assert verified.attributes["key2"] == "value2"
def test_agent_config():
    """Check that a room configuration with an agent dispatch is encoded in the token.

    The token is first verified through ``TokenVerifier`` and the agent name
    is read back from the claims. The raw payload segment of the JWT is then
    decoded by hand to confirm the JSON keys written into the token
    (``roomConfig`` -> ``agents`` -> ``agentName``).
    """
    import base64
    import json

    token = (
        AccessToken(TEST_API_KEY, TEST_API_SECRET)
        .with_identity("test_identity")
        .with_grants(VideoGrants(room_join=True, room="test_room"))
        .with_room_config(
            RoomConfiguration(
                agents=[RoomAgentDispatch(agent_name="test-agent")],
            ),
        )
        .to_jwt()
    )

    token_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET)
    claims = token_verifier.verify(token)

    # Verify the decoded claims match
    assert claims.room_config.agents[0].agent_name == "test-agent"

    # Split token into header.payload.signature; unpacking fails loudly if
    # the token does not have exactly three segments.
    _header, payload, _signature = token.split(".")

    # JWT segments are base64url-encoded with the trailing "=" padding
    # stripped, so restore the padding before decoding.
    payload += "=" * (-len(payload) % 4)

    # Use the URL-safe alphabet: plain b64decode silently discards "-" and
    # "_" characters, which would corrupt the decoded payload.
    decoded = base64.urlsafe_b64decode(payload)
    payload_json = json.loads(decoded)

    # Verify the room_config and agents were encoded correctly
    assert "roomConfig" in payload_json
    assert "agents" in payload_json["roomConfig"]
    assert len(payload_json["roomConfig"]["agents"]) == 1
    assert payload_json["roomConfig"]["agents"][0]["agentName"] == "test-agent"
def test_verify_token_invalid():
    """Verification must fail when either the secret or the API key is wrong."""
    jwt_token = (
        AccessToken(TEST_API_KEY, TEST_API_SECRET)
        .with_identity("test_identity")
        .to_jwt()
    )

    # Each pair has exactly one mismatching credential: first the secret,
    # then the key.
    mismatched_credentials = (
        (TEST_API_KEY, "invalid_secret"),
        ("invalid_key", TEST_API_SECRET),
    )
    for api_key, api_secret in mismatched_credentials:
        verifier = TokenVerifier(api_key, api_secret)
        with pytest.raises(Exception):
            verifier.verify(jwt_token)
def test_verify_token_expired():
    """A token created with a negative TTL must be rejected when leeway is zero."""
    negative_ttl = datetime.timedelta(seconds=-1)
    no_leeway = datetime.timedelta(seconds=0)

    builder = AccessToken(TEST_API_KEY, TEST_API_SECRET)
    expired_jwt = builder.with_identity("test_identity").with_ttl(negative_ttl).to_jwt()

    # The verifier is given an explicit zero leeway for this check.
    strict_verifier = TokenVerifier(TEST_API_KEY, TEST_API_SECRET, leeway=no_leeway)

    with pytest.raises(Exception):
        strict_verifier.verify(expired_jwt)