feat(aenv): Integrating aenvironment for environment handling #1018

guozhihao-224 wants to merge 2 commits into inclusionAI:main from
Conversation
/gemini review
**Summary of Changes**

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a significant new capability by integrating AEnvironment into the AReaL framework. This integration allows for sophisticated multi-turn tool calling within RL training, empowering models to dynamically interact with external environments and tools. The changes provide a structured and configurable way to manage these interactions, ensuring robustness and flexibility for advanced RL applications.
Code Review
This pull request introduces the AEnvironment integration for AReaL, enabling MCP-based multi-turn tool calling for RL training. It includes new modules for configuration, adapter, schema conversion, and workflow management, along with corresponding unit and integration tests. The changes also update the logging utility to include new components. The review focuses on potential runtime errors, error handling, and code clarity.
```python
raise RuntimeError(
    f"Tool call failed after {max_attempts} attempts: {tool_name}"
) from last_error
```

**high:** The `RuntimeError` raised here reports the tool name but not the underlying error. Including `last_error` in the exception message would significantly aid debugging; also consider logging `last_error` for more detailed diagnostics.

```python
raise RuntimeError(
    f"Tool call failed after {max_attempts} attempts: {tool_name}. Last error: {last_error}"
) from last_error
```
```python
if not tool_name:
    if self.aenv_config.tool_error_policy == "raise":
        raise RuntimeError("Tool call is missing function name")
```

**high:** If `tool_name` is missing and `tool_error_policy` is set to `raise`, it's more appropriate to raise a `ValueError`, since the tool name is a required field. This clearly indicates that the tool call is malformed.

```python
if self.aenv_config.tool_error_policy == "raise":
    raise ValueError("Tool call is missing function name")
```

```python
if self._env is None:
    raise RuntimeError("Environment not initialized. Call initialize() first.")
```

**medium:** It's crucial to ensure that `self._env` is properly initialized before proceeding. Raising a more specific exception type, such as `AenvNotInitializedError`, could provide better clarity for debugging and error handling.

```python
class AenvNotInitializedError(Exception):
    """Exception raised when AEnvironment is not initialized."""
    pass


async def list_tools(self, use_cache: bool = True) -> list[dict[str, Any]]:
    """List tools exposed by the current environment."""
    if self._env is None:
        raise AenvNotInitializedError("Environment not initialized. Call initialize() first.")
```
```python
if not self.aenv_url:
    raise ValueError("aenv_url must be non-empty")
```
Code Review
This pull request introduces a new AEnvironment integration, enabling AReaL workflows to execute model tool calls through an AenvEnvironmentAdapter. Key changes include defining configuration (AenvConfig), adapting aenvironment SDK interactions (initialization, tool listing, and execution with retry logic), and providing schema helpers for OpenAI tool compatibility. A new AenvWorkflow class orchestrates multi-turn interactions, handling tool calls and reward computation. The logging utility is updated to include new workflow and adapter types. Review comments highlight several areas for improvement, including a high-severity arbitrary code execution vulnerability due to dynamic reward_fn imports, a medium-severity LLM tool abuse vulnerability requiring tool name and argument validation, and suggestions for more robust error handling with specific exception types and improved logging context in the adapter and workflow components. Additionally, a minor comment suggests renaming a logging category for better description.
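On the dynamic `reward_fn` import vulnerability mentioned above: one common mitigation is an allowlist of importable modules. A minimal sketch, assuming a hypothetical `import_reward_fn` helper (the helper name and allowlist contents are illustrative, not the PR's code):

```python
import importlib

# Hypothetical default allowlist; a real deployment would list trusted
# reward modules, e.g. those shipped under areal.reward.
DEFAULT_ALLOWED = frozenset({"areal.reward.gsm8k"})


def import_reward_fn(path: str, allowed_modules=DEFAULT_ALLOWED):
    """Import "module.attr" only when the module is explicitly allowlisted.

    Blocks arbitrary code execution via attacker-controlled import strings
    such as "os.system" or "subprocess.run".
    """
    module_name, _, attr = path.rpartition(".")
    if module_name not in allowed_modules:
        raise ValueError(f"Reward module not allowlisted: {module_name!r}")
    fn = getattr(importlib.import_module(module_name), attr)
    if not callable(fn):
        raise TypeError(f"{path} is not callable")
    return fn
```

The import itself only happens after the allowlist check, so untrusted modules are never executed.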
```python
if not tool_name:
    if self.aenv_config.tool_error_policy == "raise":
        raise RuntimeError("Tool call is missing function name")
    messages.append(
        {
            "role": "tool",
            "content": "Error: missing tool function name",
            "tool_call_id": tool_call_id,
        }
    )
```

**high:** If `tool_name` is missing and `tool_error_policy` is set to `raise`, a `RuntimeError` is raised; otherwise an error message is appended to the `messages` list. It would be better to append the error message regardless of the `tool_error_policy` to provide more comprehensive error reporting.

```python
if self.aenv_config.tool_error_policy == "raise":
    raise RuntimeError("Tool call is missing function name")
messages.append(
    {
        "role": "tool",
        "content": "Error: missing tool function name",
        "tool_call_id": tool_call_id,
    }
)
continue
```
```python
if attempt >= self.config.max_retries or not self._is_retriable_error(
    exc
):
```

**medium:** It might be beneficial to include the underlying error in the retry log message for better debugging information.

```python
delay = self.config.retry_delay * (2**attempt)
logger.warning(
    "Retriable tool call failure",
    extra={
        "tool_name": tool_name,
        "attempt": attempt + 1,
        "max_attempts": max_attempts,
        "delay": delay,
        "error": str(exc),
    },
)
await asyncio.sleep(delay)
```

```python
if tool_round_count >= self.max_turns:
    logger.warning(
        "Reached max_turns; stop before executing additional tool calls"
    )
```

**medium:** Consider adding more context to the log message, such as the current `tool_round_count` and `max_turns` values, to provide more insight into why the tool calls are being stopped.

```python
if tool_round_count >= self.max_turns:
    logger.warning(
        "Reached max_turns (%s/%s); stop before executing additional tool calls",
        tool_round_count,
        self.max_turns,
    )
    break
```
```python
# Argument parsing failed - log detailed context before handling
logger.warning(
    "Failed to parse tool arguments",
    extra={
        "tool_name": tool_name,
        "tool_call_id": tool_call_id,
        "raw_arguments": repr(raw_arguments)[:500],  # Truncate for safety
        "error": str(exc),
    },
)
```

**medium:** Consider including the exception type in the log message for better debugging.

```python
# Argument parsing failed - log detailed context before handling
logger.warning(
    "Failed to parse tool arguments",
    extra={
        "tool_name": tool_name,
        "tool_call_id": tool_call_id,
        "raw_arguments": repr(raw_arguments)[:500],  # Truncate for safety
        "error_type": type(exc).__name__,
        "error": str(exc),
    },
)
```

```python
def _render_tool_content(content: Any) -> str:
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        chunks: list[str] = []
        for item in content:
            if isinstance(item, dict):
                if item.get("type") == "text" and "text" in item:
                    chunks.append(str(item["text"]))
                else:
                    chunks.append(json.dumps(item, sort_keys=True))
            else:
                chunks.append(str(item))
        return "\n".join(chunks)
    if isinstance(content, dict):
        return json.dumps(content, sort_keys=True)
    return str(content)
```

**medium:** The `_render_tool_content` method could benefit from more robust error handling. If `json.dumps` fails, it might be useful to log the error and return a default string to prevent the entire workflow from crashing.
Code Review
The pull request introduces AEnvironment integration for AReaL workflows, including new modules for adapter, configuration, and schema handling. Review comments suggest raising an exception when a tool name is missing in AenvWorkflow if the error policy is set to "raise", sanitizing the aenv_url in the AenvEnvironmentAdapter to prevent logging sensitive information, validating the tool_name against the list of discovered tools in _execute_tool_calls to prevent prompt injection, catching specific exceptions in the release() call to avoid masking other issues, and including the exception and arguments in the RuntimeError raised when a tool call fails.
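The tool-name validation the review asks for could be sketched as follows (hypothetical helper; the PR's `_execute_tool_calls` internals may differ). The tool list is assumed to use the OpenAI function-tool shape:

```python
from typing import Any


def validate_tool_name(tool_name: str, discovered_tools: list[dict[str, Any]]) -> None:
    """Reject tool calls whose name the environment never advertised.

    Guards against prompt-injected calls to unknown or unexposed tools.
    """
    allowed = {
        t.get("function", {}).get("name")
        for t in discovered_tools
        if t.get("type") == "function"
    }
    if tool_name not in allowed:
        raise ValueError(f"Unknown tool requested: {tool_name!r}")
```

Checking against the discovered tool set means a model can only invoke tools the environment actually exposed, regardless of what the prompt contains.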
```python
if not tool_name:
    if self.aenv_config.tool_error_policy == "raise":
        raise RuntimeError("Tool call is missing function name")
```

If `tool_name` is indeed missing, it indicates a serious issue with the tool call structure. Instead of just logging an error, consider raising an exception, especially if `self.aenv_config.tool_error_policy` is set to `'raise'`. This will halt the process and prevent potentially incorrect behavior.

```python
if self.aenv_config.tool_error_policy == "raise":
    raise RuntimeError("Tool call is missing function name")
else:
    messages.append(
        {
            "role": "tool",
            "content": "Error: missing tool function name",
            "tool_call_id": tool_call_id,
        }
    )
```

```python
logger.info(
    "Initializing AEnvironment instance",
    extra={"env_name": self.config.env_name, "aenv_url": self.config.aenv_url},
)
```

The `initialize` method logs the `aenv_url` configuration value. This URL may contain sensitive information such as credentials (e.g., basic authentication or API keys in the query string). Logging this information can lead to unauthorized access if logs are compromised. It is recommended to sanitize the URL before logging, or to avoid logging the full URL if it contains sensitive data.

```python
logger.info(
    "Initializing AEnvironment instance",
    extra={"env_name": self.config.env_name},
)
```
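If having the endpoint in logs is still useful, redacting rather than dropping the URL is an alternative. A standard-library sketch (not part of the PR):

```python
from urllib.parse import urlsplit, urlunsplit


def redact_url(url: str) -> str:
    """Strip userinfo and the query string from a URL before logging."""
    parts = urlsplit(url)
    netloc = parts.netloc.rsplit("@", 1)[-1]  # drop "user:pass@" if present
    return urlunsplit((parts.scheme, netloc, parts.path, "", ""))
```

The adapter could then log `redact_url(self.config.aenv_url)`, keeping host and path visible while removing credentials and query parameters.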
```python
except Exception as exc:  # pragma: no cover - defensive cleanup path
    logger.warning(f"Failed to release AEnvironment instance: {exc}")
```

The `except Exception as exc` block is a very broad catch. It would be better to catch the specific exceptions you anticipate during the `release()` call, such as network-related exceptions or exceptions raised by the aenv library itself. This prevents unexpected exceptions from being silently caught and potentially masking other issues.

```python
try:
    await self._env.release()
except (OSError, asyncio.TimeoutError) as exc:  # Example specific exceptions
    logger.warning(f"Failed to release AEnvironment instance: {exc}")
```

```python
raise RuntimeError(
    f"Tool call failed after {max_attempts} attempts: {tool_name}"
) from last_error
```

The `RuntimeError` raised here includes the tool name, which is good for debugging. However, it might be helpful to also include the specific exception that caused the tool call to fail, as well as the arguments passed to the tool. This would provide more context for diagnosing the issue.

```python
raise RuntimeError(
    f"Tool call failed after {max_attempts} attempts: {tool_name} with arguments {arguments}. Last error: {last_error}"
) from last_error
```
cc @rchardx I'm not entirely sure whether the current implementation path meets the requirements.

After reading the documentation, I found that the current implementation's inheritance from RolloutWorkflow seems incorrect; it should be changed to a proxy approach, using something like AenvAgent? @rchardx @garrett4wade
Your instinct is correct. After reviewing the code and documentation, I believe the current inheritance-based implementation should be changed.

**The Two Integration Paradigms**

AReaL documents two distinct paradigms for agent/workflow integration (see the documentation). The documentation explicitly states that the proxy approach is the recommended pattern for agent integrations. The current implementation follows the inheritance paradigm instead.

**Precedent: Existing Agent Workflows**

All recently added agent integrations in AReaL follow the proxy approach. None of them inherit `RolloutWorkflow`.

**Precedent: AEnvironment's Own tau2_rl Example**

The AEnvironment SDK's official RL training example:

```python
async def run_agent_return_reward(data: Dict[str, Any]) -> float:
    env = Environment(env_name="tau2-env@1.0.0", ...)
    await env.initialize()
    openai_tools = await env.list_openai_tools()
    agent = OpenAIAgent(tools=openai_tools, instructions=...)
    # ... agent loop ...
    reward = await env.call_reward({})
    return float(reward.get("total_reward", 0.0))
```

No `RolloutWorkflow` inheritance anywhere.

**Suggested Refactoring**

Rename the class to `AenvAgent` and drop the `RolloutWorkflow` base:

```python
class AenvAgent:
    """Agent that uses AEnvironment for MCP-based tool execution."""

    def __init__(
        self,
        aenv_config: AenvConfig | None = None,
        reward_fn: Callable | str | None = None,
        max_turns: int = 8,
        system_prompt: str | None = None,
    ):
        self.aenv_config = aenv_config or AenvConfig()
        self.reward_fn = import_from_string(reward_fn) if isinstance(reward_fn, str) else reward_fn
        self.max_turns = max_turns
        self.system_prompt = system_prompt

    async def run(self, data: dict, **extra_kwargs) -> float:
        # Standard OpenAI SDK — AReaL proxy handles token tracking automatically
        client = AsyncOpenAI(
            base_url=extra_kwargs.get("base_url") or os.getenv("OPENAI_BASE_URL"),
            api_key=extra_kwargs.get("api_key") or os.getenv("OPENAI_API_KEY"),
            http_client=extra_kwargs.get("http_client"),
            max_retries=0,
        )
        async with AenvEnvironmentAdapter(self.aenv_config) as env:
            tools = normalize_openai_tools(await env.list_tools())
            messages = self._build_messages(data)
            for _ in range(self.max_turns):
                response = await client.chat.completions.create(
                    model="default",
                    messages=messages,
                    tools=tools or NOT_GIVEN,
                    tool_choice="auto" if tools else NOT_GIVEN,
                )
                msg = response.choices[0].message
                messages.append(msg.model_dump(exclude_none=True))
                if not msg.tool_calls:
                    break
                for tc in msg.tool_calls:
                    args = parse_tool_arguments(tc.function.arguments)
                    result = await env.call_tool(tc.function.name, args)
                    messages.append({
                        "role": "tool",
                        "content": self._render_tool_content(result.content),
                        "tool_call_id": tc.id,
                    })
        # Just return reward — AReaL handles discount, export, token tracking
        if self.reward_fn:
            return float(await self.reward_fn(messages=messages, answer=data.get("answer")))
        return 0.0
```

Usage stays almost the same:

```python
trainer.train(
    workflow="areal.workflow.aenv.AenvAgent",
    workflow_kwargs={
        "aenv_config": AenvConfig(env_name="my-env@1.0.0"),
        "reward_fn": "areal.reward.gsm8k.gsm8k_reward_fn",
        "max_turns": 8,
    },
)
```

**What This Eliminates**

By switching to the proxy pattern, you can remove ~200 lines of manual lifecycle management that AReaL handles automatically.

**What to Keep**

The infrastructure modules under the new package (config, adapter, schema helpers) can stay largely as they are.

**Caveats**

Hope this helps clarify the architectural direction. The bottom line: the proxy pattern, not `RolloutWorkflow` inheritance, is the documented path for agent integrations.
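For reference, a reward function compatible with the `reward_fn(messages=..., answer=...)` keyword call in the `AenvAgent` sketch might look like this (hypothetical; the actual AReaL reward API may differ):

```python
import asyncio


# Hypothetical async reward function matching the keyword-argument call
# reward_fn(messages=..., answer=...) used in the AenvAgent sketch.
async def exact_match_reward(messages, answer=None, **_):
    """Return 1.0 when the last assistant message matches the reference answer."""
    final = next(
        (m.get("content") or "" for m in reversed(messages) if m.get("role") == "assistant"),
        "",
    )
    return 1.0 if answer is not None and final.strip() == str(answer).strip() else 0.0
```

Any coroutine with this signature could be passed directly, or referenced by import string if an allowlisted dynamic import is in place.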
This pull request has been automatically marked as stale because it has not had recent activity within the last 14 days. Please add a comment or push new commits to keep it active. Thank you for your contribution!
Description
Related Issue
This feature is part of the 2026 Q1 Roadmap (see #907 - "Integrating aenvironment for environment handling").
Summary
Integrate AEnvironment to support MCP-based multi-turn tool calling for RL training.
New Modules
Key Components
- `AenvConfig`
- `AenvEnvironmentAdapter`
- `AenvWorkflow`
- `normalize_openai_tools()`
- `parse_tool_arguments()`

Usage
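As an illustration of the components above, here is a minimal self-contained sketch of what a `parse_tool_arguments()` helper typically does (an assumption about its behavior, not the PR's actual code):

```python
import json
from typing import Any


def parse_tool_arguments(raw: Any) -> dict:
    """Parse an OpenAI-style tool-call arguments payload into a dict.

    Tool-call arguments usually arrive as a JSON string, but may already be
    a dict or be empty; normalize all cases to a dict.
    """
    if raw is None or raw == "":
        return {}
    if isinstance(raw, dict):
        return raw
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        raise ValueError(f"Tool arguments must be a JSON object, got {type(parsed).__name__}")
    return parsed
```

Normalizing here keeps the workflow's tool-dispatch code free of per-call type checks.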
Tests
- Integration tests (opt-in via `RUN_AENV_INTEGRATION=1`)

Compatibility
Type of Change

- New feature (non-breaking change which adds functionality; existing features work as expected)
Checklist
- Docs build locally (`jb build docs`)
- AI review requested (`/gemini review`)

Breaking Change Details (if applicable):
Additional Context
Need help? Check the Contributing Guide or ask in
GitHub Discussions!