-
Notifications
You must be signed in to change notification settings - Fork 138
Expand file tree
/
Copy pathaccount_linking.py
More file actions
172 lines (144 loc) · 6.56 KB
/
account_linking.py
File metadata and controls
172 lines (144 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
An account linking module for the satosa proxy
"""
import json
import logging
import requests
from jwkest.jwk import rsa_load, RSAKey
from jwkest.jws import JWS
from satosa.internal import InternalData
from ..exception import SATOSAAuthenticationError
from ..micro_services.base import ResponseMicroService
from ..response import Redirect
from ..util import join_paths
import satosa.logging_util as lu
# Module-level logger used by the AccountLinking micro service below.
logger = logging.getLogger(__name__)
class AccountLinking(ResponseMicroService):
    """
    Module for handling account linking and recovery. Uses an external account linking service.

    The flow implemented here is:

    1. ``process`` asks the account linking (AL) service whether the issuer/subject pair is
       linked, stores the internal response in the state and redirects the user to the AL
       service with a signed JWS describing the lookup result.
    2. The AL service sends the user back to the endpoint returned by ``register_endpoints``,
       which is handled by ``_handle_al_response``.
    """

    # Seconds to wait for the account linking service when the config has no "request_timeout".
    # Without a timeout ``requests`` waits indefinitely on an unresponsive service.
    DEFAULT_REQUEST_TIMEOUT = 10

    def __init__(self, config, *args, **kwargs):
        """
        Read the micro service settings and load the signing key.

        Keys read from ``config``:

        * ``api_url`` (required): base URL of the AL service API (``/get_id`` is appended).
        * ``redirect_url`` (required): base URL the user is redirected to for linking.
        * ``sign_key`` (required): passed to ``jwkest.jwk.rsa_load``
          (presumably the path to an RSA private key file -- confirm against deployment docs).
        * ``id_to_attr`` (optional): attribute name that additionally receives the linked id.
        * ``request_timeout`` (optional): timeout in seconds for calls to the AL service,
          defaults to ``DEFAULT_REQUEST_TIMEOUT``.

        :type config: satosa.satosa_config.SATOSAConfig
        :param config: The SATOSA proxy config
        """
        super().__init__(*args, **kwargs)
        self.api_url = config["api_url"]
        self.redirect_url = config["redirect_url"]
        self.signing_key = RSAKey(key=rsa_load(config["sign_key"]), use="sig", alg="RS256")
        self.endpoint = "/handle_account_linking"
        self.id_to_attr = config.get("id_to_attr", None)
        self.request_timeout = config.get("request_timeout", self.DEFAULT_REQUEST_TIMEOUT)
        logger.info("Account linking is active")

    @staticmethod
    def _format_logline(context, msg):
        """
        Build a log line for ``msg`` tagged with the session id of the current state.

        :type context: satosa.context.Context
        :type msg: str
        :rtype: str
        """
        return lu.LOG_FMT.format(id=lu.get_session_id(context.state), message=msg)

    def _redirect_endpoint_url(self):
        """
        Return the absolute URL the AL service should send the user back to.

        :rtype: str
        """
        return "%s/account_linking%s" % (self.base_url, self.endpoint)

    def _sign_payload(self, data):
        """
        Serialize ``data`` as JSON and sign it as a compact JWS with the configured key.

        :type data: dict
        :rtype: str
        """
        return JWS(json.dumps(data), alg=self.signing_key.alg).sign_compact([self.signing_key])

    def _handle_al_response(self, context):
        """
        Endpoint for handling account linking service response. When getting here
        user might have approved or rejected linking their account

        :type context: satosa.context.Context
        :rtype: satosa.response.Response

        :param context: The current context
        :return: response
        """
        saved_state = context.state[self.name]
        internal_response = InternalData.from_dict(saved_state)

        # TODO (kept from the original author): subject_id in the saved response may be the
        # linked id rather than the issuer's one, so the lookup uses the issuer's id that
        # process() stored under 'issuer_user_id'.
        status_code, message = self._get_uuid(
            context,
            internal_response.auth_info.issuer,
            internal_response.attributes['issuer_user_id'],
        )

        if status_code == 200:
            msg = "issuer/id pair is linked in AL service"
            logger.info(self._format_logline(context, msg))
            internal_response.subject_id = message
            if self.id_to_attr:
                internal_response.attributes[self.id_to_attr] = [message]
        else:
            # User selected not to link their accounts, so the internal_response.subject_id
            # is based on the issuer's id/sub which is fine
            msg = "User selected to not link their identity in AL service"
            logger.info(self._format_logline(context, msg))

        # Either way the saved response is no longer needed once the flow continues.
        del context.state[self.name]
        return super().process(context, internal_response)

    def process(self, context, internal_response):
        """
        Manage account linking and recovery

        Looks up the issuer/subject pair in the AL service, saves the internal response in
        the state and redirects the user to the AL service with a signed description of the
        lookup result (the linked ``user_id`` or a linking ``ticket``).

        :type context: satosa.context.Context
        :type internal_response: satosa.internal.InternalData
        :rtype: satosa.response.Response

        :param context: The current context
        :param internal_response: The response from the authenticating backend
        :return: a redirect to the account linking service
        """
        issuer = internal_response.auth_info.issuer
        status_code, message = self._get_uuid(context, issuer, internal_response.subject_id)

        data = {
            "issuer": issuer,
            "redirect_endpoint": self._redirect_endpoint_url(),
        }

        # Store the issuer subject_id/sub because we'll need it in _handle_al_response
        internal_response.attributes['issuer_user_id'] = internal_response.subject_id

        if status_code == 200:
            msg = "issuer/id pair is linked in AL service"
            logger.info(self._format_logline(context, msg))
            internal_response.subject_id = message
            data['user_id'] = message
            if self.id_to_attr:
                internal_response.attributes[self.id_to_attr] = [message]
        else:
            msg = "issuer/id pair is not linked in AL service. Got a ticket"
            logger.info(self._format_logline(context, msg))
            data['ticket'] = message

        jws = self._sign_payload(data)
        # Saved so that _handle_al_response can resume the flow when the user comes back.
        context.state[self.name] = internal_response.to_dict()
        return Redirect("%s/%s" % (self.redirect_url, jws))

    def _get_uuid(self, context, issuer, id):
        """
        Ask the account linking service for a uuid.
        If the given issuer/id pair is not linked, then the function will return a ticket.
        This ticket should be used for linking the issuer/id pair to the user account

        :type context: satosa.context.Context
        :type issuer: str
        :type id: str
        :rtype: (int, str)

        :param context: The current context
        :param issuer: the issuer used for authentication
        :param id: the given id
        :return: response status code and message
            (200, uuid) or (404, ticket)
        :raise SATOSAAuthenticationError: if the service cannot be reached (including a
            timeout) or answers with a status code other than 200 or 404
        """
        data = {
            "idp": issuer,
            "id": id,
            "redirect_endpoint": self._redirect_endpoint_url(),
        }
        jws = self._sign_payload(data)
        request = "{}/get_id?jwt={}".format(self.api_url, jws)

        try:
            # Bounded wait: an unresponsive AL service must not hang the proxy worker.
            response = requests.get(request, timeout=self.request_timeout)
        except Exception as con_exc:
            # Any failure of the outbound call is reported as an authentication error,
            # which is the contract callers already rely on.
            msg = "Could not connect to account linking service"
            logger.critical(self._format_logline(context, msg))
            raise SATOSAAuthenticationError(context.state, msg) from con_exc

        if response.status_code not in (200, 404):
            msg = "Got status code '{}' from account linking service".format(response.status_code)
            logger.critical(self._format_logline(context, msg))
            raise SATOSAAuthenticationError(context.state, msg)

        return response.status_code, response.text

    def register_endpoints(self):
        """
        Register account linking module endpoints

        :rtype: list[(str, (satosa.context.Context) -> Any)]
        :return: A list of endpoints bound to a function
        """
        return [
            (
                "^{}$".format(
                    join_paths(self.base_path, "account_linking", self.endpoint)
                ),
                self._handle_al_response,
            )
        ]