Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/api/src/api/services/phases/phase-processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import httpStatus from "http-status";
import logger from "../../../config/logger";
import { runWithRampContext } from "../../../config/ramp-context";
import { config } from "../../../config/vars";
import RampState from "../../../models/rampState.model";
import { APIError } from "../../errors/api-error";
import { PhaseError, RecoverablePhaseError } from "../../errors/phase-error";
Expand Down Expand Up @@ -40,6 +41,13 @@ export class PhaseProcessor {
});
}

if (state.flowVariant !== config.flowVariant) {
logger.warn(
`Refusing to process ramp ${rampId}: belongs to flow ${state.flowVariant}, this backend is ${config.flowVariant}`
);
return;
}

// Try to acquire the lock
let lockAcquired = await this.acquireLock(state);
if (!lockAcquired) {
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/api/services/quote/engines/finalize/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getPaymentMethodFromDestinations, QuoteResponse, RampDirection } from "@vortexfi/shared";
import Big from "big.js";
import httpStatus from "http-status";
import { config } from "../../../../../config/vars";
import QuoteTicket from "../../../../../models/quoteTicket.model";
import { APIError } from "../../../../errors/api-error";
import { trimTrailingZeros } from "../../core/helpers";
Expand Down Expand Up @@ -149,6 +150,7 @@ export abstract class BaseFinalizeEngine implements Stage {
apiKey: request.apiKey || null,
countryCode: request.countryCode,
expiresAt,
flowVariant: config.flowVariant,
from: request.from,
inputAmount: request.inputAmount,
inputCurrency: request.inputCurrency,
Expand Down
5 changes: 5 additions & 0 deletions apps/api/src/api/services/quote/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Big from "big.js";
import httpStatus from "http-status";
import pLimit from "p-limit";
import logger from "../../../config/logger";
import { config } from "../../../config/vars";
import Partner from "../../../models/partner.model";
import { APIError } from "../../errors/api-error";
import { BaseRampService } from "../ramp/base.service";
Expand All @@ -38,6 +39,10 @@ export class QuoteService extends BaseRampService {
return null;
}

if (quote.flowVariant !== config.flowVariant) {
return null;
}

return buildQuoteResponse(quote);
}

Expand Down
36 changes: 34 additions & 2 deletions apps/api/src/api/services/ramp/ramp.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
AveniaPaymentMethod,
BrlaApiService,
BrlaCurrency,
CreateAlfredpayOfframpRequest,
CreateAlfredpayOnrampRequest,
EphemeralAccountType,
ERC20_EURE_POLYGON_TOKEN_NAME,
Expand Down Expand Up @@ -47,6 +46,7 @@ import { Op, Transaction, WhereOptions } from "sequelize";
import { StrKey } from "stellar-sdk";
import { isAddress } from "viem";
import logger from "../../../config/logger";
import { config } from "../../../config/vars";
import { SEQUENCE_TIME_WINDOW_IN_SECONDS } from "../../../constants/constants";
import Partner from "../../../models/partner.model";
import QuoteTicket from "../../../models/quoteTicket.model";
Expand Down Expand Up @@ -168,6 +168,16 @@ export function normalizeAndValidateSigningAccounts(accounts: AccountMeta[]) {
}

