Skip to content

Commit f2f51c4

Browse files
committed
Merge branch 'main.platfor_support.sw' of https://github.com/DelineaXPM/python-tss-sdk into main.platfor_support.sw
2 parents 2a8b913 + 56cd31f commit f2f51c4

3 files changed

Lines changed: 78 additions & 40 deletions

File tree

conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def env_vars():
1818
"folder_path": os.getenv("TSS_FOLDER_PATH"),
1919
}
2020

21+
2122
@pytest.fixture
2223
def platform_env_vars():
2324
return {
@@ -30,6 +31,7 @@ def platform_env_vars():
3031
"folder_path": os.getenv("TSS_FOLDER_PATH"),
3132
}
3233

34+
3335
@pytest.fixture
3436
def authorizer(env_vars):
3537
return PasswordGrantAuthorizer(
@@ -38,20 +40,25 @@ def authorizer(env_vars):
3840
env_vars["password"],
3941
)
4042

43+
4144
@pytest.fixture
4245
def platform_authorizer(platform_env_vars):
4346
from delinea.secrets.server import PasswordGrantAuthorizer
47+
4448
return PasswordGrantAuthorizer(
4549
platform_env_vars["base_url"],
4650
platform_env_vars["username"],
4751
platform_env_vars["password"],
4852
)
4953

54+
5055
@pytest.fixture
5156
def secret_server(env_vars, authorizer):
5257
return SecretServerCloud(env_vars["tenant"], authorizer)
5358

59+
5460
@pytest.fixture
5561
def platform_server(platform_env_vars, platform_authorizer):
5662
from delinea.secrets.server import SecretServer
63+
5764
return SecretServer(platform_env_vars["base_url"], platform_authorizer)

delinea/secrets/server.py

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ class AccessTokenAuthorizer(Authorizer):
198198
def get_access_token(self):
199199
return self.access_token
200200

201-
def __init__(self, access_token, server_type='secret_server'):
201+
def __init__(self, access_token, server_type="secret_server"):
202202
self.access_token = access_token
203203
self._server_type = server_type.lower()
204204

@@ -221,7 +221,9 @@ def _detect_server_type(self):
221221
if self._check_json_response(platform_health_url):
222222
self._server_type = "platform"
223223
return
224-
raise SecretServerError("Unable to detect server type via health check endpoints.")
224+
raise SecretServerError(
225+
"Unable to detect server type via health check endpoints."
226+
)
225227

