diff --git a/src/mcp/client/auth/oauth2.py b/src/mcp/client/auth/oauth2.py index 25075dec3..1a45dae76 100644 --- a/src/mcp/client/auth/oauth2.py +++ b/src/mcp/client/auth/oauth2.py @@ -572,11 +572,14 @@ async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx. logger.debug(f"OAuth metadata discovery failed: {url}") # Step 3: Apply scope selection strategy - self.context.client_metadata.scope = get_client_metadata_scopes( - extract_scope_from_www_auth(response), - self.context.protected_resource_metadata, - self.context.oauth_metadata, - ) + # Respect explicitly-set scopes; only auto-select + # when the caller hasn't specified any. + if self.context.client_metadata.scope is None: + self.context.client_metadata.scope = get_client_metadata_scopes( + extract_scope_from_www_auth(response), + self.context.protected_resource_metadata, + self.context.oauth_metadata, + ) # Step 4: Register client or use URL-based client ID (CIMD) if not self.context.client_info: diff --git a/tests/client/test_auth.py b/tests/client/test_auth.py index 5aa985e36..3fbd2b113 100644 --- a/tests/client/test_auth.py +++ b/tests/client/test_auth.py @@ -1167,6 +1167,141 @@ async def test_auth_flow_with_no_tokens(self, oauth_provider: OAuthClientProvide assert oauth_provider.context.current_tokens.access_token == "new_access_token" assert oauth_provider.context.token_expiry_time is not None + @pytest.mark.anyio + async def test_auth_flow_preserves_explicit_scopes( + self, oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage + ): + """Test that explicitly-set client_metadata.scope is not overwritten during discovery.""" + oauth_provider.context.current_tokens = None + oauth_provider.context.token_expiry_time = None + oauth_provider._initialized = True + + # The fixture sets scope="read write" — verify it is preserved + assert oauth_provider.context.client_metadata.scope == "read write" + + test_request = httpx.Request("GET", "https://api.example.com/mcp") + auth_flow = oauth_provider.async_auth_flow(test_request) + + # First request — no auth header + await auth_flow.__anext__() + + # 401 triggers OAuth discovery + response = httpx.Response( + 401, + headers={ + "WWW-Authenticate": ( + 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource",' + ' scope="server:scope1 server:scope2"' + ) + }, + request=test_request, + ) + + # PRM discovery + prm_request = await auth_flow.asend(response) + prm_response = httpx.Response( + 200, + content=( + b'{"resource": "https://api.example.com/v1/mcp",' + b' "authorization_servers": ["https://auth.example.com"],' + b' "scopes_supported": ["server:scope1", "server:scope2"]}' + ), + request=prm_request, + ) + + # OAuth metadata discovery + oauth_request = await auth_flow.asend(prm_response) + oauth_response = httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com",' + b' "authorization_endpoint": "https://auth.example.com/authorize",' + b' "token_endpoint": "https://auth.example.com/token",' + b' "registration_endpoint": "https://auth.example.com/register"}' + ), + request=oauth_request, + ) + + # After scope selection (Step 3), the explicit scope must be preserved + await auth_flow.asend(oauth_response) + assert oauth_provider.context.client_metadata.scope == "read write" + + # Clean up the generator + await auth_flow.aclose() + + @pytest.mark.anyio + async def test_auth_flow_auto_selects_scopes_when_none(self, mock_storage: MockTokenStorage): + """Test that scope auto-selection works when no explicit scope is set.""" + + async def redirect_handler(url: str) -> None: + pass # pragma: no cover + + async def callback_handler() -> tuple[str, str | None]: + return "test_auth_code", "test_state" # pragma: no cover + + client_metadata = OAuthClientMetadata( + client_name="Test Client", + client_uri=AnyHttpUrl("https://example.com"), + redirect_uris=[AnyUrl("http://localhost:3030/callback")], + scope=None, + ) + provider = OAuthClientProvider( + server_url="https://api.example.com/v1/mcp", + client_metadata=client_metadata, + storage=mock_storage, + redirect_handler=redirect_handler, + callback_handler=callback_handler, + ) + provider.context.current_tokens = None + provider.context.token_expiry_time = None + provider._initialized = True + + test_request = httpx.Request("GET", "https://api.example.com/mcp") + auth_flow = provider.async_auth_flow(test_request) + + await auth_flow.__anext__() + + response = httpx.Response( + 401, + headers={ + "WWW-Authenticate": ( + 'Bearer resource_metadata="https://api.example.com/.well-known/oauth-protected-resource",' + ' scope="server:scope1 server:scope2"' + ) + }, + request=test_request, + ) + + prm_request = await auth_flow.asend(response) + prm_response = httpx.Response( + 200, + content=( + b'{"resource": "https://api.example.com/v1/mcp",' + b' "authorization_servers": ["https://auth.example.com"],' + b' "scopes_supported": ["server:scope1", "server:scope2"]}' + ), + request=prm_request, + ) + + oauth_request = await auth_flow.asend(prm_response) + oauth_response = httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com",' + b' "authorization_endpoint": "https://auth.example.com/authorize",' + b' "token_endpoint": "https://auth.example.com/token",' + b' "registration_endpoint": "https://auth.example.com/register"}' + ), + request=oauth_request, + ) + + await auth_flow.asend(oauth_response) + # Scope should have been auto-selected from the server metadata + assert provider.context.client_metadata.scope is not None + assert provider.context.client_metadata.scope == "server:scope1 server:scope2" + + await auth_flow.aclose() + @pytest.mark.anyio async def test_auth_flow_no_unnecessary_retry_after_oauth( self, oauth_provider: OAuthClientProvider, mock_storage: MockTokenStorage, valid_tokens: OAuthToken