Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,11 @@ pr.md
CLAUDE.md
plan.md

....
# Generated / temporary / snapshot files
*.snap
*.test-snapshot
__snapshots__/
*.tmp
*.temp
generated/
prisma/generated/
55 changes: 55 additions & 0 deletions docs/indexer/ownership-read-model.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Ownership Read Model

The `key_ownership` table is the source of truth for wallet holdings and holder lists in Access Layer. It is a denormalized read model maintained by the indexer on every trade event.

## Table schema

| Column | Type | Description |
| -------------- | ----------- | ----------- |
| `id` | `String` | Primary key (CUID). |
| `ownerAddress` | `String` | Stellar wallet address of the token holder. |
| `creatorId` | `String` | References `creator_profiles.id`. Identifies which creator's tokens are held. |
| `balance` | `Int` | Number of creator tokens currently held by this wallet. Always ≥ 0. |
| `updatedAt` | `DateTime` | Timestamp of the last write, set automatically on each upsert. |

The combination `(ownerAddress, creatorId)` is a unique index — there is exactly one row per wallet/creator pair.

## When the model is updated

The indexer calls `updateOwnership` after processing each confirmed trade event:

- **Buy**: the buyer's `balance` is incremented by the number of tokens purchased.
- **Sell**: the seller's `balance` is decremented by the number of tokens sold.
- **Peer-to-peer transfer**: the sender's `balance` is decremented and the recipient's `balance` is incremented by the transfer amount. Both upserts are applied in the same logical operation.

Rows are created on first acquisition and persist after a full sell (balance reaches zero). A zero-balance row is valid and indicates the wallet previously held tokens.

## Balance conservation invariant

For any given creator, the sum of `balance` across all rows with that `creatorId` must equal the creator's total circulating supply at all times:

```
SUM(key_ownership.balance WHERE creatorId = X) = creator_profiles.totalSupply WHERE id = X
```

The indexer enforces this invariant by applying increments and decrements through Prisma's atomic `{ increment }` / `{ decrement }` operations. No row is overwritten with an absolute value derived from off-chain computation, which eliminates the class of race conditions that would arise from a read-modify-write cycle.

## Replay consistency

If the indexer misses one or more trade events (e.g. due to a crash or a ledger gap), the read model will be inconsistent with the on-chain state.

Consistency is restored by **replaying** the missed ledger range:

1. The gap-detection service (`ledger-gap-detection.service.ts`) identifies missing ledger sequences by comparing the indexed cursor against the chain's latest ledger.
2. The indexer re-fetches and re-processes all events within the gap in sequence order.
3. Because every write uses `upsert` with atomic increments, replaying the same event twice does not corrupt the balance — however, the idempotency guard in `upsertPriceSnapshot` (timestamp comparison) must also be respected for associated price data.
4. After replay, `SUM(balance)` per creator converges back to the expected total supply.

A full rebuild from genesis is possible by truncating `key_ownership` and replaying all events from ledger 0. The `read-model-rebuild.md` document describes the rebuild procedure in detail.

## Related files

- `src/modules/ownership/ownership.service.ts` — `fetchOwnership`, `updateOwnership`
- `src/modules/ownership/ownership.schemas.ts` — Zod schemas for API I/O
- `src/modules/indexer/ledger-gap-detection.service.ts` — gap detection
- `docs/read-model-rebuild.md` — full rebuild procedure
72 changes: 72 additions & 0 deletions src/__tests__/integration/webhook-max-limit.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// src/__tests__/integration/webhook-max-limit.integration.test.ts
// Integration test for #487: webhook max registration limit enforcement.

import supertest from 'supertest';
import app from '../../app';
import { prisma } from '../../utils/prisma.utils';
import { Keypair } from '@stellar/stellar-base';
import { createHash } from 'crypto';
import { envConfig } from '../../config';

const keypair = Keypair.random();
const walletAddress = keypair.publicKey();
const userId = 'webhook-max-limit-user';
const creatorId = 'webhook-max-limit-creator';
const basePath = `/api/v1/creators/${creatorId}/webhooks`;

function authHeaders(method: string, path: string) {
const timestamp = Date.now().toString();
const payload = `${method.toUpperCase()}:${path}:${creatorId}:${timestamp}`;
const hash = createHash('sha256').update(payload, 'utf8').digest();
const signature = keypair.sign(hash).toString('base64');
return { 'x-wallet-address': walletAddress, 'x-signature': signature, 'x-timestamp': timestamp };
}

beforeAll(async () => {
await prisma.user.create({
data: { id: userId, email: 'webhook-max-limit@example.com', passwordHash: 'dummy', firstName: 'Max', lastName: 'Limit' },
});
await prisma.stellarWallet.create({ data: { address: walletAddress, userId } });
await prisma.creatorProfile.create({ data: { id: creatorId, userId, handle: 'webhook-max-limit', displayName: 'Max Limit Creator' } });
});

