-
Notifications
You must be signed in to change notification settings - Fork 138
Expand file tree
/
Copy pathidpy_oidc.py
More file actions
150 lines (131 loc) · 6.03 KB
/
idpy_oidc.py
File metadata and controls
150 lines (131 loc) · 6.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
OIDC/OAuth2 backend module.
"""
import logging
from datetime import datetime
from datetime import timezone
from urllib.parse import urlparse

from idpyoidc.client.oauth2.stand_alone_client import StandAloneClient
from idpyoidc.server.user_authn.authn_context import UNSPECIFIED

import satosa.logging_util as lu
from satosa.backends.base import BackendModule
from satosa.internal import AuthenticationInformation
from satosa.internal import InternalData
from ..exception import SATOSAAuthenticationError
from ..exception import SATOSAError
from ..response import Redirect
# UTC tzinfo constant for this module. The name ``datetime`` in this file is the
# ``datetime.datetime`` class (imported with ``from datetime import datetime``),
# which has no ``timezone`` attribute, so the constant is taken from the
# separately imported ``timezone`` class instead.
UTC = timezone.utc

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class IdpyOIDCBackend(BackendModule):
    """
    Backend module for OIDC and OAuth 2.0, can be directly used.
    """

    def __init__(self, auth_callback_func, internal_attributes, config, base_url, name):
        """
        OIDC backend module.

        :param auth_callback_func: Callback should be called by the module after the authorization
            in the backend is done.
        :param internal_attributes: Mapping dictionary between SATOSA internal attribute names and
            the names returned by underlying IdP's/OP's as well as what attributes the calling
            SP's and RP's expect.
        :param config: Configuration parameters for the module. Its ``"client"`` entry is handed
            to ``create_client`` to build the client used by this backend.
        :param base_url: base url of the service
        :param name: name of the plugin

        :type auth_callback_func:
            (satosa.context.Context, satosa.internal.InternalData) -> satosa.response.Response
        :type internal_attributes: dict[string, dict[str, str | list[str]]]
        :type config: dict[str, dict[str, str] | list[str]]
        :type base_url: str
        :type name: str

        :raise KeyError: if ``config`` has no ``"client"`` entry
        """
        super().__init__(auth_callback_func, internal_attributes, base_url, name)
        self.client = create_client(config["client"])

    def start_auth(self, context, internal_request):
        """
        See super class method satosa.backends.base#start_auth

        Asks the client to initialise an authorization request and redirects to the
        URL it returns.

        :type context: satosa.context.Context
        :type internal_request: satosa.internal.InternalData
        :rtype satosa.response.Redirect
        """
        login_url = self.client.init_authorization()
        return Redirect(login_url)

    def register_endpoints(self):
        """
        Creates a list of all the endpoints this backend module needs to listen to. In this case
        it's the authentication response from the underlying OP that is redirected from the OP to
        the proxy.

        Only the first configured redirect URI is used; its path (without the leading slash)
        becomes an anchored pattern mapped to ``response_endpoint``.

        :rtype: Sequence[(str, Callable[[satosa.context.Context], satosa.response.Response]]
        :return: A list that can be used to map the request to SATOSA to this endpoint.
        :raise SATOSAError: if no redirect URI is configured or the first one has no path
        """
        redirect_uris = self.client.context.claims.get_usage('redirect_uris')
        if not redirect_uris:
            raise SATOSAError("Missing path in redirect uri")

        redirect_path = urlparse(redirect_uris[0]).path
        if not redirect_path:
            # A URI without any path would otherwise register the pattern "^$".
            raise SATOSAError("Missing path in redirect uri")

        return [("^%s$" % redirect_path.lstrip("/"), self.response_endpoint)]

    def response_endpoint(self, context, *args):
        """
        Handles the authentication response from the OP.

        :type context: satosa.context.Context
        :type args: Any
        :rtype: satosa.response.Response

        :param context: SATOSA context
        :param args: None
        :return: whatever the authentication callback returns for the translated response
        :raise SATOSAAuthenticationError: if the OP answered with an error, or if neither an
            id_token nor userinfo is available
        """
        _info = self.client.finalize(context.request)
        self._check_error_response(_info, context)

        userinfo = _info.get('userinfo')
        id_token = _info.get('id_token')
        if not id_token and not userinfo:
            msg = "No id_token or userinfo, nothing to do.."
            logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
            logger.error(logline)
            raise SATOSAAuthenticationError(context.state, "No user info available.")

        # Either source may be missing on its own (the guard above only rejects the case where
        # both are missing), so merge only what is present. id_token claims are applied last
        # and therefore win over userinfo claims with the same name.
        all_user_claims = {}
        for claims in (userinfo, id_token):
            if claims:
                all_user_claims.update(claims.items())

        msg = "UserInfo: {}".format(all_user_claims)
        logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
        logger.debug(logline)

        internal_resp = self._translate_response(all_user_claims, _info["issuer"])
        return self.auth_callback_func(context, internal_resp)

    def _translate_response(self, response, issuer):
        """
        Translates oidc response to SATOSA internal response.

        :type response: dict[str, str]
        :type issuer: str
        :rtype: InternalData

        :param response: Dictionary with attribute name as key.
        :param issuer: The oidc op that gave the response.
        :return: A SATOSA internal response.
        :raise KeyError: if ``response`` carries no ``"sub"`` claim
        """
        # NOTE(review): the timestamp is the naive local time of this process, rendered with
        # str(); left unchanged because consumers of the string format are not visible here.
        auth_info = AuthenticationInformation(UNSPECIFIED, str(datetime.now()), issuer)
        internal_resp = InternalData(auth_info=auth_info)
        internal_resp.attributes = self.converter.to_internal("openid", response)
        internal_resp.subject_id = response["sub"]
        return internal_resp

    def _check_error_response(self, response, context):
        """
        Check if the response is an error response.

        :param response: the response from finalize(); must support ``in``, item access and
            ``get`` (presumably a dict or an idpyoidc message - confirm against the client)
        :param context: SATOSA context, used for the session id in the log line and for the
            state attached to the raised error
        :type context: satosa.context.Context

        :raise SATOSAAuthenticationError: if the response is an OAuth error response
        """
        if "error" not in response:
            return

        msg = "{name} error: {error} {description}".format(
            name=type(response).__name__,
            error=response["error"],
            description=response.get("error_description", ""),
        )
        logline = lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)
        logger.debug(logline)
        raise SATOSAAuthenticationError(context.state, "Access denied")
def create_client(config: dict):
    """
    Build the stand-alone client used by the backend.

    The client type is read from ``config["client_type"]``; a missing or empty value
    falls back to ``"oidc"``. After construction the client's ``do_provider_info()`` and
    ``do_client_registration()`` are invoked, in that order, before it is returned.

    :param config: client configuration, passed unchanged to ``StandAloneClient``
    :type config: dict
    :return: the prepared ``StandAloneClient`` instance
    """
    client_type = config.get('client_type')
    if not client_type:
        client_type = "oidc"

    client = StandAloneClient(config=config, client_type=client_type)
    # Provider information first, client registration second.
    client.do_provider_info()
    client.do_client_registration()
    return client