-
Notifications
You must be signed in to change notification settings - Fork 63
Expand file tree
/
Copy pathregister_skills_tool.py
More file actions
250 lines (213 loc) · 9.39 KB
/
register_skills_tool.py
File metadata and controls
250 lines (213 loc) · 9.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import os
import zipfile
import frontmatter
from pathlib import Path
from datetime import datetime
from google.adk.tools import ToolContext
from veadk.tools.skills_tools.session_path import get_session_path
from veadk.integrations.ve_tos.ve_tos import VeTOS
from veadk.utils.volcengine_sign import ve_request
from veadk.utils.logger import get_logger
logger = get_logger(__name__)
def register_skills_tool(
skill_local_path: str,
tool_context: ToolContext,
) -> str:
"""Register a skill to the remote skill space by uploading its zip package to TOS and calling the CreateSkill API.
Args:
skill_local_path (str): The local path of the skill directory.
- The format of the skill directory is as follows:
skill_local_path/
SKILL.md
other files...
tool_context (ToolContext): The context of the tool execution.
Returns:
str: Result message indicating success or failure.
"""
working_dir = get_session_path(session_id=tool_context.session.id)
# skill_path = Path(skill_local_path)
raw = Path(skill_local_path).expanduser()
if not raw.is_absolute():
skill_path = (working_dir / raw).resolve()
else:
skill_path = raw.resolve()
if not skill_path.exists() or not skill_path.is_dir():
logger.error(f"Skill path '{skill_path}' does not exist or is not a directory.")
return f"Skill path '{skill_path}' does not exist or is not a directory."
skill_readme = skill_path / "SKILL.md"
if not skill_readme.exists():
logger.error(f"Skill path '{skill_path}' has no SKILL.md file.")
return f"Skill path '{skill_path}' has no SKILL.md file."
try:
skill = frontmatter.load(str(skill_readme))
skill_name = skill.get("name", "")
# skill_description = skill.get("description", "")
except Exception as e:
logger.error(
f"Failed to get skill name and description from {skill_readme}: {e}"
)
return f"Failed to get skill name and description from {skill_readme}: {e}"
zip_file_path = working_dir / "outputs" / f"{skill_name}.zip"
with zipfile.ZipFile(zip_file_path, "w", zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(skill_path):
for file in files:
file_path = Path(root) / file
arcname = Path(skill_name) / file_path.relative_to(skill_path)
zipf.write(file_path, arcname)
try:
from veadk.auth.veauth.utils import get_credential_from_vefaas_iam
cloud_provider = (os.getenv("CLOUD_PROVIDER") or "").lower()
if cloud_provider == "byteplus":
sld = "byteplusapi"
default_region = "ap-southeast-1"
else:
sld = "volcengineapi"
default_region = "cn-beijing"
agentkit_tool_service = os.getenv("AGENTKIT_TOOL_SERVICE_CODE", "agentkit")
region = os.getenv("AGENTKIT_TOOL_REGION", default_region)
agentkit_skill_host = os.getenv(
"AGENTKIT_SKILL_HOST", agentkit_tool_service + "." + region + f".{sld}.com"
)
access_key = os.getenv("VOLCENGINE_ACCESS_KEY")
secret_key = os.getenv("VOLCENGINE_SECRET_KEY")
session_token = ""
if not (access_key and secret_key):
cred = get_credential_from_vefaas_iam()
access_key = cred.access_key_id
secret_key = cred.secret_access_key
session_token = cred.session_token
account_id = ""
if cloud_provider != "vestack":
res = ve_request(
request_body={},
action="GetCallerIdentity",
ak=access_key,
sk=secret_key,
service="sts",
version="2018-01-01",
region=region,
host="sts.volcengineapi.com"
if cloud_provider != "byteplus"
else "open.byteplusapi.com",
header={"X-Security-Token": session_token},
)
try:
account_id = res["Result"]["AccountId"]
except KeyError as e:
logger.error(
f"Error occurred while getting account id: {e}, response is {res}"
)
return f"Error: Failed to get account id when registering skill '{skill_name}'."
tos_bucket = f"agentkit-platform-{region}-{account_id}-skill"
scheme = os.getenv("AGENTKIT_TOP_SCHEME", "https").lower()
if cloud_provider == "vestack":
import requests
# Call GenTempTosObjectUrl API
temp_url_request_body = {
"SkillName": skill_name,
}
logger.debug(f"GenTempTosObjectUrl request body: {temp_url_request_body}")
temp_url_res = ve_request(
request_body=temp_url_request_body,
action="GenTempTosObjectUrl",
ak=access_key,
sk=secret_key,
service=agentkit_tool_service,
version="2025-10-30",
region=region,
host=agentkit_skill_host,
header={"X-Security-Token": session_token},
scheme=scheme,
)
if isinstance(temp_url_res, str):
temp_url_res = json.loads(temp_url_res)
if (
"ResponseMetadata" in temp_url_res
and "Error" in temp_url_res["ResponseMetadata"]
):
error_details = temp_url_res["ResponseMetadata"]["Error"]
logger.error(
f"Failed to get temporary upload URL for '{skill_name}': {error_details}"
)
return f"Failed to get temporary upload URL for '{skill_name}': {error_details}"
signed_url = temp_url_res.get("Result", {}).get("SignedUrl")
tos_url = temp_url_res.get("Result", {}).get("TosUrl")
if not signed_url or not tos_url:
logger.error(
f"Failed to get SignedUrl or TosUrl from GenTempTosObjectUrl response: {temp_url_res}"
)
return f"Failed to get temporary upload URL for '{skill_name}'."
try:
with open(zip_file_path, "rb") as f:
response = requests.put(signed_url, data=f)
response.raise_for_status()
except Exception as e:
logger.error(f"Failed to upload skill '{skill_name}' to minio: {e}")
return f"Failed to upload skill '{skill_name}' to minio: {e}"
else:
tos_client = VeTOS(
ak=access_key,
sk=secret_key,
session_token=session_token,
bucket_name=tos_bucket,
region=region,
)
object_key = (
f"uploads/{datetime.now().strftime('%Y%m%d_%H%M%S')}/{skill_name}.zip"
)
tos_client.upload_file(
file_path=zip_file_path, bucket_name=tos_bucket, object_key=object_key
)
tos_url = tos_client.build_tos_url(
bucket_name=tos_bucket, object_key=object_key
)
skill_space_ids = os.getenv("SKILL_SPACE_ID", "")
skill_space_ids_list = [
x.strip() for x in skill_space_ids.split(",") if x.strip()
]
request_body = {
"TosUrl": tos_url,
"SkillSpaces": skill_space_ids_list,
}
logger.debug(f"CreateSkill request body: {request_body}")
response = ve_request(
request_body=request_body,
action="CreateSkill",
ak=access_key,
sk=secret_key,
service=agentkit_tool_service,
version="2025-10-30",
region=region,
host=agentkit_skill_host,
header={"X-Security-Token": session_token},
scheme=scheme,
)
if isinstance(response, str):
response = json.loads(response)
logger.debug(f"CreateSkill response: {response}")
if "ResponseMetadata" in response and "Error" in response["ResponseMetadata"]:
error_details = response["ResponseMetadata"]["Error"]
logger.error(f"Failed to register skill '{skill_name}': {error_details}")
return f"Failed to register skill '{skill_name}': {error_details}"
logger.info(
f"Successfully registered skill '{skill_name}' to skill space {skill_space_ids_list}."
)
return f"Successfully registered skill '{skill_name}' to skill space {skill_space_ids_list}."
except Exception as e:
logger.error(f"Failed to register skill '{skill_name}': {e}")
return f"Failed to register skill '{skill_name}'"