-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathopenid_connect.py
More file actions
333 lines (278 loc) · 13 KB
/
openid_connect.py
File metadata and controls
333 lines (278 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
from keycloak.mixins import WellKnownMixin
try:
from urllib.parse import urlencode # noqa: F041
except ImportError:
from urllib import urlencode # noqa: F041
from jose import jwt, ExpiredSignatureError
# Path template of the realm's OpenID Connect discovery document. It is handed
# out unformatted by KeycloakOpenidConnect.get_path_well_known(); the ``{}``
# placeholder is presumably filled with the realm name by WellKnownMixin --
# confirm against keycloak.mixins.
PATH_WELL_KNOWN = "auth/realms/{}/.well-known/openid-configuration"
class KeycloakOpenidConnect(WellKnownMixin):
    """
    Client-side helper for the OpenID Connect endpoints of one Keycloak realm.

    Endpoint URLs are looked up by name in ``self.well_known``, which this
    class does not define itself (presumably supplied by ``WellKnownMixin``
    from the discovery document -- confirm against keycloak.mixins). Requests
    are sent through ``realm.client``; every method returns whatever that
    client returns unless documented otherwise.
    """

    _well_known = None
    _client_id = None
    _client_secret = None
    _realm = None

    def __init__(self, realm, client_id, client_secret):
        """
        :param keycloak.realm.KeycloakRealm realm:
        :param str client_id:
        :param str client_secret:
        """
        self._client_id = client_id
        self._client_secret = client_secret
        self._realm = realm

    def get_path_well_known(self):
        """
        Return the (unformatted) path template of the discovery document.

        :rtype: str
        """
        return PATH_WELL_KNOWN

    def get_url(self, name):
        """
        Look up an endpoint URL by its key in the well-known configuration.

        :param str name: Key in ``self.well_known``, e.g. ``token_endpoint``.
        :return: The value stored under ``name``.
        :raises KeyError: presumably, if ``name`` is absent and
            ``well_known`` is a plain mapping -- confirm against the mixin.
        """
        return self.well_known[name]

    def decode_token(self, token, key, algorithms=None, **kwargs):
        """
        Verify and decode a signed token using ``jose.jwt.decode``.

        A JSON Web Key (JWK) is a JavaScript Object Notation (JSON) data
        structure that represents a cryptographic key. This specification
        also defines a JWK Set JSON data structure that represents a set of
        JWKs. Cryptographic algorithms and identifiers for use with this
        specification are described in the separate JSON Web Algorithms (JWA)
        specification and IANA registries established by that specification.

        https://tools.ietf.org/html/rfc7517

        :param str token: A signed JWS to be verified.
        :param str key: A key to attempt to verify the payload with.
        :param str,list algorithms: (optional) Valid algorithms that should be
            used to verify the JWS. Defaults to `['RS256']`
        :param str audience: (optional) The intended audience of the token. If
            the "aud" claim is included in the claim set, then the audience
            must be included and must equal the provided claim. When omitted
            (or falsy) the client id of this instance is used.
        :param str,iterable issuer: (optional) Acceptable value(s) for the
            issuer of the token. If the "iss" claim is included in the claim
            set, then the issuer must be given and the claim in the token must
            be among the acceptable values.
        :param str subject: (optional) The subject of the token. If the "sub"
            claim is included in the claim set, then the subject must be
            included and must equal the provided claim.
        :param str access_token: (optional) An access token returned alongside
            the id_token during the authorization grant flow. If the "at_hash"
            claim is included in the claim set, then the access_token must be
            included, and it must match the "at_hash" claim.
        :param dict options: (optional) A dictionary of options for skipping
            validation steps.

            defaults:

            .. code-block:: python

                {
                    'verify_signature': True,
                    'verify_aud': True,
                    'verify_iat': True,
                    'verify_exp': True,
                    'verify_nbf': True,
                    'verify_iss': True,
                    'verify_sub': True,
                    'verify_jti': True,
                    'leeway': 0,
                }

        :return: The dict representation of the claims set, assuming the
            signature is valid and all requested data validation passes.
        :rtype: dict
        :raises jose.exceptions.JWTError: If the signature is invalid in any
            way.
        :raises jose.exceptions.ExpiredSignatureError: If the signature has
            expired.
        :raises jose.exceptions.JWTClaimsError: If any claim is invalid in any
            way.
        """
        # Remove ``audience`` from kwargs before forwarding them, otherwise it
        # would be passed to jwt.decode twice. A falsy audience falls back to
        # this client's id.
        audience = kwargs.pop('audience', None) or self._client_id
        return jwt.decode(
            token, key,
            audience=audience,
            algorithms=algorithms or ['RS256'],
            **kwargs
        )

    def logout(self, refresh_token):
        """
        The logout endpoint logs out the authenticated user.

        Posts the refresh token together with the client credentials to the
        ``end_session_endpoint``.

        :param str refresh_token:
        """
        return self._realm.client.post(
            self.get_url('end_session_endpoint'),
            data={
                'refresh_token': refresh_token,
                'client_id': self._client_id,
                'client_secret': self._client_secret,
            }
        )

    def certs(self):
        """
        The certificate endpoint returns the public keys enabled by the realm,
        encoded as a JSON Web Key (JWK). Depending on the realm settings there
        can be one or more keys enabled for verifying tokens.

        https://tools.ietf.org/html/rfc7517

        :rtype: dict
        """
        return self._realm.client.get(self.get_url('jwks_uri'))

    def userinfo(self, token):
        """
        The UserInfo Endpoint is an OAuth 2.0 Protected Resource that returns
        Claims about the authenticated End-User. To obtain the requested Claims
        about the End-User, the Client makes a request to the UserInfo Endpoint
        using an Access Token obtained through OpenID Connect Authentication.
        These Claims are normally represented by a JSON object that contains a
        collection of name and value pairs for the Claims.

        http://openid.net/specs/openid-connect-core-1_0.html#UserInfo

        :param str token: Access token, sent as a ``Bearer`` authorization
            header.
        :rtype: dict
        """
        # Resolve the endpoint through get_url() like every other method.
        url = self.get_url('userinfo_endpoint')
        return self._realm.client.get(
            url,
            headers={"Authorization": "Bearer {}".format(token)}
        )

    def uma_ticket(self, token, **kwargs):
        """
        Post a request with the UMA ticket grant type to the token endpoint.

        Any extra keyword arguments are added to the request payload and may
        override ``grant_type``.

        :param str token: Access token, sent as a ``Bearer`` authorization
            header.
        :param str audience: (optional) Client ID to get the permissions for.
        :rtype: dict
        """
        payload = {"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket"}
        payload.update(kwargs)

        # NOTE: the payload is passed positionally here (other methods use
        # ``data=``); kept as-is because the client's signature is not
        # visible from this module.
        return self._realm.client.post(
            self.get_url("token_endpoint"),
            payload,
            headers={"Authorization": "Bearer {}".format(token)}
        )

    def authorization_url(self, **kwargs):
        """
        Get authorization URL to redirect the resource owner to.

        https://tools.ietf.org/html/rfc6749#section-4.1.1

        :param str redirect_uri: (optional) Absolute URL of the client where
            the user-agent will be redirected to.
        :param str scope: (optional) Space delimited list of strings.
        :param str state: (optional) An opaque value used by the client to
            maintain state between the request and callback
        :return: URL to redirect the resource owner to
        :rtype: str
        """
        payload = {'response_type': 'code', 'client_id': self._client_id}
        # Caller-supplied parameters are added last, so they win over the
        # defaults above.
        payload.update(kwargs)

        # Sort by parameter name so the query string is deterministic (the
        # unit tests rely on this). Keys are unique, so plain tuple ordering
        # never has to compare the values.
        params = urlencode(sorted(payload.items()))
        url = self.get_url('authorization_endpoint')

        return '{}?{}'.format(url, params)

    def authorization_code(self, code, redirect_uri):
        """
        Retrieve access token by `authorization_code` grant.

        https://tools.ietf.org/html/rfc6749#section-4.1.3

        :param str code: The authorization code received from the authorization
            server.
        :param str redirect_uri: the identical value of the "redirect_uri"
            parameter in the authorization request.
        :rtype: Token
        :return: Access token response wrapped in a :class:`Token`
        """
        token = self._token_request(grant_type='authorization_code', code=code,
                                    redirect_uri=redirect_uri)
        return Token(token, self)

    def password_credentials(self, username, password, **kwargs):
        """
        Retrieve access token by 'password credentials' grant.

        https://tools.ietf.org/html/rfc6749#section-4.3

        :param str username: The user name to obtain an access token for
        :param str password: The user's password
        :rtype: Token
        :return: Access token response wrapped in a :class:`Token`
        """
        token = self._token_request(grant_type='password',
                                    username=username, password=password,
                                    **kwargs)
        return Token(token, self)

    def client_credentials(self, **kwargs):
        """
        Retrieve access token by `client_credentials` grant.

        https://tools.ietf.org/html/rfc6749#section-4.4

        :param str scope: (optional) Space delimited list of strings.
        :rtype: Token
        :return: Access token response wrapped in a :class:`Token`
        """
        token = self._token_request(grant_type='client_credentials', **kwargs)
        return Token(token, self)

    def refresh_token(self, refresh_token, **kwargs):
        """
        Refresh an access token

        https://tools.ietf.org/html/rfc6749#section-6

        Unlike the grant methods above, the response is returned as-is and is
        not wrapped in a :class:`Token` (``Token.__call__`` stores this return
        value as its new raw token).

        :param str refresh_token:
        :param str scope: (optional) Space delimited list of strings.
        :rtype: dict
        :return: Access token response
        """
        return self._token_request(grant_type='refresh_token',
                                   refresh_token=refresh_token, **kwargs)

    def token_exchange(self, **kwargs):
        """
        Token exchange is the process of using a set of credentials or token to
        obtain an entirely different token.

        http://www.keycloak.org/docs/latest/securing_apps/index.html
        #_token-exchange
        https://www.ietf.org/id/draft-ietf-oauth-token-exchange-12.txt

        :param subject_token: A security token that represents the identity of
            the party on behalf of whom the request is being made. It is
            required if you are exchanging an existing token for a new one.
        :param subject_issuer: Identifies the issuer of the subject_token. It
            can be left blank if the token comes from the current realm or if
            the issuer can be determined from the subject_token_type. Otherwise
            it is required to be specified. Valid values are the alias of an
            Identity Provider configured for your realm. Or an issuer claim
            identifier configured by a specific Identity Provider.
        :param subject_token_type: This parameter is the type of the token
            passed with the subject_token parameter. This defaults to
            urn:ietf:params:oauth:token-type:access_token if the subject_token
            comes from the realm and is an access token. If it is an external
            token, this parameter may or may not have to be specified depending
            on the requirements of the subject_issuer.
        :param requested_token_type: This parameter represents the type of
            token the client wants to exchange for. Currently only oauth and
            OpenID Connect token types are supported. The default value for
            this depends on whether it is
            urn:ietf:params:oauth:token-type:refresh_token in which case you
            will be returned both an access token and refresh token within the
            response. Other appropriate values are
            urn:ietf:params:oauth:token-type:access_token and
            urn:ietf:params:oauth:token-type:id_token
        :param audience: This parameter specifies the target client you want
            the new token minted for.
        :param requested_issuer: This parameter specifies that the client wants
            a token minted by an external provider. It must be the alias of an
            Identity Provider configured within the realm.
        :param requested_subject: This specifies a username or user id if your
            client wants to impersonate a different user.
        :rtype: dict
        :return: access_token, refresh_token and expires_in
        """
        return self._token_request(
            grant_type='urn:ietf:params:oauth:grant-type:token-exchange',
            **kwargs
        )

    def _token_request(self, grant_type, **kwargs):
        """
        Do the actual call to the token end-point.

        The payload always starts with the grant type and this client's
        credentials; ``kwargs`` are merged in afterwards and therefore
        override ``client_id`` / ``client_secret`` when given.

        :param grant_type:
        :param kwargs: See invoking methods.
        :return: Whatever ``realm.client.post`` returns for the request.
        """
        payload = {
            'grant_type': grant_type,
            'client_id': self._client_id,
            'client_secret': self._client_secret,
        }
        payload.update(kwargs)

        return self._realm.client.post(self.get_url('token_endpoint'),
                                       data=payload)
class _MissingTokenField(AttributeError, KeyError):
    """
    Raised by ``Token.__getattr__`` for a field the token does not contain.

    It is an ``AttributeError`` so that ``hasattr()`` / ``getattr(..., default)``
    behave normally, and also a ``KeyError`` so that callers that used to
    catch the ``KeyError`` leaking out of the dict lookup keep working.
    """


class Token:
    """
    Wrapper around a raw token response that renews the access token on use.

    Unknown attributes are looked up as keys of the wrapped token mapping,
    and calling the instance yields a currently valid access token.
    """

    def __init__(self, token, oidc: "KeycloakOpenidConnect") -> None:
        """
        :param token: Raw token response; it is indexed with
            ``access_token`` and ``refresh_token`` by the other methods.
        :param KeycloakOpenidConnect oidc: Client used to fetch the realm
            keys, decode the access token and refresh it.
        """
        self.oidc = oidc
        # Only the first key published by the certs endpoint is kept for
        # later signature checks.
        self.key = self.oidc.certs()['keys'][0]
        self.token = token

    def __getattr__(self, attr):
        """
        Expose the fields of the wrapped token as attributes.

        Only invoked when normal attribute lookup fails.

        :raises AttributeError: if ``attr`` is not a key of the token (the
            raised error is also a ``KeyError`` for backward compatibility),
            or if the instance has no ``token`` set yet.
        """
        # Go through __dict__ directly: using ``self.token`` here would
        # re-enter __getattr__ forever on an instance whose ``token`` is not
        # set yet (e.g. one created by __new__/copy/unpickling).
        try:
            raw_token = self.__dict__['token']
        except KeyError:
            raise AttributeError(attr) from None

        try:
            return raw_token[attr]
        except KeyError:
            raise _MissingTokenField(attr) from None

    def __call__(self):
        """
        Return the access token, refreshing it first when it has expired.

        :return: The value stored under ``access_token``.
        """
        if self.is_expired():
            print("Token expired, trying a new one")
            self.token = self.oidc.refresh_token(self.token['refresh_token'])
        return self.token["access_token"]

    def is_expired(self):
        """
        Tell whether decoding the access token fails because it has expired.

        Any other decoding error raised by ``oidc.decode_token`` propagates
        to the caller.

        :rtype: bool
        """
        try:
            self.oidc.decode_token(self.token['access_token'], self.key)
        except ExpiredSignatureError:
            return True
        return False