diff --git a/backend/app/services/agent_tools.py b/backend/app/services/agent_tools.py index 4fe4cfe49..91060e2aa 100644 --- a/backend/app/services/agent_tools.py +++ b/backend/app/services/agent_tools.py @@ -8396,6 +8396,12 @@ async def _handle_set_trigger( reason=reason, focus_ref=focus_ref, ) + # Fix 4: Safety cap for on_message triggers — + # prevent infinite loops if agent creates broad watchers. + if ttype == "on_message": + trigger.max_fires = trigger.max_fires or 100 + if not trigger.expires_at: + trigger.expires_at = datetime.now(timezone.utc) + timedelta(days=7) db.add(trigger) await db.commit() diff --git a/backend/app/services/trigger_daemon.py b/backend/app/services/trigger_daemon.py index 9877b1182..278bce6ad 100644 --- a/backend/app/services/trigger_daemon.py +++ b/backend/app/services/trigger_daemon.py @@ -32,9 +32,13 @@ TICK_INTERVAL = 15 # seconds DEDUP_WINDOW = 30 # seconds — same agent won't be invoked twice within this window -MAX_AGENT_CHAIN_DEPTH = 5 # A→B→A→B→A max depth before stopping MIN_POLL_INTERVAL_MINUTES = 5 # minimum poll interval to prevent abuse +# Safety: per-agent on_message fire rate limiter +_ON_MSG_RATE_WINDOW = 3600 # 1 hour window +_ON_MSG_RATE_LIMIT = 30 # max on_message fires per agent per hour +_on_msg_fire_log: dict[uuid.UUID, list[datetime]] = {} # agent_id -> list of fire timestamps + _last_invoke: dict[uuid.UUID, datetime] = {} _A2A_WAKE_CHAIN: dict[str, int] = {} @@ -47,6 +51,15 @@ def _cleanup_stale_invoke_cache(): stale = [k for k, v in _last_invoke.items() if (now - v).total_seconds() > DEDUP_WINDOW * 2] for k in stale: del _last_invoke[k] + # Clean up old on_message rate limiter entries + cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW) + stale_agents = [] + for aid, timestamps in _on_msg_fire_log.items(): + _on_msg_fire_log[aid] = [t for t in timestamps if t > cutoff] + if not _on_msg_fire_log[aid]: + stale_agents.append(aid) + for aid in stale_agents: + del _on_msg_fire_log[aid] async def _should_skip_non_workday(trigger: AgentTrigger, local_now: datetime) -> bool: @@ -118,6 +131,28 @@ async def _tick(): if not handled: handled = await _handle_okr_collection_trigger(trigger, now) if not handled: + # Fix 3: Rate limit on_message triggers per agent + if trigger.type == "on_message": + agent_fires = _on_msg_fire_log.get(trigger.agent_id, []) + cutoff = now - timedelta(seconds=_ON_MSG_RATE_WINDOW) + recent = [t for t in agent_fires if t > cutoff] + if len(recent) >= _ON_MSG_RATE_LIMIT: + logger.warning( + f"[A2A Safety] Agent {trigger.agent_id} hit " + f"on_message rate limit ({_ON_MSG_RATE_LIMIT}/hr). " + f"Auto-disabling trigger '{trigger.name}'." + ) + async with async_session() as db: + result = await db.execute( + select(AgentTrigger).where(AgentTrigger.id == trigger.id) + ) + t_obj = result.scalar_one_or_none() + if t_obj: + t_obj.is_enabled = False + await db.commit() + continue + recent.append(now) + _on_msg_fire_log[trigger.agent_id] = recent await enqueue_due_trigger(trigger, now) except Exception as e: logger.warning(f"Error evaluating trigger {trigger.name}: {e}") diff --git a/backend/app/services/trigger_runtime/evaluator.py b/backend/app/services/trigger_runtime/evaluator.py index 31a02b7f5..548d1291c 100644 --- a/backend/app/services/trigger_runtime/evaluator.py +++ b/backend/app/services/trigger_runtime/evaluator.py @@ -370,6 +370,12 @@ async def check_new_agent_messages(trigger: AgentTrigger) -> bool: .where( ChatMessage.participant_id == from_participant, ChatMessage.created_at > since, + # Fix 1: Only match real conversational messages, + # not internal tool_call / system records. + ChatMessage.role.in_(["assistant", "user"]), + # Fix 2: Exclude trigger internal "reflection" + # sessions to avoid cross-trigger false matches. + ChatSession.source_channel != "trigger", ) .order_by(ChatMessage.created_at.desc()) .limit(1)