Skip to content

Commit c030106

Browse files
add generic wsgi middleware and tests
1 parent 0ac368b commit c030106

3 files changed

Lines changed: 256 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
from .asgi import MAuthASGIMiddleware
2+
from .wsgi import MAuthWSGIMiddleware

mauth_client/middlewares/wsgi.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import json
2+
import logging
3+
4+
from mauth_client.authenticator import LocalAuthenticator
5+
from mauth_client.config import Config
6+
from mauth_client.consts import (
7+
ENV_APP_UUID,
8+
ENV_AUTHENTIC,
9+
ENV_PROTOCOL_VERSION,
10+
)
11+
12+
from mauth_client.signable import RequestSignable
13+
from mauth_client.signed import Signed
14+
15+
logger = logging.getLogger("mauth_wsgi")
16+
17+
18+
class MAuthWSGIMiddleware:
19+
def __init__(self, app, exempt=None):
20+
self._validate_configs()
21+
self.app = app
22+
self.exempt = exempt.copy() if exempt else set()
23+
24+
def __call__(self, environ, start_response):
25+
req = environ["werkzeug.request"]
26+
27+
if req.path in self.exempt:
28+
return self.app(environ, start_response)
29+
30+
signable = RequestSignable(
31+
method=req.method,
32+
url=req.url,
33+
body=self._read_body(environ),
34+
)
35+
signed = Signed.from_headers(dict(req.headers))
36+
authenticator = LocalAuthenticator(signable, signed, logger)
37+
is_authentic, status, message = authenticator.is_authentic()
38+
39+
if is_authentic:
40+
environ[ENV_APP_UUID] = signed.app_uuid
41+
environ[ENV_AUTHENTIC] = True
42+
environ[ENV_PROTOCOL_VERSION] = signed.protocol_version()
43+
return self.app(environ, start_response)
44+
45+
start_response(status, [("content-type", "application/json")])
46+
body = {"errors": {"mauth": [message]}}
47+
return [json.dumps(body).encode("utf-8")]
48+
49+
def _validate_configs(self):
50+
# Validate the client settings (APP_UUID, PRIVATE_KEY)
51+
if not all([Config.APP_UUID, Config.PRIVATE_KEY]):
52+
raise TypeError("MAuthWSGIMiddleware requires APP_UUID and PRIVATE_KEY")
53+
# Validate the mauth settings (MAUTH_BASE_URL, MAUTH_API_VERSION)
54+
if not all([Config.MAUTH_URL, Config.MAUTH_API_VERSION]):
55+
raise TypeError("MAuthWSGIMiddleware requires MAUTH_URL and MAUTH_API_VERSION")
56+
57+
def _read_body(self, environ):
58+
input = environ["wsgi.input"]
59+
input.seek(0)
60+
body = input.read()
61+
input.seek(0)
62+
return body

