Skip to content

Commit 07e9e41

Browse files
committed
initial implementation
1 parent 78fa46b commit 07e9e41

2 files changed

Lines changed: 237 additions & 1 deletion

File tree

Framework/Built_In_Automation/Shared_Resources/BuiltInFunctionSharedResources.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,13 +616,26 @@ class a:
616616
# CommonUtil.prettify(copy_of_name, result)
617617
return result
618618
else:
619-
val = eval(name, shared_variables)
619+
from .secrets import secret
620+
621+
eval_context = dict(shared_variables)
622+
eval_context['secret'] = secret
623+
624+
val = eval(name, eval_context)
620625
val_to_print = copy.deepcopy(val)
621626

622627
for each_var in CommonUtil.zeuz_disable_var_print.keys():
623628
if each_var in name:
624629
val_to_print = '*****'
625630
break
631+
632+
if 'secret[' in name:
633+
secret_key_match = re.search(r"secret\[['\"](.*?)['\"]\]", name)
634+
if secret_key_match:
635+
secret_key = secret_key_match.group(1)
636+
if secret_key not in CommonUtil.zeuz_disable_var_print:
637+
CommonUtil.zeuz_disable_var_print[secret_key] = val
638+
val_to_print = '*****'
626639

627640
if str(shared_variables['zeuz_enable_variable_logging']).lower() in {"on", "yes", "true", "1"} and not "os.environ" in name:
628641
CommonUtil.AddVariableToLog(sModuleInfo, copy_of_name, val_to_print)
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
ZeuZ Secrets Management Module
4+
5+
This module provides secure access to encrypted secrets stored on the ZeuZ server.
6+
Secrets are automatically decrypted and can be accessed like a dictionary.
7+
8+
Example:
9+
from Framework.Built_In_Automation.Shared_Resources.secrets import secret
10+
11+
# Access a secret
12+
api_key = secret['my_api_key']
13+
db_password = secret['database_password']
14+
"""
15+
16+
import json
17+
import base64
18+
import inspect
19+
from pathlib import Path
20+
from typing import Any, Dict, Optional
21+
22+
from cryptography.hazmat.primitives import serialization, hashes
23+
from cryptography.hazmat.primitives.asymmetric import padding
24+
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
25+
from cryptography.hazmat.primitives.padding import PKCS7
26+
27+
from Framework.Utilities import CommonUtil, RequestFormatter
28+
from Framework.Built_In_Automation.Shared_Resources import BuiltInFunctionSharedResources as sr
29+
30+
31+
MODULE_NAME = inspect.getmodulename(__file__)
32+
33+
34+
class Secret:
35+
"""
36+
A class to securely fetch and decrypt secrets from the ZeuZ server.
37+
38+
Secrets are accessed like a dictionary: secret['key_name']
39+
The class automatically:
40+
- Fetches the encrypted secret from the server
41+
- Decrypts it using the private key
42+
- Caches the result for performance # Disabled for now
43+
"""
44+
45+
def __init__(self):
46+
self._cache: Dict[str, str] = {}
47+
self._private_key_path = Path.home() / "zeuz_node_downloads" / "private_key.pem"
48+
49+
def __getitem__(self, key_name: str) -> str:
50+
"""
51+
Retrieve a secret by key name.
52+
53+
Args:
54+
key_name: The name of the secret to retrieve
55+
56+
Returns:
57+
The decrypted secret value
58+
59+
Raises:
60+
KeyError: If the secret cannot be found or accessed
61+
Exception: If decryption fails
62+
"""
63+
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
64+
65+
if key_name in self._cache:
66+
CommonUtil.ExecLog(sModuleInfo, f"Retrieved secret '{key_name}' from cache", 0)
67+
return self._cache[key_name]
68+
69+
try:
70+
test_id = None
71+
step_id = None
72+
if sr.Test_Shared_Variables("zeuz_current_tc"):
73+
current_tc = sr.Get_Shared_Variables("zeuz_current_tc")
74+
if isinstance(current_tc, dict) and "testcase_no" in current_tc:
75+
test_id = current_tc["testcase_no"]
76+
if sr.Test_Shared_Variables("zeuz_current_step"):
77+
current_step = sr.Get_Shared_Variables("zeuz_current_step")
78+
if isinstance(current_step, dict) and "step_id" in current_step:
79+
step_id = current_step["step_id"]
80+
81+
params = {}
82+
if test_id:
83+
params["test_id"] = test_id
84+
if step_id:
85+
params["step_id"] = step_id
86+
87+
CommonUtil.ExecLog(sModuleInfo, f"Fetching secret '{key_name}' from server", 0)
88+
89+
response = RequestFormatter.request(
90+
method="GET",
91+
url=RequestFormatter.form_uri(f"/d/api/v1/zeuz-secrets/{key_name}"),
92+
params=params,
93+
verify=False
94+
)
95+
96+
if response.status_code == 403:
97+
CommonUtil.ExecLog(
98+
sModuleInfo,
99+
f"Access denied to secret '{key_name}'. Check permissions.",
100+
3
101+
)
102+
raise KeyError(f"Access denied to secret '{key_name}'")
103+
104+
if response.status_code != 200:
105+
CommonUtil.ExecLog(
106+
sModuleInfo,
107+
f"Failed to fetch secret '{key_name}': {response.status_code}",
108+
3
109+
)
110+
raise KeyError(f"Secret '{key_name}' not found or inaccessible")
111+
112+
data = response.json()
113+
encrypted_value = data.get("value")
114+
115+
if not encrypted_value:
116+
CommonUtil.ExecLog(
117+
sModuleInfo,
118+
f"Secret '{key_name}' has no value",
119+
3
120+
)
121+
raise KeyError(f"Secret '{key_name}' has no value")
122+
123+
decrypted_value = self._decrypt_data(encrypted_value)
124+
125+
126+
# Cache the decrypted value
127+
# self._cache[key_name] = decrypted_value
128+
129+
if key_name not in CommonUtil.zeuz_disable_var_print:
130+
CommonUtil.zeuz_disable_var_print[key_name] = decrypted_value
131+
132+
CommonUtil.ExecLog(
133+
sModuleInfo,
134+
f"Successfully retrieved and decrypted secret '{key_name}'",
135+
1
136+
)
137+
138+
return decrypted_value
139+
140+
except KeyError:
141+
raise
142+
except Exception as e:
143+
CommonUtil.ExecLog(
144+
sModuleInfo,
145+
f"Error retrieving secret '{key_name}': {str(e)}",
146+
3
147+
)
148+
raise KeyError(f"Failed to retrieve secret '{key_name}': {str(e)}")
149+
150+
151+
def _load_private_key(self):
152+
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
153+
154+
try:
155+
if not self._private_key_path.exists():
156+
CommonUtil.ExecLog(
157+
sModuleInfo,
158+
f"Private key not found at {self._private_key_path}",
159+
3
160+
)
161+
raise FileNotFoundError(f"Private key not found at {self._private_key_path}")
162+
163+
with open(self._private_key_path, 'rb') as f:
164+
return serialization.load_pem_private_key(f.read(), password=None)
165+
except Exception as e:
166+
CommonUtil.ExecLog(
167+
sModuleInfo,
168+
f"Failed to load private key: {str(e)}",
169+
3
170+
)
171+
raise
172+
173+
def _decrypt_data(self, encrypted_data: str) -> str:
174+
"""
175+
Decrypt data that was encrypted using hybrid encryption (RSA + AES).
176+
177+
Args:
178+
encrypted_data: Base64 encoded JSON string containing encrypted key, IV, and data
179+
180+
Returns:
181+
Decrypted plaintext string
182+
"""
183+
private_key = self._load_private_key()
184+
185+
decoded_data = base64.b64decode(encrypted_data)
186+
data = json.loads(decoded_data.decode('utf-8'))
187+
188+
encrypted_aes_key = base64.b64decode(data['encryptedKey'])
189+
aes_key = private_key.decrypt(
190+
encrypted_aes_key,
191+
padding.OAEP(
192+
mgf=padding.MGF1(algorithm=hashes.SHA256()),
193+
algorithm=hashes.SHA256(),
194+
label=None
195+
)
196+
)
197+
198+
iv = base64.b64decode(data['iv'])
199+
encrypted_content = base64.b64decode(data['encryptedData'])
200+
201+
cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
202+
decryptor = cipher.decryptor()
203+
decrypted_padded = decryptor.update(encrypted_content) + decryptor.finalize()
204+
205+
unpadder = PKCS7(128).unpadder()
206+
decrypted = unpadder.update(decrypted_padded) + unpadder.finalize()
207+
208+
return decrypted.decode('utf-8')
209+
210+
def clear_cache(self, key_name: Optional[str] = None):
211+
"""
212+
Clear the cache for a specific secret or all secrets.
213+
214+
Args:
215+
key_name: Optional specific secret to clear. If None, clears all.
216+
"""
217+
if key_name:
218+
self._cache.pop(key_name, None)
219+
else:
220+
self._cache.clear()
221+
222+
223+
secret = Secret()

0 commit comments

Comments
 (0)