diff --git a/README.md b/README.md index 15faac8..c289670 100644 --- a/README.md +++ b/README.md @@ -557,7 +557,7 @@ In local mode: ### Connections ```typescript -import { defineKafkaConnection, defineS3Connection, defineGCSConnection, secret } from "@tinybirdco/sdk"; +import { defineKafkaConnection, defineS3Connection, defineGCSConnection, defineDynamoDBConnection, secret } from "@tinybirdco/sdk"; export const eventsKafka = defineKafkaConnection("events_kafka", { bootstrapServers: "kafka.example.com:9092", @@ -575,13 +575,18 @@ export const landingS3 = defineS3Connection("landing_s3", { export const landingGCS = defineGCSConnection("landing_gcs", { serviceAccountCredentialsJson: secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"), }); + +export const eventsDynamo = defineDynamoDBConnection("events_dynamo", { + region: "us-east-1", + arn: secret("DYNAMODB_ROLE_ARN"), +}); ``` Use connections from datasources: ```typescript import { defineDatasource, t, engine } from "@tinybirdco/sdk"; -import { eventsKafka, landingS3, landingGCS } from "./connections"; +import { eventsKafka, landingS3, landingGCS, eventsDynamo } from "./connections"; export const kafkaEvents = defineDatasource("kafka_events", { schema: { @@ -622,6 +627,26 @@ export const gcsLanding = defineDatasource("gcs_landing", { schedule: "@auto", }, }); + +export const dynamoEvents = defineDatasource("dynamo_events", { + schema: { + id: t.string().jsonPath("$.Id"), + _record: t.string().jsonPath("$.NewImage"), + _timestamp: t.dateTime64(3).jsonPath("$.ApproximateCreationDateTime"), + _event_name: t.string().lowCardinality().jsonPath("$.eventName"), + _is_deleted: t.uint8().jsonPath("$._is_deleted"), + }, + engine: engine.replacingMergeTree({ + sortingKey: ["id"], + ver: "_timestamp", + isDeleted: "_is_deleted", + }), + dynamodb: { + connection: eventsDynamo, + tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/events", + exportBucket: "s3://my-export-bucket", + }, +}); ``` ### Datasources diff --git a/package.json b/package.json index 57a499f..775cc38 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tinybirdco/sdk", - "version": "0.0.77", + "version": "0.0.78", "description": "TypeScript SDK for Tinybird Forward - define datasources and pipes as TypeScript", "type": "module", "main": "./dist/index.js", diff --git a/src/cli/commands/migrate.test.ts b/src/cli/commands/migrate.test.ts index da584f4..0bef5b0 100644 --- a/src/cli/commands/migrate.test.ts +++ b/src/cli/commands/migrate.test.ts @@ -716,6 +716,101 @@ IMPORT_SCHEDULE '@on-demand' expect(output).toContain('schedule: "@on-demand"'); }); + it("migrates dynamodb connection and import datasource directives", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); + tempDirs.push(tempDir); + + writeFile( + tempDir, + "dynamo_sample.connection", + `TYPE dynamodb +DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }} +DYNAMODB_REGION us-east-1 +` + ); + + writeFile( + tempDir, + "events_dynamo.datasource", + `SCHEMA > + id String \`json:$.Id\`, + _record String \`json:$.NewImage\` + +ENGINE "ReplacingMergeTree" +ENGINE_SORTING_KEY "id" +IMPORT_CONNECTION_NAME dynamo_sample +IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events +IMPORT_EXPORT_BUCKET s3://my-export-bucket +` + ); + + const result = await runMigrate({ + cwd: tempDir, + patterns: ["."], + strict: true, + }); + + expect(result.success).toBe(true); + expect(result.errors).toHaveLength(0); + expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1); + expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(1); + + const output = fs.readFileSync(result.outputPath, "utf-8"); + expect(output).toContain("defineDynamoDBConnection"); + expect(output).toContain('export const dynamoSample = defineDynamoDBConnection("dynamo_sample", {'); + expect(output).toContain('region: "us-east-1"'); + expect(output).toContain('arn: secret("DYNAMODB_ROLE_ARN")'); + expect(output).toContain("dynamodb: {"); + expect(output).toContain("connection: dynamoSample"); + expect(output).toContain( + 'tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/events"' + ); + expect(output).toContain('exportBucket: "s3://my-export-bucket"'); + }); + + it("reports an error when DynamoDB directives use a non-dynamodb connection type", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); + tempDirs.push(tempDir); + + writeFile( + tempDir, + "s3sample.connection", + `TYPE s3 +S3_REGION us-east-1 +S3_ARN "arn:aws:iam::123456789012:role/tinybird-s3-access" +` + ); + + writeFile( + tempDir, + "events_dynamo.datasource", + `SCHEMA > + id String + +ENGINE "ReplacingMergeTree" +ENGINE_SORTING_KEY "id" +IMPORT_CONNECTION_NAME s3sample +IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events +IMPORT_EXPORT_BUCKET s3://my-export-bucket +` + ); + + const result = await runMigrate({ + cwd: tempDir, + patterns: ["."], + strict: true, + }); + + expect(result.success).toBe(false); + expect( + result.errors.some((error) => + error.message.includes( + 'Datasource DynamoDB ingestion requires a dynamodb connection, found "s3".' + ) + ) + ).toBe(true); + }); + it("reports an error when import directives use a non-bucket connection type", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-")); tempDirs.push(tempDir); diff --git a/src/cli/commands/migrate.ts b/src/cli/commands/migrate.ts index e7b9582..67b2b57 100644 --- a/src/cli/commands/migrate.ts +++ b/src/cli/commands/migrate.ts @@ -135,7 +135,8 @@ export async function runMigrate( const referencedConnectionName = datasource.kafka?.connectionName ?? datasource.s3?.connectionName ?? - datasource.gcs?.connectionName; + datasource.gcs?.connectionName ?? + datasource.dynamodb?.connectionName; if ( referencedConnectionName && @@ -186,6 +187,21 @@ export async function runMigrate( } } + if (datasource.dynamodb) { + const dynamodbConnectionType = parsedConnectionTypeByName.get( + datasource.dynamodb.connectionName + ); + if (dynamodbConnectionType !== "dynamodb") { + errors.push({ + filePath: datasource.filePath, + resourceName: datasource.name, + resourceKind: datasource.kind, + message: `Datasource DynamoDB ingestion requires a dynamodb connection, found "${dynamodbConnectionType ?? "(none)"}".`, + }); + continue; + } + } + try { validateResourceForEmission(datasource); migrated.push(datasource); diff --git a/src/generator/connection.test.ts b/src/generator/connection.test.ts index 93e85f8..fff6d6f 100644 --- a/src/generator/connection.test.ts +++ b/src/generator/connection.test.ts @@ -4,6 +4,7 @@ import { defineKafkaConnection, defineS3Connection, defineGCSConnection, + defineDynamoDBConnection, } from "../schema/connection.js"; describe("Connection Generator", () => { @@ -218,6 +219,22 @@ describe("Connection Generator", () => { 'GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON") }}' ); }); + + it("generates DynamoDB connection with arn and region", () => { + const conn = defineDynamoDBConnection("my_dynamo", { + region: "us-east-1", + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }); + + const result = generateConnection(conn); + + expect(result.name).toBe("my_dynamo"); + expect(result.content).toBe( + ['TYPE dynamodb', 'DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }}', "DYNAMODB_REGION us-east-1"].join( + "\n" + ) + ); + }); }); describe("generateAllConnections", () => { diff --git a/src/generator/connection.ts b/src/generator/connection.ts index b37fa61..a52ebd0 100644 --- a/src/generator/connection.ts +++ b/src/generator/connection.ts @@ -7,10 +7,12 @@ import type { ConnectionDefinition, KafkaConnectionDefinition, GCSConnectionDefinition, + DynamoDBConnectionDefinition, } from "../schema/connection.js"; import { isS3ConnectionDefinition, isGCSConnectionDefinition, + isDynamoDBConnectionDefinition, type S3ConnectionDefinition, } from "../schema/connection.js"; @@ -123,6 +125,20 @@ function generateGCSConnection(connection: GCSConnectionDefinition): string { return parts.join("\n"); } +/** + * Generate a DynamoDB connection content + */ +function generateDynamoDBConnection(connection: DynamoDBConnectionDefinition): string { + const parts: string[] = []; + const options = connection.options; + + parts.push("TYPE dynamodb"); + parts.push(`DYNAMODB_ARN ${options.arn}`); + parts.push(`DYNAMODB_REGION ${options.region}`); + + return parts.join("\n"); +} + /** * Generate a .connection file content from a ConnectionDefinition * @@ -160,6 +176,8 @@ export function generateConnection( content = generateS3Connection(connection); } else if (isGCSConnectionDefinition(connection)) { content = generateGCSConnection(connection); + } else if (isDynamoDBConnectionDefinition(connection)) { + content = generateDynamoDBConnection(connection); } else { throw new Error("Unsupported connection type."); } diff --git a/src/generator/datasource.test.ts b/src/generator/datasource.test.ts index 5c2ec8c..3a83664 100644 --- a/src/generator/datasource.test.ts +++ b/src/generator/datasource.test.ts @@ -1,7 +1,7 @@ import { describe, it, expect } from 'vitest'; import { generateDatasource, generateAllDatasources } from './datasource.js'; import { defineDatasource } from '../schema/datasource.js'; -import { defineKafkaConnection, defineS3Connection, defineGCSConnection } from '../schema/connection.js'; +import { defineKafkaConnection, defineS3Connection, defineGCSConnection, defineDynamoDBConnection } from '../schema/connection.js'; import { defineToken } from '../schema/token.js'; import { t } from '../schema/types.js'; import { engine } from '../schema/engines.js'; @@ -629,6 +629,59 @@ describe('Datasource Generator', () => { }); }); + describe('DynamoDB configuration', () => { + it('includes DynamoDB import directives', () => { + const dynamoConn = defineDynamoDBConnection('my_dynamo', { + region: 'us-east-1', + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }); + + const ds = defineDatasource('dynamo_events', { + schema: { + id: t.string(), + _record: t.string(), + }, + engine: engine.replacingMergeTree({ sortingKey: ['id'] }), + dynamodb: { + connection: dynamoConn, + tableArn: 'arn:aws:dynamodb:us-east-1:123456789012:table/events', + exportBucket: 's3://my-export-bucket', + }, + }); + + const result = generateDatasource(ds); + + expect(result.content).toContain('IMPORT_CONNECTION_NAME my_dynamo'); + expect(result.content).toContain( + 'IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events' + ); + expect(result.content).toContain('IMPORT_EXPORT_BUCKET s3://my-export-bucket'); + }); + + it('rejects mixing DynamoDB with other ingestion options', () => { + const dynamoConn = defineDynamoDBConnection('my_dynamo', { + region: 'us-east-1', + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }); + const s3Conn = defineS3Connection('my_s3', { + region: 'us-east-1', + arn: 'arn:aws:iam::123456789012:role/x', + }); + + expect(() => + defineDatasource('mixed', { + schema: { id: t.string() }, + dynamodb: { + connection: dynamoConn, + tableArn: 'arn:aws:dynamodb:us-east-1:123456789012:table/events', + exportBucket: 's3://my-export-bucket', + }, + s3: { connection: s3Conn, bucketUri: 's3://b/*.csv' }, + }) + ).toThrow('only define one ingestion option'); + }); + }); + describe('Token generation', () => { it('generates TOKEN lines with inline config', () => { const ds = defineDatasource('test_ds', { diff --git a/src/generator/datasource.ts b/src/generator/datasource.ts index 1fbbd2e..ba19504 100644 --- a/src/generator/datasource.ts +++ b/src/generator/datasource.ts @@ -10,6 +10,7 @@ import type { KafkaConfig, S3Config, GCSConfig, + DynamoDBConfig, TokenConfig, DatasourceIndex, } from "../schema/datasource.js"; @@ -199,6 +200,19 @@ function generateImportConfig(importConfig: S3Config | GCSConfig): string { return parts.join("\n"); } +/** + * Generate DynamoDB import configuration lines + */ +function generateDynamoDBConfig(dynamodb: DynamoDBConfig): string { + const parts: string[] = []; + + parts.push(`IMPORT_CONNECTION_NAME ${dynamodb.connection._name}`); + parts.push(`IMPORT_TABLE_ARN ${dynamodb.tableArn}`); + parts.push(`IMPORT_EXPORT_BUCKET ${dynamodb.exportBucket}`); + + return parts.join("\n"); +} + /** * Generate forward query section */ @@ -318,9 +332,12 @@ export function generateDatasource( datasource.options.kafka, datasource.options.s3, datasource.options.gcs, + datasource.options.dynamodb, ].filter(Boolean).length; if (ingestionConfigCount > 1) { - throw new Error("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`."); + throw new Error( + "Datasource can only define one ingestion option: `kafka`, `s3`, `gcs`, or `dynamodb`." + ); } // Add description if present @@ -369,6 +386,12 @@ export function generateDatasource( parts.push(generateImportConfig(datasource.options.gcs)); } + // Add DynamoDB configuration if present + if (datasource.options.dynamodb) { + parts.push(""); + parts.push(generateDynamoDBConfig(datasource.options.dynamodb)); + } + // Add forward query if present const forwardQuery = generateForwardQuery(datasource.options.forwardQuery); if (forwardQuery) { diff --git a/src/index.ts b/src/index.ts index 0e9ce9a..184b5a0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -116,6 +116,7 @@ export type { KafkaConfig, S3Config, GCSConfig, + DynamoDBConfig, DatasourceIndex, } from "./schema/datasource.js"; @@ -125,10 +126,12 @@ export { createKafkaConnection, defineS3Connection, defineGCSConnection, + defineDynamoDBConnection, isConnectionDefinition, isKafkaConnectionDefinition, isS3ConnectionDefinition, isGCSConnectionDefinition, + isDynamoDBConnectionDefinition, getConnectionType, } from "./schema/connection.js"; export type { @@ -142,6 +145,8 @@ export type { S3ConnectionOptions, GCSConnectionDefinition, GCSConnectionOptions, + DynamoDBConnectionDefinition, + DynamoDBConnectionOptions, } from "./schema/connection.js"; // ============ Token ============ diff --git a/src/migrate/emit-ts.ts b/src/migrate/emit-ts.ts index 85160f3..473eb10 100644 --- a/src/migrate/emit-ts.ts +++ b/src/migrate/emit-ts.ts @@ -4,6 +4,7 @@ import { parseLiteralFromDatafile, toTsLiteral } from "./parser-utils.js"; import type { DatasourceModel, DatasourceEngineModel, + DynamoDBConnectionModel, GCSConnectionModel, KafkaConnectionModel, ParsedResource, @@ -72,6 +73,9 @@ function hasSecretTemplate(resources: ParsedResource[]): boolean { if (resource.arn) values.push(resource.arn); if (resource.accessKey) values.push(resource.accessKey); if (resource.secret) values.push(resource.secret); + } else if (resource.connectionType === "dynamodb") { + values.push(resource.region); + values.push(resource.arn); } else { values.push(resource.serviceAccountCredentialsJson); } @@ -95,6 +99,10 @@ function hasSecretTemplate(resources: ParsedResource[]): boolean { if (resource.gcs.schedule) values.push(resource.gcs.schedule); if (resource.gcs.fromTimestamp) values.push(resource.gcs.fromTimestamp); } + if (resource.dynamodb) { + values.push(resource.dynamodb.tableArn); + values.push(resource.dynamodb.exportBucket); + } continue; } } @@ -388,6 +396,15 @@ function emitDatasource(ds: DatasourceModel): string { lines.push(" },"); } + if (ds.dynamodb) { + const connectionVar = toCamelCase(ds.dynamodb.connectionName); + lines.push(" dynamodb: {"); + lines.push(` connection: ${connectionVar},`); + lines.push(` tableArn: ${emitStringOrSecret(ds.dynamodb.tableArn)},`); + lines.push(` exportBucket: ${emitStringOrSecret(ds.dynamodb.exportBucket)},`); + lines.push(" },"); + } + if (ds.forwardQuery) { lines.push(" forwardQuery: `"); lines.push(escapeTemplateLiteral(ds.forwardQuery)); @@ -416,7 +433,11 @@ function emitDatasource(ds: DatasourceModel): string { } function emitConnection( - connection: KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel + connection: + | KafkaConnectionModel + | S3ConnectionModel + | GCSConnectionModel + | DynamoDBConnectionModel ): string { const variableName = toCamelCase(connection.name); const lines: string[] = []; @@ -486,6 +507,17 @@ function emitConnection( return lines.join("\n"); } + if (connection.connectionType === "dynamodb") { + lines.push( + `export const ${variableName} = defineDynamoDBConnection(${escapeString(connection.name)}, {` + ); + lines.push(` region: ${emitStringOrSecret(connection.region)},`); + lines.push(` arn: ${emitStringOrSecret(connection.arn)},`); + lines.push("});"); + lines.push(""); + return lines.join("\n"); + } + lines.push( `export const ${variableName} = defineGCSConnection(${escapeString(connection.name)}, {` ); @@ -625,8 +657,13 @@ function emitPipe(pipe: PipeModel): string { export function emitMigrationFileContent(resources: ParsedResource[]): string { const connections = resources.filter( - (resource): resource is KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel => - resource.kind === "connection" + ( + resource + ): resource is + | KafkaConnectionModel + | S3ConnectionModel + | GCSConnectionModel + | DynamoDBConnectionModel => resource.kind === "connection" ); const datasources = resources.filter( (resource): resource is DatasourceModel => resource.kind === "datasource" @@ -655,6 +692,9 @@ export function emitMigrationFileContent(resources: ParsedResource[]): string { if (connections.some((connection) => connection.connectionType === "gcs")) { imports.add("defineGCSConnection"); } + if (connections.some((connection) => connection.connectionType === "dynamodb")) { + imports.add("defineDynamoDBConnection"); + } if (needsParams) { imports.add("p"); } @@ -672,6 +712,7 @@ export function emitMigrationFileContent(resources: ParsedResource[]): string { "defineKafkaConnection", "defineS3Connection", "defineGCSConnection", + "defineDynamoDBConnection", "defineDatasource", "definePipe", "defineMaterializedView", diff --git a/src/migrate/parse-connection.test.ts b/src/migrate/parse-connection.test.ts index 1e3f464..0521f6a 100644 --- a/src/migrate/parse-connection.test.ts +++ b/src/migrate/parse-connection.test.ts @@ -122,4 +122,45 @@ KAFKA_SASL_OAUTHBEARER_AWS_EXTERNAL_ID {{ tb_secret("KAFKA_AWS_EXTERNAL_ID") }}` '{{ tb_secret("KAFKA_AWS_EXTERNAL_ID") }}' ); }); + + it("parses basic DynamoDB connection", () => { + const result = parseConnectionFile( + resource( + "my_dynamo", + `TYPE dynamodb +DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }} +DYNAMODB_REGION us-east-1` + ) + ); + + expect(result.connectionType).toBe("dynamodb"); + expect(result).toHaveProperty("arn", '{{ tb_secret("DYNAMODB_ROLE_ARN") }}'); + expect(result).toHaveProperty("region", "us-east-1"); + }); + + it("throws when DynamoDB connection is missing DYNAMODB_ARN", () => { + expect(() => + parseConnectionFile( + resource( + "my_dynamo", + `TYPE dynamodb +DYNAMODB_REGION us-east-1` + ) + ) + ).toThrow("DYNAMODB_ARN is required"); + }); + + it("throws when DynamoDB connection mixes S3 directives", () => { + expect(() => + parseConnectionFile( + resource( + "my_dynamo", + `TYPE dynamodb +DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }} +DYNAMODB_REGION us-east-1 +S3_ARN arn:aws:iam::1:role/x` + ) + ) + ).toThrow("Kafka/S3/GCS directives are not valid for dynamodb connections."); + }); }); diff --git a/src/migrate/parse-connection.ts b/src/migrate/parse-connection.ts index 28af388..39f4a10 100644 --- a/src/migrate/parse-connection.ts +++ b/src/migrate/parse-connection.ts @@ -1,4 +1,5 @@ import type { + DynamoDBConnectionModel, GCSConnectionModel, KafkaConnectionModel, ResourceFile, @@ -31,6 +32,8 @@ const CONNECTION_DIRECTIVES = new Set([ "S3_ACCESS_KEY", "S3_SECRET", "GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON", + "DYNAMODB_ARN", + "DYNAMODB_REGION", ]); function isConnectionDirectiveLine(line: string): boolean { @@ -43,7 +46,7 @@ function isConnectionDirectiveLine(line: string): boolean { export function parseConnectionFile( resource: ResourceFile -): KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel { +): KafkaConnectionModel | S3ConnectionModel | GCSConnectionModel | DynamoDBConnectionModel { const lines = splitLines(resource.content); let connectionType: string | undefined; @@ -74,6 +77,9 @@ export function parseConnectionFile( let accessSecret: string | undefined; let serviceAccountCredentialsJson: string | undefined; + let dynamodbArn: string | undefined; + let dynamodbRegion: string | undefined; + let i = 0; while (i < lines.length) { const rawLine = lines[i] ?? ""; @@ -177,6 +183,12 @@ export function parseConnectionFile( } serviceAccountCredentialsJson = parseQuotedValue(value); break; + case "DYNAMODB_ARN": + dynamodbArn = parseQuotedValue(value); + break; + case "DYNAMODB_REGION": + dynamodbRegion = parseQuotedValue(value); + break; default: throw new MigrationParseError( resource.filePath, @@ -198,12 +210,20 @@ export function parseConnectionFile( } if (connectionType === "kafka") { - if (region || arn || accessKey || accessSecret || serviceAccountCredentialsJson) { + if ( + region || + arn || + accessKey || + accessSecret || + serviceAccountCredentialsJson || + dynamodbArn || + dynamodbRegion + ) { throw new MigrationParseError( resource.filePath, "connection", resource.name, - "S3/GCS directives are not valid for kafka connections." + "S3/GCS/DynamoDB directives are not valid for kafka connections." ); } @@ -248,13 +268,15 @@ export function parseConnectionFile( secret || schemaRegistryUrl || sslCaPem || - serviceAccountCredentialsJson + serviceAccountCredentialsJson || + dynamodbArn || + dynamodbRegion ) { throw new MigrationParseError( resource.filePath, "connection", resource.name, - "Kafka/GCS directives are not valid for s3 connections." + "Kafka/GCS/DynamoDB directives are not valid for s3 connections." ); } @@ -313,13 +335,15 @@ export function parseConnectionFile( region || arn || accessKey || - accessSecret + accessSecret || + dynamodbArn || + dynamodbRegion ) { throw new MigrationParseError( resource.filePath, "connection", resource.name, - "Kafka/S3 directives are not valid for gcs connections." + "Kafka/S3/DynamoDB directives are not valid for gcs connections." ); } @@ -341,6 +365,61 @@ export function parseConnectionFile( }; } + if (connectionType === "dynamodb") { + if ( + bootstrapServers || + securityProtocol || + saslMechanism || + saslOauthbearerMethod || + saslOauthbearerAwsRegion || + saslOauthbearerAwsRoleArn || + saslOauthbearerAwsExternalId || + key || + secret || + schemaRegistryUrl || + sslCaPem || + region || + arn || + accessKey || + accessSecret || + serviceAccountCredentialsJson + ) { + throw new MigrationParseError( + resource.filePath, + "connection", + resource.name, + "Kafka/S3/GCS directives are not valid for dynamodb connections." + ); + } + + if (!dynamodbArn) { + throw new MigrationParseError( + resource.filePath, + "connection", + resource.name, + "DYNAMODB_ARN is required for dynamodb connections." + ); + } + + if (!dynamodbRegion) { + throw new MigrationParseError( + resource.filePath, + "connection", + resource.name, + "DYNAMODB_REGION is required for dynamodb connections." + ); + } + + return { + kind: "connection", + name: resource.name, + filePath: resource.filePath, + connectionType: "dynamodb", + arn: dynamodbArn, + region: dynamodbRegion, + }; + } + throw new MigrationParseError( resource.filePath, "connection", diff --git a/src/migrate/parse-datasource.ts b/src/migrate/parse-datasource.ts index 0e1cedb..fb653d2 100644 --- a/src/migrate/parse-datasource.ts +++ b/src/migrate/parse-datasource.ts @@ -39,6 +39,8 @@ const DATASOURCE_DIRECTIVES = new Set([ "IMPORT_BUCKET_URI", "IMPORT_SCHEDULE", "IMPORT_FROM_TIMESTAMP", + "IMPORT_TABLE_ARN", + "IMPORT_EXPORT_BUCKET", "TOKEN", ]); @@ -321,6 +323,8 @@ export function parseDatasourceFile(resource: ResourceFile): DatasourceModel { let importBucketUri: string | undefined; let importSchedule: string | undefined; let importFromTimestamp: string | undefined; + let importTableArn: string | undefined; + let importExportBucket: string | undefined; let i = 0; while (i < lines.length) { @@ -498,6 +502,12 @@ export function parseDatasourceFile(resource: ResourceFile): DatasourceModel { case "IMPORT_FROM_TIMESTAMP": importFromTimestamp = parseQuotedValue(value); break; + case "IMPORT_TABLE_ARN": + importTableArn = parseQuotedValue(value); + break; + case "IMPORT_EXPORT_BUCKET": + importExportBucket = parseQuotedValue(value); + break; case "TOKEN": tokens.push(parseToken(resource.filePath, resource.name, value)); break; @@ -573,8 +583,30 @@ export function parseDatasourceFile(resource: ResourceFile): DatasourceModel { ); } + // DynamoDB import directives share IMPORT_CONNECTION_NAME with blob storage, + // so treat the import block as DynamoDB whenever a DynamoDB-only directive is present. + const isDynamoDB = importTableArn !== undefined || importExportBucket !== undefined; + + const dynamodb = isDynamoDB + ? { + connectionName: importConnectionName ?? "", + tableArn: importTableArn ?? "", + exportBucket: importExportBucket ?? "", + } + : undefined; + + if (dynamodb && (!dynamodb.connectionName || !dynamodb.tableArn || !dynamodb.exportBucket)) { + throw new MigrationParseError( + resource.filePath, + "datasource", + resource.name, + "IMPORT_CONNECTION_NAME, IMPORT_TABLE_ARN and IMPORT_EXPORT_BUCKET are required when DynamoDB directives are used." + ); + } + const s3 = - importConnectionName || importBucketUri || importSchedule || importFromTimestamp + !isDynamoDB && + (importConnectionName || importBucketUri || importSchedule || importFromTimestamp) ? { connectionName: importConnectionName ?? "", bucketUri: importBucketUri ?? "", @@ -592,7 +624,7 @@ export function parseDatasourceFile(resource: ResourceFile): DatasourceModel { ); } - if (kafka && s3) { + if (kafka && (s3 || dynamodb)) { throw new MigrationParseError( resource.filePath, "datasource", @@ -625,6 +657,7 @@ export function parseDatasourceFile(resource: ResourceFile): DatasourceModel { indexes, kafka, s3, + dynamodb, forwardQuery, tokens, sharedWith, diff --git a/src/migrate/types.ts b/src/migrate/types.ts index a2aa354..05b48e1 100644 --- a/src/migrate/types.ts +++ b/src/migrate/types.ts @@ -59,6 +59,12 @@ export interface DatasourceGCSModel { fromTimestamp?: string; } +export interface DatasourceDynamoDBModel { + connectionName: string; + tableArn: string; + exportBucket: string; +} + export interface DatasourceTokenModel { name: string; scope: "READ" | "APPEND"; @@ -82,6 +88,7 @@ export interface DatasourceModel { kafka?: DatasourceKafkaModel; s3?: DatasourceS3Model; gcs?: DatasourceGCSModel; + dynamodb?: DatasourceDynamoDBModel; forwardQuery?: string; tokens: DatasourceTokenModel[]; sharedWith: string[]; @@ -184,12 +191,22 @@ export interface GCSConnectionModel { serviceAccountCredentialsJson: string; } +export interface DynamoDBConnectionModel { + kind: "connection"; + name: string; + filePath: string; + connectionType: "dynamodb"; + region: string; + arn: string; +} + export type ParsedResource = | DatasourceModel | PipeModel | KafkaConnectionModel | S3ConnectionModel - | GCSConnectionModel; + | GCSConnectionModel + | DynamoDBConnectionModel; export interface MigrationResult { success: boolean; diff --git a/src/schema/connection.test.ts b/src/schema/connection.test.ts index ff064c7..2c0afe9 100644 --- a/src/schema/connection.test.ts +++ b/src/schema/connection.test.ts @@ -3,10 +3,12 @@ import { defineKafkaConnection, defineS3Connection, defineGCSConnection, + defineDynamoDBConnection, isConnectionDefinition, isKafkaConnectionDefinition, isS3ConnectionDefinition, isGCSConnectionDefinition, + isDynamoDBConnectionDefinition, getConnectionType, } from "./connection.js"; @@ -199,6 +201,48 @@ describe("Connection Schema", () => { }); }); + describe("defineDynamoDBConnection", () => { + it("creates a DynamoDB connection with required fields", () => { + const conn = defineDynamoDBConnection("my_dynamo", { + region: "us-east-1", + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }); + + expect(conn._name).toBe("my_dynamo"); + expect(conn._type).toBe("connection"); + expect(conn._connectionType).toBe("dynamodb"); + expect(conn.options.region).toBe("us-east-1"); + expect(conn.options.arn).toBe('{{ tb_secret("DYNAMODB_ROLE_ARN") }}'); + }); + + it("throws when region is missing", () => { + expect(() => + defineDynamoDBConnection("my_dynamo", { + region: " ", + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }) + ).toThrow("DynamoDB connection `region` is required."); + }); + + it("throws when arn is missing", () => { + expect(() => + defineDynamoDBConnection("my_dynamo", { + region: "us-east-1", + arn: "", + }) + ).toThrow("DynamoDB connection `arn` is required."); + }); + + it("throws for invalid connection name", () => { + expect(() => + defineDynamoDBConnection("123-invalid", { + region: "us-east-1", + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }) + ).toThrow("Invalid connection name"); + }); + }); + describe("isConnectionDefinition", () => { it("returns true for valid connection", () => { const conn = defineKafkaConnection("my_kafka", { @@ -264,6 +308,27 @@ describe("Connection Schema", () => { }); }); + describe("isDynamoDBConnectionDefinition", () => { + it("returns true for DynamoDB connection", () => { + const conn = defineDynamoDBConnection("my_dynamo", { + region: "us-east-1", + arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + }); + + expect(isDynamoDBConnectionDefinition(conn)).toBe(true); + }); + + it("returns false for non-DynamoDB objects", () => { + expect(isDynamoDBConnectionDefinition({})).toBe(false); + expect(isDynamoDBConnectionDefinition(null)).toBe(false); + expect( + isDynamoDBConnectionDefinition( + defineS3Connection("my_s3", { region: "us-east-1", arn: "arn:aws:iam::1:role/x" }) + ) + ).toBe(false); + }); + }); + describe("getConnectionType", () => { it("returns the connection type", () => { const conn = defineKafkaConnection("my_kafka", { diff --git a/src/schema/connection.ts b/src/schema/connection.ts index 305d6fe..e3dac3d 100644 --- a/src/schema/connection.ts +++ b/src/schema/connection.ts @@ -117,13 +117,39 @@ export interface GCSConnectionDefinition { readonly options: GCSConnectionOptions; } +/** + * Options for defining a DynamoDB connection + */ +export interface DynamoDBConnectionOptions { + /** DynamoDB table region (for example: us-east-1) */ + region: string; + /** IAM role ARN used by Tinybird to access the table - can use {{ tb_secret(...) }} */ + arn: string; +} + +/** + * DynamoDB-specific connection definition + */ +export interface DynamoDBConnectionDefinition { + readonly [CONNECTION_BRAND]: true; + /** Connection name */ + readonly _name: string; + /** Type marker for inference */ + readonly _type: "connection"; + /** Connection type */ + readonly _connectionType: "dynamodb"; + /** DynamoDB options */ + readonly options: DynamoDBConnectionOptions; +} + /** * A connection definition - union of all connection types */ export type ConnectionDefinition = | KafkaConnectionDefinition | S3ConnectionDefinition - | GCSConnectionDefinition; + | GCSConnectionDefinition + | DynamoDBConnectionDefinition; function validateConnectionName(name: string): void { if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(name)) { @@ -231,6 +257,46 @@ export function defineGCSConnection( }; } +/** + * Define a DynamoDB connection + * + * @param name - The connection name (must be valid identifier) + * @param options - DynamoDB connection configuration + * @returns A connection definition that can be used in a project + * + * @example + * ```ts + * import { defineDynamoDBConnection } from '@tinybirdco/sdk'; + * + * export const myDynamo = defineDynamoDBConnection('my_dynamo', { + * region: 'us-east-1', + * arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}', + * }); + * ``` + */ +export function defineDynamoDBConnection( + name: string, + options: DynamoDBConnectionOptions +): DynamoDBConnectionDefinition { + validateConnectionName(name); + + if (!options.region?.trim()) { + throw new Error("DynamoDB connection `region` is required."); + } + + if (!options.arn?.trim()) { + throw new Error("DynamoDB connection `arn` is required."); + } + + return { + [CONNECTION_BRAND]: true, + _name: name, + _type: "connection", + _connectionType: "dynamodb", + options, + }; +} + /** * Check if a value is a connection definition */ @@ -264,6 +330,15 @@ export function isGCSConnectionDefinition(value: unknown): value is GCSConnectio return isConnectionDefinition(value) && value._connectionType === "gcs"; } +/** + * Check if a value is a DynamoDB connection definition + */ +export function isDynamoDBConnectionDefinition( + value: unknown +): value is DynamoDBConnectionDefinition { + return isConnectionDefinition(value) && value._connectionType === "dynamodb"; +} + /** * Get the connection type from a connection definition */ diff --git a/src/schema/datasource.test.ts b/src/schema/datasource.test.ts index 21a1e31..78ebfbb 100644 --- a/src/schema/datasource.test.ts +++ b/src/schema/datasource.test.ts @@ -112,7 +112,7 @@ describe("Datasource Schema", () => { bucketUri: "s3://my-bucket/events/*.csv", }, }) - ).toThrow("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`."); + ).toThrow("Datasource can only define one ingestion option: `kafka`, `s3`, `gcs`, or `dynamodb`."); expect(() => defineDatasource("events_gcs", { @@ -126,7 +126,7 @@ describe("Datasource Schema", () => { bucketUri: "gs://my-bucket/events/*.csv", }, }) - ).toThrow("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`."); + ).toThrow("Datasource can only define one ingestion option: `kafka`, `s3`, `gcs`, or `dynamodb`."); }); it("accepts gcs ingestion configuration", () => { diff --git a/src/schema/datasource.ts b/src/schema/datasource.ts index d854bcd..c8d23f4 100644 --- a/src/schema/datasource.ts +++ b/src/schema/datasource.ts @@ -9,6 +9,7 @@ import type { KafkaConnectionDefinition, S3ConnectionDefinition, GCSConnectionDefinition, + DynamoDBConnectionDefinition, } from "./connection.js"; import type { TokenDefinition, DatasourceTokenScope } from "./token.js"; @@ -102,6 +103,18 @@ export interface GCSConfig { fromTimestamp?: string; } +/** + * DynamoDB import configuration for a datasource + */ +export interface DynamoDBConfig { + /** DynamoDB connection to use */ + connection: DynamoDBConnectionDefinition; + /** Source DynamoDB table ARN */ + tableArn: string; + /** S3 bucket Tinybird uses to stage the initial table export */ + exportBucket: string; +} + /** * Datasource index configuration. * Emits as: ` TYPE GRANULARITY ` @@ -152,6 +165,8 @@ export interface DatasourceOptions { s3?: S3Config; /** GCS ingestion configuration */ gcs?: GCSConfig; + /** DynamoDB ingestion configuration */ + dynamodb?: DynamoDBConfig; } /** @@ -209,9 +224,13 @@ export function defineDatasource( ); } - const ingestionConfigCount = [options.kafka, options.s3, options.gcs].filter(Boolean).length; + const ingestionConfigCount = [options.kafka, options.s3, options.gcs, options.dynamodb].filter( + Boolean + ).length; if (ingestionConfigCount > 1) { - throw new Error("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`."); + throw new Error( + "Datasource can only define one ingestion option: `kafka`, `s3`, `gcs`, or `dynamodb`." + ); } if (options.indexes) {