tests/middlewares/wsgi_test.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import json
2+
import unittest
3+
from unittest.mock import patch
4+
5+
from flask import Flask, request, jsonify
6+
from uuid import uuid4
7+
8+
from mauth_client.authenticator import LocalAuthenticator
9+
from mauth_client.config import Config
10+
from mauth_client.consts import (
11+
AUTH_HEADER_DELIMITER,
12+
X_MWS_AUTH,
13+
MWS_TOKEN,
14+
MCC_AUTH,
15+
MWSV2_TOKEN,
16+
ENV_APP_UUID,
17+
ENV_AUTHENTIC,
18+
ENV_PROTOCOL_VERSION,
19+
)
20+
from mauth_client.middlewares import MAuthWSGIMiddleware
21+
22+
23+
class TestMAuthWSGIMiddlewareInitialization(unittest.TestCase):
24+
def setUp(self):
25+
self.app = Flask("Test App")
26+
Config.APP_UUID = "2f746447-c212-483c-9eec-d9b0216f7613"
27+
Config.MAUTH_URL = "https://mauth.com"
28+
Config.MAUTH_API_VERSION = "v1"
29+
Config.PRIVATE_KEY = "key"
30+
31+
def test_app_configuration(self):
32+
try:
33+
self.app.wsgi_app = MAuthWSGIMiddleware(self.app)
34+
except TypeError:
35+
self.fail("Shouldn't raise exception")
36+
37+
def test_app_configuration_missing_uuid(self):
38+
Config.APP_UUID = None
39+
with self.assertRaises(TypeError) as exc:
40+
self.app.wsgi_app = MAuthWSGIMiddleware(self.app)
41+
self.assertEqual(
42+
str(exc.exception),
43+
"MAuthWSGIMiddleware requires APP_UUID and PRIVATE_KEY"
44+
)
45+
46+
def test_app_configuration_missing_key(self):
47+
Config.PRIVATE_KEY = None
48+
with self.assertRaises(TypeError) as exc:
49+
self.app.wsgi_app = MAuthWSGIMiddleware(self.app)
50+
self.assertEqual(
51+
str(exc.exception),
52+
"MAuthWSGIMiddleware requires APP_UUID and PRIVATE_KEY"
53+
)
54+
55+
def test_app_configuration_missing_url(self):
56+
Config.MAUTH_URL = None
57+
with self.assertRaises(TypeError) as exc:
58+
self.app.wsgi_app = MAuthWSGIMiddleware(self.app)
59+
self.assertEqual(
60+
str(exc.exception),
61+
"MAuthWSGIMiddleware requires MAUTH_URL and MAUTH_API_VERSION"
62+
)
63+
64+
def test_app_configuration_missing_version(self):
65+
Config.MAUTH_API_VERSION = None
66+
with self.assertRaises(TypeError) as exc:
67+
self.app.wsgi_app = MAuthWSGIMiddleware(self.app)
68+
self.assertEqual(
69+
str(exc.exception),
70+
"MAuthWSGIMiddleware requires MAUTH_URL and MAUTH_API_VERSION"
71+
)
72+
73+
74+
class TestMAuthWSGIMiddlewareFunctionality(unittest.TestCase):
75+
def setUp(self):
76+
self.app_uuid = str(uuid4())
77+
Config.APP_UUID = self.app_uuid
78+
Config.MAUTH_URL = "https://mauth.com"
79+
Config.MAUTH_API_VERSION = "v1"
80+
Config.PRIVATE_KEY = "key"
81+
82+
self.app = Flask("Test App")
83+
self.app.wsgi_app = MAuthWSGIMiddleware(
84+
self.app.wsgi_app,
85+
exempt={"/app_status"},
86+
)
87+
88+
@self.app.get("/")
89+
def root():
90+
return "authenticated!"
91+
92+
@self.app.get("/app_status")
93+
def app_status():
94+
return "open"
95+
96+
self.client = self.app.test_client()
97+
98+
def test_401_response_when_not_authenticated(self):
99+
response = self.client.get("/")
100+
101+
self.assertEqual(response.status_code, 401)
102+
self.assertEqual(response.json, {
103+
"errors": {
104+
"mauth": [(
105+
"Authentication Failed. No mAuth signature present; "
106+
"X-MWS-Authentication header is blank, "
107+
"MCC-Authentication header is blank."
108+
)]
109+
}
110+
})
111+
112+
def test_ok_when_calling_open_route(self):
113+
response = self.client.get("/app_status")
114+
115+
self.assertEqual(response.status_code, 200)
116+
self.assertEqual(response.get_data(as_text=True), "open")
117+
118+
@patch.object(LocalAuthenticator, "is_authentic")
119+
def test_ok_when_authenticated(self, is_authentic_mock):
120+
is_authentic_mock.return_value = (True, 200, "")
121+
122+
response = self.client.get("/")
123+
124+
self.assertEqual(response.status_code, 200)
125+
self.assertEqual(response.get_data(as_text=True), "authenticated!")
126+
127+
@patch.object(LocalAuthenticator, "is_authentic")
128+
def test_adds_values_to_context_v1(self, is_authentic_mock):
129+
is_authentic_mock.return_value = (True, 200, "")
130+
131+
headers_v1 = {
132+
X_MWS_AUTH: f"{MWS_TOKEN} {self.app_uuid}:blah"
133+
}
134+
135+
@self.app.get("/v1_test")
136+
def v1_test():
137+
return jsonify({
138+
"app_uuid": request.environ[ENV_APP_UUID],
139+
"authentic": request.environ[ENV_AUTHENTIC],
140+
"protocol": request.environ[ENV_PROTOCOL_VERSION],
141+
})
142+
143+
response = self.client.get("/v1_test", headers=headers_v1)
144+
145+
self.assertEqual(response.status_code, 200)
146+
self.assertEqual(response.json, {
147+
"app_uuid": self.app_uuid,
148+
"authentic": True,
149+
"protocol": 1,
150+
})
151+
152+
@patch.object(LocalAuthenticator, "is_authentic")
153+
def test_adds_values_to_context_v2(self, is_authentic_mock):
154+
is_authentic_mock.return_value = (True, 200, "")
155+
156+
headers_v2 = {
157+
MCC_AUTH: f"{MWSV2_TOKEN} {self.app_uuid}:blah{AUTH_HEADER_DELIMITER}"
158+
}
159+
160+
@self.app.get("/v2_test")
161+
def v2_test():
162+
return jsonify({
163+
"app_uuid": request.environ[ENV_APP_UUID],
164+
"authentic": request.environ[ENV_AUTHENTIC],
165+
"protocol": request.environ[ENV_PROTOCOL_VERSION],
166+
})
167+
168+
response = self.client.get("/v2_test", headers=headers_v2)
169+
170+
self.assertEqual(response.status_code, 200)
171+
self.assertEqual(response.json, {
172+
"app_uuid": self.app_uuid,
173+
"authentic": True,
174+
"protocol": 2,
175+
})
176+
177+
@patch.object(LocalAuthenticator, "is_authentic")
178+
def test_downstream_can_receive_body(self, is_authentic_mock):
179+
is_authentic_mock.return_value = (True, 200, "")
180+
body = {"msg": "helloes"}
181+
182+
@self.app.post("/post_test")
183+
def post_test():
184+
return jsonify(request.json)
185+
186+
response = self.client.post(
187+
"/post_test",
188+
data=json.dumps(body),
189+
headers={"content-type": "application/json"},
190+
)
191+
192+
self.assertEqual(response.status_code, 200)
193+
self.assertEqual(response.json, body)

0 commit comments

Comments
 (0)