Skip to content

Commit 6c35dee

Browse files
Merge pull request #80 from DelineaXPM/remove.server_type.flag
Refactor: Implement automatic server type detection for AccessTokenAuthorizer
2 parents 7b9bb1c + 69df533 commit 6c35dee

4 files changed

Lines changed: 58 additions & 40 deletions

File tree

README.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ If using traditional `username` and `password` authentication to log in to your
3737
```python
3838
from delinea.secrets.server import PasswordGrantAuthorizer
3939

40-
authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password")")
40+
authorizer = PasswordGrantAuthorizer("https://hostname/SecretServer", os.getenv("myusername"), os.getenv("password"))
4141
```
4242

4343
##### With Platform
@@ -60,12 +60,21 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g
6060

6161
#### Access Token Authorization
6262

63-
If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`.
63+
If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`.
64+
65+
##### With Secret Server
66+
```python
67+
from delinea.secrets.server import AccessTokenAuthorizer
68+
69+
authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://hostname/SecretServer")
70+
```
71+
72+
##### With Platform
6473

6574
```python
6675
from delinea.secrets.server import AccessTokenAuthorizer
6776

68-
authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...")
77+
authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://platform.delinea.app")
6978
```
7079

7180
## Secret Server Cloud

delinea/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""The Delinea Secret Server Python SDK"""
22

3-
__version__ = "2.0.0"
3+
__version__ = "2.0.1"

delinea/secrets/server.py

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,39 @@ def add_bearer_token_authorization_header(bearer_token, existing_headers={}):
179179
**existing_headers,
180180
}
181181

182+
def _perform_server_detection(self, base_url):
183+
"""Detects if the server is Secret Server or Platform by health check endpoints."""
184+
secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck"
185+
platform_endpoint = base_url.rstrip("/") + "/health"
186+
187+
if self._validate_health_endpoint(secret_server_endpoint):
188+
self._server_type = "secret_server"
189+
return
190+
if self._validate_health_endpoint(platform_endpoint):
191+
self._server_type = "platform"
192+
return
193+
raise SecretServerError(
194+
"Unable to detect server type via health check endpoints."
195+
)
196+
197+
def _validate_health_endpoint(self, url):
198+
"""Validates if an endpoint returns healthy status."""
199+
try:
200+
response = requests.get(url, timeout=60)
201+
except Exception:
202+
return False
203+
204+
try:
205+
response_body = response.content
206+
except Exception:
207+
return False
208+
209+
try:
210+
json_data = response.json()
211+
return json_data.get("Healthy", False)
212+
except Exception:
213+
return b"Healthy" in response_body or b"healthy" in response_body
214+
182215
@abstractmethod
183216
def get_access_token(self):
184217
"""Returns the access_token from a Grant Request"""
@@ -198,9 +231,10 @@ class AccessTokenAuthorizer(Authorizer):
198231
def get_access_token(self):
199232
return self.access_token
200233

201-
def __init__(self, access_token, server_type="secret_server"):
234+
def __init__(self, access_token, base_url):
202235
self.access_token = access_token
203-
self._server_type = server_type.lower()
236+
self.base_url = base_url.rstrip("/")
237+
self._perform_server_detection(self.base_url)
204238

205239

206240
class PasswordGrantAuthorizer(Authorizer):
@@ -211,37 +245,6 @@ class PasswordGrantAuthorizer(Authorizer):
211245
TOKEN_PATH_URI = "/oauth2/token"
212246
PLATFORM_TOKEN_PATH_URI = "/identity/api/oauth2/token/xpmplatform"
213247

214-
def _detect_server_type(self):
215-
"""Detects if the server is Secret Server or Platform by health check endpoints, using _check_json_response."""
216-
ss_health_url = self.base_url.rstrip("/") + "/api/v1/healthcheck"
217-
platform_health_url = self.base_url.rstrip("/") + "/health"
218-
if self._check_json_response(ss_health_url):
219-
self._server_type = "secret_server"
220-
return
221-
if self._check_json_response(platform_health_url):
222-
self._server_type = "platform"
223-
return
224-
raise SecretServerError(
225-
"Unable to detect server type via health check endpoints."
226-
)
227-
228-
def _check_json_response(self, url):
229-
"""Python equivalent of Go checkJSONResponse for health check detection."""
230-
try:
231-
resp = requests.get(url, timeout=60)
232-
except Exception:
233-
return False
234-
try:
235-
body = resp.content
236-
except Exception:
237-
return False
238-
try:
239-
data = resp.json()
240-
healthy = data.get("Healthy", False)
241-
return healthy
242-
except Exception:
243-
return b"Healthy" in body or b"healthy" in body
244-
245248
@staticmethod
246249
def get_access_grant(token_url, grant_request):
247250
"""Gets an *OAuth2 Access Grant* by calling the Secret Server REST API
@@ -276,7 +279,7 @@ def _refresh(self, seconds_of_drift=300):
276279
else:
277280
# Detect server type if not already done
278281
if not hasattr(self, "_server_type"):
279-
self._detect_server_type()
282+
self._perform_server_detection(self.base_url)
280283
# Decide token_path_uri if not provided
281284
if not self.token_path_uri:
282285
if self._server_type == "secret_server":

tests/test_server.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ def test_api_url(secret_server, env_vars):
3737
def test_access_token_authorizer(env_vars, authorizer):
3838
assert SecretServer(
3939
f"https://{env_vars['tenant']}.secretservercloud.com/",
40-
AccessTokenAuthorizer(authorizer.get_access_token()),
40+
AccessTokenAuthorizer(
41+
authorizer.get_access_token(),
42+
f"https://{env_vars['tenant']}.secretservercloud.com/",
43+
),
4144
).get_secret(env_vars["secret_id"])["id"] == int(env_vars["secret_id"])
4245

4346

@@ -104,7 +107,10 @@ def test_platform_api_url(platform_server, platform_env_vars):
104107
def test_platform_access_token_authorizer(platform_env_vars, platform_authorizer):
105108
assert SecretServer(
106109
platform_env_vars["platform_base_url"],
107-
AccessTokenAuthorizer(platform_authorizer.get_access_token(), "platform"),
110+
AccessTokenAuthorizer(
111+
platform_authorizer.get_access_token(),
112+
platform_env_vars["platform_base_url"],
113+
),
108114
).get_secret(platform_env_vars["secret_id"])["id"] == int(
109115
platform_env_vars["secret_id"]
110116
)

0 commit comments

Comments
 (0)