|
34 | 34 | with open(CLIENT_SECRETS_FILE, "r") as fh: |
35 | 35 | CLIENT_SECRETS_INFO = json.load(fh) |
36 | 36 |
|
| 37 | +VALID_PKCE_VERIFIER_REGEX = r"^[A-Za-z0-9-._~]{128}$" |
| 38 | +VALID_CODE_CHALLENGE_REGEX = r"^[A-Za-z0-9-_]{43}$" |
| 39 | + |
37 | 40 |
|
38 | 41 | class TestFlow(object): |
39 | 42 | def test_from_client_secrets_file(self): |
@@ -114,10 +117,14 @@ def test_authorization_url(self, instance): |
114 | 117 |
|
115 | 118 | assert CLIENT_SECRETS_INFO["web"]["auth_uri"] in url |
116 | 119 | assert scope in url |
| 120 | + assert "code_challenge=" in url |
| 121 | + assert "code_challenge_method=S256" in url |
117 | 122 | authorization_url_spy.assert_called_with( |
118 | 123 | CLIENT_SECRETS_INFO["web"]["auth_uri"], |
119 | 124 | access_type="offline", |
120 | 125 | prompt="consent", |
| 126 | + code_challenge=mock.ANY, |
| 127 | + code_challenge_method="S256", |
121 | 128 | ) |
122 | 129 |
|
123 | 130 | def test_authorization_url_code_verifier(self, instance): |
@@ -183,10 +190,8 @@ def test_authorization_url_generated_verifier(self): |
183 | 190 | assert kwargs["code_challenge_method"] == "S256" |
184 | 191 | assert len(instance.code_verifier) == 128 |
185 | 192 | assert len(kwargs["code_challenge"]) == 43 |
186 | | - valid_verifier = r"^[A-Za-z0-9-._~]*$" |
187 | | - valid_challenge = r"^[A-Za-z0-9-_]*$" |
188 | | - assert re.match(valid_verifier, instance.code_verifier) |
189 | | - assert re.match(valid_challenge, kwargs["code_challenge"]) |
| 193 | + assert re.fullmatch(VALID_PKCE_VERIFIER_REGEX, instance.code_verifier) |
| 194 | + assert re.fullmatch(VALID_CODE_CHALLENGE_REGEX, kwargs["code_challenge"]) |
190 | 195 |
|
191 | 196 | def test_fetch_token(self, instance): |
192 | 197 | instance.code_verifier = "amanaplanacanalpanama" |
@@ -307,13 +312,13 @@ def test_run_local_server(self, webbrowser_mock, instance, mock_fetch_token, por |
307 | 312 | assert credentials.id_token == mock.sentinel.id_token |
308 | 313 | assert webbrowser_mock.get().open.called |
309 | 314 | assert instance.redirect_uri == f"http://localhost:{port}/" |
310 | | - |
| 315 | + assert re.fullmatch(VALID_PKCE_VERIFIER_REGEX, instance.code_verifier) |
311 | 316 | expected_auth_response = auth_redirect_url.replace("http", "https") |
312 | 317 | mock_fetch_token.assert_called_with( |
313 | 318 | CLIENT_SECRETS_INFO["web"]["token_uri"], |
314 | 319 | client_secret=CLIENT_SECRETS_INFO["web"]["client_secret"], |
315 | 320 | authorization_response=expected_auth_response, |
316 | | - code_verifier=None, |
| 321 | + code_verifier=mock.ANY, |
317 | 322 | audience=None, |
318 | 323 | ) |
319 | 324 |
|
@@ -352,7 +357,7 @@ def test_run_local_server_audience( |
352 | 357 | CLIENT_SECRETS_INFO["web"]["token_uri"], |
353 | 358 | client_secret=CLIENT_SECRETS_INFO["web"]["client_secret"], |
354 | 359 | authorization_response=expected_auth_response, |
355 | | - code_verifier=None, |
| 360 | + code_verifier=mock.ANY, |
356 | 361 | audience=self.AUDIENCE, |
357 | 362 | ) |
358 | 363 |
|
|
0 commit comments