Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ model User {
displayName String?
email String? @unique
avatarUrl String?
riskTolerance Int @default(5)
isActive Boolean @default(true)
riskTolerance Int @default(5)
rebalanceStrategy String? // 'MAX_YIELD' | 'TARGET_ALLOCATION' | null (defaults to MAX_YIELD)
strategyConfig Json? // e.g. { "targetAllocations": { "Blend": 50, "Stellar DEX": 30, "Luma": 20 } }
isActive Boolean @default(true)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

Expand Down
35 changes: 27 additions & 8 deletions src/agent/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
recordRebalanceTriggered,
recordDbOperation,
recordBackgroundJob,
recordExternalServiceError
recordExternalServiceError,
} from '../utils/metrics';

let isRunning = false;
Expand Down Expand Up @@ -95,26 +95,45 @@ async function rebalanceCheckJob(): Promise<void> {
return;
}

const byProtocol = new Map<string, typeof positions>();
// Group by (protocol, strategy) so users with different strategies
// are evaluated independently
type PositionWithUser = typeof positions[number];
const byProtocolAndStrategy = new Map<string, PositionWithUser[]>();
for (const pos of positions) {
const key = pos.protocolName;
if (!byProtocol.has(key)) {
byProtocol.set(key, []);
const strategy = (pos.user as any).rebalanceStrategy || 'DEFAULT';
const key = `${pos.protocolName}:${strategy}`;
if (!byProtocolAndStrategy.has(key)) {
byProtocolAndStrategy.set(key, []);
}
byProtocol.get(key)!.push(pos);
byProtocolAndStrategy.get(key)!.push(pos);
}

let rebalancesTriggered = 0;
const thresholds = getThresholds();

for (const [protocol, protocolPositions] of byProtocol.entries()) {
for (const [key, protocolPositions] of byProtocolAndStrategy.entries()) {
const [protocol, strategyKey] = key.split(':');
const strategyName = strategyKey === 'DEFAULT' ? undefined : strategyKey;

// Build per-user strategy preferences
const userStrategyPreferences = strategyName
? protocolPositions.map((p: any) => ({
userId: p.userId,
strategyName: p.user.rebalanceStrategy || null,
targetAllocations: p.user.strategyConfig?.targetAllocations || undefined,
riskTolerance: p.user.riskTolerance,
}))
: undefined;

const result = await executeRebalanceIfNeeded(
protocol,
protocolPositions.map((p: any) => ({
id: p.id,
amount: p.currentValue.toString(),
userId: p.userId,
})),
thresholds
thresholds,
userStrategyPreferences,
);

if (result) {
Expand Down
81 changes: 74 additions & 7 deletions src/agent/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

import { logger } from '../utils/logger';
import { getCorrelationId } from '../utils/correlation';
import { ProtocolComparison, RebalanceDetails, RebalanceThresholds } from './types';
import { ProtocolComparison, RebalanceDetails, RebalanceThresholds, RebalanceStrategy, UserStrategyPreferences } from './types';
import { scanAllProtocols, getCurrentOnChainApy } from './scanner';
import { triggerRebalance as submitRebalance } from '../stellar/contract';
import { MaxYieldStrategy, TargetAllocationStrategy } from './strategies';
import db from '../db';

const DEFAULT_THRESHOLDS: RebalanceThresholds = {
Expand Down Expand Up @@ -128,6 +129,7 @@ export async function triggerRebalance(
toProtocol: string,
amount: string,
positionIds: string[] = [],
strategyInfo?: { name: string; reasoning: string; deviationTrigger?: string },
): Promise<RebalanceDetails | null> {
const startTime = Date.now();

Expand Down Expand Up @@ -215,11 +217,19 @@ export async function triggerRebalance(
seen.add(key);
await logAgentAction('REBALANCE', 'SUCCESS', {
rebalanceDetail,
strategyName: strategyInfo?.name,
reasoning: strategyInfo?.reasoning,
deviationTrigger: strategyInfo?.deviationTrigger,
}, pos.userId, pos.id);
}
} else {
// No positions linked – log as system-level (userId stays null)
await logAgentAction('REBALANCE', 'SUCCESS', { rebalanceDetail });
await logAgentAction('REBALANCE', 'SUCCESS', {
rebalanceDetail,
strategyName: strategyInfo?.name,
reasoning: strategyInfo?.reasoning,
deviationTrigger: strategyInfo?.deviationTrigger,
});
}

logger.info('Rebalance successful', {
Expand Down Expand Up @@ -257,20 +267,72 @@ export async function triggerRebalance(
*/
export async function executeRebalanceIfNeeded(
currentProtocol: string,
userPositions: Array<{ id: string; amount: string }>,
thresholds?: RebalanceThresholds
userPositions: Array<{ id: string; amount: string; userId?: string }>,
thresholds?: RebalanceThresholds,
userStrategyPreferences?: UserStrategyPreferences[],
): Promise<RebalanceDetails | null> {
try {
// Sum all user positions FIRST to account for costs
const totalAmount = userPositions
.reduce(
(sum, pos) => sum + BigInt(pos.amount),
BigInt(0)
)
.toString();

// FIXED: Pass totalAmount to compareProtocols so it can account for transaction costs
const comparison = await compareProtocols(currentProtocol, totalAmount, thresholds);
const effectiveThresholds = thresholds ?? getThresholds();

// Use strategy engine when user preferences are present
if (userStrategyPreferences && userStrategyPreferences.length > 0) {
const currentApy = await getCurrentOnChainApy(currentProtocol);
if (!currentApy) {
logger.warn(`Cannot get current APY for ${currentProtocol}`);
return null;
}

const allProtocols = await scanAllProtocols();
if (allProtocols.length === 0) {
logger.warn('No protocols available for comparison');
return null;
}

const preferredStrategy = userStrategyPreferences[0]?.strategyName;
const strategy: RebalanceStrategy =
preferredStrategy === 'TARGET_ALLOCATION'
? new TargetAllocationStrategy()
: new MaxYieldStrategy();

const decision = await strategy.analyze({
currentProtocol,
totalAmount,
currentApy,
availableProtocols: allProtocols,
thresholds: effectiveThresholds,
userStrategyPreferences,
});

if (!decision.shouldRebalance) {
logger.info('No rebalance needed (strategy)', {
strategy: strategy.name,
reasoning: decision.reasoning,
});
return null;
}

return await triggerRebalance(
currentProtocol,
decision.targetProtocol,
totalAmount,
userPositions.map(pos => pos.id),
{
name: strategy.name,
reasoning: decision.reasoning,
deviationTrigger: decision.deviationTrigger,
},
);
}

// Default: existing compareProtocols flow (backward compatible)
const comparison = await compareProtocols(currentProtocol, totalAmount, effectiveThresholds);

if (!comparison || !comparison.shouldRebalance) {
logger.info('No rebalance needed', {
Expand All @@ -286,6 +348,11 @@ export async function executeRebalanceIfNeeded(
comparison.best.name,
totalAmount,
userPositions.map(pos => pos.id),
{
name: 'MAX_YIELD',
reasoning: `Moving from ${currentProtocol} to ${comparison.best.name} — net gain ${comparison.improvement.toFixed(2)}% after costs`,
deviationTrigger: `APY delta: ${(comparison.best.apy - (comparison.current.apy)).toFixed(2)}%`,
},
);
} catch (error) {
logger.error('Rebalance execution check failed', {
Expand Down
Loading