-
Notifications
You must be signed in to change notification settings - Fork 650
fix: prevent A2A on_message trigger infinite loop #666
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,9 +32,13 @@ | |
|
|
||
| TICK_INTERVAL = 15 # seconds | ||
| DEDUP_WINDOW = 30 # seconds — same agent won't be invoked twice within this window | ||
| MAX_AGENT_CHAIN_DEPTH = 5 # A→B→A→B→A max depth before stopping | ||
| MIN_POLL_INTERVAL_MINUTES = 5 # minimum poll interval to prevent abuse | ||
|
|
||
| # Safety: per-agent on_message fire rate limiter | ||
| _ON_MSG_RATE_WINDOW = 3600 # 1 hour window | ||
| _ON_MSG_RATE_LIMIT = 30 # max on_message fires per agent per hour | ||
| _on_msg_fire_log: dict[uuid.UUID, list[datetime]] = {} # agent_id -> list of fire timestamps | ||
|
Comment on lines
+37
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Store this safety state in shared durable storage rather than process memory. Every backend worker starts its own trigger daemon, Helm explicitly supports multiple backend replicas, and this dictionary is empty after every restart or rolling deployment, so an existing unlimited looping trigger receives a fresh allowance on each process lifecycle and the advertised per-agent hourly limit is not reliably enforced across the deployment. Useful? React with 👍 / 👎. |
||
|
|
||
| _last_invoke: dict[uuid.UUID, datetime] = {} | ||
|
|
||
| _A2A_WAKE_CHAIN: dict[str, int] = {} | ||
|
|
@@ -47,6 +51,15 @@ def _cleanup_stale_invoke_cache(): | |
| stale = [k for k, v in _last_invoke.items() if (now - v).total_seconds() > DEDUP_WINDOW * 2] | ||
| for k in stale: | ||
| del _last_invoke[k] | ||
| # Clean up old on_message rate limiter entries | ||
| cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW) | ||
| stale_agents = [] | ||
| for aid, timestamps in _on_msg_fire_log.items(): | ||
| _on_msg_fire_log[aid] = [t for t in timestamps if t > cutoff] | ||
| if not _on_msg_fire_log[aid]: | ||
| stale_agents.append(aid) | ||
| for aid in stale_agents: | ||
| del _on_msg_fire_log[aid] | ||
|
|
||
|
|
||
| async def _should_skip_non_workday(trigger: AgentTrigger, local_now: datetime) -> bool: | ||
|
|
@@ -118,6 +131,28 @@ async def _tick(): | |
| if not handled: | ||
| handled = await _handle_okr_collection_trigger(trigger, now) | ||
| if not handled: | ||
| # Fix 3: Rate limit on_message triggers per agent | ||
| if trigger.type == "on_message": | ||
| agent_fires = _on_msg_fire_log.get(trigger.agent_id, []) | ||
| cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW) | ||
| recent = [t for t in agent_fires if t > cutoff] | ||
| if len(recent) >= _ON_MSG_RATE_LIMIT: | ||
| logger.warning( | ||
| f"[A2A Safety] Agent {trigger.agent_id} hit " | ||
| f"on_message rate limit ({_ON_MSG_RATE_LIMIT}/hr). " | ||
| f"Auto-disabling trigger '{trigger.name}'." | ||
| ) | ||
| async with async_session() as db: | ||
| result = await db.execute( | ||
| select(AgentTrigger).where(AgentTrigger.id == trigger.id) | ||
| ) | ||
| t_obj = result.scalar_one_or_none() | ||
| if t_obj: | ||
| t_obj.is_enabled = False | ||
| await db.commit() | ||
| continue | ||
| recent.append(now) | ||
| _on_msg_fire_log[trigger.agent_id] = recent | ||
| await enqueue_due_trigger(trigger, now) | ||
|
Comment on lines
+154
to
156
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Record a rate-limit hit only when Useful? React with 👍 / 👎. |
||
| except Exception as e: | ||
| logger.warning(f"Error evaluating trigger {trigger.name}: {e}") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move these defaults into the existing-trigger path as well. When an agent cancels and recreates a previously unlimited
on_messagetrigger (or converts any disabled trigger to this type), the function returns at line 8389 before reaching this block, preservingmax_fires=Noneandexpires_at=None; consequently the triggers most likely to predate this fix remain permanently uncapped.Useful? React with 👍 / 👎.