@@ -41,6 +41,38 @@ def __init__(self, credentials_store: dict[str, str]):
4141 """Initialize with a mapping of access_key -> secret_key."""
4242 self .credentials_store = credentials_store
4343
44+ def _parse_v4_credential (
45+ self , credential : str
46+ ) -> tuple [S3Credentials | None , str , str , str , str | None ]:
47+ """Parse V4 credential string and lookup secret key.
48+
49+ Returns: (credentials, date_stamp, region, service, error)
50+ """
51+ try :
52+ parts = credential .split ("/" )
53+ access_key , date_stamp , region , service = parts [0 ], parts [1 ], parts [2 ], parts [3 ]
54+ except (IndexError , ValueError ):
55+ return None , "" , "" , "" , "Invalid credential format"
56+
57+ secret_key = self .credentials_store .get (access_key )
58+ if not secret_key :
59+ return None , "" , "" , "" , f"Unknown access key: { access_key } "
60+
61+ creds = S3Credentials (
62+ access_key = access_key , secret_key = secret_key , region = region , service = service
63+ )
64+ return creds , date_stamp , region , service , None
65+
66+ def _compute_v4_signature (
67+ self , canonical_request : str , amz_date : str , date_stamp : str , region : str , service : str , secret_key : str
68+ ) -> str :
69+ """Compute V4 signature for a canonical request."""
70+ string_to_sign = self ._build_string_to_sign (
71+ amz_date , date_stamp , region , service , canonical_request
72+ )
73+ signing_key = _derive_signing_key (secret_key , date_stamp , region , service )
74+ return hmac .new (signing_key , string_to_sign .encode (), hashlib .sha256 ).hexdigest ()
75+
4476 def verify (self , request : ParsedRequest , path : str ) -> tuple [bool , S3Credentials | None , str ]:
4577 """Verify SigV4 signature. Returns (is_valid, credentials, error_message)."""
4678 # Check for Authorization header (standard SigV4)
@@ -73,60 +105,38 @@ def _verify_header_signature(
73105 signed_headers = auth_parts ["SignedHeaders" ]
74106 signature = auth_parts ["Signature" ]
75107
76- cred_parts = credential .split ("/" )
77- access_key = cred_parts [0 ]
78- date_stamp = cred_parts [1 ]
79- region = cred_parts [2 ]
80- service = cred_parts [3 ]
81-
82- secret_key = self .credentials_store .get (access_key )
83- if not secret_key :
84- return False , None , f"Unknown access key: { access_key } "
85-
86- credentials = S3Credentials (
87- access_key = access_key ,
88- secret_key = secret_key ,
89- region = region ,
90- service = service ,
91- )
108+ credentials , date_stamp , region , service , error = self ._parse_v4_credential (credential )
109+ if error :
110+ return False , None , error
92111
93112 amz_date = request .headers .get ("x-amz-date" , "" )
94113 if not amz_date :
95114 return False , credentials , "Missing x-amz-date header"
96115
97116 try :
98117 request_time = datetime .strptime (amz_date , "%Y%m%dT%H%M%SZ" ).replace (tzinfo = UTC )
99- now = datetime .now (UTC )
100- if abs (now - request_time ) > CLOCK_SKEW_TOLERANCE :
118+ if abs (datetime .now (UTC ) - request_time ) > CLOCK_SKEW_TOLERANCE :
101119 return False , credentials , "Request time too skewed"
102120 except ValueError :
103121 return False , credentials , "Invalid x-amz-date format"
104122
105123 canonical_request = self ._build_canonical_request (
106124 request , path , signed_headers .split (";" )
107125 )
108-
109- string_to_sign = self ._build_string_to_sign (
110- amz_date , date_stamp , region , service , canonical_request
126+ calculated_sig = self ._compute_v4_signature (
127+ canonical_request , amz_date , date_stamp , region , service , credentials .secret_key
111128 )
112129
113- signing_key = _derive_signing_key (secret_key , date_stamp , region , service )
114- calculated_sig = hmac .new (
115- signing_key , string_to_sign .encode (), hashlib .sha256
116- ).hexdigest ()
117-
118130 if hmac .compare_digest (calculated_sig , signature ):
119131 return True , credentials , ""
120132
121- # Log debug info for signature mismatch to help diagnose issues
122133 logger .debug (
123134 "Signature verification failed" ,
124135 method = request .method ,
125136 path = path ,
126137 signed_headers = signed_headers ,
127138 expected_sig = signature [:16 ] + "..." ,
128139 calculated_sig = calculated_sig [:16 ] + "..." ,
129- canonical_request_hash = hashlib .sha256 (canonical_request .encode ()).hexdigest ()[:16 ],
130140 )
131141 return False , credentials , "Signature mismatch"
132142
@@ -144,61 +154,39 @@ def _verify_presigned_v4(
144154 signed_headers = request .query_params .get ("X-Amz-SignedHeaders" , ["" ])[0 ]
145155 signature = request .query_params .get ("X-Amz-Signature" , ["" ])[0 ]
146156
147- cred_parts = credential .split ("/" )
148- access_key = cred_parts [0 ]
149- date_stamp = cred_parts [1 ]
150- region = cred_parts [2 ]
151- service = cred_parts [3 ]
152-
153- secret_key = self .credentials_store .get (access_key )
154- if not secret_key :
155- return False , None , f"Unknown access key: { access_key } "
156-
157- credentials = S3Credentials (
158- access_key = access_key ,
159- secret_key = secret_key ,
160- region = region ,
161- service = service ,
162- )
157+ credentials , date_stamp , region , service , error = self ._parse_v4_credential (credential )
158+ if error :
159+ return False , None , error
163160
164161 request_time = datetime .strptime (amz_date , "%Y%m%dT%H%M%SZ" ).replace (tzinfo = UTC )
165- expiry_time = request_time + timedelta (seconds = expires )
166- if datetime .now (UTC ) > expiry_time :
162+ if datetime .now (UTC ) > request_time + timedelta (seconds = expires ):
167163 return False , credentials , "Presigned URL expired"
168164
169165 query_for_signing = {
170166 k : v for k , v in request .query_params .items () if k != "X-Amz-Signature"
171167 }
168+ signed_headers_list = signed_headers .split (";" )
172169
173170 # Try verification with original headers first
174171 canonical_request = self ._build_canonical_request_presigned (
175- request , path , signed_headers . split ( ";" ) , query_for_signing
172+ request , path , signed_headers_list , query_for_signing
176173 )
177-
178- string_to_sign = self ._build_string_to_sign (
179- amz_date , date_stamp , region , service , canonical_request
174+ calculated_sig = self ._compute_v4_signature (
175+ canonical_request , amz_date , date_stamp , region , service , credentials .secret_key
180176 )
181177
182- signing_key = _derive_signing_key (secret_key , date_stamp , region , service )
183- calculated_sig = hmac .new (
184- signing_key , string_to_sign .encode (), hashlib .sha256
185- ).hexdigest ()
186-
187178 if hmac .compare_digest (calculated_sig , signature ):
188179 return True , credentials , ""
189180
190181 # Try with alternate host header (with/without :80) for HTTP port normalization
191- # Some clients sign with explicit :80, others normalize it away
192182 host_header = request .headers .get ("host" , "" )
193- if "host" in signed_headers .split (";" ):
194- alternate_host = None
195- if host_header .endswith (":80" ):
196- alternate_host = host_header [:- 3 ] # Try without :80
197- elif ":" not in host_header :
198- alternate_host = host_header + ":80" # Try with :80
199-
183+ if "host" in signed_headers_list :
184+ alternate_host = (
185+ host_header [:- 3 ] if host_header .endswith (":80" )
186+ else host_header + ":80" if ":" not in host_header
187+ else None
188+ )
200189 if alternate_host :
201- # Create modified request with alternate host
202190 modified_headers = dict (request .headers )
203191 modified_headers ["host" ] = alternate_host
204192 modified_request = ParsedRequest (
@@ -210,29 +198,23 @@ def _verify_presigned_v4(
210198 body = request .body ,
211199 is_presigned = request .is_presigned ,
212200 )
213-
214201 canonical_request_alt = self ._build_canonical_request_presigned (
215- modified_request , path , signed_headers . split ( ";" ) , query_for_signing
202+ modified_request , path , signed_headers_list , query_for_signing
216203 )
217- string_to_sign_alt = self ._build_string_to_sign (
218- amz_date , date_stamp , region , service , canonical_request_alt
204+ calculated_sig_alt = self ._compute_v4_signature (
205+ canonical_request_alt , amz_date , date_stamp , region , service ,
206+ credentials .secret_key ,
219207 )
220- calculated_sig_alt = hmac .new (
221- signing_key , string_to_sign_alt .encode (), hashlib .sha256
222- ).hexdigest ()
223-
224208 if hmac .compare_digest (calculated_sig_alt , signature ):
225209 return True , credentials , ""
226210
227- # Debug logging for presigned URL signature mismatch
228211 logger .warning (
229212 "Presigned URL signature mismatch" ,
230213 path = path ,
231214 signed_headers = signed_headers ,
232215 host_header = host_header ,
233216 expected_sig = signature [:16 ] + "..." ,
234217 calculated_sig = calculated_sig [:16 ] + "..." ,
235- canonical_request_preview = canonical_request [:500 ],
236218 )
237219 return False , credentials , "Signature mismatch"
238220
0 commit comments