afterAll(async () => {
await prisma.webhook.deleteMany({ where: { creatorId } });
await prisma.creatorProfile.delete({ where: { id: creatorId } }).catch(() => {});
await prisma.stellarWallet.delete({ where: { address: walletAddress } }).catch(() => {});
await prisma.user.delete({ where: { id: userId } }).catch(() => {});
await prisma.$disconnect();
});

describe('webhook max registration limit (#487)', () => {
it('registers webhooks up to the configured maximum', async () => {
for (let i = 0; i < envConfig.WEBHOOK_MAX_PER_CREATOR; i++) {
const res = await supertest(app)
.post(basePath)
.set(authHeaders('POST', basePath))
.send({ callback_url: `https://example.com/hook-${i}`, events: ['buy'] });

expect(res.status).toBe(201);
}

const count = await prisma.webhook.count({ where: { creatorId, isActive: true } });
expect(count).toBe(envConfig.WEBHOOK_MAX_PER_CREATOR);
});

it('returns 422 when attempting to register beyond the limit', async () => {
const res = await supertest(app)
.post(basePath)
.set(authHeaders('POST', basePath))
.send({ callback_url: 'https://example.com/over-limit', events: ['buy'] });

expect(res.status).toBe(422);
expect(res.body.success).toBe(false);
expect(res.body.error.code).toBe('MAX_WEBHOOKS_REACHED');
expect(res.body.error.message).toMatch(/maximum/i);
});

it('does not increase the webhook count after the failed registration', async () => {
const count = await prisma.webhook.count({ where: { creatorId, isActive: true } });
expect(count).toBe(envConfig.WEBHOOK_MAX_PER_CREATOR);
});
});
30 changes: 29 additions & 1 deletion src/modules/indexer/price-snapshot.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export interface TradeEventPayload {
price: bigint;
/** ISO timestamp of the trade */
tradeAt: Date;
/** Ledger sequence number of the trade event */
ledgerSequence?: number;
}

/**
Expand All @@ -23,7 +25,7 @@ export interface TradeEventPayload {
* Idempotent: re-processing the same event produces the same state.
*/
export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<void> {
const { creatorId, price, tradeAt } = event;
const { creatorId, price, tradeAt, ledgerSequence = null } = event;

try {
const existing = await prisma.creatorPriceSnapshot.findUnique({
Expand All @@ -40,6 +42,16 @@ export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<voi
lastTradeAt: tradeAt,
},
});
logger.debug(
{
creator_id: creatorId,
new_price: price.toString(),
previous_price: null,
ledger_sequence: ledgerSequence,
written_at: new Date().toISOString(),
},
'price-snapshot: written (initial)'
);
return;
}

Expand All @@ -52,6 +64,11 @@ export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<voi
return;
}

// Idempotency: skip write when the price is unchanged.
if (existing.currentPrice === price) {
return;
}

// Promote currentPrice → price24hAgo when the snapshot is older than 24 h.
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
const shouldRotate24h =
Expand All @@ -65,6 +82,17 @@ export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<voi
lastTradeAt: tradeAt,
},
});

logger.debug(
{
creator_id: creatorId,
new_price: price.toString(),
previous_price: existing.currentPrice.toString(),
ledger_sequence: ledgerSequence,
written_at: new Date().toISOString(),
},
'price-snapshot: written'
);
} catch (err) {
logger.error({ err, creatorId }, 'price-snapshot: failed to upsert');
throw err;
Expand Down
2 changes: 1 addition & 1 deletion src/modules/webhooks/webhook.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ describe('POST /api/v1/creators/:id/webhooks', () => {
.set(authHeaders('POST', basePath, creatorId))
.send({ callback_url: 'https://example.com/too-many', events: ['buy'] });

expect(res.status).toBe(409);
expect(res.status).toBe(422);
expect(res.body.error.code).toBe('MAX_WEBHOOKS_REACHED');
});
});
Expand Down
2 changes: 1 addition & 1 deletion src/modules/webhooks/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export async function createWebhook(
new Error(
`Maximum of ${envConfig.WEBHOOK_MAX_PER_CREATOR} active webhooks per creator reached`
),
{ statusCode: 409, code: 'MAX_WEBHOOKS_REACHED' }
{ statusCode: 422, code: 'MAX_WEBHOOKS_REACHED' }
);
}

Expand Down
93 changes: 62 additions & 31 deletions src/utils/bigint-serializer.utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,73 @@
import { strict as assert } from 'assert';
import { bigIntReplacer, safeJsonStringify, sanitizeBigInts } from './bigint-serializer.utils';
import { bigIntReplacer, safeJsonStringify, sanitizeBigInts, serializeBigInt } from './bigint-serializer.utils';