226228
def _check_json_response(self, url):
227229
"""Python equivalent of Go checkJSONResponse for health check detection."""
@@ -284,7 +286,9 @@ def _refresh(self, seconds_of_drift=300):
284286
else:
285287
raise SecretServerError("Unknown server type for token request.")
286288
if self._server_type == "secret_server":
287-
self.token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/")
289+
self.token_url = (
290+
self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/")
291+
)
288292
grant_request = {
289293
"username": self.username,
290294
"password": self.password,
@@ -295,7 +299,9 @@ def _refresh(self, seconds_of_drift=300):
295299
self.access_grant = self.get_access_grant(self.token_url, grant_request)
296300
self.access_grant_refreshed = datetime.now()
297301
elif self._server_type == "platform":
298-
self.token_url = self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/")
302+
self.token_url = (
303+
self.base_url.rstrip("/") + "/" + self.token_path_uri.strip("/")
304+
)
299305
grant_request = {
300306
"client_id": self.username,
301307
"client_secret": self.password,
@@ -332,7 +338,9 @@ def __init__(
332338
password,
333339
token_path_uri=None,
334340
):
335-
super().__init__(base_url, username, password, token_path_uri=token_path_uri, domain=domain)
341+
super().__init__(
342+
base_url, username, password, token_path_uri=token_path_uri, domain=domain
343+
)
336344

337345

338346
class SecretServer:
@@ -378,11 +386,11 @@ def headers(self):
378386
return self.authorizer.headers()
379387

380388
def __init__(
381-
self,
382-
base_url,
383-
authorizer: Authorizer,
384-
api_path_uri=API_PATH_URI,
385-
):
389+
self,
390+
base_url,
391+
authorizer: Authorizer,
392+
api_path_uri=API_PATH_URI,
393+
):
386394
"""
387395
:param base_url: The base URL e.g. ``http://localhost/SecretServer``
388396
:type base_url: str
@@ -399,18 +407,23 @@ def __init__(
399407
@property
400408
def api_url(self):
401409
return f"{self.base_url}/{self._api_path_uri.strip('/')}"
402-
410+
403411
def ensure_vault_url(self):
404412
"""For platform, fetch and set the vault URL before making API calls."""
405413
# Only needed for platform scenario
406-
if hasattr(self.authorizer, "_server_type") and self.authorizer._server_type == "platform":
414+
if (
415+
hasattr(self.authorizer, "_server_type")
416+
and self.authorizer._server_type == "platform"
417+
):
407418
if not hasattr(self, "_vault_url_fetched") or not self._vault_url_fetched:
408419
access_token = self.authorizer.get_access_token()
409420
vaults_endpoint = self.platform_url + "/vaultbroker/api/vaults"
410421
headers = {"Authorization": f"Bearer {access_token}"}
411422
resp = requests.get(vaults_endpoint, headers=headers, timeout=60)
412423
if resp.status_code != 200:
413-
raise SecretServerError(f"Failed to fetch vault details: HTTP {resp.status_code} - {resp.text}")
424+
raise SecretServerError(
425+
f"Failed to fetch vault details: HTTP {resp.status_code} - {resp.text}"
426+
)
414427
try:
415428
data = resp.json()
416429
except Exception as ex:
@@ -423,8 +436,9 @@ def ensure_vault_url(self):
423436
self.base_url = url.rstrip("/")
424437
self._vault_url_fetched = True
425438
return
426-
raise SecretServerError("No configured default and active vault found in vault details.")
427-
439+
raise SecretServerError(
440+
"No configured default and active vault found in vault details."
441+
)
428442

429443
def get_secret_json(self, id, query_params=None):
430444
"""Gets a Secret from Secret Server
@@ -440,7 +454,7 @@ def get_secret_json(self, id, query_params=None):
440454
:raise: :class:`SecretServerError` when the REST API call fails for
441455
any other reason
442456
"""
443-
headers=self.headers()
457+
headers = self.headers()
444458
self.ensure_vault_url()
445459
endpoint_url = f"{self.api_url}/secrets/{id}"
446460

@@ -472,7 +486,7 @@ def get_folder_json(self, id, query_params=None, get_all_children=True):
472486
:raise: :class:`SecretServerError` when the REST API call fails for
473487
any other reason
474488
"""
475-
headers=self.headers()
489+
headers = self.headers()
476490
self.ensure_vault_url()
477491
endpoint_url = f"{self.api_url}/folders/{id}"
478492

@@ -615,7 +629,7 @@ def search_secrets(self, query_params=None):
615629
:raise: :class:`SecretServerError` when the REST API call fails for
616630
any other reason
617631
"""
618-
headers=self.headers()
632+
headers = self.headers()
619633
self.ensure_vault_url()
620634
endpoint_url = f"{self.api_url}/secrets"
621635

@@ -645,7 +659,7 @@ def lookup_folders(self, query_params=None):
645659
:raise: :class:`SecretServerError` when the REST API call fails for
646660
any other reason
647661
"""
648-
headers=self.headers()
662+
headers = self.headers()
649663
self.ensure_vault_url()
650664
endpoint_url = f"{self.api_url}/folders/lookup"
651665

@@ -672,14 +686,12 @@ def get_secret_ids_by_folderid(self, folder_id):
672686
:raise: :class:`SecretServerError` when the REST API call fails for
673687
any other reason
674688
"""
675-
headers=self.headers()
689+
headers = self.headers()
676690
self.ensure_vault_url()
677691
params = {"filter.folderId": folder_id}
678692
endpoint_url = f"{self.api_url}/secrets/search-total"
679693
params["take"] = self.process(
680-
requests.get(
681-
endpoint_url, params=params, headers=headers, timeout=60
682-
)
694+
requests.get(endpoint_url, params=params, headers=headers, timeout=60)
683695
).text
684696
response = self.search_secrets(query_params=params)
685697

@@ -705,7 +717,7 @@ def get_child_folder_ids_by_folderid(self, folder_id):
705717
:raise: :class:`SecretServerError` when the REST API call fails for
706718
any other reason
707719
"""
708-
headers=self.headers()
720+
headers = self.headers()
709721
self.ensure_vault_url()
710722
params = {
711723
"filter.parentFolderId": folder_id,
@@ -756,9 +768,7 @@ def __init__(
756768
):
757769
super().__init__(
758770
base_url,
759-
PasswordGrantAuthorizer(
760-
f"{base_url}", username, password, token_path_uri
761-
),
771+
PasswordGrantAuthorizer(f"{base_url}", username, password, token_path_uri),
762772
api_path_uri,
763773
)
764774

@@ -779,7 +789,9 @@ class SecretServerCloud(SecretServer):
779789

780790
def __init__(self, tenant=None, authorizer=None, tld=DEFAULT_TLD, base_url=None):
781791
if authorizer is None or not isinstance(authorizer, Authorizer):
782-
raise ValueError("authorizer must be provided and must be of type Authorizer")
792+
raise ValueError(
793+
"authorizer must be provided and must be of type Authorizer"
794+
)
783795
if tenant:
784796
url = self.URL_TEMPLATE.format(tenant, tld)
785797
elif base_url:

tests/test_server.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def test_server_child_folder_ids_by_folderid(env_vars, secret_server):
7979
is list
8080
)
8181

82+
8283
def test_platform_bad_url(platform_env_vars, platform_authorizer):
8384
bad_server = SecretServer(
8485
f"{platform_env_vars['base_url']}/nonexistent",
@@ -87,53 +88,71 @@ def test_platform_bad_url(platform_env_vars, platform_authorizer):
8788
with pytest.raises(SecretServerError):
8889
bad_server.get_secret(platform_env_vars["secret_id"])
8990

91+
9092
def test_platform_token_url(platform_env_vars, platform_authorizer):
9193
platform_authorizer.get_access_token()
9294
assert (
9395
platform_authorizer.token_url
9496
== f"{platform_env_vars['base_url']}/identity/api/oauth2/token/xpmplatform"
9597
)
9698

99+
97100
def test_platform_api_url(platform_server, platform_env_vars):
98-
assert (
99-
platform_server.api_url
100-
== f"{platform_env_vars['base_url']}/api/v1"
101-
)
101+
assert platform_server.api_url == f"{platform_env_vars['base_url']}/api/v1"
102+
102103

103104
def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer):
104105
assert SecretServer(
105106
platform_env_vars["base_url"],
106-
AccessTokenAuthorizer(platform_authorizer.get_access_token(), 'platform'),
107-
).get_secret(platform_env_vars["secret_id"])["id"] == int(platform_env_vars["secret_id"])
108-
109-
def test_platform_server_secret(platform_env_vars, platform_server):
110-
assert ServerSecret(**platform_server.get_secret(platform_env_vars["secret_id"])).id == int(
107+
AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"),
108+
).get_secret(platform_env_vars["secret_id"])["id"] == int(
111109
platform_env_vars["secret_id"]
112110
)
113111

112+
113+
def test_platform_server_secret(platform_env_vars, platform_server):
114+
assert ServerSecret(
115+
**platform_server.get_secret(platform_env_vars["secret_id"])
116+
).id == int(platform_env_vars["secret_id"])
117+
118+
114119
def test_platform_server_secret_by_path(platform_env_vars, platform_server):
115120
assert ServerSecret(
116121
**platform_server.get_secret_by_path(platform_env_vars["secret_path"])
117122
).id == int(platform_env_vars["secret_id"])
118123

124+
119125
def test_platform_server_folder_by_path(platform_env_vars, platform_server):
120126
assert ServerFolder(
121127
**platform_server.get_folder_by_path(platform_env_vars["folder_path"])
122128
).id == int(platform_env_vars["folder_id"])
123129

130+
124131
def test_platform_nonexistent_secret(platform_server):
125132
with pytest.raises(SecretServerClientError):
126133
platform_server.get_secret(1000)
127134

135+
128136
def test_platform_nonexistent_folder(platform_server):
129137
with pytest.raises(SecretServerClientError):
130138
platform_server.get_folder(1000)
131139

140+
132141
def test_platform_server_secret_ids_by_folderid(platform_env_vars, platform_server):
133-
assert type(platform_server.get_secret_ids_by_folderid(platform_env_vars["folder_id"])) is list
142+
assert (
143+
type(platform_server.get_secret_ids_by_folderid(platform_env_vars["folder_id"]))
144+
is list
145+
)
146+
134147

135-
def test_platform_server_child_folder_ids_by_folderid(platform_env_vars, platform_server):
148+
def test_platform_server_child_folder_ids_by_folderid(
149+
platform_env_vars, platform_server
150+
):
136151
assert (
137-
type(platform_server.get_child_folder_ids_by_folderid(platform_env_vars["folder_id"]))
152+
type(
153+
platform_server.get_child_folder_ids_by_folderid(
154+
platform_env_vars["folder_id"]
155+
)
156+
)
138157
is list
139158
)

0 commit comments

Comments
 (0)