Skip to content
Merged
5 changes: 5 additions & 0 deletions packages/server/src/IdentityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ export class IdentityManager {
return await this.stripeManager.getCustomerWithDefaultSource(customerId)
}

public async updateStripeCustomerEmail(customerId: string, email: string) {
if (!this.stripeManager) return
Comment thread
0xi4o marked this conversation as resolved.
Outdated
await this.stripeManager.updateCustomerEmail(customerId, email)
}

public async getAdditionalSeatsProration(subscriptionId: string, newQuantity: number) {
if (!subscriptionId) return {}
if (!this.stripeManager) {
Expand Down
5 changes: 5 additions & 0 deletions packages/server/src/StripeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ export class StripeManager {
}
}

public async updateCustomerEmail(customerId: string, email: string) {
if (!this.stripe) return
Comment thread
0xi4o marked this conversation as resolved.
Outdated
await this.stripe.customers.update(customerId, { email })
}

public async getAdditionalSeatsProration(subscriptionId: string, quantity: number) {
if (!this.stripe) {
throw new Error('Stripe is not initialized')
Expand Down
30 changes: 30 additions & 0 deletions packages/server/src/enterprise/services/user.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { generateId } from '../../utils'
import { GeneralErrorMessage } from '../../utils/constants'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import logger from '../../utils/logger'
import { sanitizeUser } from '../../utils/sanitize.util'
import { Telemetry, TelemetryEventType } from '../../utils/telemetry'
import { Platform } from '../../Interface'
import { User, UserStatus } from '../database/entities/user.entity'
import { destroyAllSessionsForUser } from '../middleware/passport/SessionPersistance'
import { compareHash, getHash } from '../utils/encryption.util'
import { isInvalidEmail, isInvalidName, isInvalidPassword, isInvalidUUID } from '../utils/validation.util'
import { OrganizationService } from './organization.service'
import { OrganizationUserService } from './organization-user.service'

export const enum UserErrorMessage {
EXPIRED_TEMP_TOKEN = 'Expired Temporary Token',
Expand Down Expand Up @@ -145,6 +149,8 @@ export class UserService {
const oldUserData = await this.readUserById(newUserData.id, queryRunner)
if (!oldUserData) throw new InternalFlowiseError(StatusCodes.NOT_FOUND, UserErrorMessage.USER_NOT_FOUND)

const currentEmail = oldUserData.email

if (newUserData.updatedBy) {
const updateUserData = await this.readUserById(newUserData.updatedBy, queryRunner)
if (!updateUserData) throw new InternalFlowiseError(StatusCodes.NOT_FOUND, UserErrorMessage.USER_NOT_FOUND)
Expand Down Expand Up @@ -185,6 +191,30 @@ export class UserService {
if (newUserData.oldPassword && newUserData.newPassword && newUserData.confirmPassword) {
await destroyAllSessionsForUser(updatedUser.id as string)
}

// Update Stripe customer email when user changes email (CLOUD only; expect exactly one org)
const appServer = getRunningExpressApp()
if (appServer.identityManager.getPlatformType() === Platform.CLOUD && updatedUser.email && updatedUser.email !== currentEmail) {
const organizationUserService = new OrganizationUserService()
Comment thread
0xi4o marked this conversation as resolved.
Outdated
const organizationService = new OrganizationService()
let syncQueryRunner: QueryRunner | undefined
try {
syncQueryRunner = this.dataSource.createQueryRunner()
await syncQueryRunner.connect()
const orgUsers = await organizationUserService.readOrganizationUserByUserId(updatedUser.id as string, syncQueryRunner)
const orgUsersCreatedByUser = orgUsers.filter((ou) => ou.createdBy === updatedUser.id)
if (orgUsersCreatedByUser.length === 1) {
const org = await organizationService.readOrganizationById(orgUsersCreatedByUser[0].organizationId, syncQueryRunner)
Comment thread
0xi4o marked this conversation as resolved.
Outdated
if (org?.customerId) {
await appServer.identityManager.updateStripeCustomerEmail(org.customerId, updatedUser.email as string)
}
}
} catch (stripeError) {
logger.warn(`Failed to update Stripe customer email for user ${updatedUser.id}:`, stripeError)
} finally {
if (syncQueryRunner && !syncQueryRunner.isReleased) await syncQueryRunner.release()
}
}
} catch (error) {
if (queryRunner && queryRunner.isTransactionActive) await queryRunner.rollbackTransaction()
throw error
Expand Down
Loading