function run() {
// bigIntReplacer converts BigInt to string
assert.equal(bigIntReplacer('id', 9007199254740993n), '9007199254740993');
describe('bigIntReplacer', () => {
it('converts BigInt to string', () => {
expect(bigIntReplacer('id', 9007199254740993n)).toBe('9007199254740993');
});

// bigIntReplacer passes non-BigInt values through unchanged
assert.equal(bigIntReplacer('n', 42), 42);
assert.equal(bigIntReplacer('s', 'hello'), 'hello');
assert.equal(bigIntReplacer('b', true), true);
assert.equal(bigIntReplacer('n', null), null);
it('passes non-BigInt values through unchanged', () => {
expect(bigIntReplacer('n', 42)).toBe(42);
expect(bigIntReplacer('s', 'hello')).toBe('hello');
expect(bigIntReplacer('b', true)).toBe(true);
expect(bigIntReplacer('n', null)).toBeNull();
});
});

// safeJsonStringify handles BigInt without throwing
const json = safeJsonStringify({ id: 1000000000000000001n, label: 'test' });
assert.equal(json, '{"id":"1000000000000000001","label":"test"}');
describe('safeJsonStringify', () => {
it('serializes a top-level BigInt without throwing', () => {
const json = safeJsonStringify({ id: 1000000000000000001n, label: 'test' });
expect(json).toBe('{"id":"1000000000000000001","label":"test"}');
});

// safeJsonStringify handles nested BigInt
const nested = safeJsonStringify({ a: { b: 2n } });
assert.equal(nested, '{"a":{"b":"2"}}');
it('serializes nested BigInt values', () => {
expect(safeJsonStringify({ a: { b: 2n } })).toBe('{"a":{"b":"2"}}');
});

// safeJsonStringify with no BigInt behaves like JSON.stringify
assert.equal(safeJsonStringify({ x: 1 }), JSON.stringify({ x: 1 }));
it('behaves like JSON.stringify when no BigInt is present', () => {
expect(safeJsonStringify({ x: 1 })).toBe(JSON.stringify({ x: 1 }));
});
});

// sanitizeBigInts – top-level BigInt
assert.equal(sanitizeBigInts(5n), '5');
describe('sanitizeBigInts', () => {
it('converts a top-level BigInt to string', () => {
expect(sanitizeBigInts(5n)).toBe('5');
});

// sanitizeBigInts – nested object
const sanitized = sanitizeBigInts({ id: 1n, nested: { amount: 500n }, label: 'ok' });
assert.deepEqual(sanitized, { id: '1', nested: { amount: '500' }, label: 'ok' });
it('converts nested BigInt in an object', () => {
expect(sanitizeBigInts({ id: 1n, nested: { amount: 500n }, label: 'ok' })).toEqual({
id: '1',
nested: { amount: '500' },
label: 'ok',
});
});

// sanitizeBigInts – array
assert.deepEqual(sanitizeBigInts([1n, 2n, 3n]), ['1', '2', '3']);
it('converts BigInt inside an array', () => {
expect(sanitizeBigInts([1n, 2n, 3n])).toEqual(['1', '2', '3']);
});

// sanitizeBigInts – non-BigInt primitives pass through
assert.equal(sanitizeBigInts(42), 42);
assert.equal(sanitizeBigInts('str'), 'str');
it('passes non-BigInt primitives through unchanged', () => {
expect(sanitizeBigInts(42)).toBe(42);
expect(sanitizeBigInts('str')).toBe('str');
});
});

console.log('bigint-serializer.utils tests passed');
}
describe('serializeBigInt', () => {
it('converts a top-level BigInt to string', () => {
expect(serializeBigInt(9007199254740993n)).toBe('9007199254740993');
});

run();
it('converts nested BigInt in an object', () => {
expect(serializeBigInt({ amount: 1000n, label: 'x' })).toEqual({ amount: '1000', label: 'x' });
});

it('converts BigInt inside an array', () => {
expect(serializeBigInt([1n, 2n])).toEqual(['1', '2']);
});

it('passes non-BigInt values through unchanged', () => {
expect(serializeBigInt(42)).toBe(42);
expect(serializeBigInt('hello')).toBe('hello');
expect(serializeBigInt(true)).toBe(true);
expect(serializeBigInt(null)).toBeNull();
});
});
18 changes: 18 additions & 0 deletions src/utils/bigint-serializer.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,21 @@ export function sanitizeBigInts(value: unknown): unknown {
}
return value;
}

/**
* Recursively converts BigInt values to strings in objects, arrays, and
* primitives. Safe to call on any value before passing it to the response
* serializer, so XLM amounts and ledger sequences never trigger a
* `TypeError: Do not know how to serialize a BigInt` at the response layer.
*
* Alias for `sanitizeBigInts` — prefer this name in response/serializer paths.
*
* @example
* serializeBigInt(9007199254740993n); // → "9007199254740993"
* serializeBigInt({ amount: 1000n, label: 'x' }); // → { amount: "1000", label: "x" }
* serializeBigInt([1n, 2n]); // → ["1", "2"]
* serializeBigInt(42); // → 42 (unchanged)
*/
export function serializeBigInt(value: unknown): unknown {
return sanitizeBigInts(value);
}
Loading