Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .github/workflows/k8s-validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ on:
push:
paths:
- 'deploy/k8s/**'
- 'deploy/monitoring/**'
- '.github/workflows/k8s-validate.yml'
pull_request:
paths:
- 'deploy/k8s/**'
- 'deploy/monitoring/**'
- '.github/workflows/k8s-validate.yml'

jobs:
Expand All @@ -25,3 +27,14 @@ jobs:
- name: Validate manifests
run: |
kubeconform -summary -ignore-missing-schemas deploy/k8s/*.yaml

- name: Install promtool
run: |
VERSION="2.54.1"
curl -sSL "https://github.com/prometheus/prometheus/releases/download/v${VERSION}/prometheus-${VERSION}.linux-amd64.tar.gz" \
| tar xz --strip-components=1 "prometheus-${VERSION}.linux-amd64/promtool"
sudo mv promtool /usr/local/bin/

- name: Validate Prometheus alert rules
run: |
promtool check rules deploy/monitoring/prometheus/alert-rules.yaml
25 changes: 23 additions & 2 deletions deploy/monitoring/prometheus/alert-rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,35 @@ groups:
summary: "Event cursor lag critical"
description: "Cursor lag is {{ $value }} ledgers (> 100)"

- alert: DLQSizeCritical
- alert: AgentLoopConsecutiveFailures
expr: increase(agent_loop_errors_total[5m]) > 3
for: 2m
labels:
severity: critical
annotations:
summary: "Agent loop consecutive failures detected"
description: "Agent loop has failed more than 3 times in 5 minutes"
runbook_url: "https://github.com/anomalyco/neurowealth/wiki/runbooks/agent-loop-failures"

- alert: AgentLoopStalled
expr: time() - agent_loop_last_success_timestamp > 600
for: 2m
labels:
severity: critical
annotations:
summary: "Agent loop has stalled"
description: "No successful agent loop tick for {{ $value }}s (> 600s)"
runbook_url: "https://github.com/anomalyco/neurowealth/wiki/runbooks/agent-loop-stalled"

- alert: DLQDepthHigh
expr: dlq_size > 50
for: 1m
labels:
severity: critical
annotations:
summary: "Dead Letter Queue critically large"
summary: "Dead Letter Queue depth high"
description: "DLQ has {{ $value }} events (> 50)"
runbook_url: "https://github.com/anomalyco/neurowealth/wiki/runbooks/dlq-clearance"

- alert: HighFailureRate
expr: rate(failures_total[5m]) > 10
Expand Down
2 changes: 1 addition & 1 deletion eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export default [
},
rules: {
...tseslint.configs.recommended.rules,
'@typescript-eslint/no-explicit-any': 'off',
'@typescript-eslint/no-explicit-any': 'error',
'@typescript-eslint/no-require-imports': 'off',
'@typescript-eslint/no-unused-vars': 'off',
'@typescript-eslint/no-unsafe-function-type': 'off',
Expand Down
22 changes: 12 additions & 10 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum AgentAction {
REBALANCE
ANALYZE
ALERT
SCAN
CLAIM_YIELD
}

Expand Down Expand Up @@ -95,16 +96,17 @@ model Session {
}

model AdminApiKey {
id String @id @default(uuid())
name String @unique
role String
scopes String[]
hash String
expiresAt DateTime?
revokedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
lastUsedAt DateTime?
id String @id @default(uuid())
name String @unique
role String
scopes String[]
hash String
tokenPrefix String?
expiresAt DateTime?
revokedAt DateTime?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
lastUsedAt DateTime?

auditLogs AdminAuditLog[]

Expand Down
26 changes: 16 additions & 10 deletions src/agent/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import {
recordRebalanceTriggered,
recordDbOperation,
recordBackgroundJob,
recordExternalServiceError
recordExternalServiceError,
recordAgentLoopError,
recordAgentLoopSuccess
} from '../utils/metrics';

let isRunning = false;
Expand Down Expand Up @@ -110,7 +112,7 @@ async function rebalanceCheckJob(): Promise<void> {
for (const [protocol, protocolPositions] of byProtocol.entries()) {
const result = await executeRebalanceIfNeeded(
protocol,
protocolPositions.map((p: any) => ({
protocolPositions.map((p) => ({
id: p.id,
amount: p.currentValue.toString(),
})),
Expand Down Expand Up @@ -141,6 +143,7 @@ async function rebalanceCheckJob(): Promise<void> {
recordDbOperation('rebalance_check', duration);

lastError = null;
recordAgentLoopSuccess();
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
lastError = errorMessage;
Expand All @@ -150,6 +153,7 @@ async function rebalanceCheckJob(): Promise<void> {
error: errorMessage,
});

recordAgentLoopError();
recordRebalanceCheck('failed');
recordDbOperation('rebalance_check', duration);

Expand Down Expand Up @@ -196,18 +200,20 @@ async function snapshotJob(): Promise<void> {
// Record Prometheus metrics
recordDbOperation('snapshot_job', duration / 1000);
recordBackgroundJob('snapshot', 'success', duration / 1000);
recordAgentLoopSuccess();
logger.info(`${jobName} scheduled`, { duration });
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
const duration = Date.now() - startTime;
logger.error(`${jobName} failed`, {
error: errorMessage,
duration,
});
// Record Prometheus metrics
recordDbOperation('snapshot_job', duration / 1000);
recordBackgroundJob('snapshot', 'failed', duration / 1000);
}
logger.error(`${jobName} failed`, {
error: errorMessage,
duration,
});
// Record Prometheus metrics
recordDbOperation('snapshot_job', duration / 1000);
recordBackgroundJob('snapshot', 'failed', duration / 1000);
recordAgentLoopError();
}
});
}

Expand Down
11 changes: 6 additions & 5 deletions src/agent/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import { logger } from '../utils/logger';
import { getCorrelationId } from '../utils/correlation';
import type { AgentAction, AgentStatus } from '@prisma/client';
import { ProtocolComparison, RebalanceDetails, RebalanceThresholds } from './types';
import { scanAllProtocols, getCurrentOnChainApy } from './scanner';
import { triggerRebalance as submitRebalance } from '../stellar/contract';
Expand Down Expand Up @@ -178,7 +179,7 @@ export async function triggerRebalance(
network: representativePosition.user.network,
protocolName: toProtocol,
memo: `Agent rebalance from ${fromProtocol} to ${toProtocol}`,
} as any,
},
});
} else {
logger.warn('No position found to persist rebalance transaction', {
Expand Down Expand Up @@ -307,8 +308,8 @@ export async function executeRebalanceIfNeeded(
* a null userId so it is distinguishable from user-level actions.
*/
export async function logAgentAction(
action: string,
status: 'SUCCESS' | 'FAILED' | 'SKIPPED',
action: AgentAction,
status: AgentStatus,
data?: Record<string, unknown>,
userId?: string,
positionId?: string,
Expand All @@ -327,8 +328,8 @@ export async function logAgentAction(
data: {
userId: userId ?? null,
positionId: positionId ?? null,
action: action as any,
status: status as any,
action,
status,
inputData: inputWithCorrelation ? JSON.stringify(inputWithCorrelation) : data?.input ? JSON.stringify(data.input) : undefined,
outputData: data?.output ? JSON.stringify(data.output) : undefined,
reasoning: data?.reasoning as string | undefined,
Expand Down
30 changes: 15 additions & 15 deletions src/agent/scanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Scanner - Fetches real APY rates from Stellar yield protocols
*/

import { Network } from '@prisma/client';
import { logger } from '../utils/logger';
import { YieldProtocol, ProtocolRate } from './types';
import db from '../db';
Expand Down Expand Up @@ -40,13 +41,12 @@ async function fetchBlendApy(): Promise<YieldProtocol | null> {

const poolId = process.env.BLEND_POOL_ID || 'GBUQWP3BOUZX34PISXEAMBNIZJLNCLVNX77MHAHVXHVVB4CMYAOK6BAC';

const data = await fetchWithRetry(
const data = await fetchWithRetry<{ reserves?: Array<{ asset?: { code?: string; symbol?: string }; supplyApy?: string; totalSupply?: string }> }>(
`${network}/api/v1/pool/${poolId}`,
{ timeout: 5000, retries: 3 }
);

// Extract USDC reserve APY and TVL from response
const reserve = data?.reserves?.find((r: any) =>
const reserve = data?.reserves?.find((r) =>
r.asset?.code === 'USDC' || r.asset?.symbol === 'USDC'
);

Expand Down Expand Up @@ -87,7 +87,7 @@ async function fetchStellarDexApy(): Promise<YieldProtocol | null> {
const horizonUrl = process.env.HORIZON_URL || 'https://horizon.stellar.org';
const usdcIssuer = process.env.USDC_ISSUER || 'GA5ZSEJYB37JRC5AVCIA5MOP4RHTM335X2KGX3IHOJAPP5RE34K4KZVN';

const data = await fetchWithRetry(
const data = await fetchWithRetry<{ _embedded?: { records?: Array<{ total_shares?: string; fee_bp?: string }> } }>(
`${horizonUrl}/liquidity_pools?reserves=${ASSET_SYMBOL}:${usdcIssuer}&limit=10&order=desc`,
{ timeout: 5000, retries: 3 }
);
Expand Down Expand Up @@ -134,16 +134,16 @@ async function fetchLumaApy(): Promise<YieldProtocol | null> {
try {
const lumaUrl = process.env.LUMA_API_URL || 'https://api.luma.finance';

const data = await fetchWithRetry(
const data = await fetchWithRetry<{ rates?: Array<{ asset?: string; symbol?: string; apy?: string; tvl?: string }> }>(
`${lumaUrl}/v1/rates?asset=${ASSET_SYMBOL}`,
{ timeout: 5000, retries: 3 }
);

const rate = data?.rates?.find((r: any) =>
const rate = data?.rates?.find((r) =>
r.asset === ASSET_SYMBOL || r.symbol === ASSET_SYMBOL
);

if (!rate) throw new Error('USDC rate not found in Luma response');
if (!rate?.apy) throw new Error('USDC rate not found in Luma response');

const apyRate = parseFloat(rate.apy) * 100;
const tvl = rate.tvl ? parseFloat(rate.tvl) : undefined;
Expand Down Expand Up @@ -209,15 +209,16 @@ export async function scanAllProtocols(): Promise<YieldProtocol[]> {
return filtered;
}

function normalizeNetwork(): string {
function normalizeNetwork(): Network {
const network = process.env.STELLAR_NETWORK?.toLowerCase();
const validNetworks = ['mainnet', 'testnet', 'futurenet'];
if (!network || !validNetworks.includes(network)) {
const validNetworks: Network[] = ['MAINNET', 'TESTNET', 'FUTURENET'];
const upper = network?.toUpperCase() as Network | undefined;
if (!upper || !validNetworks.includes(upper)) {
throw new Error(
`Invalid STELLAR_NETWORK: "${process.env.STELLAR_NETWORK}". Must be one of: ${validNetworks.join(', ')}`
);
}
return network.toUpperCase();
return upper;
}

/**
Expand All @@ -231,10 +232,9 @@ async function saveProtocolRates(protocols: YieldProtocol[]): Promise<void> {
data: {
protocolName: protocol.name,
assetSymbol: protocol.assetSymbol,
supplyApy: protocol.apy as any,
tvl: protocol.tvl === undefined ? undefined : (protocol.tvl as any),
network: networkLabel as any,
rawResponse: JSON.stringify({ fetchedAt: new Date(), source: protocol.name }),
supplyApy: protocol.apy,
tvl: protocol.tvl === undefined ? undefined : protocol.tvl,
network: networkLabel,
},
});
}
Expand Down
6 changes: 3 additions & 3 deletions src/agent/snapshotter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export async function captureAllUserBalances(): Promise<void> {

// CRITICAL FIX: Use batch insert (createMany) instead of individual awaits
// This scales much better as user base grows
const snapshotData = positions.map((pos: any) => {
const snapshotData = positions.map((pos) => {
const yearsActive = calculateYearsActive(pos.openedAt);
const apy = calculateApy(
pos.depositedAmount.toNumber(),
Expand All @@ -46,7 +46,7 @@ export async function captureAllUserBalances(): Promise<void> {
return {
positionId: pos.id,
// Coerce computed APY (number) into Prisma Decimal field.
apy: apy as any,
apy,
yieldAmount: pos.yieldEarned,
principalAmount: pos.depositedAmount,
};
Expand Down Expand Up @@ -122,7 +122,7 @@ export async function getPositionHistory(
},
});

return snapshots.map((snapshot: any) => ({
return snapshots.map((snapshot) => ({
userId: snapshot.position.userId,
walletAddress: snapshot.position.user.walletAddress,
positionId,
Expand Down
4 changes: 2 additions & 2 deletions src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ function validateAllRequiredEnvVars(): void {
// ── 10. NODE_ENV: must be one of the known deployment environments ────────
const nodeEnv = process.env.NODE_ENV
const validNodeEnvs = ['development', 'staging', 'production', 'test'] as const
if (nodeEnv && !validNodeEnvs.includes(nodeEnv as any)) {
if (nodeEnv && !(validNodeEnvs as readonly string[]).includes(nodeEnv)) {
errors.push(
`NODE_ENV is invalid: "${nodeEnv}". Must be one of: ${validNodeEnvs.join(' | ')}`
)
Expand All @@ -137,7 +137,7 @@ function validateStellarNetwork(network: string): 'testnet' | 'mainnet' | 'futur
const validNetworks = ['testnet', 'mainnet', 'futurenet'] as const
const lowerNetwork = network.toLowerCase()

if (!validNetworks.includes(lowerNetwork as any)) {
if (!(validNetworks as readonly string[]).includes(lowerNetwork)) {
throw new Error(
`Invalid STELLAR_NETWORK: "${network}". Must be one of: ${validNetworks.join(', ')}`
)
Expand Down
2 changes: 1 addition & 1 deletion src/middleware/adminAuth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import db from '../db'
import { logger } from '../utils/logger'
import { recordAuthFailure } from '../utils/metrics'

const prisma = db as any
const prisma = db

export interface AdminAuthContext {
id: string
Expand Down
4 changes: 2 additions & 2 deletions src/middleware/corsandbody.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ export const urlencodedBodyParser = express.urlencoded({
* this converts those into a consistent JSON response.
*/
export function payloadSizeErrorHandler(
err: any,
err: unknown,
_req: Request,
res: Response,
next: NextFunction
): void {
if (err.type === 'entity.too.large') {
if (err && typeof err === 'object' && 'type' in err && (err as Record<string, unknown>).type === 'entity.too.large') {
res.status(413).json({
success: false,
error: 'Payload Too Large',
Expand Down
2 changes: 1 addition & 1 deletion src/middleware/errorHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export function errorHandler(
const isDevelopment = process.env.NODE_ENV === 'development'
const errorResponse = ErrorResponses.internalError(
'Internal server error',
requestId,
requestId ?? 'unknown',
isDevelopment ? { message: err.message } : undefined
)

Expand Down
Loading