forked from strands-agents/devtools
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent_runner.py
More file actions
278 lines (228 loc) · 8.95 KB
/
agent_runner.py
File metadata and controls
278 lines (228 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
#!/usr/bin/env python3
"""
Strands GitHub Agent Runner
A portable agent runner for use in GitHub Actions across different repositories.
"""
import base64
import json
import os
import sys
from datetime import datetime
from typing import Any
import boto3
from strands import Agent
from strands.telemetry import StrandsTelemetry
from strands.agent.conversation_manager import SlidingWindowConversationManager
from strands.session import S3SessionManager
from strands.models.bedrock import BedrockModel
from botocore.config import Config
from strands_tools import http_request, shell
# Import local GitHub tools we need
from github_tools import (
add_issue_comment,
add_pr_comment,
create_issue,
create_pull_request,
get_issue,
get_issue_comments,
get_pull_request,
get_pr_files,
get_pr_review_and_comments,
list_issues,
list_pull_requests,
reply_to_review_comment,
update_issue,
update_pull_request,
)
# Import local tools we need
from handoff_to_user import handoff_to_user
from notebook import notebook
from str_replace_based_edit_tool import str_replace_based_edit_tool
# Strands configuration constants — matches strands-coder settings
# Opus 4.6 with adaptive thinking and 1M context window
STRANDS_MODEL_ID = "global.anthropic.claude-opus-4-6-v1"
STRANDS_MAX_TOKENS = 128000
STRANDS_REGION = "us-west-2"
# Default values for environment variables used only in this file
DEFAULT_SYSTEM_PROMPT = "You are an autonomous GitHub agent powered by Strands Agents SDK."
def _send_eval_trigger(session_id: str, eval_type: str) -> None:
"""Send evaluation trigger to SQS queue after agent completion.
Only sends if EVALS_SQS_QUEUE_ARN environment variable is set.
Derives queue URL from ARN (format: arn:aws:sqs:{region}:{account_id}:{queue_name}).
Args:
session_id: The unique session ID as stored in Langfuse (may include repo prefix).
eval_type: The evaluation type (e.g., "reviewer", "implementer").
"""
queue_arn = os.environ.get("EVALS_SQS_QUEUE_ARN")
if not queue_arn:
return
# Parse ARN: arn:aws:sqs:{region}:{account_id}:{queue_name}
arn_parts = queue_arn.split(":")
if len(arn_parts) != 6:
print(f"⚠️ Invalid SQS ARN format: {queue_arn}")
return
region = arn_parts[3]
account_id = arn_parts[4]
queue_name = arn_parts[5]
queue_url = f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"
try:
sqs_client = boto3.client("sqs", region_name=region)
message_body = json.dumps({
"session_id": session_id,
"eval_type": eval_type
})
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=message_body
)
print(f"✅ Sent eval trigger to SQS: {message_body}")
except Exception as e:
print(f"⚠️ Failed to send eval trigger to SQS: {e}")
def _setup_langfuse_telemetry() -> bool:
"""Set up Langfuse telemetry if environment variables are configured.
Returns:
True if telemetry was successfully configured, False otherwise.
"""
langfuse_public_key = os.environ.get("LANGFUSE_PUBLIC_KEY")
langfuse_secret_key = os.environ.get("LANGFUSE_SECRET_KEY")
langfuse_host = os.environ.get("LANGFUSE_HOST")
if not all([langfuse_public_key, langfuse_secret_key, langfuse_host]):
print("ℹ️ Langfuse telemetry not configured (missing environment variables)")
return False
try:
langfuse_auth = base64.b64encode(
f"{langfuse_public_key}:{langfuse_secret_key}".encode()
).decode()
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{langfuse_host}/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"
StrandsTelemetry().setup_otlp_exporter()
print("✅ Langfuse telemetry configured successfully")
return True
except Exception as e:
print(f"⚠️ Failed to configure Langfuse telemetry: {e}")
return False
def _get_trace_attributes() -> dict:
"""Build trace attributes from environment context."""
session_id = os.getenv("SESSION_ID", "")
github_actor = os.getenv("GITHUB_ACTOR", "")
github_repository = os.getenv("GITHUB_REPOSITORY", "")
github_workflow = os.getenv("GITHUB_WORKFLOW", "")
github_run_id = os.getenv("GITHUB_RUN_ID", "")
# Include repo name in session ID for uniqueness across repos
# Format: "owner_repo:session-id" (e.g., "strands-agents_sdk-typescript:reviewer-443")
repo_prefix = github_repository.replace("/", "_") if github_repository else "unknown"
unique_session_id = f"{repo_prefix}:{session_id}" if session_id else f"{repo_prefix}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
return {
"session.id": unique_session_id,
"user.id": github_actor,
"langfuse.tags": [
f"repo:{github_repository}",
f"workflow:{github_workflow}",
f"run:{github_run_id}",
"strands-github-agent",
],
}
def _get_all_tools() -> list[Any]:
return [
# File editing
str_replace_based_edit_tool,
# System tools
shell,
http_request,
# GitHub issue tools
create_issue,
get_issue,
update_issue,
list_issues,
add_issue_comment,
get_issue_comments,
# GitHub PR tools
create_pull_request,
get_pull_request,
update_pull_request,
list_pull_requests,
get_pr_files,
get_pr_review_and_comments,
reply_to_review_comment,
add_pr_comment,
# Agent tools
notebook,
handoff_to_user,
]
def run_agent(query: str):
"""Run the agent with the provided query."""
try:
# Set up Langfuse telemetry (optional - gracefully degrades if not configured)
telemetry_enabled = _setup_langfuse_telemetry()
trace_attributes = _get_trace_attributes() if telemetry_enabled else {}
# Get tools and create model
tools = _get_all_tools()
# Create Bedrock model — Opus 4.6 with adaptive thinking and 1M context
additional_request_fields = {}
additional_request_fields["thinking"] = {
"type": "adaptive",
}
model = BedrockModel(
model_id=STRANDS_MODEL_ID,
max_tokens=STRANDS_MAX_TOKENS,
region_name=STRANDS_REGION,
boto_client_config=Config(
read_timeout=900,
connect_timeout=900,
retries={"max_attempts": 3, "mode": "adaptive"},
),
additional_request_fields=additional_request_fields,
cache_prompt="default",
cache_tools="default",
)
system_prompt = os.getenv("INPUT_SYSTEM_PROMPT", DEFAULT_SYSTEM_PROMPT)
session_id = os.getenv("SESSION_ID")
s3_bucket = os.getenv("S3_SESSION_BUCKET")
s3_prefix = os.getenv("GITHUB_REPOSITORY", "")
if s3_bucket and session_id:
print(f"🤖 Using session manager with session ID: {session_id}")
session_manager = S3SessionManager(
session_id=session_id,
bucket=s3_bucket,
prefix=s3_prefix,
)
else:
raise ValueError("Both SESSION_ID and S3_SESSION_BUCKET must be set")
# Create agent with optional trace attributes for Langfuse
agent_kwargs = {
"model": model,
"system_prompt": system_prompt,
"tools": tools,
"session_manager": session_manager,
}
if trace_attributes:
agent_kwargs["trace_attributes"] = trace_attributes
agent = Agent(**agent_kwargs)
print("Processing user query...")
result = agent(query)
print(f"\n\nAgent Result 🤖\nStop Reason: {result.stop_reason}\nMessage: {json.dumps(result.message, indent=2)}")
# Use the unique session ID from trace attributes (includes repo prefix)
unique_session_id = trace_attributes.get("session.id", session_id)
eval_type = session_id.split("-")[0] if "-" in session_id else session_id
_send_eval_trigger(unique_session_id, eval_type)
except Exception as e:
error_msg = f"❌ Agent execution failed: {e}"
print(error_msg)
raise e
def main() -> None:
"""Main entry point for the agent runner."""
try:
# Read task from command line arguments
if len(sys.argv) < 2:
raise ValueError("Task argument is required")
task = " ".join(sys.argv[1:])
if not task.strip():
raise ValueError("Task cannot be empty")
print(f"🤖 Running agent with task: {task}")
run_agent(task)
except Exception as e:
error_msg = f"Fatal error: {e}"
print(error_msg)
sys.exit(1)
if __name__ == "__main__":
main()