-
Notifications
You must be signed in to change notification settings - Fork 92
Expand file tree
/
Copy pathtee_plugin.py
More file actions
93 lines (79 loc) · 3.25 KB
/
tee_plugin.py
File metadata and controls
93 lines (79 loc) · 3.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import json
import requests_unixsocket
from requests import HTTPError
import hashlib
import jwt
import logging
from typing import Dict, Callable, Any, Optional, List, Callable
Audience = "http://aizel.com"
class CustomToken:
def __init__(self, audience, nonce, token_type="OIDC"):
self.audience = audience
self.nonces = [nonce]
self.token_type = token_type
class GcpConfidentialSpace:
def __init__(self, audience: str):
self.audience = audience
def attestation_report(self, nonce: str) -> str:
try:
hashed_nonce = hashlib.sha256(nonce.encode('utf-8')).hexdigest()
request = CustomToken(self.audience, hashed_nonce)
session = requests_unixsocket.Session()
url = 'http+unix://%2Frun%2Fcontainer_launcher%2Fteeserver.sock/v1/token'
headers = {'Content-Type': 'application/json'}
custom_json = json.dumps(request.__dict__)
response = session.post(url, headers=headers, data=custom_json)
response.raise_for_status()
return response.content.decode('utf-8')
except Exception as err:
raise RuntimeError(f"{err}")
class TeePlugin:
def __init__(self, options: Dict[str, Any]) -> None:
self.id: str = options.get("id", "tee_plugin")
self.name: str = options.get("name", "TEE Plugin")
self.description: str = options.get(
"description",
"A plugin that obtains the attestation report in the Trusted Execution Environment.",
)
# tee plugin type, current only support Google Confidential Space
self.type: str = options.get("tee_type", "GCS")
# Define internal function mappings
self._functions: Dict[str, Callable[..., Any]] = {
"get_attestation_report": self._get_attestation_report,
}
# Configure logging
logging.basicConfig(level=logging.INFO)
self.logger: logging.Logger = logging.getLogger(__name__)
def get_function(self, fn_name: str) -> Callable:
"""
Get a specific function by name.
Args:
fn_name: Name of the function to retrieve
Raises:
ValueError: If function name is not found
Returns:
Function object
"""
if fn_name not in self._functions:
raise ValueError(
f"Function '{fn_name}' not found. Available functions: {', '.join(self.available_functions)}"
)
return self._functions[fn_name]
def _get_attestation_report(self, nonce: str) -> str:
if self.type == "GCS":
try:
gcp = GcpConfidentialSpace(Audience)
gcp.attestation_report(nonce)
except RuntimeError as e:
self.logger.error(f"Failed to get attestation report for Google confidential space: {e}")
return ""
else:
raise ValueError(
f"Unsupport tee backend type '{self.type}'. Available type: GCS"
)
def decode_gcp_attestation_report(report: str) -> dict:
try:
decoded_report = jwt.decode(report, options={"verify_signature": False})
return decoded_report
except jwt.InvalidTokenError:
raise ValueError("Invalid token")