From 99495a15d055857f9c99f38beb024c65aa66faab Mon Sep 17 00:00:00 2001 From: "Kevin.Du" Date: Tue, 20 Sep 2022 16:44:23 +1000 Subject: [PATCH] added codepipeline folder --- ml_ops/continuous_deployment/README.md | 36 + .../forecast_deploy_cfn.yaml | 380 +++ ml_ops/continuous_deployment/seed/.gitignore | 0 ml_ops/continuous_deployment/seed/README.md | 37 + .../continuous_deployment/seed/buildspec.yml | 27 + .../seed/forecast-mlops-dependency.yml | 681 +++++ .../seed/forecast-mlops-solution-guidance.yml | 2202 +++++++++++++++++ .../seed/prod-config.json | 24 + .../seed/prod-dep-config.json | 6 + .../seed/staging-config.json | 24 + .../seed/staging-dep-config.json | 6 + 11 files changed, 3423 insertions(+) create mode 100644 ml_ops/continuous_deployment/README.md create mode 100644 ml_ops/continuous_deployment/forecast_deploy_cfn.yaml create mode 100644 ml_ops/continuous_deployment/seed/.gitignore create mode 100644 ml_ops/continuous_deployment/seed/README.md create mode 100644 ml_ops/continuous_deployment/seed/buildspec.yml create mode 100644 ml_ops/continuous_deployment/seed/forecast-mlops-dependency.yml create mode 100644 ml_ops/continuous_deployment/seed/forecast-mlops-solution-guidance.yml create mode 100644 ml_ops/continuous_deployment/seed/prod-config.json create mode 100644 ml_ops/continuous_deployment/seed/prod-dep-config.json create mode 100644 ml_ops/continuous_deployment/seed/staging-config.json create mode 100644 ml_ops/continuous_deployment/seed/staging-dep-config.json diff --git a/ml_ops/continuous_deployment/README.md b/ml_ops/continuous_deployment/README.md new file mode 100644 index 0000000..2f3fb41 --- /dev/null +++ b/ml_ops/continuous_deployment/README.md @@ -0,0 +1,36 @@ +## Amazon Forecast End-to-End Deployments CD pipeline + +This solution demonstrates how to orchestrate the continuous deployment (CD) of Amazon Forecast solutions using CodeCommit, CodePipeline, CodeBuild and CloudFormation. 
You will use the CloudFormation templates created in [Amazon Forecast End-to-End Deployments Made Simple](https://github.com/aws-samples/amazon-forecast-samples/tree/main/ml_ops) to deploy Forecast MLOps workflows. + + +To begin with, you need to create a zip file that contains the seed code and upload it to Amazon S3. +```bash +LOCAL_PATH=forecast-mlops-workflow.zip +S3_BUCKET= + +(cd seed && zip -r ../${LOCAL_PATH} * ) +aws s3 cp ${LOCAL_PATH} s3://${S3_BUCKET}/cfn/forecast-mlops-workflow.zip +``` + +Then you can run the following bash code to deploy the solution. +```bash +REGION= +STACK_NAME=mlops-forecast-infra +CFN_FILE=forecast_deploy_cfn.yaml +SeedCodeS3Bucket=${S3_BUCKET} +SeedCodeS3Key=cfn/forecast-mlops-workflow.zip + +aws cloudformation deploy --region ${REGION} \ + --stack-name ${STACK_NAME} \ + --template-file ${CFN_FILE} \ + --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM \ + --parameter-overrides SeedCodeS3Bucket=${SeedCodeS3Bucket} \ + SeedCodeS3Key=${SeedCodeS3Key} +``` + +## Move to Production + +1. You need to revisit and refine the roles and permissions created in CloudFormation templates to align with your security policies. +2. It's recommended to refine data ingestion Amazon S3 structures so that you are able to ingest data in an incremental fashion. +3. You should consider enabling [Amazon Forecast predictor monitoring](https://aws.amazon.com/blogs/machine-learning/continuously-monitor-predictor-accuracy-with-amazon-forecast/) in your Prod deployment, so you will have more information to determine and improve retraining strategy. +4. You can refer to the workshop [Building a Cross-account CI/CD Pipeline](https://catalog.us-east-1.prod.workshops.aws/workshops/00bc829e-fd7c-4204-9da1-faea3cf8bd88/en-US/) for multi-account deployments. 
\ No newline at end of file diff --git a/ml_ops/continuous_deployment/forecast_deploy_cfn.yaml b/ml_ops/continuous_deployment/forecast_deploy_cfn.yaml new file mode 100644 index 0000000..d8c6d3e --- /dev/null +++ b/ml_ops/continuous_deployment/forecast_deploy_cfn.yaml @@ -0,0 +1,380 @@ +Description: |- + Toolchain template which provides the resources needed to represent infrastructure as code. This template specifically creates a CI/CD pipeline to deploy Amazon Forecast MLOps workflow to two stages in CD -- staging and production. +Parameters: + SeedCodeS3Bucket: + Type: String + AllowedPattern: ^[a-zA-Z](-*[a-zA-Z0-9])* + Description: Seed Code S3 Bucket + MaxLength: 32 + MinLength: 1 + SeedCodeS3Key: + Type: String + Description: Seed Code S3 Key + MaxLength: 64 + MinLength: 1 +Resources: + MlOpsArtifactsBucket: + Type: AWS::S3::Bucket + Properties: + BucketName: + Fn::Sub: forecast-project-${AWS::StackName}-deploy + DeletionPolicy: Retain + ModelDeployCodeCommitRepository: + Type: AWS::CodeCommit::Repository + Properties: + RepositoryName: + Fn::Sub: forecast-${AWS::StackName}-modeldeploy + Code: + BranchName: main + S3: + Bucket: !Ref SeedCodeS3Bucket + Key: !Ref SeedCodeS3Key + RepositoryDescription: + Fn::Sub: Amazon Forecast infrastructure as code for the Project ${AWS::StackName} + CodePipelineServiceRole: + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: + - codepipeline.amazonaws.com + - cloudformation.amazonaws.com + Action: 'sts:AssumeRole' + Path: / + Policies: + - PolicyName: AWS-CodePipeline-Service-3 + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - 'codecommit:CancelUploadArchive' + - 'codecommit:GetBranch' + - 'codecommit:GetCommit' + - 'codecommit:GetUploadArchiveStatus' + - 'codecommit:UploadArchive' + Resource: '*' + - Effect: Allow + Action: + - 'codedeploy:CreateDeployment' + - 'codedeploy:GetApplicationRevision' + 
- 'codedeploy:GetDeployment' + - 'codedeploy:GetDeploymentConfig' + - 'codedeploy:RegisterApplicationRevision' + Resource: '*' + - Effect: Allow + Action: + - 'codebuild:BatchGetBuilds' + - 'codebuild:StartBuild' + Resource: '*' + - Effect: Allow + Action: + - 'devicefarm:ListProjects' + - 'devicefarm:ListDevicePools' + - 'devicefarm:GetRun' + - 'devicefarm:GetUpload' + - 'devicefarm:CreateUpload' + - 'devicefarm:ScheduleRun' + Resource: '*' + - Effect: Allow + Action: + - 'lambda:InvokeFunction' + - 'lambda:ListFunctions' + Resource: '*' + - Effect: Allow + Action: + - 'iam:PassRole' + Resource: '*' + - Effect: Allow + Action: + - 'elasticbeanstalk:*' + - 'ec2:*' + - 'elasticloadbalancing:*' + - 'autoscaling:*' + - 'cloudwatch:*' + - 's3:*' + - 'sns:*' + - 'cloudformation:*' + - 'rds:*' + - 'sqs:*' + - 'ecs:*' + - 'states:*' + - 'ssm:*' + - 'events:*' + - 'lambda:*' + - 'forecast:*' + - 'iam:*' + Resource: '*' + CodeBuildRole: + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: sts:AssumeRole + Effect: Allow + Principal: + Service: codebuild.amazonaws.com + Path: / + RoleName: !Join + - '-' + - - !Ref 'AWS::StackName' + - CodeBuild + Policies: + - PolicyName: "logs" + PolicyDocument: + Version: "2012-10-17" + Statement: + - + Effect: "Allow" + Action: + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + - ecr:GetAuthorizationToken + - ssm:GetParameters + Resource: "*" + - PolicyName: "S3" + PolicyDocument: + Version: "2012-10-17" + Statement: + - + Effect: "Allow" + Action: + - s3:GetObject + - s3:PutObject + - s3:GetObjectVersion + Resource: !Sub arn:aws:s3:::${MlOpsArtifactsBucket}/* + Type: AWS::IAM::Role + EventBridgeIAMrole: + Type: 'AWS::IAM::Role' + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: events.amazonaws.com + Action: 'sts:AssumeRole' + Path: / + Policies: + - PolicyName: StartCodePipeline + PolicyDocument: + Version: 2012-10-17 + Statement: + - 
Effect: Allow + Action: + - 'codepipeline:StartPipelineExecution' + Resource: "*" + ModelDeployBuildProject: + Type: AWS::CodeBuild::Project + Properties: + Artifacts: + Type: CODEPIPELINE + Environment: + ComputeType: BUILD_GENERAL1_SMALL + EnvironmentVariables: + - Name: ARTIFACT_BUCKET + Value: + Ref: MlOpsArtifactsBucket + - Name: AWS_REGION + Value: + Ref: AWS::Region + - Name: EXPORT_DEP_TEMPLATE_NAME + Value: template-dep-export.yml + - Name: EXPORT_TEMPLATE_NAME + Value: template-export.yml + - Name: EXPORT_TEMPLATE_STAGING_CONFIG + Value: staging-config.json + - Name: EXPORT_TEMPLATE_PROD_CONFIG + Value: prod-config.json + - Name: EXPORT_TEMPLATE_DEP_STAGING_CONFIG + Value: staging-dep-config.json + - Name: EXPORT_TEMPLATE_DEP_PROD_CONFIG + Value: prod-dep-config.json + Image: aws/codebuild/amazonlinux2-x86_64-standard:3.0 + Type: LINUX_CONTAINER + ServiceRole: !Ref 'CodeBuildRole' + Source: + BuildSpec: buildspec.yml + Type: CODEPIPELINE + Description: Builds the Cfn templates which defines the Forecast solutions with specified configuration + Name: + Fn::Sub: forecast-${AWS::StackName}-solution-deploy + TimeoutInMinutes: 30 + ModelDeployPipeline: + Type: AWS::CodePipeline::Pipeline + Properties: + RoleArn: !GetAtt + - CodePipelineServiceRole + - Arn + Stages: + - Actions: + - ActionTypeId: + Category: Source + Owner: AWS + Provider: CodeCommit + Version: '1' + Configuration: + PollForSourceChanges: false + RepositoryName: + Fn::GetAtt: ModelDeployCodeCommitRepository.Name + BranchName: main + Name: ModelDeployInfraCode + OutputArtifacts: + - Name: SourceArtifact + Name: Source + - Actions: + - ActionTypeId: + Category: Build + Owner: AWS + Provider: CodeBuild + Version: '1' + Configuration: + ProjectName: + Ref: ModelDeployBuildProject + InputArtifacts: + - Name: SourceArtifact + Name: BuildDeploymentTemplates + OutputArtifacts: + - Name: BuildArtifact + RunOrder: 1 + Name: Build + - Actions: + - ActionTypeId: + Category: Deploy + Owner: AWS + Provider: 
CloudFormation + Version: '1' + Configuration: + ActionMode: REPLACE_ON_FAILURE + Capabilities: CAPABILITY_NAMED_IAM + RoleArn: !GetAtt + - CodePipelineServiceRole + - Arn + StackName: forecast-mlops-dependency-staging + TemplateConfiguration: BuildArtifact::staging-dep-config.json + TemplatePath: BuildArtifact::template-dep-export.yml + InputArtifacts: + - Name: BuildArtifact + Name: DeployResourcesStagingDep + RunOrder: 1 + - ActionTypeId: + Category: Deploy + Owner: AWS + Provider: CloudFormation + Version: '1' + Configuration: + ActionMode: REPLACE_ON_FAILURE + Capabilities: CAPABILITY_NAMED_IAM + RoleArn: !GetAtt + - CodePipelineServiceRole + - Arn + StackName: forecast-mlops-staging + TemplateConfiguration: BuildArtifact::staging-config.json + TemplatePath: BuildArtifact::template-export.yml + InputArtifacts: + - Name: BuildArtifact + Name: DeployResourcesStaging + RunOrder: 2 + - ActionTypeId: + Category: Approval + Owner: AWS + Provider: Manual + Version: '1' + Configuration: + CustomData: Approve this model for Production + Name: ApproveDeployment + RunOrder: 3 + Name: DeployStaging + - Actions: + # The following part is not needed for a single account deployment. 
+ # - ActionTypeId: + # Category: Deploy + # Owner: AWS + # Provider: CloudFormation + # Version: '1' + # Configuration: + # ActionMode: REPLACE_ON_FAILURE + # Capabilities: CAPABILITY_NAMED_IAM + # RoleArn: !GetAtt + # - CodePipelineServiceRole + # - Arn + # StackName: forecast-mlops-dependency-prod + # TemplateConfiguration: BuildArtifact::prod-dep-config.json + # TemplatePath: BuildArtifact::template-dep-export.yml + # InputArtifacts: + # - Name: BuildArtifact + # Name: DeployResourcesProdDep + # RunOrder: 1 + - ActionTypeId: + Category: Deploy + Owner: AWS + Provider: CloudFormation + Version: '1' + Configuration: + ActionMode: REPLACE_ON_FAILURE + Capabilities: CAPABILITY_NAMED_IAM + RoleArn: !GetAtt + - CodePipelineServiceRole + - Arn + StackName: forecast-mlops-prod + TemplateConfiguration: BuildArtifact::prod-config.json + TemplatePath: BuildArtifact::template-export.yml + InputArtifacts: + - Name: BuildArtifact + Name: DeployResourcesProd + RunOrder: 2 + Name: DeployProd + ArtifactStore: + Location: + Ref: MlOpsArtifactsBucket + Type: S3 + Name: + Fn::Sub: forecast-${AWS::StackName}-solution-deploy + DependsOn: + - MlOpsArtifactsBucket + ModelDeployCodeCommitEventRule: + Type: AWS::Events::Rule + Properties: + Description: Rule to trigger a deployment when CodeCommit is updated with a commit + EventPattern: + source: + - aws.codecommit + detail-type: + - CodeCommit Repository State Change + resources: + - Fn::GetAtt: ModelDeployCodeCommitRepository.Arn + detail: + referenceType: + - branch + referenceName: + - main + Name: + Fn::Sub: forecast-${AWS::StackName}-code + State: ENABLED + Targets: + - Arn: + Fn::Join: + - ':' + - - arn + - Ref: AWS::Partition + - codepipeline + - Ref: AWS::Region + - Ref: AWS::AccountId + - Ref: ModelDeployPipeline + Id: + Fn::Sub: codecommit-${AWS::StackName}-trigger + RoleArn: !GetAtt + - EventBridgeIAMrole + - Arn +Outputs: + ModelDeployPipeline: + Value: + Fn::Join: + - '' + - - 
https://console.aws.amazon.com/codesuite/codepipeline/pipelines/ + - Ref: ModelDeployPipeline + - /view?region= + - Ref: AWS::Region \ No newline at end of file diff --git a/ml_ops/continuous_deployment/seed/.gitignore b/ml_ops/continuous_deployment/seed/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/ml_ops/continuous_deployment/seed/README.md b/ml_ops/continuous_deployment/seed/README.md new file mode 100644 index 0000000..4ebd204 --- /dev/null +++ b/ml_ops/continuous_deployment/seed/README.md @@ -0,0 +1,37 @@ +## Amazon Forecast End-to-End Deployments CD pipeline + +This is a sample code repository for demonstrating how you can organize your code for deploying a complete end-to-end workflow with CodeCommit, CodePipeline and CodeBuild. The CloudFormation templates you will use are from [Amazon Forecast End-to-End Deployments Made Simple](https://github.com/aws-samples/amazon-forecast-samples/tree/main/ml_ops). + +The example uses [Food Demand](https://github.com/aws-samples/amazon-forecast-samples/blob/main/ml_ops/sample_data/FoodDemand.md) configuration to demonstrate Amazon Forecast solutions deployment. + +This code repository defines the CloudFormation templates which define the Step Functions, Lambda functions, AWS Systems Manager, etc., as infrastructure. It also has configuration files associated with `staging` and `prod` stages. + +Upon triggering a deployment, the CodePipeline pipeline will deploy two Forecast solutions - `staging` and `prod`. After the first deployment is completed, the pipeline waits for a manual approval step for promotion to the prod stage. You will need to go to CodePipeline in the AWS Management Console to complete this step. + +You own this code: you can modify this template as you need, and add additional tests for your custom validation. 
+ +A description of some of the artifacts is provided below: + + +## Layout of the Seed Forecast Project Template + +`buildspec.yml` + - this file is used by the CodePipeline's Build stage to build CloudFormation templates. + +`forecast-mlops-dependency.yml` + - this CloudFormation template file is packaged by the build step in the CodePipeline and is deployed in different stages. It is built for creating Amazon S3 buckets, Lambda functions and IAM permissions for the following orchestration workload. The description of the template can be found [here](https://github.com/aws-samples/amazon-forecast-samples/tree/main/ml_ops) + +`forecast-mlops-solution-guidance.yml` + - this CloudFormation template file is packaged by the build step in the CodePipeline and is deployed in different stages. It creates Step functions which help coordinate the machine learning pipelines which orchestrate all the Amazon Forecast processes for each workload. The description of the template can be found [here](https://github.com/aws-samples/amazon-forecast-samples/tree/main/ml_ops) + +`staging-dep-config.json` + - this configuration file is used to customize dependency of `staging` stage in the pipeline. You can configure the Amazon S3 bucket. + +`staging-config.json` + - this configuration file is used to customize `staging` stage in the pipeline. You can configure the Amazon Forecast solution here. + +`prod-dep-config.json` + - this configuration file is used to customize dependency of `prod` stage in the pipeline. You can configure the Amazon S3 bucket. + +`prod-config.json` + - this configuration file is used to customize `prod` stage in the pipeline. You can configure the Amazon Forecast solution here. 
diff --git a/ml_ops/continuous_deployment/seed/buildspec.yml b/ml_ops/continuous_deployment/seed/buildspec.yml new file mode 100644 index 0000000..ecdeeb0 --- /dev/null +++ b/ml_ops/continuous_deployment/seed/buildspec.yml @@ -0,0 +1,27 @@ +version: 0.2 + +phases: + install: + runtime-versions: + python: 3.8 + commands: + # Upgrade AWS CLI to the latest version + - pip install --upgrade --force-reinstall "botocore>1.21.30" "boto3>1.18.30" "awscli>1.20.30" + + build: + commands: + # Package the infrastructure as code defined in forecast-mlops-dependency.yml and forecast-mlops-solution-guidance.yml by using AWS CloudFormation. + # Note that the environment variables (ARTIFACT_BUCKET, etc.) used below are expected to be set up by the + # CodeBuild resource in the infra pipeline + - aws cloudformation package --template forecast-mlops-dependency.yml --s3-bucket $ARTIFACT_BUCKET --output-template $EXPORT_DEP_TEMPLATE_NAME + - aws cloudformation package --template forecast-mlops-solution-guidance.yml --s3-bucket $ARTIFACT_BUCKET --output-template $EXPORT_TEMPLATE_NAME + +artifacts: + files: + - $EXPORT_DEP_TEMPLATE_NAME + - $EXPORT_TEMPLATE_NAME + - $EXPORT_TEMPLATE_STAGING_CONFIG + - $EXPORT_TEMPLATE_PROD_CONFIG + - $EXPORT_TEMPLATE_DEP_STAGING_CONFIG + - $EXPORT_TEMPLATE_DEP_PROD_CONFIG + diff --git a/ml_ops/continuous_deployment/seed/forecast-mlops-dependency.yml b/ml_ops/continuous_deployment/seed/forecast-mlops-dependency.yml new file mode 100644 index 0000000..8917801 --- /dev/null +++ b/ml_ops/continuous_deployment/seed/forecast-mlops-dependency.yml @@ -0,0 +1,681 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: > + One-time creation for Amazon Forecast MLOps dependencies + Publish: 20220909 + +Parameters: + + S3Bucket: + Description: Provide the S3 Bucket Name to be used + Type: String + AllowedPattern: ^[a-z0-9][a-z0-9-_]*[a-z0-9] + + ExistingS3Bucket: + Description: Does your S3 bucket already exist? 
+ Default: false + Type: String + AllowedValues: + - true + - false + ConstraintDescription: must specify true or false + +Conditions: + CreateS3Resource: !Equals + - !Ref ExistingS3Bucket + - false + +Resources: + + ForecastArtifactBucket: + Type: AWS::S3::Bucket + Condition: CreateS3Resource + DeletionPolicy: Retain + Properties: + BucketName: !Ref S3Bucket + + ForecastProcessorLambdaExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: + - sts:AssumeRole + Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + - forecast.amazonaws.com + Version: '2012-10-17' + Path: "/" + Policies: + - PolicyDocument: + Statement: + - Action: + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Effect: Allow + Resource: 'arn:aws:logs:*:*:*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-cloudwatch-ops' + - PolicyDocument: + Statement: + - Action: + - ssm:Get* + - ssm:PutParameter* + Effect: Allow + Resource: + - !Sub 'arn:aws:ssm:*:${AWS::AccountId}:parameter/forecast/*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-ssm-ops' + - PolicyDocument: + Statement: + - Action: + - forecast:Describe* + - forecast:Get* + - forecast:List* + - forecast:Query* + - forecast:Invoke* + - forecast:Tag* + - forecast:Untag* + - forecast:Create* + - forecast:Delete* + - forecast:Stop* + - forecast:Update* + - forecast:Resume* + Effect: Allow + Resource: '*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-forecast-ops' + - PolicyDocument: + Statement: + - Action: + - s3:PutObject + - s3:DeleteObject + - s3:Get* + - s3:List* + Effect: Allow + Resource: + - !Join + - '' + - - 'arn:aws:s3:::' + - !Ref S3Bucket + - !Join + - '' + - - 'arn:aws:s3:::' + - !Ref S3Bucket + - '/*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-s3-ops' + - PolicyDocument: + Statement: + - Action: + - glue:GetTable + - glue:CreateTable + - glue:BatchCreatePartition + Effect: Allow + 
Resource: + - !Sub 'arn:aws:glue:*:${AWS::AccountId}:database/*' + - !Sub 'arn:aws:glue:*:${AWS::AccountId}:catalog' + - !Sub 'arn:aws:glue:*:${AWS::AccountId}:table/*/*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-glue-ops' + RoleName: !Sub 'ForecastProcessorLambdaExecutionRole' + + LambdaGetForecastMetadata: + Type: AWS::Lambda::Function + Properties: + FunctionName: GetForecastMetadata + Handler: index.lambda_handler + Runtime: python3.9 + Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn + Timeout: 5 + MemorySize: 128 + Code: + ZipFile: | + from datetime import datetime + + def lambda_handler(event, context): + + try: + forecastHorizon = int(event.get('ForecastHorizon')) + except: + forecastHorizon = 0 + + return { + 'timeKey': datetime.today().strftime('%Y%m%d%H%M%S'), + 'dateKey': datetime.today().strftime('%Y%m%d'), + 'forecastHorizon': forecastHorizon + } + Description: Simple function that provides variables for unique Predictors, Forecasts and job names. 
+ + LambdaForecastCreateDatasetGroup: + Type: AWS::Lambda::Function + Properties: + FunctionName: ForecastCreateDatasetGroup + Handler: index.lambda_handler + Runtime: python3.9 + Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn + Timeout: 30 + MemorySize: 128 + Code: + ZipFile: | + import os + import boto3 + + def lambda_handler(event, context): + + StackName = event.get('StackName') + + ssm = boto3.client('ssm') + + region= os.environ['AWS_REGION'] + aws_account_id = context.invoked_function_arn.split(":")[4] + + session = boto3.Session(region_name=region) + forecast = session.client(service_name='forecast') + + ParameterPrefix = '/forecast/'+StackName+'/DatasetGroup/' + + parameter = ssm.get_parameter(Name=ParameterPrefix+'DatasetGroupName') + DatasetGroupName = parameter['Parameter']['Value'] + + parameter = ssm.get_parameter(Name=ParameterPrefix+'DataDomain') + DataDomain = parameter['Parameter']['Value'] + + DatasetArns = [] + + BaseArnPrefix='arn:aws:forecast:'+region+':'+aws_account_id+':dataset/' + + try: + response = forecast.describe_dataset(DatasetArn=BaseArnPrefix+DatasetGroupName+'_ITEM') + item_arn=response['DatasetArn'] + DatasetArns.append(item_arn) + + response = ssm.put_parameter(Name=ParameterPrefix+'DatasetArnItem', + Value=item_arn, + Type='String', + Overwrite=True) + + except forecast.exceptions.ResourceNotFoundException: + item_arn=None + + try: + response = forecast.describe_dataset(DatasetArn=BaseArnPrefix+DatasetGroupName+'_RTS') + rts_arn=response['DatasetArn'] + DatasetArns.append(rts_arn) + + response = ssm.put_parameter(Name=ParameterPrefix+'DatasetArnRTS', + Value=rts_arn, + Type='String', + Overwrite=True) + + except forecast.exceptions.ResourceNotFoundException: + rts_arn=None + + try: + response = forecast.describe_dataset(DatasetArn=BaseArnPrefix+DatasetGroupName+'_TTS') + tts_arn=response['DatasetArn'] + DatasetArns.append(tts_arn) + + response = ssm.put_parameter(Name=ParameterPrefix+'DatasetArnTTS', + Value=tts_arn, + 
Type='String', + Overwrite=True) + + except forecast.exceptions.ResourceNotFoundException: + tts_arn=None + + try: + response = forecast.create_dataset_group( + DatasetGroupName=DatasetGroupName, + Domain=DataDomain, + DatasetArns=DatasetArns, + Tags=[ + { + 'Key': 'Createdby', + 'Value': 'MLOps' + }, + ] + ) + + DatasetGroupArn = response['DatasetGroupArn'] + + response = ssm.put_parameter(Name=ParameterPrefix+'DatasetGroupArn', + Value=DatasetGroupArn, + Type='String', + Overwrite=True) + + return { + 'DatasetGroupArn': DatasetGroupArn + } + + except forecast.exceptions.ResourceAlreadyExistsException: + pass + + return { + 'DatasetGroupArn': None + } + + except: + raise + + return { + 'DatasetGroupArn': None + } + Description: Simple function that creates a variable dataset group based on dataset member existence + + LambdaForecastSelectPredictor: + Type: AWS::Lambda::Function + Properties: + FunctionName: ForecastSelectPredictor + Handler: index.lambda_handler + Runtime: python3.9 + Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn + Timeout: 30 + MemorySize: 128 + Code: + ZipFile: | + #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + #INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + #PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + #HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + #OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + #SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ + import os + import boto3 + from datetime import datetime, timezone + + + def lambda_handler(event, context): + + try: + + StackName = event.get('StackName') + + ssm = boto3.client('ssm') + + region= os.environ['AWS_REGION'] + + session = boto3.Session(region_name=region) + forecast = session.client(service_name='forecast') + + ParameterPrefix = '/forecast/'+StackName + + try: + parameter = ssm.get_parameter(Name=ParameterPrefix+'/Forecast/PredictorCutoffAge') + PredictorCutoffAge = int(parameter['Parameter']['Value']) + except: + PredictorCutoffAge = 14 + response = ssm.put_parameter(Name=ParameterPrefix+'/Predictor/PredictorCutoffAge', + Value=str(PredictorCutoffAge), + Type='String', + Overwrite=True) + + try: + parameter = ssm.get_parameter(Name=ParameterPrefix+'/Forecast/ForecastOptimizationMetric') + ForecastOptimizationMetric = parameter['Parameter']['Value'] + except: + + parameter = ssm.get_parameter(Name=ParameterPrefix+'/Predictor/ForecastOptimizationMetric') + ForecastOptimizationMetric = parameter['Parameter']['Value'] + + response = ssm.put_parameter(Name=ParameterPrefix+'/Forecast/ForecastOptimizationMetric', + Value=ForecastOptimizationMetric, + Type='String', + Overwrite=True) + + print('Evaluating Predictors under %s days aged, using lowest error metric of %s' % ( str(PredictorCutoffAge), ForecastOptimizationMetric)) + + + parameter = ssm.get_parameter(Name=ParameterPrefix+'/DatasetGroup/DatasetGroupArn') + DatasetGroupArn = parameter['Parameter']['Value'] + + response = forecast.list_predictors( + Filters=[ + { + 'Key': 'DatasetGroupArn', + 'Value': DatasetGroupArn, + 'Condition': 'IS' + }, + ] + ) + + lowest_metric = 2**32 + lowest_arn = '' + + + for i in response['Predictors']: + + + if i['Status'] == 'ACTIVE': + eligible = True + + # first evalate tags in case you want to exclude items because of tag value + response = forecast.list_tags_for_resource(ResourceArn=i['PredictorArn']) + + for t in response['Tags']: + + # setup any other tags 
desired, this is an example + if t['Key']=='ELIGIBLE' and t['Value']=='FALSE': + eligible = False + + else: + eligible = False + + + # second, evaluate any criteria about the predictor to disqualify it + + if eligible: + + # consider other options avaialble with the payload response documented here + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/forecast.html#ForecastService.Client.describe_auto_predictor + predictor = forecast.describe_auto_predictor(PredictorArn=i['PredictorArn']) + delta = datetime.now(timezone.utc) - predictor['CreationTime'] + + if delta.days > PredictorCutoffAge: + + print('Disqualifying ARN %s due to age of %s' % (i['PredictorArn'], delta.days)) + eligible = False + + # if not disqualified by tag or predictor attributes, consider this predictor for CreateForecast operation + if eligible: + + # consider other options avaialble with the payload response documented here + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/forecast.html#ForecastService.Client.get_accuracy_metrics + predictor = forecast.get_accuracy_metrics(PredictorArn=i['PredictorArn']) + + for p in predictor['PredictorEvaluationResults']: + + for tw in p['TestWindows']: + + + if ForecastOptimizationMetric != 'AverageWeightedQuantileLoss': + for em in tw['Metrics']['ErrorMetrics']: + + error = float(em[ForecastOptimizationMetric]) + print('Evaluating ARN %s with error %s' % ( i['PredictorArn'], error)) + + if error > 0 and error < lowest_metric: + lowest_metric = error + lowest_arn = i['PredictorArn'] + else: + + AWQL = float(tw['Metrics']['AverageWeightedQuantileLoss']) + print('Evaluating ARN %s with AWQL %s' % ( i['PredictorArn'], AWQL)) + if AWQL > 0 and AWQL < lowest_metric: + lowest_metric = AWQL + lowest_arn = i['PredictorArn'] + + # determine existing CreateAutoPredictor ARN + parameter = ssm.get_parameter(Name=ParameterPrefix+'/Forecast/PredictorArn') + ExistingPredictorArn = parameter['Parameter']['Value'] + + + 
# if a new ARN was discovered + if lowest_metric > 0 and len(lowest_arn)>0 and ExistingPredictorArn != str(lowest_arn): + + print('Setting Predictor ARN for loss winner=',lowest_arn,lowest_metric ) + + # Set the ARN of winning Predictor + response = ssm.put_parameter(Name=ParameterPrefix+'/Forecast/PredictorArn', + Value=lowest_arn, + Type='String', + Overwrite=True) + + # Turn on forecasting in case disabled + response = ssm.put_parameter(Name=ParameterPrefix+'/Forecast/Generate', + Value='TRUE', + Type='String', + Overwrite=True) + + # Set reference ARN and move to RETRAIN strategy + response = ssm.put_parameter(Name=ParameterPrefix+'/Predictor/ReferenceArn', + Value=lowest_arn, + Type='String', + Overwrite=True) + + # Switch the predictor strategy to RETRAIN + response = ssm.put_parameter(Name=ParameterPrefix+'/Predictor/Strategy', + Value='RETRAIN', + Type='String', + Overwrite=True) + + else: + print('No new or additional predictors were found for CreateForecast operation') + + except: + raise + Description: Function to choose a predictor based on age and accuracy conditions + + LambdaForecastPurgeS3Folder: + Type: AWS::Lambda::Function + Properties: + FunctionName: ForecastPurgeS3Folder + Handler: index.lambda_handler + Runtime: python3.9 + Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn + Timeout: 120 + MemorySize: 128 + Code: + ZipFile: | + #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + #INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + #PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + #HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + #OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + #SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ + import os + import json + import boto3 + + def lambda_handler(event, context): + + #clear S3 location to prevent duplicate inputs + + s3_client = boto3.client('s3') + + BUCKET = event.get('BucketName') + PREFIX = event.get('Prefix') + + response = s3_client.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX) + + if int(response['KeyCount'])>0: + + for object in response['Contents']: + s3_client.delete_object(Bucket=BUCKET, Key=object['Key']) + + return event + + Description: Function that purges a named S3 bucket and prefix to then allow it to be populated with fresh TTS,RTS,IM for Amazon Forecast to import + + LambdaForecastRemoveAthenaQuotes: + Type: AWS::Lambda::Function + Properties: + FunctionName: ForecastRemoveAthenaQuotes + Handler: index.lambda_handler + Runtime: python3.9 + Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn + Timeout: 300 + MemorySize: 3008 + Code: + ZipFile: | + #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, + #INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A + #PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + #HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + #OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + #SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+
+          import os
+          import json
+          import boto3
+          import re
+
+          def lambda_handler(event, context):
+              """Strip double quotes from the CSV files under an S3 prefix.
+
+              Objects whose key ends with 'metadata' are deleted. Objects whose
+              key ends with 'csv' are rewritten in place with every double
+              quote character removed.
+
+              Args:
+                  event: dict with 'BucketName' and 'Prefix'.
+                  context: Lambda context object (unused).
+
+              Returns:
+                  The incoming event, unchanged.
+              """
+              s3_client = boto3.client('s3')
+
+              bucket = event.get('BucketName')
+              prefix = event.get('Prefix')
+
+              # list_objects_v2 returns at most 1,000 keys per call, so walk every
+              # page; a single call would leave later files still quoted.
+              paginator = s3_client.get_paginator('list_objects_v2')
+              for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+                  for entry in page.get('Contents', []):
+                      key = entry['Key']
+
+                      # delete metadata files
+                      if key.endswith('metadata'):
+                          s3_client.delete_object(Bucket=bucket, Key=key)
+                          # Log only after the delete call has returned.
+                          print('Deleted ', key)
+                          continue
+
+                      if key.endswith('csv'):
+                          raw = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read()
+
+                          # replace double quotes; done on the raw bytes to avoid
+                          # extra in-memory copies of large files (the quote byte
+                          # never occurs inside a multi-byte UTF-8 sequence)
+                          cleaned = raw.replace(b'"', b'')
+
+                          # overwrite edited file to S3
+                          s3_client.put_object(
+                              Bucket=bucket,
+                              Key=key,
+                              Body=cleaned
+                          )
+
+              return event
+      Description: Function to remove double quotes created by Athena Query during step function execution.
+ + StateMachineExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: + - sts:AssumeRole + Effect: Allow + Principal: + Service: + - !Sub 'states.${AWS::Region}.amazonaws.com' + Version: '2012-10-17' + Path: "/" + Policies: + - PolicyDocument: + Statement: + - Action: + - lambda:InvokeFunction + Effect: Allow + Resource: + - !Join + - '' + - - !GetAtt LambdaGetForecastMetadata.Arn + - ':*' + - !Join + - '' + - - !GetAtt LambdaForecastCreateDatasetGroup.Arn + - ':*' + - !Join + - '' + - - !GetAtt LambdaForecastSelectPredictor.Arn + - ':*' + - !Join + - '' + - - !GetAtt LambdaForecastPurgeS3Folder.Arn + - ':*' + - !Join + - '' + - - !GetAtt LambdaForecastRemoveAthenaQuotes.Arn + - ':*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-lambda-ops' + - PolicyDocument: + Statement: + - Action: + - athena:StartQueryExecution + - athena:GetQueryExecution + - athena:GetQueryResults + - athena:GetDataCatalog + - glue:GetTable + - glue:GetPartitions + Effect: Allow + Resource: '*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-athena-ops' + - PolicyDocument: + Statement: + - Action: + - s3:PutObject + - s3:DeleteObject + - s3:Get* + - s3:List* + Effect: Allow + Resource: + - !Join + - '' + - - 'arn:aws:s3:::' + - !Ref S3Bucket + - !Join + - '' + - - 'arn:aws:s3:::' + - !Ref S3Bucket + - '/*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-s3-ops' + - PolicyDocument: + Statement: + - Action: + - forecast:Describe* + - forecast:Get* + - forecast:List* + - forecast:Query* + - forecast:Invoke* + - forecast:Tag* + - forecast:Untag* + - forecast:Create* + - forecast:Delete* + - forecast:Stop* + - forecast:Update* + - forecast:Resume* + Effect: Allow + Resource: '*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-forecast-ops' + - PolicyDocument: + Statement: + - Action: + - iam:PassRole + Effect: Allow + Resource: !GetAtt ForecastProcessorLambdaExecutionRole.Arn + Version: 
'2012-10-17' + PolicyName: !Sub '${AWS::StackName}-iam-passrole-ops' + - PolicyDocument: + Statement: + - Action: + - sns:Publish + Effect: Allow + Resource: + - !Sub 'arn:aws:sns:*:${AWS::AccountId}:*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-sns-ops' + - PolicyDocument: + Statement: + - Action: + - ssm:GetParameter + - ssm:GetParameters + - ssm:PutParameter + Effect: Allow + Resource: + - !Sub 'arn:aws:ssm:*:${AWS::AccountId}:parameter/forecast/*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-ssm-ops' + RoleName: !Sub 'ForecastStepFunctionExecutionRole' \ No newline at end of file diff --git a/ml_ops/continuous_deployment/seed/forecast-mlops-solution-guidance.yml b/ml_ops/continuous_deployment/seed/forecast-mlops-solution-guidance.yml new file mode 100644 index 0000000..2cb3b1b --- /dev/null +++ b/ml_ops/continuous_deployment/seed/forecast-mlops-solution-guidance.yml @@ -0,0 +1,2202 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: > + Amazon Forecast MLOps Solution Guidance + Publish: 20220909 + +Parameters: + + DatasetIncludeRTS: + Description: Do you wish to provide a related time series (RTS) for this use-case? + Default: true + Type: String + AllowedValues: + - true + - false + ConstraintDescription: must specify true or false + + DatasetIncludeItem: + Description: Do you wish to provide item metadata for this use-case? 
+    Default: true
+    Type: String
+    AllowedValues:
+      - true
+      - false
+    ConstraintDescription: must specify true or false
+
+  DatasetGroupName:
+    Description: Short name for dataset group, a self-contained workload
+    Default: MyDatasetGroup
+    Type: String
+    AllowedPattern: ^[a-zA-Z][a-zA-Z0-9_]*
+
+  TimestampFormatTTS:
+    Description: Which timestamp format is provided for Target Time Series
+    Type: String
+    Default: yyyy-MM-dd
+    AllowedValues:
+      - yyyy-MM-dd
+      - yyyy-MM-dd HH:mm:ss
+
+  TimestampFormatRTS:
+    Description: Which timestamp format is provided for Related Time Series
+    Type: String
+    Default: yyyy-MM-dd
+    AllowedValues:
+      - yyyy-MM-dd
+      - yyyy-MM-dd HH:mm:ss
+
+  SNSEndpoint:
+    Description: Provide a valid e-mail address to receive task notifications
+    Type: String
+
+  S3Bucket:
+    Description: Provide the S3 Bucket Name to be used
+    Type: String
+    # Periods are legal in S3 bucket names; the previous pattern rejected them.
+    # Underscores stay accepted so values that validated before still do.
+    AllowedPattern: ^[a-z0-9][a-z0-9._-]*[a-z0-9]$
+    ConstraintDescription: may contain only lowercase letters, numbers, periods, hyphens and underscores, and must start and end with a letter or number
+
+  SchemaTTS:
+    Description: Provide a valid JSON string to define the Target Time Series Schema
+    Type: String
+    Default:
+      '{
+        "Attributes": [
+          {
+            "AttributeName": "location_id",
+            "AttributeType": "string"
+          },
+          {
+            "AttributeName": "item_id",
+            "AttributeType": "string"
+          },
+          {
+            "AttributeName": "target_value",
+            "AttributeType": "integer"
+          },
+          {
+            "AttributeName": "timestamp",
+            "AttributeType": "timestamp"
+          }
+        ]
+      }'
+
+  SchemaRTS:
+    Description: Provide a valid JSON string to define the Related Time Series Schema
+    Type: String
+    Default:
+      '{
+        "Attributes": [
+          {
+            "AttributeName": "location_id",
+            "AttributeType": "string"
+          },
+          {
+            "AttributeName": "item_id",
+            "AttributeType": "string"
+          },
+          {
+            "AttributeName": "checkout_price",
+            "AttributeType": "float"
+          },
+          {
+            "AttributeName": "base_price",
+            "AttributeType": "float"
+          },
+          {
+            "AttributeName": "emailer_for_promotion",
+            "AttributeType": "integer"
+          },
+          {
+            "AttributeName": "homepage_featured",
+            "AttributeType": "integer"
+          },
+          {
+            "AttributeName": "timestamp",
+            "AttributeType": "timestamp"
+          }
+        ]
+      }'
+
+
+  SchemaITEM:
+    Description: Provide a valid JSON string to define the Item Metadata Schema
+    Type: String
+    Default:
+      '{
+        "Attributes": [
+          {
+            "AttributeName": "item_id",
+            "AttributeType": "string"
+          },
+          {
+            "AttributeName": "food_category",
+            "AttributeType": "string"
+          },
+          {
+            "AttributeName": "food_cuisine",
+            "AttributeType": "string"
+          }
+        ]
+      }'
+
+  PredictorForecastFrequency:
+    Description: What period are forecasts generated at
+    Type: String
+    Default: W
+    AllowedValues:
+      - Y
+      - M
+      - W
+      - D
+      - H
+      - 30min
+      - 15min
+      - 10min
+      - 5min
+      - 1min
+
+  DatasetGroupFrequencyTTS:
+    Description: The frequency of data collection for TARGET TIME SERIES dataset
+    Type: String
+    Default: W
+    AllowedValues:
+      - Y
+      - M
+      - W
+      - D
+      - H
+      - 30min
+      - 15min
+      - 10min
+      - 5min
+      - 1min
+
+  DatasetGroupFrequencyRTS:
+    Description: The frequency of data collection for RELATED TIME SERIES dataset
+    Type: String
+    Default: W
+    AllowedValues:
+      - Y
+      - M
+      - W
+      - D
+      - H
+      - 30min
+      - 15min
+      - 10min
+      - 5min
+      - 1min
+
+  PredictorForecastDimensions:
+    # Description previously duplicated the forecast-frequency text.
+    Description: Provide a JSON array of the dimension (field) names, in addition to item_id, used to group the generated forecasts
+    Type: String
+    Default:
+      '[
+        "location_id"
+      ]'
+
+  PredictorForecastOptimizationMetric:
+    # Description previously duplicated the forecast-frequency text.
+    Description: Which accuracy metric the predictor is optimized for
+    Type: String
+    Default: AverageWeightedQuantileLoss
+    AllowedValues:
+      - WAPE
+      - RMSE
+      - AverageWeightedQuantileLoss
+      - MASE
+      - MAPE
+
+  PredictorExplainPredictor:
+    Description: Generate Explainability
+    Type: String
+    Default: TRUE
+    AllowedValues:
+      - TRUE
+      - FALSE
+
+  PredictorForecastHorizon:
+    Description: How many future steps to forecast
+    Type: String
+    Default: 3
+
+  PredictorForecastTypes:
+    Description: which quantiles to choose for training a predictor
+    Type: String
+    Default:
+      '[
+        "0.30",
+        "0.40",
+        "0.50",
+        "0.60",
+        "0.70"
+      ]'
+
+  PredictorAttributeConfigs:
+    Description: Provide JSON string to featurize data
+    Type: String
+    Default:
+      '[
+        {
+          "AttributeName":
"checkout_price", + "Transformations": { + "backfill": "mean", + "futurefill": "mean", + "middlefill": "mean" + } + }, + { + "AttributeName": "base_price", + "Transformations": { + "backfill": "mean", + "futurefill": "mean", + "middlefill": "mean" + } + }, + { + "AttributeName": "emailer_for_promotion", + "Transformations": { + "backfill": "zero", + "futurefill": "zero", + "middlefill": "zero" + } + }, + { + "AttributeName": "homepage_featured", + "Transformations": { + "backfill": "zero", + "futurefill": "zero", + "middlefill": "zero" + } + }, + { + "AttributeName": "target_value", + "Transformations": { + "aggregation": "sum", + "backfill": "nan", + "frontfill": "none", + "middlefill": "nan" + } + } + ]' + + ForecastForecastTypes: + Description: When a CreateForecast job runs, this declares which quantiles to produce predictions for. You may choose up to 5 values in this array. Edit this value to include values according to need. + Type: String + Default: + '[ + "0.50" + ]' + +Resources: + + StepFunctionsWorkflowRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Statement: + - Action: + - sts:AssumeRole + Effect: Allow + Principal: + Service: + - states.amazonaws.com + Version: '2012-10-17' + Path: "/" + Policies: + - PolicyDocument: + Statement: + - Action: + - lambda:InvokeFunction + Effect: Allow + Resource: + - !Join + - '' + - - !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastSelectPredictor' + - ':*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-lambda-ops' + - PolicyDocument: + Statement: + - Action: + - states:StartExecution + - states:DescribeExecution + - states:StopExecution + Effect: Allow + Resource: + - !Join + - '' + - - !Sub 'arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${AWS::StackName}' + - '*' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-states-ops' + - PolicyDocument: + Statement: + - Action: + - events:PutTargets + - events:PutRule + - 
events:DescribeRule + Effect: Allow + Resource: + - !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule' + Version: '2012-10-17' + PolicyName: !Sub '${AWS::StackName}-events-ops' + RoleName: !Sub "StepFunctions-${AWS::StackName}-Workflow-Role" + + + CreateDatasetGroupStateMachine: + Type: "AWS::StepFunctions::StateMachine" + Properties: + StateMachineName: !Sub "${AWS::StackName}-Create-Dataset-Group" + DefinitionString: + !Sub | + { + "Comment": "An automation pipeline to create Amazon Forecast datasets and associate dataset group", + "StartAt": "ParametersDatasetGroup", + "States": { + "ParametersDatasetGroup": { + "Type": "Task", + "Next": "Create Datasets", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DataDomain", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetGroupName", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeItem", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeRTS" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "Data_Domain.$": "$.Parameters[0].Value", + "DatasetGroupName.$": "$.Parameters[1].Value", + "DatasetIncludeItem.$": "$.Parameters[2].Value", + "DatasetIncludeRTS.$": "$.Parameters[3].Value" + }, + "ResultPath": "$.GetParameters" + }, + "Create Datasets": { + "Type": "Parallel", + "Next": "CreateDatasetGroup", + "Branches": [ + { + "StartAt": "ParametersTTS", + "States": { + "ParametersTTS": { + "Type": "Task", + "Next": "TTS", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/FrequencyTTS", + "/forecast/${AWS::StackName}/DatasetGroup/SchemaTTS" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultPath": "$.GetTTSParameters", + "ResultSelector": { + "TTS_Data_Frequency.$": "$.Parameters[0].Value", + "TTS_Schema.$": "$.Parameters[1].Value" + } + }, + "TTS": { + "Type": "Task", + "Parameters": { + "DatasetName.$": 
"States.Format('{}{}',$.GetParameters.DatasetGroupName,'_TTS')", + "Domain.$": "$.GetParameters.Data_Domain", + "DatasetType": "TARGET_TIME_SERIES", + "DataFrequency.$": "$.GetTTSParameters.TTS_Data_Frequency", + "Schema.$": "States.StringToJson($.GetTTSParameters.TTS_Schema)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createDataset", + "Catch": [ + { + "ErrorEquals": [ + "Forecast.ResourceAlreadyExistsException" + ], + "Next": "TTS Exit", + "Comment": "Forecast.ResourceAlreadyExistsException", + "ResultPath": "$.CreateDatasetTTSCatcher" + } + ], + "ResultPath": "$.CreateDatasetTTS", + "Next": "TTS Exit" + }, + "TTS Exit": { + "Type": "Wait", + "Seconds": 2, + "End": true + } + } + }, + { + "StartAt": "IncludeRTS", + "States": { + "IncludeRTS": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.GetParameters.DatasetIncludeRTS", + "StringEquals": "true", + "Next": "ParametersRTS" + }, + { + "Variable": "$.GetParameters.DatasetIncludeRTS", + "StringEquals": "false", + "Next": "RTS Exit" + } + ] + }, + "ParametersRTS": { + "Type": "Task", + "Next": "RTS", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/FrequencyRTS", + "/forecast/${AWS::StackName}/DatasetGroup/SchemaRTS" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultPath": "$.GetRTSParameters", + "ResultSelector": { + "RTS_Data_Frequency.$": "$.Parameters[0].Value", + "RTS_Schema.$": "$.Parameters[1].Value" + } + }, + "RTS": { + "Type": "Task", + "Parameters": { + "DatasetName.$": "States.Format('{}{}',$.GetParameters.DatasetGroupName,'_RTS')", + "Domain.$": "$.GetParameters.Data_Domain", + "DatasetType": "RELATED_TIME_SERIES", + "DataFrequency.$": "$.GetRTSParameters.RTS_Data_Frequency", + "Schema.$": "States.StringToJson($.GetRTSParameters.RTS_Schema)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createDataset", + "Next": "RTS Exit", + "ResultPath": "$.CreateDatasetRTS", + "Catch": [ + { + "ErrorEquals": [ + 
"Forecast.ResourceAlreadyExistsException" + ], + "Comment": "Forecast.ResourceAlreadyExistsException", + "Next": "RTS Exit", + "ResultPath": "$.CreateDatasetRTSCatcher" + } + ] + }, + "RTS Exit": { + "Type": "Wait", + "Seconds": 2, + "End": true + } + } + }, + { + "StartAt": "IncludeItem", + "States": { + "IncludeItem": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.GetParameters.DatasetIncludeItem", + "StringEquals": "true", + "Next": "ParametersITEM" + }, + { + "Variable": "$.GetParameters.DatasetIncludeItem", + "StringEquals": "false", + "Next": "Item Exit" + } + ] + }, + "ParametersITEM": { + "Type": "Task", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/SchemaITEM" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "ITEM_Schema.$": "$.Parameters[0].Value" + }, + "ResultPath": "$.GetITEMParameters", + "Next": "ITEM Metadata" + }, + "ITEM Metadata": { + "Type": "Task", + "Parameters": { + "DatasetName.$": "States.Format('{}{}',$.GetParameters.DatasetGroupName,'_ITEM')", + "Domain.$": "$.GetParameters.Data_Domain", + "DatasetType": "ITEM_METADATA", + "Schema.$": "States.StringToJson($.GetITEMParameters.ITEM_Schema)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createDataset", + "Catch": [ + { + "ErrorEquals": [ + "Forecast.ResourceAlreadyExistsException" + ], + "Comment": "Forecast.ResourceAlreadyExistsException", + "ResultPath": "$.CreateDatasetITEMCatcher", + "Next": "Item Exit" + } + ], + "ResultPath": "$.CreateDatasetITEM", + "Next": "Item Exit" + }, + "Item Exit": { + "Type": "Wait", + "Seconds": 2, + "End": true + } + } + } + ], + "ResultPath": "$.ParallelCreateDatasets" + }, + "CreateDatasetGroup": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastCreateDatasetGroup:$LATEST", + "Payload": { + "StackName": 
"${AWS::StackName}" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Next": "Success" + }, + "Success": { + "Type": "Succeed" + } + } + } + RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/ForecastStepFunctionExecutionRole" + + + CreateImportDatasetStateMachine: + Type: "AWS::StepFunctions::StateMachine" + Properties: + StateMachineName: !Sub "${AWS::StackName}-Import-Dataset" + DefinitionString: + !Sub | + { + "Comment": "An automation pipeline to import data into Amazon Forecast", + "StartAt": "Create Runtime Metadata", + "States": { + "Create Runtime Metadata": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:GetForecastMetadata:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Next": "GetParameters" + }, + "GetParameters": { + "Type": "Task", + "Next": "Prepare Datasets", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeItem", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeRTS", + "/forecast/${AWS::StackName}/DatasetGroup/S3Bucket" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetIncludeItem.$": "$.Parameters[0].Value", + "DatasetIncludeRTS.$": "$.Parameters[1].Value", + "DatasetS3Bucket.$": "$.Parameters[2].Value" + }, + "ResultPath": "$.GetParameters" + }, + "Prepare Datasets": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "GetParametersTTS", + "States": { + "GetParametersTTS": { + "Type": "Task", + "Next": "TTS Import", + "Parameters": { + "Names": [ + 
"/forecast/${AWS::StackName}/DatasetGroup/DatasetArnTTS", + "/forecast/${AWS::StackName}/DatasetGroup/TimestampFormatTTS" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetArnTTS.$": "$.Parameters[0].Value", + "TimestampFormatTTS.$": "$.Parameters[1].Value" + }, + "ResultPath": "$.GetParametersTTS" + }, + "TTS Import": { + "Type": "Task", + "Parameters": { + "TimestampFormat.$": "$.GetParametersTTS.TimestampFormatTTS", + "DataSource": { + "S3Config": { + "Path.$": "States.Format('{}{}{}','s3://',$.GetParameters.DatasetS3Bucket,'/${AWS::StackName}/tts/')", + "RoleArn": "arn:aws:iam::${AWS::AccountId}:role/ForecastProcessorLambdaExecutionRole" + } + }, + "DatasetArn.$": "$.GetParametersTTS.DatasetArnTTS", + "DatasetImportJobName.$": "States.Format('{}_{}', 'TTSImport',$.timeKey)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createDatasetImportJob", + "ResultPath": "$.TTSImportJob", + "Next": "DescribeDatasetImportJobTTS" + }, + "DescribeDatasetImportJobTTS": { + "Type": "Task", + "Next": "Evaluate TTS Status", + "Parameters": { + "DatasetImportJobArn.$": "$.TTSImportJob.DatasetImportJobArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:describeDatasetImportJob", + "ResultPath": "$.describeDatasetImportJobResultTTS" + }, + "Evaluate TTS Status": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "Variable": "$.describeDatasetImportJobResultTTS.Status", + "StringEquals": "CREATE_PENDING" + }, + { + "Variable": "$.describeDatasetImportJobResultTTS.Status", + "StringEquals": "CREATE_IN_PROGRESS" + } + ], + "Next": "Retry TTS Not Active" + }, + { + "Variable": "$.describeDatasetImportJobResultTTS.Status", + "StringEquals": "ACTIVE", + "Next": "TTS Active" + } + ], + "Default": "TTS Fail" + }, + "TTS Fail": { + "Type": "Fail" + }, + "Retry TTS Not Active": { + "Type": "Wait", + "Seconds": 90, + "Next": "DescribeDatasetImportJobTTS" + }, + "TTS Active": { + "Type": "Wait", + "Seconds": 0, + "End": 
true + } + } + }, + { + "StartAt": "Include RTS", + "States": { + "Include RTS": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.GetParameters.DatasetIncludeRTS", + "StringEquals": "true", + "Next": "GetParametersRTS" + }, + { + "Variable": "$.GetParameters.DatasetIncludeRTS", + "StringEquals": "false", + "Next": "RTS Active" + } + ] + }, + "GetParametersRTS": { + "Type": "Task", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DatasetArnRTS", + "/forecast/${AWS::StackName}/DatasetGroup/TimestampFormatRTS" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetArnRTS.$": "$.Parameters[0].Value", + "TimestampFormatRTS.$": "$.Parameters[1].Value" + }, + "ResultPath": "$.GetParametersRTS", + "Next": "RTS Import" + }, + "RTS Import": { + "Type": "Task", + "Parameters": { + "TimestampFormat.$": "$.GetParametersRTS.TimestampFormatRTS", + "DataSource": { + "S3Config": { + "Path.$": "States.Format('{}{}{}','s3://',$.GetParameters.DatasetS3Bucket,'/${AWS::StackName}/rts/')", + "RoleArn": "arn:aws:iam::${AWS::AccountId}:role/ForecastProcessorLambdaExecutionRole" + } + }, + "DatasetArn.$": "$.GetParametersRTS.DatasetArnRTS", + "DatasetImportJobName.$": "States.Format('{}_{}', 'RTSImport',$.timeKey)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createDatasetImportJob", + "ResultPath": "$.RTSImportJob", + "Next": "DescribeDatasetImportJobRTS" + }, + "DescribeDatasetImportJobRTS": { + "Type": "Task", + "Parameters": { + "DatasetImportJobArn.$": "$.RTSImportJob.DatasetImportJobArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:describeDatasetImportJob", + "ResultPath": "$.describeDatasetImportJobResultRTS", + "Next": "Evaluate RTS Status" + }, + "Evaluate RTS Status": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "Variable": "$.describeDatasetImportJobResultRTS.Status", + "StringEquals": "CREATE_PENDING" + }, + { + "Variable": "$.describeDatasetImportJobResultRTS.Status", 
+ "StringEquals": "CREATE_IN_PROGRESS" + } + ], + "Next": "Retry RTS Not Active" + }, + { + "Variable": "$.describeDatasetImportJobResultRTS.Status", + "StringEquals": "ACTIVE", + "Next": "RTS Active" + } + ], + "Default": "RTS Fail" + }, + "Retry RTS Not Active": { + "Type": "Wait", + "Seconds": 90, + "Next": "DescribeDatasetImportJobRTS" + }, + "RTS Fail": { + "Type": "Fail" + }, + "RTS Active": { + "Type": "Wait", + "Seconds": 0, + "End": true + } + } + }, + { + "StartAt": "Include Item", + "States": { + "Include Item": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.GetParameters.DatasetIncludeItem", + "StringEquals": "true", + "Next": "GetParametersItem" + }, + { + "Variable": "$.GetParameters.DatasetIncludeItem", + "StringEquals": "false", + "Next": "ITEM Active" + } + ] + }, + "GetParametersItem": { + "Type": "Task", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DatasetArnItem" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetArnItem.$": "$.Parameters[0].Value" + }, + "ResultPath": "$.GetParametersItem", + "Next": "ITEM Import" + }, + "ITEM Import": { + "Type": "Task", + "Parameters": { + "DataSource": { + "S3Config": { + "Path.$": "States.Format('{}{}{}','s3://',$.GetParameters.DatasetS3Bucket,'/${AWS::StackName}/item/')", + "RoleArn": "arn:aws:iam::${AWS::AccountId}:role/ForecastProcessorLambdaExecutionRole" + } + }, + "DatasetArn.$": "$.GetParametersItem.DatasetArnItem", + "DatasetImportJobName.$": "States.Format('{}_{}', 'ITEMImport',$.timeKey)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createDatasetImportJob", + "ResultPath": "$.ITEMImportJob", + "Next": "DescribeDatasetImportJobItem" + }, + "DescribeDatasetImportJobItem": { + "Type": "Task", + "Parameters": { + "DatasetImportJobArn.$": "$.ITEMImportJob.DatasetImportJobArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:describeDatasetImportJob", + "ResultPath": 
"$.describeDatasetImportJobResultItem", + "Next": "Evaluate ITEM Status" + }, + "Evaluate ITEM Status": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "Variable": "$.describeDatasetImportJobResultItem.Status", + "StringEquals": "CREATE_PENDING" + }, + { + "Variable": "$.describeDatasetImportJobResultItem.Status", + "StringEquals": "CREATE_IN_PROGRESS" + } + ], + "Next": "Retry ITEM Not Active" + }, + { + "Variable": "$.describeDatasetImportJobResultItem.Status", + "StringEquals": "ACTIVE", + "Next": "ITEM Active" + } + ], + "Default": "ITEM Fail" + }, + "Retry ITEM Not Active": { + "Type": "Wait", + "Seconds": 90, + "Next": "DescribeDatasetImportJobItem" + }, + "ITEM Fail": { + "Type": "Fail" + }, + "ITEM Active": { + "Type": "Wait", + "Seconds": 0, + "End": true + } + } + } + ], + "ResultPath": "$.ParallelPrepareDatasets", + "End": true + } + } + } + RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/ForecastStepFunctionExecutionRole" + + + PGenerateForecast: + Type: AWS::SSM::Parameter + Properties: + Description: Should a forecast be generated? Values are TRUE and FALSE + Name: !Sub "/forecast/${AWS::StackName}/Forecast/Generate" + Type: String + Value: "FALSE" + + PPredictorArnToForecast: + Type: AWS::SSM::Parameter + Properties: + Description: Which PredictorARN to use when generating a forecast job + Name: !Sub "/forecast/${AWS::StackName}/Forecast/PredictorArn" + Type: String + Value: insert true ARN after available and approved for use + + PPredictorArnForRetrain: + Type: AWS::SSM::Parameter + Properties: + Description: For RETRAIN Strategy, this is the base ARN to use for retrain + Name: !Sub "/forecast/${AWS::StackName}/Predictor/ReferenceArn" + Type: String + Value: insert true ARN after available and approved for use + + PPredictorStrategy: + Type: AWS::SSM::Parameter + Properties: + Description: Values are TRAIN, RETRAIN and NONE, determines which action is taken. 
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/Strategy"
+      Type: String
+      Value: TRAIN
+
+  PPredictorAttributeConfigs:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Description: JSON string defining how filled, numerical RTS/TTS are featurized
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/AttributeConfigs"
+      Type: String
+      Value: !Ref PredictorAttributeConfigs
+
+  PPredictorForecastHorizon:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Description: How many steps is the future prediction horizon
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/ForecastHorizon"
+      Type: String
+      Value: !Ref PredictorForecastHorizon
+
+  PPredictorForecastFrequency:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Description: Forecast Frequency
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/ForecastFrequency"
+      Type: String
+      Value: !Ref PredictorForecastFrequency
+
+  PPredictorForecastDimensions:
+    Type: AWS::SSM::Parameter
+    Properties:
+      # Description previously duplicated the forecast-horizon text.
+      Description: JSON array of dimension (field) names used to group the generated forecast
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/ForecastDimensions"
+      Type: String
+      Value: !Ref PredictorForecastDimensions
+
+  PPredictorForecastOptimizationMetric:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Description: Which optimization metric is used
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/ForecastOptimizationMetric"
+      Type: String
+      Value: !Ref PredictorForecastOptimizationMetric
+
+  PPredictorExplainPredictor:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Description: Generate Predictor Explainability
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/ExplainPredictor"
+      Type: String
+      Value: !Ref PredictorExplainPredictor
+
+  PPredictorForecastTypes:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Description: which quantile values selected for training in predictor
+      Name: !Sub "/forecast/${AWS::StackName}/Predictor/ForecastTypes"
+      Type: String
+      Value: !Ref PredictorForecastTypes
+
+  PForecastForecastTypes:
+    Type: AWS::SSM::Parameter
+    Properties:
+
Description: which quantile values selected for forecast data points + Name: !Sub "/forecast/${AWS::StackName}/Forecast/ForecastTypes" + Type: String + Value: !Ref ForecastForecastTypes + + PDatasetGroupDomain: + Type: AWS::SSM::Parameter + Properties: + Description: See https://docs.aws.amazon.com/forecast/latest/dg/howitworks-domains-ds-types.html + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/DataDomain" + Type: String + Value: CUSTOM + + PDatasetTimestampFormatRTS: + Type: AWS::SSM::Parameter + Properties: + Description: Format of Related Time Series Timestamp Format + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/TimestampFormatRTS" + Type: String + Value: !Ref TimestampFormatRTS + + PDatasetTimestampFormatTTS: + Type: AWS::SSM::Parameter + Properties: + Description: Format of Target Time Series Timestamp Format + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/TimestampFormatTTS" + Type: String + Value: !Ref TimestampFormatTTS + + PDatasetGroupFrequencyTTS: + Type: AWS::SSM::Parameter + Properties: + Description: Valid intervals are Y (Year), M (Month), W (Week), D (Day), H (Hour), 30min (30 minutes), 15min (15 minutes), 10min (10 minutes), 5min (5 minutes), and 1min (1 minute) + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/FrequencyTTS" + Type: String + Value: !Ref DatasetGroupFrequencyTTS + + PDatasetGroupFrequencyRTS: + Type: AWS::SSM::Parameter + Properties: + Description: Valid intervals are Y (Year), M (Month), W (Week), D (Day), H (Hour), 30min (30 minutes), 15min (15 minutes), 10min (10 minutes), 5min (5 minutes), and 1min (1 minute) + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/FrequencyRTS" + Type: String + Value: !Ref DatasetGroupFrequencyRTS + + PDatasetGroupSchemaITEM: + Type: AWS::SSM::Parameter + Properties: + Description: Schema for the dataset, datatype and order must match the fields in your data. 
+ Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/SchemaITEM" + Type: String + Value: !Ref SchemaITEM + + PDatasetGroupSchemaRTS: + Type: AWS::SSM::Parameter + Properties: + Description: Schema for the dataset, datatype and order must match the fields in your data. + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/SchemaRTS" + Type: String + Value: !Ref SchemaRTS + + PDatasetGroupSchemaTTS: + Type: AWS::SSM::Parameter + Properties: + Description: Schema for the dataset, datatype and order must match the fields in your data. + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/SchemaTTS" + Type: String + Value: !Ref SchemaTTS + + PDatasetIncludeRTS: + Type: AWS::SSM::Parameter + Properties: + Description: Determines if RTS creation and import are attempted + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeRTS" + Type: String + Value: !Ref DatasetIncludeRTS + + PDatasetIncludeItem: + Type: AWS::SSM::Parameter + Properties: + Description: Determines if Item Metadata creation and import are attempted + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeItem" + Type: String + Value: !Ref DatasetIncludeItem + + PS3Bucket: + Type: AWS::SSM::Parameter + Properties: + Description: S3 bucket storing the inputs and outputs produced by the Amazon Forecast workflow + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/S3Bucket" + Type: String + Value: !Ref S3Bucket + + PDatasetGroupName: + Type: AWS::SSM::Parameter + Properties: + Description: Name for dataset group + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/DatasetGroupName" + Type: String + Value: !Ref DatasetGroupName + + PQueryTTS: + Type: AWS::SSM::Parameter + Properties: + Description: Valid SQL Statement to fetch TTS data + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/QueryTTS" + Type: String + Value: "select x,y,z from table where x=..." 
+ + PQueryRTS: + Type: AWS::SSM::Parameter + Properties: + Description: Valid SQL Statement to fetch RTS data + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/QueryRTS" + Type: String + Value: "select x,y,z from table where x=..." + + PQueryItem: + Type: AWS::SSM::Parameter + Properties: + Description: Valid SQL Statement to fetch Item Metadata data + Name: !Sub "/forecast/${AWS::StackName}/DatasetGroup/QueryITEM" + Type: String + Value: "select x,y,z from table where x=..." + + StateMachineSNSTopic: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub "${AWS::StackName}" + Subscription: + - Endpoint: !Ref SNSEndpoint + Protocol: email + + CreatePredictorStateMachine: + Type: "AWS::StepFunctions::StateMachine" + Properties: + StateMachineName: !Sub "${AWS::StackName}-Create-Predictor" + DefinitionString: + !Sub | + { + "Comment": "An automation pipeline to train (or retrain) an Amazon Forecast Predictor", + "StartAt": "GetParameters", + "States": { + "GetParameters": { + "Type": "Task", + "Next": "Retrain Get Runtime Metadata", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DatasetGroupArn", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetGroupName", + "/forecast/${AWS::StackName}/Predictor/AttributeConfigs", + "/forecast/${AWS::StackName}/Predictor/ExplainPredictor", + "/forecast/${AWS::StackName}/Predictor/ForecastDimensions", + "/forecast/${AWS::StackName}/Predictor/ForecastFrequency", + "/forecast/${AWS::StackName}/Predictor/ForecastHorizon", + "/forecast/${AWS::StackName}/Predictor/ForecastOptimizationMetric", + "/forecast/${AWS::StackName}/Predictor/ForecastTypes", + "/forecast/${AWS::StackName}/Predictor/Strategy" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetGroupArn.$": "$.Parameters[0].Value", + "DatasetGroupName.$": "$.Parameters[1].Value", + "AttributeConfigs.$": "$.Parameters[2].Value", + "ExplainPredictor.$": "$.Parameters[3].Value", + 
"ForecastDimensions.$": "$.Parameters[4].Value", + "ForecastFrequency.$": "$.Parameters[5].Value", + "ForecastHorizon.$": "$.Parameters[6].Value", + "ForecastOptimizationMetric.$": "$.Parameters[7].Value", + "ForecastTypes.$": "$.Parameters[8].Value", + "Strategy.$": "$.Parameters[9].Value" + } + }, + "Retrain Get Runtime Metadata": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload.$": "$", + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:GetForecastMetadata:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "ResultPath": "$.CreateRuntimeMetadata", + "Next": "Create Auto Predictor" + }, + "Create Auto Predictor": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.Strategy", + "StringEquals": "TRAIN", + "Next": "Attempt New Auto Predictor" + }, + { + "Variable": "$.Strategy", + "StringEquals": "RETRAIN", + "Next": "GetParameter" + } + ], + "Default": "Strategy Not Train or Retain Success" + }, + "Attempt New Auto Predictor": { + "Type": "Task", + "Next": "DescribeAutoPredictor", + "Parameters": { + "PredictorName.$": "States.Format('{}_{}', $.DatasetGroupName,$.CreateRuntimeMetadata.Payload.timeKey)", + "DataConfig": { + "DatasetGroupArn.$": "$.DatasetGroupArn", + "AttributeConfigs.$": "States.StringToJson($.AttributeConfigs)", + "AdditionalDatasets": null + }, + "ForecastFrequency.$": "$.ForecastFrequency", + "ForecastHorizon.$": "$.CreateRuntimeMetadata.Payload.forecastHorizon", + "ForecastDimensions.$": "States.StringToJson($.ForecastDimensions)", + "OptimizationMetric.$": "$.ForecastOptimizationMetric", + "ExplainPredictor.$": "$.ExplainPredictor", + "ForecastTypes.$": "States.StringToJson($.ForecastTypes)", + "Tags": [ + { + "Key": "MLOpsPublishDate", + "Value": "20220909" + } + ] + }, + "Resource": 
"arn:aws:states:::aws-sdk:forecast:createAutoPredictor", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Predictor Fail", + "ResultPath": "$.CreateAutoPredictorError", + "Comment": "All Errors" + } + ], + "ResultPath": "$.CreateAutoPredictor" + }, + "SNS Predictor Fail": { + "Type": "Task", + "Resource": "arn:aws:states:::sns:publish", + "Parameters": { + "Message.$": "$", + "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${AWS::StackName}" + }, + "Next": "Fail Train Predictor" + }, + "Fail Train Predictor": { + "Type": "Fail" + }, + "DescribeAutoPredictor": { + "Type": "Task", + "Next": "State of New Predictor", + "Parameters": { + "PredictorArn.$": "$.CreateAutoPredictor.PredictorArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:describeAutoPredictor", + "ResultPath": "$.DescribeAutoPredictor", + "ResultSelector": { + "PredictorName.$": "$.PredictorName", + "PredictorStatus.$": "$.Status", + "PredictorArn.$": "$.PredictorArn" + } + }, + "State of New Predictor": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.DescribeAutoPredictor.PredictorStatus", + "StringEquals": "ACTIVE", + "Next": "Parallel Post Predictor Tasks" + }, + { + "Or": [ + { + "Variable": "$.DescribeAutoPredictor.PredictorStatus", + "StringEquals": "CREATE_IN_PROGRESS" + }, + { + "Variable": "$.DescribeAutoPredictor.PredictorStatus", + "StringEquals": "CREATE_PENDING" + } + ], + "Next": "New Predictor Create In Progress Wait" + } + ], + "Default": "Fail Train Predictor" + }, + "Parallel Post Predictor Tasks": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "GetS3Parameters", + "States": { + "GetS3Parameters": { + "Type": "Task", + "Next": "CreatePredictorBacktestExportJob", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/S3Bucket" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetS3Bucket.$": "$.Parameters[0].Value" + }, + "ResultPath": "$.GetS3Parameters" + }, 
+ "CreatePredictorBacktestExportJob": { + "Type": "Task", + "Parameters": { + "Destination": { + "S3Config": { + "Path.$": "States.Format('{}{}{}','s3://',$.GetS3Parameters.DatasetS3Bucket,'/${AWS::StackName}/backtest-export/')", + "RoleArn": "arn:aws:iam::${AWS::AccountId}:role/ForecastProcessorLambdaExecutionRole" + } + }, + "PredictorArn.$": "$.DescribeAutoPredictor.PredictorArn", + "PredictorBacktestExportJobName.$": "States.Format('{}_{}', $.DatasetGroupName, $.CreateRuntimeMetadata.Payload.timeKey)" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createPredictorBacktestExportJob", + "Next": "ListPredictorBacktestExportJobs", + "ResultPath": "$.CreatePredictorBacktestExport", + "Catch": [ + { + "ErrorEquals": [ + "Forecast.ResourceAlreadyExistsException" + ], + "Comment": "Forecast.ResourceAlreadyExistsException", "ResultPath": "$.CreatePredictorBacktestExportError", + "Next": "ListPredictorBacktestExportJobs" + } + ], + "HeartbeatSeconds": 10 + }, + "ListPredictorBacktestExportJobs": { + "Type": "Task", + "Next": "Choice", + "Parameters": { + "Filters": [ + { + "Condition": "IS", + "Key": "PredictorArn", + "Value.$": "$.DescribeAutoPredictor.PredictorArn" + } + ] + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:listPredictorBacktestExportJobs", + "ResultPath": "$.PredictorBacktestExportJobs", + "ResultSelector": { + "Status.$": "$.PredictorBacktestExportJobs[0].Status" + }, + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Predictor Backtest Export Fail" + } + ] + }, + "Choice": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "Variable": "$.PredictorBacktestExportJobs.Status", + "StringEquals": "CREATE_IN_PROGRESS" + }, + { + "Variable": "$.PredictorBacktestExportJobs.Status", + "StringEquals": "CREATE_PENDING" + } + ], + "Next": "Wait for Backtest Export to Complete" + }, + { + "Variable": "$.PredictorBacktestExportJobs.Status", + "StringEquals": "ACTIVE", + "Next": "GetAccuracyMetrics" + } + ], + "Default": "SNS Predictor
Backtest Export Fail": { + "Type": "Task", + "Resource": "arn:aws:states:::sns:publish", + "Parameters": { + "Message.$": "$", + "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${AWS::StackName}" + }, + "Next": "Fail" + }, + "GetAccuracyMetrics": { + "Type": "Task", + "Next": "SNS Publish", + "Parameters": { + "PredictorArn.$": "$.DescribeAutoPredictor.PredictorArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:getAccuracyMetrics", + "ResultPath": "$.AccuracyMetrics", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Publish" + } + ] + }, + "Fail": { + "Type": "Fail" + }, + "SNS Publish": { + "Type": "Task", + "Resource": "arn:aws:states:::sns:publish", + "Parameters": { + "Message.$": "$", + "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${AWS::StackName}" + }, + "End": true + }, + "Wait for Backtest Export to Complete": { + "Type": "Wait", + "Seconds": 120, + "Next": "ListPredictorBacktestExportJobs" + } + } + } + ], + "ResultPath": "$.PredictorMetadataCreation", + "End": true + }, + "New Predictor Create In Progress Wait": { + "Type": "Wait", + "Seconds": 300, + "Next": "DescribeAutoPredictor" + }, + "Strategy Not Train or Retain Success": { + "Type": "Succeed" + }, + "GetParameter": { + "Type": "Task", + "Next": "Attempt Retrain Predictor", + "Parameters": { + "Name": "/forecast/${AWS::StackName}/Predictor/ReferenceArn" + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameter", + "ResultPath": "$.GetReference" + }, + "Attempt Retrain Predictor": { + "Type": "Task", + "Parameters": { + "PredictorName.$": "States.Format('{}_{}', $.DatasetGroupName ,$.CreateRuntimeMetadata.Payload.timeKey)", + "ReferencePredictorArn.$": "$.GetReference.Parameter.Value", + "Tags": [ + { + "Key": "MLOpsPublishDate", + "Value": "20220909" + } + ] + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createAutoPredictor", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Comment": "All Errors", + "Next": "SNS Predictor 
Fail", + "ResultPath": "$.RetrainPredictorCatch" + } + ], + "Next": "DescribeAutoPredictor", + "ResultPath": "$.CreateAutoPredictor" + } + } + } + RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/ForecastStepFunctionExecutionRole" + + CreateForecastStateMachine: + Type: AWS::StepFunctions::StateMachine + Properties: + StateMachineName: !Sub "${AWS::StackName}-Create-Forecast" + DefinitionString: + !Sub | + { + "Comment": "An automation pipeline to generate prediction data from an Amazon Forecast predictor", + "StartAt": "GetParameters", + "States": { + "GetParameters": { + "Type": "Task", + "Next": "Produce Forecast", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DatasetGroupName", + "/forecast/${AWS::StackName}/DatasetGroup/S3Bucket", + "/forecast/${AWS::StackName}/Forecast/ForecastTypes", + "/forecast/${AWS::StackName}/Forecast/Generate", + "/forecast/${AWS::StackName}/Forecast/PredictorArn" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetGroupName.$": "$.Parameters[0].Value", + "S3Bucket.$": "$.Parameters[1].Value", + "ForecastTypes.$": "$.Parameters[2].Value", + "GenerateForecast.$": "$.Parameters[3].Value", + "PredictorArn.$": "$.Parameters[4].Value" + } + }, + "Produce Forecast": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.GenerateForecast", + "StringEquals": "TRUE", + "Next": "Lambda Invoke" + } + ], + "Default": "Success - No Forecast Produced" + }, + "Lambda Invoke": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload.$": "$", + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:GetForecastMetadata:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Next": "CreateForecast", + "ResultPath": "$.CreateRuntimeMetadata" + }, + 
"Success - No Forecast Produced": { + "Type": "Succeed" + }, + "CreateForecast": { + "Type": "Task", + "Parameters": { + "PredictorArn.$": "$.PredictorArn", + "ForecastName.$": "States.Format('{}_{}', $.DatasetGroupName, $.CreateRuntimeMetadata.Payload.timeKey)", + "ForecastTypes.$": "States.StringToJson($.ForecastTypes)", + "Tags": [ + { + "Key": "MLOpsPublishDate", + "Value": "20220909" + } + ] + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createForecast", + "ResultPath": "$.CreateForecast", + "Next": "DescribeForecast", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Fail" + } + ] + }, + "DescribeForecast": { + "Type": "Task", + "Next": "Forecast Active", + "Parameters": { + "ForecastArn.$": "$.CreateForecast.ForecastArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:describeForecast", + "ResultPath": "$.DescribeForecast", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Fail" + } + ] + }, + "Forecast Active": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.DescribeForecast.Status", + "StringEquals": "ACTIVE", + "Next": "CreateForecastExportJob" + }, + { + "Or": [ + { + "Variable": "$.DescribeForecast.Status", + "StringEquals": "CREATE_IN_PROGRESS" + }, + { + "Variable": "$.DescribeForecast.Status", + "StringEquals": "CREATE_PENDING" + } + ], + "Next": "Wait for Forecast Active" + } + ], + "Default": "SNS Fail" + }, + "CreateForecastExportJob": { + "Type": "Task", + "Parameters": { + "Destination": { + "S3Config": { + "Path.$": "States.Format('{}{}{}','s3://',$.S3Bucket,'/${AWS::StackName}/forecast/')", + "RoleArn": "arn:aws:iam::${AWS::AccountId}:role/ForecastProcessorLambdaExecutionRole" + } + }, + "ForecastArn.$": "$.DescribeForecast.ForecastArn", + "ForecastExportJobName.$": "$.DescribeForecast.ForecastName" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:createForecastExportJob", + "ResultPath": "$.CreateForecastExportJob", + "Next": "DescribeForecastExportJob", + "Catch": [ + { + 
"ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Fail" + } + ] + }, + "DescribeForecastExportJob": { + "Type": "Task", + "Next": "Forecast Export Active", + "Parameters": { + "ForecastExportJobArn.$": "$.CreateForecastExportJob.ForecastExportJobArn" + }, + "Resource": "arn:aws:states:::aws-sdk:forecast:describeForecastExportJob", + "ResultPath": "$.DescribeForecastExportJob", + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "Next": "SNS Fail" + } + ] + }, + "Forecast Export Active": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.DescribeForecastExportJob.Status", + "StringEquals": "ACTIVE", + "Next": "SNS Success" + }, + { + "Or": [ + { + "Variable": "$.DescribeForecastExportJob.Status", + "StringEquals": "CREATE_IN_PROGRESS" + }, + { + "Variable": "$.DescribeForecastExportJob.Status", + "StringEquals": "CREATE_PENDING" + } + ], + "Next": "Wait for Forecast Export Job Active" + } + ], + "Default": "SNS Fail" + }, + "SNS Fail": { + "Type": "Task", + "Resource": "arn:aws:states:::sns:publish", + "Parameters": { + "Message.$": "$", + "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${AWS::StackName}" + }, + "Next": "Fail" + }, + "SNS Success": { + "Type": "Task", + "Resource": "arn:aws:states:::sns:publish", + "Parameters": { + "Message.$": "$", + "TopicArn": "arn:aws:sns:${AWS::Region}:${AWS::AccountId}:${AWS::StackName}" + }, + "End": true + }, + "Fail": { + "Type": "Fail" + }, + "Wait for Forecast Export Job Active": { + "Type": "Wait", + "Seconds": 120, + "Next": "DescribeForecastExportJob" + }, + "Wait for Forecast Active": { + "Type": "Wait", + "Seconds": 180, + "Next": "DescribeForecast" + } + } + } + RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/ForecastStepFunctionExecutionRole" + + AthenaConnectorStateMachine: + Type: AWS::StepFunctions::StateMachine + Properties: + StateMachineName: !Sub "${AWS::StackName}-Athena-Connector" + DefinitionString: + !Sub | + { + "Comment": "A description of my state machine", + "StartAt": 
"GetParameters", + "States": { + "GetParameters": { + "Type": "Task", + "Next": "Parallel", + "Parameters": { + "Names": [ + "/forecast/${AWS::StackName}/DatasetGroup/DatasetGroupName", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeItem", + "/forecast/${AWS::StackName}/DatasetGroup/DatasetIncludeRTS", + "/forecast/${AWS::StackName}/DatasetGroup/QueryITEM", + "/forecast/${AWS::StackName}/DatasetGroup/QueryRTS", + "/forecast/${AWS::StackName}/DatasetGroup/QueryTTS", + "/forecast/${AWS::StackName}/DatasetGroup/S3Bucket" + ] + }, + "Resource": "arn:aws:states:::aws-sdk:ssm:getParameters", + "ResultSelector": { + "DatasetGroupName.$": "$.Parameters[0].Value", + "DatasetIncludeItem.$": "$.Parameters[1].Value", + "DatasetIncludeRTS.$": "$.Parameters[2].Value", + "QueryITEM.$": "$.Parameters[3].Value", + "QueryRTS.$": "$.Parameters[4].Value", + "QueryTTS.$": "$.Parameters[5].Value", + "DatasetS3Bucket.$": "$.Parameters[6].Value" + }, + "ResultPath": "$.getParameters" + }, + "Parallel": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Reset S3 TTS", + "States": { + "Reset S3 TTS": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastPurgeS3Folder:$LATEST", + "Payload": { + "BucketName.$": "$.getParameters.DatasetS3Bucket", + "Prefix.$": "States.Format('{}{}',$.getParameters.DatasetGroupName,'/tts')" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "ResultPath": null, + "Next": "Start TTS Query" + }, + "Start TTS Query": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:startQueryExecution", + "Parameters": { + "QueryString.$": "$.getParameters.QueryTTS", + "WorkGroup": "primary", + "ResultConfiguration": { + "OutputLocation.$": 
"States.Format('{}{}{}{}{}','s3://',$.getParameters.DatasetS3Bucket,'/', $.getParameters.DatasetGroupName,'/tts')" + } + }, + "ResultPath": "$.QueryExecutionIdTTS", + "Next": "Get TTS Query Status" + }, + "Get TTS Query Status": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:getQueryExecution", + "Parameters": { + "QueryExecutionId.$": "$.QueryExecutionIdTTS.QueryExecutionId" + }, + "ResultPath": "$.getTTSQueryExecutionResults", + "Next": "Evaluate TTS Query Status" + }, + "Evaluate TTS Query Status": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.getTTSQueryExecutionResults.QueryExecution.Status.State", + "StringEquals": "SUCCEEDED", + "Next": "CSV Process S3 TTS" + }, + { + "Variable": "$.getTTSQueryExecutionResults.QueryExecution.Status.State", + "StringEquals": "FAILED", + "Next": "TTS Fail" + } + ], + "Default": "TTS Wait Query" + }, + "CSV Process S3 TTS": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastRemoveAthenaQuotes:$LATEST", + "Payload": { + "BucketName.$": "$.getParameters.DatasetS3Bucket", + "Prefix.$": "States.Format('{}{}',$.getParameters.DatasetGroupName,'/tts')" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "End": true + }, + "TTS Wait Query": { + "Type": "Wait", + "Seconds": 15, + "Next": "Get TTS Query Status" + }, + "TTS Fail": { + "Type": "Fail" + } + } + }, + { + "StartAt": "Fetch RTS", + "States": { + "Fetch RTS": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.getParameters.DatasetIncludeRTS", + "StringEquals": "true", + "Next": "Reset S3 RTS" + } + ], + "Default": "No RTS Required" + }, + "No RTS Required": { + "Type": "Pass", + "End": true + }, + "Reset S3 RTS": { + "Type": "Task", + 
"Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastPurgeS3Folder:$LATEST", + "Payload": { + "BucketName.$": "$.getParameters.DatasetS3Bucket", + "Prefix.$": "States.Format('{}{}',$.getParameters.DatasetGroupName,'/rts')" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "ResultPath": null, + "Next": "Start RTS Query" + }, + "Start RTS Query": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:startQueryExecution", + "Parameters": { + "QueryString.$": "$.getParameters.QueryRTS", + "WorkGroup": "primary", + "ResultConfiguration": { + "OutputLocation.$": "States.Format('{}{}{}{}{}','s3://',$.getParameters.DatasetS3Bucket,'/', $.getParameters.DatasetGroupName,'/rts')" + } + }, + "ResultPath": "$.QueryExecutionIdRTS", + "Next": "Get RTS Query Status" + }, + "Get RTS Query Status": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:getQueryExecution", + "Parameters": { + "QueryExecutionId.$": "$.QueryExecutionIdRTS.QueryExecutionId" + }, + "ResultPath": "$.getRTSQueryExecutionResults", + "Next": "Evaluate RTS Query Status" + }, + "Evaluate RTS Query Status": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.getRTSQueryExecutionResults.QueryExecution.Status.State", + "StringEquals": "SUCCEEDED", + "Next": "CSV Process S3 RTS" + }, + { + "Variable": "$.getRTSQueryExecutionResults.QueryExecution.Status.State", + "StringEquals": "FAILED", + "Next": "RTS Fail" + } + ], + "Default": "Wait RTS Query" + }, + "CSV Process S3 RTS": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastRemoveAthenaQuotes:$LATEST", + "Payload": { + "BucketName.$": 
"$.getParameters.DatasetS3Bucket", + "Prefix.$": "States.Format('{}{}',$.getParameters.DatasetGroupName,'/rts')" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "End": true + }, + "Wait RTS Query": { + "Type": "Wait", + "Seconds": 15, + "Next": "Get RTS Query Status" + }, + "RTS Fail": { + "Type": "Fail" + } + } + }, + { + "StartAt": "Fetch Item", + "States": { + "Fetch Item": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.getParameters.DatasetIncludeItem", + "StringEquals": "true", + "Next": "Reset S3 Item" + } + ], + "Default": "No Item Required" + }, + "No Item Required": { + "Type": "Pass", + "End": true + }, + "Reset S3 Item": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastPurgeS3Folder:$LATEST", + "Payload": { + "BucketName.$": "$.getParameters.DatasetS3Bucket", + "Prefix.$": "States.Format('{}{}',$.getParameters.DatasetGroupName,'/item')" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "ResultPath": null, + "Next": "Start ITEM Query" + }, + "Start ITEM Query": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:startQueryExecution", + "Parameters": { + "QueryString.$": "$.getParameters.QueryITEM", + "WorkGroup": "primary", + "ResultConfiguration": { + "OutputLocation.$": "States.Format('{}{}{}{}{}','s3://',$.getParameters.DatasetS3Bucket,'/', $.getParameters.DatasetGroupName,'/item')" + } + }, + "ResultPath": "$.QueryExecutionIdITEM", + "Next": "Get ITEM Query Status" + }, + "Get ITEM Query Status": { + "Type": "Task", + "Resource": "arn:aws:states:::athena:getQueryExecution", + "Parameters": { 
+ "QueryExecutionId.$": "$.QueryExecutionIdITEM.QueryExecutionId" + }, + "ResultPath": "$.getITEMQueryExecutionResults", + "Next": "Evaluate ITEM Query Status" + }, + "Evaluate ITEM Query Status": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.getITEMQueryExecutionResults.QueryExecution.Status.State", + "StringEquals": "SUCCEEDED", + "Next": "CSV Process S3 Item" + }, + { + "Variable": "$.getITEMQueryExecutionResults.QueryExecution.Status.State", + "StringEquals": "FAILED", + "Next": "ITEM Fail" + }, { "Variable": "$.getITEMQueryExecutionResults.QueryExecution.Status.State", "StringEquals": "CANCELLED", "Next": "ITEM Fail", "Comment": "CANCELLED is terminal; fail instead of polling forever" } + ], + "Default": "ITEM Wait Query" + }, + "CSV Process S3 Item": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastRemoveAthenaQuotes:$LATEST", + "Payload": { + "BucketName.$": "$.getParameters.DatasetS3Bucket", + "Prefix.$": "States.Format('{}{}',$.getParameters.DatasetGroupName,'/item')" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "End": true + }, + "ITEM Wait Query": { + "Type": "Wait", + "Seconds": 15, + "Next": "Get ITEM Query Status" + }, + "ITEM Fail": { + "Type": "Fail" + } + } + } + ], + "End": true + } + } + } + + RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/ForecastStepFunctionExecutionRole" + + StepFunctionWorkflowStateMachine: + Type: "AWS::StepFunctions::StateMachine" + Properties: + StateMachineName: !Sub "${AWS::StackName}-Workflow" + DefinitionString: + !Sub | + { + "Comment": "An automation pipeline, edit for use case prior to execution", + "StartAt": "Athena-Connector", + "States": { + "Athena-Connector": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn":
"arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${AWS::StackName}-Athena-Connector", + "Input": { + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + } + }, + "Next": "Import-Dataset" + }, + "Import-Dataset": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${AWS::StackName}-Import-Dataset", + "Input": { + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + } + }, + "Next": "Create-Predictor" + }, + "Create-Predictor": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${AWS::StackName}-Create-Predictor", + "Input": { + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + } + }, + "Next": "Select-Best-Predictor" + }, + "Select-Best-Predictor": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:ForecastSelectPredictor:$LATEST", + "Payload": { + "StackName": "${AWS::StackName}" + } + }, + "Next": "Create-Forecast", + "Catch": [ + { + "ErrorEquals": [ + "States.HeartbeatTimeout" + ], + "Next": "Create-Forecast", + "Comment": "Heartbeat" + } + ], + "HeartbeatSeconds": 15 + }, + "Create-Forecast": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "arn:aws:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${AWS::StackName}-Create-Forecast", + "Input": { + "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id" + } + }, + "End": true + } + } + } + + RoleArn: !Sub "arn:aws:iam::${AWS::AccountId}:role/StepFunctions-${AWS::StackName}-Workflow-Role" diff --git a/ml_ops/continuous_deployment/seed/prod-config.json 
b/ml_ops/continuous_deployment/seed/prod-config.json new file mode 100644 index 0000000..c5daad4 --- /dev/null +++ b/ml_ops/continuous_deployment/seed/prod-config.json @@ -0,0 +1,24 @@ +{ + "Parameters": { + "S3Bucket": "forecast-mlops-prod", + "SNSEndpoint": "@example.com", + "DatasetGroupFrequencyRTS": "W", + "DatasetGroupFrequencyTTS": "W", + "DatasetGroupName": "fooddemo_prod", + "DatasetIncludeItem": "true", + "DatasetIncludeRTS": "true", + "ForecastForecastTypes": "[\"0.50\"]", + "PredictorExplainPredictor": "true", + "PredictorForecastDimensions": "[\"location_id\"]", + "PredictorForecastFrequency": "W", + "PredictorForecastHorizon": "10", + "PredictorForecastOptimizationMetric": "AverageWeightedQuantileLoss", + "PredictorForecastTypes": "[\"0.30\",\"0.40\",\"0.50\",\"0.60\",\"0.70\"]", + "TimestampFormatRTS": "yyyy-MM-dd", + "TimestampFormatTTS": "yyyy-MM-dd", + "PredictorAttributeConfigs": "[{\"AttributeName\":\"checkout_price\",\"Transformations\":{\"backfill\":\"mean\",\"futurefill\":\"mean\",\"middlefill\":\"mean\"}},{\"AttributeName\":\"base_price\",\"Transformations\":{\"backfill\":\"mean\",\"futurefill\":\"mean\",\"middlefill\":\"mean\"}},{\"AttributeName\":\"emailer_for_promotion\",\"Transformations\":{\"backfill\":\"zero\",\"futurefill\":\"zero\",\"middlefill\":\"zero\"}},{\"AttributeName\":\"homepage_featured\",\"Transformations\":{\"backfill\":\"zero\",\"futurefill\":\"zero\",\"middlefill\":\"zero\"}},{\"AttributeName\":\"target_value\",\"Transformations\":{\"aggregation\":\"sum\",\"backfill\":\"nan\",\"frontfill\":\"none\",\"middlefill\":\"nan\"}}]", + "SchemaITEM": "{\"Attributes\":[{\"AttributeName\":\"item_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"food_category\",\"AttributeType\":\"string\"},{\"AttributeName\":\"food_cuisine\",\"AttributeType\":\"string\"}]}", + "SchemaRTS": 
"{\"Attributes\":[{\"AttributeName\":\"location_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"item_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"checkout_price\",\"AttributeType\":\"float\"},{\"AttributeName\":\"base_price\",\"AttributeType\":\"float\"},{\"AttributeName\":\"emailer_for_promotion\",\"AttributeType\":\"integer\"},{\"AttributeName\":\"homepage_featured\",\"AttributeType\":\"integer\"},{\"AttributeName\":\"timestamp\",\"AttributeType\":\"timestamp\"}]}", + "SchemaTTS": "{\"Attributes\":[{\"AttributeName\":\"location_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"item_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"target_value\",\"AttributeType\":\"integer\"},{\"AttributeName\":\"timestamp\",\"AttributeType\":\"timestamp\"}]}" + } +} \ No newline at end of file diff --git a/ml_ops/continuous_deployment/seed/prod-dep-config.json b/ml_ops/continuous_deployment/seed/prod-dep-config.json new file mode 100644 index 0000000..7a9764a --- /dev/null +++ b/ml_ops/continuous_deployment/seed/prod-dep-config.json @@ -0,0 +1,6 @@ +{ + "Parameters": { + "S3Bucket": "forecast-mlops-prod", + "ExistingS3Bucket": "false" + } +} diff --git a/ml_ops/continuous_deployment/seed/staging-config.json b/ml_ops/continuous_deployment/seed/staging-config.json new file mode 100644 index 0000000..09d9db4 --- /dev/null +++ b/ml_ops/continuous_deployment/seed/staging-config.json @@ -0,0 +1,24 @@ +{ + "Parameters": { + "S3Bucket": "forecast-mlops-staging", + "SNSEndpoint": "@example.com", + "DatasetGroupFrequencyRTS": "W", + "DatasetGroupFrequencyTTS": "W", + "DatasetGroupName": "fooddemo_staging", + "DatasetIncludeItem": "true", + "DatasetIncludeRTS": "true", + "ForecastForecastTypes": "[\"0.50\"]", + "PredictorExplainPredictor": "true", + "PredictorForecastDimensions": "[\"location_id\"]", + "PredictorForecastFrequency": "W", + "PredictorForecastHorizon": "10", + "PredictorForecastOptimizationMetric": "AverageWeightedQuantileLoss", + 
"PredictorForecastTypes": "[\"0.30\",\"0.40\",\"0.50\",\"0.60\",\"0.70\"]", + "TimestampFormatRTS": "yyyy-MM-dd", + "TimestampFormatTTS": "yyyy-MM-dd", + "PredictorAttributeConfigs": "[{\"AttributeName\":\"checkout_price\",\"Transformations\":{\"backfill\":\"mean\",\"futurefill\":\"mean\",\"middlefill\":\"mean\"}},{\"AttributeName\":\"base_price\",\"Transformations\":{\"backfill\":\"mean\",\"futurefill\":\"mean\",\"middlefill\":\"mean\"}},{\"AttributeName\":\"emailer_for_promotion\",\"Transformations\":{\"backfill\":\"zero\",\"futurefill\":\"zero\",\"middlefill\":\"zero\"}},{\"AttributeName\":\"homepage_featured\",\"Transformations\":{\"backfill\":\"zero\",\"futurefill\":\"zero\",\"middlefill\":\"zero\"}},{\"AttributeName\":\"target_value\",\"Transformations\":{\"aggregation\":\"sum\",\"backfill\":\"nan\",\"frontfill\":\"none\",\"middlefill\":\"nan\"}}]", + "SchemaITEM": "{\"Attributes\":[{\"AttributeName\":\"item_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"food_category\",\"AttributeType\":\"string\"},{\"AttributeName\":\"food_cuisine\",\"AttributeType\":\"string\"}]}", + "SchemaRTS": "{\"Attributes\":[{\"AttributeName\":\"location_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"item_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"checkout_price\",\"AttributeType\":\"float\"},{\"AttributeName\":\"base_price\",\"AttributeType\":\"float\"},{\"AttributeName\":\"emailer_for_promotion\",\"AttributeType\":\"integer\"},{\"AttributeName\":\"homepage_featured\",\"AttributeType\":\"integer\"},{\"AttributeName\":\"timestamp\",\"AttributeType\":\"timestamp\"}]}", + "SchemaTTS": "{\"Attributes\":[{\"AttributeName\":\"location_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"item_id\",\"AttributeType\":\"string\"},{\"AttributeName\":\"target_value\",\"AttributeType\":\"integer\"},{\"AttributeName\":\"timestamp\",\"AttributeType\":\"timestamp\"}]}" + } +} \ No newline at end of file diff --git 
a/ml_ops/continuous_deployment/seed/staging-dep-config.json b/ml_ops/continuous_deployment/seed/staging-dep-config.json new file mode 100644 index 0000000..69c09b6 --- /dev/null +++ b/ml_ops/continuous_deployment/seed/staging-dep-config.json @@ -0,0 +1,6 @@ +{ + "Parameters": { + "S3Bucket": "forecast-mlops-staging", + "ExistingS3Bucket": "false" + } +}