# test_idpy_oidc.py
# (Code-hosting page chrome and the 1-234 line-number gutter that were captured
# together with this file were extraction residue and have been dropped here;
# the page reported the file as 234 lines / 209 loc / 8.96 KB.)
import json
import re
import time
from datetime import datetime
from unittest.mock import Mock
from urllib.parse import parse_qsl
from urllib.parse import urlparse
from cryptojwt.key_jar import build_keyjar
from idpyoidc.client.defaults import DEFAULT_KEY_DEFS
from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient
from idpyoidc.message.oidc import AuthorizationResponse
from idpyoidc.message.oidc import IdToken
from oic.oic import AuthorizationRequest
import pytest
import responses
from satosa.backends.idpy_oidc import IdpyOIDCBackend
from satosa.context import Context
from satosa.internal import InternalData
from satosa.response import Response
# Issuer identifier of the mocked OpenID Provider; its endpoint URLs are built from it.
ISSUER = "https://provider.example.com"
# Client identifier placed in the backend config and used as the ID Token audience.
CLIENT_ID = "test_client"
# Base URL of the client, passed to the backend under test and used for its jwks_uri.
CLIENT_BASE_URL = "https://client.test.com"
# Fixed nonce shared by the authorization request, the authorization response
# and the ID Token claims built in this module.
NONCE = "the nonce"
class TestIdpyOIDCBackend:
    """Tests for ``IdpyOIDCBackend`` run against mocked OpenID Provider endpoints."""

    @pytest.fixture
    def backend_config(self):
        """Return the backend configuration: client settings plus static provider info."""
        return {
            "client": {
                "base_url": CLIENT_BASE_URL,
                "client_id": CLIENT_ID,
                "client_type": "oidc",
                "client_secret": "ZJYCqe3GGRvdrudKyZS0XhGv_Z45DuKhCUk0gBR1vZk",
                "application_type": "web",
                "application_name": "SATOSA Test",
                "contacts": ["ops@example.com"],
                "response_types_supported": ["code"],
                "response_type": "code id_token token",
                "scope": "openid foo",
                "key_conf": {"key_defs": DEFAULT_KEY_DEFS},
                "jwks_uri": f"{CLIENT_BASE_URL}/jwks.json",
                "provider_info": {
                    "issuer": ISSUER,
                    "authorization_endpoint": f"{ISSUER}/authn",
                    "token_endpoint": f"{ISSUER}/token",
                    "userinfo_endpoint": f"{ISSUER}/user",
                    "jwks_uri": f"{ISSUER}/static/jwks",
                },
            }
        }

    @pytest.fixture
    def internal_attributes(self):
        """Return the mapping from internal attribute names to OpenID claim names."""
        return {
            "attributes": {
                "givenname": {"openid": ["given_name"]},
                "mail": {"openid": ["email"]},
                "edupersontargetedid": {"openid": ["sub"]},
                "surname": {"openid": ["family_name"]},
            }
        }

    @pytest.fixture(autouse=True)
    @responses.activate
    def create_backend(self, internal_attributes, backend_config):
        """Create ``self.oidc_backend`` and ``self.issuer_keys`` before every test.

        While the backend is constructed, the provider's ``jwks_uri`` is mocked
        to serve the freshly generated issuer keys.
        """
        client_config = backend_config["client"]
        self.issuer_keys = build_keyjar(DEFAULT_KEY_DEFS)
        with responses.RequestsMock() as rsps:
            rsps.add(
                responses.GET,
                client_config["provider_info"]["jwks_uri"],
                body=self.issuer_keys.export_jwks_as_json(),
                status=200,
                content_type="application/json",
            )
            # The Mock is the backend's first constructor argument; the
            # response-endpoint test later inspects ``auth_callback_func``.
            self.oidc_backend = IdpyOIDCBackend(
                Mock(),
                internal_attributes,
                backend_config,
                client_config["base_url"],
                "oidc",
            )

    @pytest.fixture
    def userinfo(self):
        """Return the user claims served by the mocked userinfo endpoint."""
        return {
            "given_name": "Test",
            "family_name": "Devsson",
            "email": "test_dev@example.com",
            "sub": "username",
        }

    @pytest.fixture
    def id_token(self, userinfo):
        """Return an ``IdToken`` message (not serialized to a JWT) for the test user."""
        # time.time() is a real POSIX timestamp. The former
        # ``datetime.utcnow().timestamp()`` interpreted the naive UTC value as
        # local time, shifting every time claim by the host's UTC offset.
        auth_time = int(time.time())
        id_token_claims = {
            "auth_time": auth_time,
            "iss": ISSUER,
            "sub": userinfo["sub"],
            "aud": CLIENT_ID,
            "nonce": NONCE,
            "exp": auth_time + 3600,
            "iat": auth_time,
        }
        return IdToken(**id_token_claims)

    @pytest.fixture
    def all_user_claims(self, userinfo, id_token):
        """Return the userinfo claims merged with the ID Token claims.

        On duplicate keys the ID Token value wins because it is unpacked last.
        """
        return {**userinfo, **id_token}

    def test_client(self, backend_config):
        """The backend builds a ``StandAloneClient`` with the configured keys and jwks_uri."""
        client = self.oidc_backend.client
        assert isinstance(client, StandAloneClient)
        # 3 signing keys. One RSA, one EC and one symmetric
        assert len(client.context.keyjar.get_signing_key()) == 3
        assert client.context.jwks_uri == backend_config["client"]["jwks_uri"]

    def assert_expected_attributes(self, attr_map, user_claims, actual_attributes):
        """Assert that ``actual_attributes`` matches what ``attr_map`` implies.

        Each internal attribute is expected to be a one-element list holding
        the claim named by the first entry of its ``"openid"`` mapping.
        """
        expected_attributes = {
            out_attr: [user_claims[in_mapping["openid"][0]]]
            for out_attr, in_mapping in attr_map["attributes"].items()
        }
        assert actual_attributes == expected_attributes

    def setup_token_endpoint(self, userinfo):
        """Register a mocked token endpoint response carrying a signed ID Token.

        Only ``userinfo["sub"]`` is read. The ID Token is signed with the
        issuer's RSA key using RS256. The mock is added through the
        module-level ``responses`` API; the caller in this class runs under
        ``@responses.activate``.
        """
        signing_key = self.issuer_keys.get_signing_key(key_type="RSA")[0]
        signing_key.alg = "RS256"
        # Read the clock once so that ``exp`` is exactly ``iat`` + 3600.
        issued_at = time.time()
        id_token_claims = {
            "iss": ISSUER,
            "sub": userinfo["sub"],
            "aud": CLIENT_ID,
            "nonce": NONCE,
            "exp": issued_at + 3600,
            "iat": issued_at,
        }
        id_token = IdToken(**id_token_claims).to_jwt(
            [signing_key], algorithm=signing_key.alg
        )
        token_response = {
            "access_token": "SlAV32hkKG",
            "token_type": "Bearer",
            "refresh_token": "8xLOxBtZp8",
            "expires_in": 3600,
            "id_token": id_token,
        }
        responses.add(
            responses.POST,
            self.oidc_backend.client.context.provider_info["token_endpoint"],
            body=json.dumps(token_response),
            status=200,
            content_type="application/json",
        )

    def setup_userinfo_endpoint(self, userinfo):
        """Register a mocked userinfo endpoint that returns ``userinfo`` as JSON."""
        responses.add(
            responses.GET,
            self.oidc_backend.client.context.provider_info["userinfo_endpoint"],
            body=json.dumps(userinfo),
            status=200,
            content_type="application/json",
        )

    @pytest.fixture
    def incoming_authn_response(self):
        """Seed the client's state store and return a matching authorization response.

        An authorization request is stored under a fixed state value and bound
        to ``NONCE``; the returned dict is the authorization response for that
        state.
        """
        _context = self.oidc_backend.client.context
        oidc_state = "my state"
        _request = AuthorizationRequest(
            redirect_uri=_context.claims.get_usage("redirect_uris")[0],
            response_type="code",
            client_id=_context.get_client_id(),
            scope=_context.claims.get_usage("scope"),
            nonce=NONCE,
        )
        # Record the pending request under ``oidc_state`` and bind the nonce to it.
        _context.cstate.set(oidc_state, {"iss": _context.issuer})
        _context.cstate.bind_key(NONCE, oidc_state)
        _context.cstate.update(oidc_state, _request)

        response = AuthorizationResponse(
            code="F+R4uWbN46U+Bq9moQPC4lEvRd2De4o=",
            state=oidc_state,
            iss=_context.issuer,
            nonce=NONCE,
        )
        return response.to_dict()

    def test_register_endpoints(self):
        """The first registered endpoint matches the redirect URI and maps to ``response_endpoint``."""
        _uri = self.oidc_backend.client.context.claims.get_usage("redirect_uris")[0]
        redirect_uri_path = urlparse(_uri).path.lstrip("/")
        url_map = self.oidc_backend.register_endpoints()
        regex, callback = url_map[0]
        assert re.search(regex, redirect_uri_path)
        assert callback == self.oidc_backend.response_endpoint

    def test_translate_response_to_internal_response(self, all_user_claims):
        """``_translate_response`` sets the subject id and maps claims to internal attributes."""
        internal_response = self.oidc_backend._translate_response(all_user_claims, ISSUER)
        assert internal_response.subject_id == all_user_claims["sub"]
        self.assert_expected_attributes(
            self.oidc_backend.internal_attributes,
            all_user_claims,
            internal_response.attributes,
        )

    @responses.activate
    def test_response_endpoint(self, context, all_user_claims, incoming_authn_response):
        """``response_endpoint`` invokes the auth callback with a Context and mapped InternalData."""
        self.setup_token_endpoint(all_user_claims)
        self.setup_userinfo_endpoint(all_user_claims)

        response_context = Context()
        response_context.request = incoming_authn_response
        response_context.state = context.state

        self.oidc_backend.response_endpoint(response_context)

        # Positional arguments of the most recent call to the mocked callback.
        args = self.oidc_backend.auth_callback_func.call_args[0]
        assert isinstance(args[0], Context)
        assert isinstance(args[1], InternalData)
        self.assert_expected_attributes(
            self.oidc_backend.internal_attributes, all_user_claims, args[1].attributes
        )

    def test_start_auth_redirects_to_provider_authorization_endpoint(self, context):
        """``start_auth`` returns a Response whose URL targets the authorization endpoint."""
        _client = self.oidc_backend.client
        auth_response = self.oidc_backend.start_auth(context, None)
        assert isinstance(auth_response, Response)

        login_url = auth_response.message
        assert login_url.startswith(_client.context.provider_info["authorization_endpoint"])

        # The query string must carry the request parameters derived from the client config.
        auth_params = dict(parse_qsl(urlparse(login_url).query))
        claims = _client.context.claims
        assert auth_params["scope"] == " ".join(claims.get_usage("scope"))
        assert auth_params["response_type"] == claims.get_usage("response_types")[0]
        assert auth_params["client_id"] == _client.client_id
        assert auth_params["redirect_uri"] == claims.get_usage("redirect_uris")[0]
        assert "state" in auth_params
        assert "nonce" in auth_params