@@ -255,16 +255,6 @@ def _curl_tls_args(self, insecure_tls: bool, stored_cacert: str | None) -> str:
255255 tls_args += f"--cacert { stored_cacert } "
256256 return tls_args .strip ()
257257
258- def _prepare_headers (self , headers : dict [str , str ] | None , bearer_token : str | None ) -> str :
259- all_headers = headers .copy () if headers else {}
260- if bearer_token :
261- if any (k .lower () == "authorization" for k in all_headers .keys ()):
262- self .logger .warning ("Authorization header provided - ignoring bearer token" )
263- else :
264- all_headers ["Authorization" ] = f"Bearer { bearer_token } "
265-
266- return self ._curl_header_args (all_headers )
267-
268258 def _curl_header_args (self , headers : dict [str , str ] | None ) -> str :
269259 """Generate header arguments for curl command"""
270260 if not headers :
@@ -692,28 +682,14 @@ def manifest(self):
692682 self ._manifest = FlasherBundleManifestV1Alpha1 .from_string (yaml_str )
693683 return self ._manifest
694684
695- def _parse_headers (self , headers : list [str ]) -> dict [str , str ]:
696- """Parse header strings into a dict
697- Args:
698- headers: List of header strings in 'Key: Value' format
699- Returns:
700- Dictionary mapping header keys to values
701- Raises:
702- click.ClickException: If header format is invalid
703- """
685+ def _validate_header_dict (self , header_map : dict [str , str ]) -> dict [str , str ]:
704686 token_re = re .compile (r"^[!#$%&'*+\-.^_`|~0-9A-Za-z]+$" )
705- header_map : dict [str , str ] = {}
706687 seen : set [str ] = set ()
707- for h in headers :
708- if ":" not in h :
709- raise click .ClickException (f"Invalid header format: { h !r} . Expected 'Key: Value'." )
710-
711- key , value = h .split (":" , 1 )
688+ for key , value in header_map .items ():
712689 key = key .strip ()
713690 value = value .strip ()
714-
715691 if not key :
716- raise click .ClickException (f"Invalid header key in: { h !r } " )
692+ raise click .ClickException (f"Invalid header key: ' { key } ' " )
717693
718694 if not token_re .match (key ):
719695 raise click .ClickException (f"Invalid header name '{ key } ': must be an HTTP token (RFC7230)" )
@@ -723,10 +699,30 @@ def _parse_headers(self, headers: list[str]) -> dict[str, str]:
723699 if kl in seen :
724700 raise click .ClickException (f"Duplicate header '{ key } '" )
725701 seen .add (kl )
726- header_map [key ] = value
727-
728702 return header_map
729703
704+ def _parse_headers (self , headers : list [str ]) -> dict [str , str ]:
705+ header_map : dict [str , str ] = {}
706+ for h in headers :
707+ if ":" not in h :
708+ raise click .ClickException (f"Invalid header format: { h !r} . Expected 'Key: Value'." )
709+
710+ key , value = h .split (":" , 1 )
711+ header_map [key .strip ()] = value .strip ()
712+
713+ return self ._validate_header_dict (header_map )
714+
715+ def _prepare_headers (self , headers : dict [str , str ] | None , bearer_token : str | None ) -> str :
716+ all_headers = headers .copy () if headers else {}
717+ if bearer_token :
718+ if any (k .lower () == "authorization" for k in all_headers .keys ()):
719+ self .logger .warning ("Authorization header provided - ignoring bearer token" )
720+ else :
721+ all_headers ["Authorization" ] = f"Bearer { bearer_token } "
722+
723+ validated_headers = self ._validate_header_dict (all_headers )
724+ return self ._curl_header_args (validated_headers )
725+
730726 def _validate_bearer_token (self , token : str | None ) -> str | None :
731727 if token is None :
732728 return None
0 commit comments