Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/investment/portfolio/dto/rebalancing.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ export class ExecuteRebalancingDto {
dryRun?: boolean;
}

export class ScheduleRebalancingDto {
@IsEnum(["daily", "weekly", "monthly", "custom"])
frequency: "daily" | "weekly" | "monthly" | "custom";

@IsOptional()
@IsString()
cron?: string;
}

export class CancelRebalancingDto {
@IsString()
rebalancingEventId: string;
Expand All @@ -82,6 +91,3 @@ export class RebalancingEventResponseDto {
executedAt?: Date;
completedAt?: Date;
}



13 changes: 10 additions & 3 deletions src/investment/portfolio/portfolio.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BullModule } from "@nestjs/bull";

// Modules
import { AlertsModule } from "src/growth/alerts/alerts.module";
import { DeFiModule } from "src/defi/defi.module";

// Entities
import { Portfolio } from "./entities/portfolio.entity";
import { PortfolioAsset } from "./entities/portfolio-asset.entity";
Expand All @@ -22,6 +26,9 @@ import { PortfolioConstraintService } from "./services/portfolio-constraint.serv
import { AuditLogService } from "src/infrastructure/audit/audit-log.service";
import { TradingTransactionService } from "./services/trading-transaction.service";

// Processors
import { RebalancingProcessor } from "./processors/rebalancing.processor";

// Controllers
import { PortfolioController } from "./portfolio.controller";
import { PortfolioManagementController } from "./portfolio-management.controller";
Expand Down Expand Up @@ -56,6 +63,8 @@ import { PortfolioOwnerGuard } from "./guards/portfolio-owner.guard";
name: "ml-predictions",
},
),
AlertsModule,
DeFiModule,
],
providers: [
PortfolioService,
Expand All @@ -67,6 +76,7 @@ import { PortfolioOwnerGuard } from "./guards/portfolio-owner.guard";
AuditLogService,
TradingTransactionService,
PortfolioOwnerGuard,
RebalancingProcessor,
],
controllers: [PortfolioController, PortfolioManagementController],

Expand All @@ -81,6 +91,3 @@ import { PortfolioOwnerGuard } from "./guards/portfolio-owner.guard";
],
})
export class PortfolioModule {}



70 changes: 70 additions & 0 deletions src/investment/portfolio/processors/rebalancing.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Process, Processor } from "@nestjs/bull";
import { Logger } from "@nestjs/common";
import { Job } from "bull";
import { RebalancingService } from "../services/rebalancing.service";
import { RebalanceTrigger } from "../entities/rebalancing-event.entity";

@Processor("rebalancing")
export class RebalancingProcessor {
private readonly logger = new Logger(RebalancingProcessor.name);

constructor(private readonly rebalancingService: RebalancingService) {}

@Process("rebalance-task")
async handleRebalanceTask(job: Job<{ portfolioId: string }>) {
const { portfolioId } = job.data;
this.logger.log(
`Processing scheduled rebalance for portfolio: ${portfolioId}`,
);

try {
const { shouldRebalance } =
await this.rebalancingService.shouldRebalance(portfolioId);

if (shouldRebalance) {
this.logger.log(
`Portfolio ${portfolioId} needs rebalancing. Triggering...`,
);
const event = await this.rebalancingService.triggerRebalancing(
portfolioId,
RebalanceTrigger.TIME_BASED,
"Scheduled rebalancing triggered by system",
);

// For automated rebalancing, we might want to execute immediately if the portfolio is set to auto-rebalance
// In this implementation, we simulate and then execute if safe.
const simulation =
await this.rebalancingService.simulateRebalance(portfolioId);

if (simulation.expectedSlippage <= 0.02) {
await this.rebalancingService.executeRebalancing(
event.id,
undefined,
simulation.expectedSlippage,
);
this.logger.log(
`Successfully executed scheduled rebalance for portfolio ${portfolioId}`,
);
} else {
this.logger.warn(
`Slippage too high for portfolio ${portfolioId}, skipping execution.`,
);
await this.rebalancingService.cancelRebalancing(
event.id,
"High slippage detected during scheduled task",
);
}
} else {
this.logger.log(
`Portfolio ${portfolioId} does not need rebalancing at this time.`,
);
}
} catch (error) {
this.logger.error(
`Failed to process rebalance task for portfolio ${portfolioId}`,
error.stack,
);
throw error;
}
}
}
197 changes: 197 additions & 0 deletions src/investment/portfolio/services/rebalancing.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import { Test, TestingModule } from "@nestjs/testing";
import { getRepositoryToken } from "@nestjs/typeorm";
import { RebalancingService } from "./rebalancing.service";
import {
RebalancingEvent,
RebalanceStatus,
RebalanceTrigger,
} from "../entities/rebalancing-event.entity";
import { Portfolio } from "../entities/portfolio.entity";
import { PortfolioAsset } from "../entities/portfolio-asset.entity";
import { PortfolioService } from "./portfolio.service";
import { TradingTransactionService } from "./trading-transaction.service";
import { AuditLogService } from "src/infrastructure/audit/audit-log.service";
import { AlertDispatcherService } from "src/growth/alerts/services/alert-dispatcher.service";
import { TransactionOptimizationService } from "src/defi/services/transaction-optimization.service";
import { getQueueToken } from "@nestjs/bull";
import { BadRequestException } from "@nestjs/common";

