102 changes: 94 additions & 8 deletions src/Aevatar.AI.Core/RoleGAgent.cs
@@ -28,7 +28,7 @@
/// <summary>
/// Role-based AI GAgent. Receives ChatRequestEvent and streams LLM response.
/// </summary>
public class RoleGAgent : AIGAgentBase<RoleGAgentState>, IRoleAgent

Check warning on line 31 in src/Aevatar.AI.Core/RoleGAgent.cs

GitHub Actions / projection-provider-e2e, host-composition-smoke

'RoleGAgent' is coupled with '98' different types from '28' different namespaces. Rewrite or refactor the code to decrease its class coupling below '96'. (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1506)
{
private const string LlmFailureContentPrefix = "[[AEVATAR_LLM_ERROR]]";
private const int MaxTrackedSessions = 128;
@@ -137,9 +137,10 @@
"[{Role}] Approval continuation FAILED. request={RequestId}",
RoleName, pending.RequestId);

- // Still clear pending so we don't get stuck
- try { await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); }
- catch { /* best effort */ }
+ await TryPersistApprovalTerminalFailureThenClearPendingAsync(
+ pending,
+ "approval_continuation_failed",
+ ex.Message);

throw; // Re-throw so the SSE endpoint sees the error
}
@@ -150,7 +151,12 @@
}
else
Contributor

[kimi] severity=major, category=bug

The approval-denied else branch calls PersistApprovalTerminalFailureAsync without a try / catch. If this throws after ClearPendingApprovalEvent is already persisted, the actor event fails and retries; on retry State.PendingApproval is already null so the method returns early, permanently losing the terminal failure fact — exactly what the durable readmodel needs.

The same gap exists in two more call sites in this file:

  • HandleToolApprovalTimeout no-remote-handler branch (around line 195)
  • HandleToolApprovalTimeout remote-failed/denied/timed-out branch (around line 249)

Only the exception path at lines 143-150 wraps it in try { … } catch { /* best effort */ }.

Fix: either wrap each call site in best-effort try/catch (matching the pattern at lines 143-150), or move the try/catch inside PersistApprovalTerminalFailureAsync itself so every caller is safe.

Contributor Author

Fixed in 16eccf9. Approval denied, no-remote timeout, and remote denied/failed/timed-out paths now persist the terminal failure fact before clearing PendingApproval, preserving the durable completion source if terminal persistence fails.
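
The ordering described in this thread can be sketched in isolation. This is a minimal illustration, not the PR's implementation: `InMemoryApprovalLog`, `ApprovalOrderingSketch`, and the `FailNextTerminalWrite` switch are hypothetical stand-ins for the actor's event store, and only the "terminal fact first, then clear pending" order is taken from the reply above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the actor's persisted state and events.
public sealed class InMemoryApprovalLog
{
    public List<string> Events { get; } = new();
    public string? PendingRequestId { get; private set; }
    public bool FailNextTerminalWrite { get; set; }

    public InMemoryApprovalLog(string pendingRequestId) => PendingRequestId = pendingRequestId;

    public Task PersistTerminalFailureAsync(string requestId, string reasonCode)
    {
        if (FailNextTerminalWrite)
        {
            // Simulate a single store failure, then recover.
            FailNextTerminalWrite = false;
            return Task.FromException(new InvalidOperationException("simulated store failure"));
        }

        Events.Add($"terminal:{requestId}:{reasonCode}");
        return Task.CompletedTask;
    }

    public Task ClearPendingAsync(string requestId)
    {
        Events.Add($"cleared:{requestId}");
        PendingRequestId = null;
        return Task.CompletedTask;
    }
}

public static class ApprovalOrderingSketch
{
    // Terminal fact first, then clear. If the first write throws, the pending
    // marker is still set, so a retry finds it and can persist the fact again.
    public static async Task<bool> HandleDeniedAsync(InMemoryApprovalLog log, string reasonCode)
    {
        var requestId = log.PendingRequestId;
        if (requestId is null)
            return false; // nothing pending: the early-return path a retry would hit

        await log.PersistTerminalFailureAsync(requestId, reasonCode);
        await log.ClearPendingAsync(requestId);
        return true;
    }
}
```

With the writes in this order, a failed terminal write leaves the pending marker untouched, so the retry is not short-circuited by the early return. With the original order (clear first), the same failure would leave no pending state for the retry to act on.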

{
- await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId });
+ await PersistApprovalTerminalFailureThenClearPendingAsync(
+ pending,
+ "approval_denied",
+ string.IsNullOrWhiteSpace(evt.Reason)
+ ? "Tool approval denied."
+ : evt.Reason);
}
}

@@ -176,19 +182,23 @@
{
Logger.LogWarning("[{Role}] No remote approval handler configured. Clearing pending. request={RequestId}",
RoleName, evt.RequestId);
- await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId });
+ await PersistApprovalTerminalFailureThenClearPendingAsync(
+ pending,
+ "approval_timeout",
+ "Tool approval timed out and no remote approval handler is configured.");
return;
}

// Restore metadata (NyxID access token) for the remote handler
var prevMetadata = AgentToolRequestContext.CurrentMetadata;
+ ToolApprovalResult? result = null;
try
{
AgentToolRequestContext.CurrentMetadata = pending.Metadata.Count > 0
? new Dictionary<string, string>(pending.Metadata, StringComparer.Ordinal)
: null;

- var result = await remoteHandler.RequestApprovalAsync(
+ result = await remoteHandler.RequestApprovalAsync(
new ToolApprovalRequest
{
RequestId = pending.RequestId,
@@ -225,8 +235,13 @@
AgentToolRequestContext.CurrentMetadata = prevMetadata;
}

- // Remote failed/denied/timed out → clear pending
- await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId });
+ // Remote failed/denied/timed out → persist terminal fact before clearing pending.
+ await PersistApprovalTerminalFailureThenClearPendingAsync(
+ pending,
+ ResolveApprovalTerminalReasonCode(result?.Decision),
+ string.IsNullOrWhiteSpace(result?.Reason)
+ ? "Tool approval timed out or was denied remotely."
+ : result.Reason);
}

/// <summary>Override in subclasses to provide the NyxID remote approval handler for timeout escalation.</summary>
@@ -703,6 +718,77 @@
});
}

private async Task PersistApprovalTerminalFailureThenClearPendingAsync(
PendingToolApprovalState pending,
string reasonCode,
string reasonMessage)
{
await PersistApprovalTerminalFailureAsync(pending, reasonCode, reasonMessage);
await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId });
}

private async Task TryPersistApprovalTerminalFailureThenClearPendingAsync(
PendingToolApprovalState pending,
string reasonCode,
string reasonMessage)
{
try
{
await PersistApprovalTerminalFailureAsync(pending, reasonCode, reasonMessage);
}
catch (Exception ex)
{
Logger.LogError(
ex,
"[{Role}] Failed to persist approval terminal failure. request={RequestId} session={SessionId} reasonCode={ReasonCode}",
RoleName,
pending.RequestId,
pending.SessionId,
reasonCode);
return;
}

try
{
await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId });
}
catch (Exception ex)
{
Logger.LogWarning(
ex,
"[{Role}] Failed to clear pending approval after terminal failure was persisted. request={RequestId} session={SessionId} reasonCode={ReasonCode}",
RoleName,
pending.RequestId,
pending.SessionId,
reasonCode);
}
}

private Task PersistApprovalTerminalFailureAsync(
PendingToolApprovalState pending,
string reasonCode,
string reasonMessage)
{
if (string.IsNullOrWhiteSpace(pending.SessionId))
return Task.CompletedTask;

var safeReason = string.IsNullOrWhiteSpace(reasonMessage)
? "Tool approval failed."
: reasonMessage.Trim();
return PersistDomainEventAsync(new RoleChatSessionCompletedEvent
{
SessionId = pending.SessionId,
Content = BuildLlmFailureContent($"{reasonCode}: {safeReason}"),
Prompt = BuildContinuationPrompt(pending, safeReason),
ContentEmitted = false,
});
}

private static string ResolveApprovalTerminalReasonCode(ToolApprovalDecision? decision) =>
decision == ToolApprovalDecision.Denied
? "approval_denied"
: "approval_timeout";

private async Task ReplayCompletedSessionAsync(string sessionId, RoleChatSessionState trackedSession)
{
await PublishAsync(new TextMessageStartEvent
@@ -0,0 +1,9 @@
namespace Aevatar.CQRS.Projection.Core.Abstractions;

/// <summary>
/// Durable materialization scope that is partitioned by an explicit session key.
/// </summary>
public interface IProjectionSessionScopedMaterializationContext : IProjectionMaterializationContext
{
string SessionId { get; }
}

@@ -37,7 +37,8 @@ public static IServiceCollection AddProjectionMaterializationRuntimeCore<TContex
request => contextFactory(new ProjectionRuntimeScopeKey(
request.RootActorId,
request.ProjectionKind,
- ProjectionRuntimeMode.DurableMaterialization)),
+ ProjectionRuntimeMode.DurableMaterialization,
+ request.SessionId)),
(_, context) => leaseFactory(context),
sp.GetService<Aevatar.Foundation.Abstractions.TypeSystem.IAgentTypeVerifier>(),
sp.GetService<IStreamPubSubMaintenance>(),
@@ -51,7 +52,10 @@ public static IServiceCollection AddProjectionMaterializationRuntimeCore<TContex
lease => new ProjectionRuntimeScopeKey(
lease.Context.RootActorId,
lease.Context.ProjectionKind,
- ProjectionRuntimeMode.DurableMaterialization),
+ ProjectionRuntimeMode.DurableMaterialization,
+ lease.Context is IProjectionSessionScopedMaterializationContext scopedContext
+ ? scopedContext.SessionId
+ : string.Empty),
sp.GetService<Aevatar.Foundation.Abstractions.TypeSystem.IAgentTypeVerifier>()));
return services;
}

@@ -58,7 +58,8 @@ public sealed record GAgentDraftRunAcceptedReceipt(
string ActorId,
string ActorTypeName,
string CommandId,
- string CorrelationId);
+ string CorrelationId,
+ string SessionId = "");

public sealed record GAgentApprovalCommand(
string ActorId,
@@ -90,4 +91,5 @@ public enum GAgentApprovalCompletionStatus
public sealed record GAgentApprovalAcceptedReceipt(
string ActorId,
string CommandId,
- string CorrelationId);
+ string CorrelationId,
+ string SessionId);
@@ -0,0 +1,63 @@
namespace Aevatar.GAgentService.Abstractions.ScopeGAgents;

public enum GAgentRunTerminalStatus
{
Unknown = 0,
TextMessageCompleted = 1,
RunFinished = 2,
Failed = 3,
}

public enum GAgentRunTerminalInteractionKind
{
Unknown = 0,
DraftRun = 1,
Approval = 2,
}

public sealed record GAgentRunTerminalSnapshot(
string ActorId,
string SessionId,
string CorrelationId,
GAgentRunTerminalInteractionKind InteractionKind,
GAgentRunTerminalStatus Status,
string ReasonCode,
string ReasonMessage,
long StateVersion,
string LastEventId,
DateTimeOffset ObservedAt);

public interface IGAgentRunTerminalQueryPort
{
Task<GAgentRunTerminalSnapshot?> GetByCorrelationIdAsync(
string actorId,
string correlationId,
CancellationToken ct = default);

Task<GAgentRunTerminalSnapshot?> GetBySessionIdAsync(
string actorId,
string sessionId,
CancellationToken ct = default);
}

public interface IGAgentRunTerminalProjectionLease
{
string ActorId { get; }

string CorrelationId { get; }

GAgentRunTerminalInteractionKind InteractionKind { get; }
}

public interface IGAgentRunTerminalProjectionPort
Contributor

[Consensus: 2 models] severity=major, category=arch

IGAgentRunTerminalProjectionPort only exposes EnsureProjectionAsync (correlation-scoped) with no corresponding ReleaseProjectionAsync / lease return. Each draft-run / approval command currently activates a new correlation-keyed durable materialization scope (GAgentApprovalInteraction.cs:181, GAgentDraftRunInteraction.cs:236) and never releases it. Since correlation is per-command, these scopes accumulate until Orleans deactivation — defeating the point of the new IProjectionSessionScopedMaterializationContext release-aware design described in the PR summary.

Fix: return a lease (analogous to IGAgentDraftRunProjectionLease) from EnsureProjectionAsync, and have the interaction cleanup path (GAgentApprovalCommandTarget.ReleaseAsync / draft-run equivalent) release it. Alternatively, make this an actor-scoped (not correlation-scoped) materialization.

Per-model verbatim
  • codex (GAgentRunTerminalProjectionPort.cs:33): This activates a durable materialization scope keyed by correlation id, then discards the lease and exposes no release path. Because draft-run/approval create a new correlation per command, these projection actors/subscriptions can accumulate without cleanup; return a terminal projection lease from the port and release it from the interaction cleanup path, or make this an actor-scoped materialization instead of a per-correlation scope.
  • gemini (GAgentRunTerminalModels.cs:43): The interface is missing a corresponding ReleaseProjectionAsync or ReleaseIfIdleAsync method. Given the PR summary explicitly mentions making durable materialization 'release session-aware', the port should expose this capability to allow the interaction service to clean up command-scoped materializers.
  • gemini (GAgentApprovalInteraction.cs:157): The terminal projection is ensured here but never released. Since these materialization actors are now partitioned by CorrelationId (one per command), they will leak until Orleans deactivates them. The GAgentApprovalCommandTarget should hold a reference to the IGAgentRunTerminalProjectionPort and call a (yet to be exposed) release method in its ReleaseAsync implementation, similar to how the live _projectionPort is handled.

Contributor Author

Fixed in 16eccf9. IGAgentRunTerminalProjectionPort now returns an IGAgentRunTerminalProjectionLease and exposes ReleaseProjectionAsync. Draft-run and approval targets retain the terminal lease and release it during cleanup, including activation-failure cleanup after terminal materialization succeeds.
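
The ensure/release pairing discussed in this thread can be sketched as follows. This is an illustrative sketch under stated assumptions: `TerminalLease`, `InMemoryTerminalProjectionPort`, and `TerminalLeaseSketch` are hypothetical names, the port tracks scopes in a plain set rather than activating real materialization actors, and the `try`/`finally` stands in for whatever cleanup path the command targets actually use. Only the shape (ensure returns a nullable lease, release takes it back) mirrors the interface in the diff.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical lease type; the PR's lease also carries an interaction kind.
public sealed record TerminalLease(string ActorId, string CorrelationId);

// Hypothetical in-memory port that only counts active correlation scopes.
public sealed class InMemoryTerminalProjectionPort
{
    private readonly HashSet<string> _activeScopes = new(StringComparer.Ordinal);

    public int ActiveScopeCount => _activeScopes.Count;

    public Task<TerminalLease?> EnsureProjectionAsync(
        string actorId, string correlationId, CancellationToken ct = default)
    {
        _activeScopes.Add($"{actorId}/{correlationId}");
        return Task.FromResult<TerminalLease?>(new TerminalLease(actorId, correlationId));
    }

    public Task ReleaseProjectionAsync(TerminalLease lease, CancellationToken ct = default)
    {
        _activeScopes.Remove($"{lease.ActorId}/{lease.CorrelationId}");
        return Task.CompletedTask;
    }
}

public static class TerminalLeaseSketch
{
    // Hold the lease for the lifetime of one command and release it on every
    // exit path, so per-correlation scopes do not pile up.
    public static async Task RunCommandAsync(
        InMemoryTerminalProjectionPort port,
        string actorId,
        string correlationId,
        Func<Task> dispatch)
    {
        var lease = await port.EnsureProjectionAsync(actorId, correlationId);
        try
        {
            await dispatch();
        }
        finally
        {
            if (lease is not null)
                await port.ReleaseProjectionAsync(lease);
        }
    }
}
```

Because each command uses a fresh correlation id, the number of active scopes in this sketch returns to zero after every command, including commands whose dispatch throws. Without the release call, the set would grow by one entry per command.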

{
Task<IGAgentRunTerminalProjectionLease?> EnsureProjectionAsync(
string actorId,
string correlationId,
GAgentRunTerminalInteractionKind interactionKind,
CancellationToken ct = default);

Task ReleaseProjectionAsync(
IGAgentRunTerminalProjectionLease lease,
CancellationToken ct = default);
}