From a9f13537e77221246512b6b09e49f4afbb965c4e Mon Sep 17 00:00:00 2001 From: Marcel Ebert Date: Fri, 29 May 2026 11:28:52 +0200 Subject: [PATCH] Make monerium vs mykobo flows distinguishable --- .../api/services/phases/phase-processor.ts | 8 +++++ .../services/quote/engines/finalize/index.ts | 2 ++ apps/api/src/api/services/quote/index.ts | 5 +++ .../api/src/api/services/ramp/ramp.service.ts | 36 +++++++++++++++++-- apps/api/src/api/workers/cleanup.worker.ts | 2 ++ .../src/api/workers/ramp-recovery.worker.ts | 2 ++ .../api/workers/unhandled-payment.worker.ts | 3 ++ apps/api/src/config/vars.ts | 19 ++++++++++ .../migrations/028-add-flow-variant.ts | 30 ++++++++++++++++ apps/api/src/models/quoteTicket.model.ts | 13 +++++++ apps/api/src/models/rampState.model.ts | 13 +++++++ 11 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 apps/api/src/database/migrations/028-add-flow-variant.ts diff --git a/apps/api/src/api/services/phases/phase-processor.ts b/apps/api/src/api/services/phases/phase-processor.ts index fc765a2d1..fad4828e0 100644 --- a/apps/api/src/api/services/phases/phase-processor.ts +++ b/apps/api/src/api/services/phases/phase-processor.ts @@ -1,6 +1,7 @@ import httpStatus from "http-status"; import logger from "../../../config/logger"; import { runWithRampContext } from "../../../config/ramp-context"; +import { config } from "../../../config/vars"; import RampState from "../../../models/rampState.model"; import { APIError } from "../../errors/api-error"; import { PhaseError, RecoverablePhaseError } from "../../errors/phase-error"; @@ -40,6 +41,13 @@ export class PhaseProcessor { }); } + if (state.flowVariant !== config.flowVariant) { + logger.warn( + `Refusing to process ramp ${rampId}: belongs to flow ${state.flowVariant}, this backend is ${config.flowVariant}` + ); + return; + } + // Try to acquire the lock let lockAcquired = await this.acquireLock(state); if (!lockAcquired) { diff --git a/apps/api/src/api/services/quote/engines/finalize/index.ts b/apps/api/src/api/services/quote/engines/finalize/index.ts index 6d25bc285..811bb281c 100644 --- a/apps/api/src/api/services/quote/engines/finalize/index.ts +++ b/apps/api/src/api/services/quote/engines/finalize/index.ts @@ -1,6 +1,7 @@ import { getPaymentMethodFromDestinations, QuoteResponse, RampDirection } from "@vortexfi/shared"; import Big from "big.js"; import httpStatus from "http-status"; +import { config } from "../../../../../config/vars"; import QuoteTicket from "../../../../../models/quoteTicket.model"; import { APIError } from "../../../../errors/api-error"; import { trimTrailingZeros } from "../../core/helpers"; @@ -149,6 +150,7 @@ export abstract class BaseFinalizeEngine implements Stage { apiKey: request.apiKey || null, countryCode: request.countryCode, expiresAt, + flowVariant: config.flowVariant, from: request.from, inputAmount: request.inputAmount, inputCurrency: request.inputCurrency, diff --git a/apps/api/src/api/services/quote/index.ts b/apps/api/src/api/services/quote/index.ts index f924df0af..b35562f93 100644 --- a/apps/api/src/api/services/quote/index.ts +++ b/apps/api/src/api/services/quote/index.ts @@ -15,6 +15,7 @@ import Big from "big.js"; import httpStatus from "http-status"; import pLimit from "p-limit"; import logger from "../../../config/logger"; +import { config } from "../../../config/vars"; import Partner from "../../../models/partner.model"; import { APIError } from "../../errors/api-error"; import { BaseRampService } from "../ramp/base.service"; @@ -38,6 +39,10 @@ export class QuoteService extends BaseRampService { return null; } + if (quote.flowVariant !== config.flowVariant) { + return null; + } + return buildQuoteResponse(quote); } diff --git a/apps/api/src/api/services/ramp/ramp.service.ts b/apps/api/src/api/services/ramp/ramp.service.ts index 8b5f93986..a6f9418ad 100644 --- a/apps/api/src/api/services/ramp/ramp.service.ts +++ b/apps/api/src/api/services/ramp/ramp.service.ts @@ -10,7 +10,6 @@ import { AveniaPaymentMethod, BrlaApiService, BrlaCurrency, - CreateAlfredpayOfframpRequest, CreateAlfredpayOnrampRequest, EphemeralAccountType, ERC20_EURE_POLYGON_TOKEN_NAME, @@ -47,6 +46,7 @@ import { Op, Transaction, WhereOptions } from "sequelize"; import { StrKey } from "stellar-sdk"; import { isAddress } from "viem"; import logger from "../../../config/logger"; +import { config } from "../../../config/vars"; import { SEQUENCE_TIME_WINDOW_IN_SECONDS } from "../../../constants/constants"; import Partner from "../../../models/partner.model"; import QuoteTicket from "../../../models/quoteTicket.model"; @@ -168,6 +168,16 @@ export function normalizeAndValidateSigningAccounts(accounts: AccountMeta[]) { } export class RampService extends BaseRampService { + // Two backends share one database; each must only touch ramps/quotes for its own flow. + // We return 404 on mismatch so the wrong backend looks indistinguishable from "not found". + private static assertOwnedByThisFlow(entity: { flowVariant: string; id: string }, kind: "Ramp" | "Quote"): void { + if (entity.flowVariant !== config.flowVariant) { + throw new APIError({ + message: `${kind} not found`, + status: httpStatus.NOT_FOUND + }); + } + } /** * Register a new ramping process. This will create a new ramp state and create transactions that need to be signed * on the client side. @@ -185,6 +195,8 @@ export class RampService extends BaseRampService { }); } + RampService.assertOwnedByThisFlow(quote, "Quote"); + if (quote.status !== "pending") { throw new APIError({ message: `Quote is ${quote.status}`, @@ -230,6 +242,7 @@ export class RampService extends BaseRampService { const rampState = await this.createRampState( { currentPhase: "initial" as RampPhase, + flowVariant: quote.flowVariant, from: quote.from, paymentMethod: quote.paymentMethod, postCompleteState: { @@ -297,6 +310,8 @@ export class RampService extends BaseRampService { }); } + RampService.assertOwnedByThisFlow(rampState, "Ramp"); + const quote = await QuoteTicket.findByPk(rampState.quoteId, { transaction }); if (!quote) { @@ -413,6 +428,8 @@ export class RampService extends BaseRampService { }); } + RampService.assertOwnedByThisFlow(rampState, "Ramp"); + const quote = await QuoteTicket.findByPk(rampState.quoteId, { transaction }); if (!quote) { @@ -508,6 +525,10 @@ export class RampService extends BaseRampService { return null; } + if (rampState.flowVariant !== config.flowVariant) { + return null; + } + // Fetch associated quote for fee data const quote = await QuoteTicket.findByPk(rampState.quoteId); @@ -619,6 +640,10 @@ export class RampService extends BaseRampService { return null; } + if (rampState.flowVariant !== config.flowVariant) { + return null; + } + return rampState.errorLogs; } @@ -635,7 +660,8 @@ export class RampService extends BaseRampService { [Op.or]: [{ "state.walletAddress": walletAddress }, { "state.destinationAddress": walletAddress }], currentPhase: { [Op.ne]: "initial" - } + }, + flowVariant: config.flowVariant }; let where: WhereOptions; @@ -750,6 +776,8 @@ export class RampService extends BaseRampService { }); } + RampService.assertOwnedByThisFlow(rampState, "Ramp"); + // Limit the number of error logs to 100 const updatedErrorLogs = [...(rampState.errorLogs || []), errorLog].slice(-100); await rampState.update({ @@ -1312,6 +1340,8 @@ export class RampService extends BaseRampService { throw new Error("Ramp not found."); } + RampService.assertOwnedByThisFlow(rampState, "Ramp"); + await this.updateRampState(id, { currentPhase: "timedOut" }); @@ -1337,6 +1367,8 @@ export class RampService extends BaseRampService { throw new Error(`RampState with id ${id} not found`); } + RampService.assertOwnedByThisFlow(rampState, "Ramp"); + const oldPhase = rampState.currentPhase; await super.logPhaseTransition(id, newPhase, metadata); diff --git a/apps/api/src/api/workers/cleanup.worker.ts b/apps/api/src/api/workers/cleanup.worker.ts index a747c169a..33360e6a5 100644 --- a/apps/api/src/api/workers/cleanup.worker.ts +++ b/apps/api/src/api/workers/cleanup.worker.ts @@ -2,6 +2,7 @@ import { CronJob } from "cron"; import { Op } from "sequelize"; import logger from "../../config/logger"; import { runWithRampContext } from "../../config/ramp-context"; +import { config } from "../../config/vars"; import RampState from "../../models/rampState.model"; import { postProcessHandlers } from "../services/phases/post-process"; import { BaseRampService } from "../services/ramp/base.service"; @@ -152,6 +153,7 @@ class CleanupWorker { order: [["updatedAt", "DESC"]], where: { currentPhase: { [Op.in]: ["complete", "failed", "timedOut"] }, + flowVariant: config.flowVariant, postCompleteState: { cleanup: { [Op.or]: [{ cleanupCompleted: false }, { cleanupCompleted: { [Op.is]: null } }] diff --git a/apps/api/src/api/workers/ramp-recovery.worker.ts b/apps/api/src/api/workers/ramp-recovery.worker.ts index 61b048d1e..ab24bce6d 100644 --- a/apps/api/src/api/workers/ramp-recovery.worker.ts +++ b/apps/api/src/api/workers/ramp-recovery.worker.ts @@ -2,6 +2,7 @@ import { RampErrorLog } from "@vortexfi/shared"; import { CronJob } from "cron"; import { Op } from "sequelize"; import logger from "../../config/logger"; +import { config } from "../../config/vars"; import RampState from "../../models/rampState.model"; import phaseProcessor from "../services/phases/phase-processor"; import rampService from "../services/ramp/ramp.service"; @@ -58,6 +59,7 @@ class RampRecoveryWorker { currentPhase: { [Op.notIn]: ["complete", "failed", "initial"] }, + flowVariant: config.flowVariant, presignedTxs: { [Op.not]: null }, updatedAt: { [Op.lt]: new Date(Date.now() - TEN_MINUTES_IN_MS) // 10 minutes ago diff --git a/apps/api/src/api/workers/unhandled-payment.worker.ts b/apps/api/src/api/workers/unhandled-payment.worker.ts index d9e681f38..a76a8035b 100644 --- a/apps/api/src/api/workers/unhandled-payment.worker.ts +++ b/apps/api/src/api/workers/unhandled-payment.worker.ts @@ -2,6 +2,7 @@ import { BrlaApiService, generateReferenceLabel, normalizeTaxId } from "@vortexf import { CronJob } from "cron"; import { Op } from "sequelize"; import logger from "../../config/logger"; +import { config } from "../../config/vars"; import RampState from "../../models/rampState.model"; import TaxId from "../../models/taxId.model"; import { SlackNotifier } from "../services/slack.service"; @@ -79,6 +80,7 @@ class UnhandledPaymentWorker { [Op.gt]: threeDaysAgo }, currentPhase: "initial", + flowVariant: config.flowVariant, id: { [Op.notIn]: Array.from(this.processedStateIds) } @@ -105,6 +107,7 @@ class UnhandledPaymentWorker { [Op.gt]: threeDaysAgo }, currentPhase: "failed", + flowVariant: config.flowVariant, id: { [Op.notIn]: Array.from(this.processedStateIds) } diff --git a/apps/api/src/config/vars.ts b/apps/api/src/config/vars.ts index f39fa69ed..0c178af72 100644 --- a/apps/api/src/config/vars.ts +++ b/apps/api/src/config/vars.ts @@ -24,8 +24,14 @@ interface SpreadsheetConfig { type DeploymentEnv = "development" | "production" | "sandbox" | "staging" | "test"; +// Identifies which onramp flow this backend instance serves. Two backends +// share one database; each ignores ramps/quotes belonging to the other flow. +// "monerium" is the legacy grace-period backend; "mykobo" is the new replacement. +export type FlowVariant = "monerium" | "mykobo"; + const nodeEnv = process.env.NODE_ENV || "production"; const deploymentEnvValues: DeploymentEnv[] = ["development", "production", "sandbox", "staging", "test"]; +const flowVariantValues: FlowVariant[] = ["monerium", "mykobo"]; function readDeploymentEnv(): DeploymentEnv { const rawDeploymentEnv = process.env.DEPLOYMENT_ENV || (nodeEnv === "production" ? "production" : nodeEnv); @@ -37,9 +43,20 @@ function readDeploymentEnv(): DeploymentEnv { return rawDeploymentEnv as DeploymentEnv; } +function readFlowVariant(): FlowVariant { + const rawFlowVariant = process.env.FLOW_VARIANT || "monerium"; + + if (!flowVariantValues.includes(rawFlowVariant as FlowVariant)) { + throw new Error(`FLOW_VARIANT must be one of: ${flowVariantValues.join(", ")} (got '${rawFlowVariant}')`); + } + + return rawFlowVariant as FlowVariant; +} + interface Config { env: string; deploymentEnv: DeploymentEnv; + flowVariant: FlowVariant; port: string | number; amplitudeWss: string; pendulumWss: string; @@ -132,6 +149,7 @@ export const config: Config = { }, deploymentEnv: readDeploymentEnv(), env: nodeEnv, + flowVariant: readFlowVariant(), integrations: { alchemy: { @@ -230,6 +248,7 @@ if (config.env === "production") { if (!config.supabase.serviceRoleKey) missing.push("SUPABASE_SERVICE_KEY"); if (!config.secrets.webhookPrivateKey) missing.push("WEBHOOK_PRIVATE_KEY"); if (!config.adminSecret) missing.push("ADMIN_SECRET"); + if (!process.env.FLOW_VARIANT) missing.push("FLOW_VARIANT"); if (missing.length > 0) { throw new Error(`Missing required environment variables in production: ${missing.join(", ")}`); diff --git a/apps/api/src/database/migrations/028-add-flow-variant.ts b/apps/api/src/database/migrations/028-add-flow-variant.ts new file mode 100644 index 000000000..92459ac7e --- /dev/null +++ b/apps/api/src/database/migrations/028-add-flow-variant.ts @@ -0,0 +1,30 @@ +import { DataTypes, QueryInterface } from "sequelize"; + +const TABLES = ["quote_tickets", "ramp_states"] as const; + +export async function up(queryInterface: QueryInterface): Promise { + for (const table of TABLES) { + await queryInterface.addColumn(table, "flow_variant", { + allowNull: true, + type: DataTypes.STRING(16) + }); + + await queryInterface.sequelize.query(`UPDATE "${table}" SET "flow_variant" = 'monerium' WHERE "flow_variant" IS NULL`); + + await queryInterface.changeColumn(table, "flow_variant", { + allowNull: false, + type: DataTypes.STRING(16) + }); + + await queryInterface.addIndex(table, ["flow_variant"], { + name: `idx_${table}_flow_variant` + }); + } +} + +export async function down(queryInterface: QueryInterface): Promise { + for (const table of TABLES) { + await queryInterface.removeIndex(table, `idx_${table}_flow_variant`); + await queryInterface.removeColumn(table, "flow_variant"); + } +} diff --git a/apps/api/src/models/quoteTicket.model.ts b/apps/api/src/models/quoteTicket.model.ts index 4d3bbb9d8..6b2b90f4e 100644 --- a/apps/api/src/models/quoteTicket.model.ts +++ b/apps/api/src/models/quoteTicket.model.ts @@ -2,6 +2,7 @@ import { DestinationType, Networks, PaymentMethod, RampCurrency, RampDirection } import { DataTypes, Model, Optional } from "sequelize"; import { QuoteTicketMetadata } from "../api/services/quote/core/types"; import sequelize from "../config/database"; +import { FlowVariant } from "../config/vars"; // Define the attributes of the QuoteTicket model export interface QuoteTicketAttributes { @@ -22,6 +23,7 @@ export interface QuoteTicketAttributes { paymentMethod: PaymentMethod; countryCode: string | null; network: Networks; + flowVariant: FlowVariant; createdAt: Date; updatedAt: Date; } @@ -65,6 +67,8 @@ class QuoteTicket extends Model declare postCompleteState: PostCompleteState; + declare flowVariant: FlowVariant; + declare createdAt: Date; declare updatedAt: Date; @@ -118,6 +122,11 @@ RampState.init( field: "error_logs", type: DataTypes.JSONB }, + flowVariant: { + allowNull: false, + field: "flow_variant", + type: DataTypes.STRING(16) + }, from: { allowNull: false, type: DataTypes.STRING(20) @@ -229,6 +238,10 @@ RampState.init( fields: ["quoteId"], name: "uq_ramp_states_quote_id", unique: true + }, + { + fields: ["flow_variant"], + name: "idx_ramp_states_flow_variant" } ], modelName: "RampState",