Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ In local mode:
### Connections

```typescript
import { defineKafkaConnection, defineS3Connection, defineGCSConnection, secret } from "@tinybirdco/sdk";
import { defineKafkaConnection, defineS3Connection, defineGCSConnection, defineDynamoDBConnection, secret } from "@tinybirdco/sdk";

export const eventsKafka = defineKafkaConnection("events_kafka", {
bootstrapServers: "kafka.example.com:9092",
Expand All @@ -575,13 +575,18 @@ export const landingS3 = defineS3Connection("landing_s3", {
export const landingGCS = defineGCSConnection("landing_gcs", {
serviceAccountCredentialsJson: secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
});

export const eventsDynamo = defineDynamoDBConnection("events_dynamo", {
region: "us-east-1",
arn: secret("DYNAMODB_ROLE_ARN"),
});
```

Use connections from datasources:

```typescript
import { defineDatasource, t, engine } from "@tinybirdco/sdk";
import { eventsKafka, landingS3, landingGCS } from "./connections";
import { eventsKafka, landingS3, landingGCS, eventsDynamo } from "./connections";

export const kafkaEvents = defineDatasource("kafka_events", {
schema: {
Expand Down Expand Up @@ -622,6 +627,26 @@ export const gcsLanding = defineDatasource("gcs_landing", {
schedule: "@auto",
},
});

export const dynamoEvents = defineDatasource("dynamo_events", {
schema: {
id: t.string().jsonPath("$.Id"),
_record: t.string().jsonPath("$.NewImage"),
_timestamp: t.dateTime64(3).jsonPath("$.ApproximateCreationDateTime"),
_event_name: t.string().lowCardinality().jsonPath("$.eventName"),
_is_deleted: t.uint8().jsonPath("$._is_deleted"),
},
engine: engine.replacingMergeTree({
sortingKey: ["id"],
ver: "_timestamp",
isDeleted: "_is_deleted",
}),
dynamodb: {
connection: eventsDynamo,
tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/events",
exportBucket: "s3://my-export-bucket",
},
});
```

### Datasources
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tinybirdco/sdk",
"version": "0.0.77",
"version": "0.0.78",
"description": "TypeScript SDK for Tinybird Forward - define datasources and pipes as TypeScript",
"type": "module",
"main": "./dist/index.js",
Expand Down
95 changes: 95 additions & 0 deletions src/cli/commands/migrate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,101 @@ IMPORT_SCHEDULE '@on-demand'
expect(output).toContain('schedule: "@on-demand"');
});

it("migrates dynamodb connection and import datasource directives", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
tempDirs.push(tempDir);

writeFile(
tempDir,
"dynamo_sample.connection",
`TYPE dynamodb
DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }}
DYNAMODB_REGION us-east-1
`
);

writeFile(
tempDir,
"events_dynamo.datasource",
`SCHEMA >
id String \`json:$.Id\`,
_record String \`json:$.NewImage\`

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "id"
IMPORT_CONNECTION_NAME dynamo_sample
IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events
IMPORT_EXPORT_BUCKET s3://my-export-bucket
`
);

const result = await runMigrate({
cwd: tempDir,
patterns: ["."],
strict: true,
});

expect(result.success).toBe(true);
expect(result.errors).toHaveLength(0);
expect(result.migrated.filter((resource) => resource.kind === "connection")).toHaveLength(1);
expect(result.migrated.filter((resource) => resource.kind === "datasource")).toHaveLength(1);

const output = fs.readFileSync(result.outputPath, "utf-8");
expect(output).toContain("defineDynamoDBConnection");
expect(output).toContain('export const dynamoSample = defineDynamoDBConnection("dynamo_sample", {');
expect(output).toContain('region: "us-east-1"');
expect(output).toContain('arn: secret("DYNAMODB_ROLE_ARN")');
expect(output).toContain("dynamodb: {");
expect(output).toContain("connection: dynamoSample");
expect(output).toContain(
'tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/events"'
);
expect(output).toContain('exportBucket: "s3://my-export-bucket"');
});

