AWS Step Functions with CDK

In this post, you will learn how to use the AWS CDK to create a basic yet powerful workflow in AWS. You can use this kind of workflow to implement complex applications and to reduce the complexity of a microservice-based application. The application uses four components: AWS API Gateway, which receives HTTP requests and serves as the entry point to our application; AWS Lambda, where we implement our microservices; DynamoDB, a NoSQL database where we store our data; and AWS Step Functions, where we specify the order of execution.

If you are a developer in an organization, you may not know exactly what is happening on the business side. You can use the new feature of Step Functions, where you specify the workflow for a use case visually, to communicate more effectively with your colleagues.

Step Functions offers several types of state. Every workflow has a start and an end. Then there is the Task state, in which you specify what piece of work needs to be done. Task states most commonly refer to Lambda functions, but they can also refer to activities (workers that you host yourself, for example on EC2 instances) and to a number of other AWS services. A task can also start another state machine, which enables you to implement very complex scenarios in multiple layers of abstraction. To specify conditions and branching in your workflow, you use Choice states. With Parallel states you can run one or several series of tasks in parallel. If you need to wait a certain amount of time after a task before proceeding to the next one, you can use a Wait state. Finally, you have the Fail and Succeed states, which end an execution. (The Amazon States Language also defines Pass and Map states, which we do not need here.)
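To make the state types a bit more concrete, here is a minimal sketch that models a few of them as plain TypeScript types and writes a small definition in the shape of the Amazon States Language. The workflow itself (an order check with a Choice, a Wait, a Task, and the two terminal states), the state names, and the Lambda ARN are all hypothetical and not part of the demo we build below. The helper `findDanglingTargets` is our own illustration, not an AWS API; it only checks that every transition points to a state that exists.

```typescript
// Minimal model of a few Amazon States Language (ASL) state types.
// Parallel, Pass and Map are left out to keep the sketch short.
type AslState =
  | { Type: "Task"; Resource: string; Next?: string; End?: boolean }
  | { Type: "Choice"; Choices: { Variable: string; NumericGreaterThan: number; Next: string }[]; Default?: string }
  | { Type: "Wait"; Seconds: number; Next?: string; End?: boolean }
  | { Type: "Succeed" }
  | { Type: "Fail"; Error?: string; Cause?: string };

interface AslDefinition {
  StartAt: string;
  States: Record<string, AslState>;
}

// Hypothetical order workflow; the Lambda ARN is a placeholder.
const orderWorkflow: AslDefinition = {
  StartAt: "Check Total",
  States: {
    "Check Total": {
      Type: "Choice",
      Choices: [{ Variable: "$.total", NumericGreaterThan: 0, Next: "Cool Down" }],
      Default: "Reject Order",
    },
    "Cool Down": { Type: "Wait", Seconds: 5, Next: "Process Order" },
    "Process Order": {
      Type: "Task",
      Resource: "arn:aws:lambda:us-east-1:123456789012:function:process-order",
      Next: "Done",
    },
    Done: { Type: "Succeed" },
    "Reject Order": { Type: "Fail", Error: "InvalidOrder", Cause: "Total must be positive" },
  },
};

// Collect every state name that a state can transition to.
function targetsOf(state: AslState): string[] {
  switch (state.Type) {
    case "Choice":
      return [...state.Choices.map((c) => c.Next), ...(state.Default ? [state.Default] : [])];
    case "Task":
    case "Wait":
      return state.Next ? [state.Next] : [];
    default:
      return []; // Succeed and Fail are terminal
  }
}

// Return the transition targets (including StartAt) that are not defined in States.
function findDanglingTargets(def: AslDefinition): string[] {
  const referenced: string[] = [def.StartAt];
  for (const stateName of Object.keys(def.States)) {
    referenced.push(...targetsOf(def.States[stateName]));
  }
  return referenced.filter((target) => !(target in def.States));
}
```

Calling `findDanglingTargets(orderWorkflow)` returns an empty array here, because every `Next` and `Default` points to a defined state.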

Using the new rich UI for designing workflows with Step Functions, you can easily drag the states mentioned above from the Flow portion onto the diagram. In the Actions portion you specify what type of work you want to do and which service must perform it.

Let’s add some very basic tasks for the purpose of this demonstration. What we want to achieve is the following: an API Gateway receives requests from our client applications and forwards them to our workflow. Inside the workflow we use a Lambda function to store the request in a database, and after that we send an email notification. Obviously you can build far more complex workflows with Step Functions, but our goal here is to show how CDK can help you build enterprise-grade applications. You definitely need a CI/CD chain when you change something in your application, and integrating such a GUI into that chain is pretty unconventional. However, the GUI really helps you communicate your ideas and your understanding of the business to colleagues with a non-IT background. So here is a very basic workflow:

We receive some data from another Lambda function, which is itself triggered by API Gateway. We persist the data in DynamoDB and send an email notification saying that an order was received.

First things first, let’s initialize an empty CDK TypeScript project: cdk init app --language=typescript

Then we go to package.json to declare the dependencies for our project. Add the following modules to the dependencies:

"@aws-cdk/aws-apigateway": "*",
"@aws-cdk/aws-dynamodb": "*",
"@aws-cdk/aws-lambda": "*",
"@aws-cdk/aws-sns": "*",
"@aws-cdk/aws-sns-subscriptions": "*",
"@aws-cdk/aws-stepfunctions": "*",
"@aws-cdk/aws-stepfunctions-tasks": "*",
"@aws-cdk/core": "*",

This is an optional step, but go ahead and run “npm install” in the root directory of your project. This downloads the dependencies and gives you IntelliSense, which comes in very handy. Then go to the lib folder and open the TypeScript file for your application stack.

Add the following import statements before the class declaration:

import cdk = require("@aws-cdk/core");
import apigateway = require("@aws-cdk/aws-apigateway");
import dynamodb = require("@aws-cdk/aws-dynamodb");
import lambda = require("@aws-cdk/aws-lambda");
import sfn = require("@aws-cdk/aws-stepfunctions");
import tasks = require("@aws-cdk/aws-stepfunctions-tasks");
import sns = require("@aws-cdk/aws-sns");
import subscriptions = require("@aws-cdk/aws-sns-subscriptions");
import { TaskInput } from "@aws-cdk/aws-stepfunctions";

In order to create our database, add the following:

const dynamoTable=new dynamodb.Table(this,'stepfuncDynamoDB',{
  partitionKey:{
    name:'itemId',
    type:dynamodb.AttributeType.STRING
  },
  // RemovalPolicy lives in the core module, which is imported as cdk
  removalPolicy:cdk.RemovalPolicy.RETAIN
});

Here we define a partition key ‘itemId’ of type string and the removal policy of our table. Because data is very sensitive (not for this demo, of course, but in real life), we chose the Retain policy, so the table is kept if our stack gets destroyed. You could also choose the Destroy policy if you like (a Snapshot policy exists as well, for resource types that support snapshots). Then we create a Lambda function that is responsible for persisting the data into the DB:

const createLambda = new lambda.Function(this, 'createItem',{
  code: new lambda.AssetCode('./src'),
  handler:'create.handler',
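  // Node.js 10 has reached end of life on Lambda; use a newer lambda.Runtime.NODEJS_* value if your CDK version provides one
  runtime:lambda.Runtime.NODEJS_10_X,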
  environment:{
    TABLE_NAME: dynamoTable.tableName,
    PRIMARY_KEY:'itemId'
  },
});

We also grant the function’s IAM role read and write access to the table with the following:

dynamoTable.grantReadWriteData(createLambda);

Then we create an SNS notification topic and subscription:

const stepfuncTopic = new sns.Topic(this, 'stepfuncTopic');
stepfuncTopic.addSubscription(new subscriptions.EmailSubscription('your email'));

Now we define each step that our step function has:

const createItem = new tasks.LambdaInvoke(this, 'Create Item', {
  lambdaFunction: createLambda,
  outputPath: '$.Payload',
});

const snsTask=new tasks.SnsPublish(this, "snstask", {
  topic: stepfuncTopic,
  message: TaskInput.fromText('new request arrived!')
});
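The `outputPath: '$.Payload'` setting deserves a short illustration. A Lambda invoke task returns the function's return value wrapped in a `Payload` field next to some invocation metadata, and the output path selects which part of that result is handed to the next state. The sketch below mimics that selection with a tiny helper of our own, `selectPath`, which only understands dot-separated paths; it is not how Step Functions is implemented, and the sample result object is abbreviated.

```typescript
// Resolve a simple path of the form "$", "$.a" or "$.a.b" against a value.
// Only dot-separated field names are supported in this sketch.
function selectPath(value: unknown, path: string): unknown {
  if (path === "$") return value;
  if (path.indexOf("$.") !== 0) throw new Error(`Unsupported path: ${path}`);
  let current: any = value;
  for (const field of path.slice(2).split(".")) {
    if (current === null || typeof current !== "object") return undefined;
    current = current[field];
  }
  return current;
}

// Illustrative shape of a Lambda invoke task result (metadata abbreviated).
const lambdaInvokeResult = {
  ExecutedVersion: "$LATEST",
  Payload: { statusCode: 201, body: "success" },
  StatusCode: 200,
};

// What the next state would receive with outputPath set to "$.Payload".
const nextStateInput = selectPath(lambdaInvokeResult, "$.Payload");
```

With this sample, `nextStateInput` is the object `{ statusCode: 201, body: "success" }`, that is, exactly what the Lambda function returned, without the surrounding metadata.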

These steps are still discrete and not yet chained together. To do that, we add a definition and then create a state machine from it:

const definition = createItem
  .next(snsTask);
  
const stateMachine = new sfn.StateMachine(this, 'SimpleStepFunc', {
  definition,
  timeout: cdk.Duration.minutes(5),
});
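Conceptually, chaining with `.next()` means that each state gets a pointer to its successor and the last state is marked as the end. The following sketch shows that idea with a small function of our own, `chainStates`; it is not CDK's implementation, and the definition CDK synthesizes contains more fields (parameters, retries, the output path) than shown here. The two resource strings are the service integration identifiers for invoking a Lambda function and publishing to SNS.

```typescript
interface SimpleTaskState {
  Type: "Task";
  Resource: string;
  Next?: string;
  End?: boolean;
}

// Link named steps in order: each state points to the next one, the last one ends.
function chainStates(steps: { name: string; resource: string }[]) {
  if (steps.length === 0) throw new Error("at least one step is required");
  const States: Record<string, SimpleTaskState> = {};
  steps.forEach((step, i) => {
    const isLast = i === steps.length - 1;
    States[step.name] = isLast
      ? { Type: "Task", Resource: step.resource, End: true }
      : { Type: "Task", Resource: step.resource, Next: steps[i + 1].name };
  });
  return { StartAt: steps[0].name, States };
}

// The two steps of our workflow, in the same order as the CDK definition.
const chainedDefinition = chainStates([
  { name: "Create Item", resource: "arn:aws:states:::lambda:invoke" },
  { name: "snstask", resource: "arn:aws:states:::sns:publish" },
]);
```

Here `chainedDefinition.StartAt` is `"Create Item"`, that state's `Next` is `"snstask"`, and `"snstask"` carries `End: true`.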

As mentioned, we need another lambda function which triggers our step function:

const stepfuncStarterLambda = new lambda.Function(this, 'stepfuncStarterLambdaitems',{
  code: new lambda.AssetCode('./src'),
  handler:'stepfunc-starter.handler',
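  // Node.js 10 has reached end of life on Lambda; use a newer lambda.Runtime.NODEJS_* value if your CDK version provides one
  runtime:lambda.Runtime.NODEJS_10_X,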
  environment:{
    STEP_FUNCTION_ARN: stateMachine.stateMachineArn
  }
});
// Grant lambda execution roles
createLambda.grantInvoke(stateMachine.role);
stateMachine.grantStartExecution(stepfuncStarterLambda);

Then we add our API definition and attach our step function starter Lambda to one of its methods:

const api = new apigateway.RestApi(this,'stepfuncApi',{
  restApiName:'my step function api'
});
const rootApi=api.root.addResource('start');

const stepfuncStarterLambdaApi=new apigateway.LambdaIntegration(stepfuncStarterLambda);
rootApi.addMethod('POST',stepfuncStarterLambdaApi);

const plan = api.addUsagePlan('UsagePlan', {
  name:'EASY',
  throttle:{
    rateLimit:20,
    burstLimit:2
  }
});
const key =api.addApiKey('ApiKey');
plan.addApiKey(key);
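The two throttle numbers are easier to reason about with a small model. API Gateway describes its throttling in terms of a token bucket: `rateLimit` is the steady-state number of requests per second, and `burstLimit` is the size of the bucket, that is, how many requests can be served at the same instant. The class below is our own simplified simulation of that idea, not API Gateway's actual implementation, and it uses explicit timestamps instead of a real clock.

```typescript
// Token bucket: `burst` is the bucket capacity, `rate` is tokens refilled per second.
class TokenBucket {
  private tokens: number;
  private lastRefillMs: number;
  private readonly rate: number;
  private readonly burst: number;

  constructor(rate: number, burst: number, startMs: number = 0) {
    this.rate = rate;
    this.burst = burst;
    this.tokens = burst; // start with a full bucket
    this.lastRefillMs = startMs;
  }

  // Returns true if a request arriving at `nowMs` is allowed.
  tryAcquire(nowMs: number): boolean {
    const elapsedSeconds = (nowMs - this.lastRefillMs) / 1000;
    this.tokens = Math.min(this.burst, this.tokens + elapsedSeconds * this.rate);
    this.lastRefillMs = nowMs;
    if (this.tokens >= 1) {
      this.tokens -= 1;
      return true;
    }
    return false;
  }
}

// Same numbers as the usage plan above: 20 requests per second, burst of 2.
const bucket = new TokenBucket(20, 2);
// Three requests arriving at the same instant: only two fit into the bucket.
const simultaneous = [bucket.tryAcquire(0), bucket.tryAcquire(0), bucket.tryAcquire(0)];
// 100 ms later the bucket has refilled (20 tokens/s for 0.1 s, capped at 2).
const afterRefill = bucket.tryAcquire(100);
```

In this model the third simultaneous request is rejected, while a request 100 ms later is accepted again. Note also that, as far as we understand the CDK API, a usage plan only takes effect once it is associated with a deployed stage (for example via `plan.addApiStage`) and the method is declared with `apiKeyRequired: true`; the snippet above does not do that yet.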

For the Lambda function that persists the data (create.ts in the src folder, matching the create.handler setting above), we add the following:

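import * as AWS from 'aws-sdk';

const TABLE_NAME = process.env.TABLE_NAME || '';
const PRIMARY_KEY = process.env.PRIMARY_KEY || '';

// Error messages returned to the caller (the wording is illustrative)
const RESERVED_RESPONSE = 'Error: an attribute name is a DynamoDB reserved keyword';
const DYNAMODB_EXECUTION_ERROR = 'Error: the DynamoDB operation failed, please check the CloudWatch logs';

const db = new AWS.DynamoDB.DocumentClient();
export const handler = async (event: any = {}): Promise<any> => {
  // The starter Lambda passes the request body as { msg: <body> }; fall back to event.body for direct calls
  const raw = event.msg !== undefined ? event.msg : event.body;
  // API Gateway delivers the body as a string, so parse it before storing
  const item = typeof raw === 'string' ? JSON.parse(raw) : (raw || {});
  // Timestamp plus random suffix; a single random letter would collide after a handful of requests
  const ID = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
  console.log(ID);
  item[PRIMARY_KEY] = ID;
  const params = {
    TableName: TABLE_NAME,
    Item: item
  };

  try {
    await db.put(params).promise();
    return { statusCode: 201, body: 'success' };
  } catch (dbError) {
    // Report reserved-keyword validation errors separately from other DynamoDB failures
    const errorResponse = dbError.code === 'ValidationException' && dbError.message.includes('reserved keyword') ?
      RESERVED_RESPONSE : DYNAMODB_EXECUTION_ERROR;
    return { statusCode: 500, body: errorResponse };
  }
};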
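One detail is easy to miss: the create function is not called by API Gateway but by the state machine, and the starter Lambda passes the request body along as `{ msg: event.body }`. The state input therefore carries the body (a JSON string when it comes from API Gateway) under `msg`. The sketch below shows one way to turn that input into an item. `buildItem` and the sample order are our own illustration, and the function deliberately takes the key name and the ID as arguments so that it can be tried without AWS.

```typescript
// Shape of the state input produced by the starter Lambda: { msg: <request body> }.
interface StateInput {
  msg?: string | Record<string, unknown> | null;
}

// Turn the state input into an item with the primary key set.
function buildItem(input: StateInput, primaryKey: string, id: string): Record<string, unknown> {
  const raw = input.msg;
  // A body coming from API Gateway is a string, so parse it if needed.
  // Invalid JSON makes JSON.parse throw; a real handler should decide how to report that.
  const parsed = typeof raw === "string" ? JSON.parse(raw) : raw;
  const base = parsed !== null && typeof parsed === "object" && !Array.isArray(parsed) ? parsed : {};
  const item: Record<string, unknown> = {};
  for (const key of Object.keys(base)) {
    item[key] = base[key];
  }
  item[primaryKey] = id;
  return item;
}

// Hypothetical order payload and ID, for illustration only.
const sampleItem = buildItem({ msg: '{"product":"book","quantity":2}' }, "itemId", "order-1");
```

For this sample, `sampleItem` contains `product`, `quantity`, and the added `itemId` field; an empty or missing `msg` yields an item that holds only the key.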

For the one that starts the step function (stepfunc-starter.ts in the same folder):

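import * as AWS from 'aws-sdk';

const STEP_FUNCTION_ARN = process.env.STEP_FUNCTION_ARN || '';

// Create the Step Functions client once, outside the handler
const stepfunctions = new AWS.StepFunctions();

export async function handler(event: any, context: any) {
  // Execution names must be unique per state machine, so use a timestamp plus a random suffix
  const ID = `${Date.now()}-${Math.random().toString(36).slice(2, 10)}`;
  try {
    // Wait for the call to finish; otherwise the function may return before the execution is started
    await stepfunctions.startExecution({
      stateMachineArn: STEP_FUNCTION_ARN,
      name: ID,
      input: JSON.stringify({msg: event.body})
    }).promise();
    // The Lambda proxy integration expects a statusCode and a string body
    return { statusCode: 200, body: JSON.stringify({ executionName: ID }) };
  } catch (err) {
    console.error(err);
    return { statusCode: 500, body: 'could not start the workflow' };
  }
};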
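A word on the execution name: Step Functions expects it to be unique per state machine and limits it to 80 characters, with a number of special characters disallowed. A single random letter offers only 26 possible names, so a collision is to be expected after a few requests. The following sketch shows one possible naming scheme; `makeExecutionName`, `isSafeExecutionName`, and the `order-` prefix are our own choices, and the check is intentionally stricter than the service requires (letters, digits, hyphens, and underscores only). The scheme makes collisions unlikely but does not rule them out.

```typescript
// Build an execution name from a timestamp plus a random suffix.
// The random source is injectable so the function can be tested deterministically.
function makeExecutionName(nowMs: number, random: () => number = Math.random): string {
  const suffix = Math.floor(random() * 0xffffffff).toString(36);
  return `order-${nowMs}-${suffix}`;
}

// Conservative check: letters, digits, hyphens and underscores, 1 to 80 characters.
function isSafeExecutionName(candidate: string): boolean {
  return /^[A-Za-z0-9_-]{1,80}$/.test(candidate);
}

// Fixed inputs for illustration; in a handler you would pass Date.now().
const exampleName = makeExecutionName(1700000000000, () => 0.5);
const exampleIsSafe = isSafeExecutionName(exampleName);
```

In the handler you would call `makeExecutionName(Date.now())` and pass the result as `name` to `startExecution`.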

That was a lot of steps to perform. But once you have this construct, you can easily add as many discrete steps as you want and extend the step function.

This project is maintained by pedramha