Skip to content

Commit 43946ab

Browse files
Merge pull request #602 from AutomationSolutionz/zeuz-secrets
Zeuz secrets feature
2 parents e040851 + 2b28e8a commit 43946ab

6 files changed

Lines changed: 652 additions & 7 deletions

File tree

Framework/Built_In_Automation/Shared_Resources/BuiltInFunctionSharedResources.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,13 +616,26 @@ class a:
616616
# CommonUtil.prettify(copy_of_name, result)
617617
return result
618618
else:
619-
val = eval(name, shared_variables)
619+
from .secrets import secret
620+
621+
eval_context = dict(shared_variables)
622+
eval_context['secret'] = secret
623+
624+
val = eval(name, eval_context)
620625
val_to_print = copy.deepcopy(val)
621626

622627
for each_var in CommonUtil.zeuz_disable_var_print.keys():
623628
if each_var in name:
624629
val_to_print = '*****'
625630
break
631+
632+
if 'secret[' in name:
633+
secret_key_match = re.search(r"secret\[['\"](.*?)['\"]\]", name)
634+
if secret_key_match:
635+
secret_key = secret_key_match.group(1)
636+
if secret_key not in CommonUtil.zeuz_disable_var_print:
637+
CommonUtil.zeuz_disable_var_print[secret_key] = val
638+
val_to_print = '*****'
626639