it("reports an error when DynamoDB directives use a non-dynamodb connection type", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
tempDirs.push(tempDir);

writeFile(
tempDir,
"s3sample.connection",
`TYPE s3
S3_REGION us-east-1
S3_ARN "arn:aws:iam::123456789012:role/tinybird-s3-access"
`
);

writeFile(
tempDir,
"events_dynamo.datasource",
`SCHEMA >
id String

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "id"
IMPORT_CONNECTION_NAME s3sample
IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events
IMPORT_EXPORT_BUCKET s3://my-export-bucket
`
);

const result = await runMigrate({
cwd: tempDir,
patterns: ["."],
strict: true,
});

expect(result.success).toBe(false);
expect(
result.errors.some((error) =>
error.message.includes(
'Datasource DynamoDB ingestion requires a dynamodb connection, found "s3".'
)
)
).toBe(true);
});

it("reports an error when import directives use a non-bucket connection type", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "tinybird-migrate-"));
tempDirs.push(tempDir);
Expand Down
18 changes: 17 additions & 1 deletion src/cli/commands/migrate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ export async function runMigrate(
const referencedConnectionName =
datasource.kafka?.connectionName ??
datasource.s3?.connectionName ??
datasource.gcs?.connectionName;
datasource.gcs?.connectionName ??
datasource.dynamodb?.connectionName;

if (
referencedConnectionName &&
Expand Down Expand Up @@ -186,6 +187,21 @@ export async function runMigrate(
}
}

if (datasource.dynamodb) {
const dynamodbConnectionType = parsedConnectionTypeByName.get(
datasource.dynamodb.connectionName
);
if (dynamodbConnectionType !== "dynamodb") {
errors.push({
filePath: datasource.filePath,
resourceName: datasource.name,
resourceKind: datasource.kind,
message: `Datasource DynamoDB ingestion requires a dynamodb connection, found "${dynamodbConnectionType ?? "(none)"}".`,
});
continue;
}
}

try {
validateResourceForEmission(datasource);
migrated.push(datasource);
Expand Down
17 changes: 17 additions & 0 deletions src/generator/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
defineKafkaConnection,
defineS3Connection,
defineGCSConnection,
defineDynamoDBConnection,
} from "../schema/connection.js";

describe("Connection Generator", () => {
Expand Down Expand Up @@ -218,6 +219,22 @@ describe("Connection Generator", () => {
'GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON {{ tb_secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON") }}'
);
});

it("generates DynamoDB connection with arn and region", () => {
const conn = defineDynamoDBConnection("my_dynamo", {
region: "us-east-1",
arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}',
});

const result = generateConnection(conn);

expect(result.name).toBe("my_dynamo");
expect(result.content).toBe(
['TYPE dynamodb', 'DYNAMODB_ARN {{ tb_secret("DYNAMODB_ROLE_ARN") }}', "DYNAMODB_REGION us-east-1"].join(
"\n"
)
);
});
});

