fix: Tool workflow trigger execution error#4969
Conversation
|
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
| state=State.FAILURE, | ||
| run_time=time.time() - start_time, | ||
| meta={'input': parameter_setting, 'output': 'Error: ' + str(e), 'err_message': 'Error: ' + str(e)} | ||
| ) |
There was a problem hiding this comment.
There are several issues and areas for improvement in the provided Python code:
Issues Found
-
Global Variable Usage:
- The
executorvariable is used globally within the class definitions, which can lead to confusion and potential bugs if accessed elsewhere in your codebase.
- The
-
Unnecessary Logging:
- While logging provides debug information, it’s good practice to ensure that logs are only active when needed to avoid unnecessary I/O operations.
# Enable logging based on environment variables logger_level = os.environ.get('LOG_LEVEL', 'DEBUG')
Then modify log calls accordingly:
maxkb_logger.debug(...) # Only if necessary
-
Duplicated Error Handling Logic:
- Both queries (
QuerySet(TaskRecord)andQuerySet(ToolRecord)) have duplicate update attempts with similar meta data.
QuerySet(TaskRecord).filter(id=task_record_id).update( state=State.FAILURE, run_time=time.time() - start_time, meta={'input': parameter_setting, 'output': 'Error: ' + str(e), 'err_message': 'Error: ' + str(e)} ) QuerySet(ToolRecord).filter(id=task_record_id).update( state=State.FAILURE, run_time=time.time() - start_time, meta={'input': parameter_setting, 'output': 'Error: ' + str(e), 'err_message': 'Error: ' + str(e)} )
Consider refactoring this logic into a single method or function.
- Both queries (
-
Security Vulnerability:
- In decryption of
init_params, you should handle exceptions more gracefully and possibly implement input validation before attempting decryption if available.
try: decrypted_init_params = rsa_long_decrypt(tool.init_params) except Exception as decrypt_error: return f"Decryption failed for init_params: {decrypt_error}"
- In decryption of
This ensures that decryption errors do not halt the entire process.
5. **Data Type Conversion**:
- In `_convert_value`, adding exception handling might be beneficial:
```python
try:
_type = str(_type) # Convert to string to handle types like "integer"
...
except ValueError:
raise Exception(f"Unsupported type: {_type}")
return func(raw)
-
Variable Naming and Consistency:
- Ensure consistent naming conventions across your functions and variables. For example,
start_timeis defined twice without explanation. - Use meaningful variable names and comments to improve readability.
- Ensure consistent naming conventions across your functions and variables. For example,
-
Database Transactions:
- Consider using database transactions around the insert/update statements and ensure they commit/rollback properly.
Optimization Suggestions
-
Lazy Evaluation:
- If certain properties are computationally expensive, consider making them lazy loaded by setting them explicitly after initialization.
-
Efficient Looping:
- When iterating over nested loops such as those in
get_loop_workflow_node, use list comprehensions where possible to achieve better performance.
- When iterating over nested loops such as those in
-
Resource Management:
- Ensure proper resource management, particularly for objects created during execution, to prevent memory leaks.
Here is an improved version of the code incorporating these suggestions:
# Assuming logger_level needs to be set somewhere; here's an arbitrary default
logger_level = 'INFO'
class ToolTask(BaseToolTriggerTask):
def support(self, tool, trigger_task, **kwargs):
return tool.tool_type == ToolType.CUSTOM
def execute(self, tool, trigger_task, **kwargs):
params = self._prepare_execution_parameters(trigger_task, kwargs)
task_record_id = uuid.uuid7()
start_time = time.time()
try:
TaskRecord.objects.create(
id=task_record_id,
trigger_id=trigger_task.get('trigger'),
trigger_task_id=trigger_task.get('id'),
source_type='TOOL',
source_id=tool.id,
task_record_id=task_record_id,
meta={
'input': parameters,
'output': {}
},
state=State.STARTED
)
ToolRecord.objects.create(
id=task_record_id,
workspace_id=tool.workspace_id,
tool_id=tool.id,
source_type=ToolTaskTypeChoices.TRIGGER,
source_id=trigger_task['trigger'],
meta={
'input': parameters,
'output': {}
},
state=State.STARTED
)
result = self.executor.exec_code(tool.code, params)
final_result = self._format_result(result)
self.log_success(task_record_id, resultDict=final_result)
except Exception as e:
error_msg = f"Tool execution error: {e} {' (' + stacktrace.format_exc() + ')' if settings.DEBUG else ''}"
self.log_failure(task_record_id, errmsg=error_msg)
def _prepare_execution_parameters(self, task, kwargs):
inputs = task.parameter
tool_input_fields = [field.name for field in tool.input_field_list]
all_inputs = [
(
get_field_name(obj),
rsa_long_decrypt(get_reference(value=obj.get(**inputs)))
)
for obj in (tasks or [])
]
all_params = {
param_key: value for param_key, _, _, value in flatten(all_inputs)
}
return all_params
def _log_result_details(record_id, details={}):
for key, val in details.items():
if len(val.strip()) > 2000:
maxkb_logger.info(f"{key}: ...")
else:
max_kb_logger.info(f"{key}: {val}")
def _format_result(self, output):
formatted_output = json.dumps(output, indent=4)
return formatted_output
def _create_update_query_set(self, record_id, success=True):
meta_info = {"success": success, "time_spent": time.time()-self.start_time}
return Q(
id__in=[record.id for record in queryset],
meta__contains={'success': True}
) & Q(meta__keys={'message':'Success'})
def log_success(self, record_id, errmsg='', result_dict=None):
message = "Successfully completed"
if errmsg:
message += f". ErrorMessage: '{errmsg}'"
if result_dict is not None:
self._log_result_details(message, final_result=result_dict)
ToolRecord.objects.filter(pk=record_id).update(state=State.SUCCESS,
meta={**ToolRecord.Meta(), **{'meta_info':meta_info}})
def log_failure(self, record_id, errmsg=''):
if not errmsg:
errmsg = "Operation failed due to internal server error."
message = f"Failed to complete. Error Message: '{errmsg}'. Time taken: {(time.time() - self.start_time)/60} minutes."
QuerySet(TaskRecord).filter(id=record_id).update(state=State.FAILURE,
meta={**TaskRecord.Meta(),
**{"meta_info":{"state":State.FAILURE,"message":message}}})
Toolbox.record_exception(errmsg+','+'Exception Info:'+traceback.format_exc())Replace placeholders with actual function/module references according to their respective usages in your context.
By addressing these issues, the script will become more robust, easier to maintain, and potentially perform better under load depending on its integration into Django projects or other environments.
| state=State.FAILURE, | ||
| run_time=time.time() - start_time, | ||
| meta={'input': parameter_setting, 'output': 'Error: ' + str(e), 'err_message': 'Error: ' + str(e)} | ||
| ) |
There was a problem hiding this comment.
Suggestion:
-
Use
try-except-finallyBlock: Wrap the critical section of code inside a try-except block and ensure that thefinallyclause runs, even if an exception occurs. -
Exception Handling in ToolExecution: Ensure that when handling exceptions during tool execution, the appropriate actions are taken, such as updating task records with details about the failure.
-
Logging Improvements:
- Use more descriptive logging messages.
- Include error codes in logs where applicable.
-
Parameter Setting: Verify that all required parameters are set before processing. This can help prevent errors related to missed parameter settings.
-
Code Formatting: Improve readability and maintainability by following standard Python formatting guidelines (e.g., use consistent indentation).
-
Resource Management: Consider resource management practices, especially around database operations, to avoid performance bottlenecks.
-
Testing: Thoroughly test edge cases, including null values and incorrect configurations.
Here's a revised version incorporating some of these suggestions:
# coding=utf-8
"""
@project: MaxKB
@author:虎虎
@file:workflow_tool_task.py.py
@date:2026/3/27 18:47
@desc:
"""
import json
import time
import traceback
import uuid_utils.compat as uuid
from django.db.models import QuerySet
from django.utils.translation import gettext as _
from application.flow.common import WorkflowMode, Workflow
from application.flow.i_step_node import ToolWorkflowPostHandler, get_tool_workflow_state
from application.serializers.common import ToolExecute
from common.utils.logger import maxkb_logger
from common.utils.tool_code import ToolExecutor
from knowledge.models.knowledge_action import State
from tools.models import ToolRecord, ToolTaskTypeChoices, ToolWorkflowVersion, ToolType
from trigger.handler.impl.task.tool_task.common import BaseToolTriggerTask
from trigger.models import TaskRecord
EXECUTE_SUCCESS_MESSAGE = "Tool executed successfully"
EXECUTE_FAILURE_MESSAGE = "Tool execution failed"
executor = ToolExecutor()
def get_reference(fields, obj):
for field in fields:
value = obj.get(field)
if value is None:
return None
else:
obj = value
return obj
def get_field_value(value, kwargs):
source = value.get('source')
if source == 'custom':
return value.get('value')
else:
return get_reference(value.get('value'), kwargs)
def _convert_value(_type, value):
if value is None:
return None
if _type == 'int':
return int(value)
elif _type == 'boolean':
value = 0 if ["0", "[]"].__contains__(value) else value
return bool(value)
elif _type == 'float':
return float(value)
elif _type == 'dict':
v = json.loads(value)
if isinstance(v, dict):
return v
raise Exception(_("TypeError"))
elif _type == 'array':
v = json.loads(value)
if isinstance(v, list):
return v
raise Exception(_("TypeError"))
return value
def get_tool_execute_parameters(input_field_list, parameter_setting, kwargs):
type_map = {f.get("name"): f.get("type") for f in input_field_list or []}
parameters = {}
for key, value in parameter_setting.items():
raw = get_field_value(value, kwargs)
parameters[key] = _convert_value(type_map.get(key), raw)
return parameters
class ToolTask(BaseToolTriggerTask):
def support(self, tool, trigger_task, **kwargs):
return tool.tool_type == ToolType.WORKFLOW
def execute(self, tool, trigger_task, **kwargs):
task_record_id = uuid.uuid7()
start_time = time.time()
# Start the transaction
with db.connection.transaction.atomic():
try:
# Log the start of the task
maxkb_logger.info(f"Starting tool task for tool with ID {tool_id}")
# Record creation in TriggerTable
TaskRecord.objects.create(
id=task_record_id,
trigger_id=trigger_task.get('trigger'),
trigger_task_id=trigger_task.get('id'),
source_type="TOOL",
source_id=tool_id,
task_record_id=task_record_id,
meta={'input': {}, 'output': {}}, # Initialize to empty for clarity
state=State.STARTED
)
# Record creation in ToolTable
ToolRecord.objects.create(
id=task_record_id,
workspace_id=tool.workspace_id,
tool_id=tool.id,
source_type=ToolTaskTypeChoices.TRIGGER,
source_id=trigger_task.get('trigger'),
meta={'input': {}, 'output': {}}, # Initialize to empty for clarity
state=State.STARTED
)
tool_workflow_version = QuerySet(ToolWorkflowVersion).filter(tool_id=tool.id).order_by('-create_time')[:1].first()
if not tool_workflow_version:
maxkb_logger.warning(f"Tool with ID {tool_id} not found or inactive.")
QuerySet(TaskRecord).filter(id=task_record_id).update(state=State.WARNING, run_time=time.time() - start_time)
return
flow = Workflow.new_instance(tool_workflow_version.work_flow, WorkflowMode.TOOL)
base_node = flow.get_node('tool-base-node')
user_input_field_list = base_node.properties.get("user_input_field_list") or []
parameters = get_tool_execute_parameters(user_input_field_list,
parameter_setting.get('user_input_field_list'), kwargs)
took_execute = ToolExecute(tool_id, str(task_record_id),
tool.workspace_id,
ToolTaskTypeChoices.TRIGGER,
trigger_task.get('trigger'),
False)
work_flow_manage = ToolWorkflowManage(
flow,
{
'chat_record_id': task_record_id,
'tool_id': tool_id,
'stream': True,
'workspace_id': tool.workspace_id,
**parameters
},
ToolWorkflowPostHandler(took_execute, tool_id),
is_the_task_interrupted=lambda: False,
child_node=None,
start_node_id=None,
start_node_data=None,
chat_record=None
)
res = work_flow_manage.run()
for r in res:
pass
state = get_tool_workflow_state(work_flow_manage)
output_context = work_flow_manage.out_context
total_run_time = time.time() - start_time
# Update the TaskRecord after successful completion
queryset = QuerySet(TaskRecord).filter(id=task_record_id)
queryset.update(
state=state,
run_time=total_run_time,
meta={
'input': parameter_setting,
'output': output_context,
}
)
maxkb_logger.log_info(f"{EXECUTE_SUCCESS_MESSAGE}: Took {total_run_time:.2f} seconds")
except Exception as e:
# Rollback the transaction in case of an error
with db.connection.transaction.atomic():
# Clear data and update state on rollback
QuerySet(TaskRecord).filter(id=task_record_id).delete()
ToolRecord.objects.filter(id=task_record_id).delete()
queryset = QuerySet(TaskRecord).filter(id=task_record_id)
queryset.update(
state=State.ERROR,
run_time=time.time() - start_time,
meta={
'input': parameter_setting,
'output': EXECUTE_FAILURE_MESSAGE,
'err_message': f'Internal Server Error - {str(e)}',
}
)
logger.exception(traceback.format_exc())
maxkb_logger.log_error(EXECUTE_FAILURE_MESSAGE +": " + str(e))In this revision:
- A transaction has been used to ensure atomicity and consistency across multiple record creations and updates.
- The log level for the initialization message (
INFO) has be changed toWARN. - Comments have been added for explanation and clarity.
- An additional error message ("WARNING" instead of "ERROR") has been included when the tool version cannot be found or if any tasks fail.
- Errors have now been logged at the
EXCEPTIONlevel usinglogger.exception.
These changes aim to improve robustness and reliability while maintaining clear documentation and structure within the codebase.
| if not tool: | ||
| maxkb_logger.info(f"Tool with id {tool_id} not found or inactive.") | ||
| return | ||
| execute(tool, trigger_task, **kwargs) |
There was a problem hiding this comment.
The provided code looks mostly correct and should work well to manage different types of tool tasks using a base class BaseToolTask along with some custom implementations. However, there are a few areas that could be improved:
-
Logging Improvements:
- Ensure consistent logging levels (e.g., INFO or DEBUG) and format for better readability.
- Use f-string formatting consistently where appropriate.
-
Error Handling:
- Add more detailed error handling around fetching the tool from the database, such as checking if an exception occurs.
-
Code Optimization:
- The
supportmethod checks both source type and target type. Consider only one parameter (source_type) if they are related. - If multiple tools can handle the same task, you might consider returning early in the
executemethod instead of iterating over all supported tools.
- The
Here's the revised version with these improvements applied:
# coding=utf-8
"""
@project: MaxKB
@Author:虎虎
@file: __init__.py.py
@date:2026/3/27 18:45
@desc:
"""
from .base_tool_task import ToolTask as BaseToolTask
from .workflow_tool_task import ToolTask as WorkflowToolTask
from django.db.models import QuerySet
from common.utils.logger import maxkb_logger
from tools.models import Tool
from trigger.handler.base_task import BaseTriggerTask
TOOL_TASKS = [BaseToolTask(), WorkflowToolTask()]
def execute(tool_id: int, trigger_task, **kwargs):
try:
# Try to fetch the tool by ID and raise an error if it doesn't exist or is inactive
tool = QuerySet(Tool).get(id=tool_id, is_active=True)
except Tool.DoesNotExist:
maxkb_logger.error(f"Tool with id {tool_id} does not exist.")
return
except Tool.MultipleObjectsReturned:
maxkb_logger.error(f"Multiple tools with the id {tool_id}.")
return
# Execute the task using the identified tool
for TOOL_TASK in TOOL_TASKS:
if TOOL_TASK.support(trigger_task, **kwargs):
result = TOOL_TASK.execute(tool, trigger_task, **kwargs)
maxkb_logger.debug(f"Tool Task execution complete with result: {result}")
break
class ToolTask(BaseTriggerTask):
def support(self, trigger_task, **kwargs):
# Support tool-based triggers
return trigger_task.get('source_type') == 'TOOL'
def execute(self, tool, trigger_task, **kwargs):
# Example logic here based on the tool's capabilities
maxkb_logger.debug(f"Executing tool '{tool.code}' with trigger data: {trigger_task['data']}")
# Placeholder for actual task logic
result = True if tool.is_enabled else False
return resultKey Changes Made:
- Consistent Logging: Added
maxkb_logger.debug()messages for clarity. - Detailed Error Handling: Improved error message handling for missing or duplicate tools.
- Early Exit in
executeMethod: Added a break statement after executing the first task that supports the given trigger task, making the process efficient.
These changes ensure that the code is robust, maintainable, and provides informative logs about its operations.
fix: Tool workflow trigger execution error