627640
if str(shared_variables['zeuz_enable_variable_logging']).lower() in {"on", "yes", "true", "1"} and not "os.environ" in name:
628641
CommonUtil.AddVariableToLog(sModuleInfo, copy_of_name, val_to_print)
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
ZeuZ Secrets Management Module
4+
5+
This module provides secure access to encrypted secrets stored on the ZeuZ server.
6+
Secrets are automatically decrypted and can be accessed like a dictionary.
7+
8+
Example:
9+
from Framework.Built_In_Automation.Shared_Resources.secrets import secret
10+
11+
# Access a secret
12+
api_key = secret['my_api_key']
13+
db_password = secret['database_password']
14+
"""
15+
16+
import json
17+
import base64
18+
import inspect
19+
from typing import Dict, Optional
20+
21+
from cryptography.hazmat.primitives import serialization, hashes
22+
from cryptography.hazmat.primitives.asymmetric import padding
23+
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
24+
from cryptography.hazmat.primitives.padding import PKCS7
25+
26+
from Framework.Utilities import CommonUtil, RequestFormatter
27+
from Framework.Built_In_Automation.Shared_Resources import BuiltInFunctionSharedResources as sr
28+
29+
30+
from settings import ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR
31+
32+
33+
MODULE_NAME = inspect.getmodulename(__file__)
34+
35+
36+
class Secret:
37+
"""
38+
A class to securely fetch and decrypt secrets from the ZeuZ server.
39+
40+
Secrets are accessed like a dictionary: secret['key_name']
41+
The class automatically:
42+
- Fetches the encrypted secret from the server
43+
- Decrypts it using the private key
44+
- Caches the result for performance # Disabled for now
45+
"""
46+
47+
def __init__(self):
48+
self._cache: Dict[str, str] = {}
49+
self._private_key_folder = ZEUZ_NODE_PRIVATE_RSA_KEYS_DIR
50+
51+
def __getitem__(self, key_name: str) -> str:
52+
"""
53+
Retrieve a secret by key name.
54+
55+
Args:
56+
key_name: The name of the secret to retrieve
57+
58+
Returns:
59+
The decrypted secret value
60+
61+
Raises:
62+
KeyError: If the secret cannot be found or accessed
63+
Exception: If decryption fails
64+
"""
65+
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
66+
67+
if key_name in self._cache:
68+
CommonUtil.ExecLog(sModuleInfo, f"Retrieved secret '{key_name}' from cache", 0)
69+
return self._cache[key_name]
70+
71+
try:
72+
test_id = None
73+
if sr.Test_Shared_Variables("zeuz_current_tc"):
74+
current_tc = sr.Get_Shared_Variables("zeuz_current_tc")
75+
if isinstance(current_tc, dict) and "testcase_no" in current_tc:
76+
test_id = current_tc["testcase_no"]
77+
78+
step_data = sr.Get_Shared_Variables(CommonUtil.dont_prettify_on_server[0])
79+
if step_data and isinstance(step_data, list) and len(step_data) >= int(CommonUtil.current_action_no):
80+
current_action = step_data[int(CommonUtil.current_action_no) - 1]
81+
action_details = []
82+
for action in current_action:
83+
if action and len(action) >= 3:
84+
action_details.append({"left": action[0], "middle": action[1], "right": action[2]})
85+
params = {}
86+
if test_id:
87+
params["test_id"] = test_id
88+
if action_details:
89+
params["action_details"] = json.dumps({"values": action_details, "extra": "{}"})
90+
91+
CommonUtil.ExecLog(sModuleInfo, f"Fetching secret '{key_name}' from server", 0)
92+
93+
response = RequestFormatter.request(
94+
method="GET",
95+
url=RequestFormatter.form_uri(f"/d/api/v1/zeuz-secrets/{key_name}"),
96+
params=params,
97+
verify=False
98+
)
99+
100+
if response.status_code == 403:
101+
CommonUtil.ExecLog(
102+
sModuleInfo,
103+
f"Access denied to secret '{key_name}'. Check permissions.",
104+
3
105+
)
106+
raise KeyError(f"Access denied to secret '{key_name}'")
107+
108+
if response.status_code != 200:
109+
CommonUtil.ExecLog(
110+
sModuleInfo,
111+
f"Failed to fetch secret '{key_name}': {response.status_code}",
112+
3
113+
)
114+
raise KeyError(f"Secret '{key_name}' not found or inaccessible")
115+
116+
data = response.json()
117+
encrypted_value = data.get("value")
118+
119+
if not encrypted_value:
120+
CommonUtil.ExecLog(
121+
sModuleInfo,
122+
f"Secret '{key_name}' has no value",
123+
3
124+
)
125+
raise KeyError(f"Secret '{key_name}' has no value")
126+
127+
decrypted_value = self._decrypt_data(encrypted_value)
128+
129+
130+
# Cache the decrypted value
131+
# self._cache[key_name] = decrypted_value
132+
133+
if key_name not in CommonUtil.zeuz_disable_var_print:
134+
CommonUtil.zeuz_disable_var_print[key_name] = decrypted_value
135+
136+
CommonUtil.ExecLog(
137+
sModuleInfo,
138+
f"Successfully retrieved and decrypted secret '{key_name}'",
139+
1
140+
)
141+
142+
return decrypted_value
143+
144+
except KeyError:
145+
raise
146+
except Exception as e:
147+
CommonUtil.ExecLog(
148+
sModuleInfo,
149+
f"Error retrieving secret '{key_name}': {str(e)}",
150+
3
151+
)
152+
raise KeyError(f"Failed to retrieve secret '{key_name}': {str(e)}")
153+
154+
155+
def _load_private_keys(self):
156+
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
157+
158+
try:
159+
if not self._private_key_folder.exists():
160+
CommonUtil.ExecLog(
161+
sModuleInfo,
162+
f"Private key folder not found at {self._private_key_folder}",
163+
3
164+
)
165+
raise FileNotFoundError(f"Private key folder not found at {self._private_key_folder}")
166+
167+
private_keys = []
168+
for pem_file in self._private_key_folder.glob("*.pem"):
169+
with open(pem_file, 'rb') as f:
170+
private_key = serialization.load_pem_private_key(f.read(), password=None)
171+
private_keys.append(private_key)
172+
173+
if not private_keys:
174+
raise FileNotFoundError(f"No .pem files found in {self._private_key_folder}")
175+
176+
return private_keys
177+
except Exception as e:
178+
CommonUtil.ExecLog(
179+
sModuleInfo,
180+
f"Failed to load private keys: {str(e)}",
181+
3
182+
)
183+
raise
184+
185+
def _decrypt_data(self, encrypted_data: str) -> str:
186+
"""
187+
Decrypt data that was encrypted using hybrid encryption (RSA + AES).
188+
189+
Args:
190+
encrypted_data: Base64 encoded JSON string containing encrypted key, IV, and data
191+
192+
Returns:
193+
Decrypted plaintext string
194+
"""
195+
private_keys = self._load_private_keys()
196+
197+
decoded_data = base64.b64decode(encrypted_data)
198+
data = json.loads(decoded_data.decode('utf-8'))
199+
200+
encrypted_aes_key = base64.b64decode(data['encryptedKey'])
201+
iv = base64.b64decode(data['iv'])
202+
encrypted_content = base64.b64decode(data['encryptedData'])
203+
204+
for private_key in private_keys:
205+
try:
206+
aes_key = private_key.decrypt(
207+
encrypted_aes_key,
208+
padding.OAEP(
209+
mgf=padding.MGF1(algorithm=hashes.SHA256()),
210+
algorithm=hashes.SHA256(),
211+
label=None
212+
)
213+
)
214+
215+
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
216+
decryptor = cipher.decryptor()
217+
decrypted_padded = decryptor.update(encrypted_content) + decryptor.finalize()
218+
219+
unpadder = PKCS7(128).unpadder()
220+
decrypted = unpadder.update(decrypted_padded) + unpadder.finalize()
221+
222+
return decrypted.decode('utf-8')
223+
except Exception:
224+
continue
225+
226+
raise Exception("No private key could decrypt the data")
227+
228+
def clear_cache(self, key_name: Optional[str] = None):
229+
"""
230+
Clear the cache for a specific secret or all secrets.
231+
232+
Args:
233+
key_name: Optional specific secret to clear. If None, clears all.
234+
"""
235+
if key_name:
236+
self._cache.pop(key_name, None)
237+
else:
238+
self._cache.clear()
239+
240+
241+
secret = Secret()

Framework/Built_In_Automation/Web/Selenium/utils.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
import struct
1919
import urllib.request
2020
from rich.progress import Progress
21-
22-
23-
# ZeuZ Node Downloads base directory
24-
ZEUZ_NODE_DOWNLOADS_DIR = Path.home() / "zeuz_node_downloads"
21+
from settings import ZEUZ_NODE_DOWNLOADS_DIR
2522

2623

2724
class ChromeForTesting:

0 commit comments

Comments
 (0)