|
1 | | -import time |
2 | 1 | from typing import Any, Dict, Iterable |
| 2 | +from unittest.mock import patch |
3 | 3 |
|
4 | 4 | import jwt |
5 | 5 | import pytest |
@@ -74,41 +74,49 @@ async def test_jwt_validator_can_validate_valid_access_tokens(default_keys_provi |
74 | 74 |
|
75 | 75 | @pytest.mark.asyncio |
76 | 76 | async def test_jwt_validator_cache_expiration(default_keys_provider): |
77 | | - validator = JWTValidator( |
78 | | - valid_audiences=["a"], |
79 | | - valid_issuers=["b"], |
80 | | - keys_provider=default_keys_provider, |
81 | | - cache_time=0.1, |
82 | | - ) |
83 | | - await _valid_tokens_scenario(validator) |
84 | | - time.sleep(0.2) |
85 | | - await _valid_tokens_scenario(validator) |
| 77 | + with patch("guardpost.jwks.caching.time") as mock_time: |
| 78 | + mock_time.time.return_value = 0 |
| 79 | + validator = JWTValidator( |
| 80 | + valid_audiences=["a"], |
| 81 | + valid_issuers=["b"], |
| 82 | + keys_provider=default_keys_provider, |
| 83 | + cache_time=10, |
| 84 | + ) |
| 85 | + await _valid_tokens_scenario(validator) |
| 86 | + |
| 87 | + # Simulate cache_time elapsed — keys must be re-fetched |
| 88 | + mock_time.time.return_value = 11 |
| 89 | + await _valid_tokens_scenario(validator) |
86 | 90 |
|
87 | 91 |
|
88 | 92 | @pytest.mark.asyncio |
89 | 93 | async def test_jwt_validator_fetches_tokens_again_for_unknown_kid(): |
90 | 94 | keys = get_test_jwks() |
91 | 95 | # configure a key provider that returns the given JWKS in sequence |
92 | 96 | keys_provider = MockedKeysProvider([JWKS(keys.keys[0:2]), JWKS(keys.keys[2:])]) |
93 | | - validator = JWTValidator( |
94 | | - valid_audiences=["a"], |
95 | | - valid_issuers=["b"], |
96 | | - keys_provider=keys_provider, |
97 | | - cache_time=10, |
98 | | - refresh_time=0.2, |
99 | | - ) |
100 | | - await _valid_token_scenario("0", validator) |
101 | | - await _valid_token_scenario("1", validator) |
102 | 97 |
|
103 | | - # this must fail because tokens were just fetched, and kid "2" is not present |
104 | | - with pytest.raises(InvalidAccessToken): |
| 98 | + with patch("guardpost.jwks.caching.time") as mock_time: |
| 99 | + mock_time.time.return_value = 0 |
| 100 | + validator = JWTValidator( |
| 101 | + valid_audiences=["a"], |
| 102 | + valid_issuers=["b"], |
| 103 | + keys_provider=keys_provider, |
| 104 | + cache_time=10, |
| 105 | + refresh_time=30, |
| 106 | + ) |
| 107 | + await _valid_token_scenario("0", validator) |
| 108 | + await _valid_token_scenario("1", validator) |
| 109 | + |
| 110 | + # this must fail because refresh_time has not elapsed yet (t=1 < 30s) |
| 111 | + mock_time.time.return_value = 1 |
| 112 | + with pytest.raises(InvalidAccessToken): |
| 113 | + await _valid_token_scenario("2", validator) |
| 114 | + |
| 115 | + # simulate refresh_time elapsed — provider should now fetch the new keys |
| 116 | + mock_time.time.return_value = 31 |
105 | 117 | await _valid_token_scenario("2", validator) |
106 | | - |
107 | | - time.sleep(0.3) |
108 | | - # now the JWTValidator should fetch automatically the new keys |
109 | | - await _valid_token_scenario("2", validator) |
110 | | - await _valid_token_scenario("3", validator) |
111 | | - await _valid_token_scenario("4", validator) |
| 118 | + await _valid_token_scenario("3", validator) |
| 119 | + await _valid_token_scenario("4", validator) |
112 | 120 |
|
113 | 121 |
|
114 | 122 | @pytest.mark.asyncio |
|
0 commit comments