Skip to content

Commit 19347ed

Browse files
authored
Merge pull request aws-samples#1611 from raffaeu/raffaeu-feature-sqs-stepfunctions-message-aggregator
2 parents 249f3eb + fe75ece commit 19347ed

19 files changed

Lines changed: 506 additions & 0 deletions
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# SQS to Lambda and DynamoDB (Message Aggregator pattern)
2+
3+
This pattern create two SQS queue, one for receiving the splitted messages and one for the final aggregation and a lambda function and a DynamoDB Table. The pattern is an implementation of the Integration pattern: "Message Aggregator" available here: [https://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html)
4+
5+
Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/sqs-lambda-dynamodb-message-aggregator-cdk](https://serverlessland.com/patterns/sqs-lambda-dynamodb-message-aggregator-cdk)
6+
7+
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
8+
9+
## Requirements
10+
11+
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
12+
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
13+
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
14+
* [AWS CDK](https://docs.aws.amazon.com/cdk/v2/guide/getting_started.html) (AWS SAMCDK) installed
15+
16+
## Deployment Instructions
17+
18+
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
19+
```bash
20+
$: git clone https://github.com/aws-samples/serverless-patterns
21+
```
22+
1. Change directory to the pattern directory:
23+
```bash
24+
$: cd sqs-lambda-dynamodb-message-aggregator-cdk/src
25+
```
26+
1. From the command line, use AWS SAM to deploy the AWS resources for the pattern as specified in the template.yml file:
27+
```bash
28+
$: cdk deploy
29+
```
30+
1. Note the outputs from the CDK deployment process. These contain the resource names and/or ARNs which are used for testing.
31+
32+
## How it works
33+
34+
The pattern is composed by an SQS Queue which is capable to receive portion of messages. Messages can be aggregated together by providing, for each message, two distinct attributes. `CorrelationId` will inform the AWS Lambda function how to aggregate multiple messages and `Total` will inform the AWS Lambda about the total amount of messages that should be expected for one single aggregation.
35+
36+
Messages are tracked inside a DynamoDB table. When all messages with the same `CorrelationId` are received, the message is aggregated, forwarded to a destination SQS Queue and the record is deleted from DynamoDB, to avoid unecessary costs.
37+
38+
## Testing
39+
40+
Inside the Folder `msg` there are 3 distinct messages and the relative attribute file. The script `test.sh` will send all three messages to the source SqS queue.
41+
42+
```bash
43+
$: ./msg/test.sh
44+
Provide the name of the SQS queue to send messages to
45+
https://sqs.us-east-1.amazonaws.com/[your account]/[your account]-src-queue
46+
{
47+
"MD5OfMessageBody": "8be1b3b225c0f5113fa4c37f421667d2",
48+
"MD5OfMessageAttributes": "a512f8c4c91e1bd27eaedaa9916243fe",
49+
"MessageId": "35527c0c-3dcc-4ec7-8afa-b997019d0549"
50+
}
51+
{
52+
"MD5OfMessageBody": "4018bae8eed190d3c907720c3f3cdf1c",
53+
"MD5OfMessageAttributes": "a512f8c4c91e1bd27eaedaa9916243fe",
54+
"MessageId": "dda99dbf-2f97-4e88-ab28-68011b707a9a"
55+
}
56+
{
57+
"MD5OfMessageBody": "a9d29a076761cb5a4a1f955814338fcb",
58+
"MD5OfMessageAttributes": "a512f8c4c91e1bd27eaedaa9916243fe",
59+
"MessageId": "33e07f8d-5c37-4ffa-aaef-b8f90b9b7646"
60+
}
61+
```
62+
63+
The script will wait 5 seconds then try to pull the destination queue:
64+
65+
```bash
66+
Provide the name of the SQS queue to receive the aggregated message
67+
https://sqs.us-east-1.amazonaws.com/[your account]/[your account]-dest-queue
68+
{
69+
"Messages": [
70+
{
71+
"MessageId": "52505cb6-7013-4f0f-a6f7-4d4f2f5e223d",
72+
"ReceiptHandle": "AQEB8E0k8R6Qn4BdU8fxWM93NTavPuy6+0lbtg7avweEoMWgETQar1puOAROTB5XZuiqSyaEi7pRH6qaBAgfEuzBLX9aC4gGxRPBXrYyu3byntEk2k6T68q2sLjrhlHpa5dbUuuJGk5ahluQkbMaLqtq3jMRHfVp8QLrzAGdBqZzIGLTJAI9kfD1H38anwnlWF1fSMsY/jC21spEeq8zM72w4Lc83qMynrsPJMCysrzoyG7/PVXYnf8OWHj+am9FPzGPSq0YE+1nW1ATJKEcug2DXJgVdbWpXe+uJ5osOHfEk+e/pTf0uHd2ZL1trr1ICwU4F4Af81tZH8GYP4S7aJ8EMLZycyZDxXBuoHIj2wh86E0gy4Ke0vygbskyYkLAZXSqmiwP87oCIYUcxuITgiA3Hg==",
73+
"MD5OfBody": "0871a008c15d4c8ccede318f8b942ee5",
74+
"Body": "{\"VAT\":\"7.7\",\"Street\":\"Louisiana Road 11\",\"Bank\":\"New York State Bank\",\"TotalAmount\":\"123.00 USD\",\"OrderId\":\"123ABC\",\"State\":\"NY\",\"Transaction\":\"123ABC\",\"Date\":\"2023-01-01\",\"Time\":\"08:00:00\"}"
75+
}
76+
]
77+
}
78+
```
79+
80+
## Cleanup
81+
82+
1. Delete the stack
83+
```bash
84+
$: cdk destroy
85+
```
86+
----
87+
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
88+
89+
SPDX-License-Identifier: MIT-0
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"title": "Message Aggregator with SQS, Lambda and DynamoDB",
3+
"description": "Creates a Lambda function that keep tracks of single messages using DynamoDB and forward a single aggregate message to a different SQS queue.",
4+
"language": "TypeScript",
5+
"level": "200",
6+
"framework": "CDK",
7+
"introBox": {
8+
"headline": "How it works",
9+
"text": [
10+
"This CDK project demonstrate how to create a Message Aggregator pattern using Amazon SQS, AWS Lambda and Amazon DynamoDB.",
11+
"The solution creates a source and a destination SQS Queue and a Lambda function that keeps track of the messages by leveraging a DynamoDB Table.",
12+
"This architecture is an implementation of the Integration pattern: Message Aggregator explained in the Integration Patterns book."
13+
]
14+
},
15+
"gitHub": {
16+
"template": {
17+
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sqs-lambda-dynamodb-message-aggregator-cdk",
18+
"templateURL": "serverless-patterns/sqs-lambda-dynamodb-message-aggregator-cdk",
19+
"projectFolder": "sqs-lambda-dynamodb-message-aggregator-cdk",
20+
"templateFile": "sqs-lambda-dynamodb-message-aggregator-cdk/src/lib/src-stack.ts"
21+
}
22+
},
23+
"resources": {
24+
"bullets": [
25+
{
26+
"text": "Message Aggregator Pattern",
27+
"link": "https://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html"
28+
}
29+
]
30+
},
31+
"deploy": {
32+
"text": [
33+
"cdk deploy"
34+
]
35+
},
36+
"testing": {
37+
"text": [
38+
"See the GitHub repo for detailed testing instructions."
39+
]
40+
},
41+
"cleanup": {
42+
"text": [
43+
"Delete the stack: <code>cdk destroy</code>."
44+
]
45+
},
46+
"authors": [
47+
{
48+
"name": "Raffaele Garofalo (Raf)",
49+
"image": "https://avatars.githubusercontent.com/raffaeu",
50+
"bio": "Raffaele Garofalo (Raf) is a Senior Solutions Architect at AWS and member of the AWS Serverless TFC (Technical Field Community).",
51+
"linkedin": "raffaeu",
52+
"twitter": "raffaeu"
53+
}
54+
]
55+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
*.js
2+
!jest.config.js
3+
*.d.ts
4+
node_modules
5+
6+
# CDK asset staging directory
7+
.cdk.staging
8+
cdk.out
9+
!code/*.js
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*.ts
2+
!*.d.ts
3+
4+
# CDK asset staging directory
5+
.cdk.staging
6+
cdk.out
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/usr/bin/env node
2+
import 'source-map-support/register';
3+
import * as cdk from 'aws-cdk-lib';
4+
import { SrcStack } from '../lib/src-stack';
5+
6+
const app = new cdk.App();
7+
new SrcStack(app, 'SrcStack', {
8+
/* If you don't specify 'env', this stack will be environment-agnostic.
9+
* Account/Region-dependent features and context lookups will not work,
10+
* but a single synthesized template can be deployed anywhere. */
11+
12+
/* Uncomment the next line to specialize this stack for the AWS Account
13+
* and Region that are implied by the current CLI configuration. */
14+
// env: { account: process.env.CDK_DEFAULT_ACCOUNT, region: process.env.CDK_DEFAULT_REGION },
15+
16+
/* Uncomment the next line if you know exactly what Account and Region you
17+
* want to deploy the stack to. */
18+
// env: { account: '123456789012', region: 'us-east-1' },
19+
20+
/* For more information, see https://docs.aws.amazon.com/cdk/latest/guide/environments.html */
21+
});
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"app": "npx ts-node --prefer-ts-exts bin/src.ts",
3+
"watch": {
4+
"include": [
5+
"**"
6+
],
7+
"exclude": [
8+
"README.md",
9+
"cdk*.json",
10+
"**/*.d.ts",
11+
"**/*.js",
12+
"tsconfig.json",
13+
"package*.json",
14+
"yarn.lock",
15+
"node_modules",
16+
"test"
17+
]
18+
},
19+
"context": {
20+
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
21+
"@aws-cdk/core:checkSecretUsage": true,
22+
"@aws-cdk/core:target-partitions": [
23+
"aws",
24+
"aws-cn"
25+
],
26+
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
27+
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
28+
"@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
29+
"@aws-cdk/aws-iam:minimizePolicies": true,
30+
"@aws-cdk/core:validateSnapshotRemovalPolicy": true,
31+
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
32+
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
33+
"@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
34+
"@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
35+
"@aws-cdk/core:enablePartitionLiterals": true,
36+
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
37+
"@aws-cdk/aws-iam:standardizedServicePrincipals": true,
38+
"@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
39+
"@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
40+
"@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
41+
"@aws-cdk/aws-route53-patters:useCertificate": true,
42+
"@aws-cdk/customresources:installLatestAwsSdkDefault": false,
43+
"@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true,
44+
"@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true,
45+
"@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true,
46+
"@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true,
47+
"@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true,
48+
"@aws-cdk/aws-redshift:columnId": true,
49+
"@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true,
50+
"@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true,
51+
"@aws-cdk/aws-apigateway:requestValidatorUniqueId": true,
52+
"@aws-cdk/aws-kms:aliasNameRef": true,
53+
"@aws-cdk/core:includePrefixInUniqueNameGeneration": true
54+
}
55+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
const AWS = require('aws-sdk');
2+
3+
exports.handler = async (event, context, callback) => {
4+
5+
// retrieve dynamo name from environment variable
6+
const dynamoDB = new AWS.DynamoDB.DocumentClient();
7+
const queue = new AWS.SQS();
8+
9+
// loop through all sqs records
10+
for (const record of event.Records) {
11+
12+
// create dynamo record
13+
const correlationId = record.messageAttributes.CorrelationId.stringValue;
14+
const total = Number(record.messageAttributes.Total.stringValue);
15+
const body = JSON.parse(record.body);
16+
let item = {
17+
id: correlationId,
18+
body: body,
19+
count: 0
20+
};
21+
22+
// check if item exists in dynamo
23+
const dynamoRecord = await dynamoDB.get({
24+
TableName: process.env.DYNAMODB_TABLE_NAME,
25+
Key: {
26+
'id': correlationId
27+
}
28+
}).promise();
29+
// if item exists, update item
30+
if (dynamoRecord.Item) {
31+
item.body = Object.assign(dynamoRecord.Item.body, item.body);
32+
item.count = dynamoRecord.Item.count + 1;
33+
} else {
34+
// if item doesn't exist, create item
35+
item.body = item.body;
36+
item.count = 1;
37+
}
38+
// put item in dynamo
39+
const result = await dynamoDB.put({
40+
TableName: process.env.DYNAMODB_TABLE_NAME,
41+
Item: item,
42+
ReturnValues: 'ALL_OLD'
43+
}).promise();
44+
45+
// if item is last, trigger aggregation
46+
// and delete item from Dynamo
47+
if (item.count === total) {
48+
await queue.sendMessage({
49+
QueueUrl: process.env.DESTINATION_QUEUE_URL,
50+
MessageBody: JSON.stringify(item.body)
51+
}).promise();
52+
await dynamoDB.delete({
53+
TableName: process.env.DYNAMODB_TABLE_NAME,
54+
Key: {
55+
'id': correlationId
56+
}
57+
}).promise();
58+
}
59+
}
60+
61+
//complete
62+
callback(null, {
63+
statusCode: '200',
64+
body: JSON.stringify({ 'status': 'complete' })
65+
});
66+
};
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module.exports = {
2+
testEnvironment: 'node',
3+
roots: ['<rootDir>/test'],
4+
testMatch: ['**/*.test.ts'],
5+
transform: {
6+
'^.+\\.tsx?$': 'ts-jest'
7+
}
8+
};
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import * as cdk from 'aws-cdk-lib';
2+
import { Construct } from 'constructs';
3+
4+
export class SrcStack extends cdk.Stack {
5+
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
6+
super(scope, id, props);
7+
8+
// get account id
9+
const account = cdk.Stack.of(this).account;
10+
11+
// create an sqs queue starting with account id
12+
const srcSqs = new cdk.aws_sqs.Queue(this, 'SrcQueue', {
13+
queueName: `${account}-src-queue`,
14+
removalPolicy: cdk.RemovalPolicy.DESTROY,
15+
});
16+
17+
// create the destination queue
18+
const destSqs = new cdk.aws_sqs.Queue(this, 'DestQueue', {
19+
queueName: `${account}-dest-queue`,
20+
removalPolicy: cdk.RemovalPolicy.DESTROY,
21+
});
22+
23+
// create a dynamodb table to aggregate messages
24+
const srcTable = new cdk.aws_dynamodb.Table(this, 'SrcTable', {
25+
partitionKey: { name: 'id', type: cdk.aws_dynamodb.AttributeType.STRING },
26+
removalPolicy: cdk.RemovalPolicy.DESTROY,
27+
tableName: `${account}-events-table`
28+
});
29+
30+
// create a lambda using the file code/lambda.handler.js
31+
const srcLambda = new cdk.aws_lambda.Function(this, 'SrcLambda', {
32+
code: cdk.aws_lambda.Code.fromAsset('code'),
33+
handler: 'handler.handler',
34+
runtime: cdk.aws_lambda.Runtime.NODEJS_14_X,
35+
environment: {
36+
DYNAMODB_TABLE_NAME: srcTable.tableName,
37+
DESTINATION_QUEUE_URL: destSqs.queueUrl,
38+
},
39+
functionName: `${account}-aggregator-lambda`
40+
});
41+
42+
// grant permissions
43+
srcSqs.grantConsumeMessages(srcLambda);
44+
srcTable.grantReadWriteData(srcLambda);
45+
destSqs.grantSendMessages(srcLambda);
46+
47+
// register src queue as a trigger of the lambda
48+
srcLambda.addEventSource(new cdk.aws_lambda_event_sources.SqsEventSource(srcSqs, { batchSize: 10 }));
49+
}
50+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"CorrelationId": {
3+
"DataType": "String",
4+
"StringValue": "01"
5+
},
6+
"Total": {
7+
"DataType": "String",
8+
"StringValue": "3"
9+
}
10+
}

0 commit comments

Comments
 (0)