diff --git a/docs/history/2026-05/2026-05-14-issue-370-gagent-durable-terminal-completion-design.md b/docs/history/2026-05/2026-05-14-issue-370-gagent-durable-terminal-completion-design.md new file mode 100644 index 000000000..5bb1171db --- /dev/null +++ b/docs/history/2026-05/2026-05-14-issue-370-gagent-durable-terminal-completion-design.md @@ -0,0 +1,499 @@ +--- +title: "Issue 370:GAgent durable terminal completion 设计" +status: design +owner: liyingpei +--- + +# Issue 370:GAgent durable terminal completion 设计 + +> 本文面向 [Issue 370](https://github.com/aevatarAI/aevatar/issues/370):`GAgentDraftRunInteraction` 与 `GAgentApprovalInteraction` 的 durable completion resolver 仍是占位实现。当前权威架构口径仍以 [ADR-0015:AGUI / SSE Projection Session Pipeline](../../adr/0015-agui-sse-projection-session-pipeline.md) 为准;本文只把 GAgent draft-run / approval 这条缺口落成可实现设计。 + +## 1. 问题判断 + +最新 `dev` 上仍有两个明确占位点: + +- `GAgentDraftRunDurableCompletionResolver.ResolveAsync(...)` 直接返回 `CommandDurableCompletionObservation.Incomplete` +- `GAgentApprovalDurableCompletionResolver.ResolveAsync(...)` 直接返回 `CommandDurableCompletionObservation.Incomplete` + +这意味着 live stream happy path 可以结束,但以下场景没有 durable terminal source: + +1. SSE/AGUI 连接断开后重连。 +2. live sink 因释放、网络或进程切换错过 terminal event。 +3. approval continuation 已经完成,但前端只看到 `Unknown` / `Incomplete`。 + +这个问题不是“多补一次轮询”能解决。终态必须来自 committed fact 经 Projection Pipeline 物化出的 readmodel,而不是 session hub、runtime lease、actor 内部状态侧读或 query-time replay。 + +## 2. 目标 + +本设计目标是给 draft-run 与 approval 共用一条 durable terminal completion 主链: + +1. Actor 串行处理 `ChatRequestEvent` / `ToolApprovalDecisionEvent`。 +2. Actor 持久化 committed terminal domain event。 +3. Projection Pipeline 将终态物化为 actor-scoped current-state readmodel。 +4. Durable completion resolver 只读取窄 query port。 +5. Interaction service 在 missed-live-event 场景下能恢复 `Completed` / `Failed`。 + +非目标: + +- 不把 `IGAgentDraftRunProjectionPort` 扩成查询端口。 +- 不在 resolver 中读取 event store、actor state、runtime lease 或 session event hub。 +- 不在 Host 中补 fallback 编排。 +- 不引入 `Metadata` / string bag 承载 terminal status。 +- 不改变 accepted ACK 语义;同步 accepted 仍只代表 command 已被接收用于 dispatch。 + +## 3. 总体链路 + +```mermaid +%%{init: {"maxTextSize": 100000, "flowchart": {"useMaxWidth": false, "nodeSpacing": 10, "rankSpacing": 50}, "themeVariables": {"fontSize": "10px"}}}%% +flowchart LR + H["Host endpoint"] --> I["ICommandInteractionService"] + I --> A["RoleGAgent / target actor"] + A --> E["Committed terminal event"] + E --> P["Projection Pipeline"] + P --> R["GAgentRunTerminalSnapshot readmodel"] + I --> L["Live AGUI sink"] + I --> D["Durable completion resolver"] + D --> Q["IGAgentRunTerminalQueryPort"] + Q --> R +``` + +关键口径: + +- live AGUI sink 只负责实时输出,不是事实源。 +- durable resolver 只补“live 没看到终态”这一段,不重新执行业务。 +- readmodel 是 actor-scoped current-state replica,版本来自权威 actor committed version 或等价 committed envelope 水位。 + +## 4. 与 Workflow / Scripting 的共同模式 + +本设计确实参考了当前已有的两个 durable completion 落点: + +| 能力 | Resolver | Receipt 稳定键 | Durable source | Query/read port | Completion 映射 | +|---|---|---|---|---|---| +| Workflow run | `WorkflowRunDurableCompletionResolver` | `ActorId` | `WorkflowActorSnapshot` current-state readmodel | `IWorkflowExecutionCurrentStateQueryPort` | `WorkflowRunCompletionStatus` -> `WorkflowProjectionCompletionStatus` | +| Script evolution | `ScriptEvolutionDurableCompletionResolver` | `ProposalId` | `ScriptEvolutionReadModel` terminal promotion decision | `IScriptEvolutionDecisionReadPort` | `ScriptPromotionDecision` -> `ScriptEvolutionInteractionCompletion` | +| GAgent draft-run / approval | 本设计新增 resolver 实现 | `ActorId + CorrelationId/SessionId` | `GAgentRunTerminalReadModel` current-state readmodel | `IGAgentRunTerminalQueryPort` | `GAgentRunTerminalStatus` -> draft-run / approval completion | + +三条链路的共同点是: + +1. `ICommandDurableCompletionResolver` 是 CQRS Core 已经抽好的公共扩展点。 +2. receipt 必须携带足够稳定的定位键,resolver 不临时发明查询上下文。 +3. resolver 只读 capability 自己的 read/query port。 +4. query port 后面只能是 readmodel / durable decision document,不是 actor state、event replay、runtime lease 或 live session hub。 +5. resolver 的职责只是把 durable terminal fact 映射成 interaction completion。 +6. 缺失 terminal fact 时返回 `Incomplete` 或按 capability 既有语义抛出 timeout,不在 resolver 内重试业务。 + +这说明 #370 要补的是缺失的 GAgent terminal readmodel/query port,而不是一条新并行机制。 + +## 5. 是否需要进一步抽象 + +### 5.1 暂不抽象统一 terminal snapshot + +不建议现在新增类似 `ICommandTerminalSnapshot`、`ITerminalCompletionQueryPort` 或 `TerminalStatus` 的跨能力统一模型。 + +原因: + +1. Workflow 的终态是 workflow run 生命周期,包含 `Completed/TimedOut/Failed/Stopped/NotFound/Disabled` 等语义。 +2. Scripting 的终态是 promotion decision,核心结果是 `Accepted`、`Status`、`DefinitionActorId`、`ValidationReport` 等领域决策。 +3. GAgent 的终态是 AGUI/chat interaction 终态,核心是 `TextMessageCompleted/RunFinished/Failed`,且 session id 与 command id 的关系更强。 +4. 这些终态名字相似,但业务完成含义不同;强行共用一个 terminal snapshot 会把领域语义压扁成泛化 status bag。 + +仓库当前已经在 CQRS Core 抽象了正确的公共层:`ICommandDurableCompletionResolver` 与 `CommandDurableCompletionObservation`。再往下的 durable source、query port、snapshot 与 mapping 应保持 capability-owned。 + +### 5.2 可以抽象小型模式,不抽象领域语义 + +允许的后续小抽象: + +1. 一个 `DurableCompletionResolverSupport` 静态 helper,用于统一“非取消异常返回 Incomplete、取消异常继续抛出”的样板。 +2. 一个测试 fixture pattern,用来验证 resolver 不吞取消、不从 missing snapshot 伪造 completion。 +3. 文档/guard 规则:新增 interaction durable resolver 不得固定返回 `Incomplete`,必须说明 durable terminal source。 + +不建议的抽象: + +1. 泛型 terminal readmodel 基类。 +2. 泛型 `Status` enum。 +3. 在 CQRS Core 中引入“按 actorId 查询 terminal snapshot”的接口。 +4. 把 `ActorId/CorrelationId/SessionId/ProposalId` 等定位键揉成通用 key-value bag。 + +一句话:**抽象交互骨架,不抽象领域终态。** #370 应该把 GAgent 补齐到 Workflow/Scripting 已经遵守的模式,而不是把三者拉进一个过早统一的 terminal 框架。 + +## 6. 工程 review 结论 + +这份设计可以进入开发,但需要按下面三条约束开发;否则会实现到错误事实源或卡在 projection runtime 装配上。 + +### 6.1 Durable source 必须是 committed event + +`RoleGAgent` 当前执行顺序是: + +1. live 发布 `TextMessageEndEvent` +2. best-effort 持久化 `RoleChatSessionCompletedEvent` + +因此 durable terminal projector 不能消费 live `TextMessageEndEvent` 作为事实源。`TextMessageEndEvent` 可以作为实时输出,但不满足 durable recovery 的事实要求。 + +可开发口径固定为: + +- durable terminal readmodel 只从 `CommittedStateEventPublished.StateEvent.EventData` 中的 `RoleChatSessionCompletedEvent` 或后续新增的强类型 committed terminal event 物化。 +- 目标实现应优先使用 typed terminal status / reason 字段或专用 committed terminal event;`Content` 中的 legacy failure marker 只能作为过渡兼容口径,不得成为长期事实协议。 +- `TextMessageEndEvent`、synthetic `RunFinished`、live `RunError` 只用于 live completion policy,不作为 durable resolver 的事实源。 +- 如果 `RoleChatSessionCompletedEvent` 因 best-effort persist 失败而不存在,resolver 必须诚实返回 `Incomplete`;不能回退读取 live stream 或 actor state。 + +### 6.2 Lookup key 使用 correlation/session,不把 commandId 当事实键 + +当前 `RoleChatSessionCompletedEvent` 没有 `command_id` 字段。Committed envelope 可以从 inbound command envelope 继承 `EnvelopePropagation.CorrelationId`,但不能可靠还原 CQRS `CommandId`。 + +因此首版实现不要设计 `GetByCommandIdAsync(...)`。resolver 查找顺序固定为: + +1. `actorId + correlationId` +2. `actorId + sessionId` + +`CommandId` 仍然保留在 receipt 中作为 accepted ACK 与追踪 id,但不作为 durable terminal readmodel 的主查询键。后续如果确实需要按 `commandId` 查询,必须先把 `command_id` 作为 typed proto field 加到 `ChatRequestEvent` 与 `RoleChatSessionCompletedEvent` 或专用 terminal event 中,禁止把 `commandId` 塞进 `Metadata` 或假设它等于 `CorrelationId`。 + +### 6.3 Durable materialization 必须显式 activation + +现有 `GAgentDraftRunProjectionPort` 是 `SessionObservation` port,只负责 live session sink;新增 terminal readmodel 需要 `DurableMaterialization` scope。 + +开发时必须新增一个 activation port,例如: + +- `IGAgentRunTerminalProjectionPort` +- `GAgentRunTerminalProjectionPort` +- `GAgentRunTerminalProjectionContext` +- `GAgentRunTerminalRuntimeLease` + +draft-run / approval binder 在 dispatch 前同时做两件事: + +1. activate durable terminal materialization:`EnsureActorProjectionAsync(actorId)` +2. attach live session observation:`EnsureAndAttachAsync(...)` + +这不是 query-time priming,因为它发生在 command dispatch 前的 write-side binding 阶段;resolver/query path 仍然只读已物化 readmodel。 + +## 7. 新增契约 + +### 7.1 Abstractions:terminal snapshot + +新增位置建议: + +- `src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentRunTerminalModels.cs` +- `src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/IGAgentRunTerminalQueryPort.cs` + +核心模型: + +```csharp +public enum GAgentRunTerminalStatus +{ + Unknown = 0, + TextMessageCompleted = 1, + RunFinished = 2, + Failed = 3, +} + +public enum GAgentRunTerminalInteractionKind +{ + Unknown = 0, + DraftRun = 1, + Approval = 2, +} + +public sealed record GAgentRunTerminalSnapshot( + string ActorId, + string SessionId, + string CorrelationId, + GAgentRunTerminalInteractionKind InteractionKind, + GAgentRunTerminalStatus Status, + string ReasonCode, + string ReasonMessage, + long StateVersion, + string LastEventId, + DateTimeOffset ObservedAt); + +public interface IGAgentRunTerminalQueryPort +{ + Task GetByCorrelationIdAsync( + string actorId, + string correlationId, + CancellationToken ct = default); + + Task GetBySessionIdAsync( + string actorId, + string sessionId, + CancellationToken ct = default); +} +``` + +字段语义: + +- `ActorId`:权威 actor 地址。 +- `SessionId`:AI chat session id;draft-run 默认来自 command correlation id,approval approved continuation 会生成新的 continuation session id。 +- `CorrelationId`:追踪 id,不作为 actor identity。 +- `InteractionKind`:区分 draft-run 与 approval,避免同一 readmodel 混淆入口语义。 +- `Status`:强类型终态,不放入 bag。 +- `ReasonCode` / `ReasonMessage`:失败、拒绝、超时等用户可见或审计相关原因;首选强类型 code,message 只承载展示文本,不作为控制流依据。 +- `StateVersion` / `LastEventId`:来自 committed source 的版本/事件标识,用于诚实暴露读模型水位。 +- `ObservedAt`:投影观察时间,不冒充 actor commit time。 + +### 7.2 Projection readmodel proto + +在 `src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto` 中新增: + +```proto +message GAgentRunTerminalReadModel { + string id = 1; + string actor_id = 2; + int64 state_version = 3; + string last_event_id = 4; + string session_id = 5; + string correlation_id = 6; + int32 interaction_kind = 7; + int32 status = 8; + string reason_code = 9; + string reason_message = 10; + google.protobuf.Timestamp observed_at_utc_value = 11; +} +``` + +`id` 建议使用稳定复合键: + +```text +gagent-run-terminal:{actorId}:{correlationId} +``` + +如果事件只有 `session_id` 没有 `correlation_id`,投影可以写入 session-keyed 文档: + +```text +gagent-run-terminal-session:{actorId}:{sessionId} +``` + +但 resolver 查询优先级必须固定为: + +1. `actorId + correlationId` +2. `actorId + sessionId` + +这能兼容 draft-run 默认 session,也能覆盖 approval approved continuation。approval continuation 会生成新的 `ChatRequestEvent.SessionId`,但 self-message 仍应继承 approval command 的 `CorrelationId`,因此 correlation 是更稳定的恢复键。 + +## 8. Terminal fact 来源 + +### 8.1 Draft-run + +draft-run durable 终态来自目标 actor committed 的 `RoleChatSessionCompletedEvent` 或后续专用 typed terminal event。 + +目标映射规则: + +| 输入 | durable status | +|---|---| +| committed terminal fact 明确成功 | `TextMessageCompleted` | +| committed terminal fact 明确失败 | `Failed` | + +过渡期若只能消费现有 `RoleChatSessionCompletedEvent`,可以把正常内容映射为 `TextMessageCompleted`,并临时识别 `Content` 中的 `[[AEVATAR_LLM_ERROR]]` / `LLM request failed:` legacy marker 为 `Failed`。但 failure marker 只能作为 legacy fallback;实现应尽快前移为 typed status/reason 字段或专用 committed terminal event,禁止把内容字符串格式固化为长期事实协议。 + +`RunFinished` 是 interaction finalize 阶段为 `TextMessageCompleted` 补发的 AGUI frame,不是首版 durable readmodel 的事实输入。 + +### 8.2 Approval + +approval 的终态不是 `ToolApprovalDecisionEvent` 本身。`ToolApprovalDecisionEvent` 只是 command 输入;真正终态必须是 actor 处理审批后持久化出的 committed terminal fact: + +- approval 被批准后,actor continuation 继续执行并持久化 `RoleChatSessionCompletedEvent`。 +- approval 被拒绝或超时后,actor 必须持久化可映射为 `Failed` 的 terminal fact。 + +当前代码中 denied / remote timeout 路径只持久化 `ClearPendingApprovalEvent`。这还不能开发成完整 #370,因为 `ClearPendingApprovalEvent` 只表示 pending approval 被清掉,不等价于用户会话终态。 + +开发前置项: + +- 要么在 denied / timeout 路径持久化一个失败的 `RoleChatSessionCompletedEvent`。 +- 要么新增一个强类型 `ToolApprovalTerminalEvent` / `GAgentRunTerminalEvent`,字段包含 `request_id/session_id/correlation_id/status/reason_code/reason_message`,并由 terminal projector 消费。 + +这是 approval durable completion 的 blocker:如果 denied / timeout 的 committed terminal fact 没有补齐,approval 部分不得声明完成,只能保持 `Incomplete`。不允许把 `ClearPendingApprovalEvent` 直接投影成 terminal failure。 + +## 9. Projection 设计 + +新增投影组件建议: + +- `GAgentRunTerminalProjectionContext` +- `GAgentRunTerminalRuntimeLease` +- `GAgentRunTerminalProjectionPort` +- `GAgentRunTerminalReadModelMetadataProvider` +- `GAgentRunTerminalProjector` +- `GAgentRunTerminalQueryReader` + +其中 `GAgentRunTerminalProjector` 应实现 current-state materializer,写入 `GAgentRunTerminalReadModel`。它可以参考 `ServiceRunCurrentStateProjector` 的覆盖写入模型: + +1. 只消费 `CommittedStateEventPublished` 中的 committed terminal payload。 +2. 首版只处理 `RoleChatSessionCompletedEvent` 与后续显式 terminal event。 +3. 使用权威版本写 `StateVersion`,不自增本地版本。 +4. 旧版本不得覆盖新版本,重复写入必须幂等。 +5. projector 可以读取 committed envelope 的 `Propagation.CorrelationId`,但不得把 `CorrelationId` 当成 `CommandId`。 + +最小实现可以先复用现有 GAgent projection runtime 装配方式: + +- session live projector 继续保留 `GAgentDraftRunSessionEventProjector`。 +- durable terminal projector 新增为 independent durable materialization projector。 +- 二者消费同一 Projection Pipeline 输入,一对多分发 + +这样 CQRS 与 AGUI 仍然是统一入口,区别只在输出:一个写 live session hub,一个写 durable readmodel。 + +## 10. Resolver 设计 + +`GAgentDraftRunDurableCompletionResolver` 改为依赖 `IGAgentRunTerminalQueryPort`: + +```csharp +internal sealed class GAgentDraftRunDurableCompletionResolver + : ICommandDurableCompletionResolver +{ + private readonly IGAgentRunTerminalQueryPort _queryPort; + + public async Task> ResolveAsync( + GAgentDraftRunAcceptedReceipt receipt, + CancellationToken ct = default) + { + var snapshot = await _queryPort.GetByCorrelationIdAsync(receipt.ActorId, receipt.CorrelationId, ct); + return Map(snapshot); + } +} +``` + +`GAgentApprovalDurableCompletionResolver` 同样依赖 `IGAgentRunTerminalQueryPort`,优先 `correlationId`,若 receipt 扩展携带原始 `SessionId`,再 fallback 到 `sessionId`。 + +映射规则: + +| snapshot status | draft-run completion | approval completion | +|---|---|---| +| `TextMessageCompleted` | `TextMessageCompleted` | `TextMessageCompleted` | +| `RunFinished` | `RunFinished` | `RunFinished` | +| `Failed` | `Failed` | `Failed` | +| missing / non-terminal / query error | `Incomplete` | `Incomplete` | + +异常策略: + +- `OperationCanceledException` 且 `ct.IsCancellationRequested`:继续抛出。 +- projection/read store 临时异常:返回 `Incomplete`,保持 interaction service 的现有容错语义。 +- 结构性异常,例如 receipt 缺 actor id:抛参数异常。 + +## 11. Receipt 与 session id + +当前 receipt: + +- `GAgentDraftRunAcceptedReceipt(ActorId, ActorTypeName, CommandId, CorrelationId)` +- `GAgentApprovalAcceptedReceipt(ActorId, CommandId, CorrelationId)` + +draft-run envelope 的 `ChatRequestEvent.SessionId` 默认使用 `context.CorrelationId`。因此 durable resolver 按 `CorrelationId` 查询,必要时再按同值 `SessionId` fallback。 + +approval command 模型已有 `SessionId`,但 `GAgentApprovalAcceptedReceipt` 当前没有携带它。建议把 receipt 扩展为: + +```csharp +public sealed record GAgentApprovalAcceptedReceipt( + string ActorId, + string CommandId, + string CorrelationId, + string SessionId); +``` + +这不是兼容性负担,仓库规则允许删除/调整无价值兼容壳。补上后 resolver 能在 correlation lookup 缺失时 fallback 到 `actorId + sessionId`。但 approved continuation 可能会生成新的 continuation `SessionId`,所以 `CorrelationId` 必须是主键。 + +## 12. 不能做的实现 + +以下实现路径禁止: + +1. Resolver 中读取 `IEventStore` 并 replay `RoleGAgentState`。 +2. Resolver 中调用 `IActorRuntime.GetAsync(actorId)` 后读取 actor 内部 state。 +3. 在 `IGAgentDraftRunProjectionPort` 上加 `actorId -> context` 查询。 +4. 在 Host endpoint 中保存 `Dictionary`。 +5. 用 `ConcurrentDictionary` / `HashSet` 在中间层缓存 terminal fact。 +6. 用 `Metadata["status"]`、`Headers["completion"]` 或 arbitrary string key 承载终态。 +7. 把 `ToolApprovalDecisionEvent` 或 `ClearPendingApprovalEvent` 直接当成 approval 完成。 +8. 查询前同步 activate projection 或 query-time priming。 +9. 假设 `CommandId == CorrelationId`,或从 `CorrelationId` 反推出 `CommandId`。 +10. 把 live `TextMessageEndEvent` 当成 durable terminal fact。 + +## 13. 本次明确不做项 + +本次实现只补齐 #370 所需的 durable terminal completion 主链,不同时重做 terminal reason 协议。下面两个点保留为后续演进项,原因是它们不影响当前 durable completion 的正确性,但会扩大 committed event/proto 协议面。 + +### 13.1 不把 legacy failure marker 立刻替换为 typed terminal event + +当前 `RoleChatSessionCompletedEvent.Content` 中的 `[[AEVATAR_LLM_ERROR]]` / `LLM request failed:` marker 只作为 legacy fallback 使用。它可以让既有 committed fact 映射为 `Failed`,从而覆盖 approval denied / timeout / failure 等 missed-live-event 恢复路径。 + +后续更完整的方向是新增专用 typed terminal event,或在现有 terminal committed event 中补 typed `status/reason_code/reason_message` 字段。该改动会触及 `RoleGAgent` committed event 契约与 projector 输入协议,应该单独设计和迁移,不混进 #370 的 resolver/readmodel 修复。 + +### 13.2 不把 `ReasonCode` 改成 enum + +`ReasonCode` 本次保持为 string,是因为它当前只用于 durable readmodel 的失败原因记录与展示映射,不作为跨模块控制流或稳定过滤条件。它比开放 bag 更收敛,也避免在 legacy marker 过渡期过早冻结 enum 值集合。 + +如果后续 terminal reason 参与控制流、查询过滤、API 稳定契约或外部 SDK 类型生成,应随 typed terminal event 一起演进为 proto enum / typed code,并补充对应兼容迁移测试。 + +## 14. 实施步骤 + +建议按以下顺序实现: + +1. 在 Abstractions 新增 `GAgentRunTerminalStatus`、`GAgentRunTerminalInteractionKind`、`GAgentRunTerminalSnapshot`、`IGAgentRunTerminalQueryPort`。 +2. 在 Projection proto 新增 `GAgentRunTerminalReadModel`,补 partial、metadata provider、query reader。 +3. 新增 durable materialization context/runtime lease/projection port。 +4. 新增 durable terminal projector,消费 committed `RoleChatSessionCompletedEvent` 并只写 terminal readmodel。 +5. 注册 projection materializer、metadata provider、query port、activation port。 +6. 修改 draft-run / approval binder,在 dispatch 前 activate terminal materialization。 +7. 修改 draft-run / approval durable resolver 注入 query port。 +8. 扩展 approval receipt 携带 `SessionId`。 +9. 补 approval denied / timeout committed terminal fact。 +10. 移除“resolver always incomplete”测试,改为 durable terminal recovery 测试。 + +## 15. 测试矩阵 + +单元测试: + +1. `GAgentDraftRunDurableCompletionResolver` 读取 `TextMessageCompleted` snapshot 后返回 terminal completion。 +2. `GAgentDraftRunDurableCompletionResolver` 读取 `Failed` snapshot 后返回 failed completion。 +3. `GAgentApprovalDurableCompletionResolver` 优先按 correlation id 查询。 +4. `GAgentApprovalDurableCompletionResolver` correlation id miss 后按 session id 查询。 +5. query port 返回 null 或抛非取消异常时 resolver 返回 `Incomplete`。 +6. 取消 token 触发的 `OperationCanceledException` 不被吞掉。 + +投影测试: + +1. committed `RoleChatSessionCompletedEvent` 正常内容物化 `TextMessageCompleted`。 +2. committed `RoleChatSessionCompletedEvent` 失败 marker 物化 `Failed`。 +3. live `TextMessageEndEvent` 不写 durable readmodel。 +4. `ClearPendingApprovalEvent` 不写 durable terminal readmodel。 +5. 旧 `StateVersion` 不覆盖新 readmodel。 +6. 非 terminal payload 不写 readmodel。 + +交互测试: + +1. live stream 未收到 terminal event,但 durable readmodel 已有 `TextMessageCompleted`,interaction finalize 返回 completed。 +2. live stream 未收到 terminal event,但 durable readmodel 已有 `Failed`,interaction finalize 返回 failed。 +3. approval continuation 完成后,resolver 可以通过 receipt/session 恢复 terminal 状态。 +4. approval denied / timeout 后,resolver 可以通过 committed terminal fact 恢复 failed 状态。 +5. `CommandId != CorrelationId` 时,live session 与 durable resolver 都仍能按 correlation/session 正常完成。 +6. draft-run binder 在 dispatch 前 activate terminal materialization,再 attach live session observation。 +7. approval binder 在 dispatch 前 activate terminal materialization,再 attach live session observation。 +8. resolver/query path 不触发 projection activation,防止 query-time priming 回潮。 + +## 16. 验证命令 + +涉及 current-state readmodel、projection query path 与测试新增,提交前至少执行: + +```bash +dotnet build aevatar.slnx --nologo +dotnet test test/Aevatar.GAgentService.Tests/Aevatar.GAgentService.Tests.csproj --nologo --filter "GAgentDraftRunInteraction|GAgentApprovalInteraction|GAgentRunTerminal" +dotnet test test/Aevatar.GAgentService.Integration.Tests/Aevatar.GAgentService.Integration.Tests.csproj --nologo --filter "ScopeGAgent|ScopeService" +bash tools/ci/test_stability_guards.sh +bash tools/ci/query_projection_priming_guard.sh +bash tools/ci/projection_state_version_guard.sh +bash tools/ci/projection_state_mirror_current_state_guard.sh +bash tools/ci/projection_route_mapping_guard.sh +``` + +若实现同时调整 projection runtime 注册或 actor binding 边界,也必须执行: + +```bash +bash tools/ci/architecture_guards.sh +bash tools/ci/workflow_binding_boundary_guard.sh +``` + +## 17. 验收口径 + +Issue 370 可关闭的条件: + +1. 两个 resolver 不再返回固定 `Incomplete`。 +2. resolver 只依赖 `IGAgentRunTerminalQueryPort` 这类 readmodel query contract。 +3. draft-run 和 approval missed-live-event 场景可从 durable readmodel 恢复 terminal completion。 +4. 没有 query-time replay、query-time priming、runtime lease 侧读或中间层内存事实映射。 +5. 测试不再断言占位行为,而是覆盖 durable terminal recovery。 +6. durable readmodel 只由 committed terminal fact 物化,不消费 live-only terminal payload。 +7. `CommandId` 与 `CorrelationId` 语义保持分离,测试覆盖二者不相等的情况。 +8. 文档与 ADR related 链接同步更新。 diff --git a/src/Aevatar.AI.Core/RoleGAgent.cs b/src/Aevatar.AI.Core/RoleGAgent.cs index b978224aa..0adf5c3d1 100644 --- a/src/Aevatar.AI.Core/RoleGAgent.cs +++ b/src/Aevatar.AI.Core/RoleGAgent.cs @@ -137,9 +137,10 @@ public async Task HandleToolApprovalDecision(ToolApprovalDecisionEvent evt) "[{Role}] Approval continuation FAILED. request={RequestId}", RoleName, pending.RequestId); - // Still clear pending so we don't get stuck - try { await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); } - catch { /* best effort */ } + await TryPersistApprovalTerminalFailureThenClearPendingAsync( + pending, + "approval_continuation_failed", + ex.Message); throw; // Re-throw so the SSE endpoint sees the error } @@ -150,7 +151,12 @@ public async Task HandleToolApprovalDecision(ToolApprovalDecisionEvent evt) } else { - await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); + await PersistApprovalTerminalFailureThenClearPendingAsync( + pending, + "approval_denied", + string.IsNullOrWhiteSpace(evt.Reason) + ? "Tool approval denied." + : evt.Reason); } } @@ -176,19 +182,23 @@ public async Task HandleToolApprovalTimeout(ToolApprovalTimeoutFiredEvent evt) { Logger.LogWarning("[{Role}] No remote approval handler configured. Clearing pending. request={RequestId}", RoleName, evt.RequestId); - await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); + await PersistApprovalTerminalFailureThenClearPendingAsync( + pending, + "approval_timeout", + "Tool approval timed out and no remote approval handler is configured."); return; } // Restore metadata (NyxID access token) for the remote handler var prevMetadata = AgentToolRequestContext.CurrentMetadata; + ToolApprovalResult? result = null; try { AgentToolRequestContext.CurrentMetadata = pending.Metadata.Count > 0 ? new Dictionary(pending.Metadata, StringComparer.Ordinal) : null; - var result = await remoteHandler.RequestApprovalAsync( + result = await remoteHandler.RequestApprovalAsync( new ToolApprovalRequest { RequestId = pending.RequestId, @@ -225,8 +235,13 @@ await HandleToolApprovalDecision(new ToolApprovalDecisionEvent AgentToolRequestContext.CurrentMetadata = prevMetadata; } - // Remote failed/denied/timed out → clear pending - await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); + // Remote failed/denied/timed out → persist terminal fact before clearing pending. + await PersistApprovalTerminalFailureThenClearPendingAsync( + pending, + ResolveApprovalTerminalReasonCode(result?.Decision), + string.IsNullOrWhiteSpace(result?.Reason) + ? "Tool approval timed out or was denied remotely." + : result.Reason); } /// Override in subclasses to provide the NyxID remote approval handler for timeout escalation. @@ -703,6 +718,77 @@ private Task PersistSessionCompletionAsync(ChatRequestEvent request, SessionRepl }); } + private async Task PersistApprovalTerminalFailureThenClearPendingAsync( + PendingToolApprovalState pending, + string reasonCode, + string reasonMessage) + { + await PersistApprovalTerminalFailureAsync(pending, reasonCode, reasonMessage); + await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); + } + + private async Task TryPersistApprovalTerminalFailureThenClearPendingAsync( + PendingToolApprovalState pending, + string reasonCode, + string reasonMessage) + { + try + { + await PersistApprovalTerminalFailureAsync(pending, reasonCode, reasonMessage); + } + catch (Exception ex) + { + Logger.LogError( + ex, + "[{Role}] Failed to persist approval terminal failure. request={RequestId} session={SessionId} reasonCode={ReasonCode}", + RoleName, + pending.RequestId, + pending.SessionId, + reasonCode); + return; + } + + try + { + await PersistDomainEventAsync(new ClearPendingApprovalEvent { RequestId = pending.RequestId }); + } + catch (Exception ex) + { + Logger.LogWarning( + ex, + "[{Role}] Failed to clear pending approval after terminal failure was persisted. request={RequestId} session={SessionId} reasonCode={ReasonCode}", + RoleName, + pending.RequestId, + pending.SessionId, + reasonCode); + } + } + + private Task PersistApprovalTerminalFailureAsync( + PendingToolApprovalState pending, + string reasonCode, + string reasonMessage) + { + if (string.IsNullOrWhiteSpace(pending.SessionId)) + return Task.CompletedTask; + + var safeReason = string.IsNullOrWhiteSpace(reasonMessage) + ? "Tool approval failed." + : reasonMessage.Trim(); + return PersistDomainEventAsync(new RoleChatSessionCompletedEvent + { + SessionId = pending.SessionId, + Content = BuildLlmFailureContent($"{reasonCode}: {safeReason}"), + Prompt = BuildContinuationPrompt(pending, safeReason), + ContentEmitted = false, + }); + } + + private static string ResolveApprovalTerminalReasonCode(ToolApprovalDecision? decision) => + decision == ToolApprovalDecision.Denied + ? "approval_denied" + : "approval_timeout"; + private async Task ReplayCompletedSessionAsync(string sessionId, RoleChatSessionState trackedSession) { await PublishAsync(new TextMessageStartEvent diff --git a/src/Aevatar.CQRS.Projection.Core.Abstractions/Abstractions/Core/IProjectionSessionScopedMaterializationContext.cs b/src/Aevatar.CQRS.Projection.Core.Abstractions/Abstractions/Core/IProjectionSessionScopedMaterializationContext.cs new file mode 100644 index 000000000..a3ed0aaeb --- /dev/null +++ b/src/Aevatar.CQRS.Projection.Core.Abstractions/Abstractions/Core/IProjectionSessionScopedMaterializationContext.cs @@ -0,0 +1,9 @@ +namespace Aevatar.CQRS.Projection.Core.Abstractions; + +/// +/// Durable materialization scope that is partitioned by an explicit session key. +/// +public interface IProjectionSessionScopedMaterializationContext : IProjectionMaterializationContext +{ + string SessionId { get; } +} diff --git a/src/Aevatar.CQRS.Projection.Core/DependencyInjection/ProjectionMaterializationRuntimeRegistration.cs b/src/Aevatar.CQRS.Projection.Core/DependencyInjection/ProjectionMaterializationRuntimeRegistration.cs index 89ef6a27b..f5c917e20 100644 --- a/src/Aevatar.CQRS.Projection.Core/DependencyInjection/ProjectionMaterializationRuntimeRegistration.cs +++ b/src/Aevatar.CQRS.Projection.Core/DependencyInjection/ProjectionMaterializationRuntimeRegistration.cs @@ -37,7 +37,8 @@ public static IServiceCollection AddProjectionMaterializationRuntimeCore contextFactory(new ProjectionRuntimeScopeKey( request.RootActorId, request.ProjectionKind, - ProjectionRuntimeMode.DurableMaterialization)), + ProjectionRuntimeMode.DurableMaterialization, + request.SessionId)), (_, context) => leaseFactory(context), sp.GetService(), sp.GetService(), @@ -51,7 +52,10 @@ public static IServiceCollection AddProjectionMaterializationRuntimeCore new ProjectionRuntimeScopeKey( lease.Context.RootActorId, lease.Context.ProjectionKind, - ProjectionRuntimeMode.DurableMaterialization), + ProjectionRuntimeMode.DurableMaterialization, + lease.Context is IProjectionSessionScopedMaterializationContext scopedContext + ? scopedContext.SessionId + : string.Empty), sp.GetService())); return services; } diff --git a/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentDraftRunModels.cs b/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentDraftRunModels.cs index 817b93de6..8bea47a5a 100644 --- a/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentDraftRunModels.cs +++ b/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentDraftRunModels.cs @@ -58,7 +58,8 @@ public sealed record GAgentDraftRunAcceptedReceipt( string ActorId, string ActorTypeName, string CommandId, - string CorrelationId); + string CorrelationId, + string SessionId = ""); public sealed record GAgentApprovalCommand( string ActorId, @@ -90,4 +91,5 @@ public enum GAgentApprovalCompletionStatus public sealed record GAgentApprovalAcceptedReceipt( string ActorId, string CommandId, - string CorrelationId); + string CorrelationId, + string SessionId); diff --git a/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentRunTerminalModels.cs b/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentRunTerminalModels.cs new file mode 100644 index 000000000..458b69a81 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Abstractions/ScopeGAgents/GAgentRunTerminalModels.cs @@ -0,0 +1,63 @@ +namespace Aevatar.GAgentService.Abstractions.ScopeGAgents; + +public enum GAgentRunTerminalStatus +{ + Unknown = 0, + TextMessageCompleted = 1, + RunFinished = 2, + Failed = 3, +} + +public enum GAgentRunTerminalInteractionKind +{ + Unknown = 0, + DraftRun = 1, + Approval = 2, +} + +public sealed record GAgentRunTerminalSnapshot( + string ActorId, + string SessionId, + string CorrelationId, + GAgentRunTerminalInteractionKind InteractionKind, + GAgentRunTerminalStatus Status, + string ReasonCode, + string ReasonMessage, + long StateVersion, + string LastEventId, + DateTimeOffset ObservedAt); + +public interface IGAgentRunTerminalQueryPort +{ + Task GetByCorrelationIdAsync( + string actorId, + string correlationId, + CancellationToken ct = default); + + Task GetBySessionIdAsync( + string actorId, + string sessionId, + CancellationToken ct = default); +} + +public interface IGAgentRunTerminalProjectionLease +{ + string ActorId { get; } + + string CorrelationId { get; } + + GAgentRunTerminalInteractionKind InteractionKind { get; } +} + +public interface IGAgentRunTerminalProjectionPort +{ + Task EnsureProjectionAsync( + string actorId, + string correlationId, + GAgentRunTerminalInteractionKind interactionKind, + CancellationToken ct = default); + + Task ReleaseProjectionAsync( + IGAgentRunTerminalProjectionLease lease, + CancellationToken ct = default); +} diff --git a/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentApprovalInteraction.cs b/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentApprovalInteraction.cs index 78d31b542..66a23d48e 100644 --- a/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentApprovalInteraction.cs +++ b/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentApprovalInteraction.cs @@ -18,27 +18,39 @@ internal sealed class GAgentApprovalCommandTarget ICommandDispatchCleanupAware { private readonly IGAgentDraftRunProjectionPort _projectionPort; + private readonly IGAgentRunTerminalProjectionPort _terminalProjectionPort; public GAgentApprovalCommandTarget( IActor actor, - IGAgentDraftRunProjectionPort projectionPort) + IGAgentDraftRunProjectionPort projectionPort, + IGAgentRunTerminalProjectionPort terminalProjectionPort) { Actor = actor ?? throw new ArgumentNullException(nameof(actor)); _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + _terminalProjectionPort = terminalProjectionPort ?? throw new ArgumentNullException(nameof(terminalProjectionPort)); } public IActor Actor { get; } public string TargetId => Actor.Id; public string ActorId => Actor.Id; + public string SessionId { get; private set; } = string.Empty; public IGAgentDraftRunProjectionLease? ProjectionLease { get; private set; } + public IGAgentRunTerminalProjectionLease? TerminalProjectionLease { get; private set; } public IEventSink? LiveSink { get; private set; } + public void BindTerminalProjection(IGAgentRunTerminalProjectionLease? lease) + { + TerminalProjectionLease = lease; + } + public void BindLiveObservation( IGAgentDraftRunProjectionLease lease, - IEventSink sink) + IEventSink sink, + string sessionId) { ProjectionLease = lease ?? throw new ArgumentNullException(nameof(lease)); LiveSink = sink ?? throw new ArgumentNullException(nameof(sink)); + SessionId = sessionId; } public IEventSink RequireLiveSink() => @@ -110,6 +122,20 @@ await _projectionPort.DetachReleaseAndDisposeAsync( } } + var terminalProjectionLease = TerminalProjectionLease; + if (terminalProjectionLease != null) + { + try + { + await _terminalProjectionPort.ReleaseProjectionAsync(terminalProjectionLease, ct); + TerminalProjectionLease = null; + } + catch (Exception ex) + { + firstException ??= ex; + } + } + if (firstException != null) ExceptionDispatchInfo.Capture(firstException).Throw(); } @@ -120,13 +146,16 @@ internal sealed class GAgentApprovalCommandTargetResolver { private readonly IActorRuntime _actorRuntime; private readonly IGAgentDraftRunProjectionPort _projectionPort; + private readonly IGAgentRunTerminalProjectionPort _terminalProjectionPort; public GAgentApprovalCommandTargetResolver( IActorRuntime actorRuntime, - IGAgentDraftRunProjectionPort projectionPort) + IGAgentDraftRunProjectionPort projectionPort, + IGAgentRunTerminalProjectionPort terminalProjectionPort) { _actorRuntime = actorRuntime ?? throw new ArgumentNullException(nameof(actorRuntime)); _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + _terminalProjectionPort = terminalProjectionPort ?? throw new ArgumentNullException(nameof(terminalProjectionPort)); } public async Task> ResolveAsync( @@ -143,7 +172,7 @@ public async Task.Success( - new GAgentApprovalCommandTarget(actor, _projectionPort)); + new GAgentApprovalCommandTarget(actor, _projectionPort, _terminalProjectionPort)); } } @@ -151,11 +180,14 @@ internal sealed class GAgentApprovalCommandTargetBinder : ICommandTargetBinder { private readonly IGAgentDraftRunProjectionPort _projectionPort; + private readonly IGAgentRunTerminalProjectionPort _terminalProjectionPort; public GAgentApprovalCommandTargetBinder( - IGAgentDraftRunProjectionPort projectionPort) + IGAgentDraftRunProjectionPort projectionPort, + IGAgentRunTerminalProjectionPort terminalProjectionPort) { _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + _terminalProjectionPort = terminalProjectionPort ?? throw new ArgumentNullException(nameof(terminalProjectionPort)); } public async Task> BindAsync( @@ -169,13 +201,21 @@ public async Task> BindAsyn ArgumentNullException.ThrowIfNull(context); var sink = new EventChannel(); + IGAgentRunTerminalProjectionLease? terminalProjectionLease = null; try { + terminalProjectionLease = await _terminalProjectionPort.EnsureProjectionAsync( + target.ActorId, + context.CorrelationId, + GAgentRunTerminalInteractionKind.Approval, + ct); + target.BindTerminalProjection(terminalProjectionLease); + var projectionLease = await _projectionPort.EnsureAndAttachAsync( token => _projectionPort.EnsureActorProjectionAsync( target.ActorId, - context.CommandId, + context.CorrelationId, token), sink, ct); @@ -187,11 +227,20 @@ public async Task> BindAsyn throw new InvalidOperationException("GAgent approval projection pipeline is unavailable."); } - target.BindLiveObservation(projectionLease, sink); + target.BindLiveObservation( + projectionLease, + sink, + command.SessionId?.Trim() ?? string.Empty); return CommandTargetBindingResult.Success(); } catch { + if (terminalProjectionLease != null) + { + await _terminalProjectionPort.ReleaseProjectionAsync(terminalProjectionLease, ct); + target.BindTerminalProjection(null); + } + sink.Complete(); await sink.DisposeAsync(); throw; @@ -242,7 +291,8 @@ public GAgentApprovalAcceptedReceipt Create( return new GAgentApprovalAcceptedReceipt( target.ActorId, context.CommandId, - context.CorrelationId); + context.CorrelationId, + target.SessionId); } } @@ -307,12 +357,58 @@ public Task EmitAsync( internal sealed class GAgentApprovalDurableCompletionResolver : ICommandDurableCompletionResolver { - public Task> ResolveAsync( + private readonly IGAgentRunTerminalQueryPort _queryPort; + + public GAgentApprovalDurableCompletionResolver( + IGAgentRunTerminalQueryPort queryPort) + { + _queryPort = queryPort ?? throw new ArgumentNullException(nameof(queryPort)); + } + + public async Task> ResolveAsync( GAgentApprovalAcceptedReceipt receipt, CancellationToken ct = default) { ArgumentNullException.ThrowIfNull(receipt); - _ = ct; - return Task.FromResult(CommandDurableCompletionObservation.Incomplete); + + try + { + var snapshot = await _queryPort.GetByCorrelationIdAsync(receipt.ActorId, receipt.CorrelationId, ct); + if (!MatchesReceipt(snapshot, receipt)) + snapshot = null; + if (snapshot == null && !string.IsNullOrWhiteSpace(receipt.SessionId)) + { + var sessionSnapshot = await _queryPort.GetBySessionIdAsync(receipt.ActorId, receipt.SessionId, ct); + if (MatchesReceipt(sessionSnapshot, receipt)) + snapshot = sessionSnapshot; + } + return Map(snapshot); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch + { + return CommandDurableCompletionObservation.Incomplete; + } } + + private static bool MatchesReceipt( + GAgentRunTerminalSnapshot? snapshot, + GAgentApprovalAcceptedReceipt receipt) => + snapshot != null && + string.Equals(snapshot.ActorId, receipt.ActorId, StringComparison.Ordinal) && + string.Equals(snapshot.CorrelationId, receipt.CorrelationId, StringComparison.Ordinal) && + snapshot.InteractionKind == GAgentRunTerminalInteractionKind.Approval; + + private static CommandDurableCompletionObservation Map( + GAgentRunTerminalSnapshot? snapshot) => + snapshot?.Status switch + { + GAgentRunTerminalStatus.TextMessageCompleted => new(true, GAgentApprovalCompletionStatus.TextMessageCompleted), + GAgentRunTerminalStatus.RunFinished => new(true, GAgentApprovalCompletionStatus.RunFinished), + GAgentRunTerminalStatus.Failed => new(true, GAgentApprovalCompletionStatus.Failed), + _ => CommandDurableCompletionObservation.Incomplete, + }; } diff --git a/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentDraftRunInteraction.cs b/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentDraftRunInteraction.cs index 893c64c70..000fd8025 100644 --- a/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentDraftRunInteraction.cs +++ b/src/platform/Aevatar.GAgentService.Application/ScopeGAgents/GAgentDraftRunInteraction.cs @@ -20,32 +20,44 @@ internal sealed class GAgentDraftRunCommandTarget ICommandDispatchCleanupAware { private readonly IGAgentDraftRunProjectionPort _projectionPort; + private readonly IGAgentRunTerminalProjectionPort _terminalProjectionPort; public GAgentDraftRunCommandTarget( IActor actor, string actorTypeName, - IGAgentDraftRunProjectionPort projectionPort) + IGAgentDraftRunProjectionPort projectionPort, + IGAgentRunTerminalProjectionPort terminalProjectionPort) { Actor = actor ?? throw new ArgumentNullException(nameof(actor)); ActorTypeName = string.IsNullOrWhiteSpace(actorTypeName) ? throw new ArgumentException("Actor type name is required.", nameof(actorTypeName)) : actorTypeName.Trim(); _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + _terminalProjectionPort = terminalProjectionPort ?? throw new ArgumentNullException(nameof(terminalProjectionPort)); } public IActor Actor { get; } public string ActorTypeName { get; } public string TargetId => Actor.Id; public string ActorId => Actor.Id; + public string SessionId { get; private set; } = string.Empty; public IGAgentDraftRunProjectionLease? ProjectionLease { get; private set; } + public IGAgentRunTerminalProjectionLease? TerminalProjectionLease { get; private set; } public IEventSink? LiveSink { get; private set; } + public void BindTerminalProjection(IGAgentRunTerminalProjectionLease? lease) + { + TerminalProjectionLease = lease; + } + public void BindLiveObservation( IGAgentDraftRunProjectionLease lease, - IEventSink sink) + IEventSink sink, + string sessionId) { ProjectionLease = lease ?? throw new ArgumentNullException(nameof(lease)); LiveSink = sink ?? throw new ArgumentNullException(nameof(sink)); + SessionId = sessionId; } public IEventSink RequireLiveSink() => @@ -117,6 +129,20 @@ await _projectionPort.DetachReleaseAndDisposeAsync( } } + var terminalProjectionLease = TerminalProjectionLease; + if (terminalProjectionLease != null) + { + try + { + await _terminalProjectionPort.ReleaseProjectionAsync(terminalProjectionLease, ct); + TerminalProjectionLease = null; + } + catch (Exception ex) + { + firstException ??= ex; + } + } + if (firstException != null) ExceptionDispatchInfo.Capture(firstException).Throw(); } @@ -127,15 +153,18 @@ internal sealed class GAgentDraftRunCommandTargetResolver { private readonly IActorRuntime _actorRuntime; private readonly IGAgentDraftRunProjectionPort _projectionPort; + private readonly IGAgentRunTerminalProjectionPort _terminalProjectionPort; private readonly IAgentTypeVerifier? _agentTypeVerifier; public GAgentDraftRunCommandTargetResolver( IActorRuntime actorRuntime, IGAgentDraftRunProjectionPort projectionPort, + IGAgentRunTerminalProjectionPort terminalProjectionPort, IAgentTypeVerifier? agentTypeVerifier = null) { _actorRuntime = actorRuntime ?? throw new ArgumentNullException(nameof(actorRuntime)); _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + _terminalProjectionPort = terminalProjectionPort ?? throw new ArgumentNullException(nameof(terminalProjectionPort)); _agentTypeVerifier = agentTypeVerifier; } @@ -181,7 +210,7 @@ public async Task.Success( - new GAgentDraftRunCommandTarget(actor, command.ActorTypeName, _projectionPort)); + new GAgentDraftRunCommandTarget(actor, command.ActorTypeName, _projectionPort, _terminalProjectionPort)); } private async Task MatchesExpectedTypeAsync( @@ -206,11 +235,14 @@ internal sealed class GAgentDraftRunCommandTargetBinder : ICommandTargetBinder { private readonly IGAgentDraftRunProjectionPort _projectionPort; + private readonly IGAgentRunTerminalProjectionPort _terminalProjectionPort; public GAgentDraftRunCommandTargetBinder( - IGAgentDraftRunProjectionPort projectionPort) + IGAgentDraftRunProjectionPort projectionPort, + IGAgentRunTerminalProjectionPort terminalProjectionPort) { _projectionPort = projectionPort ?? throw new ArgumentNullException(nameof(projectionPort)); + _terminalProjectionPort = terminalProjectionPort ?? throw new ArgumentNullException(nameof(terminalProjectionPort)); } public async Task> BindAsync( @@ -224,9 +256,17 @@ public async Task> BindAsyn ArgumentNullException.ThrowIfNull(context); var sink = new EventChannel(); + IGAgentRunTerminalProjectionLease? terminalProjectionLease = null; try { + terminalProjectionLease = await _terminalProjectionPort.EnsureProjectionAsync( + target.ActorId, + context.CorrelationId, + GAgentRunTerminalInteractionKind.DraftRun, + ct); + target.BindTerminalProjection(terminalProjectionLease); + var projectionLease = await _projectionPort.EnsureAndAttachAsync( token => _projectionPort.EnsureActorProjectionAsync( target.ActorId, @@ -242,16 +282,32 @@ public async Task> BindAsyn throw new InvalidOperationException("GAgent draft-run projection pipeline is unavailable."); } - target.BindLiveObservation(projectionLease, sink); + target.BindLiveObservation( + projectionLease, + sink, + ResolveSessionId(command, context)); return CommandTargetBindingResult.Success(); } catch { + if (terminalProjectionLease != null) + { + await _terminalProjectionPort.ReleaseProjectionAsync(terminalProjectionLease, ct); + target.BindTerminalProjection(null); + } + sink.Complete(); await sink.DisposeAsync(); throw; } } + + private static string ResolveSessionId( + GAgentDraftRunCommand command, + CommandContext context) => + string.IsNullOrWhiteSpace(command.SessionId) + ? (command.UseCorrelationIdAsFallbackSessionId ? context.CorrelationId : string.Empty) + : command.SessionId.Trim(); } internal sealed class GAgentDraftRunCommandEnvelopeFactory @@ -351,7 +407,8 @@ public GAgentDraftRunAcceptedReceipt Create( target.ActorId, target.ActorTypeName, context.CommandId, - context.CorrelationId); + context.CorrelationId, + target.SessionId); } } @@ -416,12 +473,58 @@ public Task EmitAsync( internal sealed class GAgentDraftRunDurableCompletionResolver : ICommandDurableCompletionResolver { - public Task> ResolveAsync( + private readonly IGAgentRunTerminalQueryPort _queryPort; + + public GAgentDraftRunDurableCompletionResolver( + IGAgentRunTerminalQueryPort queryPort) + { + _queryPort = queryPort ?? throw new ArgumentNullException(nameof(queryPort)); + } + + public async Task> ResolveAsync( GAgentDraftRunAcceptedReceipt receipt, CancellationToken ct = default) { ArgumentNullException.ThrowIfNull(receipt); - _ = ct; - return Task.FromResult(CommandDurableCompletionObservation.Incomplete); + + try + { + var snapshot = await _queryPort.GetByCorrelationIdAsync(receipt.ActorId, receipt.CorrelationId, ct); + if (!MatchesReceipt(snapshot, receipt)) + snapshot = null; + if (snapshot == null && !string.IsNullOrWhiteSpace(receipt.SessionId)) + { + var sessionSnapshot = await _queryPort.GetBySessionIdAsync(receipt.ActorId, receipt.SessionId, ct); + if (MatchesReceipt(sessionSnapshot, receipt)) + snapshot = sessionSnapshot; + } + return Map(snapshot); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + throw; + } + catch + { + return CommandDurableCompletionObservation.Incomplete; + } } + + private static bool MatchesReceipt( + GAgentRunTerminalSnapshot? snapshot, + GAgentDraftRunAcceptedReceipt receipt) => + snapshot != null && + string.Equals(snapshot.ActorId, receipt.ActorId, StringComparison.Ordinal) && + string.Equals(snapshot.CorrelationId, receipt.CorrelationId, StringComparison.Ordinal) && + snapshot.InteractionKind == GAgentRunTerminalInteractionKind.DraftRun; + + private static CommandDurableCompletionObservation Map( + GAgentRunTerminalSnapshot? snapshot) => + snapshot?.Status switch + { + GAgentRunTerminalStatus.TextMessageCompleted => new(true, GAgentDraftRunCompletionStatus.TextMessageCompleted), + GAgentRunTerminalStatus.RunFinished => new(true, GAgentDraftRunCompletionStatus.RunFinished), + GAgentRunTerminalStatus.Failed => new(true, GAgentDraftRunCompletionStatus.Failed), + _ => CommandDurableCompletionObservation.Incomplete, + }; } diff --git a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs index c8d4d1515..15bee348d 100644 --- a/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/platform/Aevatar.GAgentService.Hosting/DependencyInjection/ServiceCollectionExtensions.cs @@ -120,6 +120,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders( TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); + TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); TryAddElasticsearchDocumentProjectionStore(services, configuration, static readModel => readModel.Id); } else @@ -132,6 +133,7 @@ public static IServiceCollection AddGAgentServiceProjectionReadModelProviders( TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); + TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); TryAddInMemoryDocumentProjectionStore(services, static readModel => readModel.Id); } @@ -150,6 +152,7 @@ private static bool HasAllGAgentServiceProjectionReaders( && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind) + && HasProjectionDocumentReaderForProvider(services, providerKind) && HasProjectionDocumentReaderForProvider(services, providerKind); } diff --git a/src/platform/Aevatar.GAgentService.Projection/Contexts/GAgentRunTerminalProjectionContext.cs b/src/platform/Aevatar.GAgentService.Projection/Contexts/GAgentRunTerminalProjectionContext.cs new file mode 100644 index 000000000..78cd5b81b --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Contexts/GAgentRunTerminalProjectionContext.cs @@ -0,0 +1,17 @@ +using Aevatar.GAgentService.Abstractions.ScopeGAgents; + +namespace Aevatar.GAgentService.Projection.Contexts; + +public sealed class GAgentRunTerminalProjectionContext + : IProjectionSessionScopedMaterializationContext +{ + public required string RootActorId { get; init; } + + public required string ProjectionKind { get; init; } + + public required string CorrelationId { get; init; } + + public required GAgentRunTerminalInteractionKind InteractionKind { get; init; } + + public string SessionId => CorrelationId; +} diff --git a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs index 5b557a59c..7e57f3d13 100644 --- a/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/platform/Aevatar.GAgentService.Projection/DependencyInjection/ServiceCollectionExtensions.cs @@ -80,6 +80,15 @@ public static IServiceCollection AddGAgentServiceProjection( ProjectionKind = scopeKey.ProjectionKind, }, static context => new ServiceProjectionRuntimeLease(context.RootActorId, context)); + services.AddServiceProjectionRuntime>( + static scopeKey => new GAgentRunTerminalProjectionContext + { + RootActorId = scopeKey.RootActorId, + ProjectionKind = scopeKey.ProjectionKind, + CorrelationId = scopeKey.SessionId, + InteractionKind = GAgentRunTerminalProjectionPort.ResolveInteractionKind(scopeKey.ProjectionKind), + }, + static context => new ServiceProjectionRuntimeLease(context.RootActorId, context)); services.AddEventSinkProjectionRuntimeCore< GAgentDraftRunProjectionContext, GAgentDraftRunRuntimeLease, @@ -100,6 +109,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton, GAgentDraftRunSessionEventCodec>(); services.TryAddSingleton, ProjectionSessionEventHub>(); services.TryAddSingleton(); @@ -111,6 +121,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton, ServiceTrafficViewReadModelMetadataProvider>(); services.TryAddSingleton, ServiceRevisionCatalogReadModelMetadataProvider>(); services.TryAddSingleton, ServiceRunCurrentStateReadModelMetadataProvider>(); + services.TryAddSingleton, GAgentRunTerminalReadModelMetadataProvider>(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); @@ -119,6 +130,7 @@ public static IServiceCollection AddGAgentServiceProjection( services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.AddProjectionArtifactMaterializer< ServiceCatalogProjectionContext, ServiceCatalogProjector>(); @@ -143,6 +155,9 @@ public static IServiceCollection AddGAgentServiceProjection( services.AddCurrentStateProjectionMaterializer< ServiceRunCurrentStateProjectionContext, ServiceRunCurrentStateProjector>(); + services.AddCurrentStateProjectionMaterializer< + GAgentRunTerminalProjectionContext, + GAgentRunTerminalProjector>(); services.TryAddEnumerable(ServiceDescriptor.Singleton< IProjectionProjector, GAgentDraftRunSessionEventProjector>()); diff --git a/src/platform/Aevatar.GAgentService.Projection/Metadata/GAgentRunTerminalReadModelMetadataProvider.cs b/src/platform/Aevatar.GAgentService.Projection/Metadata/GAgentRunTerminalReadModelMetadataProvider.cs new file mode 100644 index 000000000..09b58d802 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Metadata/GAgentRunTerminalReadModelMetadataProvider.cs @@ -0,0 +1,13 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Metadata; + +public sealed class GAgentRunTerminalReadModelMetadataProvider : IProjectionDocumentMetadataProvider +{ + public DocumentIndexMetadata Metadata { get; } = new( + "gagent-run-terminals", + Mappings: new Dictionary(), + Settings: new Dictionary(), + Aliases: new Dictionary()); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/GAgentRunTerminalProjectionPort.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/GAgentRunTerminalProjectionPort.cs new file mode 100644 index 000000000..055ba4711 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/GAgentRunTerminalProjectionPort.cs @@ -0,0 +1,87 @@ +using Aevatar.GAgentService.Abstractions.ScopeGAgents; +using Aevatar.GAgentService.Projection.Configuration; +using Aevatar.GAgentService.Projection.Contexts; + +namespace Aevatar.GAgentService.Projection.Orchestration; + +public sealed class GAgentRunTerminalProjectionPort + : ServiceProjectionPortBase, + IGAgentRunTerminalProjectionPort +{ + public GAgentRunTerminalProjectionPort( + ServiceProjectionOptions options, + IProjectionScopeActivationService> activationService, + IProjectionScopeReleaseService> releaseService) + : base(options, activationService, releaseService, ServiceProjectionKinds.GAgentRunTerminalDraftRun) + { + } + + public async Task EnsureProjectionAsync( + string actorId, + string correlationId, + GAgentRunTerminalInteractionKind interactionKind, + CancellationToken ct = default) + { + if (string.IsNullOrWhiteSpace(actorId) || string.IsNullOrWhiteSpace(correlationId)) + return null; + + var runtimeLease = await EnsureProjectionAsync( + new ProjectionScopeStartRequest + { + RootActorId = actorId, + ProjectionKind = ResolveProjectionKind(interactionKind), + Mode = ProjectionRuntimeMode.DurableMaterialization, + SessionId = correlationId.Trim(), + }, + ct); + + return runtimeLease == null + ? null + : new GAgentRunTerminalProjectionLease(runtimeLease); + } + + public Task ReleaseProjectionAsync( + IGAgentRunTerminalProjectionLease lease, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(lease); + + if (lease is not GAgentRunTerminalProjectionLease terminalLease) + throw new InvalidOperationException("Unknown GAgent run terminal projection lease implementation."); + + return ReleaseProjectionAsync(terminalLease.RuntimeLease, ct); + } + + internal static string ResolveProjectionKind(GAgentRunTerminalInteractionKind interactionKind) => + interactionKind switch + { + GAgentRunTerminalInteractionKind.DraftRun => ServiceProjectionKinds.GAgentRunTerminalDraftRun, + GAgentRunTerminalInteractionKind.Approval => ServiceProjectionKinds.GAgentRunTerminalApproval, + _ => throw new ArgumentOutOfRangeException(nameof(interactionKind), interactionKind, "Unknown GAgent run terminal interaction kind."), + }; + + public static GAgentRunTerminalInteractionKind ResolveInteractionKind(string projectionKind) => + projectionKind switch + { + ServiceProjectionKinds.GAgentRunTerminalDraftRun => GAgentRunTerminalInteractionKind.DraftRun, + ServiceProjectionKinds.GAgentRunTerminalApproval => GAgentRunTerminalInteractionKind.Approval, + _ => throw new ArgumentOutOfRangeException(nameof(projectionKind), projectionKind, "Unknown GAgent run terminal projection kind."), + }; + + private sealed class GAgentRunTerminalProjectionLease : IGAgentRunTerminalProjectionLease + { + public GAgentRunTerminalProjectionLease( + ServiceProjectionRuntimeLease runtimeLease) + { + RuntimeLease = runtimeLease ?? throw new ArgumentNullException(nameof(runtimeLease)); + } + + internal ServiceProjectionRuntimeLease RuntimeLease { get; } + + public string ActorId => RuntimeLease.Context.RootActorId; + + public string CorrelationId => RuntimeLease.Context.CorrelationId; + + public GAgentRunTerminalInteractionKind InteractionKind => RuntimeLease.Context.InteractionKind; + } +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs index 752c2d8ad..a80eeffb2 100644 --- a/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs +++ b/src/platform/Aevatar.GAgentService.Projection/Orchestration/ServiceProjectionNames.cs @@ -10,4 +10,6 @@ internal static class ServiceProjectionKinds public const string Traffic = "service-traffic"; public const string DraftRunSession = "service-draft-run-session"; public const string Runs = "service-runs"; + public const string GAgentRunTerminalDraftRun = "gagent-run-terminal-draft-run"; + public const string GAgentRunTerminalApproval = "gagent-run-terminal-approval"; } diff --git a/src/platform/Aevatar.GAgentService.Projection/Projectors/GAgentRunTerminalProjector.cs b/src/platform/Aevatar.GAgentService.Projection/Projectors/GAgentRunTerminalProjector.cs new file mode 100644 index 000000000..f4525076d --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Projectors/GAgentRunTerminalProjector.cs @@ -0,0 +1,128 @@ +using Aevatar.AI.Abstractions; +using Aevatar.CQRS.Projection.Core.Orchestration; +using Aevatar.CQRS.Projection.Runtime.Abstractions; +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Abstractions.ScopeGAgents; +using Aevatar.GAgentService.Projection.Contexts; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Projectors; + +public sealed class GAgentRunTerminalProjector + : ICurrentStateProjectionMaterializer +{ + private const string LegacyLlmErrorPrefix = "[[AEVATAR_LLM_ERROR]]"; + private const string LegacyLlmFailedPrefix = "LLM request failed:"; + private const string ReasonCodeLegacyLlmError = "legacy_llm_error"; + + private readonly IProjectionWriteDispatcher _writeDispatcher; + private readonly IProjectionClock _clock; + + public GAgentRunTerminalProjector( + IProjectionWriteDispatcher writeDispatcher, + IProjectionClock clock) + { + _writeDispatcher = writeDispatcher ?? throw new ArgumentNullException(nameof(writeDispatcher)); + _clock = clock ?? throw new ArgumentNullException(nameof(clock)); + } + + public async ValueTask ProjectAsync( + GAgentRunTerminalProjectionContext context, + EventEnvelope envelope, + CancellationToken ct = default) + { + ArgumentNullException.ThrowIfNull(context); + ArgumentNullException.ThrowIfNull(envelope); + + if (!CommittedStateEventEnvelope.TryGetObservedPayload( + envelope, + out var payload, + out var eventId, + out var stateVersion) || + payload?.Is(RoleChatSessionCompletedEvent.Descriptor) != true) + { + return; + } + + var completed = payload.Unpack(); + if (string.IsNullOrWhiteSpace(completed.SessionId)) + return; + + var correlationId = envelope.Propagation?.CorrelationId?.Trim() ?? string.Empty; + if (string.IsNullOrWhiteSpace(context.CorrelationId) || + !string.Equals(correlationId, context.CorrelationId, StringComparison.Ordinal)) + { + return; + } + + var documentId = BuildDocumentId(context.RootActorId, correlationId); + var observedAt = CommittedStateEventEnvelope.ResolveTimestamp(envelope, _clock.UtcNow); + var (status, reasonCode, reasonMessage) = ResolveTerminal(completed); + var document = new GAgentRunTerminalReadModel + { + Id = documentId, + ActorId = context.RootActorId, + StateVersion = stateVersion, + LastEventId = eventId, + SessionId = completed.SessionId, + CorrelationId = correlationId, + InteractionKind = (int)context.InteractionKind, + Status = (int)status, + ReasonCode = reasonCode, + ReasonMessage = reasonMessage, + ObservedAt = observedAt, + }; + + await _writeDispatcher.UpsertAsync(document, ct); + } + + public static string BuildDocumentId(string actorId, string key) + { + ArgumentException.ThrowIfNullOrWhiteSpace(actorId); + ArgumentException.ThrowIfNullOrWhiteSpace(key); + return $"gagent-run-terminal:{actorId}:{key}"; + } + + private static (GAgentRunTerminalStatus status, string reasonCode, string reasonMessage) ResolveTerminal( + RoleChatSessionCompletedEvent completed) + { + var content = completed.Content ?? string.Empty; + if (content.StartsWith(LegacyLlmErrorPrefix, StringComparison.Ordinal)) + { + var legacyMessage = content[LegacyLlmErrorPrefix.Length..].Trim(); + var separatorIndex = legacyMessage.IndexOf(':', StringComparison.Ordinal); + if (separatorIndex > 0) + { + var candidateReasonCode = legacyMessage[..separatorIndex].Trim(); + if (IsKnownReasonCode(candidateReasonCode)) + { + return ( + GAgentRunTerminalStatus.Failed, + candidateReasonCode, + legacyMessage[(separatorIndex + 1)..].Trim()); + } + } + + return ( + GAgentRunTerminalStatus.Failed, + ReasonCodeLegacyLlmError, + legacyMessage); + } + + if (content.StartsWith(LegacyLlmFailedPrefix, StringComparison.Ordinal)) + { + return ( + GAgentRunTerminalStatus.Failed, + ReasonCodeLegacyLlmError, + content.Trim()); + } + + return (GAgentRunTerminalStatus.TextMessageCompleted, string.Empty, string.Empty); + } + + private static bool IsKnownReasonCode(string reasonCode) => + reasonCode is + "approval_continuation_failed" or + "approval_denied" or + "approval_timeout"; +} diff --git a/src/platform/Aevatar.GAgentService.Projection/Queries/GAgentRunTerminalQueryReader.cs b/src/platform/Aevatar.GAgentService.Projection/Queries/GAgentRunTerminalQueryReader.cs new file mode 100644 index 000000000..dedbdc917 --- /dev/null +++ b/src/platform/Aevatar.GAgentService.Projection/Queries/GAgentRunTerminalQueryReader.cs @@ -0,0 +1,96 @@ +using Aevatar.CQRS.Projection.Stores.Abstractions; +using Aevatar.GAgentService.Abstractions.ScopeGAgents; +using Aevatar.GAgentService.Projection.Configuration; +using Aevatar.GAgentService.Projection.Projectors; +using Aevatar.GAgentService.Projection.ReadModels; + +namespace Aevatar.GAgentService.Projection.Queries; + +public sealed class GAgentRunTerminalQueryReader : IGAgentRunTerminalQueryPort +{ + private readonly IProjectionDocumentReader _documentStore; + private readonly bool _enabled; + + public GAgentRunTerminalQueryReader( + IProjectionDocumentReader documentStore, + ServiceProjectionOptions? options = null) + { + _documentStore = documentStore ?? throw new ArgumentNullException(nameof(documentStore)); + _enabled = options?.Enabled ?? true; + } + + public async Task GetByCorrelationIdAsync( + string actorId, + string correlationId, + CancellationToken ct = default) + { + if (!_enabled || + string.IsNullOrWhiteSpace(actorId) || + string.IsNullOrWhiteSpace(correlationId)) + { + return null; + } + + var readModel = await _documentStore.GetAsync( + GAgentRunTerminalProjector.BuildDocumentId(actorId.Trim(), correlationId.Trim()), + ct); + return readModel == null ? null : Map(readModel); + } + + public async Task GetBySessionIdAsync( + string actorId, + string sessionId, + CancellationToken ct = default) + { + if (!_enabled || + string.IsNullOrWhiteSpace(actorId) || + string.IsNullOrWhiteSpace(sessionId)) + { + return null; + } + + var result = await _documentStore.QueryAsync( + new ProjectionDocumentQuery + { + Take = 1, + Filters = + [ + new ProjectionDocumentFilter + { + FieldPath = nameof(GAgentRunTerminalReadModel.ActorId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(actorId.Trim()), + }, + new ProjectionDocumentFilter + { + FieldPath = nameof(GAgentRunTerminalReadModel.SessionId), + Operator = ProjectionDocumentFilterOperator.Eq, + Value = ProjectionDocumentValue.FromString(sessionId.Trim()), + }, + ], + Sorts = + [ + new ProjectionDocumentSort + { + FieldPath = nameof(GAgentRunTerminalReadModel.ObservedAt), + Direction = ProjectionDocumentSortDirection.Desc, + }, + ], + }, + ct); + return result.Items.Select(Map).FirstOrDefault(); + } + + private static GAgentRunTerminalSnapshot Map(GAgentRunTerminalReadModel readModel) => + new( + readModel.ActorId, + readModel.SessionId, + readModel.CorrelationId, + (GAgentRunTerminalInteractionKind)readModel.InteractionKind, + (GAgentRunTerminalStatus)readModel.Status, + readModel.ReasonCode, + readModel.ReasonMessage, + readModel.StateVersion, + readModel.LastEventId, + readModel.ObservedAt); +} diff --git a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs index 3347f068a..32f2b59af 100644 --- a/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs +++ b/src/platform/Aevatar.GAgentService.Projection/ReadModels/ServiceProjectionReadModels.Partial.cs @@ -217,6 +217,21 @@ public DateTimeOffset UpdatedAt } } +public sealed partial class GAgentRunTerminalReadModel : IProjectionReadModel +{ + public DateTimeOffset UpdatedAt + { + get => ObservedAt; + set => ObservedAt = value; + } + + public DateTimeOffset ObservedAt + { + get => ServiceProjectionReadModelSupport.ToDateTimeOffset(ObservedAtUtcValue); + set => ObservedAtUtcValue = ServiceProjectionReadModelSupport.ToTimestamp(value); + } +} + internal static class ServiceProjectionReadModelSupport { public static Timestamp ToTimestamp(DateTimeOffset value) => diff --git a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto index 684eb1c82..da0e0e273 100644 --- a/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto +++ b/src/platform/Aevatar.GAgentService.Projection/service_projection_read_models.proto @@ -203,3 +203,19 @@ message ServiceRunCurrentStateReadModel { google.protobuf.Timestamp created_at_utc_value = 20; google.protobuf.Timestamp updated_at_utc_value = 21; } + +// --- GAgentRunTerminalReadModel --- + +message GAgentRunTerminalReadModel { + string id = 1; + string actor_id = 2; + int64 state_version = 3; + string last_event_id = 4; + string session_id = 5; + string correlation_id = 6; + int32 interaction_kind = 7; + int32 status = 8; + string reason_code = 9; + string reason_message = 10; + google.protobuf.Timestamp observed_at_utc_value = 11; +} diff --git a/test/Aevatar.AI.Tests/RoleGAgentStateCoverageTests.cs b/test/Aevatar.AI.Tests/RoleGAgentStateCoverageTests.cs index 7978e82f1..62421f59e 100644 --- a/test/Aevatar.AI.Tests/RoleGAgentStateCoverageTests.cs +++ b/test/Aevatar.AI.Tests/RoleGAgentStateCoverageTests.cs @@ -175,6 +175,8 @@ await agent.HandleToolApprovalDecision(new ToolApprovalDecisionEvent }); agent.State.PendingApproval.Should().BeNull(); + agent.State.Sessions["session-a"].Completed.Should().BeTrue(); + agent.State.Sessions["session-a"].FinalContent.Should().Contain("approval_denied: user denied"); } [Fact] @@ -258,6 +260,8 @@ await FluentActions.Invoking(() => agent.HandleToolApprovalDecision(new ToolAppr .WithMessage("dispatch failed"); agent.State.PendingApproval.Should().BeNull(); + agent.State.Sessions["session-a"].Completed.Should().BeTrue(); + agent.State.Sessions["session-a"].FinalContent.Should().Contain("approval_continuation_failed: dispatch failed"); AgentToolRequestContext.CurrentMetadata.Should().BeNull(); } @@ -316,6 +320,8 @@ await agent.HandleToolApprovalTimeout(new ToolApprovalTimeoutFiredEvent }); agent.State.PendingApproval.Should().BeNull(); + agent.State.Sessions["session-a"].Completed.Should().BeTrue(); + agent.State.Sessions["session-a"].FinalContent.Should().Contain("approval_timeout: Tool approval timed out and no remote approval handler is configured."); } [Fact] @@ -344,6 +350,8 @@ await agent.HandleToolApprovalTimeout(new ToolApprovalTimeoutFiredEvent }); agent.State.PendingApproval.Should().BeNull(); + agent.State.Sessions["session-a"].Completed.Should().BeTrue(); + agent.State.Sessions["session-a"].FinalContent.Should().Contain("approval_denied: not approved"); AgentToolRequestContext.CurrentMetadata.Should().BeNull(); } diff --git a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionRuntimeRegistrationTests.cs b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionRuntimeRegistrationTests.cs index 34fa00cd2..19111cef7 100644 --- a/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionRuntimeRegistrationTests.cs +++ b/test/Aevatar.CQRS.Projection.Core.Tests/ProjectionRuntimeRegistrationTests.cs @@ -56,6 +56,53 @@ public async Task AddProjectionMaterializationRuntimeCore_ShouldRegisterLifecycl dispatchPort.Dispatched[1].command.Payload!.Unpack().ProjectionKind.Should().Be("projection-a"); } + [Fact] + public async Task AddProjectionMaterializationRuntimeCore_ShouldReleaseSessionScopedMaterialization() + { + var runtime = new RecordingActorRuntime(); + var dispatchPort = new RecordingActorDispatchPort(); + var services = new ServiceCollection(); + services.AddSingleton(runtime); + services.AddSingleton(dispatchPort); + + services.AddProjectionMaterializationRuntimeCore< + TestSessionScopedMaterializationContext, + TestSessionScopedMaterializationLease, + ProjectionMaterializationScopeGAgent>( + scopeKey => new TestSessionScopedMaterializationContext + { + RootActorId = scopeKey.RootActorId, + ProjectionKind = scopeKey.ProjectionKind, + SessionId = scopeKey.SessionId, + }, + context => new TestSessionScopedMaterializationLease(context)); + + await using var provider = services.BuildServiceProvider(); + var activation = provider.GetRequiredService>(); + var release = provider.GetRequiredService>(); + + var scopeKey = new ProjectionRuntimeScopeKey( + "actor-1", + "projection-a", + ProjectionRuntimeMode.DurableMaterialization, + "correlation-1"); + var lease = await activation.EnsureAsync(new ProjectionScopeStartRequest + { + RootActorId = scopeKey.RootActorId, + ProjectionKind = scopeKey.ProjectionKind, + Mode = scopeKey.Mode, + SessionId = scopeKey.SessionId, + }); + await release.ReleaseIfIdleAsync(lease); + + runtime.CreatedActorIds.Should().ContainSingle() + .Which.Should().Be(ProjectionScopeActorId.Build(scopeKey)); + dispatchPort.Dispatched.Should().HaveCount(2); + dispatchPort.Dispatched[1].actorId.Should().Be(ProjectionScopeActorId.Build(scopeKey)); + dispatchPort.Dispatched[1].command.Payload!.Unpack().SessionId + .Should().Be("correlation-1"); + } + [Fact] public async Task AddEventSinkProjectionRuntimeCore_ShouldRegisterSessionLifecycleAndSessionScopeContext() { @@ -241,6 +288,28 @@ public TestMaterializationLease(TestMaterializationContext context) public TestMaterializationContext Context { get; } } + private sealed class TestSessionScopedMaterializationContext : IProjectionSessionScopedMaterializationContext + { + public string RootActorId { get; init; } = string.Empty; + + public string ProjectionKind { get; init; } = string.Empty; + + public string SessionId { get; init; } = string.Empty; + } + + private sealed class TestSessionScopedMaterializationLease + : ProjectionRuntimeLeaseBase, + IProjectionContextRuntimeLease + { + public TestSessionScopedMaterializationLease(TestSessionScopedMaterializationContext context) + : base(context.RootActorId) + { + Context = context; + } + + public TestSessionScopedMaterializationContext Context { get; } + } + private sealed class TestSessionContext : IProjectionSessionContext { public string RootActorId { get; init; } = string.Empty; diff --git a/test/Aevatar.GAgentService.Integration.Tests/GAgentServiceHostingServiceCollectionExtensionsTests.cs b/test/Aevatar.GAgentService.Integration.Tests/GAgentServiceHostingServiceCollectionExtensionsTests.cs index 264fef19f..9338c9a5c 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/GAgentServiceHostingServiceCollectionExtensionsTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/GAgentServiceHostingServiceCollectionExtensionsTests.cs @@ -1,5 +1,6 @@ #pragma warning disable CS0618 // Tests exercise legacy migration utilities pending removal using Aevatar.GAgentService.Abstractions.Ports; +using Aevatar.GAgentService.Abstractions.ScopeGAgents; using Aevatar.GAgentService.Governance.Abstractions.Ports; using Aevatar.GAgentService.Governance.Hosting.DependencyInjection; using Aevatar.GAgentService.Governance.Hosting.Migration; @@ -63,6 +64,7 @@ public void AddGAgentServiceCapability_ShouldRegisterCorePortsAndAdapters() using var provider = services.BuildServiceProvider(); provider.GetRequiredService().Should().NotBeNull(); + provider.GetRequiredService().Should().NotBeNull(); } [Fact] @@ -314,6 +316,7 @@ public void AddGAgentServiceProjectionReadModelProviders_ShouldRegisterElasticse provider.GetRequiredService>().Should().NotBeNull(); provider.GetRequiredService>().Should().NotBeNull(); provider.GetRequiredService>().Should().NotBeNull(); + provider.GetRequiredService>().Should().NotBeNull(); } [Fact] diff --git a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs index 9c64eda14..b7a6794bc 100644 --- a/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs +++ b/test/Aevatar.GAgentService.Integration.Tests/ScopeServiceEndpointsStreamTests.cs @@ -701,9 +701,12 @@ private static ICommandInteractionService( new GAgentDraftRunCommandTargetResolver( runtime, - projectionPort), + projectionPort, + new StubGAgentRunTerminalProjectionPort()), new DefaultCommandContextPolicy(), - new GAgentDraftRunCommandTargetBinder(projectionPort), + new GAgentDraftRunCommandTargetBinder( + projectionPort, + new StubGAgentRunTerminalProjectionPort()), new GAgentDraftRunCommandEnvelopeFactory(), new ActorCommandTargetDispatcher(new InlineActorDispatchPort(runtime)), new GAgentDraftRunAcceptedReceiptFactory()); @@ -713,7 +716,7 @@ private static ICommandInteractionService(new IdentityEventFrameMapper()), new GAgentDraftRunCompletionPolicy(), new GAgentDraftRunFinalizeEmitter(), - new GAgentDraftRunDurableCompletionResolver(), + new GAgentDraftRunDurableCompletionResolver(new StubGAgentRunTerminalQueryPort()), NullLogger>.Instance); } @@ -835,6 +838,59 @@ public Task ReleaseActorProjectionAsync( private sealed record StubDraftRunProjectionLease(string ActorId, string CommandId) : IGAgentDraftRunProjectionLease; + private sealed class StubGAgentRunTerminalProjectionPort : IGAgentRunTerminalProjectionPort + { + public Task EnsureProjectionAsync( + string actorId, + string correlationId, + GAgentRunTerminalInteractionKind interactionKind, + CancellationToken ct = default) + { + _ = ct; + return Task.FromResult( + new StubGAgentRunTerminalProjectionLease(actorId, correlationId, interactionKind)); + } + + public Task ReleaseProjectionAsync( + IGAgentRunTerminalProjectionLease lease, + CancellationToken ct = default) + { + _ = lease; + _ = ct; + return Task.CompletedTask; + } + } + + private sealed record StubGAgentRunTerminalProjectionLease( + string ActorId, + string CorrelationId, + GAgentRunTerminalInteractionKind InteractionKind) : IGAgentRunTerminalProjectionLease; + + private sealed class StubGAgentRunTerminalQueryPort : IGAgentRunTerminalQueryPort + { + public Task GetByCorrelationIdAsync( + string actorId, + string correlationId, + CancellationToken ct = default) + { + _ = actorId; + _ = correlationId; + _ = ct; + return Task.FromResult(null); + } + + public Task GetBySessionIdAsync( + string actorId, + string sessionId, + CancellationToken ct = default) + { + _ = actorId; + _ = sessionId; + _ = ct; + return Task.FromResult(null); + } + } + private sealed class StubScriptExecutionProjectionPort : IScriptExecutionProjectionPort { public List Messages { get; } = []; diff --git a/test/Aevatar.GAgentService.Tests/Application/GAgentApprovalInteractionTests.cs b/test/Aevatar.GAgentService.Tests/Application/GAgentApprovalInteractionTests.cs index 9c53afaf9..8074dc5ef 100644 --- a/test/Aevatar.GAgentService.Tests/Application/GAgentApprovalInteractionTests.cs +++ b/test/Aevatar.GAgentService.Tests/Application/GAgentApprovalInteractionTests.cs @@ -19,7 +19,8 @@ public async Task Resolver_ShouldReturnActorNotFound_WhenActorDoesNotExist() { var resolver = new GAgentApprovalCommandTargetResolver( new ApprovalStubActorRuntime(), - new ApprovalProjectionPort()); + new ApprovalProjectionPort(), + new ApprovalTerminalProjectionPort()); var result = await resolver.ResolveAsync( new GAgentApprovalCommand("actor-1", "req-1"), @@ -35,7 +36,8 @@ public async Task Resolver_ShouldReturnTarget_WhenActorExists() var actor = new ApprovalStubActor("actor-1", new ApprovalStubAgent()); var resolver = new GAgentApprovalCommandTargetResolver( new ApprovalStubActorRuntime(actor), - new ApprovalProjectionPort()); + new ApprovalProjectionPort(), + new ApprovalTerminalProjectionPort()); var result = await resolver.ResolveAsync( new GAgentApprovalCommand(" actor-1 ", "req-1"), @@ -54,10 +56,12 @@ public async Task Binder_ShouldBindProjectionLeaseAndLiveSink_WhenProjectionIsAv { LeaseToReturn = new ApprovalProjectionLease("actor-1", "cmd-1"), }; - var binder = new GAgentApprovalCommandTargetBinder(projectionPort); + var terminalPort = new ApprovalTerminalProjectionPort(); + var binder = new GAgentApprovalCommandTargetBinder(projectionPort, terminalPort); var target = new GAgentApprovalCommandTarget( new ApprovalStubActor("actor-1", new ApprovalStubAgent()), - projectionPort); + projectionPort, + terminalPort); var result = await binder.BindAsync( new GAgentApprovalCommand("actor-1", "req-1"), @@ -68,8 +72,12 @@ public async Task Binder_ShouldBindProjectionLeaseAndLiveSink_WhenProjectionIsAv result.Succeeded.Should().BeTrue(); target.ProjectionLease.Should().BeSameAs(projectionPort.LeaseToReturn); target.LiveSink.Should().NotBeNull(); - projectionPort.EnsureCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.commandId == "cmd-1"); + projectionPort.EnsureCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.commandId == "corr-1"); projectionPort.AttachCalls.Should().ContainSingle(); + terminalPort.Calls.Should().ContainSingle(x => + x.actorId == "actor-1" && + x.correlationId == "corr-1" && + x.interactionKind == GAgentRunTerminalInteractionKind.Approval); } [Fact] @@ -79,10 +87,12 @@ public async Task Binder_ShouldThrow_WhenProjectionPipelineIsUnavailable() { LeaseToReturn = null, }; - var binder = new GAgentApprovalCommandTargetBinder(projectionPort); + var terminalPort = new ApprovalTerminalProjectionPort(); + var binder = new GAgentApprovalCommandTargetBinder(projectionPort, terminalPort); var target = new GAgentApprovalCommandTarget( new ApprovalStubActor("actor-1", new ApprovalStubAgent()), - projectionPort); + projectionPort, + terminalPort); var act = async () => await binder.BindAsync( new GAgentApprovalCommand("actor-1", "req-1"), @@ -93,18 +103,30 @@ public async Task Binder_ShouldThrow_WhenProjectionPipelineIsUnavailable() await act.Should().ThrowAsync() .WithMessage("GAgent approval projection pipeline is unavailable."); projectionPort.AttachCalls.Should().BeEmpty(); + terminalPort.Calls.Should().ContainSingle(x => + x.actorId == "actor-1" && + x.correlationId == "corr-1" && + x.interactionKind == GAgentRunTerminalInteractionKind.Approval); + terminalPort.ReleaseCalls.Should().ContainSingle(); } [Fact] public async Task CleanupAfterDispatchFailureAsync_ShouldDetachReleaseAndDisposeBoundObservation() { var projectionPort = new ApprovalProjectionPort(); + var terminalPort = new ApprovalTerminalProjectionPort(); var target = new GAgentApprovalCommandTarget( new ApprovalStubActor("actor-1", new ApprovalStubAgent()), - projectionPort); + projectionPort, + terminalPort); var sink = new RecordingAguiEventSink(); var lease = new ApprovalProjectionLease("actor-1", "cmd-1"); - target.BindLiveObservation(lease, sink); + var terminalLease = new ApprovalTerminalProjectionLease( + "actor-1", + "corr-1", + GAgentRunTerminalInteractionKind.Approval); + target.BindTerminalProjection(terminalLease); + target.BindLiveObservation(lease, sink, "session-1"); await target.CleanupAfterDispatchFailureAsync(CancellationToken.None); @@ -114,6 +136,8 @@ public async Task CleanupAfterDispatchFailureAsync_ShouldDetachReleaseAndDispose sink.DisposeCalls.Should().Be(1); target.ProjectionLease.Should().BeNull(); target.LiveSink.Should().BeNull(); + terminalPort.ReleaseCalls.Should().ContainSingle(x => ReferenceEquals(x, terminalLease)); + target.TerminalProjectionLease.Should().BeNull(); } [Fact] @@ -121,7 +145,8 @@ public void RequireLiveSink_ShouldThrow_WhenObservationIsNotBound() { var target = new GAgentApprovalCommandTarget( new ApprovalStubActor("actor-1", new ApprovalStubAgent()), - new ApprovalProjectionPort()); + new ApprovalProjectionPort(), + new ApprovalTerminalProjectionPort()); var act = () => target.RequireLiveSink(); @@ -152,14 +177,15 @@ public void ReceiptFactory_ShouldCreateAcceptedReceipt() { var target = new GAgentApprovalCommandTarget( new ApprovalStubActor("actor-1", new ApprovalStubAgent()), - new ApprovalProjectionPort()); + new ApprovalProjectionPort(), + new ApprovalTerminalProjectionPort()); var factory = new GAgentApprovalAcceptedReceiptFactory(); var receipt = factory.Create( target, new CommandContext("actor-1", "cmd-1", "corr-1", new Dictionary())); - receipt.Should().Be(new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1")); + receipt.Should().Be(new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1", string.Empty)); } [Fact] @@ -188,7 +214,7 @@ public void CompletionPolicy_ShouldResolveTerminalEvents() public async Task FinalizeEmitter_ShouldEmitRunFinished_OnlyForCompletedTextMessages() { var emitter = new GAgentApprovalFinalizeEmitter(); - var receipt = new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1"); + var receipt = new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1", "session-1"); var emitted = new List(); await emitter.EmitAsync( @@ -221,15 +247,113 @@ await emitter.EmitAsync( } [Fact] - public async Task DurableCompletionResolver_ShouldAlwaysReturnIncomplete() + public async Task DurableCompletionResolver_ShouldResolveTerminalSnapshot() { - var resolver = new GAgentApprovalDurableCompletionResolver(); + var queryPort = new ApprovalTerminalQueryPort + { + CorrelationSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "corr-1", + GAgentRunTerminalInteractionKind.Approval, + GAgentRunTerminalStatus.Failed, + "approval_denied", + "denied", + 2, + "evt-1", + DateTimeOffset.UtcNow), + }; + var resolver = new GAgentApprovalDurableCompletionResolver(queryPort); + + var result = await resolver.ResolveAsync( + new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1", "session-1"), + CancellationToken.None); + + result.Should().Be(new CommandDurableCompletionObservation( + true, + GAgentApprovalCompletionStatus.Failed)); + queryPort.CorrelationCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.correlationId == "corr-1"); + } + + [Fact] + public async Task DurableCompletionResolver_ShouldIgnoreSessionFallback_WhenCorrelationDiffers() + { + var queryPort = new ApprovalTerminalQueryPort + { + SessionSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "old-corr", + GAgentRunTerminalInteractionKind.Approval, + GAgentRunTerminalStatus.Failed, + "approval_denied", + "denied", + 2, + "evt-1", + DateTimeOffset.UtcNow), + }; + var resolver = new GAgentApprovalDurableCompletionResolver(queryPort); var result = await resolver.ResolveAsync( - new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1"), + new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1", "session-1"), CancellationToken.None); result.Should().Be(CommandDurableCompletionObservation.Incomplete); + queryPort.SessionCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.sessionId == "session-1"); + } + + [Fact] + public async Task DurableCompletionResolver_ShouldIgnoreSessionFallback_WhenInteractionKindDiffers() + { + var queryPort = new ApprovalTerminalQueryPort + { + SessionSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "corr-1", + GAgentRunTerminalInteractionKind.DraftRun, + GAgentRunTerminalStatus.Failed, + string.Empty, + string.Empty, + 2, + "evt-1", + DateTimeOffset.UtcNow), + }; + var resolver = new GAgentApprovalDurableCompletionResolver(queryPort); + + var result = await resolver.ResolveAsync( + new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1", "session-1"), + CancellationToken.None); + + result.Should().Be(CommandDurableCompletionObservation.Incomplete); + } + + [Fact] + public async Task DurableCompletionResolver_ShouldUseSessionFallback_WhenReceiptMatches() + { + var queryPort = new ApprovalTerminalQueryPort + { + SessionSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "corr-1", + GAgentRunTerminalInteractionKind.Approval, + GAgentRunTerminalStatus.RunFinished, + string.Empty, + string.Empty, + 2, + "evt-1", + DateTimeOffset.UtcNow), + }; + var resolver = new GAgentApprovalDurableCompletionResolver(queryPort); + + var result = await resolver.ResolveAsync( + new GAgentApprovalAcceptedReceipt("actor-1", "cmd-1", "corr-1", "session-1"), + CancellationToken.None); + + result.Should().Be(new CommandDurableCompletionObservation( + true, + GAgentApprovalCompletionStatus.RunFinished)); } private sealed class ApprovalProjectionPort : IGAgentDraftRunProjectionPort @@ -279,6 +403,62 @@ public Task ReleaseActorProjectionAsync( private sealed record ApprovalProjectionLease(string ActorId, string CommandId) : IGAgentDraftRunProjectionLease; + private sealed class ApprovalTerminalProjectionPort : IGAgentRunTerminalProjectionPort + { + public List<(string actorId, string correlationId, GAgentRunTerminalInteractionKind interactionKind)> Calls { get; } = []; + public List ReleaseCalls { get; } = []; + + public Task EnsureProjectionAsync( + string actorId, + string correlationId, + GAgentRunTerminalInteractionKind interactionKind, + CancellationToken ct = default) + { + Calls.Add((actorId, correlationId, interactionKind)); + return Task.FromResult( + new ApprovalTerminalProjectionLease(actorId, correlationId, interactionKind)); + } + + public Task ReleaseProjectionAsync( + IGAgentRunTerminalProjectionLease lease, + CancellationToken ct = default) + { + ReleaseCalls.Add(lease); + return Task.CompletedTask; + } + } + + private sealed record ApprovalTerminalProjectionLease( + string ActorId, + string CorrelationId, + GAgentRunTerminalInteractionKind InteractionKind) : IGAgentRunTerminalProjectionLease; + + private sealed class ApprovalTerminalQueryPort : IGAgentRunTerminalQueryPort + { + public GAgentRunTerminalSnapshot? CorrelationSnapshot { get; init; } + public GAgentRunTerminalSnapshot? SessionSnapshot { get; init; } + public List<(string actorId, string correlationId)> CorrelationCalls { get; } = []; + public List<(string actorId, string sessionId)> SessionCalls { get; } = []; + + public Task GetByCorrelationIdAsync( + string actorId, + string correlationId, + CancellationToken ct = default) + { + CorrelationCalls.Add((actorId, correlationId)); + return Task.FromResult(CorrelationSnapshot); + } + + public Task GetBySessionIdAsync( + string actorId, + string sessionId, + CancellationToken ct = default) + { + SessionCalls.Add((actorId, sessionId)); + return Task.FromResult(SessionSnapshot); + } + } + private sealed class ApprovalStubActorRuntime(params IActor[] actors) : IActorRuntime { private readonly Dictionary _actors = actors.ToDictionary(x => x.Id, StringComparer.Ordinal); diff --git a/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionCoverageTests.cs b/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionCoverageTests.cs index 55ce8dc07..f8b892dbe 100644 --- a/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionCoverageTests.cs +++ b/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionCoverageTests.cs @@ -21,7 +21,8 @@ public async Task Resolver_ShouldReturnUnknownActorType_WhenTypeCannotBeResolved { var resolver = new GAgentDraftRunCommandTargetResolver( new DraftRunStubActorRuntime(), - new DraftRunProjectionPort()); + new DraftRunProjectionPort(), + new RecordingGAgentRunTerminalProjectionPort()); var result = await resolver.ResolveAsync( new GAgentDraftRunCommand("scope-a", "missing-type", "hello"), @@ -37,7 +38,8 @@ public async Task Resolver_ShouldCreatePreferredActor_WhenMissing() var runtime = new DraftRunStubActorRuntime(); var resolver = new GAgentDraftRunCommandTargetResolver( runtime, - new DraftRunProjectionPort()); + new DraftRunProjectionPort(), + new RecordingGAgentRunTerminalProjectionPort()); var result = await resolver.ResolveAsync( new GAgentDraftRunCommand( @@ -58,13 +60,20 @@ public async Task Resolver_ShouldCreatePreferredActor_WhenMissing() public async Task CommandTargetCleanup_ShouldDetachReleaseAndDisposeBoundObservation() { var projectionPort = new DraftRunProjectionPort(); + var terminalPort = new RecordingGAgentRunTerminalProjectionPort(); var target = new GAgentDraftRunCommandTarget( new DraftRunStubActor("actor-1", new DraftRunExpectedAgent()), typeof(DraftRunExpectedAgent).AssemblyQualifiedName!, - projectionPort); + projectionPort, + terminalPort); var lease = new DraftRunProjectionLease("actor-1", "cmd-1"); + var terminalLease = new RecordingGAgentRunTerminalProjectionLease( + "actor-1", + "corr-1", + GAgentRunTerminalInteractionKind.DraftRun); var sink = new DraftRunRecordingSink(); - target.BindLiveObservation(lease, sink); + target.BindTerminalProjection(terminalLease); + target.BindLiveObservation(lease, sink, "session-1"); await target.CleanupAfterDispatchFailureAsync(CancellationToken.None); @@ -74,17 +83,21 @@ public async Task CommandTargetCleanup_ShouldDetachReleaseAndDisposeBoundObserva sink.DisposeCalls.Should().Be(1); target.ProjectionLease.Should().BeNull(); target.LiveSink.Should().BeNull(); + terminalPort.ReleaseCalls.Should().ContainSingle(x => ReferenceEquals(x, terminalLease)); + target.TerminalProjectionLease.Should().BeNull(); } [Fact] public async Task Binder_ShouldThrow_WhenProjectionPipelineIsUnavailable() { var projectionPort = new DraftRunProjectionPort { LeaseToReturn = null }; - var binder = new GAgentDraftRunCommandTargetBinder(projectionPort); + var terminalPort = new RecordingGAgentRunTerminalProjectionPort(); + var binder = new GAgentDraftRunCommandTargetBinder(projectionPort, terminalPort); var target = new GAgentDraftRunCommandTarget( new DraftRunStubActor("actor-1", new DraftRunExpectedAgent()), typeof(DraftRunExpectedAgent).AssemblyQualifiedName!, - projectionPort); + projectionPort, + terminalPort); var act = async () => await binder.BindAsync( new GAgentDraftRunCommand("scope-a", typeof(DraftRunExpectedAgent).AssemblyQualifiedName!, "hello"), @@ -94,6 +107,11 @@ public async Task Binder_ShouldThrow_WhenProjectionPipelineIsUnavailable() await act.Should().ThrowAsync() .WithMessage("GAgent draft-run projection pipeline is unavailable."); + terminalPort.Calls.Should().ContainSingle(x => + x.actorId == "actor-1" && + x.correlationId == "corr-1" && + x.interactionKind == GAgentRunTerminalInteractionKind.DraftRun); + terminalPort.ReleaseCalls.Should().ContainSingle(); } [Fact] @@ -176,13 +194,14 @@ public async Task ReceiptFactoryCompletionPolicyFinalizeEmitterAndDurableResolve var target = new GAgentDraftRunCommandTarget( new DraftRunStubActor("actor-1", new DraftRunExpectedAgent()), "actor-type", - new DraftRunProjectionPort()); + new DraftRunProjectionPort(), + new RecordingGAgentRunTerminalProjectionPort()); var receiptFactory = new GAgentDraftRunAcceptedReceiptFactory(); var receipt = receiptFactory.Create( target, new CommandContext("actor-1", "cmd-1", "corr-1", new Dictionary())); - receipt.Should().Be(new GAgentDraftRunAcceptedReceipt("actor-1", "actor-type", "cmd-1", "corr-1")); + receipt.Should().Be(new GAgentDraftRunAcceptedReceipt("actor-1", "actor-type", "cmd-1", "corr-1", string.Empty)); var completionPolicy = new GAgentDraftRunCompletionPolicy(); completionPolicy.TryResolve(new AGUIEvent { TextMessageEnd = new Aevatar.Presentation.AGUI.TextMessageEndEvent() }, out var textCompletion) @@ -214,9 +233,134 @@ await emitter.EmitAsync( emitted[0].RunFinished.ThreadId.Should().Be("actor-1"); emitted[0].RunFinished.RunId.Should().Be("cmd-1"); - var durableResolver = new GAgentDraftRunDurableCompletionResolver(); + var terminalQuery = new RecordingGAgentRunTerminalQueryPort + { + CorrelationSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "corr-1", + GAgentRunTerminalInteractionKind.DraftRun, + GAgentRunTerminalStatus.TextMessageCompleted, + string.Empty, + string.Empty, + 3, + "evt-1", + DateTimeOffset.UtcNow), + }; + var durableResolver = new GAgentDraftRunDurableCompletionResolver(terminalQuery); (await durableResolver.ResolveAsync(receipt, CancellationToken.None)) - .Should().Be(CommandDurableCompletionObservation.Incomplete); + .Should().Be(new CommandDurableCompletionObservation( + true, + GAgentDraftRunCompletionStatus.TextMessageCompleted)); + terminalQuery.CorrelationCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.correlationId == "corr-1"); + } + + [Fact] + public async Task DurableCompletionResolver_ShouldIgnoreSessionFallback_WhenCorrelationDiffers() + { + var terminalQuery = new RecordingGAgentRunTerminalQueryPort + { + SessionSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "old-corr", + GAgentRunTerminalInteractionKind.DraftRun, + GAgentRunTerminalStatus.TextMessageCompleted, + string.Empty, + string.Empty, + 3, + "evt-1", + DateTimeOffset.UtcNow), + }; + var durableResolver = new GAgentDraftRunDurableCompletionResolver(terminalQuery); + + var result = await durableResolver.ResolveAsync( + new GAgentDraftRunAcceptedReceipt("actor-1", "actor-type", "cmd-1", "corr-1", "session-1"), + CancellationToken.None); + + result.Should().Be(CommandDurableCompletionObservation.Incomplete); + terminalQuery.SessionCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.sessionId == "session-1"); + } + + [Fact] + public async Task DurableCompletionResolver_ShouldIgnoreSessionFallback_WhenInteractionKindDiffers() + { + var terminalQuery = new RecordingGAgentRunTerminalQueryPort + { + SessionSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "corr-1", + GAgentRunTerminalInteractionKind.Approval, + GAgentRunTerminalStatus.TextMessageCompleted, + string.Empty, + string.Empty, + 3, + "evt-1", + DateTimeOffset.UtcNow), + }; + var durableResolver = new GAgentDraftRunDurableCompletionResolver(terminalQuery); + + var result = await durableResolver.ResolveAsync( + new GAgentDraftRunAcceptedReceipt("actor-1", "actor-type", "cmd-1", "corr-1", "session-1"), + CancellationToken.None); + + result.Should().Be(CommandDurableCompletionObservation.Incomplete); + } + + [Fact] + public async Task DurableCompletionResolver_ShouldUseSessionFallback_WhenReceiptMatches() + { + var terminalQuery = new RecordingGAgentRunTerminalQueryPort + { + SessionSnapshot = new GAgentRunTerminalSnapshot( + "actor-1", + "session-1", + "corr-1", + GAgentRunTerminalInteractionKind.DraftRun, + GAgentRunTerminalStatus.RunFinished, + string.Empty, + string.Empty, + 3, + "evt-1", + DateTimeOffset.UtcNow), + }; + var durableResolver = new GAgentDraftRunDurableCompletionResolver(terminalQuery); + + var result = await durableResolver.ResolveAsync( + new GAgentDraftRunAcceptedReceipt("actor-1", "actor-type", "cmd-1", "corr-1", "session-1"), + CancellationToken.None); + + result.Should().Be(new CommandDurableCompletionObservation( + true, + GAgentDraftRunCompletionStatus.RunFinished)); + } + + [Fact] + public async Task Binder_ShouldActivateTerminalMaterialization_BeforeLiveObservation() + { + var projectionPort = new DraftRunProjectionPort(); + var terminalPort = new RecordingGAgentRunTerminalProjectionPort(); + var binder = new GAgentDraftRunCommandTargetBinder(projectionPort, terminalPort); + var target = new GAgentDraftRunCommandTarget( + new DraftRunStubActor("actor-1", new DraftRunExpectedAgent()), + typeof(DraftRunExpectedAgent).AssemblyQualifiedName!, + projectionPort, + terminalPort); + + var result = await binder.BindAsync( + new GAgentDraftRunCommand("scope-a", typeof(DraftRunExpectedAgent).AssemblyQualifiedName!, "hello"), + target, + new CommandContext("actor-1", "cmd-1", "corr-1", new Dictionary()), + CancellationToken.None); + + result.Succeeded.Should().BeTrue(); + terminalPort.Calls.Should().ContainSingle(x => + x.actorId == "actor-1" && + x.correlationId == "corr-1" && + x.interactionKind == GAgentRunTerminalInteractionKind.DraftRun); + projectionPort.EnsureCalls.Should().ContainSingle(x => x.actorId == "actor-1" && x.commandId == "cmd-1"); + projectionPort.AttachCalls.Should().ContainSingle(); } private sealed class DraftRunProjectionPort : IGAgentDraftRunProjectionPort @@ -266,6 +410,62 @@ public Task ReleaseActorProjectionAsync( private sealed record DraftRunProjectionLease(string ActorId, string CommandId) : IGAgentDraftRunProjectionLease; + private sealed class RecordingGAgentRunTerminalProjectionPort : IGAgentRunTerminalProjectionPort + { + public List<(string actorId, string correlationId, GAgentRunTerminalInteractionKind interactionKind)> Calls { get; } = []; + public List ReleaseCalls { get; } = []; + + public Task EnsureProjectionAsync( + string actorId, + string correlationId, + GAgentRunTerminalInteractionKind interactionKind, + CancellationToken ct = default) + { + Calls.Add((actorId, correlationId, interactionKind)); + return Task.FromResult( + new RecordingGAgentRunTerminalProjectionLease(actorId, correlationId, interactionKind)); + } + + public Task ReleaseProjectionAsync( + IGAgentRunTerminalProjectionLease lease, + CancellationToken ct = default) + { + ReleaseCalls.Add(lease); + return Task.CompletedTask; + } + } + + private sealed record RecordingGAgentRunTerminalProjectionLease( + string ActorId, + string CorrelationId, + GAgentRunTerminalInteractionKind InteractionKind) : IGAgentRunTerminalProjectionLease; + + private sealed class RecordingGAgentRunTerminalQueryPort : IGAgentRunTerminalQueryPort + { + public GAgentRunTerminalSnapshot? CorrelationSnapshot { get; init; } + public GAgentRunTerminalSnapshot? SessionSnapshot { get; init; } + public List<(string actorId, string correlationId)> CorrelationCalls { get; } = []; + public List<(string actorId, string sessionId)> SessionCalls { get; } = []; + + public Task GetByCorrelationIdAsync( + string actorId, + string correlationId, + CancellationToken ct = default) + { + CorrelationCalls.Add((actorId, correlationId)); + return Task.FromResult(CorrelationSnapshot); + } + + public Task GetBySessionIdAsync( + string actorId, + string sessionId, + CancellationToken ct = default) + { + SessionCalls.Add((actorId, sessionId)); + return Task.FromResult(SessionSnapshot); + } + } + private sealed class DraftRunStubActorRuntime(params IActor[] actors) : IActorRuntime { private readonly Dictionary _actors = actors.ToDictionary(x => x.Id, StringComparer.Ordinal); diff --git a/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionTests.cs b/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionTests.cs index 1027871c3..2f83d62e5 100644 --- a/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionTests.cs +++ b/test/Aevatar.GAgentService.Tests/Application/GAgentDraftRunInteractionTests.cs @@ -14,7 +14,10 @@ public sealed class GAgentDraftRunInteractionTests public async Task Resolver_ShouldRejectExistingActor_WhenRuntimeTypeDoesNotMatchRequestedType() { var runtime = new StubActorRuntime(new StubActor("actor-1", new DifferentAgent())); - var resolver = new GAgentDraftRunCommandTargetResolver(runtime, new NoOpDraftRunProjectionPort()); + var resolver = new GAgentDraftRunCommandTargetResolver( + runtime, + new NoOpDraftRunProjectionPort(), + new NoOpGAgentRunTerminalProjectionPort()); var result = await resolver.ResolveAsync( new GAgentDraftRunCommand( @@ -36,6 +39,7 @@ public async Task Resolver_ShouldAllowExistingActor_WhenVerifierConfirmsExpected var resolver = new GAgentDraftRunCommandTargetResolver( runtime, new NoOpDraftRunProjectionPort(), + new NoOpGAgentRunTerminalProjectionPort(), new StubAgentTypeVerifier(result: true)); var result = await resolver.ResolveAsync( @@ -119,6 +123,27 @@ public Task ReleaseActorProjectionAsync( Task.CompletedTask; } + private sealed class NoOpGAgentRunTerminalProjectionPort : IGAgentRunTerminalProjectionPort + { + public Task EnsureProjectionAsync( + string actorId, + string correlationId, + GAgentRunTerminalInteractionKind interactionKind, + CancellationToken ct = default) => + Task.FromResult( + new NoOpGAgentRunTerminalProjectionLease(actorId, correlationId, interactionKind)); + + public Task ReleaseProjectionAsync( + IGAgentRunTerminalProjectionLease lease, + CancellationToken ct = default) => + Task.CompletedTask; + } + + private sealed record NoOpGAgentRunTerminalProjectionLease( + string ActorId, + string CorrelationId, + GAgentRunTerminalInteractionKind InteractionKind) : IGAgentRunTerminalProjectionLease; + private sealed class StubAgentTypeVerifier(bool result) : IAgentTypeVerifier { public Task IsExpectedAsync(string actorId, Type expectedType, CancellationToken ct = default) diff --git a/test/Aevatar.GAgentService.Tests/Projection/GAgentRunTerminalProjectorTests.cs b/test/Aevatar.GAgentService.Tests/Projection/GAgentRunTerminalProjectorTests.cs new file mode 100644 index 000000000..a0ab8b79e --- /dev/null +++ b/test/Aevatar.GAgentService.Tests/Projection/GAgentRunTerminalProjectorTests.cs @@ -0,0 +1,305 @@ +using Aevatar.AI.Abstractions; +using Aevatar.Foundation.Abstractions; +using Aevatar.GAgentService.Abstractions.ScopeGAgents; +using Aevatar.GAgentService.Projection.Contexts; +using Aevatar.GAgentService.Projection.Projectors; +using Aevatar.GAgentService.Projection.Queries; +using Aevatar.GAgentService.Projection.ReadModels; +using Aevatar.Presentation.AGUI; +using FluentAssertions; +using Google.Protobuf.WellKnownTypes; +using AiTextMessageEndEvent = Aevatar.AI.Abstractions.TextMessageEndEvent; + +namespace Aevatar.GAgentService.Tests.Projection; + +public sealed class GAgentRunTerminalProjectorTests +{ + [Fact] + public async Task ProjectAsync_ShouldMaterializeCompletedSession_FromCommittedSessionCompletion() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-05-14T00:00:00+00:00"))); + var observedAt = DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"); + + await projector.ProjectAsync( + CreateContext("actor-1", "corr-1"), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "corr-1", + Content = "done", + ContentEmitted = true, + }, + stateVersion: 4, + eventId: "evt-1", + correlationId: "corr-1", + observedAt: observedAt)); + + var doc = await store.GetAsync(GAgentRunTerminalProjector.BuildDocumentId("actor-1", "corr-1")); + doc.Should().NotBeNull(); + doc!.ActorId.Should().Be("actor-1"); + doc.SessionId.Should().Be("corr-1"); + doc.CorrelationId.Should().Be("corr-1"); + doc.Status.Should().Be((int)GAgentRunTerminalStatus.TextMessageCompleted); + doc.InteractionKind.Should().Be((int)GAgentRunTerminalInteractionKind.DraftRun); + doc.StateVersion.Should().Be(4); + doc.LastEventId.Should().Be("evt-1"); + doc.ObservedAt.Should().Be(observedAt); + } + + [Fact] + public async Task ProjectAsync_ShouldMaterializeFailedSession_FromLegacyFailureMarker() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-05-14T00:00:00+00:00"))); + + await projector.ProjectAsync( + CreateContext("actor-1", "corr-approval", GAgentRunTerminalInteractionKind.Approval), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "session-1", + Content = "[[AEVATAR_LLM_ERROR]] denied", + }, + stateVersion: 2, + eventId: "evt-failed", + correlationId: "corr-approval", + observedAt: DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"))); + + var doc = await store.GetAsync(GAgentRunTerminalProjector.BuildDocumentId("actor-1", "corr-approval")); + doc.Should().NotBeNull(); + doc!.Status.Should().Be((int)GAgentRunTerminalStatus.Failed); + doc.ReasonCode.Should().Be("legacy_llm_error"); + doc.ReasonMessage.Should().Be("denied"); + doc.InteractionKind.Should().Be((int)GAgentRunTerminalInteractionKind.Approval); + } + + [Fact] + public async Task ProjectAsync_ShouldPreserveKnownApprovalReasonCode_FromFailureMarker() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-05-14T00:00:00+00:00"))); + + await projector.ProjectAsync( + CreateContext("actor-1", "corr-approval", GAgentRunTerminalInteractionKind.Approval), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "session-1", + Content = "[[AEVATAR_LLM_ERROR]] approval_denied: User said no.", + }, + stateVersion: 3, + eventId: "evt-denied", + correlationId: "corr-approval", + observedAt: DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"))); + + var doc = await store.GetAsync(GAgentRunTerminalProjector.BuildDocumentId("actor-1", "corr-approval")); + doc.Should().NotBeNull(); + doc!.Status.Should().Be((int)GAgentRunTerminalStatus.Failed); + doc.ReasonCode.Should().Be("approval_denied"); + doc.ReasonMessage.Should().Be("User said no."); + } + + [Fact] + public async Task ProjectAsync_ShouldKeepDraftRunKind_WhenExplicitSessionDiffersFromCorrelation() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-05-14T00:00:00+00:00"))); + + await projector.ProjectAsync( + CreateContext("actor-1", "corr-1", GAgentRunTerminalInteractionKind.DraftRun), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "explicit-session-1", + Content = "done", + ContentEmitted = true, + }, + stateVersion: 5, + eventId: "evt-explicit-session", + correlationId: "corr-1", + observedAt: DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"))); + + var doc = await store.GetAsync(GAgentRunTerminalProjector.BuildDocumentId("actor-1", "corr-1")); + doc.Should().NotBeNull(); + doc!.SessionId.Should().Be("explicit-session-1"); + doc.CorrelationId.Should().Be("corr-1"); + doc.InteractionKind.Should().Be((int)GAgentRunTerminalInteractionKind.DraftRun); + } + + [Fact] + public async Task ProjectAsync_ShouldNotOverwriteNewerReadModel_WithOlderStateVersion() + { + var store = new RecordingDocumentStore(x => x.Id) + { + EnforceMonotonicWrites = true, + }; + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.Parse("2026-05-14T00:00:00+00:00"))); + + await projector.ProjectAsync( + CreateContext("actor-1", "corr-1"), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "corr-1", + Content = "newer", + ContentEmitted = true, + }, + stateVersion: 6, + eventId: "evt-newer", + correlationId: "corr-1", + observedAt: DateTimeOffset.Parse("2026-05-14T02:00:00+00:00"))); + await projector.ProjectAsync( + CreateContext("actor-1", "corr-1"), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "corr-1", + Content = "older", + ContentEmitted = true, + }, + stateVersion: 5, + eventId: "evt-older", + correlationId: "corr-1", + observedAt: DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"))); + + var doc = await store.GetAsync(GAgentRunTerminalProjector.BuildDocumentId("actor-1", "corr-1")); + doc.Should().NotBeNull(); + doc!.StateVersion.Should().Be(6); + doc.LastEventId.Should().Be("evt-newer"); + doc.ObservedAt.Should().Be(DateTimeOffset.Parse("2026-05-14T02:00:00+00:00")); + } + + [Fact] + public async Task ProjectAsync_ShouldIgnoreLiveOnlyTerminalPayloads() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.UtcNow)); + + await projector.ProjectAsync( + CreateContext("actor-1"), + new EventEnvelope + { + Id = "live", + Payload = Any.Pack(new AiTextMessageEndEvent { SessionId = "session-1", Content = "done" }), + }); + await projector.ProjectAsync( + CreateContext("actor-1"), + new EventEnvelope + { + Id = "agui", + Payload = Any.Pack(new AGUIEvent { RunError = new RunErrorEvent { Message = "boom" } }), + }); + + (await store.ReadItemsAsync()).Should().BeEmpty(); + } + + [Fact] + public async Task ProjectAsync_ShouldIgnoreCompletion_WhenCorrelationIsOutsideActivatedInteraction() + { + var store = new RecordingDocumentStore(x => x.Id); + var projector = new GAgentRunTerminalProjector( + store, + new FixedProjectionClock(DateTimeOffset.UtcNow)); + + await projector.ProjectAsync( + CreateContext("actor-1", "corr-interaction"), + WrapCommitted( + new RoleChatSessionCompletedEvent + { + SessionId = "ordinary-session", + Content = "ordinary chat done", + }, + stateVersion: 8, + eventId: "evt-ordinary", + correlationId: "ordinary-chat-correlation", + observedAt: DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"))); + + (await store.ReadItemsAsync()).Should().BeEmpty(); + } + + [Fact] + public async Task QueryReader_ShouldResolveByCorrelationId_ThenSessionId() + { + var store = new RecordingDocumentStore(x => x.Id); + var reader = new GAgentRunTerminalQueryReader(store); + var doc = new GAgentRunTerminalReadModel + { + Id = GAgentRunTerminalProjector.BuildDocumentId("actor-1", "corr-1"), + ActorId = "actor-1", + SessionId = "session-1", + CorrelationId = "corr-1", + InteractionKind = (int)GAgentRunTerminalInteractionKind.Approval, + Status = (int)GAgentRunTerminalStatus.Failed, + ReasonCode = "approval_denied", + ReasonMessage = "denied", + StateVersion = 7, + LastEventId = "evt-7", + ObservedAt = DateTimeOffset.Parse("2026-05-14T01:00:00+00:00"), + }; + await store.UpsertAsync(doc); + + var byCorrelation = await reader.GetByCorrelationIdAsync("actor-1", "corr-1"); + byCorrelation.Should().NotBeNull(); + byCorrelation!.Status.Should().Be(GAgentRunTerminalStatus.Failed); + byCorrelation.ReasonCode.Should().Be("approval_denied"); + + var bySession = await reader.GetBySessionIdAsync("actor-1", "session-1"); + bySession.Should().NotBeNull(); + bySession!.CorrelationId.Should().Be("corr-1"); + } + + private static GAgentRunTerminalProjectionContext CreateContext( + string actorId, + string correlationId = "corr-1", + GAgentRunTerminalInteractionKind interactionKind = GAgentRunTerminalInteractionKind.DraftRun) => + new() + { + RootActorId = actorId, + ProjectionKind = interactionKind == GAgentRunTerminalInteractionKind.Approval + ? "gagent-run-terminal-approval" + : "gagent-run-terminal-draft-run", + CorrelationId = correlationId, + InteractionKind = interactionKind, + }; + + private static EventEnvelope WrapCommitted( + RoleChatSessionCompletedEvent evt, + long stateVersion, + string eventId, + string correlationId, + DateTimeOffset observedAt) => + new() + { + Id = $"outer-{eventId}", + Timestamp = Timestamp.FromDateTimeOffset(observedAt), + Route = EnvelopeRouteSemantics.CreateObserverPublication("actor-1"), + Propagation = new EnvelopePropagation + { + CorrelationId = correlationId, + }, + Payload = Any.Pack(new CommittedStateEventPublished + { + StateEvent = new StateEvent + { + EventId = eventId, + Version = stateVersion, + Timestamp = Timestamp.FromDateTimeOffset(observedAt), + EventData = Any.Pack(evt), + }, + StateRoot = Any.Pack(new RoleGAgentState()), + }), + }; +} diff --git a/test/Aevatar.GAgentService.Tests/Projection/ProjectionTestDoubles.cs b/test/Aevatar.GAgentService.Tests/Projection/ProjectionTestDoubles.cs index 138c2e52b..ee4215dd3 100644 --- a/test/Aevatar.GAgentService.Tests/Projection/ProjectionTestDoubles.cs +++ b/test/Aevatar.GAgentService.Tests/Projection/ProjectionTestDoubles.cs @@ -31,18 +31,27 @@ public RecordingDocumentStore(Func keySelector) _keySelector = keySelector; } + public bool EnforceMonotonicWrites { get; set; } + public int LastQueryTake { get; private set; } public Task UpsertAsync(TReadModel readModel, CancellationToken ct = default) { var key = _keySelector(readModel); var existingIndex = _items.FindIndex(x => string.Equals(_keySelector(x), key, StringComparison.Ordinal)); - if (existingIndex >= 0) - _items[existingIndex] = readModel; - else - _items.Add(readModel); + var existing = existingIndex >= 0 ? _items[existingIndex] : null; + var result = EnforceMonotonicWrites + ? ProjectionWriteResultEvaluator.Evaluate(existing, readModel) + : ProjectionWriteResult.Applied(); + if (result.IsApplied) + { + if (existingIndex >= 0) + _items[existingIndex] = readModel; + else + _items.Add(readModel); + } - return Task.FromResult(ProjectionWriteResult.Applied()); + return Task.FromResult(result); } public Task DeleteAsync(string id, CancellationToken ct = default) @@ -114,12 +123,14 @@ public RecordingProjectionActivationService(Func conte } public List<(string rootEntityId, string projectionName)> Calls { get; } = []; + public List Requests { get; } = []; public Task> EnsureAsync( ProjectionScopeStartRequest request, CancellationToken ct = default) { Calls.Add((request.RootActorId, request.ProjectionKind)); + Requests.Add(request); return Task.FromResult(new ServiceProjectionRuntimeLease( request.RootActorId, _contextFactory(request.RootActorId, request.ProjectionKind))); diff --git a/test/Aevatar.GAgentService.Tests/Projection/ServiceProjectionInfrastructureTests.cs b/test/Aevatar.GAgentService.Tests/Projection/ServiceProjectionInfrastructureTests.cs index 51c811050..0895266cc 100644 --- a/test/Aevatar.GAgentService.Tests/Projection/ServiceProjectionInfrastructureTests.cs +++ b/test/Aevatar.GAgentService.Tests/Projection/ServiceProjectionInfrastructureTests.cs @@ -6,6 +6,7 @@ using Aevatar.GAgentService.Abstractions; using Aevatar.GAgentService.Abstractions.Ports; using Aevatar.GAgentService.Abstractions.Queries; +using Aevatar.GAgentService.Abstractions.ScopeGAgents; using Aevatar.GAgentService.Abstractions.Services; using Aevatar.GAgentService.Projection.Configuration; using Aevatar.GAgentService.Projection.Contexts; @@ -98,6 +99,50 @@ public async Task ProjectionPorts_ShouldSkipActivation_WhenDisabled() revisionActivation.Calls.Should().BeEmpty(); } + [Fact] + public async Task GAgentRunTerminalProjectionPort_ShouldActivateAndReleaseByInteractionKind() + { + var activationService = new RecordingProjectionActivationService( + static (rootActorId, projectionName) => new GAgentRunTerminalProjectionContext + { + RootActorId = rootActorId, + ProjectionKind = projectionName, + CorrelationId = "corr-1", + InteractionKind = GAgentRunTerminalProjectionPort.ResolveInteractionKind(projectionName), + }); + var releaseService = new RecordingProjectionReleaseService>(); + IGAgentRunTerminalProjectionPort service = new GAgentRunTerminalProjectionPort( + new ServiceProjectionOptions(), + activationService, + releaseService); + + var draftLease = await service.EnsureProjectionAsync( + "actor-1", + "corr-1", + GAgentRunTerminalInteractionKind.DraftRun); + var approvalLease = await service.EnsureProjectionAsync( + "actor-1", + "corr-2", + GAgentRunTerminalInteractionKind.Approval); + + draftLease.Should().NotBeNull(); + draftLease!.InteractionKind.Should().Be(GAgentRunTerminalInteractionKind.DraftRun); + approvalLease.Should().NotBeNull(); + approvalLease!.InteractionKind.Should().Be(GAgentRunTerminalInteractionKind.Approval); + activationService.Requests.Should().Contain(x => + x.RootActorId == "actor-1" && + x.SessionId == "corr-1" && + x.ProjectionKind == "gagent-run-terminal-draft-run"); + activationService.Requests.Should().Contain(x => + x.RootActorId == "actor-1" && + x.SessionId == "corr-2" && + x.ProjectionKind == "gagent-run-terminal-approval"); + + await service.ReleaseProjectionAsync(draftLease); + + releaseService.Released.Should().ContainSingle(); + } + [Fact] public void MetadataProviders_ShouldExposeStableIndexNames() {