Skip to content

Commit 40ad58d

Browse files
authored
Merge pull request aws-samples#1562 from vaibhavjainv/msk-lambda-iam-node-sam
MSK - Lambda integration with Node JS and IAM Auth
2 parents e0b9342 + 4f95863 commit 40ad58d

8 files changed

Lines changed: 393 additions & 0 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
tests/*
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//Lambda Runtime delivers a batch of messages to the lambda function
2+
//Each batch of messages has two fields EventSource and EventSourceARN
3+
//Each batch of messages also has a field called Records
4+
//The Records is a map with multiple keys and values
5+
//Each key is a combination of the Topic Name and the Partition Number
6+
//One batch of messages can contain messages from multiple partitions
7+
8+
export const handler = async (event) => {
9+
// Iterate through keys
10+
for (let key in event.records) {
11+
console.log('Key: ', key)
12+
// Iterate through messages inside a key
13+
event.records[key].map((record) => {
14+
//printing the fields of each message
15+
console.log('Record: ', record)
16+
console.log('Topic: ', record.topic)
17+
console.log('Partition: ', record.partition)
18+
console.log('Offset: ', record.offset)
19+
console.log('Timestamp: ', record.timestamp)
20+
console.log('TimestampType: ', record.timestampType)
21+
//key and value are base64 encoded and need to be decoded
22+
if (null != record.key){
23+
const thisKey = Buffer.from(record.key, 'base64').toString()
24+
console.log('Key:', thisKey)
25+
} else {
26+
console.log('Key:', 'null')
27+
}
28+
const value = Buffer.from(record.value, 'base64').toString()
29+
console.log('Value:', value)
30+
})
31+
}
32+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"name": "msk-event-source",
3+
"version": "1.0.0",
4+
"description": "MSK Event Source for Lambda in NodeJS",
5+
"main": "app.js",
6+
"repository": "https://github.com/awslabs/aws-sam-cli/tree/develop/samcli/local/init/templates/cookiecutter-aws-sam-hello-nodejs",
7+
"author": "SAM CLI",
8+
"license": "MIT",
9+
"dependencies": {
10+
"axios": ">=0.21.1"
11+
},
12+
"scripts": {
13+
"test": "mocha tests/unit/"
14+
},
15+
"devDependencies": {
16+
"chai": "^4.3.6",
17+
"mocha": "^10.1.0"
18+
}
19+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
'use strict';
2+
3+
import { lambdaHandler } from '../../app.mjs';
4+
import { expect } from 'chai';
5+
var event, context;
6+
7+
describe('Tests index', function () {
8+
it('verifies successful response', async () => {
9+
const result = await lambdaHandler(event, context)
10+
11+
expect(result).to.be.an('object');
12+
expect(result.statusCode).to.equal(200);
13+
expect(result.body).to.be.an('string');
14+
15+
let response = JSON.parse(result.body);
16+
17+
expect(response).to.be.an('object');
18+
expect(response.message).to.be.equal("hello world");
19+
});
20+
});

msk-lambda-iam-node-sam/README.md

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Node.js Lambda Kafka Consumer with IAM Auth, using SAM
2+
3+
This pattern is an example of a Lambda function that will consume messages from an Managed Streaming for Kafka(MSK) topic, where the MSK Cluster has been configured to use IAM Authentication. This pattern assumes you already have an MSK Cluster with a Topic configured, if you need a sample pattern to deploy an MSK Cluster either in Provisioned or Serverless modes please see the [msk-cfn-sasl-lambda pattern](https://serverlessland.com/patterns/msk-cfn-sasl-lambda)
4+
5+
This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders.
6+
7+
- HandlerKafka - Code for the application's Lambda function.
8+
- events - Invocation events that you can use to invoke the function.
9+
- template.yaml - A template that defines the application's AWS resources.
10+
11+
The application creates a Lambda function that will listen to Kafka messages on a topic linked to an MSK Cluster. These resources are defined in the `template.yaml` file in this project. You can update the template to add AWS resources through the same deployment process that updates your application code.
12+
13+
Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
14+
15+
## Requirements
16+
17+
* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
18+
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
19+
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
20+
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
21+
* Create MSK cluster and topic that will be used for testing. It is important to create the topic before deploying the Lambda function, otherwise the event source mapping will stay disabled.
22+
23+
## Deploy the sample application
24+
25+
The Serverless Application Model Command Line Interface (SAM CLI) is an extension of the AWS CLI that adds functionality for building and testing Lambda applications. It uses Docker to run your functions in an Amazon Linux environment that matches Lambda. It can also emulate your application's build environment and API.
26+
27+
To use the SAM CLI, you need the following tools.
28+
29+
* SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html)
30+
* Node.js - [Install Node.js 18](https://nodejs.org/en/), including the NPM package management tool.
31+
* Docker - [Install Docker community edition](https://hub.docker.com/search/?type=edition&offering=community)
32+
33+
To build and deploy your application for the first time, run the following in your shell:
34+
35+
```bash
36+
sam build
37+
sam deploy --guided
38+
```
39+
40+
The first command will build the source of your application. The second command will package and deploy your application to AWS, with a series of prompts:
41+
42+
* **Stack Name**: The name of the stack to deploy to CloudFormation. This should be unique to your account and region, and a good starting point would be something matching your project name.
43+
* **AWS Region**: The AWS region you want to deploy your app to.
44+
* **Parameter MSKClusterName**: The name of the MSKCluster
45+
* **Parameter MSKClusterId**: The unique ID of the MSKCluster
46+
* **Parameter MSKTopic**: The Kafka topic on which the lambda function will listen on
47+
* **Confirm changes before deploy**: If set to yes, any change sets will be shown to you before execution for manual review. If set to no, the AWS SAM CLI will automatically deploy application changes.
48+
* **Allow SAM CLI IAM role creation**: Many AWS SAM templates, including this example, create AWS IAM roles required for the AWS Lambda function(s) included to access AWS services. By default, these are scoped down to minimum required permissions. To deploy an AWS CloudFormation stack which creates or modifies IAM roles, the `CAPABILITY_IAM` value for `capabilities` must be provided. If permission isn't provided through this prompt, to deploy this example you must explicitly pass `--capabilities CAPABILITY_IAM` to the `sam deploy` command.
49+
* **Disable rollback**: Defaults to No and it preserves the state of previously provisioned resources when an operation fails
50+
* **Save arguments to configuration file**: If set to yes, your choices will be saved to a configuration file inside the project, so that in the future you can just re-run `sam deploy` without parameters to deploy changes to your application.
51+
* **SAM configuration file [samconfig.toml]**: Name of the configuration file to store configuration information locally
52+
* **SAM configuration environment [default]**: Environment for storing deployment information locally
53+
54+
You should get a message "Successfully created/updated stack - <StackName> in <Region>" if all goes well
55+
56+
## Test the sample application
57+
58+
Once the lambda function is deployed, send some Kafka messages on the topic that the lambda function is listening on, on the MSK server.
59+
60+
Either send at least 10 messages or wait for 300 seconds (check the values of BatchSize: 10 and MaximumBatchingWindowInSeconds: 300 in the template.yaml file)
61+
62+
Then check Cloudwatch logs and you should see messages for the Cloudwatch Log Group with the name of the deployed Lambda function.
63+
64+
The lambda code parses the Kafka messages and outputs the fields in the Kafka messages to Cloudwatch logs
65+
66+
A single lambda function receives a batch of messages. The messages are received as a map with each key being a combination of the topic and the partition, as a single batch can receive messages from multiple partitions.
67+
68+
Each key has a list of messages. Each Kafka message has the following properties - Topic, Partition, Offset, TimeStamp, TimeStampType, Key and Value
69+
70+
The Key and Value are base64 encoded and have to be decoded. A message can also have a list of headers, each header having a key and a value.
71+
72+
The code in this example prints out the fields in the Kafka message and also decrypts the key and the value and logs them in Cloudwatch logs.
73+
74+
### Local development
75+
76+
**Invoking function locally using sam local**
77+
78+
```bash
79+
sam local invoke --event=events/event.json
80+
```
81+
82+
You should see a response similar to the below
83+
84+
START RequestId: 2d1041e7-fb49-4181-a8ac-15277f5d2b6c Version: $LATEST
85+
2023-03-31T22:29:21.659Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Key: myTopic-0
86+
2023-03-31T22:29:21.699Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO R} headers: []=', 'CREATE_TIME',
87+
2023-03-31T22:29:21.701Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Topic: myTopic
88+
2023-03-31T22:29:21.701Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Partition: 0
89+
2023-03-31T22:29:21.701Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Offset: 250
90+
2023-03-31T22:29:21.701Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Timestamp: 1678072110111
91+
2023-03-31T22:29:21.702Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO TimestampType: CREATE_TIME
92+
2023-03-31T22:29:21.702Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Key: null
93+
2023-03-31T22:29:21.705Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Value: f
94+
2023-03-31T22:29:21.710Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO R} headers: []=', 'CREATE_TIME',
95+
2023-03-31T22:29:21.712Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Topic: myTopic
96+
2023-03-31T22:29:21.713Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Partition: 0
97+
2023-03-31T22:29:21.719Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Offset: 251
98+
2023-03-31T22:29:21.725Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Timestamp: 1678072111086
99+
2023-03-31T22:29:21.725Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO TimestampType: CREATE_TIME
100+
2023-03-31T22:29:21.725Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Key: null
101+
2023-03-31T22:29:21.726Z 2d1041e7-fb49-4181-a8ac-15277f5d2b6c INFO Value: g
102+
END RequestId: 2d1041e7-fb49-4181-a8ac-15277f5d2b6c
103+
REPORT RequestId: 2d1041e7-fb49-4181-a8ac-15277f5d2b6c Init Duration: 11.37 msDuration: 2696.60 ms Billed Duration: 2697 ms Memory Size: 128 MB Max Memory Used: 128 MB
104+
105+
## Cleanup
106+
107+
1. Delete the stack
108+
```bash
109+
aws cloudformation delete-stack --stack-name STACK_NAME
110+
```
111+
1. Confirm the stack has been deleted
112+
```bash
113+
aws cloudformation list-stacks --query "StackSummaries[?contains(StackName,'STACK_NAME')].StackStatus"
114+
```
115+
116+
----
117+
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
118+
119+
SPDX-License-Identifier: MIT-0
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"records":{
3+
"myTopic-0":[
4+
{
5+
"topic":"myTopic",
6+
"partition":0,
7+
"offset":250,
8+
"timestamp":1678072110111,
9+
"timestampType":"CREATE_TIME",
10+
"value":"Zg==",
11+
"headers":[
12+
13+
]
14+
},
15+
{
16+
"topic":"myTopic",
17+
"partition":0,
18+
"offset":251,
19+
"timestamp":1678072111086,
20+
"timestampType":"CREATE_TIME",
21+
"value":"Zw==",
22+
"headers":[
23+
24+
]
25+
}
26+
]
27+
},
28+
"eventSource":"aws:kafka",
29+
"eventSourceArn":"arn:aws:kafka:us-west-2:123456789012:cluster/MSKWorkshopCluster/a93759a9-c9d0-4952-984c-492c6bfa2be8-13",
30+
"bootstrapServers":"b-2.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-3.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098,b-1.mskworkshopcluster.z9kc4f.c13.kafka.us-west-2.amazonaws.com:9098"
31+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
{
2+
"title": "Lambda function that is subscribed to an MSK Topic",
3+
"description": "Create a Lambda function that uses an Amazon MSK Topic as an event source with IAM Authentication.",
4+
"language": "Node.js",
5+
"level": "200",
6+
"framework": "SAM",
7+
"introBox": {
8+
"headline": "How it works",
9+
"text": [
10+
"This pattern provides a Lambda function along with an Event Source Mapping to a Kafka topic ",
11+
"It requires that you already have a Managed Streaming for Kafka(MSK) Cluster setup with a topic created. If you don't already have an MSK Cluster ",
12+
"You can use the example in this pattern https://serverlessland.com/patterns/msk-cfn-sasl-lambda (linked in the resources) to deploy a cluster.",
13+
"This pattern will work with either a provisioned or serverless MSK cluster as long as the cluster is configured to use IAM authentication. ",
14+
"For detailed deployment instructions instructions see the README "
15+
]
16+
},
17+
"gitHub": {
18+
"template": {
19+
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/msk-lambda-nodejs-sam",
20+
"templateURL": "serverless-patterns/msk-lambda-nodejs-sam",
21+
"projectFolder": "msk-lambda-nodejs-sam",
22+
"templateFile": "msk-lambda-nodejs-sam/template.yml"
23+
}
24+
},
25+
"resources": {
26+
"bullets": [
27+
{
28+
"text": "MSK Cluster Pattern",
29+
"link": "https://serverlessland.com/patterns/msk-cfn-sasl-lambda"
30+
},
31+
{
32+
"text": "Using Lambda with Amazon MSK",
33+
"link": "https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html"
34+
},
35+
{
36+
"text": "CloudFormation Provisioned MSK cluster reference",
37+
"link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-cluster.html"
38+
},
39+
{
40+
"text": "CloudFormation serverless MSK cluster reference",
41+
"link": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-serverlesscluster.html"
42+
}
43+
]
44+
},
45+
"deploy": {
46+
"text": [
47+
"sam deploy --guided"
48+
]
49+
},
50+
"testing": {
51+
"text": [
52+
"See the Github repo for detailed testing instructions."
53+
]
54+
},
55+
"cleanup": {
56+
"text": [
57+
"Delete the Lambda Function: <code>sam delete</code>."
58+
]
59+
},
60+
"authors": [
61+
{
62+
"name": "Vaibhav Jain",
63+
"bio": "AWS - Sr. Application Architect",
64+
"image": "https://media.licdn.com/dms/image/C4E03AQEqzZWHGT4dBQ/profile-displayphoto-shrink_800_800/0/1580165399872?e=1687392000&v=beta&t=zdxENLnqCpqCz9i1Uf5Yx4YXlR9EYvgxP8N5UTsy6J8",
65+
"linkedin": "https://www.linkedin.com/in/vaibhavjainv/"
66+
},
67+
{
68+
"name": "Paveen Allam",
69+
"bio": "Senior Solutions Architect",
70+
"image": "https://www.fintail.me/images/pa.jpg",
71+
"linkedin": "https://www.linkedin.com/in/pallam/"
72+
},
73+
{
74+
"name": "Suraj Tripathi",
75+
"bio": "AWS - AppDev Cloud Consultant",
76+
"linkedin": "https://www.linkedin.com/in/suraj-tripathi-01b49a140/"
77+
},
78+
{
79+
"name": "Adam Wagner",
80+
"bio": "AWS - Principal Serverless Solutions Architect",
81+
"linkedin": "https://www.linkedin.com/in/adam-wagner-4bb412/"
82+
},
83+
{
84+
"name": "Indranil Banerjee",
85+
"bio": "AWS - Senior Solutions Architect",
86+
"linkedin": "https://www.linkedin.com/in/indranil-banerjee-b00a261/"
87+
}
88+
]
89+
}

0 commit comments

Comments
 (0)