describe("generateAllConnections", () => {
Expand Down
18 changes: 18 additions & 0 deletions src/generator/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import type {
ConnectionDefinition,
KafkaConnectionDefinition,
GCSConnectionDefinition,
DynamoDBConnectionDefinition,
} from "../schema/connection.js";
import {
isS3ConnectionDefinition,
isGCSConnectionDefinition,
isDynamoDBConnectionDefinition,
type S3ConnectionDefinition,
} from "../schema/connection.js";

Expand Down Expand Up @@ -123,6 +125,20 @@ function generateGCSConnection(connection: GCSConnectionDefinition): string {
return parts.join("\n");
}

/**
* Generate a DynamoDB connection content
*/
function generateDynamoDBConnection(connection: DynamoDBConnectionDefinition): string {
const parts: string[] = [];
const options = connection.options;

parts.push("TYPE dynamodb");
parts.push(`DYNAMODB_ARN ${options.arn}`);
parts.push(`DYNAMODB_REGION ${options.region}`);

return parts.join("\n");
}

/**
* Generate a .connection file content from a ConnectionDefinition
*
Expand Down Expand Up @@ -160,6 +176,8 @@ export function generateConnection(
content = generateS3Connection(connection);
} else if (isGCSConnectionDefinition(connection)) {
content = generateGCSConnection(connection);
} else if (isDynamoDBConnectionDefinition(connection)) {
content = generateDynamoDBConnection(connection);
} else {
throw new Error("Unsupported connection type.");
}
Expand Down
55 changes: 54 additions & 1 deletion src/generator/datasource.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it, expect } from 'vitest';
import { generateDatasource, generateAllDatasources } from './datasource.js';
import { defineDatasource } from '../schema/datasource.js';
import { defineKafkaConnection, defineS3Connection, defineGCSConnection } from '../schema/connection.js';
import { defineKafkaConnection, defineS3Connection, defineGCSConnection, defineDynamoDBConnection } from '../schema/connection.js';
import { defineToken } from '../schema/token.js';
import { t } from '../schema/types.js';
import { engine } from '../schema/engines.js';
Expand Down Expand Up @@ -629,6 +629,59 @@ describe('Datasource Generator', () => {
});
});

describe('DynamoDB configuration', () => {
it('includes DynamoDB import directives', () => {
const dynamoConn = defineDynamoDBConnection('my_dynamo', {
region: 'us-east-1',
arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}',
});

const ds = defineDatasource('dynamo_events', {
schema: {
id: t.string(),
_record: t.string(),
},
engine: engine.replacingMergeTree({ sortingKey: ['id'] }),
dynamodb: {
connection: dynamoConn,
tableArn: 'arn:aws:dynamodb:us-east-1:123456789012:table/events',
exportBucket: 's3://my-export-bucket',
},
});

const result = generateDatasource(ds);

expect(result.content).toContain('IMPORT_CONNECTION_NAME my_dynamo');
expect(result.content).toContain(
'IMPORT_TABLE_ARN arn:aws:dynamodb:us-east-1:123456789012:table/events'
);
expect(result.content).toContain('IMPORT_EXPORT_BUCKET s3://my-export-bucket');
});

it('rejects mixing DynamoDB with other ingestion options', () => {
const dynamoConn = defineDynamoDBConnection('my_dynamo', {
region: 'us-east-1',
arn: '{{ tb_secret("DYNAMODB_ROLE_ARN") }}',
});
const s3Conn = defineS3Connection('my_s3', {
region: 'us-east-1',
arn: 'arn:aws:iam::123456789012:role/x',
});

expect(() =>
defineDatasource('mixed', {
schema: { id: t.string() },
dynamodb: {
connection: dynamoConn,
tableArn: 'arn:aws:dynamodb:us-east-1:123456789012:table/events',
exportBucket: 's3://my-export-bucket',
},
s3: { connection: s3Conn, bucketUri: 's3://b/*.csv' },
})
).toThrow('only define one ingestion option');
});
});

describe('Token generation', () => {
it('generates TOKEN lines with inline config', () => {
const ds = defineDatasource('test_ds', {
Expand Down
25 changes: 24 additions & 1 deletion src/generator/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
KafkaConfig,
S3Config,
GCSConfig,
DynamoDBConfig,
TokenConfig,
DatasourceIndex,
} from "../schema/datasource.js";
Expand Down Expand Up @@ -199,6 +200,19 @@ function generateImportConfig(importConfig: S3Config | GCSConfig): string {
return parts.join("\n");
}

/**
* Generate DynamoDB import configuration lines
*/
function generateDynamoDBConfig(dynamodb: DynamoDBConfig): string {
const parts: string[] = [];

parts.push(`IMPORT_CONNECTION_NAME ${dynamodb.connection._name}`);
parts.push(`IMPORT_TABLE_ARN ${dynamodb.tableArn}`);
parts.push(`IMPORT_EXPORT_BUCKET ${dynamodb.exportBucket}`);

return parts.join("\n");
}

/**
* Generate forward query section
*/
Expand Down Expand Up @@ -318,9 +332,12 @@ export function generateDatasource(
datasource.options.kafka,
datasource.options.s3,
datasource.options.gcs,
datasource.options.dynamodb,
].filter(Boolean).length;
if (ingestionConfigCount > 1) {
throw new Error("Datasource can only define one ingestion option: `kafka`, `s3`, or `gcs`.");
throw new Error(
"Datasource can only define one ingestion option: `kafka`, `s3`, `gcs`, or `dynamodb`."
);
}

// Add description if present
Expand Down Expand Up @@ -369,6 +386,12 @@ export function generateDatasource(
parts.push(generateImportConfig(datasource.options.gcs));
}

// Add DynamoDB configuration if present
if (datasource.options.dynamodb) {
parts.push("");
parts.push(generateDynamoDBConfig(datasource.options.dynamodb));
}

// Add forward query if present
const forwardQuery = generateForwardQuery(datasource.options.forwardQuery);
if (forwardQuery) {
Expand Down
Loading
Loading