From b50a51842e732bd84ed32edb0a45b4e8b62b34ef Mon Sep 17 00:00:00 2001 From: temycodes Date: Sun, 28 Jun 2026 17:36:49 +0100 Subject: [PATCH] feat: streaming CSV and Parquet export endpoints (#134) - Add GET /transfers.csv and GET /transfers.parquet - Cursor-based batch fetching (500 rows/batch) keeps memory flat at any scale - CSV streams directly into response via @fast-csv/format - Parquet writes to tmp file then pipes to response, cleaned up after send - Both endpoints accept the same query params as the transfers route (address, contractId, fromLedger, toLedger, fromDate, toDate, eventType) --- package-lock.json | 56 +++++++++++++ package.json | 2 + src/api.ts | 4 + src/routes/exports.ts | 177 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 239 insertions(+) create mode 100644 src/routes/exports.ts diff --git a/package-lock.json b/package-lock.json index f5e82579..5ea22baf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "@apollo/server": "^5.5.1", "@as-integrations/express4": "^1.1.2", "@asteasolutions/zod-to-openapi": "^8.5.0", + "@fast-csv/format": "^5.0.7", "@prisma/client": "^5.10.0", "@stellar/stellar-sdk": "^15.0.1", "cors": "^2.8.5", @@ -18,6 +19,7 @@ "express": "^4.18.3", "express-rate-limit": "^8.3.2", "graphql": "^16.11.0", + "parquetjs-lite": "^0.8.7", "ws": "^8.20.0", "zod": "^4.4.3" }, @@ -1129,6 +1131,15 @@ "tslib": "^2.4.0" } }, + "node_modules/@fast-csv/format": { + "version": "5.0.7", + "resolved": "https://registry.npmjs.org/@fast-csv/format/-/format-5.0.7.tgz", + "integrity": "sha512-VdypoRxv7PF+LsyPouTMKdB0d76hync+gLpgdNqfqVK44MsgW4oiCJSdrki2FisWT7v2QGUYDHjp4L7w5oO6gw==", + "license": "MIT", + "dependencies": { + "lodash.escaperegexp": "^4.1.2" + } + }, "node_modules/@gerrit0/mini-shiki": { "version": "3.23.0", "resolved": "https://registry.npmjs.org/@gerrit0/mini-shiki/-/mini-shiki-3.23.0.tgz", @@ -4781,6 +4792,12 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "license": "ISC" }, + "node_modules/int53": { + "version": "0.2.4", + "resolved": "https://registry.npmjs.org/int53/-/int53-0.2.4.tgz", + "integrity": "sha512-a5jlKftS7HUOhkUyYD7j2sJ/ZnvWiNlZS1ldR+g1ifQ+/UuZXIE+YTc/lK1qGj/GwAU5F8Z0e1eVq2t1J5Ob2g==", + "license": "BSD-3-Clause" + }, "node_modules/ip-address": { "version": "10.1.0", "resolved": "https://registry.npmjs.org/ip-address/-/ip-address-10.1.0.tgz", @@ -5955,6 +5972,12 @@ "node": ">=8" } }, + "node_modules/lodash.escaperegexp": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/lodash.escaperegexp/-/lodash.escaperegexp-4.1.2.tgz", + "integrity": "sha512-TM9YBvyC84ZxE3rgfefxUWiQKLilstD6k7PTGt6wfbtXF8ixIJLOL3VYyV/z+ZiPLsVxAsKAFVwWlWeb2Y8Yyw==", + "license": "MIT" + }, "node_modules/lodash.memoize": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/lodash.memoize/-/lodash.memoize-4.1.2.tgz", @@ -6436,6 +6459,27 @@ "dev": true, "license": "BlueOak-1.0.0" }, + "node_modules/parquetjs-lite": { + "version": "0.8.7", + "resolved": "https://registry.npmjs.org/parquetjs-lite/-/parquetjs-lite-0.8.7.tgz", + "integrity": "sha512-L0tdHzvy0btLJvAFZvjO0+ru+FL8tHPqiVE90KEnMy5jDqv+WmEy9Ii8SjiC+exAOi3RGyPYIdgJvFijYyEahA==", + "license": "MIT", + "dependencies": { + "int53": "^0.2.4", + "node-int64": "^0.3.3", + "snappyjs": "^0.6.0", + "varint": "^5.0.0" + }, + "engines": { + "node": ">=7.6" + } + }, + "node_modules/parquetjs-lite/node_modules/node-int64": { + "version": "0.3.3", + "resolved": "https://registry.npmjs.org/node-int64/-/node-int64-0.3.3.tgz", + "integrity": "sha512-bLdNOp5SYyqfDz/ssGHt2OTg8u+jEkCx4EoZIzprqeonFIUhlSBrKu40e/x6hIFYJx4ZEt64/9IZJyafvhGZrw==", + "license": "MIT" + }, "node_modules/parse-json": { "version": "5.2.0", "resolved": "https://registry.npmjs.org/parse-json/-/parse-json-5.2.0.tgz", @@ -7071,6 +7115,12 @@ "node": ">=8" } }, + "node_modules/snappyjs": { + "version": "0.6.1", + "resolved": "https://registry.npmjs.org/snappyjs/-/snappyjs-0.6.1.tgz", + "integrity": "sha512-YIK6I2lsH072UE0aOFxxY1dPDCS43I5ktqHpeAsuLNYWkE5pGxRGWfDM4/vSUfNzXjC1Ivzt3qx31PCLmc9yqg==", + "license": "MIT" + }, "node_modules/source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", @@ -7891,6 +7941,12 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, + "node_modules/varint": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/varint/-/varint-5.0.2.tgz", + "integrity": "sha512-lKxKYG6H03yCZUpAGOPOsMcGxd1RHCu1iKvEHYDPmTyq2HueGhD73ssNBqqQWfvYs04G9iUFRvmAVLW20Jw6ow==", + "license": "MIT" + }, "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", diff --git a/package.json b/package.json index b9d7c4b8..50e8ba45 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "@apollo/server": "^5.5.1", "@as-integrations/express4": "^1.1.2", "@asteasolutions/zod-to-openapi": "^8.5.0", + "@fast-csv/format": "^5.0.7", "@prisma/client": "^5.10.0", "@stellar/stellar-sdk": "^15.0.1", "cors": "^2.8.5", @@ -67,6 +68,7 @@ "express": "^4.18.3", "express-rate-limit": "^8.3.2", "graphql": "^16.11.0", + "parquetjs-lite": "^0.8.7", "ws": "^8.20.0", "zod": "^4.4.3" }, diff --git a/src/api.ts b/src/api.ts index 4f164dd6..02edd4ee 100644 --- a/src/api.ts +++ b/src/api.ts @@ -9,6 +9,7 @@ import { createAccountsRouter } from "./api/accounts"; import { createWebhooksRouter } from "./api/webhooks"; import { createGraphQLMiddleware } from "./graphql/server"; import { createPopularAssetsRouter } from "./routes/assets/popular"; +import { createExportsRouter } from "./routes/exports"; import { hostFnQuerySchema, nftOwnerParamsSchema, @@ -96,6 +97,9 @@ export function createApp(): express.Application { // ── Assets routes ─────────────────────────────────────────────────────────── app.use("/assets", createPopularAssetsRouter()); + // ── Export routes ───────────────────────────────────────────────────────────── + app.use("/", createExportsRouter()); + // ── Helpers ────────────────────────────────────────────────────────────────── const parseIntParam = (val: unknown, fallback: number): number => { const n = parseInt(String(val), 10); diff --git a/src/routes/exports.ts b/src/routes/exports.ts new file mode 100644 index 00000000..0fd0844d --- /dev/null +++ b/src/routes/exports.ts @@ -0,0 +1,177 @@ +import { Router, Request, Response, NextFunction } from "express"; +import { format as csvFormat } from "@fast-csv/format"; +import { prisma, toDisplayAmount } from "../db"; +import os from "os"; +import path from "path"; +import fs from "fs"; + +// How many rows we fetch per DB round-trip. Keeps memory flat. +const BATCH_SIZE = 500; + +// ── Shared: parse query params into a Prisma where clause ──────────────────── +function buildWhere(query: Record) { + const { + address, + contractId, + fromLedger, + toLedger, + fromDate, + toDate, + eventType, + } = query; + + const where: Record = {}; + + if (address) { + where.OR = [{ fromAddress: address }, { toAddress: address }]; + } + if (contractId) where.contractId = contractId; + if (eventType) { + const types = String(eventType).split(",").map((s) => s.trim()).filter(Boolean); + if (types.length) where.eventType = { in: types }; + } + + const ledgerRange: Record = {}; + if (fromLedger) ledgerRange.gte = parseInt(String(fromLedger), 10); + if (toLedger) ledgerRange.lte = parseInt(String(toLedger), 10); + if (Object.keys(ledgerRange).length) where.ledger = ledgerRange; + + const dateRange: Record = {}; + if (fromDate) dateRange.gte = new Date(String(fromDate)); + if (toDate) dateRange.lte = new Date(String(toDate)); + if (Object.keys(dateRange).length) where.ledgerClosedAt = dateRange; + + return where; +} + +// ── Shared: async generator that yields rows in batches via cursor ──────────── +async function* streamTransfers(where: Record) { + let lastId: number | undefined = undefined; + + while (true) { + const rows: Awaited> = await prisma.tokenTransfer.findMany({ + where, + orderBy: { id: "asc" }, + take: BATCH_SIZE, + ...(lastId !== undefined ? { cursor: { id: lastId }, skip: 1 } : {}), + }); + + if (rows.length === 0) break; + + for (const row of rows) { + yield row; + } + + if (rows.length < BATCH_SIZE) break; + lastId = rows[rows.length - 1].id; + } +} + +// ── CSV endpoint ───────────────────────────────────────────────────────────── +async function handleCsvExport(req: Request, res: Response, next: NextFunction) { + try { + const where = buildWhere(req.query as Record); + + res.setHeader("Content-Type", "text/csv"); + res.setHeader("Content-Disposition", "attachment; filename=\"transfers.csv\""); + res.setHeader("Transfer-Encoding", "chunked"); + + const csvStream = csvFormat({ headers: true }); + csvStream.pipe(res); + + for await (const row of streamTransfers(where)) { + csvStream.write({ + id: row.id, + contractId: row.contractId, + eventType: row.eventType, + fromAddress: row.fromAddress ?? "", + toAddress: row.toAddress ?? "", + amount: row.amount, + displayAmount: toDisplayAmount(row.amount), + ledger: row.ledger, + ledgerClosedAt: row.ledgerClosedAt.toISOString(), + txHash: row.txHash, + eventId: row.eventId, + isSac: row.isSac ?? false, + createdAt: row.createdAt.toISOString(), + }); + } + + csvStream.end(); + } catch (err) { + next(err); + } +} + +// ── Parquet endpoint ───────────────────────────────────────────────────────── +async function handleParquetExport(req: Request, res: Response, next: NextFunction) { + // parquetjs-lite is a CommonJS module — require() avoids ESM interop issues + // eslint-disable-next-line @typescript-eslint/no-var-requires + const parquet = require("parquetjs-lite"); + + const tmpFile = path.join(os.tmpdir(), `transfers-${Date.now()}-${Math.random().toString(36).slice(2)}.parquet`); + + try { + const where = buildWhere(req.query as Record); + + const schema = new parquet.ParquetSchema({ + id: { type: "INT64" }, + contractId: { type: "UTF8" }, + eventType: { type: "UTF8" }, + fromAddress: { type: "UTF8", optional: true }, + toAddress: { type: "UTF8", optional: true }, + amount: { type: "UTF8" }, + displayAmount: { type: "UTF8" }, + ledger: { type: "INT32" }, + ledgerClosedAt: { type: "UTF8" }, + txHash: { type: "UTF8" }, + eventId: { type: "UTF8" }, + isSac: { type: "BOOLEAN", optional: true }, + createdAt: { type: "UTF8" }, + }); + + const writer = await parquet.ParquetWriter.openFile(schema, tmpFile); + + for await (const row of streamTransfers(where)) { + await writer.appendRow({ + id: row.id, + contractId: row.contractId, + eventType: row.eventType, + fromAddress: row.fromAddress ?? null, + toAddress: row.toAddress ?? null, + amount: row.amount, + displayAmount: toDisplayAmount(row.amount), + ledger: row.ledger, + ledgerClosedAt: row.ledgerClosedAt.toISOString(), + txHash: row.txHash, + eventId: row.eventId, + isSac: row.isSac ?? null, + createdAt: row.createdAt.toISOString(), + }); + } + + await writer.close(); + + res.setHeader("Content-Type", "application/octet-stream"); + res.setHeader("Content-Disposition", "attachment; filename=\"transfers.parquet\""); + + const fileStream = fs.createReadStream(tmpFile); + fileStream.pipe(res); + fileStream.on("end", () => fs.unlink(tmpFile, () => {})); + fileStream.on("error", (err) => { + fs.unlink(tmpFile, () => {}); + next(err); + }); + } catch (err) { + fs.unlink(tmpFile, () => {}); + next(err); + } +} + +// ── Router ─────────────────────────────────────────────────────────────────── +export function createExportsRouter(): Router { + const router = Router(); + router.get("/transfers.csv", handleCsvExport); + router.get("/transfers.parquet", handleParquetExport); + return router; +}