describe("RebalancingService", () => {
let service: RebalancingService;
let portfolioRepository: any;
let portfolioAssetRepository: any;
let rebalancingRepository: any;
let portfolioService: any;
let alertService: any;
let auditLogService: any;

const mockPortfolio = {
id: "portfolio-1",
userId: "user-1",
name: "Test Portfolio",
totalValue: 10000,
targetAllocation: { BTC: 60, ETH: 40 },
currentAllocation: { BTC: 50, ETH: 50 },
rebalanceThreshold: 5,
};

const mockAssets = [
{ ticker: "BTC", allocationPercentage: 50, currentPrice: 50000 },
{ ticker: "ETH", allocationPercentage: 50, currentPrice: 2000 },
];

beforeEach(async () => {
portfolioRepository = {
findOne: jest.fn().mockResolvedValue(mockPortfolio),
save: jest.fn().mockImplementation((p) => Promise.resolve(p)),
};
portfolioAssetRepository = {
find: jest.fn().mockResolvedValue(mockAssets),
findOne: jest.fn().mockImplementation(({ where: { ticker } }) => {
return Promise.resolve(mockAssets.find((a) => a.ticker === ticker));
}),
};
rebalancingRepository = {
create: jest.fn().mockImplementation((d) => ({ ...d, id: "event-1" })),
save: jest.fn().mockImplementation((e) => Promise.resolve(e)),
findOne: jest.fn(),
};
portfolioService = {
getPortfolio: jest.fn().mockResolvedValue(mockPortfolio),
};
alertService = {
dispatch: jest.fn().mockResolvedValue(undefined),
};
auditLogService = {
recordVerification: jest.fn().mockResolvedValue(undefined),
};

const module: TestingModule = await Test.createTestingModule({
providers: [
RebalancingService,
{
provide: getRepositoryToken(Portfolio),
useValue: portfolioRepository,
},
{
provide: getRepositoryToken(PortfolioAsset),
useValue: portfolioAssetRepository,
},
{
provide: getRepositoryToken(RebalancingEvent),
useValue: rebalancingRepository,
},
{ provide: PortfolioService, useValue: portfolioService },
{
provide: TradingTransactionService,
useValue: { executeTrade: jest.fn() },
},
{ provide: AuditLogService, useValue: auditLogService },
{ provide: AlertDispatcherService, useValue: alertService },
{ provide: TransactionOptimizationService, useValue: {} },
{
provide: getQueueToken("rebalancing"),
useValue: {
add: jest.fn(),
getRepeatableJobs: jest.fn().mockResolvedValue([]),
},
},
],
}).compile();

service = module.get<RebalancingService>(RebalancingService);
});

it("should detect when rebalancing is needed", async () => {
const result = await service.shouldRebalance("portfolio-1");
expect(result.shouldRebalance).toBe(true);
expect(result.maxDrift).toBe(10); // BTC drift is 10%
});

it("should not rebalance if drift is below threshold", async () => {
const smallDriftPortfolio = {
...mockPortfolio,
currentAllocation: { BTC: 58, ETH: 42 },
};
const smallDriftAssets = [
{ ticker: "BTC", allocationPercentage: 58 },
{ ticker: "ETH", allocationPercentage: 42 },
];
portfolioService.getPortfolio.mockResolvedValue(smallDriftPortfolio);
portfolioAssetRepository.findOne.mockImplementation(
({ where: { ticker } }) => {
return Promise.resolve(
smallDriftAssets.find((a) => a.ticker === ticker),
);
},
);

const result = await service.shouldRebalance("portfolio-1");
expect(result.shouldRebalance).toBe(false);
expect(result.maxDrift).toBe(2);
});

it("should simulate rebalancing with correct trades", async () => {
const simulation = await service.simulateRebalance("portfolio-1");
expect(simulation.tradePlan).toHaveLength(2);
expect(simulation.gasEstimate).toBeGreaterThan(0);
expect(auditLogService.recordVerification).toHaveBeenCalledWith(
expect.objectContaining({
type: "REBALANCE_SIMULATION",
}),
);
});

it("should validate slippage correctly", () => {
expect(service.validateSlippage(0.01).safe).toBe(true);
expect(service.validateSlippage(0.03).safe).toBe(false);
});

it("should fail execution if slippage exceeds limit", async () => {
const mockEvent = {
id: "event-1",
portfolioId: "portfolio-1",
portfolio: mockPortfolio,
status: RebalanceStatus.PENDING,
};
rebalancingRepository.findOne.mockResolvedValue(mockEvent);

await expect(
service.executeRebalancing("event-1", undefined, 0.05),
).rejects.toThrow(BadRequestException);

expect(alertService.dispatch).toHaveBeenCalledWith(
"user-1",
expect.objectContaining({
type: "REBALANCE_CANCELLED",
}),
);
});

it("should execute rebalancing successfully", async () => {
const mockEvent = {
id: "event-1",
portfolioId: "portfolio-1",
portfolio: mockPortfolio,
trades: [{ ticker: "BTC", action: "buy", quantity: 1, price: 50000 }],
allocationAfter: { BTC: 60, ETH: 40 },
status: RebalanceStatus.PENDING,
};
rebalancingRepository.findOne.mockResolvedValue(mockEvent);

const result = await service.executeRebalancing("event-1", 100, 0.01);

expect(result.status).toBe(RebalanceStatus.COMPLETED);
expect(alertService.dispatch).toHaveBeenCalledWith(
"user-1",
expect.objectContaining({
type: "REBALANCE_SUCCESS",
}),
);
expect(auditLogService.recordVerification).toHaveBeenCalledWith(
expect.objectContaining({
type: "REBALANCE_SUCCESS",
}),
);
});
});
Loading
Loading