export class RampService extends BaseRampService {
// Two backends share one database; each must only touch ramps/quotes for its own flow.
// We return 404 on mismatch so the wrong backend looks indistinguishable from "not found".
private static assertOwnedByThisFlow(entity: { flowVariant: string; id: string }, kind: "Ramp" | "Quote"): void {
if (entity.flowVariant !== config.flowVariant) {
throw new APIError({
message: `${kind} not found`,
status: httpStatus.NOT_FOUND
});
}
}
/**
* Register a new ramping process. This will create a new ramp state and create transactions that need to be signed
* on the client side.
Expand All @@ -185,6 +195,8 @@ export class RampService extends BaseRampService {
});
}

RampService.assertOwnedByThisFlow(quote, "Quote");

if (quote.status !== "pending") {
throw new APIError({
message: `Quote is ${quote.status}`,
Expand Down Expand Up @@ -230,6 +242,7 @@ export class RampService extends BaseRampService {
const rampState = await this.createRampState(
{
currentPhase: "initial" as RampPhase,
flowVariant: quote.flowVariant,
from: quote.from,
paymentMethod: quote.paymentMethod,
postCompleteState: {
Expand Down Expand Up @@ -297,6 +310,8 @@ export class RampService extends BaseRampService {
});
}

RampService.assertOwnedByThisFlow(rampState, "Ramp");

const quote = await QuoteTicket.findByPk(rampState.quoteId, { transaction });

if (!quote) {
Expand Down Expand Up @@ -413,6 +428,8 @@ export class RampService extends BaseRampService {
});
}

RampService.assertOwnedByThisFlow(rampState, "Ramp");

const quote = await QuoteTicket.findByPk(rampState.quoteId, { transaction });

if (!quote) {
Expand Down Expand Up @@ -508,6 +525,10 @@ export class RampService extends BaseRampService {
return null;
}

if (rampState.flowVariant !== config.flowVariant) {
return null;
}

// Fetch associated quote for fee data
const quote = await QuoteTicket.findByPk(rampState.quoteId);

Expand Down Expand Up @@ -619,6 +640,10 @@ export class RampService extends BaseRampService {
return null;
}

if (rampState.flowVariant !== config.flowVariant) {
return null;
}

return rampState.errorLogs;
}

Expand All @@ -635,7 +660,8 @@ export class RampService extends BaseRampService {
[Op.or]: [{ "state.walletAddress": walletAddress }, { "state.destinationAddress": walletAddress }],
currentPhase: {
[Op.ne]: "initial"
}
},
flowVariant: config.flowVariant
};

let where: WhereOptions<RampStateAttributes>;
Expand Down Expand Up @@ -750,6 +776,8 @@ export class RampService extends BaseRampService {
});
}

RampService.assertOwnedByThisFlow(rampState, "Ramp");

// Limit the number of error logs to 100
const updatedErrorLogs = [...(rampState.errorLogs || []), errorLog].slice(-100);
await rampState.update({
Expand Down Expand Up @@ -1312,6 +1340,8 @@ export class RampService extends BaseRampService {
throw new Error("Ramp not found.");
}

RampService.assertOwnedByThisFlow(rampState, "Ramp");

await this.updateRampState(id, {
currentPhase: "timedOut"
});
Expand All @@ -1337,6 +1367,8 @@ export class RampService extends BaseRampService {
throw new Error(`RampState with id ${id} not found`);
}

RampService.assertOwnedByThisFlow(rampState, "Ramp");

const oldPhase = rampState.currentPhase;

await super.logPhaseTransition(id, newPhase, metadata);
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/api/workers/cleanup.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { CronJob } from "cron";
import { Op } from "sequelize";
import logger from "../../config/logger";
import { runWithRampContext } from "../../config/ramp-context";
import { config } from "../../config/vars";
import RampState from "../../models/rampState.model";
import { postProcessHandlers } from "../services/phases/post-process";
import { BaseRampService } from "../services/ramp/base.service";
Expand Down Expand Up @@ -152,6 +153,7 @@ class CleanupWorker {
order: [["updatedAt", "DESC"]],
where: {
currentPhase: { [Op.in]: ["complete", "failed", "timedOut"] },
flowVariant: config.flowVariant,
postCompleteState: {
cleanup: {
[Op.or]: [{ cleanupCompleted: false }, { cleanupCompleted: { [Op.is]: null } }]
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/api/workers/ramp-recovery.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { RampErrorLog } from "@vortexfi/shared";
import { CronJob } from "cron";
import { Op } from "sequelize";
import logger from "../../config/logger";
import { config } from "../../config/vars";
import RampState from "../../models/rampState.model";
import phaseProcessor from "../services/phases/phase-processor";
import rampService from "../services/ramp/ramp.service";
Expand Down Expand Up @@ -58,6 +59,7 @@ class RampRecoveryWorker {
currentPhase: {
[Op.notIn]: ["complete", "failed", "initial"]
},
flowVariant: config.flowVariant,
presignedTxs: { [Op.not]: null },
updatedAt: {
[Op.lt]: new Date(Date.now() - TEN_MINUTES_IN_MS) // 10 minutes ago
Expand Down
3 changes: 3 additions & 0 deletions apps/api/src/api/workers/unhandled-payment.worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { BrlaApiService, generateReferenceLabel, normalizeTaxId } from "@vortexf
import { CronJob } from "cron";
import { Op } from "sequelize";
import logger from "../../config/logger";
import { config } from "../../config/vars";
import RampState from "../../models/rampState.model";
import TaxId from "../../models/taxId.model";
import { SlackNotifier } from "../services/slack.service";
Expand Down Expand Up @@ -79,6 +80,7 @@ class UnhandledPaymentWorker {
[Op.gt]: threeDaysAgo
},
currentPhase: "initial",
flowVariant: config.flowVariant,
id: {
[Op.notIn]: Array.from(this.processedStateIds)
}
Expand All @@ -105,6 +107,7 @@ class UnhandledPaymentWorker {
[Op.gt]: threeDaysAgo
},
currentPhase: "failed",
flowVariant: config.flowVariant,
id: {
[Op.notIn]: Array.from(this.processedStateIds)
}
Expand Down
19 changes: 19 additions & 0 deletions apps/api/src/config/vars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ interface SpreadsheetConfig {

type DeploymentEnv = "development" | "production" | "sandbox" | "staging" | "test";

// Identifies which onramp flow this backend instance serves. Two backends
// share one database; each ignores ramps/quotes belonging to the other flow.
// "monerium" is the legacy grace-period backend; "mykobo" is the new replacement.
export type FlowVariant = "monerium" | "mykobo";

const nodeEnv = process.env.NODE_ENV || "production";
const deploymentEnvValues: DeploymentEnv[] = ["development", "production", "sandbox", "staging", "test"];
const flowVariantValues: FlowVariant[] = ["monerium", "mykobo"];

function readDeploymentEnv(): DeploymentEnv {
const rawDeploymentEnv = process.env.DEPLOYMENT_ENV || (nodeEnv === "production" ? "production" : nodeEnv);
Expand All @@ -37,9 +43,20 @@ function readDeploymentEnv(): DeploymentEnv {
return rawDeploymentEnv as DeploymentEnv;
}

function readFlowVariant(): FlowVariant {
const rawFlowVariant = process.env.FLOW_VARIANT || "monerium";

if (!flowVariantValues.includes(rawFlowVariant as FlowVariant)) {
throw new Error(`FLOW_VARIANT must be one of: ${flowVariantValues.join(", ")} (got '${rawFlowVariant}')`);
}

return rawFlowVariant as FlowVariant;
}

interface Config {
env: string;
deploymentEnv: DeploymentEnv;
flowVariant: FlowVariant;
port: string | number;
amplitudeWss: string;
pendulumWss: string;
Expand Down Expand Up @@ -132,6 +149,7 @@ export const config: Config = {
},
deploymentEnv: readDeploymentEnv(),
env: nodeEnv,
flowVariant: readFlowVariant(),

integrations: {
alchemy: {
Expand Down Expand Up @@ -230,6 +248,7 @@ if (config.env === "production") {
if (!config.supabase.serviceRoleKey) missing.push("SUPABASE_SERVICE_KEY");
if (!config.secrets.webhookPrivateKey) missing.push("WEBHOOK_PRIVATE_KEY");
if (!config.adminSecret) missing.push("ADMIN_SECRET");
if (!process.env.FLOW_VARIANT) missing.push("FLOW_VARIANT");

if (missing.length > 0) {
throw new Error(`Missing required environment variables in production: ${missing.join(", ")}`);
Expand Down
30 changes: 30 additions & 0 deletions apps/api/src/database/migrations/028-add-flow-variant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { DataTypes, QueryInterface } from "sequelize";

const TABLES = ["quote_tickets", "ramp_states"] as const;

export async function up(queryInterface: QueryInterface): Promise<void> {
for (const table of TABLES) {
await queryInterface.addColumn(table, "flow_variant", {
allowNull: true,
type: DataTypes.STRING(16)
});

await queryInterface.sequelize.query(`UPDATE "${table}" SET "flow_variant" = 'monerium' WHERE "flow_variant" IS NULL`);

await queryInterface.changeColumn(table, "flow_variant", {
allowNull: false,
type: DataTypes.STRING(16)
});

await queryInterface.addIndex(table, ["flow_variant"], {
name: `idx_${table}_flow_variant`
});
}
}

export async function down(queryInterface: QueryInterface): Promise<void> {
for (const table of TABLES) {
await queryInterface.removeIndex(table, `idx_${table}_flow_variant`);
await queryInterface.removeColumn(table, "flow_variant");
}
}
13 changes: 13 additions & 0 deletions apps/api/src/models/quoteTicket.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { DestinationType, Networks, PaymentMethod, RampCurrency, RampDirection }
import { DataTypes, Model, Optional } from "sequelize";
import { QuoteTicketMetadata } from "../api/services/quote/core/types";
import sequelize from "../config/database";
import { FlowVariant } from "../config/vars";

// Define the attributes of the QuoteTicket model
export interface QuoteTicketAttributes {
Expand All @@ -22,6 +23,7 @@ export interface QuoteTicketAttributes {
paymentMethod: PaymentMethod;
countryCode: string | null;
network: Networks;
flowVariant: FlowVariant;
createdAt: Date;
updatedAt: Date;
}
Expand Down Expand Up @@ -65,6 +67,8 @@ class QuoteTicket extends Model<QuoteTicketAttributes, QuoteTicketCreationAttrib

declare network: Networks;

declare flowVariant: FlowVariant;

declare createdAt: Date;

declare updatedAt: Date;
Expand Down Expand Up @@ -95,6 +99,11 @@ QuoteTicket.init(
field: "expires_at",
type: DataTypes.DATE
},
flowVariant: {
allowNull: false,
field: "flow_variant",
type: DataTypes.STRING(16)
},
from: {
allowNull: false,
type: DataTypes.STRING(20)
Expand Down Expand Up @@ -192,6 +201,10 @@ QuoteTicket.init(
{
fields: ["partner_id"],
name: "idx_quote_tickets_partner"
},
{
fields: ["flow_variant"],
name: "idx_quote_tickets_flow_variant"
}
],
modelName: "QuoteTicket",
Expand Down
Loading
Loading