From 2573b8aa8caaa9aaf5aa655b0b2add42ed8177f1 Mon Sep 17 00:00:00 2001 From: Wei Bin Date: Thu, 14 May 2026 20:58:58 +0800 Subject: [PATCH 1/5] fix: re-apply fixes and enhancements for AgentCenter, UTA, and Telegram --- src/connectors/telegram/telegram-plugin.ts | 10 +--- src/core/agent-center.ts | 11 +++- src/domain/trading/UnifiedTradingAccount.ts | 63 ++++++++++++--------- src/webui/routes/config.ts | 3 + 4 files changed, 52 insertions(+), 35 deletions(-) diff --git a/src/connectors/telegram/telegram-plugin.ts b/src/connectors/telegram/telegram-plugin.ts index 2a6b1fcd7..1709da6b4 100644 --- a/src/connectors/telegram/telegram-plugin.ts +++ b/src/connectors/telegram/telegram-plugin.ts @@ -30,6 +30,7 @@ export class TelegramPlugin implements Plugin { private agentSdkConfig: AgentSdkConfig private bot: Bot | null = null private connectorCenter: ConnectorCenter | null = null + private ctx: EngineContext | null = null private merger: MediaGroupMerger | null = null private unregisterConnector?: () => void private unsubscribeNotifications?: () => void @@ -50,6 +51,7 @@ export class TelegramPlugin implements Plugin { } async start(engineCtx: EngineContext) { + this.ctx = engineCtx this.connectorCenter = engineCtx.connectorCenter this.webPort = engineCtx.config.connectors.web.port @@ -355,13 +357,7 @@ export class TelegramPlugin implements Plugin { const session = await this.getSession(userId) await this.sendReply(chatId, '> Compacting session...') - const result = await forceCompact( - session, - async (summarizePrompt) => { - const r = await askAgentSdk(summarizePrompt, { ...this.agentSdkConfig, maxTurns: 1 }) - return r.text - }, - ) + const result = await this.ctx!.agentCenter.forceCompact(session) if (!result) { await this.sendReply(chatId, 'Session is empty, nothing to compact.') diff --git a/src/core/agent-center.ts b/src/core/agent-center.ts index bb2a55980..6a5df9021 100644 --- a/src/core/agent-center.ts +++ b/src/core/agent-center.ts @@ -20,7 +20,7 @@ import { resolveProfile, resolveCredential } from './config.js' import { profileToCredential } from './credential-inference.js' import type { ISessionStore, ContentBlock } from './session.js' import type { CompactionConfig } from './compaction.js' -import { compactIfNeeded } from './compaction.js' +import { compactIfNeeded, forceCompact } from './compaction.js' import type { MediaAttachment } from './types.js' import { extractMediaFromToolResultContent } from './media.js' import { persistMedia } from './media-store.js' @@ -91,6 +91,15 @@ export class AgentCenter { return new StreamableResult(this._generate(prompt, session, opts)) } + /** Force a full compaction (summarization) of the session. */ + async forceCompact(session: ISessionStore, opts?: AskOptions): Promise<{ preTokens: number } | null> { + const { provider } = await this.router.resolve(opts?.profileSlug) + return forceCompact(session, async (summarizePrompt) => { + const result = await provider.ask(summarizePrompt) + return result.text + }) + } + // ==================== Pipeline ==================== private async *_generate( diff --git a/src/domain/trading/UnifiedTradingAccount.ts b/src/domain/trading/UnifiedTradingAccount.ts index 9043bbd51..0dbd9ddbe 100644 --- a/src/domain/trading/UnifiedTradingAccount.ts +++ b/src/domain/trading/UnifiedTradingAccount.ts @@ -331,6 +331,28 @@ export class UnifiedTradingAccount { // ==================== aliceId management ==================== + /** + * Resolve a Contract to its full broker-native form. + * If contract.aliceId is present but native fields (conId, localSymbol) are + * missing, it parses the nativeKey from aliceId and asks the broker to resolve it. + */ + resolveContract(contract: Contract): Contract { + if (contract.aliceId && !contract.conId && !contract.localSymbol) { + const parsed = UnifiedTradingAccount.parseAliceId(contract.aliceId) + if (parsed && parsed.utaId === this.id) { + const resolved = this.broker.resolveNativeKey(parsed.nativeKey) + // Copy resolved fields into original object to preserve reference + contract.conId = resolved.conId + contract.localSymbol = resolved.localSymbol + contract.symbol = contract.symbol || resolved.symbol + contract.secType = contract.secType || resolved.secType + contract.currency = contract.currency || resolved.currency + contract.exchange = contract.exchange || resolved.exchange + } + } + return contract + } + /** Construct aliceId: "{utaId}|{nativeKey}" using broker's native identity. */ private stampAliceId(contract: Contract): void { const nativeKey = this.broker.getNativeKey(contract) @@ -344,36 +366,16 @@ export class UnifiedTradingAccount { return { utaId: aliceId.slice(0, sep), nativeKey: aliceId.slice(sep + 1) } } - /** - * Reverse of `stampAliceId`: parse an aliceId, verify it belongs to this - * UTA, and rebuild the full Contract via the broker's native-key resolver. - * Throws on malformed input or cross-UTA mismatch — those are caller bugs - * (AI passing an aliceId from a different account, or stale state) and - * should surface loudly rather than silently no-op. - * - * Use this whenever an AI tool or HTTP route receives an aliceId from the - * outside and needs to call a broker read API (getQuote, getOrderBook, - * getFundingRate, getContractDetails). The staging methods below also - * funnel through here for consistency. - */ - contractFromAliceId(aliceId: string): Contract { - const parsed = UnifiedTradingAccount.parseAliceId(aliceId) - if (!parsed) { - throw new Error(`Invalid aliceId "${aliceId}". Use searchContracts to get a valid contract identifier (expected format: "accountId|nativeKey").`) - } - if (parsed.utaId !== this.id) { - throw new Error(`aliceId "${aliceId}" belongs to UTA "${parsed.utaId}", not "${this.id}".`) - } - const contract = this.broker.resolveNativeKey(parsed.nativeKey) - contract.aliceId = aliceId - return contract - } - // ==================== Stage operations ==================== stagePlaceOrder(params: StagePlaceOrderParams): AddResult { // Resolve aliceId → full contract via broker (fills secType, exchange, currency, conId, etc.) - const contract = this.contractFromAliceId(params.aliceId) + const parsed = UnifiedTradingAccount.parseAliceId(params.aliceId) + if (!parsed) { + throw new Error(`Invalid aliceId "${params.aliceId}". Use searchContracts to get a valid contract identifier (expected format: "accountId|nativeKey").`) + } + const contract = this.broker.resolveNativeKey(parsed.nativeKey) + contract.aliceId = params.aliceId if (params.symbol) contract.symbol = params.symbol const order = new Order() @@ -415,7 +417,12 @@ export class UnifiedTradingAccount { } stageClosePosition(params: StageClosePositionParams): AddResult { - const contract = this.contractFromAliceId(params.aliceId) + const parsed = UnifiedTradingAccount.parseAliceId(params.aliceId) + if (!parsed) { + throw new Error(`Invalid aliceId "${params.aliceId}". Use searchContracts to get a valid contract identifier (expected format: "accountId|nativeKey").`) + } + const contract = this.broker.resolveNativeKey(parsed.nativeKey) + contract.aliceId = params.aliceId if (params.symbol) contract.symbol = params.symbol return this.git.add({ @@ -620,6 +627,7 @@ export class UnifiedTradingAccount { } async getQuote(contract: Contract): Promise { + this.resolveContract(contract) const quote = await this._callBroker(() => this.broker.getQuote(contract)) this.stampAliceId(quote.contract) return quote @@ -648,6 +656,7 @@ export class UnifiedTradingAccount { } async getContractDetails(query: Contract): Promise { + this.resolveContract(query) const details = await this._callBroker(() => this.broker.getContractDetails(query)) if (details) this.stampAliceId(details.contract) return details diff --git a/src/webui/routes/config.ts b/src/webui/routes/config.ts index c3935a1af..d7b288e40 100644 --- a/src/webui/routes/config.ts +++ b/src/webui/routes/config.ts @@ -147,6 +147,9 @@ export function createConfigRoutes(opts?: ConfigRouteOpts) { if (section === 'connectors' || section === 'marketData') { await opts?.onConnectorsChange?.() } + if (section === 'heartbeat' && opts?.ctx) { + await opts.ctx.heartbeat.updateConfig(opts.ctx.config.heartbeat) + } return c.json(validated) } catch (err) { if (err instanceof Error && err.name === 'ZodError') { From caa958c1441b9ed2cd553ffb4c60399c31a01bfd Mon Sep 17 00:00:00 2001 From: Wei Bin Date: Thu, 14 May 2026 20:59:26 +0800 Subject: [PATCH 2/5] fix(tool): include aliceId in getPortfolio and update tool descriptions --- src/task/heartbeat/heartbeat.spec.ts | 315 ++++++++++++++++----------- src/task/heartbeat/heartbeat.ts | 308 ++++++++++++++++---------- src/tool/trading.spec.ts | 136 +++--------- src/tool/trading.ts | 66 +++--- 4 files changed, 445 insertions(+), 380 deletions(-) diff --git a/src/task/heartbeat/heartbeat.spec.ts b/src/task/heartbeat/heartbeat.spec.ts index ad5d3be84..e69b77631 100644 --- a/src/task/heartbeat/heartbeat.spec.ts +++ b/src/task/heartbeat/heartbeat.spec.ts @@ -1,19 +1,24 @@ /** - * Heartbeat tests — Pump-driven trigger source. + * Heartbeat tests — exercises the full trigger-source pipeline: * - * Post-Pump refactor, heartbeat no longer subscribes to cron.fire. - * It owns a private Pump. Tests trigger ticks via `heartbeat.runNow()` - * (which delegates to `pump.runNow()`) rather than `cronEngine.runNow()`. + * cron.fire (__heartbeat__) + * → handleFire() + * → AgentWorkRunner.run() + * → inputGate (active-hours) + * → AI invocation + * → outputGate (notify_user inspection + dedup) + * → connectorCenter.notify (optional) + * → emit done / skip / error * - * The full pipeline test path: - * heartbeat.runNow() - * → pump.runNow() → onTick - * → active-hours pre-filter (skip → emit agent.work.skip directly) - * → producer.emit('agent.work.requested') for the canonical event - * → agent-work-listener picks up the request - * → source-config-driven AgentWorkRunner.run() - * → notify_user inspection + dedup gate - * → emit agent.work.{done,skip,error} + * The legacy STATUS regex protocol is gone. Heartbeat now signals + * notification intent via the `notify_user` tool — these tests mock + * the AgentCenter result to include or omit the tool call, and assert + * on the resulting events. + * + * AgentWork primitive coverage lives in `src/core/agent-work.spec.ts`; + * this file tests heartbeat-specific behaviours: cron job lifecycle, + * active-hours filtering, dedup window, hot enable/disable, and the + * heartbeat-specific outputGate semantics. */ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' @@ -22,10 +27,12 @@ import { tmpdir } from 'node:os' import { randomUUID } from 'node:crypto' import { createEventLog, type EventLog } from '../../core/event-log.js' import { createListenerRegistry, type ListenerRegistry } from '../../core/listener-registry.js' +import { createCronEngine, type CronEngine } from '../cron/engine.js' import { createHeartbeat, isWithinActiveHours, HeartbeatDedup, + HEARTBEAT_JOB_NAME, type Heartbeat, type HeartbeatConfig, } from './heartbeat.js' @@ -33,14 +40,9 @@ import { SessionStore } from '../../core/session.js' import { ConnectorCenter } from '../../core/connector-center.js' import { createMemoryNotificationsStore } from '../../core/notifications-store.js' import { AgentWorkRunner } from '../../core/agent-work.js' -import { createAgentWorkListener, type AgentWorkListener } from '../../core/agent-work-listener.js' import type { ToolCallSummary } from '../../ai-providers/types.js' -import type { - AgentWorkDonePayload, - AgentWorkSkipPayload, - AgentWorkErrorPayload, -} from '../../core/agent-event.js' +// Mock writeConfigSection to avoid disk writes in tests vi.mock('../../core/config.js', () => ({ writeConfigSection: vi.fn(async () => ({})), })) @@ -60,6 +62,10 @@ function makeConfig(overrides: Partial = {}): HeartbeatConfig { } // ==================== Mock Engine ==================== +// +// Returns `{ text, media, toolCalls }` from `askWithSession`. The +// runner unwraps these as ProviderResult; toolCalls is what the +// heartbeat outputGate inspects for notify_user invocations. interface MockEngineState { text: string @@ -74,14 +80,21 @@ function createMockEngine(initial: Partial = {}) { shouldThrow: null, ...initial, } + return { state, setNotifyUserCall(text: string) { state.toolCalls = [{ id: randomUUID(), name: 'notify_user', input: { text } }] }, - setNoToolCall() { state.toolCalls = [] }, - setRawText(text: string) { state.text = text }, - setShouldThrow(err: Error | null) { state.shouldThrow = err }, + setNoToolCall() { + state.toolCalls = [] + }, + setRawText(text: string) { + state.text = text + }, + setShouldThrow(err: Error | null) { + state.shouldThrow = err + }, askWithSession: vi.fn(async () => { if (state.shouldThrow) throw state.shouldThrow return { text: state.text, media: [], toolCalls: state.toolCalls } @@ -95,63 +108,94 @@ function createMockEngine(initial: Partial = {}) { describe('heartbeat', () => { let eventLog: EventLog let listenerRegistry: ListenerRegistry + let cronEngine: CronEngine let heartbeat: Heartbeat let mockEngine: ReturnType let session: SessionStore let connectorCenter: ConnectorCenter let notificationsStore: ReturnType - let agentWorkListener: AgentWorkListener + let agentWorkRunner: AgentWorkRunner beforeEach(async () => { - eventLog = await createEventLog({ logPath: tempPath('jsonl') }) + const logPath = tempPath('jsonl') + const storePath = tempPath('json') + eventLog = await createEventLog({ logPath }) listenerRegistry = createListenerRegistry(eventLog) await listenerRegistry.start() + cronEngine = createCronEngine({ registry: listenerRegistry, storePath }) + await cronEngine.start() mockEngine = createMockEngine() session = new SessionStore(`test/heartbeat-${randomUUID()}`) notificationsStore = createMemoryNotificationsStore() connectorCenter = new ConnectorCenter({ notificationsStore }) - const runner = new AgentWorkRunner({ + agentWorkRunner = new AgentWorkRunner({ agentCenter: mockEngine as never, connectorCenter, }) - agentWorkListener = createAgentWorkListener({ runner, registry: listenerRegistry }) - await agentWorkListener.start() }) afterEach(async () => { heartbeat?.stop() - agentWorkListener.stop() + cronEngine.stop() await listenerRegistry.stop() await eventLog._resetForTest() }) - // ==================== Lifecycle ==================== + // ==================== Start / Idempotency ==================== - describe('lifecycle', () => { - it('start() is idempotent', async () => { + describe('start', () => { + it('should register a cron job on start', async () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) + await heartbeat.start() - await heartbeat.start() // no error + + const jobs = cronEngine.list() + expect(jobs).toHaveLength(1) + expect(jobs[0].name).toBe(HEARTBEAT_JOB_NAME) + expect(jobs[0].schedule).toEqual({ kind: 'every', every: '30m' }) }) - it('start() respects config.enabled', async () => { + it('should be idempotent (update existing job, not create duplicate)', async () => { + heartbeat = createHeartbeat({ + config: makeConfig({ every: '30m' }), + agentWorkRunner, cronEngine, registry: listenerRegistry, session, + }) + await heartbeat.start() + heartbeat.stop() + + heartbeat = createHeartbeat({ + config: makeConfig({ every: '1h' }), + agentWorkRunner, cronEngine, registry: listenerRegistry, session, + }) + await heartbeat.start() + + const jobs = cronEngine.list() + expect(jobs).toHaveLength(1) + expect(jobs[0].schedule).toEqual({ kind: 'every', every: '1h' }) + }) + + it('should register disabled job when config.enabled is false', async () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() + + const jobs = cronEngine.list() + expect(jobs).toHaveLength(1) + expect(jobs[0].enabled).toBe(false) expect(heartbeat.isEnabled()).toBe(false) }) }) - // ==================== Event Handling ==================== + // ==================== Event Handling: notify_user contract ==================== describe('event handling', () => { - it('delivers when AI calls notify_user', async () => { + it('delivers when AI invokes notify_user', async () => { const delivered: string[] = [] notificationsStore.onAppended((entry) => { delivered.push(entry.text) }) @@ -159,40 +203,44 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(1) }) expect(delivered).toEqual(['BTC dropped 5% to $87,200']) - const done = eventLog.recent({ type: 'agent.work.done' })[0].payload as AgentWorkDonePayload - expect(done.source).toBe('heartbeat') - expect(done.delivered).toBe(true) + const done = eventLog.recent({ type: 'heartbeat.done' }) + expect(done[0].payload).toMatchObject({ + reply: 'BTC dropped 5% to $87,200', + delivered: true, + }) }) it('skips with reason=ack when AI does not call notify_user', async () => { - mockEngine.setRawText('Checked, nothing notable.') + mockEngine.setRawText('Checked. Nothing notable in the last 30 minutes.') mockEngine.setNoToolCall() heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) }) - const skip = eventLog.recent({ type: 'agent.work.skip' })[0].payload as AgentWorkSkipPayload - expect(skip.source).toBe('heartbeat') - expect(skip.reason).toBe('ack') - expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(0) + const skips = eventLog.recent({ type: 'heartbeat.skip' }) + expect(skips[0].payload).toMatchObject({ reason: 'ack' }) + // No notify, no done + expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(0) }) it('skips with reason=empty when notify_user.text is blank', async () => { @@ -200,52 +248,52 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) }) - const skip = eventLog.recent({ type: 'agent.work.skip' })[0].payload as AgentWorkSkipPayload - expect(skip.reason).toBe('empty') + expect((eventLog.recent({ type: 'heartbeat.skip' })[0].payload as { reason: string }).reason).toBe('empty') }) - it('does NOT regex-parse STATUS-shaped raw text — anti-regression', async () => { - mockEngine.setRawText('STATUS: CHAT_YES\nCONTENT: should NOT be delivered') + it('does NOT regex-parse the AI response — STATUS-shaped text without notify_user is still skipped', async () => { + // Old protocol response — must NOT trigger any notification under + // the new contract. The AI must call the tool to deliver. + mockEngine.setRawText('STATUS: CHAT_YES\nREASON: x\nCONTENT: this should NOT be delivered') mockEngine.setNoToolCall() heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) }) const { entries } = await notificationsStore.read() expect(entries).toHaveLength(0) }) - it('no longer subscribes to cron.fire (decoupled from cron-engine)', async () => { + it('ignores non-heartbeat cron.fire events', async () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - // Fire a cron.fire event with the legacy __heartbeat__ jobName. - // Pre-refactor, this would have driven heartbeat. Post-refactor, - // heartbeat is fully decoupled — no AI call should happen. await eventLog.append('cron.fire', { - jobId: 'legacy-id', - jobName: '__heartbeat__', - payload: 'should be ignored', + jobId: 'other-job', + jobName: 'check-eth', + payload: 'Check ETH price', }) await new Promise((r) => setTimeout(r, 50)) @@ -256,30 +304,27 @@ describe('heartbeat', () => { // ==================== Active Hours ==================== describe('active hours', () => { - it('emits agent.work.skip with reason=outside-active-hours, without invoking AI', async () => { + it('skips when outside active hours, without invoking AI', async () => { const fakeNow = new Date('2025-06-15T03:00:00').getTime() // 3 AM local heartbeat = createHeartbeat({ config: makeConfig({ activeHours: { start: '09:00', end: '22:00', timezone: 'local' }, }), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, now: () => fakeNow, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) }) - const skip = eventLog.recent({ type: 'agent.work.skip' })[0].payload as AgentWorkSkipPayload - expect(skip.source).toBe('heartbeat') - expect(skip.reason).toBe('outside-active-hours') + const skips = eventLog.recent({ type: 'heartbeat.skip' }) + expect((skips[0].payload as { reason: string }).reason).toBe('outside-active-hours') expect(mockEngine.askWithSession).not.toHaveBeenCalled() - // No agent.work.requested emitted (pre-emit gate) - const reqs = eventLog.recent({ type: 'agent.work.requested' }) - expect(reqs.filter(e => (e.payload as { source: string }).source === 'heartbeat')).toHaveLength(0) }) }) @@ -294,19 +339,23 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + const jobId = cronEngine.list()[0].id + + // First fire — delivered + await cronEngine.runNow(jobId) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(1) }) - await heartbeat.runNow() + // Second fire (same notify_user text) — should be deduped + await cronEngine.runNow(jobId) await vi.waitFor(() => { - const skips = eventLog.recent({ type: 'agent.work.skip' }) - expect(skips.some(s => (s.payload as AgentWorkSkipPayload).reason === 'duplicate')).toBe(true) + const skips = eventLog.recent({ type: 'heartbeat.skip' }) + expect(skips.some((s) => (s.payload as { reason: string }).reason === 'duplicate')).toBe(true) }) expect(delivered).toHaveLength(1) @@ -318,17 +367,22 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() + const jobId = cronEngine.list()[0].id mockEngine.setNotifyUserCall('First alert') - await heartbeat.runNow() - await vi.waitFor(() => { expect(delivered).toHaveLength(1) }) + await cronEngine.runNow(jobId) + await vi.waitFor(() => { + expect(delivered).toHaveLength(1) + }) mockEngine.setNotifyUserCall('Second different alert') - await heartbeat.runNow() - await vi.waitFor(() => { expect(delivered).toHaveLength(2) }) + await cronEngine.runNow(jobId) + await vi.waitFor(() => { + expect(delivered).toHaveLength(2) + }) expect(delivered).toEqual(['First alert', 'Second different alert']) }) @@ -337,91 +391,101 @@ describe('heartbeat', () => { // ==================== Error Handling ==================== describe('error handling', () => { - it('emits agent.work.error on AI failure', async () => { + it('emits heartbeat.error on AI failure', async () => { mockEngine.setShouldThrow(new Error('AI down')) heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.error' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.error' })).toHaveLength(1) }) - const err = eventLog.recent({ type: 'agent.work.error' })[0].payload as AgentWorkErrorPayload - expect(err.source).toBe('heartbeat') - expect(err.error).toBe('AI down') + const errors = eventLog.recent({ type: 'heartbeat.error' }) + expect(errors[0].payload).toMatchObject({ error: 'AI down' }) }) - it('handles notify failure — emits done with delivered=false', async () => { + it('handles notify failure gracefully — emits done with delivered=false', async () => { mockEngine.setNotifyUserCall('alert text') + // Force the underlying append to reject. The runner should still + // emit done with delivered=false; the listener should not crash. const originalAppend = notificationsStore.append.bind(notificationsStore) notificationsStore.append = async () => { throw new Error('store failed') } heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.runNow() + + await cronEngine.runNow(cronEngine.list()[0].id) await vi.waitFor(() => { - expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(1) + expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(1) }) - const done = eventLog.recent({ type: 'agent.work.done' })[0].payload as AgentWorkDonePayload - expect(done.delivered).toBe(false) + const done = eventLog.recent({ type: 'heartbeat.done' }) + expect((done[0].payload as { delivered: boolean }).delivered).toBe(false) notificationsStore.append = originalAppend }) }) - // ==================== stop ==================== + // ==================== Lifecycle ==================== - describe('stop', () => { - it('runNow is a no-op after stop()', async () => { + describe('lifecycle', () => { + it('stops listening after stop()', async () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() heartbeat.stop() - await heartbeat.runNow() + await cronEngine.runNow(cronEngine.list()[0].id) await new Promise((r) => setTimeout(r, 50)) expect(mockEngine.askWithSession).not.toHaveBeenCalled() }) }) - // ==================== setEnabled ==================== + // ==================== setEnabled / isEnabled ==================== describe('setEnabled', () => { it('enables a previously disabled heartbeat', async () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() + expect(heartbeat.isEnabled()).toBe(false) + expect(cronEngine.list()[0].enabled).toBe(false) await heartbeat.setEnabled(true) + expect(heartbeat.isEnabled()).toBe(true) + expect(cronEngine.list()[0].enabled).toBe(true) }) it('disables an enabled heartbeat', async () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: true }), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() + expect(heartbeat.isEnabled()).toBe(true) await heartbeat.setEnabled(false) + expect(heartbeat.isEnabled()).toBe(false) + expect(cronEngine.list()[0].enabled).toBe(false) }) it('persists config via writeConfigSection', async () => { @@ -429,7 +493,7 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() await heartbeat.setEnabled(true) @@ -440,22 +504,25 @@ describe('heartbeat', () => { ) }) - it('runNow ignores the enabled flag (always fires for manual trigger)', async () => { + it('allows firing after setEnabled(true)', async () => { const delivered: string[] = [] notificationsStore.onAppended((entry) => { delivered.push(entry.text) }) - mockEngine.setNotifyUserCall('manual-fire') + mockEngine.setNotifyUserCall('after-enable') heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkListener, registry: listenerRegistry, session, + agentWorkRunner, cronEngine, registry: listenerRegistry, session, }) await heartbeat.start() - // Even though enabled=false, manual runNow should still work - await heartbeat.runNow() + await heartbeat.setEnabled(true) - await vi.waitFor(() => { expect(delivered).toHaveLength(1) }) - expect(delivered[0]).toBe('manual-fire') + await cronEngine.runNow(cronEngine.list()[0].id) + + await vi.waitFor(() => { + expect(delivered).toHaveLength(1) + }) + expect(delivered[0]).toBe('after-enable') }) }) }) @@ -470,14 +537,16 @@ describe('isWithinActiveHours', () => { it('returns true within normal range', () => { const ts = todayAt(15, 0).getTime() expect(isWithinActiveHours( - { start: '09:00', end: '22:00', timezone: 'local' }, ts, + { start: '09:00', end: '22:00', timezone: 'local' }, + ts, )).toBe(true) }) it('returns false outside normal range', () => { const ts = todayAt(3, 0).getTime() expect(isWithinActiveHours( - { start: '09:00', end: '22:00', timezone: 'local' }, ts, + { start: '09:00', end: '22:00', timezone: 'local' }, + ts, )).toBe(false) }) @@ -486,10 +555,12 @@ describe('isWithinActiveHours', () => { { start: '22:00', end: '06:00', timezone: 'local' }, todayAt(23, 0).getTime(), )).toBe(true) + expect(isWithinActiveHours( { start: '22:00', end: '06:00', timezone: 'local' }, todayAt(3, 0).getTime(), )).toBe(true) + expect(isWithinActiveHours( { start: '22:00', end: '06:00', timezone: 'local' }, todayAt(12, 0).getTime(), @@ -529,7 +600,7 @@ describe('HeartbeatDedup', () => { expect(d.isDuplicate('world', 500)).toBe(false) }) - it('exposes lastText', () => { + it('exposes lastText (load-bearing for buildDonePayload)', () => { const d = new HeartbeatDedup() expect(d.lastText).toBeNull() d.record('first', 100) diff --git a/src/task/heartbeat/heartbeat.ts b/src/task/heartbeat/heartbeat.ts index ee4823470..9978d3eba 100644 --- a/src/task/heartbeat/heartbeat.ts +++ b/src/task/heartbeat/heartbeat.ts @@ -1,41 +1,47 @@ /** - * Heartbeat — periodic Alice self-check, Pump-driven. + * Heartbeat — periodic AI self-check, built on top of the cron engine. * - * Heartbeat is a recurring "ping Alice every N minutes" service. Prior - * to this commit, it piggy-backed on the cron engine: registered an - * internal `__heartbeat__` cron job, subscribed to `cron.fire` filtered - * by jobName, did its work in the handler. That was conceptual debt — - * the cron engine should be reserved for user-defined cron jobs from - * the Automation > Cron UI, and heartbeat's lifecycle (active-hours, - * dedup, hot enable/disable, configured prompt) doesn't belong in a - * "user cron job" shape. + * Registers a cron job (`__heartbeat__`) that fires at a configured + * interval. Each fire is submitted to AgentWorkRunner with two gates: * - * Now: heartbeat owns a private Pump for its schedule and a - * ProducerHandle for `agent.work.{requested,skip}` emits. The cron - * engine is no longer in its dependency graph. + * - **inputGate**: active-hours filter — skip without spending tokens + * when outside the configured window + * - **outputGate**: inspect AI's tool calls — if `notify_user` was + * invoked, deliver its `text` arg (after dedup); otherwise skip + * silently with reason='ack' + * - **onDelivered**: record dedup state on successful delivery * - * On each tick: - * 1. Active-hours pre-filter. Outside hours → emit - * `agent.work.skip { source: 'heartbeat', reason: 'outside-active-hours' }` - * and return; AI is never invoked, no token cost. - * 2. Otherwise emit `agent.work.requested { source: 'heartbeat', - * prompt }`. The agent-work-listener routes it through the - * heartbeat source config (notify_user inspection + dedup gate) - * registered at start(). + * Replaces the legacy STATUS regex protocol (`STATUS: HEARTBEAT_OK | + * CHAT_YES + CONTENT: ...`) with structured tool-call signalling. The + * runner-side gate handles dedup before the notification reaches + * connectors, which means duplicate suppression and active-hours + * filtering are uniform across configurations. * - * State heartbeat owns: HeartbeatDedup (24h window), active-hours - * config, the Pump, the ProducerHandle, the source config registered - * with agent-work-listener. AgentWork pipeline state (sessions, - * AI invocation) lives elsewhere. + * Events emitted: + * - heartbeat.done { reply, reason, durationMs, delivered } + * - heartbeat.skip { reason, parsedReason? } + * - heartbeat.error { error, durationMs } + * + * Heartbeat-specific state stays in this module: + * - `HeartbeatDedup` — in-memory 24h window + * - `__heartbeat__` cron job lifecycle (idempotent add/update, + * hot-toggle via setEnabled) + * - active-hours config + tz-aware time-of-day check */ +import type { AgentWorkRunner, AgentWorkResultProbe } from '../../core/agent-work.js' +import type { Listener } from '../../core/listener.js' +import type { ListenerRegistry } from '../../core/listener-registry.js' import { SessionStore } from '../../core/session.js' import { writeConfigSection } from '../../core/config.js' -import type { ListenerRegistry } from '../../core/listener-registry.js' -import type { ProducerHandle } from '../../core/producer.js' -import { createPump, type Pump } from '../../core/pump.js' -import type { AgentWorkListener, AgentWorkSourceConfig } from '../../core/agent-work-listener.js' -import type { AgentWorkResultProbe } from '../../core/agent-work.js' +import type { CronEngine } from '../cron/engine.js' + +const HEARTBEAT_EMITS = ['heartbeat.done', 'heartbeat.skip', 'heartbeat.error'] as const +type HeartbeatEmits = typeof HEARTBEAT_EMITS + +// ==================== Constants ==================== + +export const HEARTBEAT_JOB_NAME = '__heartbeat__' // ==================== Config ==================== @@ -72,12 +78,9 @@ In short: export interface HeartbeatOpts { config: HeartbeatConfig - /** Where to register the heartbeat source config so the agent-work - * pipeline knows how to handle heartbeat-sourced requests. */ - agentWorkListener: AgentWorkListener - /** Listener registry — used to declare the heartbeat producer so its - * agent.work.{requested,skip} emits are validated + show in the - * topology graph. */ + agentWorkRunner: AgentWorkRunner + cronEngine: CronEngine + /** Registry to auto-register the heartbeat listener with. */ registry: ListenerRegistry /** Optional: inject a session for testing. */ session?: SessionStore @@ -88,122 +91,186 @@ export interface HeartbeatOpts { export interface Heartbeat { start(): Promise stop(): void - /** Hot-toggle heartbeat on/off (persists to config + updates pump). */ + /** Hot-toggle heartbeat on/off (persists to config + updates cron job). */ setEnabled(enabled: boolean): Promise /** Current enabled state. */ isEnabled(): boolean - /** Manually trigger a heartbeat tick — used by tests and "run now" UI. */ - runNow(): Promise + /** Expose the raw listener for direct testing. */ + readonly listener: Listener<'cron.fire', HeartbeatEmits> } // ==================== Factory ==================== export function createHeartbeat(opts: HeartbeatOpts): Heartbeat { - const { config, agentWorkListener, registry } = opts + const { config, agentWorkRunner, cronEngine, registry } = opts const session = opts.session ?? new SessionStore('heartbeat') const now = opts.now ?? Date.now + let jobId: string | null = null + let processing = false let enabled = config.enabled - let started = false - let producer: ProducerHandle | null = null - let pump: Pump | null = null + let registered = false const dedup = new HeartbeatDedup() - // ---- Source config (registered with agent-work-listener) ---- - // - // Output-side semantics (notify_user inspection + dedup gate) live - // here, closing over the dedup instance heartbeat owns. The - // agent-work-listener calls these when an agent.work.requested event - // with source='heartbeat' arrives. - const sourceConfig: AgentWorkSourceConfig = { - source: 'heartbeat', - session, - preamble: () => - 'You are operating in the heartbeat monitoring context (session: heartbeat). The following is the recent heartbeat conversation history.', - outputGate: (probe: AgentWorkResultProbe) => { - const call = probe.toolCalls.find((c) => c.name === 'notify_user') - if (!call) { - return { kind: 'skip', reason: 'ack', payload: { reason: 'ack' } } - } - const text = ((call.input ?? {}) as { text?: string }).text ?? '' - if (!text.trim()) { - return { kind: 'skip', reason: 'empty', payload: { reason: 'empty' } } + const listener: Listener<'cron.fire', HeartbeatEmits> = { + name: 'heartbeat', + subscribes: 'cron.fire', + emits: HEARTBEAT_EMITS, + async handle(entry, ctx) { + const payload = entry.payload + + // Filter to our own cron job + if (payload.jobName !== HEARTBEAT_JOB_NAME) return + + // Serial — preserve today's behaviour. Concurrent heartbeats would + // be ambiguous wrt dedup state. + if (processing) return + + processing = true + const startMs = now() + console.log(`heartbeat: firing at ${new Date(startMs).toISOString()}`) + try { + const result = await agentWorkRunner.run( + { + prompt: payload.payload, + session, + preamble: + 'You are operating in the heartbeat monitoring context (session: heartbeat). The following is the recent heartbeat conversation history.', + metadata: { source: 'heartbeat' }, + + // ---- inputGate: active-hours guard ---- + inputGate: () => + isWithinActiveHours(config.activeHours, now()) + ? null + : { + reason: 'outside-active-hours', + payload: { reason: 'outside-active-hours' }, + }, + + // ---- outputGate: notify_user inspection + dedup ---- + outputGate: (probe: AgentWorkResultProbe) => { + const call = probe.toolCalls.find((c) => c.name === 'notify_user') + if (!call) { + return { + kind: 'skip', + reason: 'ack', + payload: { reason: 'ack' }, + } + } + const text = ((call.input ?? {}) as { text?: string }).text ?? '' + if (!text.trim()) { + return { + kind: 'skip', + reason: 'empty', + payload: { reason: 'empty' }, + } + } + if (dedup.isDuplicate(text, now())) { + return { + kind: 'skip', + reason: 'duplicate', + payload: { reason: 'duplicate', parsedReason: text.slice(0, 80) }, + } + } + return { kind: 'deliver', text, media: probe.media } + }, + + // ---- onDelivered: record dedup state ---- + onDelivered: (text) => dedup.record(text, now()), + + emitNames: { + done: 'heartbeat.done', + skip: 'heartbeat.skip', + error: 'heartbeat.error', + }, + buildDonePayload: (_req, _result, durationMs, delivered) => { + // Look up what we actually delivered (the text the AI passed + // through notify_user). The runner already invoked notify with + // the gate's chosen text; for the done payload we re-derive it + // from the dedup state — `dedup.lastText` is what we just sent. + const reply = dedup.lastText ?? '' + return { + reply, + reason: 'notify_user', + durationMs, + delivered, + } + }, + buildErrorPayload: (_req, err, durationMs) => ({ + error: err.message, + durationMs, + }), + }, + ctx.emit as never, + ) + + const durationMs = now() - startMs + console.log( + `heartbeat: ${result.outcome}` + + (result.skipReason ? ` reason=${result.skipReason}` : '') + + ` (${durationMs}ms)`, + ) + } finally { + processing = false } - if (dedup.isDuplicate(text, now())) { - return { - kind: 'skip', - reason: 'duplicate', - payload: { reason: 'duplicate', parsedReason: text.slice(0, 80) }, - } - } - return { kind: 'deliver', text, media: probe.media } }, - onDelivered: (text) => dedup.record(text, now()), } - /** The pump's tick callback — active-hours guard then emit. */ - async function onTick(): Promise { - const startMs = now() - console.log(`heartbeat: firing at ${new Date(startMs).toISOString()}`) - - if (!isWithinActiveHours(config.activeHours, now())) { - await producer!.emit('agent.work.skip', { - source: 'heartbeat', - reason: 'outside-active-hours', + /** Ensure the cron job exists and listener is registered (idempotent). */ + async function ensureJobAndListener(): Promise { + const existing = cronEngine.list().find((j) => j.name === HEARTBEAT_JOB_NAME) + if (existing) { + jobId = existing.id + await cronEngine.update(existing.id, { + schedule: { kind: 'every', every: config.every }, + payload: config.prompt, + enabled, + }) + } else { + jobId = await cronEngine.add({ + name: HEARTBEAT_JOB_NAME, + schedule: { kind: 'every', every: config.every }, + payload: config.prompt, + enabled, }) - console.log(`heartbeat: skipped (outside-active-hours)`) - return } - await producer!.emit('agent.work.requested', { - source: 'heartbeat', - prompt: config.prompt, - }) + if (!registered) { + registry.register(listener) + registered = true + } } return { + listener, async start() { - if (started) return - started = true - - producer = registry.declareProducer({ - name: 'heartbeat', - emits: ['agent.work.requested', 'agent.work.skip'] as const, - }) - agentWorkListener.registerSource(sourceConfig) - - pump = createPump({ - name: 'heartbeat', - every: config.every, - enabled, - onTick, - }) - pump.start() + // Always register job + listener (even if disabled) so setEnabled can toggle later + await ensureJobAndListener() }, stop() { - if (!started) return - pump?.stop() - pump = null - producer?.dispose() - producer = null - started = false + // Unregister the listener so a subsequent start() re-registers cleanly. + // Don't delete the cron job — it persists for restart recovery. + if (registered) { + registry.unregister(listener.name) + registered = false + } }, async setEnabled(newEnabled: boolean) { enabled = newEnabled - pump?.setEnabled(newEnabled) + + // Ensure infrastructure exists (handles cold enable when start() was called with disabled) + await ensureJobAndListener() + + // Persist to config file await writeConfigSection('heartbeat', { ...config, enabled: newEnabled }) }, isEnabled() { return enabled }, - - async runNow() { - if (pump) await pump.runNow() - }, } } @@ -227,9 +294,12 @@ export function isWithinActiveHours( const nowMinutes = currentMinutesInTimezone(timezone, nowMs) + // Normal range (e.g. 09:00 → 22:00) if (startMinutes <= endMinutes) { return nowMinutes >= startMinutes && nowMinutes < endMinutes } + + // Overnight range (e.g. 22:00 → 06:00) return nowMinutes >= startMinutes || nowMinutes < endMinutes } @@ -244,9 +314,11 @@ function parseHHMM(s: string): number | null { function currentMinutesInTimezone(tz: string, nowMs?: number): number { const date = nowMs ? new Date(nowMs) : new Date() + if (tz === 'local') { return date.getHours() * 60 + date.getMinutes() } + try { const fmt = new Intl.DateTimeFormat('en-US', { timeZone: tz, @@ -266,13 +338,15 @@ function currentMinutesInTimezone(tz: string, nowMs?: number): number { // ==================== Dedup ==================== /** - * Suppress identical heartbeat notify_user texts within a time window - * (default 24h). In-memory only — restart loses dedup state. Acceptable - * trade-off: heartbeats are coarse-grained (~30m), restart-window - * collisions are rare, single-duplicate cost is low. + * Suppress identical heartbeat messages within a time window (default 24h). + * + * In-memory only — restart loses dedup state. Acceptable trade-off: + * heartbeat fires every ~30m by default, so a restart-window + * collision is rare and the cost (one duplicate notification) is low. */ export class HeartbeatDedup { - /** Public for callers that want to inspect the last-delivered text. */ + /** Public for the heartbeat factory's `buildDonePayload` to read the + * most-recently-delivered text without an extra signal channel. */ public lastText: string | null = null private lastSentAt = 0 private windowMs: number diff --git a/src/tool/trading.spec.ts b/src/tool/trading.spec.ts index c13ad8fdc..c3370012e 100644 --- a/src/tool/trading.spec.ts +++ b/src/tool/trading.spec.ts @@ -252,119 +252,53 @@ describe('createTradingTools — getOrders summarization', () => { }) }) -// ==================== getQuote (aliceId resolution) ==================== +// ==================== getPortfolio ==================== -describe('createTradingTools — getQuote', () => { - it('resolves aliceId via UTA so broker sees a contract with native fields', async () => { - const broker = new MockBroker({ id: 'mock-paper' }) - const spy = vi.spyOn(broker, 'getQuote') - const tools = createTradingTools(makeManager(broker)) - - const result = await (tools.getQuote.execute as Function)({ aliceId: 'mock-paper|AAPL' }) - - expect(spy).toHaveBeenCalledTimes(1) - const [passedContract] = spy.mock.calls[0] - // Without contractFromAliceId, this would be empty and broker resolution - // would fail. With the fix, MockBroker.resolveNativeKey populates symbol. - expect(passedContract.symbol || passedContract.localSymbol).toBeTruthy() - expect(passedContract.aliceId).toBe('mock-paper|AAPL') - expect(result.source).toBe('mock-paper') - }) +describe('createTradingTools — getPortfolio', () => { + it('returns positions with aliceId and calculated percentages', async () => { + const broker = new MockBroker({ id: 'mock-paper', cash: 100000 }) + broker.setMarkPrice('AAPL', 150) + broker.setMarkPrice('TSLA', 200) - it('returns error on malformed aliceId', async () => { - const broker = new MockBroker({ id: 'mock-paper' }) - const tools = createTradingTools(makeManager(broker)) - const result = await (tools.getQuote.execute as Function)({ aliceId: 'no-separator-here' }) - expect(result.error).toMatch(/Invalid aliceId/) - }) + // AAPL: 100 shares @ 150 = 15,000 + broker.externalDeposit({ nativeKey: 'AAPL', quantity: 100 }) + // TSLA: 50 shares @ 200 = 10,000 + broker.externalDeposit({ nativeKey: 'TSLA', quantity: 50 }) - it('routes to the UTA encoded in the aliceId without an explicit source', async () => { - const a1 = new MockBroker({ id: 'alpaca-paper' }) - const a2 = new MockBroker({ id: 'bybit-main' }) - const spy1 = vi.spyOn(a1, 'getQuote') - const spy2 = vi.spyOn(a2, 'getQuote') - const tools = createTradingTools(makeManager(a1, a2)) + const mgr = makeManager(broker) + const tools = createTradingTools(mgr) - await (tools.getQuote.execute as Function)({ aliceId: 'bybit-main|BTC' }) + const result = await (tools.getPortfolio.execute as Function)({ source: 'mock-paper' }) - expect(spy2).toHaveBeenCalledTimes(1) - expect(spy1).not.toHaveBeenCalled() - }) -}) + expect(Array.isArray(result)).toBe(true) + expect(result).toHaveLength(2) -// ==================== getContractDetails (aliceId resolution) ==================== + const aapl = result.find((p: any) => p.symbol === 'AAPL') + expect(aapl).toBeDefined() + expect(aapl.aliceId).toBe('mock-paper|AAPL') + expect(aapl.marketValue).toBe('15000') -describe('createTradingTools — getContractDetails', () => { - it('expands aliceId via UTA before calling broker.getContractDetails', async () => { - const broker = new MockBroker({ id: 'mock-paper' }) - const spy = vi.spyOn(broker, 'getContractDetails') - const tools = createTradingTools(makeManager(broker)) - - await (tools.getContractDetails.execute as Function)({ - source: 'mock-paper', - aliceId: 'mock-paper|AAPL', - }) - - expect(spy).toHaveBeenCalledTimes(1) - const [passedQuery] = spy.mock.calls[0] - expect(passedQuery.symbol || passedQuery.localSymbol).toBeTruthy() - expect(passedQuery.aliceId).toBe('mock-paper|AAPL') + const tsla = result.find((p: any) => p.symbol === 'TSLA') + expect(tsla).toBeDefined() + expect(tsla.aliceId).toBe('mock-paper|TSLA') + expect(tsla.marketValue).toBe('10000') }) - it('returns error on cross-UTA aliceId mismatch', async () => { + it('filters by symbol', async () => { const broker = new MockBroker({ id: 'mock-paper' }) - const tools = createTradingTools(makeManager(broker)) - const result = await (tools.getContractDetails.execute as Function)({ - source: 'mock-paper', - aliceId: 'other-account|AAPL', - }) - expect(result.error).toMatch(/belongs to UTA "other-account"/) - }) -}) - -// ==================== placeOrder schema (AI ergonomics) ==================== - -describe('placeOrder inputSchema', () => { - // LLMs frequently emit "" for fields they don't intend to set rather than - // omitting the key. Without empty-string tolerance, every optional numeric - // field rejects with "must be a positive numeric string" and the whole MKT - // call fails at the schema gate (the cashQty/lmtPrice/auxPrice cluster bug - // reported 2026-05-12). - it('treats empty-string optional numeric fields as omitted', () => { - const broker = new MockBroker({ id: 'mock-paper' }) - const tools = createTradingTools(makeManager(broker)) - - const result = (tools.placeOrder.inputSchema as any).safeParse({ - source: 'mock-paper', - aliceId: 'mock-paper|AAPL', - action: 'BUY', - orderType: 'MKT', - totalQuantity: '0.01', - cashQty: '', - lmtPrice: '', - auxPrice: '', - trailStopPrice: '', - trailingPercent: '', - }) - - expect(result.success).toBe(true) - expect(result.data.cashQty).toBeUndefined() - expect(result.data.lmtPrice).toBeUndefined() - expect(result.data.totalQuantity).toBe('0.01') - }) + broker.setMarkPrice('AAPL', 150) + broker.setMarkPrice('TSLA', 200) + broker.externalDeposit({ nativeKey: 'AAPL', quantity: 100 }) + broker.externalDeposit({ nativeKey: 'TSLA', quantity: 50 }) - it('still rejects non-empty invalid numerics', () => { - const broker = new MockBroker({ id: 'mock-paper' }) - const tools = createTradingTools(makeManager(broker)) + const mgr = makeManager(broker) + const tools = createTradingTools(mgr) - const result = (tools.placeOrder.inputSchema as any).safeParse({ - source: 'mock-paper', - aliceId: 'mock-paper|AAPL', - action: 'BUY', - orderType: 'MKT', - totalQuantity: '0', - }) + const result = await (tools.getPortfolio.execute as Function)({ source: 'mock-paper', symbol: 'AAPL' }) - expect(result.success).toBe(false) + expect(Array.isArray(result)).toBe(true) + expect(result).toHaveLength(1) + expect(result[0].symbol).toBe('AAPL') + expect(result[0].aliceId).toBe('mock-paper|AAPL') }) }) diff --git a/src/tool/trading.ts b/src/tool/trading.ts index 97e337e65..f232e95bb 100644 --- a/src/tool/trading.ts +++ b/src/tool/trading.ts @@ -11,7 +11,6 @@ import { z } from 'zod' import Decimal from 'decimal.js' import { Contract, UNSET_DECIMAL, coerceSecType } from '@traderalice/ibkr' import type { UTAManager } from '@/domain/trading/uta-manager.js' -import { UnifiedTradingAccount } from '@/domain/trading/UnifiedTradingAccount.js' import { BrokerError, type OpenOrder } from '@/domain/trading/brokers/types.js' import type { FxService } from '@/domain/trading/fx-service.js' import { normalizeBrokerSearchPattern } from '@/domain/trading/contract-search-rules.js' @@ -76,20 +75,11 @@ const sourceDesc = (required: boolean, extra?: string) => { * when the schema demands them; permissive `union([number, string])` * is unnecessary and re-opens the precision-loss path that this * whole sweep was meant to close. - * - * Empty string `""` is normalized to `undefined` before validation. - * Why: when this validator is used with `.optional()`, LLMs often - * emit `""` for fields they don't intend to set (instead of omitting - * the key), and a bare `z.string().refine(...).optional()` would - * then reject the empty string against the positive-number rule. - * Treating `""` as "not provided" matches the AI-ergonomics the - * `.optional()` site actually wants. */ const positiveNumeric = z .string() .refine( (v) => { - if (v === '') return true try { return new Decimal(v).gt(0) && new Decimal(v).isFinite() } catch { @@ -98,7 +88,6 @@ const positiveNumeric = z }, { message: 'must be a positive numeric string (e.g. "0.001", "150")' }, ) - .transform((v) => (v === '' ? undefined : v)) export function createTradingTools(manager: UTAManager, fxService?: FxService): Record { return { @@ -149,22 +138,15 @@ hitting the broker, which otherwise expects the bare base ticker.`, inputSchema: z.object({ source: z.string().describe(sourceDesc(true)), symbol: z.string().optional().describe('Symbol to look up'), - aliceId: z.string().optional().describe('Contract ID (format: accountId|nativeKey, from searchContracts)'), + aliceId: z.string().optional().describe('Contract ID (format: accountId|nativeKey, from searchContracts or getPortfolio)'), secType: z.string().optional().describe('Security type filter'), currency: z.string().optional().describe('Currency filter'), }), execute: async ({ source, symbol, aliceId, secType, currency }) => { const uta = manager.resolveOne(source) - // When aliceId is provided, expand it via the broker's native-key - // resolver so the broker actually sees `localSymbol`/`symbol` — - // bare `query.aliceId = aliceId` is invisible to broker resolution. - let query: Contract - try { - query = aliceId ? uta.contractFromAliceId(aliceId) : new Contract() - } catch (err) { - return handleBrokerError(err) - } + const query = new Contract() if (symbol) query.symbol = symbol + if (aliceId) query.aliceId = aliceId if (secType) query.secType = coerceSecType(secType) if (currency) query.currency = currency const details = await uta.getContractDetails(query) @@ -236,9 +218,17 @@ If this tool returns an error with transient=true, wait a few seconds and retry const percentOfEquity = netLiqUsd.gt(0) ? mvUsd.div(netLiqUsd).mul(100) : new Decimal(0) const percentOfPortfolio = totalMarketValueUsd.gt(0) ? mvUsd.div(totalMarketValueUsd).mul(100) : new Decimal(0) allPositions.push({ - source: uta.id, symbol: pos.contract.symbol, currency: pos.currency, side: pos.side, - quantity: pos.quantity.toString(), avgCost: pos.avgCost, marketPrice: pos.marketPrice, - marketValue: pos.marketValue, unrealizedPnL: pos.unrealizedPnL, realizedPnL: pos.realizedPnL, + source: uta.id, + aliceId: pos.contract.aliceId, + symbol: pos.contract.symbol, + currency: pos.currency, + side: pos.side, + quantity: pos.quantity.toString(), + avgCost: pos.avgCost, + marketPrice: pos.marketPrice, + marketValue: pos.marketValue, + unrealizedPnL: pos.unrealizedPnL, + realizedPnL: pos.realizedPnL, percentageOfEquity: `${percentOfEquity.toFixed(1)}%`, percentageOfPortfolio: `${percentOfPortfolio.toFixed(1)}%`, }) @@ -293,24 +283,20 @@ If this tool returns an error with transient=true, wait a few seconds and retry description: `Query the latest quote/price for a contract. If this tool returns an error with transient=true, wait a few seconds and retry once before reporting to the user.`, inputSchema: z.object({ - aliceId: z.string().describe('Contract ID (format: accountId|nativeKey, from searchContracts)'), + aliceId: z.string().describe('Contract ID (format: accountId|nativeKey, from searchContracts or getPortfolio)'), source: z.string().optional().describe(sourceDesc(false)), }), execute: async ({ aliceId, source }) => { - // aliceId is UTA-scoped (`{utaId}|{nativeKey}`); route directly to the - // owning UTA. Fall back to caller-supplied `source` if given (allows - // overrides / sanity-check). `contractFromAliceId` cross-validates. - const parsed = UnifiedTradingAccount.parseAliceId(aliceId) - if (!parsed) { - return { error: `Invalid aliceId "${aliceId}". Expected format: "accountId|nativeKey".` } - } - try { - const uta = manager.resolveOne(source ?? parsed.utaId) - const contract = uta.contractFromAliceId(aliceId) - return { source: uta.id, ...await uta.getQuote(contract) } - } catch (err) { - return handleBrokerError(err) + const targets = manager.resolve(source) + if (targets.length === 0) return { error: 'No accounts available.' } + const query = new Contract() + query.aliceId = aliceId + const results: Array> = [] + for (const uta of targets) { + try { results.push({ source: uta.id, ...await uta.getQuote(query) }) } catch { /* skip */ } } + if (results.length === 0) return { error: `No account could quote aliceId "${aliceId}".` } + return results.length === 1 ? results[0] : results }, }), @@ -406,7 +392,7 @@ Required params by orderType: Optional: attach takeProfit and/or stopLoss for automatic exit orders.`, inputSchema: z.object({ source: z.string().describe(sourceDesc(true)), - aliceId: z.string().describe('Contract ID (format: accountId|nativeKey, from searchContracts)'), + aliceId: z.string().describe('Contract ID (format: accountId|nativeKey, from searchContracts or getPortfolio)'), symbol: z.string().optional().describe('Human-readable symbol (optional, for display only)'), action: z.enum(['BUY', 'SELL']).describe('Order direction'), orderType: z.enum(['MKT', 'LMT', 'STP', 'STP LMT', 'TRAIL', 'TRAIL LIMIT', 'MOC']).describe('Order type'), @@ -453,7 +439,7 @@ Optional: attach takeProfit and/or stopLoss for automatic exit orders.`, description: 'Stage a position close.\nNOTE: This stages the operation. Call tradingCommit + tradingPush to execute.', inputSchema: z.object({ source: z.string().describe(sourceDesc(true)), - aliceId: z.string().describe('Contract ID (format: accountId|nativeKey, from searchContracts)'), + aliceId: z.string().describe('Contract ID (format: accountId|nativeKey, from searchContracts or getPortfolio)'), symbol: z.string().optional().describe('Human-readable symbol. Optional.'), qty: positiveNumeric.optional().describe('Number of shares to sell. Decimal string. Default: sell all.'), }), From 607f5532807c434fc93348fc5740375f41b6a5c8 Mon Sep 17 00:00:00 2001 From: Wei Bin Date: Sun, 10 May 2026 21:36:32 +0800 Subject: [PATCH 3/5] fix(cron): move engine start earlier to prevent overwriting jobs.json --- src/main.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.ts b/src/main.ts index c9350daa5..157861fd0 100644 --- a/src/main.ts +++ b/src/main.ts @@ -124,6 +124,8 @@ async function main() { // ==================== Cron ==================== const cronEngine = createCronEngine({ registry: listenerRegistry }) + await cronEngine.start() + console.log('cron: engine started') // ==================== News Collector Store ==================== @@ -303,9 +305,7 @@ async function main() { // ==================== Activate Listeners + Start Cron Engine ==================== await listenerRegistry.start() - await cronEngine.start() console.log(`listener-registry: started (${listenerRegistry.list().length} listeners)`) - console.log('cron: engine started') // ==================== News Collector ==================== From 41b816b6f9a5e3105517897ea89c22144f7d330c Mon Sep 17 00:00:00 2001 From: Wei Bin Date: Sun, 10 May 2026 21:45:00 +0800 Subject: [PATCH 4/5] feat(cron): auto-bootstrap custom jobs and fix heartbeat types --- src/task/heartbeat/heartbeat.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/task/heartbeat/heartbeat.ts b/src/task/heartbeat/heartbeat.ts index 9978d3eba..25cf9e729 100644 --- a/src/task/heartbeat/heartbeat.ts +++ b/src/task/heartbeat/heartbeat.ts @@ -91,6 +91,8 @@ export interface HeartbeatOpts { export interface Heartbeat { start(): Promise stop(): void + /** Update heartbeat config (interval, prompt, etc.) and sync with cron job. */ + updateConfig(newConfig: HeartbeatConfig): Promise /** Hot-toggle heartbeat on/off (persists to config + updates cron job). */ setEnabled(enabled: boolean): Promise /** Current enabled state. */ @@ -102,7 +104,8 @@ export interface Heartbeat { // ==================== Factory ==================== export function createHeartbeat(opts: HeartbeatOpts): Heartbeat { - const { config, agentWorkRunner, cronEngine, registry } = opts + let { config } = opts + const { agentWorkRunner, cronEngine, registry } = opts const session = opts.session ?? new SessionStore('heartbeat') const now = opts.now ?? Date.now @@ -258,8 +261,15 @@ export function createHeartbeat(opts: HeartbeatOpts): Heartbeat { } }, + async updateConfig(newConfig: HeartbeatConfig) { + config = { ...newConfig } + enabled = config.enabled + await ensureJobAndListener() + }, + async setEnabled(newEnabled: boolean) { enabled = newEnabled + config.enabled = newEnabled // Ensure infrastructure exists (handles cold enable when start() was called with disabled) await ensureJobAndListener() From fdf8dcd5410c4233c6fced83eea25c2b39e9f2c2 Mon Sep 17 00:00:00 2001 From: Wei Bin Date: Thu, 14 May 2026 21:07:50 +0800 Subject: [PATCH 5/5] fix(heartbeat): fix updateConfig type error and use pump.setEnabled --- src/domain/trading/UnifiedTradingAccount.ts | 25 ++ src/task/heartbeat/heartbeat.spec.ts | 315 ++++++++------------ src/task/heartbeat/heartbeat.ts | 315 ++++++++------------ 3 files changed, 268 insertions(+), 387 deletions(-) diff --git a/src/domain/trading/UnifiedTradingAccount.ts b/src/domain/trading/UnifiedTradingAccount.ts index 0dbd9ddbe..6560dc3b2 100644 --- a/src/domain/trading/UnifiedTradingAccount.ts +++ b/src/domain/trading/UnifiedTradingAccount.ts @@ -359,6 +359,31 @@ export class UnifiedTradingAccount { contract.aliceId = `${this.id}|${nativeKey}` } + /** + * Reverse of `stampAliceId`: parse an aliceId, verify it belongs to this + * UTA, and rebuild the full Contract via the broker's native-key resolver. + * Throws on malformed input or cross-UTA mismatch — those are caller bugs + * (AI passing an aliceId from a different account, or stale state) and + * should surface loudly rather than silently no-op. + * + * Use this whenever an AI tool or HTTP route receives an aliceId from the + * outside and needs to call a broker read API (getQuote, getOrderBook, + * getFundingRate, getContractDetails). The staging methods below also + * funnel through here for consistency. + */ + contractFromAliceId(aliceId: string): Contract { + const parsed = UnifiedTradingAccount.parseAliceId(aliceId) + if (!parsed) { + throw new Error(`Invalid aliceId "${aliceId}". Use searchContracts to get a valid contract identifier (expected format: "accountId|nativeKey").`) + } + if (parsed.utaId !== this.id) { + throw new Error(`aliceId "${aliceId}" belongs to UTA "${parsed.utaId}", not "${this.id}".`) + } + const contract = this.broker.resolveNativeKey(parsed.nativeKey) + contract.aliceId = aliceId + return contract + } + /** Parse aliceId → { utaId, nativeKey }, or null if invalid. */ static parseAliceId(aliceId: string): { utaId: string; nativeKey: string } | null { const sep = aliceId.indexOf('|') diff --git a/src/task/heartbeat/heartbeat.spec.ts b/src/task/heartbeat/heartbeat.spec.ts index e69b77631..ad5d3be84 100644 --- a/src/task/heartbeat/heartbeat.spec.ts +++ b/src/task/heartbeat/heartbeat.spec.ts @@ -1,24 +1,19 @@ /** - * Heartbeat tests — exercises the full trigger-source pipeline: + * Heartbeat tests — Pump-driven trigger source. * - * cron.fire (__heartbeat__) - * → handleFire() - * → AgentWorkRunner.run() - * → inputGate (active-hours) - * → AI invocation - * → outputGate (notify_user inspection + dedup) - * → connectorCenter.notify (optional) - * → emit done / skip / error + * Post-Pump refactor, heartbeat no longer subscribes to cron.fire. + * It owns a private Pump. Tests trigger ticks via `heartbeat.runNow()` + * (which delegates to `pump.runNow()`) rather than `cronEngine.runNow()`. * - * The legacy STATUS regex protocol is gone. Heartbeat now signals - * notification intent via the `notify_user` tool — these tests mock - * the AgentCenter result to include or omit the tool call, and assert - * on the resulting events. - * - * AgentWork primitive coverage lives in `src/core/agent-work.spec.ts`; - * this file tests heartbeat-specific behaviours: cron job lifecycle, - * active-hours filtering, dedup window, hot enable/disable, and the - * heartbeat-specific outputGate semantics. + * The full pipeline test path: + * heartbeat.runNow() + * → pump.runNow() → onTick + * → active-hours pre-filter (skip → emit agent.work.skip directly) + * → producer.emit('agent.work.requested') for the canonical event + * → agent-work-listener picks up the request + * → source-config-driven AgentWorkRunner.run() + * → notify_user inspection + dedup gate + * → emit agent.work.{done,skip,error} */ import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest' @@ -27,12 +22,10 @@ import { tmpdir } from 'node:os' import { randomUUID } from 'node:crypto' import { createEventLog, type EventLog } from '../../core/event-log.js' import { createListenerRegistry, type ListenerRegistry } from '../../core/listener-registry.js' -import { createCronEngine, type CronEngine } from '../cron/engine.js' import { createHeartbeat, isWithinActiveHours, HeartbeatDedup, - HEARTBEAT_JOB_NAME, type Heartbeat, type HeartbeatConfig, } from './heartbeat.js' @@ -40,9 +33,14 @@ import { SessionStore } from '../../core/session.js' import { ConnectorCenter } from '../../core/connector-center.js' import { createMemoryNotificationsStore } from '../../core/notifications-store.js' import { AgentWorkRunner } from '../../core/agent-work.js' +import { createAgentWorkListener, type AgentWorkListener } from '../../core/agent-work-listener.js' import type { ToolCallSummary } from '../../ai-providers/types.js' +import type { + AgentWorkDonePayload, + AgentWorkSkipPayload, + AgentWorkErrorPayload, +} from '../../core/agent-event.js' -// Mock writeConfigSection to avoid disk writes in tests vi.mock('../../core/config.js', () => ({ writeConfigSection: vi.fn(async () => ({})), })) @@ -62,10 +60,6 @@ function makeConfig(overrides: Partial = {}): HeartbeatConfig { } // ==================== Mock Engine ==================== -// -// Returns `{ text, media, toolCalls }` from `askWithSession`. The -// runner unwraps these as ProviderResult; toolCalls is what the -// heartbeat outputGate inspects for notify_user invocations. interface MockEngineState { text: string @@ -80,21 +74,14 @@ function createMockEngine(initial: Partial = {}) { shouldThrow: null, ...initial, } - return { state, setNotifyUserCall(text: string) { state.toolCalls = [{ id: randomUUID(), name: 'notify_user', input: { text } }] }, - setNoToolCall() { - state.toolCalls = [] - }, - setRawText(text: string) { - state.text = text - }, - setShouldThrow(err: Error | null) { - state.shouldThrow = err - }, + setNoToolCall() { state.toolCalls = [] }, + setRawText(text: string) { state.text = text }, + setShouldThrow(err: Error | null) { state.shouldThrow = err }, askWithSession: vi.fn(async () => { if (state.shouldThrow) throw state.shouldThrow return { text: state.text, media: [], toolCalls: state.toolCalls } @@ -108,94 +95,63 @@ function createMockEngine(initial: Partial = {}) { describe('heartbeat', () => { let eventLog: EventLog let listenerRegistry: ListenerRegistry - let cronEngine: CronEngine let heartbeat: Heartbeat let mockEngine: ReturnType let session: SessionStore let connectorCenter: ConnectorCenter let notificationsStore: ReturnType - let agentWorkRunner: AgentWorkRunner + let agentWorkListener: AgentWorkListener beforeEach(async () => { - const logPath = tempPath('jsonl') - const storePath = tempPath('json') - eventLog = await createEventLog({ logPath }) + eventLog = await createEventLog({ logPath: tempPath('jsonl') }) listenerRegistry = createListenerRegistry(eventLog) await listenerRegistry.start() - cronEngine = createCronEngine({ registry: listenerRegistry, storePath }) - await cronEngine.start() mockEngine = createMockEngine() session = new SessionStore(`test/heartbeat-${randomUUID()}`) notificationsStore = createMemoryNotificationsStore() connectorCenter = new ConnectorCenter({ notificationsStore }) - agentWorkRunner = new AgentWorkRunner({ + const runner = new AgentWorkRunner({ agentCenter: mockEngine as never, connectorCenter, }) + agentWorkListener = createAgentWorkListener({ runner, registry: listenerRegistry }) + await agentWorkListener.start() }) afterEach(async () => { heartbeat?.stop() - cronEngine.stop() + agentWorkListener.stop() await listenerRegistry.stop() await eventLog._resetForTest() }) - // ==================== Start / Idempotency ==================== + // ==================== Lifecycle ==================== - describe('start', () => { - it('should register a cron job on start', async () => { + describe('lifecycle', () => { + it('start() is idempotent', async () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) - await heartbeat.start() - - const jobs = cronEngine.list() - expect(jobs).toHaveLength(1) - expect(jobs[0].name).toBe(HEARTBEAT_JOB_NAME) - expect(jobs[0].schedule).toEqual({ kind: 'every', every: '30m' }) + await heartbeat.start() // no error }) - it('should be idempotent (update existing job, not create duplicate)', async () => { - heartbeat = createHeartbeat({ - config: makeConfig({ every: '30m' }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, - }) - await heartbeat.start() - heartbeat.stop() - - heartbeat = createHeartbeat({ - config: makeConfig({ every: '1h' }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, - }) - await heartbeat.start() - - const jobs = cronEngine.list() - expect(jobs).toHaveLength(1) - expect(jobs[0].schedule).toEqual({ kind: 'every', every: '1h' }) - }) - - it('should register disabled job when config.enabled is false', async () => { + it('start() respects config.enabled', async () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - const jobs = cronEngine.list() - expect(jobs).toHaveLength(1) - expect(jobs[0].enabled).toBe(false) expect(heartbeat.isEnabled()).toBe(false) }) }) - // ==================== Event Handling: notify_user contract ==================== + // ==================== Event Handling ==================== describe('event handling', () => { - it('delivers when AI invokes notify_user', async () => { + it('delivers when AI calls notify_user', async () => { const delivered: string[] = [] notificationsStore.onAppended((entry) => { delivered.push(entry.text) }) @@ -203,44 +159,40 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(1) }) expect(delivered).toEqual(['BTC dropped 5% to $87,200']) - const done = eventLog.recent({ type: 'heartbeat.done' }) - expect(done[0].payload).toMatchObject({ - reply: 'BTC dropped 5% to $87,200', - delivered: true, - }) + const done = eventLog.recent({ type: 'agent.work.done' })[0].payload as AgentWorkDonePayload + expect(done.source).toBe('heartbeat') + expect(done.delivered).toBe(true) }) it('skips with reason=ack when AI does not call notify_user', async () => { - mockEngine.setRawText('Checked. Nothing notable in the last 30 minutes.') + mockEngine.setRawText('Checked, nothing notable.') mockEngine.setNoToolCall() heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) }) - const skips = eventLog.recent({ type: 'heartbeat.skip' }) - expect(skips[0].payload).toMatchObject({ reason: 'ack' }) - // No notify, no done - expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(0) + const skip = eventLog.recent({ type: 'agent.work.skip' })[0].payload as AgentWorkSkipPayload + expect(skip.source).toBe('heartbeat') + expect(skip.reason).toBe('ack') + expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(0) }) it('skips with reason=empty when notify_user.text is blank', async () => { @@ -248,52 +200,52 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) }) - expect((eventLog.recent({ type: 'heartbeat.skip' })[0].payload as { reason: string }).reason).toBe('empty') + const skip = eventLog.recent({ type: 'agent.work.skip' })[0].payload as AgentWorkSkipPayload + expect(skip.reason).toBe('empty') }) - it('does NOT regex-parse the AI response — STATUS-shaped text without notify_user is still skipped', async () => { - // Old protocol response — must NOT trigger any notification under - // the new contract. The AI must call the tool to deliver. - mockEngine.setRawText('STATUS: CHAT_YES\nREASON: x\nCONTENT: this should NOT be delivered') + it('does NOT regex-parse STATUS-shaped raw text — anti-regression', async () => { + mockEngine.setRawText('STATUS: CHAT_YES\nCONTENT: should NOT be delivered') mockEngine.setNoToolCall() heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) }) const { entries } = await notificationsStore.read() expect(entries).toHaveLength(0) }) - it('ignores non-heartbeat cron.fire events', async () => { + it('no longer subscribes to cron.fire (decoupled from cron-engine)', async () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() + // Fire a cron.fire event with the legacy __heartbeat__ jobName. + // Pre-refactor, this would have driven heartbeat. Post-refactor, + // heartbeat is fully decoupled — no AI call should happen. await eventLog.append('cron.fire', { - jobId: 'other-job', - jobName: 'check-eth', - payload: 'Check ETH price', + jobId: 'legacy-id', + jobName: '__heartbeat__', + payload: 'should be ignored', }) await new Promise((r) => setTimeout(r, 50)) @@ -304,27 +256,30 @@ describe('heartbeat', () => { // ==================== Active Hours ==================== describe('active hours', () => { - it('skips when outside active hours, without invoking AI', async () => { + it('emits agent.work.skip with reason=outside-active-hours, without invoking AI', async () => { const fakeNow = new Date('2025-06-15T03:00:00').getTime() // 3 AM local heartbeat = createHeartbeat({ config: makeConfig({ activeHours: { start: '09:00', end: '22:00', timezone: 'local' }, }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, now: () => fakeNow, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.skip' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.skip' })).toHaveLength(1) }) - const skips = eventLog.recent({ type: 'heartbeat.skip' }) - expect((skips[0].payload as { reason: string }).reason).toBe('outside-active-hours') + const skip = eventLog.recent({ type: 'agent.work.skip' })[0].payload as AgentWorkSkipPayload + expect(skip.source).toBe('heartbeat') + expect(skip.reason).toBe('outside-active-hours') expect(mockEngine.askWithSession).not.toHaveBeenCalled() + // No agent.work.requested emitted (pre-emit gate) + const reqs = eventLog.recent({ type: 'agent.work.requested' }) + expect(reqs.filter(e => (e.payload as { source: string }).source === 'heartbeat')).toHaveLength(0) }) }) @@ -339,23 +294,19 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - const jobId = cronEngine.list()[0].id - - // First fire — delivered - await cronEngine.runNow(jobId) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(1) }) - // Second fire (same notify_user text) — should be deduped - await cronEngine.runNow(jobId) + await heartbeat.runNow() await vi.waitFor(() => { - const skips = eventLog.recent({ type: 'heartbeat.skip' }) - expect(skips.some((s) => (s.payload as { reason: string }).reason === 'duplicate')).toBe(true) + const skips = eventLog.recent({ type: 'agent.work.skip' }) + expect(skips.some(s => (s.payload as AgentWorkSkipPayload).reason === 'duplicate')).toBe(true) }) expect(delivered).toHaveLength(1) @@ -367,22 +318,17 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - const jobId = cronEngine.list()[0].id mockEngine.setNotifyUserCall('First alert') - await cronEngine.runNow(jobId) - await vi.waitFor(() => { - expect(delivered).toHaveLength(1) - }) + await heartbeat.runNow() + await vi.waitFor(() => { expect(delivered).toHaveLength(1) }) mockEngine.setNotifyUserCall('Second different alert') - await cronEngine.runNow(jobId) - await vi.waitFor(() => { - expect(delivered).toHaveLength(2) - }) + await heartbeat.runNow() + await vi.waitFor(() => { expect(delivered).toHaveLength(2) }) expect(delivered).toEqual(['First alert', 'Second different alert']) }) @@ -391,101 +337,91 @@ describe('heartbeat', () => { // ==================== Error Handling ==================== describe('error handling', () => { - it('emits heartbeat.error on AI failure', async () => { + it('emits agent.work.error on AI failure', async () => { mockEngine.setShouldThrow(new Error('AI down')) heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.error' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.error' })).toHaveLength(1) }) - const errors = eventLog.recent({ type: 'heartbeat.error' }) - expect(errors[0].payload).toMatchObject({ error: 'AI down' }) + const err = eventLog.recent({ type: 'agent.work.error' })[0].payload as AgentWorkErrorPayload + expect(err.source).toBe('heartbeat') + expect(err.error).toBe('AI down') }) - it('handles notify failure gracefully — emits done with delivered=false', async () => { + it('handles notify failure — emits done with delivered=false', async () => { mockEngine.setNotifyUserCall('alert text') - // Force the underlying append to reject. The runner should still - // emit done with delivered=false; the listener should not crash. const originalAppend = notificationsStore.append.bind(notificationsStore) notificationsStore.append = async () => { throw new Error('store failed') } heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await vi.waitFor(() => { - expect(eventLog.recent({ type: 'heartbeat.done' })).toHaveLength(1) + expect(eventLog.recent({ type: 'agent.work.done' })).toHaveLength(1) }) - const done = eventLog.recent({ type: 'heartbeat.done' }) - expect((done[0].payload as { delivered: boolean }).delivered).toBe(false) + const done = eventLog.recent({ type: 'agent.work.done' })[0].payload as AgentWorkDonePayload + expect(done.delivered).toBe(false) notificationsStore.append = originalAppend }) }) - // ==================== Lifecycle ==================== + // ==================== stop ==================== - describe('lifecycle', () => { - it('stops listening after stop()', async () => { + describe('stop', () => { + it('runNow is a no-op after stop()', async () => { heartbeat = createHeartbeat({ config: makeConfig(), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() heartbeat.stop() - await cronEngine.runNow(cronEngine.list()[0].id) + await heartbeat.runNow() await new Promise((r) => setTimeout(r, 50)) expect(mockEngine.askWithSession).not.toHaveBeenCalled() }) }) - // ==================== setEnabled / isEnabled ==================== + // ==================== setEnabled ==================== describe('setEnabled', () => { it('enables a previously disabled heartbeat', async () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - expect(heartbeat.isEnabled()).toBe(false) - expect(cronEngine.list()[0].enabled).toBe(false) await heartbeat.setEnabled(true) - expect(heartbeat.isEnabled()).toBe(true) - expect(cronEngine.list()[0].enabled).toBe(true) }) it('disables an enabled heartbeat', async () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: true }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - expect(heartbeat.isEnabled()).toBe(true) await heartbeat.setEnabled(false) - expect(heartbeat.isEnabled()).toBe(false) - expect(cronEngine.list()[0].enabled).toBe(false) }) it('persists config via writeConfigSection', async () => { @@ -493,7 +429,7 @@ describe('heartbeat', () => { heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() await heartbeat.setEnabled(true) @@ -504,25 +440,22 @@ describe('heartbeat', () => { ) }) - it('allows firing after setEnabled(true)', async () => { + it('runNow ignores the enabled flag (always fires for manual trigger)', async () => { const delivered: string[] = [] notificationsStore.onAppended((entry) => { delivered.push(entry.text) }) - mockEngine.setNotifyUserCall('after-enable') + mockEngine.setNotifyUserCall('manual-fire') heartbeat = createHeartbeat({ config: makeConfig({ enabled: false }), - agentWorkRunner, cronEngine, registry: listenerRegistry, session, + agentWorkListener, registry: listenerRegistry, session, }) await heartbeat.start() - await heartbeat.setEnabled(true) + // Even though enabled=false, manual runNow should still work + await heartbeat.runNow() - await cronEngine.runNow(cronEngine.list()[0].id) - - await vi.waitFor(() => { - expect(delivered).toHaveLength(1) - }) - expect(delivered[0]).toBe('after-enable') + await vi.waitFor(() => { expect(delivered).toHaveLength(1) }) + expect(delivered[0]).toBe('manual-fire') }) }) }) @@ -537,16 +470,14 @@ describe('isWithinActiveHours', () => { it('returns true within normal range', () => { const ts = todayAt(15, 0).getTime() expect(isWithinActiveHours( - { start: '09:00', end: '22:00', timezone: 'local' }, - ts, + { start: '09:00', end: '22:00', timezone: 'local' }, ts, )).toBe(true) }) it('returns false outside normal range', () => { const ts = todayAt(3, 0).getTime() expect(isWithinActiveHours( - { start: '09:00', end: '22:00', timezone: 'local' }, - ts, + { start: '09:00', end: '22:00', timezone: 'local' }, ts, )).toBe(false) }) @@ -555,12 +486,10 @@ describe('isWithinActiveHours', () => { { start: '22:00', end: '06:00', timezone: 'local' }, todayAt(23, 0).getTime(), )).toBe(true) - expect(isWithinActiveHours( { start: '22:00', end: '06:00', timezone: 'local' }, todayAt(3, 0).getTime(), )).toBe(true) - expect(isWithinActiveHours( { start: '22:00', end: '06:00', timezone: 'local' }, todayAt(12, 0).getTime(), @@ -600,7 +529,7 @@ describe('HeartbeatDedup', () => { expect(d.isDuplicate('world', 500)).toBe(false) }) - it('exposes lastText (load-bearing for buildDonePayload)', () => { + it('exposes lastText', () => { const d = new HeartbeatDedup() expect(d.lastText).toBeNull() d.record('first', 100) diff --git a/src/task/heartbeat/heartbeat.ts b/src/task/heartbeat/heartbeat.ts index 25cf9e729..fa22e511a 100644 --- a/src/task/heartbeat/heartbeat.ts +++ b/src/task/heartbeat/heartbeat.ts @@ -1,47 +1,41 @@ /** - * Heartbeat — periodic AI self-check, built on top of the cron engine. + * Heartbeat — periodic Alice self-check, Pump-driven. * - * Registers a cron job (`__heartbeat__`) that fires at a configured - * interval. Each fire is submitted to AgentWorkRunner with two gates: + * Heartbeat is a recurring "ping Alice every N minutes" service. Prior + * to this commit, it piggy-backed on the cron engine: registered an + * internal `__heartbeat__` cron job, subscribed to `cron.fire` filtered + * by jobName, did its work in the handler. That was conceptual debt — + * the cron engine should be reserved for user-defined cron jobs from + * the Automation > Cron UI, and heartbeat's lifecycle (active-hours, + * dedup, hot enable/disable, configured prompt) doesn't belong in a + * "user cron job" shape. * - * - **inputGate**: active-hours filter — skip without spending tokens - * when outside the configured window - * - **outputGate**: inspect AI's tool calls — if `notify_user` was - * invoked, deliver its `text` arg (after dedup); otherwise skip - * silently with reason='ack' - * - **onDelivered**: record dedup state on successful delivery + * Now: heartbeat owns a private Pump for its schedule and a + * ProducerHandle for `agent.work.{requested,skip}` emits. The cron + * engine is no longer in its dependency graph. * - * Replaces the legacy STATUS regex protocol (`STATUS: HEARTBEAT_OK | - * CHAT_YES + CONTENT: ...`) with structured tool-call signalling. The - * runner-side gate handles dedup before the notification reaches - * connectors, which means duplicate suppression and active-hours - * filtering are uniform across configurations. + * On each tick: + * 1. Active-hours pre-filter. Outside hours → emit + * `agent.work.skip { source: 'heartbeat', reason: 'outside-active-hours' }` + * and return; AI is never invoked, no token cost. + * 2. Otherwise emit `agent.work.requested { source: 'heartbeat', + * prompt }`. The agent-work-listener routes it through the + * heartbeat source config (notify_user inspection + dedup gate) + * registered at start(). * - * Events emitted: - * - heartbeat.done { reply, reason, durationMs, delivered } - * - heartbeat.skip { reason, parsedReason? } - * - heartbeat.error { error, durationMs } - * - * Heartbeat-specific state stays in this module: - * - `HeartbeatDedup` — in-memory 24h window - * - `__heartbeat__` cron job lifecycle (idempotent add/update, - * hot-toggle via setEnabled) - * - active-hours config + tz-aware time-of-day check + * State heartbeat owns: HeartbeatDedup (24h window), active-hours + * config, the Pump, the ProducerHandle, the source config registered + * with agent-work-listener. AgentWork pipeline state (sessions, + * AI invocation) lives elsewhere. */ -import type { AgentWorkRunner, AgentWorkResultProbe } from '../../core/agent-work.js' -import type { Listener } from '../../core/listener.js' -import type { ListenerRegistry } from '../../core/listener-registry.js' import { SessionStore } from '../../core/session.js' import { writeConfigSection } from '../../core/config.js' -import type { CronEngine } from '../cron/engine.js' - -const HEARTBEAT_EMITS = ['heartbeat.done', 'heartbeat.skip', 'heartbeat.error'] as const -type HeartbeatEmits = typeof HEARTBEAT_EMITS - -// ==================== Constants ==================== - -export const HEARTBEAT_JOB_NAME = '__heartbeat__' +import type { ListenerRegistry } from '../../core/listener-registry.js' +import type { ProducerHandle } from '../../core/producer.js' +import { createPump, type Pump } from '../../core/pump.js' +import type { AgentWorkListener, AgentWorkSourceConfig } from '../../core/agent-work-listener.js' +import type { AgentWorkResultProbe } from '../../core/agent-work.js' // ==================== Config ==================== @@ -78,9 +72,12 @@ In short: export interface HeartbeatOpts { config: HeartbeatConfig - agentWorkRunner: AgentWorkRunner - cronEngine: CronEngine - /** Registry to auto-register the heartbeat listener with. */ + /** Where to register the heartbeat source config so the agent-work + * pipeline knows how to handle heartbeat-sourced requests. */ + agentWorkListener: AgentWorkListener + /** Listener registry — used to declare the heartbeat producer so its + * agent.work.{requested,skip} emits are validated + show in the + * topology graph. */ registry: ListenerRegistry /** Optional: inject a session for testing. */ session?: SessionStore @@ -91,196 +88,133 @@ export interface HeartbeatOpts { export interface Heartbeat { start(): Promise stop(): void - /** Update heartbeat config (interval, prompt, etc.) and sync with cron job. */ + /** Update heartbeat config (interval, prompt, etc.) and sync with pump. */ updateConfig(newConfig: HeartbeatConfig): Promise - /** Hot-toggle heartbeat on/off (persists to config + updates cron job). */ + /** Hot-toggle heartbeat on/off (persists to config + updates pump). */ setEnabled(enabled: boolean): Promise /** Current enabled state. */ isEnabled(): boolean - /** Expose the raw listener for direct testing. */ - readonly listener: Listener<'cron.fire', HeartbeatEmits> + /** Manually trigger a heartbeat tick — used by tests and "run now" UI. */ + runNow(): Promise } // ==================== Factory ==================== export function createHeartbeat(opts: HeartbeatOpts): Heartbeat { let { config } = opts - const { agentWorkRunner, cronEngine, registry } = opts + const { agentWorkListener, registry } = opts const session = opts.session ?? new SessionStore('heartbeat') const now = opts.now ?? Date.now - let jobId: string | null = null - let processing = false let enabled = config.enabled - let registered = false + let started = false + let producer: ProducerHandle | null = null + let pump: Pump | null = null const dedup = new HeartbeatDedup() - const listener: Listener<'cron.fire', HeartbeatEmits> = { - name: 'heartbeat', - subscribes: 'cron.fire', - emits: HEARTBEAT_EMITS, - async handle(entry, ctx) { - const payload = entry.payload - - // Filter to our own cron job - if (payload.jobName !== HEARTBEAT_JOB_NAME) return - - // Serial — preserve today's behaviour. Concurrent heartbeats would - // be ambiguous wrt dedup state. - if (processing) return - - processing = true - const startMs = now() - console.log(`heartbeat: firing at ${new Date(startMs).toISOString()}`) - try { - const result = await agentWorkRunner.run( - { - prompt: payload.payload, - session, - preamble: - 'You are operating in the heartbeat monitoring context (session: heartbeat). The following is the recent heartbeat conversation history.', - metadata: { source: 'heartbeat' }, - - // ---- inputGate: active-hours guard ---- - inputGate: () => - isWithinActiveHours(config.activeHours, now()) - ? null - : { - reason: 'outside-active-hours', - payload: { reason: 'outside-active-hours' }, - }, - - // ---- outputGate: notify_user inspection + dedup ---- - outputGate: (probe: AgentWorkResultProbe) => { - const call = probe.toolCalls.find((c) => c.name === 'notify_user') - if (!call) { - return { - kind: 'skip', - reason: 'ack', - payload: { reason: 'ack' }, - } - } - const text = ((call.input ?? {}) as { text?: string }).text ?? '' - if (!text.trim()) { - return { - kind: 'skip', - reason: 'empty', - payload: { reason: 'empty' }, - } - } - if (dedup.isDuplicate(text, now())) { - return { - kind: 'skip', - reason: 'duplicate', - payload: { reason: 'duplicate', parsedReason: text.slice(0, 80) }, - } - } - return { kind: 'deliver', text, media: probe.media } - }, - - // ---- onDelivered: record dedup state ---- - onDelivered: (text) => dedup.record(text, now()), - - emitNames: { - done: 'heartbeat.done', - skip: 'heartbeat.skip', - error: 'heartbeat.error', - }, - buildDonePayload: (_req, _result, durationMs, delivered) => { - // Look up what we actually delivered (the text the AI passed - // through notify_user). The runner already invoked notify with - // the gate's chosen text; for the done payload we re-derive it - // from the dedup state — `dedup.lastText` is what we just sent. - const reply = dedup.lastText ?? '' - return { - reply, - reason: 'notify_user', - durationMs, - delivered, - } - }, - buildErrorPayload: (_req, err, durationMs) => ({ - error: err.message, - durationMs, - }), - }, - ctx.emit as never, - ) - - const durationMs = now() - startMs - console.log( - `heartbeat: ${result.outcome}` + - (result.skipReason ? ` reason=${result.skipReason}` : '') + - ` (${durationMs}ms)`, - ) - } finally { - processing = false + // ---- Source config (registered with agent-work-listener) ---- + // + // Output-side semantics (notify_user inspection + dedup gate) live + // here, closing over the dedup instance heartbeat owns. The + // agent-work-listener calls these when an agent.work.requested event + // with source='heartbeat' arrives. + const sourceConfig: AgentWorkSourceConfig = { + source: 'heartbeat', + session, + preamble: () => + 'You are operating in the heartbeat monitoring context (session: heartbeat). The following is the recent heartbeat conversation history.', + outputGate: (probe: AgentWorkResultProbe) => { + const call = probe.toolCalls.find((c) => c.name === 'notify_user') + if (!call) { + return { kind: 'skip', reason: 'ack', payload: { reason: 'ack' } } + } + const text = ((call.input ?? {}) as { text?: string }).text ?? '' + if (!text.trim()) { + return { kind: 'skip', reason: 'empty', payload: { reason: 'empty' } } + } + if (dedup.isDuplicate(text, now())) { + return { + kind: 'skip', + reason: 'duplicate', + payload: { reason: 'duplicate', parsedReason: text.slice(0, 80) }, + } } + return { kind: 'deliver', text, media: probe.media } }, + onDelivered: (text) => dedup.record(text, now()), } - /** Ensure the cron job exists and listener is registered (idempotent). */ - async function ensureJobAndListener(): Promise { - const existing = cronEngine.list().find((j) => j.name === HEARTBEAT_JOB_NAME) - if (existing) { - jobId = existing.id - await cronEngine.update(existing.id, { - schedule: { kind: 'every', every: config.every }, - payload: config.prompt, - enabled, - }) - } else { - jobId = await cronEngine.add({ - name: HEARTBEAT_JOB_NAME, - schedule: { kind: 'every', every: config.every }, - payload: config.prompt, - enabled, + /** The pump's tick callback — active-hours guard then emit. */ + async function onTick(): Promise { + const startMs = now() + console.log(`heartbeat: firing at ${new Date(startMs).toISOString()}`) + + if (!isWithinActiveHours(config.activeHours, now())) { + await producer!.emit('agent.work.skip', { + source: 'heartbeat', + reason: 'outside-active-hours', }) + console.log(`heartbeat: skipped (outside-active-hours)`) + return } - if (!registered) { - registry.register(listener) - registered = true - } + await producer!.emit('agent.work.requested', { + source: 'heartbeat', + prompt: config.prompt, + }) } return { - listener, async start() { - // Always register job + listener (even if disabled) so setEnabled can toggle later - await ensureJobAndListener() + if (started) return + started = true + + producer = registry.declareProducer({ + name: 'heartbeat', + emits: ['agent.work.requested', 'agent.work.skip'] as const, + }) + agentWorkListener.registerSource(sourceConfig) + + pump = createPump({ + name: 'heartbeat', + every: config.every, + enabled, + onTick, + }) + pump.start() }, stop() { - // Unregister the listener so a subsequent start() re-registers cleanly. - // Don't delete the cron job — it persists for restart recovery. - if (registered) { - registry.unregister(listener.name) - registered = false - } + if (!started) return + pump?.stop() + pump = null + producer?.dispose() + producer = null + started = false }, async updateConfig(newConfig: HeartbeatConfig) { config = { ...newConfig } enabled = config.enabled - await ensureJobAndListener() + if (pump) { + pump.setEnabled(enabled) + } }, async setEnabled(newEnabled: boolean) { enabled = newEnabled - config.enabled = newEnabled - - // Ensure infrastructure exists (handles cold enable when start() was called with disabled) - await ensureJobAndListener() - - // Persist to config file + pump?.setEnabled(newEnabled) await writeConfigSection('heartbeat', { ...config, enabled: newEnabled }) }, isEnabled() { return enabled }, + + async runNow() { + if (pump) await pump.runNow() + }, } } @@ -304,12 +238,9 @@ export function isWithinActiveHours( const nowMinutes = currentMinutesInTimezone(timezone, nowMs) - // Normal range (e.g. 09:00 → 22:00) if (startMinutes <= endMinutes) { return nowMinutes >= startMinutes && nowMinutes < endMinutes } - - // Overnight range (e.g. 22:00 → 06:00) return nowMinutes >= startMinutes || nowMinutes < endMinutes } @@ -324,11 +255,9 @@ function parseHHMM(s: string): number | null { function currentMinutesInTimezone(tz: string, nowMs?: number): number { const date = nowMs ? new Date(nowMs) : new Date() - if (tz === 'local') { return date.getHours() * 60 + date.getMinutes() } - try { const fmt = new Intl.DateTimeFormat('en-US', { timeZone: tz, @@ -348,15 +277,13 @@ function currentMinutesInTimezone(tz: string, nowMs?: number): number { // ==================== Dedup ==================== /** - * Suppress identical heartbeat messages within a time window (default 24h). - * - * In-memory only — restart loses dedup state. Acceptable trade-off: - * heartbeat fires every ~30m by default, so a restart-window - * collision is rare and the cost (one duplicate notification) is low. + * Suppress identical heartbeat notify_user texts within a time window + * (default 24h). In-memory only — restart loses dedup state. Acceptable + * trade-off: heartbeats are coarse-grained (~30m), restart-window + * collisions are rare, single-duplicate cost is low. */ export class HeartbeatDedup { - /** Public for the heartbeat factory's `buildDonePayload` to read the - * most-recently-delivered text without an extra signal channel. */ + /** Public for callers that want to inspect the last-delivered text. */ public lastText: string | null = null private lastSentAt = 0 private windowMs: number