Commit 8745b13

Publishing Durable Function ESM and Chaining Pattern
1 parent 015b84c commit 8745b13

9 files changed

Lines changed: 551 additions & 0 deletions

File tree

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
# Event-Driven Data Pipeline with Lambda Durable Functions

This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with **direct SQS Event Source Mapping** and Lambda invoke chaining.

## How It Works

When a message arrives in the SQS queue, it directly triggers the durable function via Event Source Mapping (ESM); no intermediary Lambda is needed. The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining: first validating the incoming data, then transforming it (converting `data_source` to uppercase), and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it well suited to reliable batch processing workflows.
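The chained flow described above can be sketched in plain Python. In the real pattern each step is a separate Lambda function invoked through the durable context; here they are local functions so the control flow can be read and run in isolation (the function names are illustrative, not part of the pattern's API):

```python
# Plain-Python sketch of the three chained steps the durable function
# orchestrates: validate -> transform -> store.
def validate(data: dict) -> bool:
    # The real validation function runs in its own Lambda.
    return bool(data.get("data_source"))

def transform(data: dict) -> dict:
    # Mirrors the pattern's uppercase transformation of data_source.
    return {**data, "data_source": data.get("data_source", "").upper()}

def store(data: dict) -> dict:
    # The real step writes to DynamoDB; here we just echo the item.
    return {"stored": True, "item": data}

def pipeline(message: dict) -> dict:
    if not validate(message):
        return {"status": "failed", "reason": "validation_failed"}
    return {"status": "completed", **store(transform(message))}

result = pipeline({"data_source": "test.csv", "processing_type": "standard"})
```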
## Architecture Overview

The pattern showcases two key Durable Functions capabilities:

1. **Direct Event Source Mapping**: SQS directly triggers the durable function (15-minute limit)
2. **Lambda Invoke Chaining**: Orchestrates specialized processing functions

![Architecture Diagram](architecture-diagram.png)

## Key Features

- **Direct ESM Integration**: No intermediary function needed
- **15-minute execution constraint**: Demonstrates ESM time limits
- **Fault-tolerant processing**: Automatic checkpointing and recovery
- **Microservices coordination**: Chains specialized Lambda functions
- **Batch processing**: Handles multiple SQS records per invocation
- **Simple storage**: Uses DynamoDB for processed data

## Important ESM Constraints

⚠️ **15-Minute Execution Limit**: When using Event Source Mapping with Durable Functions, the total execution time cannot exceed 15 minutes. This budget covers:

- All processing steps
- Downstream function invocations

Avoid long wait operations, which would exhaust the budget.
## Use Cases

- ETL pipelines with validation and transformation
- Event-driven microservices orchestration
- Batch processing with fault tolerance
- Data processing workflows requiring checkpointing

## Prerequisites

- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) configured with appropriate permissions
- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) latest version installed
- [Python 3.14](https://www.python.org/downloads/release/python-3140/) runtime installed

## Deployment

1. **Build the application**:
   ```bash
   sam build
   ```

2. **Deploy to AWS**:
   ```bash
   sam deploy --guided
   ```
   Note the outputs after deployment:
   - `DataProcessingQueueUrl`: Use this for `<QUEUE_URL>`
   - `ProcessedDataTable`: Use this for `<PROCESSED_DATA_TABLE>`

3. **Test the pipeline**:
   ```bash
   # Send a test message to SQS
   aws sqs send-message \
       --queue-url <QUEUE_URL> \
       --message-body '{"data_source": "test.csv", "processing_type": "standard"}'
   ```
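The same test message can be sent with boto3. A small sketch; `QUEUE_URL` stands in for the `DataProcessingQueueUrl` stack output:

```python
import json

# Replace with the DataProcessingQueueUrl stack output.
QUEUE_URL = "<QUEUE_URL>"

message = {"data_source": "test.csv", "processing_type": "standard"}
body = json.dumps(message)

def send_test_message(queue_url: str = QUEUE_URL) -> dict:
    # boto3 imported lazily so the sketch can be read without it installed.
    import boto3
    sqs = boto3.client("sqs")
    return sqs.send_message(QueueUrl=queue_url, MessageBody=body)
```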
4. **Verify successful processing**:
   ```bash
   # Check that data was processed and stored in DynamoDB
   aws dynamodb scan --table-name <PROCESSED_DATA_TABLE> --query 'Items[*]'
   ```
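The scan can also be driven from boto3, with a small pure check that the uppercase transformation was applied. `TABLE_NAME` stands in for the `ProcessedDataTable` stack output, and the item shape mirrors what this pattern's storage function writes:

```python
# Replace with the ProcessedDataTable stack output.
TABLE_NAME = "<PROCESSED_DATA_TABLE>"

def fetch_items(table_name: str = TABLE_NAME) -> list:
    # boto3 imported lazily so the sketch can be read without it installed.
    import boto3
    table = boto3.resource("dynamodb").Table(table_name)
    return table.scan()["Items"]

def looks_processed(item: dict) -> bool:
    """True if the item shows the uppercase transformation was applied."""
    original = item.get("original_data", {}).get("data_source", "")
    transformed = item.get("transformed_data", {}).get("data_source", "")
    return transformed == original.upper() != ""

sample = {
    "original_data": {"data_source": "test.csv"},
    "transformed_data": {"data_source": "TEST.CSV"},
}
```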
**Success indicators:**
- You should see at least one item in the DynamoDB table
- Original input data: `"data_source": "test.csv"`
- Transformed data: `"data_source": "TEST.CSV"` (uppercase transformation applied)
- Execution tracking with a unique `execution_id`
- Timestamps showing when the data was processed and stored

This confirms the entire pipeline worked: SQS → Durable Function → Validation → Transformation → Storage → DynamoDB
## Components

### 1. Durable Pipeline Function (`src/durable_pipeline/`)
- **Direct SQS Event Source Mapping**: Receives SQS events directly
- **15-minute execution limit**: Must complete all processing within ESM constraints
- **Batch processing**: Handles multiple SQS records per invocation
- **Lambda invoke chaining**: Orchestrates validation, transformation, and storage
- **Automatic checkpointing**: Recovers from failures without losing progress

### 2. Specialized Processing Functions
- **Validation Function**: Simple data validation checks
- **Transformation Function**: Basic data transformation
- **Storage Function**: Persists processed data to DynamoDB

## Monitoring

- CloudWatch Logs for execution tracking
- DynamoDB table for processed data
- SQS DLQ for failed messages
## Configuration

Key environment variables:
- `ENVIRONMENT`: Deployment environment (dev/prod)
- `PROCESSED_DATA_TABLE`: DynamoDB table for processed data
- `VALIDATION_FUNCTION_ARN`: ARN of the validation function
- `TRANSFORMATION_FUNCTION_ARN`: ARN of the transformation function
- `STORAGE_FUNCTION_ARN`: ARN of the storage function

## ESM-Specific Considerations

- **Execution Timeout**: Set to the 900-second (15-minute) maximum
- **Batch Size**: Configured for optimal processing (5 records)
- **Error Handling**: Uses an SQS DLQ for failed batches
- **Efficient Processing**: Optimized for speed to stay within time limits
## Error Handling

- Automatic retries with exponential backoff
- Dead Letter Queue for failed messages
- Partial batch failure support
- Checkpoint-based recovery
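For SQS event sources, partial batch failure is conventionally reported through Lambda's `ReportBatchItemFailures` response shape, where only the listed message IDs are returned to the queue for retry. A hedged sketch of building that response from per-record results (note this pattern's handler currently returns a custom summary instead, so adapt before use, and `ReportBatchItemFailures` must be enabled on the event source mapping):

```python
def to_batch_item_failures(record_results: list) -> dict:
    """Build Lambda's partial-batch-failure response from per-record results."""
    failed_ids = [
        r["message_id"]
        for r in record_results
        if r.get("status") in ("failed", "error")
    ]
    return {"batchItemFailures": [{"itemIdentifier": mid} for mid in failed_ids]}

results = [
    {"message_id": "a1", "status": "completed"},
    {"message_id": "b2", "status": "error"},
]
response = to_batch_item_failures(results)
```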
## Cost Optimization

- Pay only for active compute time
- Efficient batch processing
- Automatic scaling based on queue depth

## Cleanup

```bash
sam delete
```

## Learn More

- [AWS Lambda Durable Functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html)
- [Event Source Mappings with Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html)
- [Lambda Invoke Chaining](https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations)
Lines changed: 67 additions & 0 deletions
{
  "title": "Event-Driven Data Pipeline with Lambda Durable Functions",
  "description": "This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping and Lambda invoke chaining.",
  "language": "Python",
  "level": "200",
  "framework": "SAM",
  "services": ["sqs", "lambda", "dynamoDB"],
  "introBox": {
    "headline": "How it works",
    "text": [
      "This pattern demonstrates an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping. When a message arrives in the SQS queue, it directly triggers the durable function (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining: first validating the incoming data, then transforming it (converting data_source to uppercase), and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it well suited to reliable batch processing workflows."
    ]
  },
  "testing": {
    "headline": "Testing",
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "headline": "Cleanup",
    "text": [
      "Delete the stack: <code>sam delete</code>."
    ]
  },
  "deploy": {
    "text": [
      "sam build",
      "sam deploy --guided"
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-esm-and-chaining",
      "templateURL": "serverless-patterns/lambda-durable-esm-and-chaining",
      "templateFile": "template.yaml",
      "projectFolder": "lambda-durable-esm-and-chaining"
    }
  },
  "resources": {
    "headline": "Additional resources",
    "bullets": [
      {
        "text": "AWS Lambda Durable Functions Documentation",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
      },
      {
        "text": "Event Source Mappings with Durable Functions",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html"
      },
      {
        "text": "Durable Function Lambda Invoke Chaining",
        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations"
      }
    ]
  },
  "authors": [
    {
      "name": "Sahithi Ginjupalli",
      "image": "https://drive.google.com/file/d/1YcKYuGz3LfzSxiwb2lWJfpyi49SbvOSr/view?usp=sharing",
      "bio": "Cloud Engineer at AWS with a passion for diving deep into cloud and AI services to build innovative serverless applications.",
      "linkedin": "ginjupalli-sahithi-37460a18b",
      "twitter": ""
    }
  ]
}
Lines changed: 100 additions & 0 deletions
import json
import os
import boto3
from datetime import datetime, timezone
from typing import Dict, Any, List
from aws_durable_execution_sdk_python import DurableContext, durable_execution

# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
lambda_client = boto3.client('lambda')

@durable_execution
def lambda_handler(event: Dict[str, Any], context: DurableContext) -> Dict[str, Any]:
    """
    Main durable pipeline function that processes SQS events directly via ESM.
    Demonstrates Lambda invoke chaining with checkpointing and recovery.
    Limited to 15 minutes total execution time due to ESM constraints.
    """

    # Extract configuration from environment
    validation_function_arn = os.environ['VALIDATION_FUNCTION_ARN']
    transformation_function_arn = os.environ['TRANSFORMATION_FUNCTION_ARN']
    storage_function_arn = os.environ['STORAGE_FUNCTION_ARN']
    processed_data_table = os.environ['PROCESSED_DATA_TABLE']
    environment = os.environ.get('ENVIRONMENT', 'dev')

    print(f"Processing SQS batch with {len(event.get('Records', []))} records")

    # Process each SQS record in the batch
    batch_results = []

    for record in event.get('Records', []):
        try:
            # Extract data from the SQS record
            message_id = record['messageId']
            data = json.loads(record['body'])
            execution_name = f"{environment}-esm-{message_id}"

            print(f"Processing record: {message_id}")

            # Step 1: Validate data by invoking the validation function
            validation_result = context.invoke(
                validation_function_arn,
                {'data': data, 'execution_id': execution_name},
                name=f'validate-data-{message_id}'
            )

            if not validation_result.get('is_valid', False):
                batch_results.append({
                    'message_id': message_id,
                    'status': 'failed',
                    'reason': 'validation_failed'
                })
                continue

            # Step 2: Transform data by invoking the transformation function
            transformation_result = context.invoke(
                transformation_function_arn,
                {'data': data, 'execution_id': execution_name},
                name=f'transform-data-{message_id}'
            )

            # Step 3: Store processed data by invoking the storage function
            storage_result = context.invoke(
                storage_function_arn,
                {
                    'transformed_data': transformation_result,
                    'execution_id': execution_name,
                    'original_data': data
                },
                name=f'store-data-{message_id}'
            )

            batch_results.append({
                'message_id': message_id,
                'status': 'completed',
                'execution_id': execution_name
            })

        except Exception as e:
            print(f"Error processing record {record.get('messageId', 'unknown')}: {str(e)}")
            batch_results.append({
                'message_id': record.get('messageId', 'unknown'),
                'status': 'error',
                'error': str(e)
            })

    # Return batch processing summary
    successful_records = len([r for r in batch_results if r['status'] == 'completed'])
    failed_records = len([r for r in batch_results if r['status'] in ['failed', 'error']])

    return {
        'batch_summary': {
            'total_records': len(batch_results),
            'successful_records': successful_records,
            'failed_records': failed_records
        },
        'record_results': batch_results,
        # timezone-aware replacement for the deprecated datetime.utcnow()
        'processed_at': datetime.now(timezone.utc).isoformat()
    }
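The batch-summary tail of the handler can be exercised without AWS or the durable-execution SDK. `summarize` is a hypothetical helper (not part of the source) that restates the counting logic so it can be unit-tested in isolation:

```python
def summarize(batch_results: list) -> dict:
    """Mirror of the handler's batch_summary counting logic, for local tests."""
    successful = len([r for r in batch_results if r["status"] == "completed"])
    failed = len([r for r in batch_results if r["status"] in ["failed", "error"]])
    return {
        "total_records": len(batch_results),
        "successful_records": successful,
        "failed_records": failed,
    }

summary = summarize([
    {"status": "completed"},
    {"status": "failed"},
    {"status": "error"},
])
```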
Lines changed: 1 addition & 0 deletions
aws-durable-execution-sdk-python
Lines changed: 47 additions & 0 deletions
import boto3
import os
from typing import Dict, Any
from datetime import datetime, timezone

dynamodb = boto3.resource('dynamodb')

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Simple data storage function that saves processed data to DynamoDB."""

    transformed_data = event['transformed_data']
    execution_id = event['execution_id']
    original_data = event['original_data']

    table_name = os.environ['PROCESSED_DATA_TABLE']

    print(f"Storing processed data for execution: {execution_id}")

    try:
        table = dynamodb.Table(table_name)

        # Store processed data in DynamoDB
        item = {
            'execution_id': execution_id,
            'original_data': original_data,
            'transformed_data': transformed_data,
            # timezone-aware replacement for the deprecated datetime.utcnow()
            'stored_at': datetime.now(timezone.utc).isoformat(),
            'data_source': original_data.get('data_source', 'unknown'),
            'processing_type': original_data.get('processing_type', 'standard')
        }

        table.put_item(Item=item)

        return {
            'success': True,
            'execution_id': execution_id,
            'table_name': table_name,
            'stored_at': datetime.now(timezone.utc).isoformat()
        }

    except Exception as e:
        print(f"Error storing data: {str(e)}")
        return {
            'success': False,
            'execution_id': execution_id,
            'error': str(e)
        }
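The shape of the stored item can be checked without a real table. `build_item` is a hypothetical helper (not part of the source) mirroring the dictionary the storage function writes:

```python
from datetime import datetime, timezone

def build_item(execution_id: str, original: dict, transformed: dict) -> dict:
    """Mirror of the DynamoDB item the storage function constructs."""
    return {
        "execution_id": execution_id,
        "original_data": original,
        "transformed_data": transformed,
        "stored_at": datetime.now(timezone.utc).isoformat(),
        "data_source": original.get("data_source", "unknown"),
        "processing_type": original.get("processing_type", "standard"),
    }

item = build_item("dev-esm-123", {"data_source": "test.csv"}, {"data_source": "TEST.CSV"})
```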
Lines changed: 22 additions & 0 deletions
from typing import Dict, Any
from datetime import datetime, timezone

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Simple data transformation function."""

    data = event['data']
    execution_id = event['execution_id']

    print(f"Transforming data for execution: {execution_id}")

    # Simple transformation: add processing metadata and uppercase data_source
    transformed_data = {
        'original_data': data,
        'data_source': data.get('data_source', '').upper(),
        'processing_type': data.get('processing_type', 'standard'),
        # timezone-aware replacement for the deprecated datetime.utcnow()
        'processed_at': datetime.now(timezone.utc).isoformat(),
        'execution_id': execution_id,
        'transformation_applied': 'uppercase_data_source'
    }

    return transformed_data
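Because the transformation function has no AWS dependencies, its logic can be checked locally. `transform` below restates the handler body (a copy for illustration, not an import of the source module):

```python
from datetime import datetime, timezone

def transform(data: dict, execution_id: str) -> dict:
    """Local restatement of the transformation handler's output shape."""
    return {
        "original_data": data,
        "data_source": data.get("data_source", "").upper(),
        "processing_type": data.get("processing_type", "standard"),
        "processed_at": datetime.now(timezone.utc).isoformat(),
        "execution_id": execution_id,
        "transformation_applied": "uppercase_data_source",
    }

out = transform({"data_source": "test.csv"}, "dev-esm-local")
```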
