@@ -10,7 +10,6 @@ import os
1010import sys
1111import argparse
1212import subprocess
13- import tempfile
1413import base64
1514from pathlib import Path
1615from typing import Optional , List
@@ -29,6 +28,21 @@ except ImportError:
2928 sys .exit (1 )
3029
3130
31+ def get_gpg_version () -> str :
32+ """
33+ Get the version of the system gpg command.
34+ Returns a generic version if gpg is not found.
35+ """
36+ try :
37+ result = subprocess .run (['gpg' , '--version' ], capture_output = True , text = True , check = True )
38+ first_line = result .stdout .splitlines ()[0 ]
39+ if 'gpg (GnuPG)' in first_line :
40+ return first_line .split ()[- 1 ]
41+ except (subprocess .CalledProcessError , FileNotFoundError , IndexError ):
42+ pass
43+ return "2.4.4"
44+
45+
3246def gpg_mode (args : List [str ]):
3347 """
3448 Emulate GPG behavior when called by rpmsign.
@@ -37,35 +51,57 @@ def gpg_mode(args: List[str]):
3751 """
3852 output_file = None
3953 input_file = "-"
40- key_id = None
4154
4255 # Simple argument parser for GPG flags
4356 i = 0
4457 while i < len (args ):
4558 arg = args [i ]
4659 if arg in ('-o' , '--output' ):
47- output_file = args [i + 1 ]
48- i += 2
60+ if i + 1 < len (args ):
61+ output_file = args [i + 1 ]
62+ i += 2
63+ else :
64+ print (f"Error: { arg } requires an argument" , file = sys .stderr )
65+ sys .exit (1 )
4966 elif arg == '-sbo' :
50- output_file = args [i + 1 ]
51- i += 2
67+ if i + 1 < len (args ):
68+ output_file = args [i + 1 ]
69+ i += 2
70+ else :
71+ print (f"Error: { arg } requires an argument" , file = sys .stderr )
72+ sys .exit (1 )
5273 elif arg == '-u' :
53- key_id = args [i + 1 ]
54- i += 2
74+ if i + 1 < len (args ):
75+ # key_id is parsed but we currently rely on environment variable CHELON_KEY_TYPE
76+ # as per the wrapper logic, but we should at least parse it correctly.
77+ # key_id = args[i+1]
78+ i += 2
79+ else :
80+ print (f"Error: { arg } requires an argument" , file = sys .stderr )
81+ sys .exit (1 )
5582 elif arg == '--' :
5683 if i + 1 < len (args ):
5784 input_file = args [i + 1 ]
5885 break
5986 elif arg == '--version' :
60- print ("gpg (GnuPG) 2.4.5" )
87+ version = get_gpg_version ()
88+ print (f"gpg (GnuPG) { version } " )
6189 sys .exit (0 )
6290 else :
6391 i += 1
6492
65- # Read input data
93+ # Read input data with a 10MB limit (DoS protection)
94+ MAX_INPUT_SIZE = 10 * 1024 * 1024 # 10MB
6695 if input_file == "-" :
67- data = sys .stdin .buffer .read ()
96+ data = sys .stdin .buffer .read (MAX_INPUT_SIZE + 1 )
97+ if len (data ) > MAX_INPUT_SIZE :
98+ print (f"Error: Input data from stdin exceeds limit of { MAX_INPUT_SIZE } bytes" , file = sys .stderr )
99+ sys .exit (1 )
68100 else :
101+ file_size = os .path .getsize (input_file )
102+ if file_size > MAX_INPUT_SIZE :
103+ print (f"Error: Input file too large ({ file_size } bytes)" , file = sys .stderr )
104+ sys .exit (1 )
69105 with open (input_file , 'rb' ) as f :
70106 data = f .read ()
71107
@@ -84,23 +120,58 @@ def gpg_mode(args: List[str]):
84120 is_armored = '--armor' in args or '-a' in args
85121
86122 if output_file and output_file != '-' :
87- with open ( output_file , 'w' if is_armored else 'wb' ) as f :
123+ try :
88124 if is_armored :
89- f .write (signature )
125+ with open (output_file , 'w' ) as f :
126+ f .write (signature )
90127 else :
91128 # Strip headers and decode if binary expected
92- # Simplified: just dearmor using gpg if available, or just write as is if armored
93129 if signature .startswith ('-----BEGIN' ):
94130 # Need to dearmor
95- proc = subprocess .Popen (['gpg' , '--dearmor' ], stdin = subprocess .PIPE , stdout = f )
96- proc .communicate (input = signature .encode ('utf-8' ))
131+ try :
132+ result = subprocess .run (
133+ ['gpg' , '--dearmor' ],
134+ input = signature .encode ('utf-8' ),
135+ capture_output = True ,
136+ check = True
137+ )
138+ with open (output_file , 'wb' ) as f :
139+ f .write (result .stdout )
140+ except subprocess .CalledProcessError as e :
141+ print (f"Error: Failed to dearmor signature: { e .stderr .decode ()} " , file = sys .stderr )
142+ sys .exit (1 )
143+ except FileNotFoundError :
144+ print ("Error: 'gpg' command not found. Please install GnuPG to handle armored signatures in binary mode." , file = sys .stderr )
145+ sys .exit (1 )
97146 else :
98- f .write (base64 .b64decode (signature ))
147+ with open (output_file , 'wb' ) as f :
148+ f .write (base64 .b64decode (signature ))
149+ except IOError as e :
150+ print (f"Error writing to { output_file } : { e } " , file = sys .stderr )
151+ sys .exit (1 )
99152 else :
100153 if is_armored :
101154 sys .stdout .write (signature )
155+ sys .stdout .flush ()
102156 else :
103- sys .stdout .buffer .write (base64 .b64decode (signature ))
157+ if signature .startswith ('-----BEGIN' ):
158+ try :
159+ result = subprocess .run (
160+ ['gpg' , '--dearmor' ],
161+ input = signature .encode ('utf-8' ),
162+ capture_output = True ,
163+ check = True
164+ )
165+ sys .stdout .buffer .write (result .stdout )
166+ except subprocess .CalledProcessError as e :
167+ print (f"Error: Failed to dearmor signature: { e .stderr .decode ()} " , file = sys .stderr )
168+ sys .exit (1 )
169+ except FileNotFoundError :
170+ print ("Error: 'gpg' command not found." , file = sys .stderr )
171+ sys .exit (1 )
172+ else :
173+ sys .stdout .buffer .write (base64 .b64decode (signature ))
174+ sys .stdout .buffer .flush ()
104175
105176 except Exception as e :
106177 print (f"GPG Emulation Error: { e } " , file = sys .stderr )
@@ -109,7 +180,7 @@ def gpg_mode(args: List[str]):
109180 sys .exit (0 )
110181
111182
112- def resign_rpm (rpm_path : str , key_type : str = 'modern' , verbose : bool = False ):
183+ def sign_rpm_integrated (rpm_path : str , key_type : str = 'modern' , verbose : bool = False ):
113184 """
114185 Embed signature into RPM using rpmsign and self as wrapper.
115186 """
@@ -124,11 +195,12 @@ def resign_rpm(rpm_path: str, key_type: str = 'modern', verbose: bool = False):
124195 key_id = key_ids .get (key_type , key_type )
125196
126197 if verbose :
127- print (f"Resigning RPM : { rpm_file } " )
198+ print (f"Integrated Signing : { rpm_file } " )
128199 print (f"Using Key: { key_type } ({ key_id } )" )
129200
130- # Set environment for the wrapper call
131- os .environ ['CHELON_KEY_TYPE' ] = key_type
201+ # Prepare environment for the wrapper call
202+ new_env = os .environ .copy ()
203+ new_env ['CHELON_KEY_TYPE' ] = key_type
132204
133205 cmd = [
134206 'rpmsign' ,
@@ -139,12 +211,18 @@ def resign_rpm(rpm_path: str, key_type: str = 'modern', verbose: bool = False):
139211 ]
140212
141213 try :
142- result = subprocess .run (cmd , check = True , capture_output = not verbose , text = True )
214+ subprocess .run (cmd , check = True , capture_output = not verbose , text = True , env = new_env )
143215 if verbose :
144216 print ("✓ Signature embedded successfully" )
145217 return True
146218 except subprocess .CalledProcessError as e :
147- print (f"Error during rpmsign: { e .stderr if e .stderr else e } " , file = sys .stderr )
219+ error_msg = e .stderr .strip () if e .stderr else str (e )
220+ if not error_msg and not verbose :
221+ error_msg = "rpmsign failed (check system logs or run with --verbose)"
222+ print (f"Error during integrated signing: { error_msg } " , file = sys .stderr )
223+ return False
224+ except Exception as e :
225+ print (f"Unexpected error during integrated signing: { e } " , file = sys .stderr )
148226 return False
149227
150228
@@ -183,12 +261,15 @@ def sign_rpm_detached(rpm_path: str, output_path: Optional[str] = None,
183261
184262def main ():
185263 # Detect if we are being called as a GPG wrapper
186- # rpmsign usually calls with a lot of flags, first one is often --no-verbose
264+ # rpmsign usually calls with a lot of flags, including -sbo or --detach-sign
187265 if len (sys .argv ) > 1 and sys .argv [1 ].startswith ('--' ):
188- # Check if it looks like a GPG call (batch, detach-sign, etc)
189- gpg_flags = {'--version' , '--batch' , '- sbo' , '--detach-sign' , '--armor' , '-u ' }
266+ # Check if it looks specifically like a GPG call from rpmsign
267+ gpg_flags = {'-sbo' , '--detach-sign' , '--armor' , '--no-secmem-warning ' }
190268 if any (arg in gpg_flags for arg in sys .argv ):
191269 gpg_mode (sys .argv [1 :])
270+ elif '--version' in sys .argv and len (sys .argv ) == 2 :
271+ # Handle standalone --version for discovery tools
272+ gpg_mode (sys .argv [1 :])
192273
193274 parser = argparse .ArgumentParser (
194275 description = 'Sign RPM packages using Chelon service' ,
@@ -210,7 +291,7 @@ def main():
210291
211292 try :
212293 if args .resign :
213- success = resign_rpm (args .rpm_file , key_type = args .key_type , verbose = args .verbose )
294+ success = sign_rpm_integrated (args .rpm_file , key_type = args .key_type , verbose = args .verbose )
214295 return 0 if success else 1
215296 else :
216297 output_file = sign_rpm_detached (
@@ -224,6 +305,7 @@ def main():
224305 return 0
225306
226307 except KeyboardInterrupt :
308+ print ("\n Interrupted by user" , file = sys .stderr )
227309 return 130
228310 except Exception as e :
229311 print (f"Error: { e } " , file = sys .stderr )
0 commit comments