From 77d57e18b78c982d72492befbc863743376d7688 Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Sat, 11 Apr 2026 21:30:49 -0700 Subject: [PATCH 01/12] Add resilient Stately backfill job and manifest --- api/stately/init/export-share-ids.ts | 328 +++++++++++++++++++ kubernetes/README.md | 44 +++ kubernetes/dim-api-stately-backfill-job.yaml | 90 +++++ 3 files changed, 462 insertions(+) create mode 100644 api/stately/init/export-share-ids.ts create mode 100644 kubernetes/dim-api-stately-backfill-job.yaml diff --git a/api/stately/init/export-share-ids.ts b/api/stately/init/export-share-ids.ts new file mode 100644 index 00000000..1ad8d6de --- /dev/null +++ b/api/stately/init/export-share-ids.ts @@ -0,0 +1,328 @@ +import fs from 'node:fs/promises'; +import { DatabaseError } from 'pg-protocol'; +import { closeDbPool, transaction } from '../../db/index.js'; +import { addLoadoutShareIgnoring } from '../../db/loadout-share-queries.js'; +import { backfillMigrationState } from '../../db/migration-state-queries.js'; +import { replaceSettingsIfNotPresent } from '../../db/settings-queries.js'; +import { Loadout } from '../../shapes/loadouts.js'; +import { defaultSettings } from '../../shapes/settings.js'; +import { delay, subtractObject } from '../../utils.js'; +import { client } from '../client.js'; +import { Settings } from '../generated/stately_pb.js'; +import { keyFor as keyForLoadoutShare } from '../loadout-share-queries.js'; +import { convertLoadoutFromStately } from '../loadouts-queries.js'; +import { convertToDimSettings, keyFor as settingsKey } from '../settings-queries.js'; + +const tokenPath = process.env.BACKFILL_TOKEN_PATH ?? 
'backfill-token.bin'; +const profileBatchSize = parseNumberEnv('BACKFILL_PROFILE_BATCH_SIZE', 1000); +const settingsBatchSize = parseNumberEnv('BACKFILL_SETTINGS_BATCH_SIZE', 50); +const shareBatchSize = parseNumberEnv('BACKFILL_SHARE_BATCH_SIZE', 50); +const continuationDelayMs = parseNumberEnv('BACKFILL_CONTINUATION_DELAY_MS', 1000); +const retryMaxAttempts = parseNumberEnv('BACKFILL_RETRY_MAX_ATTEMPTS', 12); +const retryBaseDelayMs = parseNumberEnv('BACKFILL_RETRY_BASE_DELAY_MS', 1000); +const retryMaxDelayMs = parseNumberEnv('BACKFILL_RETRY_MAX_DELAY_MS', 30000); + +interface RetryableError { + code?: string; + status?: number; + statusCode?: number; + headers?: Record; + response?: { + status?: number; + headers?: Record; + }; + cause?: unknown; + message?: string; +} + +function parseNumberEnv(envName: string, defaultValue: number) { + const raw = process.env[envName]; + if (!raw) { + return defaultValue; + } + const parsed = Number(raw); + if (!Number.isFinite(parsed) || parsed <= 0) { + throw new Error(`${envName} must be a positive number`); + } + return parsed; +} + +function getNestedStatus(error: RetryableError): number | undefined { + return error.status ?? error.statusCode ?? error.response?.status; +} + +function getHeaders(error: RetryableError): Record | undefined { + return error.headers ?? error.response?.headers; +} + +function getRetryAfterMs(error: RetryableError): number | undefined { + const headers = getHeaders(error); + const retryAfter = headers?.['retry-after'] ?? 
headers?.['Retry-After']; + if (retryAfter === undefined) { + return undefined; + } + + const retryAfterSeconds = Number(retryAfter); + if (Number.isFinite(retryAfterSeconds) && retryAfterSeconds > 0) { + return retryAfterSeconds * 1000; + } + + if (typeof retryAfter === 'string') { + const retryAt = Date.parse(retryAfter); + if (!Number.isNaN(retryAt)) { + return Math.max(0, retryAt - Date.now()); + } + } + + return undefined; +} + +function isRetryableError(error: unknown): boolean { + if (error instanceof DatabaseError) { + return error.code ? ['40001', '40P01', '53300', '57P03'].includes(error.code) : false; + } + + if (!error || typeof error !== 'object') { + return false; + } + + const retryable = error as RetryableError; + const status = getNestedStatus(retryable); + + if (status === 429 || (status !== undefined && status >= 500)) { + return true; + } + + if ( + retryable.code && + ['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED', 'EPIPE', 'ENOTFOUND'].includes(retryable.code) + ) { + return true; + } + + if (typeof retryable.message === 'string') { + const msg = retryable.message.toLowerCase(); + return ( + msg.includes('throttl') || + msg.includes('too many requests') || + msg.includes('rate limit') || + msg.includes('temporar') || + msg.includes('timeout') + ); + } + + if (retryable.cause) { + return isRetryableError(retryable.cause); + } + + return false; +} + +function retryDelayMs(error: unknown, attempt: number) { + const forcedDelay = + error && typeof error === 'object' ? 
getRetryAfterMs(error as RetryableError) : undefined; + if (forcedDelay !== undefined) { + return Math.min(retryMaxDelayMs, Math.max(retryBaseDelayMs, forcedDelay)); + } + + const exponential = retryBaseDelayMs * 2 ** (attempt - 1); + const jitter = Math.floor(Math.random() * retryBaseDelayMs); + return Math.min(retryMaxDelayMs, exponential + jitter); +} + +async function withRetry(name: string, fn: () => T | Promise): Promise { + let attempt = 0; + while (true) { + attempt += 1; + try { + return await fn(); + } catch (error) { + if (!isRetryableError(error) || attempt >= retryMaxAttempts) { + throw error; + } + + const waitMs = retryDelayMs(error, attempt); + const message = error instanceof Error ? error.message : String(error); + console.log( + `${name} failed with retryable error (attempt ${attempt}/${retryMaxAttempts}). Waiting ${waitMs}ms before retry.`, + message, + ); + await delay(waitMs); + } + } +} + +const tokenData = await fs.readFile(tokenPath).catch(() => null); + +type ScanList = ReturnType; + +let list: ScanList = tokenData + ? 
await withRetry('Continue scan from token file', () => client.continueScan(tokenData)) + : await withRetry('Begin scan', () => + client.beginScan({ + itemTypes: [ + 'LoadoutShare', + 'ItemAnnotation', + 'ItemHashTag', + 'Loadout', + 'Search', + 'Triumph', + 'Settings', + ], + }), + ); + +const profileIds = new Set(); + +const settingsQueue: Settings[] = []; +const shareQueue: { + loadout: Loadout; + viewCount: number; + platformMembershipId: string; + shareId: string; +}[] = []; + +async function flushProfileIds(force = false) { + if (!force && profileIds.size < profileBatchSize) { + return; + } + if (profileIds.size === 0) { + return; + } + + console.log('Backfilling migration states for', profileIds.size, 'profiles...'); + const items = [...profileIds]; + await withRetry('Backfill migration states', async () => { + await transaction(async (pgClient) => { + for (const profileId of items) { + await backfillMigrationState(pgClient, profileId.toString(), undefined); + } + }); + }); + profileIds.clear(); + console.log('Done'); +} + +async function flushSettingsQueue(force = false) { + if (!force && settingsQueue.length < settingsBatchSize) { + return; + } + if (settingsQueue.length === 0) { + return; + } + + console.log('Backfilling settings for', settingsQueue.length, 'users...'); + const batch = [...settingsQueue]; + await withRetry('Backfill settings', async () => { + await transaction(async (pgClient) => { + for (const settings of batch) { + await replaceSettingsIfNotPresent( + pgClient, + Number(settings.memberId), + subtractObject(convertToDimSettings(settings), defaultSettings), + ); + } + }); + }); + + await withRetry('Delete migrated settings from Stately', async () => { + await client.del(...batch.map((s) => settingsKey(Number(s.memberId)))); + }); + + settingsQueue.splice(0, batch.length); + console.log('Done'); +} + +async function flushShareQueue(force = false) { + if (!force && shareQueue.length < shareBatchSize) { + return; + } + if (shareQueue.length 
=== 0) { + return; + } + + console.log('Backfilling', shareQueue.length, 'loadout shares...'); + const batch = [...shareQueue]; + + await withRetry('Backfill loadout shares', async () => { + await transaction(async (pgClient) => { + for (const share of batch) { + try { + await addLoadoutShareIgnoring( + pgClient, + undefined, + share.platformMembershipId, + share.shareId, + share.loadout, + ); + } catch (error) { + if (error instanceof DatabaseError && error.code === '23505') { + // unique violation, delete the stately one + console.log('Loadout share collision ignoring', share.shareId); + } else { + throw error; + } + } + } + }); + }); + + await withRetry('Delete migrated loadout shares from Stately', async () => { + await client.del(...batch.map((s) => keyForLoadoutShare(s.shareId))); + }); + + shareQueue.splice(0, batch.length); + console.log('Done'); +} + +let scanComplete = false; + +try { + while (true) { + for await (const item of list) { + if (client.isType(item, 'LoadoutShare')) { + shareQueue.push({ + loadout: convertLoadoutFromStately(item), + viewCount: item.viewCount, + platformMembershipId: item.profileId.toString(), + shareId: item.id, + }); + } else if (client.isType(item, 'Settings')) { + settingsQueue.push(item); + } else if ('profileId' in item) { + profileIds.add(item.profileId); + } + + await flushProfileIds(); + await flushSettingsQueue(); + await flushShareQueue(); + } + + const token = list.token; + if (!token) { + console.log('Scan token missing, ending scan loop.'); + break; + } + + await fs.writeFile(tokenPath, token.tokenData); + if (!token.canContinue) { + console.log('Scan complete.'); + scanComplete = true; + break; + } + + await delay(continuationDelayMs); + console.log('Continuing scan...'); + list = await withRetry('Continue scan', () => client.continueScan(token)); + } + + await flushProfileIds(true); + await flushSettingsQueue(true); + await flushShareQueue(true); + + if (scanComplete) { + await fs.unlink(tokenPath).catch(() => 
undefined); + } +} finally { + await closeDbPool(); +} diff --git a/kubernetes/README.md b/kubernetes/README.md index fbf93e76..79efe644 100644 --- a/kubernetes/README.md +++ b/kubernetes/README.md @@ -19,3 +19,47 @@ An Ingress Controller that uses NGINX. Deployed only on DigitalOcean, by followi ## DIM API Our NodeJS service. Deployed via dim-api-\*.yaml. + +## Stately Backfill Job + +Run a one-time migration scan from StatelyDB into Postgres with +`dim-api-stately-backfill-job.yaml`. + +The job is resumable across pod restarts by storing scan token state in a PVC. + +Required environment inputs: + +- `dim-api-config` ConfigMap (same values used by API deployment) +- `dim-api-secret` keys: + - `pg_password` + - `stately_access_key` + - `stately_store_id` + +Build and push image: + +```sh +COMMITHASH=$(git rev-parse HEAD) +docker buildx build --platform linux/amd64 --push -t destinyitemmanager/dim-api:$COMMITHASH . +``` + +Apply the job manifest: + +```sh +mkdir -p deployment +cp kubernetes/dim-api-stately-backfill-job.yaml deployment/ +sed -i'' -e "s/\$COMMITHASH/$COMMITHASH/" deployment/dim-api-stately-backfill-job.yaml +kubectl apply -f deployment/dim-api-stately-backfill-job.yaml +``` + +Inspect progress: + +```sh +kubectl logs -f job/dim-api-stately-backfill +``` + +If re-running after completion, delete and recreate the job: + +```sh +kubectl delete job dim-api-stately-backfill +kubectl apply -f deployment/dim-api-stately-backfill-job.yaml +``` diff --git a/kubernetes/dim-api-stately-backfill-job.yaml b/kubernetes/dim-api-stately-backfill-job.yaml new file mode 100644 index 00000000..93c89cc2 --- /dev/null +++ b/kubernetes/dim-api-stately-backfill-job.yaml @@ -0,0 +1,90 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: dim-api-stately-backfill-token + labels: + app: dim-api-stately-backfill +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: 
dim-api-stately-backfill + labels: + app: dim-api-stately-backfill +spec: + completions: 1 + parallelism: 1 + backoffLimit: 12 + template: + metadata: + labels: + app: dim-api-stately-backfill + annotations: + cluster-autoscaler.kubernetes.io/safe-to-evict: "false" + spec: + restartPolicy: OnFailure + containers: + - name: stately-backfill + image: destinyitemmanager/dim-api:$COMMITHASH + imagePullPolicy: IfNotPresent + command: ["node"] + args: + - --enable-source-maps + - api/stately/init/export-share-ids.js + env: + - name: NODE_ENV + value: production + - name: BACKFILL_TOKEN_PATH + value: /var/lib/dim-api-backfill/backfill-token.bin + - name: BACKFILL_RETRY_MAX_ATTEMPTS + value: "12" + - name: BACKFILL_RETRY_BASE_DELAY_MS + value: "1000" + - name: BACKFILL_RETRY_MAX_DELAY_MS + value: "30000" + - name: BACKFILL_PROFILE_BATCH_SIZE + value: "1000" + - name: BACKFILL_SETTINGS_BATCH_SIZE + value: "50" + - name: BACKFILL_SHARE_BATCH_SIZE + value: "50" + - name: BACKFILL_CONTINUATION_DELAY_MS + value: "1000" + - name: PGPASSWORD + valueFrom: + secretKeyRef: + name: dim-api-secret + key: pg_password + - name: STATELY_ACCESS_KEY + valueFrom: + secretKeyRef: + name: dim-api-secret + key: stately_access_key + - name: STATELY_STORE_ID + valueFrom: + secretKeyRef: + name: dim-api-secret + key: stately_store_id + envFrom: + - configMapRef: + name: dim-api-config + resources: + requests: + cpu: 200m + memory: 256Mi + limits: + cpu: "1" + memory: 1Gi + volumeMounts: + - name: token-state + mountPath: /var/lib/dim-api-backfill + volumes: + - name: token-state + persistentVolumeClaim: + claimName: dim-api-stately-backfill-token From 36aedbd67a20e6cc9694c19c0d28a3e98057b8bb Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Sat, 11 Apr 2026 21:37:44 -0700 Subject: [PATCH 02/12] Rename backfill entrypoint to stately-backfill --- ...xport-share-ids.ts => stately-backfill.ts} | 24 +++++++++++++++---- kubernetes/dim-api-stately-backfill-job.yaml | 2 +- 2 files changed, 21 
insertions(+), 5 deletions(-) rename api/stately/init/{export-share-ids.ts => stately-backfill.ts} (93%) diff --git a/api/stately/init/export-share-ids.ts b/api/stately/init/stately-backfill.ts similarity index 93% rename from api/stately/init/export-share-ids.ts rename to api/stately/init/stately-backfill.ts index 1ad8d6de..5a51f804 100644 --- a/api/stately/init/export-share-ids.ts +++ b/api/stately/init/stately-backfill.ts @@ -1,18 +1,33 @@ import fs from 'node:fs/promises'; +import { keyPath } from '@stately-cloud/client'; import { DatabaseError } from 'pg-protocol'; import { closeDbPool, transaction } from '../../db/index.js'; -import { addLoadoutShareIgnoring } from '../../db/loadout-share-queries.js'; +import { addLoadoutShare } from '../../db/loadout-share-queries.js'; import { backfillMigrationState } from '../../db/migration-state-queries.js'; -import { replaceSettingsIfNotPresent } from '../../db/settings-queries.js'; +import { getSettings, replaceSettings } from '../../db/settings-queries.js'; import { Loadout } from '../../shapes/loadouts.js'; import { defaultSettings } from '../../shapes/settings.js'; import { delay, subtractObject } from '../../utils.js'; import { client } from '../client.js'; import { Settings } from '../generated/stately_pb.js'; -import { keyFor as keyForLoadoutShare } from '../loadout-share-queries.js'; import { convertLoadoutFromStately } from '../loadouts-queries.js'; import { convertToDimSettings, keyFor as settingsKey } from '../settings-queries.js'; +function keyForLoadoutShare(shareId: string) { + return keyPath`/loadoutShare-${shareId}`; +} + +async function replaceSettingsIfNotPresent( + pgClient: Parameters[0], + bungieMembershipId: number, + settings: Partial, +) { + const existing = await getSettings(pgClient, bungieMembershipId); + if (!existing || existing.deleted) { + await replaceSettings(pgClient, bungieMembershipId, settings); + } +} + const tokenPath = process.env.BACKFILL_TOKEN_PATH ?? 
'backfill-token.bin'; const profileBatchSize = parseNumberEnv('BACKFILL_PROFILE_BATCH_SIZE', 1000); const settingsBatchSize = parseNumberEnv('BACKFILL_SETTINGS_BATCH_SIZE', 50); @@ -248,12 +263,13 @@ async function flushShareQueue(force = false) { await transaction(async (pgClient) => { for (const share of batch) { try { - await addLoadoutShareIgnoring( + await addLoadoutShare( pgClient, undefined, share.platformMembershipId, share.shareId, share.loadout, + share.viewCount, ); } catch (error) { if (error instanceof DatabaseError && error.code === '23505') { diff --git a/kubernetes/dim-api-stately-backfill-job.yaml b/kubernetes/dim-api-stately-backfill-job.yaml index 93c89cc2..d1bfe17f 100644 --- a/kubernetes/dim-api-stately-backfill-job.yaml +++ b/kubernetes/dim-api-stately-backfill-job.yaml @@ -36,7 +36,7 @@ spec: command: ["node"] args: - --enable-source-maps - - api/stately/init/export-share-ids.js + - api/stately/init/stately-backfill.js env: - name: NODE_ENV value: production From b12c39f731c55c669d9ad1bdc048c489b89e49ed Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Sat, 11 Apr 2026 21:38:08 -0700 Subject: [PATCH 03/12] Add deploy command for stately backfill job --- kubernetes/deploy-stately-backfill-job.sh | 18 ++++++++++++++ kubernetes/dim-api-stately-backfill-job.yaml | 25 ++++++++------------ package.json | 1 + 3 files changed, 29 insertions(+), 15 deletions(-) create mode 100755 kubernetes/deploy-stately-backfill-job.sh diff --git a/kubernetes/deploy-stately-backfill-job.sh b/kubernetes/deploy-stately-backfill-job.sh new file mode 100755 index 00000000..b6ad9edf --- /dev/null +++ b/kubernetes/deploy-stately-backfill-job.sh @@ -0,0 +1,18 @@ +#!/bin/bash -ex + +ROOT=$(git rev-parse --show-toplevel) +COMMITHASH=${GITHUB_SHA:-$(git rev-parse HEAD)} +IMAGE="destinyitemmanager/dim-api:$COMMITHASH" + +rm -rf dist && pnpm build:api && docker buildx build --platform linux/amd64 --push -t "$IMAGE" "$ROOT" + +mkdir -p "$ROOT/deployment" +cp 
"$ROOT/kubernetes/dim-api-stately-backfill-job.yaml" "$ROOT/deployment" + +sed -i'' -e "s/\$COMMITHASH/$COMMITHASH/" "$ROOT/deployment/dim-api-stately-backfill-job.yaml" + +# Job specs are immutable in Kubernetes; delete the old Job before applying updates. +kubectl delete job dim-api-stately-backfill --ignore-not-found=true +kubectl apply -f "$ROOT/deployment/dim-api-stately-backfill-job.yaml" + +rm -rf "$ROOT/deployment" diff --git a/kubernetes/dim-api-stately-backfill-job.yaml b/kubernetes/dim-api-stately-backfill-job.yaml index d1bfe17f..7f2914da 100644 --- a/kubernetes/dim-api-stately-backfill-job.yaml +++ b/kubernetes/dim-api-stately-backfill-job.yaml @@ -26,14 +26,14 @@ spec: labels: app: dim-api-stately-backfill annotations: - cluster-autoscaler.kubernetes.io/safe-to-evict: "false" + cluster-autoscaler.kubernetes.io/safe-to-evict: 'false' spec: restartPolicy: OnFailure containers: - name: stately-backfill image: destinyitemmanager/dim-api:$COMMITHASH imagePullPolicy: IfNotPresent - command: ["node"] + command: ['node'] args: - --enable-source-maps - api/stately/init/stately-backfill.js @@ -43,19 +43,19 @@ spec: - name: BACKFILL_TOKEN_PATH value: /var/lib/dim-api-backfill/backfill-token.bin - name: BACKFILL_RETRY_MAX_ATTEMPTS - value: "12" + value: '12' - name: BACKFILL_RETRY_BASE_DELAY_MS - value: "1000" + value: '1000' - name: BACKFILL_RETRY_MAX_DELAY_MS - value: "30000" + value: '30000' - name: BACKFILL_PROFILE_BATCH_SIZE - value: "1000" + value: '1000' - name: BACKFILL_SETTINGS_BATCH_SIZE - value: "50" + value: '50' - name: BACKFILL_SHARE_BATCH_SIZE - value: "50" + value: '50' - name: BACKFILL_CONTINUATION_DELAY_MS - value: "1000" + value: '1000' - name: PGPASSWORD valueFrom: secretKeyRef: @@ -66,11 +66,6 @@ spec: secretKeyRef: name: dim-api-secret key: stately_access_key - - name: STATELY_STORE_ID - valueFrom: - secretKeyRef: - name: dim-api-secret - key: stately_store_id envFrom: - configMapRef: name: dim-api-config @@ -79,7 +74,7 @@ spec: cpu: 200m 
memory: 256Mi limits: - cpu: "1" + cpu: '1' memory: 1Gi volumeMounts: - name: token-state diff --git a/package.json b/package.json index df4c93c5..e9a8fb05 100644 --- a/package.json +++ b/package.json @@ -15,6 +15,7 @@ "start": "rm -rf dist && pnpm build:api && node -r dotenv/config dist/api/index.js", "build:api": "mkdir -p ./dist/api/stately/generated && cp -r ./api/stately/generated/*.js ./dist/api/stately/generated && tsc -p ./api/tsconfig.json", "deploy": "kubernetes/deploy.sh", + "deploy:stately-backfill": "kubernetes/deploy-stately-backfill-job.sh", "docker:build": "rm -rf dist && pnpm build:api && docker build -t destinyitemmanager/dim-api .", "docker:run": "docker run -p 3000:3000 destinyitemmanager/dim-api:latest", "docker:push": "docker push destinyitemmanager/dim-api:latest", From 22324aad17e9e3595d9b5424dbed17b8c4ab1d38 Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Sat, 11 Apr 2026 21:46:12 -0700 Subject: [PATCH 04/12] Fixes --- api/stately/init/stately-backfill.ts | 40 +++++++++++++++++++- kubernetes/dim-api-stately-backfill-job.yaml | 5 +++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/api/stately/init/stately-backfill.ts b/api/stately/init/stately-backfill.ts index 5a51f804..1d1c965f 100644 --- a/api/stately/init/stately-backfill.ts +++ b/api/stately/init/stately-backfill.ts @@ -1,5 +1,5 @@ import fs from 'node:fs/promises'; -import { keyPath } from '@stately-cloud/client'; +import { keyPath, StatelyError } from '@stately-cloud/client'; import { DatabaseError } from 'pg-protocol'; import { closeDbPool, transaction } from '../../db/index.js'; import { addLoadoutShare } from '../../db/loadout-share-queries.js'; @@ -36,9 +36,11 @@ const continuationDelayMs = parseNumberEnv('BACKFILL_CONTINUATION_DELAY_MS', 100 const retryMaxAttempts = parseNumberEnv('BACKFILL_RETRY_MAX_ATTEMPTS', 12); const retryBaseDelayMs = parseNumberEnv('BACKFILL_RETRY_BASE_DELAY_MS', 1000); const retryMaxDelayMs = parseNumberEnv('BACKFILL_RETRY_MAX_DELAY_MS', 
30000); +const statelyThrottleMinDelayMs = parseNumberEnv('BACKFILL_STATELY_THROTTLE_DELAY_MS', 15000); interface RetryableError { code?: string; + statelyCode?: string; status?: number; statusCode?: number; headers?: Record; @@ -50,6 +52,33 @@ interface RetryableError { message?: string; } +function isStatelyThroughputError(error: unknown): boolean { + if (error instanceof StatelyError) { + return ( + error.statelyCode === 'StoreThroughputExceeded' || + error.statelyCode === 'StoreRequestLimitExceeded' + ); + } + + if (!error || typeof error !== 'object') { + return false; + } + + const retryable = error as RetryableError; + if ( + retryable.statelyCode === 'StoreThroughputExceeded' || + retryable.statelyCode === 'StoreRequestLimitExceeded' + ) { + return true; + } + + if (retryable.cause) { + return isStatelyThroughputError(retryable.cause); + } + + return false; +} + function parseNumberEnv(envName: string, defaultValue: number) { const raw = process.env[envName]; if (!raw) { @@ -142,7 +171,14 @@ function retryDelayMs(error: unknown, attempt: number) { const exponential = retryBaseDelayMs * 2 ** (attempt - 1); const jitter = Math.floor(Math.random() * retryBaseDelayMs); - return Math.min(retryMaxDelayMs, exponential + jitter); + const retryDelay = exponential + jitter; + + if (isStatelyThroughputError(error)) { + // Give Stately autoscaling enough time to react to hot partitions and traffic bursts. 
+ return Math.min(retryMaxDelayMs, Math.max(statelyThrottleMinDelayMs, retryDelay)); + } + + return Math.min(retryMaxDelayMs, retryDelay); } async function withRetry(name: string, fn: () => T | Promise): Promise { diff --git a/kubernetes/dim-api-stately-backfill-job.yaml b/kubernetes/dim-api-stately-backfill-job.yaml index 7f2914da..2075b65e 100644 --- a/kubernetes/dim-api-stately-backfill-job.yaml +++ b/kubernetes/dim-api-stately-backfill-job.yaml @@ -28,6 +28,11 @@ spec: annotations: cluster-autoscaler.kubernetes.io/safe-to-evict: 'false' spec: + securityContext: + runAsUser: 1000 + runAsGroup: 1000 + fsGroup: 1000 + fsGroupChangePolicy: OnRootMismatch restartPolicy: OnFailure containers: - name: stately-backfill From 4a127b31019ff98e4ab40b21f02115014915181e Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Sat, 11 Apr 2026 21:51:49 -0700 Subject: [PATCH 05/12] hardier --- api/stately/init/stately-backfill.ts | 66 +++++++++++++++++++++------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/api/stately/init/stately-backfill.ts b/api/stately/init/stately-backfill.ts index 1d1c965f..ee54a0b3 100644 --- a/api/stately/init/stately-backfill.ts +++ b/api/stately/init/stately-backfill.ts @@ -122,6 +122,17 @@ function getRetryAfterMs(error: RetryableError): number | undefined { } function isRetryableError(error: unknown): boolean { + if (error instanceof StatelyError) { + return [ + 'StoreThroughputExceeded', + 'StoreRequestLimitExceeded', + 'StoreInUse', + 'ConcurrentModification', + 'CachedSchemaTooOld', + 'BackupsUnavailable', + ].includes(error.statelyCode); + } + if (error instanceof DatabaseError) { return error.code ? 
['40001', '40P01', '53300', '57P03'].includes(error.code) : false; } @@ -331,23 +342,48 @@ let scanComplete = false; try { while (true) { - for await (const item of list) { - if (client.isType(item, 'LoadoutShare')) { - shareQueue.push({ - loadout: convertLoadoutFromStately(item), - viewCount: item.viewCount, - platformMembershipId: item.profileId.toString(), - shareId: item.id, - }); - } else if (client.isType(item, 'Settings')) { - settingsQueue.push(item); - } else if ('profileId' in item) { - profileIds.add(item.profileId); + try { + for await (const item of list) { + if (client.isType(item, 'LoadoutShare')) { + shareQueue.push({ + loadout: convertLoadoutFromStately(item), + viewCount: item.viewCount, + platformMembershipId: item.profileId.toString(), + shareId: item.id, + }); + } else if (client.isType(item, 'Settings')) { + settingsQueue.push(item); + } else if ('profileId' in item) { + profileIds.add(item.profileId); + } + + await flushProfileIds(); + await flushSettingsQueue(); + await flushShareQueue(); + } + } catch (error) { + if (!isRetryableError(error)) { + throw error; } - await flushProfileIds(); - await flushSettingsQueue(); - await flushShareQueue(); + const token = list.token; + if (!token) { + throw error; + } + + await fs.writeFile(tokenPath, token.tokenData); + const waitMs = retryDelayMs(error, 1); + const message = error instanceof Error ? error.message : String(error); + console.log( + `Scan iteration failed with retryable error. 
Waiting ${waitMs}ms before continuing scan.`, + message, + ); + await delay(waitMs); + + list = await withRetry('Continue scan after retryable scan failure', () => + client.continueScan(token), + ); + continue; } const token = list.token; From 0f252508e79267cf44f61a9672409a84fec2d532 Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Thu, 23 Apr 2026 22:33:38 -0700 Subject: [PATCH 06/12] Ignore loadout collisions --- api/stately/init/stately-backfill.ts | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/api/stately/init/stately-backfill.ts b/api/stately/init/stately-backfill.ts index ee54a0b3..2f6c4a98 100644 --- a/api/stately/init/stately-backfill.ts +++ b/api/stately/init/stately-backfill.ts @@ -2,7 +2,7 @@ import fs from 'node:fs/promises'; import { keyPath, StatelyError } from '@stately-cloud/client'; import { DatabaseError } from 'pg-protocol'; import { closeDbPool, transaction } from '../../db/index.js'; -import { addLoadoutShare } from '../../db/loadout-share-queries.js'; +import { addLoadoutShareIfNotPresent } from '../../db/loadout-share-queries.js'; import { backfillMigrationState } from '../../db/migration-state-queries.js'; import { getSettings, replaceSettings } from '../../db/settings-queries.js'; import { Loadout } from '../../shapes/loadouts.js'; @@ -309,22 +309,16 @@ async function flushShareQueue(force = false) { await withRetry('Backfill loadout shares', async () => { await transaction(async (pgClient) => { for (const share of batch) { - try { - await addLoadoutShare( - pgClient, - undefined, - share.platformMembershipId, - share.shareId, - share.loadout, - share.viewCount, - ); - } catch (error) { - if (error instanceof DatabaseError && error.code === '23505') { - // unique violation, delete the stately one - console.log('Loadout share collision ignoring', share.shareId); - } else { - throw error; - } + const inserted = await addLoadoutShareIfNotPresent( + pgClient, + undefined, + 
share.platformMembershipId, + share.shareId, + share.loadout, + share.viewCount, + ); + if (!inserted) { + console.log('Loadout share collision ignoring', share.shareId); } } }); From 26861f73357b728e6bafb0379217bee7de6885fd Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Thu, 23 Apr 2026 22:52:19 -0700 Subject: [PATCH 07/12] Paralellize --- api/stately/init/stately-backfill.ts | 285 ++++++++++++------- kubernetes/README.md | 11 + kubernetes/dim-api-stately-backfill-job.yaml | 2 + 3 files changed, 188 insertions(+), 110 deletions(-) diff --git a/api/stately/init/stately-backfill.ts b/api/stately/init/stately-backfill.ts index 2f6c4a98..7e2152b0 100644 --- a/api/stately/init/stately-backfill.ts +++ b/api/stately/init/stately-backfill.ts @@ -28,7 +28,7 @@ async function replaceSettingsIfNotPresent( } } -const tokenPath = process.env.BACKFILL_TOKEN_PATH ?? 'backfill-token.bin'; +const configuredTokenPath = process.env.BACKFILL_TOKEN_PATH ?? 'backfill-token.bin'; const profileBatchSize = parseNumberEnv('BACKFILL_PROFILE_BATCH_SIZE', 1000); const settingsBatchSize = parseNumberEnv('BACKFILL_SETTINGS_BATCH_SIZE', 50); const shareBatchSize = parseNumberEnv('BACKFILL_SHARE_BATCH_SIZE', 50); @@ -37,6 +37,11 @@ const retryMaxAttempts = parseNumberEnv('BACKFILL_RETRY_MAX_ATTEMPTS', 12); const retryBaseDelayMs = parseNumberEnv('BACKFILL_RETRY_BASE_DELAY_MS', 1000); const retryMaxDelayMs = parseNumberEnv('BACKFILL_RETRY_MAX_DELAY_MS', 30000); const statelyThrottleMinDelayMs = parseNumberEnv('BACKFILL_STATELY_THROTTLE_DELAY_MS', 15000); +const configuredParallelSegments = parseNumberEnv('BACKFILL_PARALLEL_SEGMENTS', 1); +const configuredTotalSegments = process.env.BACKFILL_TOTAL_SEGMENTS + ? 
parseNumberEnv('BACKFILL_TOTAL_SEGMENTS', 1) + : undefined; +const configuredSegmentIndex = parseNonNegativeIntEnv('BACKFILL_SEGMENT_INDEX'); interface RetryableError { code?: string; @@ -91,6 +96,34 @@ function parseNumberEnv(envName: string, defaultValue: number) { return parsed; } +function parseNonNegativeIntEnv(envName: string): number | undefined { + const raw = process.env[envName]; + if (raw === undefined || raw === '') { + return undefined; + } + + const parsed = Number(raw); + if (!Number.isInteger(parsed) || parsed < 0) { + throw new Error(`${envName} must be a non-negative integer`); + } + + return parsed; +} + +function tokenPathForSegment(basePath: string, segmentIndex: number, totalSegments: number): string { + if (totalSegments <= 1) { + return basePath; + } + + const marker = `.segment-${segmentIndex + 1}-of-${totalSegments}`; + const dotIndex = basePath.lastIndexOf('.'); + if (dotIndex <= 0) { + return `${basePath}${marker}`; + } + + return `${basePath.slice(0, dotIndex)}${marker}${basePath.slice(dotIndex)}`; +} + function getNestedStatus(error: RetryableError): number | undefined { return error.status ?? error.statusCode ?? error.response?.status; } @@ -214,127 +247,134 @@ async function withRetry(name: string, fn: () => T | Promise): Promise } } -const tokenData = await fs.readFile(tokenPath).catch(() => null); - type ScanList = ReturnType; -let list: ScanList = tokenData - ? 
await withRetry('Continue scan from token file', () => client.continueScan(tokenData)) - : await withRetry('Begin scan', () => - client.beginScan({ - itemTypes: [ - 'LoadoutShare', - 'ItemAnnotation', - 'ItemHashTag', - 'Loadout', - 'Search', - 'Triumph', - 'Settings', - ], - }), - ); - -const profileIds = new Set(); - -const settingsQueue: Settings[] = []; -const shareQueue: { - loadout: Loadout; - viewCount: number; - platformMembershipId: string; - shareId: string; -}[] = []; +async function runSegment(segmentIndex: number, totalSegments: number, workerCount: number) { + const tokenPath = tokenPathForSegment(configuredTokenPath, segmentIndex, totalSegments); + const logPrefix = `[segment ${segmentIndex + 1}/${totalSegments}]`; + const tokenData = await fs.readFile(tokenPath).catch(() => null); + + console.log(logPrefix, 'Starting scan using token file', tokenPath); + + let list: ScanList = tokenData + ? await withRetry(`${logPrefix} Continue scan from token file`, () => + client.continueScan(tokenData), + ) + : await withRetry(`${logPrefix} Begin scan`, () => + client.beginScan({ + itemTypes: [ + 'LoadoutShare', + 'ItemAnnotation', + 'ItemHashTag', + 'Loadout', + 'Search', + 'Triumph', + 'Settings', + ], + totalSegments, + segmentIndex, + }), + ); -async function flushProfileIds(force = false) { - if (!force && profileIds.size < profileBatchSize) { - return; - } - if (profileIds.size === 0) { - return; - } + const profileIds = new Set(); + const settingsQueue: Settings[] = []; + const shareQueue: { + loadout: Loadout; + viewCount: number; + platformMembershipId: string; + shareId: string; + }[] = []; + + async function flushProfileIds(force = false) { + if (!force && profileIds.size < profileBatchSize) { + return; + } + if (profileIds.size === 0) { + return; + } - console.log('Backfilling migration states for', profileIds.size, 'profiles...'); - const items = [...profileIds]; - await withRetry('Backfill migration states', async () => { - await transaction(async 
(pgClient) => { - for (const profileId of items) { - await backfillMigrationState(pgClient, profileId.toString(), undefined); - } + console.log(logPrefix, 'Backfilling migration states for', profileIds.size, 'profiles...'); + const items = [...profileIds]; + await withRetry(`${logPrefix} Backfill migration states`, async () => { + await transaction(async (pgClient) => { + for (const profileId of items) { + await backfillMigrationState(pgClient, profileId.toString(), undefined); + } + }); }); - }); - profileIds.clear(); - console.log('Done'); -} - -async function flushSettingsQueue(force = false) { - if (!force && settingsQueue.length < settingsBatchSize) { - return; - } - if (settingsQueue.length === 0) { - return; + profileIds.clear(); + console.log(logPrefix, 'Done'); } - console.log('Backfilling settings for', settingsQueue.length, 'users...'); - const batch = [...settingsQueue]; - await withRetry('Backfill settings', async () => { - await transaction(async (pgClient) => { - for (const settings of batch) { - await replaceSettingsIfNotPresent( - pgClient, - Number(settings.memberId), - subtractObject(convertToDimSettings(settings), defaultSettings), - ); - } - }); - }); + async function flushSettingsQueue(force = false) { + if (!force && settingsQueue.length < settingsBatchSize) { + return; + } + if (settingsQueue.length === 0) { + return; + } - await withRetry('Delete migrated settings from Stately', async () => { - await client.del(...batch.map((s) => settingsKey(Number(s.memberId)))); - }); + console.log(logPrefix, 'Backfilling settings for', settingsQueue.length, 'users...'); + const batch = [...settingsQueue]; + await withRetry(`${logPrefix} Backfill settings`, async () => { + await transaction(async (pgClient) => { + for (const settings of batch) { + await replaceSettingsIfNotPresent( + pgClient, + Number(settings.memberId), + subtractObject(convertToDimSettings(settings), defaultSettings), + ); + } + }); + }); - settingsQueue.splice(0, batch.length); - 
console.log('Done'); -} + await withRetry(`${logPrefix} Delete migrated settings from Stately`, async () => { + await client.del(...batch.map((s) => settingsKey(Number(s.memberId)))); + }); -async function flushShareQueue(force = false) { - if (!force && shareQueue.length < shareBatchSize) { - return; - } - if (shareQueue.length === 0) { - return; + settingsQueue.splice(0, batch.length); + console.log(logPrefix, 'Done'); } - console.log('Backfilling', shareQueue.length, 'loadout shares...'); - const batch = [...shareQueue]; - - await withRetry('Backfill loadout shares', async () => { - await transaction(async (pgClient) => { - for (const share of batch) { - const inserted = await addLoadoutShareIfNotPresent( - pgClient, - undefined, - share.platformMembershipId, - share.shareId, - share.loadout, - share.viewCount, - ); - if (!inserted) { - console.log('Loadout share collision ignoring', share.shareId); + async function flushShareQueue(force = false) { + if (!force && shareQueue.length < shareBatchSize) { + return; + } + if (shareQueue.length === 0) { + return; + } + + console.log(logPrefix, 'Backfilling', shareQueue.length, 'loadout shares...'); + const batch = [...shareQueue]; + + await withRetry(`${logPrefix} Backfill loadout shares`, async () => { + await transaction(async (pgClient) => { + for (const share of batch) { + const inserted = await addLoadoutShareIfNotPresent( + pgClient, + undefined, + share.platformMembershipId, + share.shareId, + share.loadout, + share.viewCount, + ); + if (!inserted) { + console.log(logPrefix, 'Loadout share collision ignoring', share.shareId); + } } - } + }); }); - }); - await withRetry('Delete migrated loadout shares from Stately', async () => { - await client.del(...batch.map((s) => keyForLoadoutShare(s.shareId))); - }); + await withRetry(`${logPrefix} Delete migrated loadout shares from Stately`, async () => { + await client.del(...batch.map((s) => keyForLoadoutShare(s.shareId))); + }); - shareQueue.splice(0, batch.length); - 
console.log('Done'); -} + shareQueue.splice(0, batch.length); + console.log(logPrefix, 'Done'); + } -let scanComplete = false; + let scanComplete = false; -try { while (true) { try { for await (const item of list) { @@ -369,12 +409,12 @@ try { const waitMs = retryDelayMs(error, 1); const message = error instanceof Error ? error.message : String(error); console.log( - `Scan iteration failed with retryable error. Waiting ${waitMs}ms before continuing scan.`, + `${logPrefix} Scan iteration failed with retryable error. Waiting ${waitMs}ms before continuing scan.`, message, ); await delay(waitMs); - list = await withRetry('Continue scan after retryable scan failure', () => + list = await withRetry(`${logPrefix} Continue scan after retryable scan failure`, () => client.continueScan(token), ); continue; @@ -382,20 +422,20 @@ try { const token = list.token; if (!token) { - console.log('Scan token missing, ending scan loop.'); + console.log(logPrefix, 'Scan token missing, ending scan loop.'); break; } await fs.writeFile(tokenPath, token.tokenData); if (!token.canContinue) { - console.log('Scan complete.'); + console.log(logPrefix, 'Scan complete.'); scanComplete = true; break; } await delay(continuationDelayMs); - console.log('Continuing scan...'); - list = await withRetry('Continue scan', () => client.continueScan(token)); + console.log(logPrefix, 'Continuing scan...'); + list = await withRetry(`${logPrefix} Continue scan`, () => client.continueScan(token)); } await flushProfileIds(true); @@ -405,6 +445,31 @@ try { if (scanComplete) { await fs.unlink(tokenPath).catch(() => undefined); } + + if (workerCount > 1) { + console.log(logPrefix, 'Worker complete.'); + } +} + +try { + const totalSegments = configuredTotalSegments ?? 
configuredParallelSegments; + + if (configuredSegmentIndex !== undefined) { + if (configuredTotalSegments === undefined) { + throw new Error('BACKFILL_TOTAL_SEGMENTS must be set when BACKFILL_SEGMENT_INDEX is set'); + } + if (configuredSegmentIndex >= configuredTotalSegments) { + throw new Error('BACKFILL_SEGMENT_INDEX must be less than BACKFILL_TOTAL_SEGMENTS'); + } + await runSegment(configuredSegmentIndex, configuredTotalSegments, 1); + } else if (totalSegments <= 1) { + await runSegment(0, 1, 1); + } else { + const workers = Array.from({ length: totalSegments }, (_, segmentIndex) => + runSegment(segmentIndex, totalSegments, totalSegments), + ); + await Promise.all(workers); + } } finally { await closeDbPool(); } diff --git a/kubernetes/README.md b/kubernetes/README.md index 79efe644..f12da557 100644 --- a/kubernetes/README.md +++ b/kubernetes/README.md @@ -27,6 +27,17 @@ Run a one-time migration scan from StatelyDB into Postgres with The job is resumable across pod restarts by storing scan token state in a PVC. +One-pod parallelization options: + +- `BACKFILL_PARALLEL_SEGMENTS`: Number of segment workers to run concurrently in one pod. +- `BACKFILL_TOTAL_SEGMENTS`: Optional explicit total segment count (defaults to `BACKFILL_PARALLEL_SEGMENTS`). +- `BACKFILL_SEGMENT_INDEX`: Optional explicit segment index for single-segment worker mode. + +For one-pod parallel mode, set only `BACKFILL_PARALLEL_SEGMENTS` to the desired worker count and leave +`BACKFILL_SEGMENT_INDEX` unset. + +Segment workers automatically use segment-specific token files derived from `BACKFILL_TOKEN_PATH`. 
+ Required environment inputs: - `dim-api-config` ConfigMap (same values used by API deployment) diff --git a/kubernetes/dim-api-stately-backfill-job.yaml b/kubernetes/dim-api-stately-backfill-job.yaml index 2075b65e..836d42c4 100644 --- a/kubernetes/dim-api-stately-backfill-job.yaml +++ b/kubernetes/dim-api-stately-backfill-job.yaml @@ -45,6 +45,8 @@ spec: env: - name: NODE_ENV value: production + - name: BACKFILL_PARALLEL_SEGMENTS + value: '4' - name: BACKFILL_TOKEN_PATH value: /var/lib/dim-api-backfill/backfill-token.bin - name: BACKFILL_RETRY_MAX_ATTEMPTS From fd0eca5254fa804dec2e3bdb017faf65f0ed9c2a Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Thu, 23 Apr 2026 22:56:05 -0700 Subject: [PATCH 08/12] Continue --- api/stately/init/stately-backfill.ts | 70 ++++++++++++++++++++-------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/api/stately/init/stately-backfill.ts b/api/stately/init/stately-backfill.ts index 7e2152b0..4f1a36d4 100644 --- a/api/stately/init/stately-backfill.ts +++ b/api/stately/init/stately-backfill.ts @@ -37,6 +37,7 @@ const retryMaxAttempts = parseNumberEnv('BACKFILL_RETRY_MAX_ATTEMPTS', 12); const retryBaseDelayMs = parseNumberEnv('BACKFILL_RETRY_BASE_DELAY_MS', 1000); const retryMaxDelayMs = parseNumberEnv('BACKFILL_RETRY_MAX_DELAY_MS', 30000); const statelyThrottleMinDelayMs = parseNumberEnv('BACKFILL_STATELY_THROTTLE_DELAY_MS', 15000); +const retryThrottlingForever = (process.env.BACKFILL_RETRY_THROTTLING_FOREVER ?? 'true') !== 'false'; const configuredParallelSegments = parseNumberEnv('BACKFILL_PARALLEL_SEGMENTS', 1); const configuredTotalSegments = process.env.BACKFILL_TOTAL_SEGMENTS ? 
parseNumberEnv('BACKFILL_TOTAL_SEGMENTS', 1) @@ -232,14 +233,17 @@ async function withRetry(name: string, fn: () => T | Promise): Promise try { return await fn(); } catch (error) { - if (!isRetryableError(error) || attempt >= retryMaxAttempts) { + const retryable = isRetryableError(error); + const throttleRetry = retryThrottlingForever && isStatelyThroughputError(error); + if (!retryable || (!throttleRetry && attempt >= retryMaxAttempts)) { throw error; } const waitMs = retryDelayMs(error, attempt); const message = error instanceof Error ? error.message : String(error); + const attemptLabel = throttleRetry ? `${attempt}/unbounded` : `${attempt}/${retryMaxAttempts}`; console.log( - `${name} failed with retryable error (attempt ${attempt}/${retryMaxAttempts}). Waiting ${waitMs}ms before retry.`, + `${name} failed with retryable error (attempt ${attemptLabel}). Waiting ${waitMs}ms before retry.`, message, ); await delay(waitMs); @@ -248,6 +252,15 @@ async function withRetry(name: string, fn: () => T | Promise): Promise } type ScanList = ReturnType; +const scanItemTypes: NonNullable[0]>['itemTypes'] = [ + 'LoadoutShare', + 'ItemAnnotation', + 'ItemHashTag', + 'Loadout', + 'Search', + 'Triumph', + 'Settings', +]; async function runSegment(segmentIndex: number, totalSegments: number, workerCount: number) { const tokenPath = tokenPathForSegment(configuredTokenPath, segmentIndex, totalSegments); @@ -262,15 +275,7 @@ async function runSegment(segmentIndex: number, totalSegments: number, workerCou ) : await withRetry(`${logPrefix} Begin scan`, () => client.beginScan({ - itemTypes: [ - 'LoadoutShare', - 'ItemAnnotation', - 'ItemHashTag', - 'Loadout', - 'Search', - 'Triumph', - 'Settings', - ], + itemTypes: scanItemTypes, totalSegments, segmentIndex, }), @@ -400,22 +405,47 @@ async function runSegment(segmentIndex: number, totalSegments: number, workerCou throw error; } + const waitMs = retryDelayMs(error, 1); + const message = error instanceof Error ? 
error.message : String(error); const token = list.token; - if (!token) { - throw error; + if (token) { + await fs.writeFile(tokenPath, token.tokenData); + console.log( + `${logPrefix} Scan iteration failed with retryable error. Waiting ${waitMs}ms before continuing scan.`, + message, + ); + await delay(waitMs); + + list = await withRetry(`${logPrefix} Continue scan after retryable scan failure`, () => + client.continueScan(token), + ); + continue; + } + + const persistedTokenData = await fs.readFile(tokenPath).catch(() => null); + if (persistedTokenData) { + console.log( + `${logPrefix} Retryable scan failure without in-memory token. Waiting ${waitMs}ms before resuming from persisted token.`, + message, + ); + await delay(waitMs); + list = await withRetry(`${logPrefix} Continue scan from persisted token after retryable scan failure`, () => + client.continueScan(persistedTokenData), + ); + continue; } - await fs.writeFile(tokenPath, token.tokenData); - const waitMs = retryDelayMs(error, 1); - const message = error instanceof Error ? error.message : String(error); console.log( - `${logPrefix} Scan iteration failed with retryable error. Waiting ${waitMs}ms before continuing scan.`, + `${logPrefix} Retryable scan failure without token. 
Waiting ${waitMs}ms before restarting segment scan.`, message, ); await delay(waitMs); - - list = await withRetry(`${logPrefix} Continue scan after retryable scan failure`, () => - client.continueScan(token), + list = await withRetry(`${logPrefix} Restart segment scan after retryable scan failure`, () => + client.beginScan({ + itemTypes: scanItemTypes, + totalSegments, + segmentIndex, + }), ); continue; } From 027ad8cbeda40295fb20510c88caf9b1547858c7 Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Sat, 25 Apr 2026 23:11:55 -0700 Subject: [PATCH 09/12] Compat --- api/db/loadout-share-queries.ts | 4 ++-- api/stately/init/stately-backfill.ts | 28 +++++++++++++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/api/db/loadout-share-queries.ts b/api/db/loadout-share-queries.ts index 47a40b18..e4ee80ac 100644 --- a/api/db/loadout-share-queries.ts +++ b/api/db/loadout-share-queries.ts @@ -75,7 +75,7 @@ export async function addLoadoutShareIgnoring( shareId: string, loadout: Loadout, viewCount = 0, -): Promise { +): Promise { const response = await client.query({ name: 'add_loadout_share_ignoring', text: `insert into loadout_shares (id, membership_id, platform_membership_id, name, notes, class_type, items, parameters, view_count) @@ -96,7 +96,7 @@ values ($1, $2, $3, $4, $5, $6, $7, $8, $9) on conflict (id) do nothing`, ], }); - return response; + return (response.rowCount ?? 
0) > 0; /* "on conflict (id) do nothing" affects 0 or 1 rows, so rowCount > 0 means the share was actually inserted */ } /** diff --git a/api/stately/init/stately-backfill.ts b/api/stately/init/stately-backfill.ts index 4f1a36d4..85d5b442 100644 --- a/api/stately/init/stately-backfill.ts +++ b/api/stately/init/stately-backfill.ts @@ -1,8 +1,8 @@ -import fs from 'node:fs/promises'; import { keyPath, StatelyError } from '@stately-cloud/client'; +import fs from 'node:fs/promises'; import { DatabaseError } from 'pg-protocol'; import { closeDbPool, transaction } from '../../db/index.js'; -import { addLoadoutShareIfNotPresent } from '../../db/loadout-share-queries.js'; +import { addLoadoutShareIgnoring } from '../../db/loadout-share-queries.js'; import { backfillMigrationState } from '../../db/migration-state-queries.js'; import { getSettings, replaceSettings } from '../../db/settings-queries.js'; import { Loadout } from '../../shapes/loadouts.js'; @@ -37,7 +37,8 @@ const retryMaxAttempts = parseNumberEnv('BACKFILL_RETRY_MAX_ATTEMPTS', 12); const retryBaseDelayMs = parseNumberEnv('BACKFILL_RETRY_BASE_DELAY_MS', 1000); const retryMaxDelayMs = parseNumberEnv('BACKFILL_RETRY_MAX_DELAY_MS', 30000); const statelyThrottleMinDelayMs = parseNumberEnv('BACKFILL_STATELY_THROTTLE_DELAY_MS', 15000); -const retryThrottlingForever = (process.env.BACKFILL_RETRY_THROTTLING_FOREVER ?? 'true') !== 'false'; +const retryThrottlingForever = + (process.env.BACKFILL_RETRY_THROTTLING_FOREVER ?? 'true') !== 'false'; const configuredParallelSegments = parseNumberEnv('BACKFILL_PARALLEL_SEGMENTS', 1); const configuredTotalSegments = process.env.BACKFILL_TOTAL_SEGMENTS ? 
parseNumberEnv('BACKFILL_TOTAL_SEGMENTS', 1) @@ -111,7 +112,11 @@ function parseNonNegativeIntEnv(envName: string): number | undefined { return parsed; } -function tokenPathForSegment(basePath: string, segmentIndex: number, totalSegments: number): string { +function tokenPathForSegment( + basePath: string, + segmentIndex: number, + totalSegments: number, +): string { if (totalSegments <= 1) { return basePath; } @@ -129,7 +134,9 @@ function getNestedStatus(error: RetryableError): number | undefined { return error.status ?? error.statusCode ?? error.response?.status; } -function getHeaders(error: RetryableError): Record | undefined { +function getHeaders( + error: RetryableError, +): Record | undefined { return error.headers ?? error.response?.headers; } @@ -241,7 +248,9 @@ async function withRetry(name: string, fn: () => T | Promise): Promise const waitMs = retryDelayMs(error, attempt); const message = error instanceof Error ? error.message : String(error); - const attemptLabel = throttleRetry ? `${attempt}/unbounded` : `${attempt}/${retryMaxAttempts}`; + const attemptLabel = throttleRetry + ? `${attempt}/unbounded` + : `${attempt}/${retryMaxAttempts}`; console.log( `${name} failed with retryable error (attempt ${attemptLabel}). 
Waiting ${waitMs}ms before retry.`, message, @@ -355,7 +364,7 @@ async function runSegment(segmentIndex: number, totalSegments: number, workerCou await withRetry(`${logPrefix} Backfill loadout shares`, async () => { await transaction(async (pgClient) => { for (const share of batch) { - const inserted = await addLoadoutShareIfNotPresent( + const inserted = await addLoadoutShareIgnoring( pgClient, undefined, share.platformMembershipId, @@ -429,8 +438,9 @@ async function runSegment(segmentIndex: number, totalSegments: number, workerCou message, ); await delay(waitMs); - list = await withRetry(`${logPrefix} Continue scan from persisted token after retryable scan failure`, () => - client.continueScan(persistedTokenData), + list = await withRetry( + `${logPrefix} Continue scan from persisted token after retryable scan failure`, + () => client.continueScan(persistedTokenData), ); continue; } From c11fcffcd74eecfe0ff3be9904be832588a610b0 Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Tue, 28 Apr 2026 20:51:31 -0700 Subject: [PATCH 10/12] Migration script --- api/db/item-annotations-queries.ts | 4 +- api/db/item-hash-tags-queries.ts | 4 +- api/db/loadouts-queries.ts | 4 +- api/db/migration-state-queries.ts | 77 +++++++-- api/db/searches-queries.ts | 12 +- api/db/triumphs-queries.ts | 4 +- .../20260426062237-nullable-membership.js | 45 +++++ .../init/migrate-stately-to-postgres.ts | 156 ++++++++++++++++++ kubernetes/README.md | 41 ++++- kubernetes/deploy-migration-worker.sh | 17 ++ .../dim-api-migration-worker-deployment.yaml | 56 +++++++ package.json | 1 + 12 files changed, 390 insertions(+), 31 deletions(-) create mode 100644 api/migrations/20260426062237-nullable-membership.js create mode 100644 api/stately/init/migrate-stately-to-postgres.ts create mode 100755 kubernetes/deploy-migration-worker.sh create mode 100644 kubernetes/dim-api-migration-worker-deployment.yaml diff --git a/api/db/item-annotations-queries.ts b/api/db/item-annotations-queries.ts index 
0d2054d9..fb5c7df3 100644 --- a/api/db/item-annotations-queries.ts +++ b/api/db/item-annotations-queries.ts @@ -82,7 +82,7 @@ function convertItemAnnotation(row: ItemAnnotationRow): ItemAnnotation { */ export async function updateItemAnnotation( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, destinyVersion: DestinyVersion, itemAnnotation: ItemAnnotation, @@ -135,7 +135,7 @@ export async function updateItemAnnotation( END) `, values: [ - bungieMembershipId, // $1 + bungieMembershipId ?? null, // $1 platformMembershipId, // $2 destinyVersion, // $3 itemAnnotation.id, // $4 diff --git a/api/db/item-hash-tags-queries.ts b/api/db/item-hash-tags-queries.ts index 99c00c6c..74d235a3 100644 --- a/api/db/item-hash-tags-queries.ts +++ b/api/db/item-hash-tags-queries.ts @@ -66,7 +66,7 @@ function convertItemHashTag(row: ItemHashTagRow): ItemHashTag { */ export async function updateItemHashTag( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, itemHashTag: ItemHashTag, ): Promise { @@ -114,7 +114,7 @@ export async function updateItemHashTag( END) `, values: [ - bungieMembershipId, + bungieMembershipId ?? null, platformMembershipId, itemHashTag.hash, tagValue === null ? 
null : TagValueEnum[tagValue], diff --git a/api/db/loadouts-queries.ts b/api/db/loadouts-queries.ts index 3ab4ae6f..2b88079d 100644 --- a/api/db/loadouts-queries.ts +++ b/api/db/loadouts-queries.ts @@ -79,7 +79,7 @@ export function convertLoadout(row: LoadoutRow): Loadout { */ export async function updateLoadout( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, destinyVersion: DestinyVersion, loadout: Loadout, @@ -101,7 +101,7 @@ do update set created_at = CASE WHEN loadouts.deleted_at IS NOT NULL THEN now() ELSE loadouts.created_at END`, values: [ loadout.id, - bungieMembershipId, + bungieMembershipId ?? null, platformMembershipId, destinyVersion, loadout.name, diff --git a/api/db/migration-state-queries.ts b/api/db/migration-state-queries.ts index 02a0fe77..191e82e5 100644 --- a/api/db/migration-state-queries.ts +++ b/api/db/migration-state-queries.ts @@ -13,15 +13,21 @@ export const enum MigrationState { export interface MigrationStateInfo { platformMembershipId: string; - bungieMembershipId: number; + bungieMembershipId: number | undefined; state: MigrationState; lastStateChangeAt: number; attemptCount: number; lastError?: string; } +export interface MigrationWorkItem { + platformMembershipId: string; + bungieMembershipId: number | undefined; + attemptCount: number; +} + interface MigrationStateRow { - membership_id: number; + membership_id: number | null; platform_membership_id: string; state: number; last_state_change_at: Date; @@ -45,12 +51,54 @@ on conflict (platform_membership_id) do update set state = migration_state.state return result.rows[0].state; } -export async function getUsersToMigrate(client: ClientBase): Promise { +export async function getUsersToMigrate(client: ClientBase): Promise { const results = await client.query({ name: 'get_users_to_migrate', - text: 'select membership_id from migration_state where state != 3 limit 1000', + text: 'select platform_membership_id from 
migration_state where state != 3 limit 1000', }); - return results.rows.map((row) => row.membership_id); + return results.rows.map((row) => row.platform_membership_id); +} + +export async function claimMigrationWork( + client: ClientBase, + batchSize: number, +): Promise { + const results = await client.query<{ + platform_membership_id: string; + membership_id: number | null; + attempt_count: number; + }>({ + name: 'claim_migration_work', + text: `with candidates as ( + select platform_membership_id + from migration_state + where state = $1 + and attempt_count < $2 + order by last_state_change_at asc + limit $3 + ) + update migration_state + set state = $4, + attempt_count = migration_state.attempt_count + 1, + last_state_change_at = current_timestamp + from candidates + where migration_state.platform_membership_id = candidates.platform_membership_id + and migration_state.state = $1 + and migration_state.attempt_count < $2 + returning migration_state.platform_membership_id, migration_state.membership_id, migration_state.attempt_count`, + values: [ + MigrationState.Stately, + MAX_MIGRATION_ATTEMPTS, + batchSize, + MigrationState.MigratingToPostgres, + ], + }); + + return results.rows.map((row) => ({ + platformMembershipId: row.platform_membership_id, + bungieMembershipId: row.membership_id ?? undefined, + attemptCount: row.attempt_count, + })); } export async function getMigrationState( @@ -66,7 +114,7 @@ export async function getMigrationState( return convert(results.rows[0]); } else { return { - bungieMembershipId: 0, + bungieMembershipId: undefined, platformMembershipId, state: MigrationState.Stately, lastStateChangeAt: 0, @@ -77,7 +125,7 @@ export async function getMigrationState( function convert(row: MigrationStateRow): MigrationStateInfo { return { - bungieMembershipId: row.membership_id, + bungieMembershipId: row.membership_id ?? 
undefined, platformMembershipId: row.platform_membership_id, state: row.state, lastStateChangeAt: row.last_state_change_at.getTime(), @@ -88,7 +136,7 @@ function convert(row: MigrationStateRow): MigrationStateInfo { export function startMigrationToPostgres( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, ): Promise { return updateMigrationState( @@ -103,7 +151,7 @@ export function startMigrationToPostgres( export function finishMigrationToPostgres( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, ): Promise { return updateMigrationState( @@ -118,7 +166,7 @@ export function finishMigrationToPostgres( export function abortMigrationToPostgres( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, err: string, ): Promise { @@ -135,7 +183,7 @@ export function abortMigrationToPostgres( async function updateMigrationState( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, state: MigrationState, expectedState: MigrationState, @@ -149,13 +197,14 @@ async function updateMigrationState( on conflict (platform_membership_id) do update set state = $3, + membership_id = coalesce($2, migration_state.membership_id), last_state_change_at = current_timestamp, attempt_count = migration_state.attempt_count + $4, last_error = $5 where migration_state.state = $6`, values: [ platformMembershipId, - bungieMembershipId, + bungieMembershipId ?? null, state, incrementAttempt ? 1 : 0, err ?? 
null, @@ -186,7 +235,7 @@ export async function deleteMigrationState( export async function setMigrationStateForTest( client: ClientBase, platformMembershipId: string, - bungieMembershipId: number, + bungieMembershipId: number | undefined, state: MigrationState, ): Promise { await client.query({ @@ -194,7 +243,7 @@ export async function setMigrationStateForTest( VALUES ($1, $2, $3) ON CONFLICT (platform_membership_id) DO UPDATE SET state = $3, last_state_change_at = NOW()`, - values: [platformMembershipId, bungieMembershipId, state], + values: [platformMembershipId, bungieMembershipId ?? null, state], }); } diff --git a/api/db/searches-queries.ts b/api/db/searches-queries.ts index 502b4cf3..b6a40ae7 100644 --- a/api/db/searches-queries.ts +++ b/api/db/searches-queries.ts @@ -75,7 +75,7 @@ function convertSearch(row: SearchRow): Search { */ export async function updateUsedSearch( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, destinyVersion: DestinyVersion, query: string, @@ -90,7 +90,7 @@ do update set usage_count = CASE WHEN searches.deleted_at IS NOT NULL THEN 1 ELSE searches.usage_count + 1 END, last_used = current_timestamp, deleted_at = null`, - values: [bungieMembershipId, platformMembershipId, destinyVersion, query, type], + values: [bungieMembershipId ?? null, platformMembershipId, destinyVersion, query, type], }); if (response.rowCount! < 1) { @@ -107,7 +107,7 @@ do update set */ export async function saveSearch( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, destinyVersion: DestinyVersion, query: string, @@ -123,7 +123,7 @@ DO UPDATE SET saved = $6, usage_count = CASE WHEN searches.deleted_at IS NOT NULL THEN 1 ELSE searches.usage_count END, deleted_at = null`, - values: [bungieMembershipId, platformMembershipId, destinyVersion, query, type, saved], + values: [bungieMembershipId ?? 
null, platformMembershipId, destinyVersion, query, type, saved], }); } @@ -132,7 +132,7 @@ DO UPDATE SET */ export async function importSearch( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, destinyVersion: DestinyVersion, query: string, @@ -148,7 +148,7 @@ values ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (platform_membership_id, destiny_version, qhash) DO UPDATE SET saved = $5, usage_count = $7, last_used = $8, deleted_at = null`, values: [ - bungieMembershipId, + bungieMembershipId ?? null, platformMembershipId, destinyVersion, query, diff --git a/api/db/triumphs-queries.ts b/api/db/triumphs-queries.ts index a9e4543d..a8a2514c 100644 --- a/api/db/triumphs-queries.ts +++ b/api/db/triumphs-queries.ts @@ -44,7 +44,7 @@ export async function syncTrackedTriumphsForProfile( */ export async function trackTriumph( client: ClientBase, - bungieMembershipId: number, + bungieMembershipId: number | undefined, platformMembershipId: string, recordHash: number, ): Promise { @@ -53,7 +53,7 @@ export async function trackTriumph( text: `insert INTO tracked_triumphs (membership_id, platform_membership_id, record_hash) values ($1, $2, $3) on conflict (platform_membership_id, record_hash) do update set deleted_at = null, membership_id = $1`, - values: [bungieMembershipId, platformMembershipId, recordHash], + values: [bungieMembershipId ?? null, platformMembershipId, recordHash], }); return response; diff --git a/api/migrations/20260426062237-nullable-membership.js b/api/migrations/20260426062237-nullable-membership.js new file mode 100644 index 00000000..5ee9cb57 --- /dev/null +++ b/api/migrations/20260426062237-nullable-membership.js @@ -0,0 +1,45 @@ +'use strict'; + +var dbm; +var type; +var seed; + +/** + * We receive the dbmigrate dependency from dbmigrate initially. + * This enables us to not have to rely on NODE_PATH. 
+ */ +exports.setup = function (options, seedLink) { + dbm = options.dbmigrate; + type = dbm.dataType; + seed = seedLink; +}; + +exports.up = function (db, callback) { + db.runSql( + `alter table loadouts alter column membership_id drop not null; +alter table item_annotations alter column membership_id drop not null; +alter table item_hash_tags alter column membership_id drop not null; +alter table tracked_triumphs alter column membership_id drop not null; +alter table searches alter column membership_id drop not null; +alter table loadout_shares alter column membership_id drop not null; +alter table migration_state alter column membership_id drop not null;`, + callback, + ); +}; + +exports.down = function (db, callback) { + db.runSql( + `alter table loadouts alter column membership_id set not null; +alter table item_annotations alter column membership_id set not null; +alter table item_hash_tags alter column membership_id set not null; +alter table tracked_triumphs alter column membership_id set not null; +alter table searches alter column membership_id set not null; +alter table loadout_shares alter column membership_id set not null; +alter table migration_state alter column membership_id set not null;`, + callback, + ); +}; + +exports._meta = { + version: 1, +}; diff --git a/api/stately/init/migrate-stately-to-postgres.ts b/api/stately/init/migrate-stately-to-postgres.ts new file mode 100644 index 00000000..ca6fd0cb --- /dev/null +++ b/api/stately/init/migrate-stately-to-postgres.ts @@ -0,0 +1,156 @@ +import { ClientBase } from 'pg'; +import { closeDbPool, transaction } from '../../db/index.js'; +import { updateItemAnnotation } from '../../db/item-annotations-queries.js'; +import { updateItemHashTag } from '../../db/item-hash-tags-queries.js'; +import { updateLoadout } from '../../db/loadouts-queries.js'; +import { + abortMigrationToPostgres, + claimMigrationWork, + finishMigrationToPostgres, +} from '../../db/migration-state-queries.js'; +import { importSearch 
} from '../../db/searches-queries.js'; +import { trackTriumph } from '../../db/triumphs-queries.js'; +import { extractImportData } from '../../routes/import.js'; +import { delay } from '../../utils.js'; +import { exportDataForProfile } from '../bulk-queries.js'; + +const workerBatchSize = parseNumberEnv('MIGRATION_WORKER_BATCH_SIZE', 25); +const idleDelayMs = parseNumberEnv('MIGRATION_WORKER_IDLE_DELAY_MS', 5000); +const betweenUsersDelayMs = parseNumberEnv('MIGRATION_WORKER_BETWEEN_USERS_DELAY_MS', 100); +const runOnce = (process.env.MIGRATION_WORKER_RUN_ONCE ?? 'false') === 'true'; + +function parseNumberEnv(envName: string, defaultValue: number): number { + const raw = process.env[envName]; + if (!raw) { + return defaultValue; + } + + const parsed = Number(raw); + if (!Number.isFinite(parsed) || parsed <= 0) { + throw new Error(`${envName} must be a positive number`); + } + + return parsed; +} + +function toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message.slice(0, 500); + } + return String(error).slice(0, 500); +} + +async function migrateOneClaimedUser( + pgClient: ClientBase, + bungieMembershipId: number | undefined, + platformMembershipId: string, +): Promise { + const exportResponse = await exportDataForProfile(platformMembershipId); + const { loadouts, itemAnnotations, triumphs, searches, itemHashTags } = + extractImportData(exportResponse); + + for (const loadoutData of loadouts) { + await updateLoadout( + pgClient, + bungieMembershipId, + loadoutData.platformMembershipId, + loadoutData.destinyVersion, + loadoutData, + ); + } + + for (const tagData of itemAnnotations) { + await updateItemAnnotation( + pgClient, + bungieMembershipId, + tagData.platformMembershipId, + tagData.destinyVersion, + tagData, + ); + } + + for (const hashTag of itemHashTags) { + await updateItemHashTag(pgClient, bungieMembershipId, platformMembershipId, hashTag); + } + + for (const triumphSet of triumphs) { + for (const recordHash of 
triumphSet.triumphs) { + await trackTriumph(pgClient, bungieMembershipId, triumphSet.platformMembershipId, recordHash); + } + } + + for (const searchData of searches) { + await importSearch( + pgClient, + bungieMembershipId, + platformMembershipId, + searchData.destinyVersion, + searchData.search.query, + searchData.search.saved, + searchData.search.lastUsage, + searchData.search.usageCount, + searchData.search.type, + ); + } +} + +let loopsWithoutWork = 0; + +try { + while (true) { + const claimed = await transaction((client) => claimMigrationWork(client, workerBatchSize)); + + if (claimed.length === 0) { + loopsWithoutWork += 1; + if (loopsWithoutWork % 12 === 1) { + console.log('No migration work available, waiting...'); + } + if (runOnce) { + break; + } + await delay(idleDelayMs); + continue; + } + + loopsWithoutWork = 0; + console.log(`Claimed ${claimed.length} migration record(s)`); + + for (const workItem of claimed) { + const { platformMembershipId, bungieMembershipId, attemptCount } = workItem; + console.log( + `Migrating ${platformMembershipId} (bungie=${String(bungieMembershipId)}, attempt=${attemptCount})`, + ); + + try { + await transaction(async (pgClient) => { + await migrateOneClaimedUser(pgClient, bungieMembershipId, platformMembershipId); + }); + + await transaction(async (pgClient) => { + await finishMigrationToPostgres(pgClient, bungieMembershipId, platformMembershipId); + }); + + console.log(`Migration finished for ${platformMembershipId}`); + } catch (error) { + const errorMessage = toErrorMessage(error); + console.error(`Migration failed for ${platformMembershipId}:`, errorMessage); + await transaction(async (pgClient) => { + await abortMigrationToPostgres( + pgClient, + bungieMembershipId, + platformMembershipId, + errorMessage, + ); + }); + } + + await delay(betweenUsersDelayMs); + } + + if (runOnce) { + break; + } + } +} finally { + await closeDbPool(); +} diff --git a/kubernetes/README.md b/kubernetes/README.md index f12da557..65c4f955 
100644 --- a/kubernetes/README.md +++ b/kubernetes/README.md @@ -42,9 +42,9 @@ Required environment inputs: - `dim-api-config` ConfigMap (same values used by API deployment) - `dim-api-secret` keys: - - `pg_password` - - `stately_access_key` - - `stately_store_id` + - `pg_password` + - `stately_access_key` + - `stately_store_id` Build and push image: @@ -74,3 +74,38 @@ If re-running after completion, delete and recreate the job: kubectl delete job dim-api-stately-backfill kubectl apply -f deployment/dim-api-stately-backfill-job.yaml ``` + +## Stately to Postgres Migration Worker + +Run continuous workers that claim `migration_state` rows in `Stately` state and +migrate each claimed profile to Postgres. + +Use `dim-api-migration-worker-deployment.yaml`. + +This is a Deployment (not a Job) so you can scale replicas for parallelism. Each +replica safely claims work via optimistic state transitions in `migration_state` +(`Stately` -> `MigratingToPostgres`). + +Worker environment knobs: + +- `MIGRATION_WORKER_BATCH_SIZE`: Number of rows to claim per poll (default `25`). +- `MIGRATION_WORKER_IDLE_DELAY_MS`: Sleep delay when no work is available (default `5000`). +- `MIGRATION_WORKER_BETWEEN_USERS_DELAY_MS`: Delay between processing users in a claimed batch (default `100`). 
+ +Apply the worker deployment: + +```sh +pnpm run deploy:migration-worker +``` + +Scale workers: + +```sh +kubectl scale deployment dim-api-migration-worker --replicas=4 +``` + +Inspect worker logs: + +```sh +kubectl logs -f deployment/dim-api-migration-worker +``` diff --git a/kubernetes/deploy-migration-worker.sh b/kubernetes/deploy-migration-worker.sh new file mode 100755 index 00000000..47be273c --- /dev/null +++ b/kubernetes/deploy-migration-worker.sh @@ -0,0 +1,17 @@ +#!/bin/bash -ex + +ROOT=$(git rev-parse --show-toplevel) +COMMITHASH=${GITHUB_SHA:-$(git rev-parse HEAD)} +IMAGE="destinyitemmanager/dim-api:$COMMITHASH" + +rm -rf dist && pnpm build:api && docker buildx build --platform linux/amd64 --push -t "$IMAGE" "$ROOT" + +mkdir -p "$ROOT/deployment" +cp "$ROOT/kubernetes/dim-api-migration-worker-deployment.yaml" "$ROOT/deployment" + +sed -i'' -e "s/\$COMMITHASH/$COMMITHASH/" "$ROOT/deployment/dim-api-migration-worker-deployment.yaml" + +kubectl apply -f "$ROOT/deployment/dim-api-migration-worker-deployment.yaml" +kubectl rollout status deployment/dim-api-migration-worker + +rm -rf "$ROOT/deployment" diff --git a/kubernetes/dim-api-migration-worker-deployment.yaml b/kubernetes/dim-api-migration-worker-deployment.yaml new file mode 100644 index 00000000..d0f8fb63 --- /dev/null +++ b/kubernetes/dim-api-migration-worker-deployment.yaml @@ -0,0 +1,56 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dim-api-migration-worker + labels: + app: dim-api-migration-worker +spec: + replicas: 2 + selector: + matchLabels: + app: dim-api-migration-worker + template: + metadata: + labels: + app: dim-api-migration-worker + annotations: + cluster-autoscaler.kubernetes.io/safe-to-evict: 'true' + spec: + restartPolicy: Always + containers: + - name: dim-api-migration-worker + image: destinyitemmanager/dim-api:$COMMITHASH + imagePullPolicy: IfNotPresent + command: ['node'] + args: + - --enable-source-maps + - api/stately/init/migrate-stately-to-postgres.js + env: + 
- name: NODE_ENV + value: production + - name: MIGRATION_WORKER_BATCH_SIZE + value: '25' + - name: MIGRATION_WORKER_IDLE_DELAY_MS + value: '5000' + - name: MIGRATION_WORKER_BETWEEN_USERS_DELAY_MS + value: '100' + - name: PGPASSWORD + valueFrom: + secretKeyRef: + name: dim-api-secret + key: pg_password + - name: STATELY_ACCESS_KEY + valueFrom: + secretKeyRef: + name: dim-api-secret + key: stately_access_key + envFrom: + - configMapRef: + name: dim-api-config + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: '1' + memory: 1Gi diff --git a/package.json b/package.json index e9a8fb05..71e8cf62 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "build:api": "mkdir -p ./dist/api/stately/generated && cp -r ./api/stately/generated/*.js ./dist/api/stately/generated && tsc -p ./api/tsconfig.json", "deploy": "kubernetes/deploy.sh", "deploy:stately-backfill": "kubernetes/deploy-stately-backfill-job.sh", + "deploy:migration-worker": "kubernetes/deploy-migration-worker.sh", "docker:build": "rm -rf dist && pnpm build:api && docker build -t destinyitemmanager/dim-api .", "docker:run": "docker run -p 3000:3000 destinyitemmanager/dim-api:latest", "docker:push": "docker push destinyitemmanager/dim-api:latest", From d7dc975d363983b9d73009703b7c6303e4098a5a Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Tue, 28 Apr 2026 21:03:50 -0700 Subject: [PATCH 11/12] Handle stately shenanigans --- api/db/migration-state-queries.ts | 1 + .../init/migrate-stately-to-postgres.ts | 216 ++++++++++++++++-- 2 files changed, 198 insertions(+), 19 deletions(-) diff --git a/api/db/migration-state-queries.ts b/api/db/migration-state-queries.ts index 191e82e5..0419ac90 100644 --- a/api/db/migration-state-queries.ts +++ b/api/db/migration-state-queries.ts @@ -73,6 +73,7 @@ export async function claimMigrationWork( select platform_membership_id from migration_state where state = $1 + and platform_membership_id = '4611686018433092312' and attempt_count < $2 order by 
last_state_change_at asc limit $3 diff --git a/api/stately/init/migrate-stately-to-postgres.ts b/api/stately/init/migrate-stately-to-postgres.ts index ca6fd0cb..b9e5b3c0 100644 --- a/api/stately/init/migrate-stately-to-postgres.ts +++ b/api/stately/init/migrate-stately-to-postgres.ts @@ -1,4 +1,6 @@ +import { StatelyError } from '@stately-cloud/client'; import { ClientBase } from 'pg'; +import { DatabaseError } from 'pg-protocol'; import { closeDbPool, transaction } from '../../db/index.js'; import { updateItemAnnotation } from '../../db/item-annotations-queries.js'; import { updateItemHashTag } from '../../db/item-hash-tags-queries.js'; @@ -16,9 +18,177 @@ import { exportDataForProfile } from '../bulk-queries.js'; const workerBatchSize = parseNumberEnv('MIGRATION_WORKER_BATCH_SIZE', 25); const idleDelayMs = parseNumberEnv('MIGRATION_WORKER_IDLE_DELAY_MS', 5000); -const betweenUsersDelayMs = parseNumberEnv('MIGRATION_WORKER_BETWEEN_USERS_DELAY_MS', 100); +const retryMaxAttempts = parseNumberEnv('MIGRATION_RETRY_MAX_ATTEMPTS', 12); +const retryBaseDelayMs = parseNumberEnv('MIGRATION_RETRY_BASE_DELAY_MS', 1000); +const retryMaxDelayMs = parseNumberEnv('MIGRATION_RETRY_MAX_DELAY_MS', 30000); +const statelyThrottleMinDelayMs = parseNumberEnv('MIGRATION_STATELY_THROTTLE_DELAY_MS', 15000); +const retryThrottlingForever = + (process.env.MIGRATION_RETRY_THROTTLING_FOREVER ?? 'true') !== 'false'; const runOnce = (process.env.MIGRATION_WORKER_RUN_ONCE ?? 
'false') === 'true'; +interface RetryableError { + code?: string; + statelyCode?: string; + status?: number; + statusCode?: number; + headers?: Record; + response?: { + status?: number; + headers?: Record; + }; + cause?: unknown; + message?: string; +} + +function isStatelyThroughputError(error: unknown): boolean { + if (error instanceof StatelyError) { + return ( + error.statelyCode === 'StoreThroughputExceeded' || + error.statelyCode === 'StoreRequestLimitExceeded' + ); + } + + if (!error || typeof error !== 'object') { + return false; + } + + const retryable = error as RetryableError; + if ( + retryable.statelyCode === 'StoreThroughputExceeded' || + retryable.statelyCode === 'StoreRequestLimitExceeded' + ) { + return true; + } + + return retryable.cause ? isStatelyThroughputError(retryable.cause) : false; +} + +function getNestedStatus(error: RetryableError): number | undefined { + return error.status ?? error.statusCode ?? error.response?.status; +} + +function getHeaders( + error: RetryableError, +): Record | undefined { + return error.headers ?? error.response?.headers; +} + +function getRetryAfterMs(error: RetryableError): number | undefined { + const headers = getHeaders(error); + const retryAfter = headers?.['retry-after'] ?? 
headers?.['Retry-After']; + if (retryAfter === undefined) { + return undefined; + } + + const retryAfterSeconds = Number(retryAfter); + if (Number.isFinite(retryAfterSeconds) && retryAfterSeconds > 0) { + return retryAfterSeconds * 1000; + } + + if (typeof retryAfter === 'string') { + const retryAt = Date.parse(retryAfter); + if (!Number.isNaN(retryAt)) { + return Math.max(0, retryAt - Date.now()); + } + } + + return undefined; +} + +function isRetryableError(error: unknown): boolean { + if (error instanceof StatelyError) { + return [ + 'StoreThroughputExceeded', + 'StoreRequestLimitExceeded', + 'StoreInUse', + 'ConcurrentModification', + 'CachedSchemaTooOld', + 'BackupsUnavailable', + ].includes(error.statelyCode); + } + + if (error instanceof DatabaseError) { + return error.code ? ['40001', '40P01', '53300', '57P03'].includes(error.code) : false; + } + + if (!error || typeof error !== 'object') { + return false; + } + + const retryable = error as RetryableError; + const status = getNestedStatus(retryable); + if (status === 429 || (status !== undefined && status >= 500)) { + return true; + } + + if ( + retryable.code && + ['ETIMEDOUT', 'ECONNRESET', 'ECONNREFUSED', 'EPIPE', 'ENOTFOUND'].includes(retryable.code) + ) { + return true; + } + + if (typeof retryable.message === 'string') { + const msg = retryable.message.toLowerCase(); + if ( + msg.includes('throttl') || + msg.includes('too many requests') || + msg.includes('rate limit') || + msg.includes('temporar') || + msg.includes('timeout') + ) { + return true; + } + } + + return retryable.cause ? isRetryableError(retryable.cause) : false; +} + +function retryDelayMs(error: unknown, attempt: number): number { + const forcedDelay = + error && typeof error === 'object' ? 
getRetryAfterMs(error as RetryableError) : undefined; + if (forcedDelay !== undefined) { + return Math.min(retryMaxDelayMs, Math.max(retryBaseDelayMs, forcedDelay)); + } + + const exponential = retryBaseDelayMs * 2 ** (attempt - 1); + const jitter = Math.floor(Math.random() * retryBaseDelayMs); + const retryDelay = exponential + jitter; + + if (isStatelyThroughputError(error)) { + return Math.min(retryMaxDelayMs, Math.max(statelyThrottleMinDelayMs, retryDelay)); + } + + return Math.min(retryMaxDelayMs, retryDelay); +} + +async function withRetry(name: string, fn: () => T | Promise): Promise { + let attempt = 0; + while (true) { + attempt += 1; + try { + return await fn(); + } catch (error) { + const retryable = isRetryableError(error); + const throttleRetry = retryThrottlingForever && isStatelyThroughputError(error); + if (!retryable || (!throttleRetry && attempt >= retryMaxAttempts)) { + throw error; + } + + const waitMs = retryDelayMs(error, attempt); + const message = error instanceof Error ? error.message : String(error); + const attemptLabel = throttleRetry + ? `${attempt}/unbounded` + : `${attempt}/${retryMaxAttempts}`; + console.log( + `${name} failed with retryable error (attempt ${attemptLabel}). 
Waiting ${waitMs}ms before retry.`, + message, + ); + await delay(waitMs); + } + } +} + function parseNumberEnv(envName: string, defaultValue: number): number { const raw = process.env[envName]; if (!raw) { @@ -45,7 +215,9 @@ async function migrateOneClaimedUser( bungieMembershipId: number | undefined, platformMembershipId: string, ): Promise { - const exportResponse = await exportDataForProfile(platformMembershipId); + const exportResponse = await withRetry(`exportDataForProfile:${platformMembershipId}`, () => + exportDataForProfile(platformMembershipId), + ); const { loadouts, itemAnnotations, triumphs, searches, itemHashTags } = extractImportData(exportResponse); @@ -98,7 +270,9 @@ let loopsWithoutWork = 0; try { while (true) { - const claimed = await transaction((client) => claimMigrationWork(client, workerBatchSize)); + const claimed = await withRetry('claimMigrationWork', () => + transaction((client) => claimMigrationWork(client, workerBatchSize)), + ); if (claimed.length === 0) { loopsWithoutWork += 1; @@ -122,29 +296,33 @@ try { ); try { - await transaction(async (pgClient) => { - await migrateOneClaimedUser(pgClient, bungieMembershipId, platformMembershipId); - }); + await withRetry(`migrateOne:${platformMembershipId}`, () => + transaction(async (pgClient) => { + await migrateOneClaimedUser(pgClient, bungieMembershipId, platformMembershipId); + }), + ); - await transaction(async (pgClient) => { - await finishMigrationToPostgres(pgClient, bungieMembershipId, platformMembershipId); - }); + await withRetry(`finishMigration:${platformMembershipId}`, () => + transaction(async (pgClient) => { + await finishMigrationToPostgres(pgClient, bungieMembershipId, platformMembershipId); + }), + ); console.log(`Migration finished for ${platformMembershipId}`); } catch (error) { const errorMessage = toErrorMessage(error); console.error(`Migration failed for ${platformMembershipId}:`, errorMessage); - await transaction(async (pgClient) => { - await abortMigrationToPostgres( 
- pgClient, - bungieMembershipId, - platformMembershipId, - errorMessage, - ); - }); + await withRetry(`abortMigration:${platformMembershipId}`, () => + transaction(async (pgClient) => { + await abortMigrationToPostgres( + pgClient, + bungieMembershipId, + platformMembershipId, + errorMessage, + ); + }), + ); } - - await delay(betweenUsersDelayMs); } if (runOnce) { From 7397b81e2666c4aea5866c3ddd117be290153a8e Mon Sep 17 00:00:00 2001 From: Ben Hollis Date: Tue, 28 Apr 2026 21:25:57 -0700 Subject: [PATCH 12/12] Remove the restriction on platform membership id --- api/db/migration-state-queries.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/api/db/migration-state-queries.ts b/api/db/migration-state-queries.ts index 0419ac90..191e82e5 100644 --- a/api/db/migration-state-queries.ts +++ b/api/db/migration-state-queries.ts @@ -73,7 +73,6 @@ export async function claimMigrationWork( select platform_membership_id from migration_state where state = $1 - and platform_membership_id = '4611686018433092312' and attempt_count < $2 order by last_state_change_at asc limit $3