11import base64
22import json
33import logging
4- from typing import Optional
4+ from typing import cast , Optional
55from uuid import UUID
66
77from fastapi import Depends , HTTPException , Request
88from fastapi .exceptions import HTTPException
99from fastapi .security .utils import get_authorization_scheme_param
1010from web3login .auth import to_checksum_address , verify
11- from web3login .exceptions import Web3VerificationError
11+ from web3login .exceptions import (
12+ Web3AuthorizationExpired ,
13+ Web3AuthorizationWrongApplication ,
14+ Web3VerificationError ,
15+ )
1216from web3login .middlewares .fastapi import OAuth2BearerOrWeb3
1317
1418from . import actions , data
1519from .db import yield_db_read_only_session
1620from .settings import (
17- APPLICATION_NAME ,
1821 BOT_INSTALLATION_TOKEN ,
1922 BOT_INSTALLATION_TOKEN_HEADER ,
23+ BUGOUT_WEB3_SIGNATURE_APPLICATION_HEADER ,
2024)
2125
2226logger = logging .getLogger (__name__ )
@@ -41,13 +45,27 @@ async def get_current_user(
4145 if token is None or token == "" :
4246 raise HTTPException (status_code = 404 , detail = "Access token not found" )
4347
48+ signature_application : str = request .headers .get (
49+ BUGOUT_WEB3_SIGNATURE_APPLICATION_HEADER
50+ )
51+ application_id = None
52+ if signature_application is not None :
53+ try :
54+ application_id = cast (UUID , signature_application )
55+ except Exception :
56+ raise HTTPException (
57+ status_code = 403 , detail = "Wrong Web3 signature application provided"
58+ )
59+
4460 try :
4561 if scheme == "web3" :
4662 payload_json = base64 .decodebytes (str (token ).encode ()).decode ("utf-8" )
4763 payload = json .loads (payload_json )
4864 verified = verify (
4965 authorization_payload = payload ,
50- application_to_check = APPLICATION_NAME ,
66+ application_to_check = str (application_id )
67+ if application_id is not None
68+ else "" ,
5169 )
5270 if not verified :
5371 logger .info ("Web3 verification error" )
@@ -57,7 +75,11 @@ async def get_current_user(
5775 logger .error ("Web3 address in payload could not be None" )
5876 raise Exception ()
5977 web3_address = to_checksum_address (web3_address )
60- user = actions .get_user (session = db_session , web3_address = web3_address )
78+ user = actions .get_user (
79+ session = db_session ,
80+ web3_address = web3_address ,
81+ application_id = application_id ,
82+ )
6183
6284 elif scheme == "bearer" :
6385 is_token_active , user = actions .get_current_user_by_token (
@@ -82,6 +104,10 @@ async def get_current_user(
82104 except actions .UserInvalidParameters as e :
83105 logger .info (e )
84106 raise HTTPException (status_code = 500 )
107+ except Web3AuthorizationExpired :
108+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
109+ except Web3AuthorizationWrongApplication :
110+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
85111 except Web3VerificationError :
86112 raise HTTPException (status_code = 403 , detail = "Signature not verified" )
87113 except Exception :
@@ -117,13 +143,27 @@ async def get_current_user_with_groups(
117143 if token is None or token == "" :
118144 raise HTTPException (status_code = 404 , detail = "Access token not found" )
119145
146+ signature_application : str = request .headers .get (
147+ BUGOUT_WEB3_SIGNATURE_APPLICATION_HEADER
148+ )
149+ application_id = None
150+ if signature_application is not None :
151+ try :
152+ application_id = cast (UUID , signature_application )
153+ except Exception :
154+ raise HTTPException (
155+ status_code = 403 , detail = "Wrong Web3 signature application provided"
156+ )
157+
120158 try :
121159 if scheme == "web3" :
122160 payload_json = base64 .decodebytes (str (token ).encode ()).decode ("utf-8" )
123161 payload = json .loads (payload_json )
124162 verified = verify (
125163 authorization_payload = payload ,
126- application_to_check = APPLICATION_NAME ,
164+ application_to_check = str (application_id )
165+ if application_id is not None
166+ else "" ,
127167 )
128168 if not verified :
129169 logger .info ("Web3 authorization verification error" )
@@ -134,7 +174,9 @@ async def get_current_user_with_groups(
134174 raise Exception ()
135175 web3_address = to_checksum_address (web3_address )
136176 user_extended = actions .get_user_with_groups (
137- session = db_session , web3_address = web3_address
177+ session = db_session ,
178+ web3_address = web3_address ,
179+ application_id = application_id ,
138180 )
139181
140182 elif scheme == "bearer" :
@@ -163,6 +205,10 @@ async def get_current_user_with_groups(
163205 except actions .UserInvalidParameters as e :
164206 logger .info (e )
165207 raise HTTPException (status_code = 500 )
208+ except Web3AuthorizationExpired :
209+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
210+ except Web3AuthorizationWrongApplication :
211+ raise HTTPException (status_code = 403 , detail = "Signature not verified" )
166212 except Web3VerificationError :
167213 raise HTTPException (status_code = 403 , detail = "Signature not verified" )
168214 except Exception :
0 commit comments